java中的几种锁:synchronized,reentrantlock,reentrantreadwritelock已基本可以满足编程需求,但其粒度都太大,同一时刻只有一个线程能进入同步块,这对于某些高并发的场景并不适用。
下面来提供几个更细的粒度锁:
1. 分段锁
借鉴concurrenthashmap的分段思想,先生成一定数量的锁,具体使用的时候再根据key来返回对应的lock。这是几个实现里最简单,性能最高,也是最终被采用的锁策略,代码如下:
/**
* 分段锁,系统提供一定数量的原始锁,根据传入对象的哈希值获取对应的锁并加锁
* 注意:要锁的对象的哈希值如果发生改变,有可能导致锁无法成功释放!!!
*/
public class segmentlock<t> {
private integer segments = 16;//默认分段数量
private final hashmap<integer, reentrantlock> lockmap = new hashmap<>();
public segmentlock() {
init(null, false);
}
public segmentlock(integer counts, boolean fair) {
init(counts, fair);
}
private void init(integer counts, boolean fair) {
if (counts != null) {
segments = counts;
}
for (int i = 0; i < segments; i++) {
lockmap.put(i, new reentrantlock(fair));
}
}
public void lock(t key) {
reentrantlock lock = lockmap.get(key.hashcode() % segments);
lock.lock();
}
public void unlock(t key) {
reentrantlock lock = lockmap.get(key.hashcode() % segments);
lock.unlock();
}
}
2. 哈希锁
上述分段锁的基础上发展起来的第二种锁策略,目的是实现真正意义上的细粒度锁。每个哈希值不同的对象都能获得自己独立的锁。在测试中,在被锁住的代码执行速度飞快的情况下,效率比分段锁慢 30% 左右。如果有长耗时操作,感觉表现应该会更好。代码如下:
public class hashlock<t> {
private boolean isfair = false;
private final segmentlock<t> segmentlock = new segmentlock<>();//分段锁
private final concurrenthashmap<t, lockinfo> lockmap = new concurrenthashmap<>();
public hashlock() {
}
public hashlock(boolean fair) {
isfair = fair;
}
public void lock(t key) {
lockinfo lockinfo;
segmentlock.lock(key);
try {
lockinfo = lockmap.get(key);
if (lockinfo == null) {
lockinfo = new lockinfo(isfair);
lockmap.put(key, lockinfo);
} else {
lockinfo.count.incrementandget();
}
} finally {
segmentlock.unlock(key);
}
lockinfo.lock.lock();
}
public void unlock(t key) {
lockinfo lockinfo = lockmap.get(key);
if (lockinfo.count.get() == 1) {
segmentlock.lock(key);
try {
if (lockinfo.count.get() == 1) {
lockmap.remove(key);
}
} finally {
segmentlock.unlock(key);
}
}
lockinfo.count.decrementandget();
lockinfo.unlock();
}
private static class lockinfo {
public reentrantlock lock;
public atomicinteger count = new atomicinteger(1);
private lockinfo(boolean fair) {
this.lock = new reentrantlock(fair);
}
public void lock() {
this.lock.lock();
}
public void unlock() {
this.lock.unlock();
}
}
}
3. 弱引用锁
哈希锁因为引入的分段锁来保证锁创建和销毁的同步,总感觉有点瑕疵,所以写了第三个锁来寻求更好的性能和更细粒度的锁。这个锁的思想是借助java的弱引用来创建锁,把锁的销毁交给jvm的垃圾回收,来避免额外的消耗。
有点遗憾的是因为使用了concurrenthashmap作为锁的容器,所以没能真正意义上的摆脱分段锁。这个锁的性能比 hashlock 快10% 左右。锁代码:
/**
* 弱引用锁,为每个独立的哈希值提供独立的锁功能
*/
public class weakhashlock<t> {
private concurrenthashmap<t, weaklockref<t, reentrantlock>> lockmap = new concurrenthashmap<>();
private referencequeue<reentrantlock> queue = new referencequeue<>();
public reentrantlock get(t key) {
if (lockmap.size() > 1000) {
clearemptyref();
}
weakreference<reentrantlock> lockref = lockmap.get(key);
reentrantlock lock = (lockref == null ? null : lockref.get());
while (lock == null) {
lockmap.putifabsent(key, new weaklockref<>(new reentrantlock(), queue, key));
lockref = lockmap.get(key);
lock = (lockref == null ? null : lockref.get());
if (lock != null) {
return lock;
}
clearemptyref();
}
return lock;
}
@suppresswarnings("unchecked")
private void clearemptyref() {
reference<? extends reentrantlock> ref;
while ((ref = queue.poll()) != null) {
weaklockref<t, ? extends reentrantlock> weaklockref = (weaklockref<t, ? extends reentrantlock>) ref;
lockmap.remove(weaklockref.key);
}
}
private static final class weaklockref<t, k> extends weakreference<k> {
final t key;
private weaklockref(k referent, referencequeue<? super k> q, t key) {
super(referent, q);
this.key = key;
}
}
}
4.基于key(主键)的互斥锁
keylock是对所需处理的数据的key(主键)进行加锁,只要是对不同key操作,其就可以并行处理,大大提高了线程的并行度
keylock有如下几个特性:
1、细粒度,高并行性
2、可重入
3、公平锁
4、加锁开销比reentrantlock大,适用于处理耗时长、key范围大的场景
public class keylock<k> {
// 保存所有锁定的key及其信号量
private final concurrentmap<k, semaphore> map = new concurrenthashmap<k, semaphore>();
// 保存每个线程锁定的key及其锁定计数
private final threadlocal<map<k, lockinfo>> local = new threadlocal<map<k, lockinfo>>() {
@override
protected map<k, lockinfo> initialvalue() {
return new hashmap<k, lockinfo>();
}
};
/**
* 锁定key,其他等待此key的线程将进入等待,直到调用{@link #unlock(k)}
* 使用hashcode和equals来判断key是否相同,因此key必须实现{@link #hashcode()}和
* {@link #equals(object)}方法
*
* @param key
*/
public void lock(k key) {
if (key == null)
return;
lockinfo info = local.get().get(key);
if (info == null) {
semaphore current = new semaphore(1);
current.acquireuninterruptibly();
semaphore previous = map.put(key, current);
if (previous != null)
previous.acquireuninterruptibly();
local.get().put(key, new lockinfo(current));
} else {
info.lockcount++;
}
}
/**
* 释放key,唤醒其他等待此key的线程
* @param key
*/
public void unlock(k key) {
if (key == null)
return;
lockinfo info = local.get().get(key);
if (info != null && --info.lockcount == 0) {
info.current.release();
map.remove(key, info.current);
local.get().remove(key);
}
}
/**
* 锁定多个key
* 建议在调用此方法前先对keys进行排序,使用相同的锁定顺序,防止死锁发生
* @param keys
*/
public void lock(k[] keys) {
if (keys == null)
return;
for (k key : keys) {
lock(key);
}
}
/**
* 释放多个key
* @param keys
*/
public void unlock(k[] keys) {
if (keys == null)
return;
for (k key : keys) {
unlock(key);
}
}
private static class lockinfo {
private final semaphore current;
private int lockcount;
private lockinfo(semaphore current) {
this.current = current;
this.lockcount = 1;
}
}
}
keylock使用示例:
private int[] accounts;
private keylock<integer> lock = new keylock<integer>();
public boolean transfer(int from, int to, int money) {
integer[] keys = new integer[] {from, to};
arrays.sort(keys); //对多个key进行排序,保证锁定顺序防止死锁
lock.lock(keys);
try {
//处理不同的from和to的线程都可进入此同步块
if (accounts[from] < money)
return false;
accounts[from] -= money;
accounts[to] += money;
return true;
} finally {
lock.unlock(keys);
}
}
测试代码如下:
//场景:多线程并发转账
public class test {
private final int[] account; // 账户数组,其索引为账户id,内容为金额
public test(int count, int money) {
account = new int[count];
arrays.fill(account, money);
}
boolean transfer(int from, int to, int money) {
if (account[from] < money)
return false;
account[from] -= money;
try {
thread.sleep(2);
} catch (exception e) {
}
account[to] += money;
return true;
}
int getamount() {
int result = 0;
for (int m : account)
result += m;
return result;
}
public static void main(string[] args) throws exception {
int count = 100; //账户个数
int money = 10000; //账户初始金额
int threadnum = 8; //转账线程数
int number = 10000; //转账次数
int maxmoney = 1000; //随机转账最大金额
test test = new test(count, money);
//不加锁
// runner runner = test.new nonlockrunner(maxmoney, number);
//加synchronized锁
// runner runner = test.new synchronizedrunner(maxmoney, number);
//加reentrantlock锁
// runner runner = test.new reentrantlockrunner(maxmoney, number);
//加keylock锁
runner runner = test.new keylockrunner(maxmoney, number);
thread[] threads = new thread[threadnum];
for (int i = 0; i < threadnum; i++)
threads[i] = new thread(runner, "thread-" + i);
long begin = system.currenttimemillis();
for (thread t : threads)
t.start();
for (thread t : threads)
t.join();
long time = system.currenttimemillis() - begin;
system.out.println("类型:" + runner.getclass().getsimplename());
system.out.printf("耗时:%dms\n", time);
system.out.printf("初始总金额:%d\n", count * money);
system.out.printf("终止总金额:%d\n", test.getamount());
}
// 转账任务
abstract class runner implements runnable {
final int maxmoney;
final int number;
private final random random = new random();
private final atomicinteger count = new atomicinteger();
runner(int maxmoney, int number) {
this.maxmoney = maxmoney;
this.number = number;
}
@override
public void run() {
while(count.getandincrement() < number) {
int from = random.nextint(account.length);
int to;
while ((to = random.nextint(account.length)) == from)
;
int money = random.nextint(maxmoney);
dotransfer(from, to, money);
}
}
abstract void dotransfer(int from, int to, int money);
}
// 不加锁的转账
class nonlockrunner extends runner {
nonlockrunner(int maxmoney, int number) {
super(maxmoney, number);
}
@override
void dotransfer(int from, int to, int money) {
transfer(from, to, money);
}
}
// synchronized的转账
class synchronizedrunner extends runner {
synchronizedrunner(int maxmoney, int number) {
super(maxmoney, number);
}
@override
synchronized void dotransfer(int from, int to, int money) {
transfer(from, to, money);
}
}
// reentrantlock的转账
class reentrantlockrunner extends runner {
private final reentrantlock lock = new reentrantlock();
reentrantlockrunner(int maxmoney, int number) {
super(maxmoney, number);
}
@override
void dotransfer(int from, int to, int money) {
lock.lock();
try {
transfer(from, to, money);
} finally {
lock.unlock();
}
}
}
// keylock的转账
class keylockrunner extends runner {
private final keylock<integer> lock = new keylock<integer>();
keylockrunner(int maxmoney, int number) {
super(maxmoney, number);
}
@override
void dotransfer(int from, int to, int money) {
integer[] keys = new integer[] {from, to};
arrays.sort(keys);
lock.lock(keys);
try {
transfer(from, to, money);
} finally {
lock.unlock(keys);
}
}
}
}
测试结果:
(8线程对100个账户随机转账总共10000次):
类型:nonlockrunner(不加锁)
耗时:2482ms
初始总金额:1000000
终止总金额:998906(无法保证原子性)
类型:synchronizedrunner(加synchronized锁)
耗时:20872ms
初始总金额:1000000
终止总金额:1000000
类型:reentrantlockrunner(加reentrantlock锁)
耗时:21588ms
初始总金额:1000000
终止总金额:1000000
类型:keylockrunner(加keylock锁)
耗时:2831ms
初始总金额:1000000
终止总金额:1000000