# Posted by raphaelfeng, 6/24/2011 - 10:07 PM
# A way of implementing a poll-based chat inside Redis

'''
redis_simple_chat.py

Written June 24, 2011 by Josiah Carlson
Released under the GNU GPL v2
available: http://www.gnu.org/licenses/gpl-2.0.html

Other licenses may be available upon request.

This module intends to offer a simple way for Redis to hold state for chat
channels over time. Clients send messages, check for messages since they last
checked, and some cron daemon calls a cleanup function on occasion.

Requires a recent version of https://github.com/andymccurdy/redis-py/ or a
work-alike.

'''

import time

import redis

def send_message(conn, channel, message):
    '''
    Send a message to the provided channel.

    Arguments:
        conn    - a redis connection object
        channel - name of the chat channel
        message - the message body to store

    Writers on the same channel are serialized by a per-channel lock so
    that message ids are allocated and written in order.
    '''
    # fix: the redis-py lock method is conn.lock(), lowercase (as used in
    # check_messages); conn.Lock() would raise AttributeError
    with conn.lock('lock:channel:' + channel, timeout=5):
        # allocate the next message id for this channel
        # NOTE(review): argument order (name, value, amount) matches the
        # pre-3.0 redis-py API -- verify against the installed client
        msg_id = conn.zincrby('ids:', channel, 1)
        pipeline = conn.pipeline(transaction=True)
        # store the message body, keyed by its id
        pipeline.hset('messages:' + channel, msg_id, message)
        # update the timeline (score == member == message id)
        pipeline.zadd('timeline:' + channel, msg_id, msg_id)
        # and keep a record about when this channel last got a message,
        # so the cleanup daemon can expire idle channels
        pipeline.zadd('updated:', channel, time.time())
        pipeline.execute()

def check_messages(conn, client, channel, limit=10):
    '''
    Check for unseen messages for a client on a given channel.

    Arguments:
        conn    - a redis connection object
        client  - identifier of the requesting client
        channel - name of the chat channel
        limit   - maximum number of messages returned per call
                  (None means no limit)

    Returns the string 'new connection' when the client is new or had
    timed out (its progress is resynced to the current timeline), else a
    list of the unseen message bodies.
    '''
    # we must ensure that only one instance of this client is fetching
    # data at a time
    # fix: use conn.lock() (lowercase, the redis-py method) consistently;
    # conn.Lock() would raise AttributeError
    with conn.lock('lock:client:' + client, timeout=5):
        with conn.lock('lock:channel:' + channel, timeout=5):
            pipeline = conn.pipeline(True)
            cl_ch = client + ':' + channel
            progress = 'clients:' + cl_ch
            # intersect the timeline with our progress zset to find which
            # messages we already know about
            pipeline.zinterstore(progress + ':tmp', ['timeline:' + channel, progress], aggregate='MAX')
            # set the last time this client checked for messages
            pipeline.zadd('clients:', cl_ch, time.time())
            results = pipeline.execute()
            if results[-1]:
                # zadd added a new member: the client timed out, or the
                # client is new -- resync its progress to the timeline
                pipeline.zunionstore(progress, ['timeline:' + channel])
                pipeline.delete(progress + ':tmp')
                pipeline.execute()
                return 'new connection'
            elif not results[0]:
                # First messages in a new channel, or a channel that had
                # been deleted due to timeout.
                # fix: original wrote to progress + 'tmp' (missing ':'),
                # populating the wrong key and leaving ':tmp' empty
                conn.zunionstore(progress + ':tmp', ['timeline:' + channel])
            if limit is not None:
                # keep only the oldest `limit` unseen messages
                pipeline.zremrangebyrank(progress + ':tmp', limit, -1)
            # get the ids, update the known ids
            pipeline.zrange(progress + ':tmp', 0, -1)
            pipeline.zunionstore(progress, [progress, progress + ':tmp'], aggregate='MAX')
            # discard memory of timed-out messages
            pipeline.zinterstore(progress, [progress, 'timeline:' + channel], aggregate='MAX')
            pipeline.delete(progress + ':tmp')
            # zrange is 4th-from-last whether or not the optional
            # zremrangebyrank was queued, so [-4] is correct in both cases
            ids = pipeline.execute()[-4]
            # return the known messages, skipping any already deleted
            return [msg for msg in conn.hmget('messages:' + channel, ids) if msg]

def clean_out_channel_backlog(conn, backlog=100, channel_timeout=900, client_timeout=300):
    '''
    Clean out old messages from channels, channels that have been idle
    too long, and progress information about stale client/channel pairs.
    Intended to be called periodically from a cron daemon.

    Arguments:
        conn            - a redis connection object
        backlog         - number of recent messages to keep per channel
        channel_timeout - seconds of inactivity before a channel is deleted
        client_timeout  - seconds of inactivity before a client is forgotten
    '''
    # only one backlog cleanup function call can run at a time
    # fix: use conn.lock() (the redis-py method) and give the lock a
    # timeout like every other lock in this module
    with conn.lock('lock:cleanup:', timeout=5):
        pipeline = conn.pipeline(True)

        # find those channels that haven't been updated for a while
        ch_timeout = time.time() - channel_timeout
        for channel, score in conn.zrangebyscore('updated:', 0, ch_timeout, withscores=True):
            with conn.lock('lock:channel:' + channel, timeout=5):
                # re-check under the lock in case a message just arrived
                if conn.zscore('updated:', channel) == score:
                    pipeline.delete(
                        'messages:' + channel,
                        'timeline:' + channel,
                    )
                    pipeline.zrem('updated:', channel)
                    pipeline.zrem('ids:', channel)
                    pipeline.execute()

        # get a prioritized list of those channels that have the most new
        # volume since the last cleanup (old snapshot weighted -1)
        pipeline.zinterstore('ids:tmp', {'ids:cleanup': 1, 'ids:': -1})
        # snapshot current counts for the next cleanup pass
        pipeline.zunionstore('ids:cleanup', ['ids:'])
        known = pipeline.execute()[0]
        if not known:
            # if we've never cleaned up before, clean them all
            known = conn.zunionstore('ids:tmp', ['ids:cleanup'])

        # iterate over chunks of channels to clean out old messages
        # (range instead of the Python-2-only xrange; the chunks are small)
        for start in range(0, known, 100):
            for channel in conn.zrange('ids:tmp', start, start + 99):
                with conn.lock('lock:channel:' + channel, timeout=5):
                    # everything older than the newest `backlog` entries
                    message_ids = conn.zrange('timeline:' + channel, 0, -backlog - 1)
                    for message_id in message_ids:
                        pipeline.hdel('messages:' + channel, message_id)
                        pipeline.zrem('timeline:' + channel, message_id)
                    pipeline.execute()
        conn.delete('ids:tmp')

        # clean out old clients that haven't checked recently
        cl_timeout = time.time() - client_timeout
        # fix: original used zrange, treating the timestamp cutoff as a
        # rank index; zrangebyscore selects by last-seen time as intended
        # (matching the channel-timeout loop above)
        for client, score in conn.zrangebyscore('clients:', 0, cl_timeout, withscores=True):
            # fix: original lock name was 'lock:client' + client (missing
            # ':'), so it never excluded check_messages' 'lock:client:' lock
            with conn.lock('lock:client:' + client, timeout=5):
                # re-check under the lock in case the client just polled
                if conn.zscore('clients:', client) == score:
                    # remove the notice of when the client last checked
                    pipeline.zrem('clients:', client)
                    # clean out the progress zset
                    pipeline.delete('clients:' + client)
                    pipeline.execute()