MarkJane
4/12/2017 - 7:18 AM

分布式Redis.php

/// 分布式Redis.
class RedisShard {
    /// 构造函数.
    public function __construct($shards) {
        $this->reinit($shards);
    }

    /// 析构函数.
    /// 脚本结束时,phpredis不会自动关闭redis连接,这里添加自动关闭连接支持.
    /// 可以通过手动unset本类对象快速释放资源.
    public function __destruct() {
        if(isset($this->shard)){
            $this->shard['redis']->close();
        }
    }

    /// 重新初始化.
    public function reinit($shards){
        $index = 0;
        $this->shards = array();
        foreach($shards as $shard){
            $this->shards[$index] = explode(':', $shard); //格式:host:port:db
            $this->shards[$index]['index'] = $index;
            ++$index;
        }        
    }

    /// 转发方法调用到真正的redis对象.
    public function __call($name, $arguments) {
        $result = call_user_func_array(array($this->redis($arguments[0]), $name), $arguments);
        if($result === false and in_array($name, array('set', 'setex', 'incr'))) {
            trigger_error("redis error: " . $this->shard[0] . ':' . $this->shard[1] . ':' .$this->shard[2] . " $name " . implode(' ', $arguments), E_USER_NOTICE);
        }
        return $result;
    }

    /// 获取1至max间的唯一序号name,达到max后会从1开始.
    /// redis的递增到最大值后会返回错误,本方法实现安全的递增。
    /// 失败返回false,最要确保已用redis()方法连到生成序号的某个redis对象.
    public function id($name, $max) {
        if(isset($this->shard)){
            $id = $this->shard['redis']->incr('_id_' . $name);
            if($id){
                $max = intval($max/count($this->shards));
                if($id % $max == 0){
                    while($this->shard['redis']->decrBy('_id_' . $name, $max) >= $max){
                    }
                    $id = $max;
                }
                else if($id > $max){
                    $id %= $max;
                }
                return ($id - 1)*count($this->shards) + ($this->shard['index'] + 1);
            }
        }
        return false;
    }

    /// 连接并返回key对应的redis对象.
    public function redis($key){
        //TODO: crc32在32位系统下会返回负数,因我们是部署在64位系统上,暂时忽略.
        assert(PHP_INT_SIZE === 8);
        $index = crc32($key) % count($this->shards);
        $shard = $this->shards[$index];
        if(isset($this->shard)){
            //尝试重用已有连接.
            if($this->shard[0] == $shard[0] and $this->shard[1] == $shard[1]){
                if($this->shard[2] != $shard[2]){
                    if(! $this->shard['redis']->select($shard[2])){
                        trigger_error('redis error: select ' . $shard[0] . ':' . $shard[1] . ':' .$shard[2], E_USER_ERROR);
                        return false;
                    }
                    $this->shard[2] = $shard[2];
                }
                return $this->shard['redis'];
            }
            $this->shard['redis']->close();
            unset($this->shard);
        }
        //新建连接.
        $shard['redis'] = new Redis();
        if(! $shard['redis']->connect($shard[0], $shard[1])){
            trigger_error('redis error: connect ' . $shard[0] . ':' . $shard[1], E_USER_ERROR);
            return false;
        }
        $db = intval($shard[2]);
        if($db != 0 and !$shard['redis']->select($db)){
            trigger_error('redis error: select ' . $shard[0] . ':' . $shard[1] . ':' .$shard[2], E_USER_ERROR);
            $shard['redis']->close();
            return false;
        }
        if(ENABLE_DEVELOP){
            trigger_error('redis connect success. ' . $shard[0] . ':' . $shard[1] . ':' . $shard[2], E_USER_NOTICE);
        }        
        $this->shard = $shard;
        return $this->shard['redis'];
    }
}