/// 分布式Redis.
class RedisShard {
/// 构造函数.
public function __construct($shards) {
$this->reinit($shards);
}
/// 析构函数.
/// 脚本结束时,phpredis不会自动关闭redis连接,这里添加自动关闭连接支持.
/// 可以通过手动unset本类对象快速释放资源.
public function __destruct() {
if(isset($this->shard)){
$this->shard['redis']->close();
}
}
/// 重新初始化.
public function reinit($shards){
$index = 0;
$this->shards = array();
foreach($shards as $shard){
$this->shards[$index] = explode(':', $shard); //格式:host:port:db
$this->shards[$index]['index'] = $index;
++$index;
}
}
/// 转发方法调用到真正的redis对象.
public function __call($name, $arguments) {
$result = call_user_func_array(array($this->redis($arguments[0]), $name), $arguments);
if($result === false and in_array($name, array('set', 'setex', 'incr'))) {
trigger_error("redis error: " . $this->shard[0] . ':' . $this->shard[1] . ':' .$this->shard[2] . " $name " . implode(' ', $arguments), E_USER_NOTICE);
}
return $result;
}
/// 获取1至max间的唯一序号name,达到max后会从1开始.
/// redis的递增到最大值后会返回错误,本方法实现安全的递增。
/// 失败返回false,最要确保已用redis()方法连到生成序号的某个redis对象.
public function id($name, $max) {
if(isset($this->shard)){
$id = $this->shard['redis']->incr('_id_' . $name);
if($id){
$max = intval($max/count($this->shards));
if($id % $max == 0){
while($this->shard['redis']->decrBy('_id_' . $name, $max) >= $max){
}
$id = $max;
}
else if($id > $max){
$id %= $max;
}
return ($id - 1)*count($this->shards) + ($this->shard['index'] + 1);
}
}
return false;
}
/// 连接并返回key对应的redis对象.
public function redis($key){
//TODO: crc32在32位系统下会返回负数,因我们是部署在64位系统上,暂时忽略.
assert(PHP_INT_SIZE === 8);
$index = crc32($key) % count($this->shards);
$shard = $this->shards[$index];
if(isset($this->shard)){
//尝试重用已有连接.
if($this->shard[0] == $shard[0] and $this->shard[1] == $shard[1]){
if($this->shard[2] != $shard[2]){
if(! $this->shard['redis']->select($shard[2])){
trigger_error('redis error: select ' . $shard[0] . ':' . $shard[1] . ':' .$shard[2], E_USER_ERROR);
return false;
}
$this->shard[2] = $shard[2];
}
return $this->shard['redis'];
}
$this->shard['redis']->close();
unset($this->shard);
}
//新建连接.
$shard['redis'] = new Redis();
if(! $shard['redis']->connect($shard[0], $shard[1])){
trigger_error('redis error: connect ' . $shard[0] . ':' . $shard[1], E_USER_ERROR);
return false;
}
$db = intval($shard[2]);
if($db != 0 and !$shard['redis']->select($db)){
trigger_error('redis error: select ' . $shard[0] . ':' . $shard[1] . ':' .$shard[2], E_USER_ERROR);
$shard['redis']->close();
return false;
}
if(ENABLE_DEVELOP){
trigger_error('redis connect success. ' . $shard[0] . ':' . $shard[1] . ':' . $shard[2], E_USER_NOTICE);
}
$this->shard = $shard;
return $this->shard['redis'];
}
}