johnthethird
3/29/2010 - 4:48 AM

riak-tee.rb

# Tee with Sinatra
# http://rigelgroupllc.com/wp/blog/tee-with-sinatra
# This proxy script accepts HTTP requests meant for Riak and, in addition
# to passing them on to Riak, sends a duplicate of each JSON PUT/POST
# request to an ElasticSearch cluster.
#   -- John Lynch, www.rigelgroupllc.com
#

require 'rubygems'
require 'sinatra'
require 'typhoeus'

# Connection settings used by the proxy: the host/port pairs for Riak and
# ElasticSearch, plus a per-backend request timeout in milliseconds.
OPTIONS = {
  :riak_host    => "localhost",
  :riak_port    => "8098",
  :es_host      => "localhost",
  :es_port      => "9200",
  :riak_timeout => 5000, # milliseconds
  :es_timeout   => 5000  # milliseconds
}
 
# Rack middleware that forwards every incoming request to Riak and, for JSON
# PUT/POST requests addressed to /riak/<bucket>/<key>, concurrently sends a
# copy of the body to the ElasticSearch index named "riak".
#
# The wrapped app is stored but never called: the client always receives the
# Riak response (or a 502 when Riak could not be reached).
class Rack::Proxy
  # Matches /riak/<bucket>/<key> and captures the bucket and key segments.
  RIAK_KEY_PATH = %r{\A/riak/([^/]+)/([^/]+)}
  JSON_MEDIA_TYPE = "application/json"
  # Status returned to the client when Riak yields no valid HTTP status.
  BAD_GATEWAY = 502
  # Connection-level headers that must not be relayed by a proxy. In
  # particular, Typhoeus hands us an already-decoded body, so passing on
  # "Transfer-Encoding: chunked" would corrupt the response.
  HOP_BY_HOP_HEADERS = %w[
    connection keep-alive proxy-authenticate proxy-authorization
    te trailer transfer-encoding upgrade
  ].freeze

  # app - the downstream Rack application (kept for the middleware contract,
  #       never invoked).
  def initialize(app)
    @app = app
    @hydra = Typhoeus::Hydra.new
  end

  # Handles one Rack request.
  #
  # env - the Rack environment hash.
  #
  # Returns the Rack triplet [status, headers, body] built from the Riak
  # response. When ElasticSearch was also written to, its status code is
  # exposed in the "X-ElasticSearch-ResCode" header.
  def call(env)
    req = Rack::Request.new(env)
    # The body is sent twice, so the stream is read fully into memory.
    # This is an obvious problem with large bodies, so beware.
    req_body = req.body.read if req.body

    riak_url = "http://#{OPTIONS[:riak_host]}:#{OPTIONS[:riak_port]}#{req.fullpath}"
    riak_response = enqueue(riak_url, OPTIONS[:riak_timeout], req, req_body)

    es_url = es_url_for(req)
    es_response = enqueue(es_url, OPTIONS[:es_timeout], req, req_body) if es_url

    # Concurrently executes the queued HTTP requests, blocks until all finish.
    @hydra.run

    # Riak may have returned no headers at all (timeout, connection refused),
    # and Typhoeus can add nil headers; normalise both cases.
    headers = (riak_response[:headers] || {}).reject do |name, value|
      value.nil? || HOP_BY_HOP_HEADERS.include?(name.to_s.downcase)
    end

    # If we wrote to ES add a custom header.
    if es_response && es_response[:code]
      headers["X-ElasticSearch-ResCode"] = es_response[:code].to_s
    end

    # A failed request reports code 0, which is not a valid Rack status.
    status = riak_response[:code].to_i
    status = BAD_GATEWAY if status < 100

    # Rack requires the body to respond to #each; a bare String does not on
    # Ruby 1.9+, so wrap it in an Array.
    [status, headers, [riak_response[:body].to_s]]
  end

  private

  # Queues a copy of the incoming request against +url+ on the shared hydra.
  #
  # url      - full target URL.
  # timeout  - request timeout in milliseconds.
  # req      - the incoming Rack::Request (supplies method and content type).
  # req_body - the already-read request body String, or nil.
  #
  # Returns a Hash that is filled with :code, :headers and :body once the
  # hydra has run the request.
  def enqueue(url, timeout, req, req_body)
    opts = {:timeout => timeout, :method => req.request_method.downcase.to_sym}
    opts[:headers] = {"Content-type" => req.content_type} if req.content_type
    opts[:body] = req_body if req_body && !req_body.empty?

    captured = {}
    request = Typhoeus::Request.new(url, opts)
    request.on_complete do |response|
      captured[:code] = response.code
      captured[:headers] = response.headers_hash
      captured[:body] = response.body
    end
    @hydra.queue request
    captured
  end

  # Decides whether the request should be mirrored to ElasticSearch.
  #
  # Returns the ElasticSearch URL for a JSON PUT/POST to /riak/<bucket>/<key>,
  # or nil when the request must not be mirrored (other verbs, non-JSON
  # payloads, or paths without both a bucket and a key).
  def es_url_for(req)
    return nil unless req.put? || req.post?
    # media_type ignores parameters such as "; charset=utf-8".
    return nil unless req.media_type == JSON_MEDIA_TYPE

    match = RIAK_KEY_PATH.match(req.path)
    return nil unless match

    "http://#{OPTIONS[:es_host]}:#{OPTIONS[:es_port]}/riak/#{match[1]}/#{match[2]}"
  end
end

# Register the proxy as Rack middleware for this Sinatra app. Rack::Proxy#call
# never hands the request to the wrapped app, so the proxy answers everything.
use Rack::Proxy