# Tee with Sinatra
# http://rigelgroupllc.com/wp/blog/tee-with-sinatra
# This proxy script accepts HTTP requests meant for Riak and, in addition to
# passing them on to Riak, sends a duplicate of every JSON PUT/POST request
# to an ElasticSearch cluster.
# -- John Lynch, www.rigelgroupllc.com
require 'rubygems'
require 'sinatra'
require 'typhoeus'
# Proxy configuration, read by Rack::Proxy on every request.
# The hash is created here when nothing has defined it earlier; without this
# the assignments below raise NameError because OPTIONS does not exist yet.
OPTIONS = {} unless defined?(OPTIONS)
OPTIONS[:riak_host] = "localhost"
OPTIONS[:riak_port] = "8098"
OPTIONS[:es_host] = "localhost"
OPTIONS[:es_port] = "9200"
OPTIONS[:riak_timeout] = 5000 # milliseconds
OPTIONS[:es_timeout] = 5000 # milliseconds
module Rack
  # Rack middleware that forwards every incoming request to Riak and mirrors
  # JSON PUT/POST requests for "/riak/<bucket>/<key>" to the ElasticSearch
  # index named "riak".
  #
  # The wrapped application is stored but never called: the client always
  # receives the Riak response, plus an "X-ElasticSearch-ResCode" header
  # when a copy was sent to ElasticSearch.
  #
  # Configuration is read from the top-level OPTIONS hash. It is referenced
  # as ::OPTIONS because constant lookup inside this namespace would pick up
  # a Rack::OPTIONS constant first if Rack defines one (newer Rack versions
  # may, for the HTTP verb name).
  class Proxy
    # Captures the bucket and key of a Riak object path.
    RIAK_OBJECT_PATH = %r{\A/riak/([^/]+)/([^/]+)}

    # @param app [#call] downstream Rack application (kept, not invoked)
    def initialize(app)
      @app = app
      @hydra = Typhoeus::Hydra.new
    end

    # Proxies one request.
    #
    # @param env [Hash] Rack environment
    # @return [Array] Rack triplet [status, headers, body]
    def call(env)
      req = Rack::Request.new(env)
      # The body is sent twice, so the whole stream is read into memory.
      # This is an obvious problem with large bodies, so beware.
      req_body = req.body.read if req.body

      riak_response = {}
      queue_request(riak_url(req), riak_options(req, req_body), riak_response)

      # If we are putting or posting JSON, send a copy to ElasticSearch.
      es_response = {}
      es_target = es_url(req)
      queue_request(es_target, es_options(req, req_body), es_response) if es_target

      # Concurrently executes the queued HTTP requests and blocks until they
      # have all finished; the on_complete callbacks fill the result hashes.
      @hydra.run

      # A failed request may carry no headers at all.
      headers = riak_response[:headers] || {}
      # If we wrote to ES, add a custom header.
      headers["X-ElasticSearch-ResCode"] = es_response[:code].to_s if es_response[:code]
      # Typhoeus can add nil headers, let's get rid of them.
      headers.delete_if { |_name, value| value.nil? }

      # Typhoeus reports code 0 when no HTTP response was received (for
      # example on a timeout); that is not a valid Rack status, so answer
      # with 502 Bad Gateway instead.
      status = riak_response[:code].to_i
      status = 502 if status < 100

      # Return the original Riak response to the client. A Rack body must
      # respond to #each, hence the Array wrapper.
      [status, headers, [riak_response[:body].to_s]]
    end

    private

    # Queues a request on the hydra; once it completes, the response code,
    # headers and body are stored in +result+.
    def queue_request(url, opts, result)
      request = Typhoeus::Request.new(url, opts)
      request.on_complete do |response|
        result[:code] = response.code
        result[:headers] = response.headers_hash
        result[:body] = response.body
      end
      @hydra.queue(request)
    end

    # URL of the Riak node for this request (path and query string kept).
    def riak_url(req)
      "http://#{::OPTIONS[:riak_host]}:#{::OPTIONS[:riak_port]}#{req.fullpath}"
    end

    # Typhoeus options for the Riak request; forwards the content type.
    def riak_options(req, req_body)
      opts = base_options(req, req_body, ::OPTIONS[:riak_timeout])
      opts[:headers] = { "Content-type" => req.content_type } if req.content_type
      opts
    end

    # ElasticSearch URL for this request, or nil when it must not be
    # mirrored (not a JSON PUT/POST, or the path has no bucket and key).
    def es_url(req)
      # media_type ignores parameters such as "; charset=utf-8".
      return nil unless (req.put? || req.post?) && req.media_type == "application/json"

      match = RIAK_OBJECT_PATH.match(req.path)
      return nil unless match

      "http://#{::OPTIONS[:es_host]}:#{::OPTIONS[:es_port]}/riak/#{match[1]}/#{match[2]}"
    end

    # Typhoeus options for the ElasticSearch request.
    def es_options(req, req_body)
      base_options(req, req_body, ::OPTIONS[:es_timeout])
    end

    # Options shared by both requests: timeout, HTTP method and, when the
    # request has one, the body.
    def base_options(req, req_body, timeout)
      opts = { :timeout => timeout, :method => req.request_method.downcase.to_sym }
      opts[:body] = req_body if req_body && !req_body.empty?
      opts
    end
  end
end
# Register Rack::Proxy as Rack middleware for this Sinatra application.
use Rack::Proxy