holysky5
3/23/2019 - 3:20 PM

RedisBatchLock

Redis多个锁

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;

/**
 * 
 * ClassName: RedisBatchLockRegistry 
 * @Description: Redis分布式锁,支持批量获取和释放锁
 * 
 * org.springframework.integration.redis.util.RedisLockRegistry 存在两个问题:
 * 1.不支持锁的批量获取和释放
 * 2.锁的重试时间只能为100毫秒
 * 
 * 该类参考自RedisLockRegistry,与之对比不同点为:
 * 1.增加了锁的批量获取和释放
 * 2.去除了本地锁实现(RedisLockRegistry的方式实现实际上为Redis锁+本地锁双重锁)
 * 
 * @author zhangq
 * @date 2017年5月24日
 *
 * =================================================================================================
 *     Task ID			  Date			     Author		      Description
 * ----------------+----------------+-------------------+-------------------------------------------
 *
 */
@Component
public class RedisBatchLockRegistry {

	/**
	 * Logger
	 */
	private static final Logger LOG = LoggerFactory.getLogger(RedisBatchLockRegistry.class);

	/**
	 * Redis模版
	 */
	@Autowired
	private RedisTemplate<String, String> redisTemplate;

	public RedisBatchLockRegistry(){}
	/**
	 * 
	 * <p>Title: 初始化</p> 
	 * <p>Description: </p> 
	 * @param connectionFactory
	 */
	public RedisBatchLockRegistry(RedisConnectionFactory connectionFactory) {
	}

	/**
	 * 
	 * @Description: 尝试批量获取锁
	 * @param list
	 * @param maxWait
	 * @return boolean  
	 * @author zhangq
	 * @throws InterruptedException 
	 * @date 2017年5月27日
	 */
	@SuppressWarnings("rawtypes")
	public boolean tryLock(final Set<String> keys, long maxWait, long timeout) {
		try {
			LOG.info("尝试获取锁:{}", keys);

			// 需要获取的锁
			final List<String> needLocking = new CopyOnWriteArrayList<String>();
			needLocking.addAll(keys);

			// 已经获取的锁
			List<String> locked = new CopyOnWriteArrayList<String>();

			// 获取等待时间的最后时间段
			long expireAt = System.currentTimeMillis() + maxWait;

			// 循环判断锁是否一直存在
			while (System.currentTimeMillis() < expireAt) {

				// 通过管道批量获取锁
				LOG.info("进入tryLock while,needLocking:{}", needLocking);
				List<Object> results = redisTemplate.executePipelined(new RedisCallback() {

					@Override
					public Object doInRedis(RedisConnection connection) throws DataAccessException {
						LOG.info("connection:{}, needLocking:{}", connection, needLocking);
						String value = System.currentTimeMillis() + "";
						for (String key : needLocking) {
							LOG.info("key :{}", key);
							// 如果不存在则 SET。SET if Not eXists。
							connection.setNX(key.getBytes(), value.getBytes());
							// LOG.info("pipelined setNX result:{}, key:{},
							// value:{}", flag, key, key);
						}
						return null;
					}
				});

				// 提交redis执行计数
				LOG.info("needLocking:{},results:{}", needLocking, results);
				for (int i = 0; i < results.size(); i++) {
					// 是否执行成功
					Boolean success = (Boolean) results.get(i);
					// 锁的KEY
					String key = needLocking.get(i);

					// setnx成功,获得锁
					if (success) {
						// 设置锁的过期时间
						redisTemplate.expire(key, timeout, TimeUnit.MILLISECONDS);
						locked.add(key);
					}
				}

				// 移除已锁定资源
				needLocking.removeAll(locked);

				// 是否锁住全部资源
				if (CollectionUtils.isEmpty(needLocking)) {
					// 全部资源均已锁住,返回成功true
					return true;
				} else {
					// 补偿处理,防止异常情况下(宕机/重启/连接超时等)导致的锁永不过期
					List<String> exceptionLock = new CopyOnWriteArrayList<String>();
					for (String key : needLocking) {
						String value = redisTemplate.opsForValue().get(key);
						// 当前时间 > 上锁时间 + 超时时间 + 2秒(经验时间),表示为该过期却未过期的数据,即异常数据
						if (null != value && System.currentTimeMillis() > Long.parseLong(value) + timeout + 2000) {
							exceptionLock.add(key);
						}
					}
					// 删除所有异常的 KEY
					if (CollectionUtils.isNotEmpty(exceptionLock)) {
						unLock(exceptionLock);
					}

					// 部分资源未能锁住,间隔N毫秒后重试
					Thread.sleep(10);
				}
			}

			// 仍有资源未被锁住(needLocking不为空),释放已锁定的资源,并返回失败false
			if (!CollectionUtils.isEmpty(needLocking)) {
				LOG.info("can not get the lock, keys:{}", needLocking);
				unLock(locked);
			}
		} catch (Exception e) {
			LOG.info("尝试获取锁失败:{}", e);
		}

		return false;
	}

	/**
	 * 
	 * @Description: 批量释放锁
	 * @param list   
	 * @return void  
	 * @author zhangq
	 * @date 2017年5月27日
	 */
	public void unLock(Collection<String> keys) {
		try{
			// 防止多个服务器处理同一个key需要watch操作(相当于是禁止了其他client处理这个key)
			//redisTemplate.watch(keys);

			// 批量释放锁
			redisTemplate.delete(keys);

			// 锁已经过期了,释放watch操作
			//redisTemplate.unwatch();
		}catch (Exception e){
			LOG.error("批量释放锁失败!",e);
		}finally {
			RedisConnectionUtils.unbindConnection(redisTemplate.getConnectionFactory());
		}
	}
}