Redis多个锁
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;
/**
*
* ClassName: RedisBatchLockRegistry
* @Description: Redis分布式锁,支持批量获取和释放锁
*
* org.springframework.integration.redis.util.RedisLockRegistry 存在两个问题:
* 1.不支持锁的批量获取和释放
* 2.锁的重试时间只能为100毫秒
*
* 该类参考自RedisLockRegistry,与之对比不同点为:
* 1.增加了锁的批量获取和释放
* 2.去除了本地锁实现(RedisLockRegistry的方式实现实际上为Redis锁+本地锁双重锁)
*
* @author zhangq
* @date 2017年5月24日
*
* =================================================================================================
* Task ID Date Author Description
* ----------------+----------------+-------------------+-------------------------------------------
*
*/
@Component
public class RedisBatchLockRegistry {
/**
* Logger
*/
private static final Logger LOG = LoggerFactory.getLogger(RedisBatchLockRegistry.class);
/**
* Redis模版
*/
@Autowired
private RedisTemplate<String, String> redisTemplate;
public RedisBatchLockRegistry(){}
/**
*
* <p>Title: 初始化</p>
* <p>Description: </p>
* @param connectionFactory
*/
public RedisBatchLockRegistry(RedisConnectionFactory connectionFactory) {
}
/**
*
* @Description: 尝试批量获取锁
* @param list
* @param maxWait
* @return boolean
* @author zhangq
* @throws InterruptedException
* @date 2017年5月27日
*/
@SuppressWarnings("rawtypes")
public boolean tryLock(final Set<String> keys, long maxWait, long timeout) {
try {
LOG.info("尝试获取锁:{}", keys);
// 需要获取的锁
final List<String> needLocking = new CopyOnWriteArrayList<String>();
needLocking.addAll(keys);
// 已经获取的锁
List<String> locked = new CopyOnWriteArrayList<String>();
// 获取等待时间的最后时间段
long expireAt = System.currentTimeMillis() + maxWait;
// 循环判断锁是否一直存在
while (System.currentTimeMillis() < expireAt) {
// 通过管道批量获取锁
LOG.info("进入tryLock while,needLocking:{}", needLocking);
List<Object> results = redisTemplate.executePipelined(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
LOG.info("connection:{}, needLocking:{}", connection, needLocking);
String value = System.currentTimeMillis() + "";
for (String key : needLocking) {
LOG.info("key :{}", key);
// 如果不存在则 SET。SET if Not eXists。
connection.setNX(key.getBytes(), value.getBytes());
// LOG.info("pipelined setNX result:{}, key:{},
// value:{}", flag, key, key);
}
return null;
}
});
// 提交redis执行计数
LOG.info("needLocking:{},results:{}", needLocking, results);
for (int i = 0; i < results.size(); i++) {
// 是否执行成功
Boolean success = (Boolean) results.get(i);
// 锁的KEY
String key = needLocking.get(i);
// setnx成功,获得锁
if (success) {
// 设置锁的过期时间
redisTemplate.expire(key, timeout, TimeUnit.MILLISECONDS);
locked.add(key);
}
}
// 移除已锁定资源
needLocking.removeAll(locked);
// 是否锁住全部资源
if (CollectionUtils.isEmpty(needLocking)) {
// 全部资源均已锁住,返回成功true
return true;
} else {
// 补偿处理,防止异常情况下(宕机/重启/连接超时等)导致的锁永不过期
List<String> exceptionLock = new CopyOnWriteArrayList<String>();
for (String key : needLocking) {
String value = redisTemplate.opsForValue().get(key);
// 当前时间 > 上锁时间 + 超时时间 + 2秒(经验时间),表示为该过期却未过期的数据,即异常数据
if (null != value && System.currentTimeMillis() > Long.parseLong(value) + timeout + 2000) {
exceptionLock.add(key);
}
}
// 删除所有异常的 KEY
if (CollectionUtils.isNotEmpty(exceptionLock)) {
unLock(exceptionLock);
}
// 部分资源未能锁住,间隔N毫秒后重试
Thread.sleep(10);
}
}
// 仍有资源未被锁住(needLocking不为空),释放已锁定的资源,并返回失败false
if (!CollectionUtils.isEmpty(needLocking)) {
LOG.info("can not get the lock, keys:{}", needLocking);
unLock(locked);
}
} catch (Exception e) {
LOG.info("尝试获取锁失败:{}", e);
}
return false;
}
/**
*
* @Description: 批量释放锁
* @param list
* @return void
* @author zhangq
* @date 2017年5月27日
*/
public void unLock(Collection<String> keys) {
try{
// 防止多个服务器处理同一个key需要watch操作(相当于是禁止了其他client处理这个key)
//redisTemplate.watch(keys);
// 批量释放锁
redisTemplate.delete(keys);
// 锁已经过期了,释放watch操作
//redisTemplate.unwatch();
}catch (Exception e){
LOG.error("批量释放锁失败!",e);
}finally {
RedisConnectionUtils.unbindConnection(redisTemplate.getConnectionFactory());
}
}
}