zyuzuguldu
4/6/2017 - 9:16 AM

Notes on Storm+Trident tuning

Notes on Storm+Trident tuning

=== Tuning Storm+Trident

Tuning a dataflow system is easy: 

----
The First Rule of Dataflow Tuning:
* Ensure each stage is always ready to accept records, and
* Deliver each processed record promptly to its destination
----

That may seem insultingly simplistic, but my point is that a) if you respect the laws of physics and economics, you can make your dataflow obey the First Rule; b) once your dataflow does obey the First Rule, stop tuning it.

Outline:

* Topology; Little's Law
  - skew
* System: machines; workers/machine, machine sizing; (zookeeper, kafka sizing)
* Throttling: batch size; kafka-partitions; max pending; trident batch delay; spout delay; timeout
* Congestion: number of ackers; queue sizing (exec send, exec recv, transfer)
* Memory: Max heap (Xmx), new gen/survivor size; (queue sizes)
* Ulimit, other ntwk sysctls for concurrency and ntwk; Netty vs ZMQ transport; drpc.worker.threads;
* Other important settings: preferIPv4; `transactional.zookeeper.root` (parent name for transactional state ledger in Zookeeper); `` (java options passed to _your_ worker function), `topology.worker.shared.thread.pool.size`
* Don't touch: `zmq.hwm` (unless you are seeing unreliable network trnsport under bursty load), disruptor wait strategy, worker receive buffer size,  `zmq.threads`

==== Goal

First, identify your principal goal: latency, throughput, memory or cost. We'll just discuss latency and throughput as goals -- tuning for cost means balancing the throughput (records/hour per machine) and cost of infrastructure (amortized $/hour per machine), so once you've chosen your hardware, tuning for cost is equivalent to tuning for throughput. I'm also going to concentrate on typical latency/throughput, and not on variance or 99th percentile figures or somesuch.

Next, identify your dataflow's principal bottleneck, the constraining resource that most tightly bounds the performance of its slowest stage. A dataflow can't pass through more records per second than the cumulative output of its most constricted stage, and it can't deliver records in less end-to-end time than the stage with the longest delay.

The principal bottleneck may be:

* _IO volume_:  there's a hardware bottleneck to the number of bytes per second that a machine's disks or network connection can sustain. Event log processing often involves large amounts of data requiring only parsing or other trivial transformations before storage -- throughput of such dataflows are IO bound.
* _CPU_: a CPU-bound flow spends more time in calculations to process a record
* _concurrency_: network requests to an external resource often require almost no CPU and minimal volume. If your principal goal is throughput, the flow is only bound by how many network requests you can make in parallel.
* _remote rate bottleneck bound_: alternatively, you may be calling an external resource that imposes a maximum throughput out of your control. A legacy datastore might only be able to serve a certain volume of requests before its performance degrades, or terms-of-service restrictions from a third-party web API (Google's Geolocation API.)
* _memory_: large windowed joins or memory-intensive analytics algorithms may require so much RAM it defines the machine characteristics

==== Initial tuning

If you're memory-bound, use machines with lots of RAM. Otherwise, start tuning on a machine with lots of cores and over-provision the RAM, we'll optimize the hardware later.

For a CPU-bound flow:

* Construct a topology with parallelism one
* set max-pending to one, use one acker per worker, and ensure that storm's `nofiles` ulimit is large (65000 is a decent number).
* Set the trident-batch-delay to be comfortably larger than the end-to-end latency -- there should be a short additional delay after each batch completes. 
* Time the flow through each stage.
* Increase the parallelism of CPU-bound stages to nearly saturate the CPU, and at the same time adjust the batch size so that state operations (aggregates, bulk database reads/writes, kafka spout fetches) don't slow down the total batch processing time.
* Keep an eye on the GC activity. You should see no old-gen or STW GCs, and efficient new-gen gcs (your production goal no more than one new-gen gc every 10 seconds, and no more than 10ms pause time per new-gen gc, but for right now just overprovision -- set the new-gen size to give infrequent collections and don't worry about pause times).

Once you have roughly dialed in the batch size and parallelism, check in with the First Rule. The stages upstream of your principal bottleneck should always have records ready to process. The stages downstream should always have capacity to accept and promptly deliver processed records.

==== Provisioning

Use one worker per topology per machine: storm passes tuples directly from sending executor to receiving executor if they're within the same worker. Also set number of ackers equal to number of workers -- the default of one per topology never makes sense (future versions of Storm will fix this).

Match your spout parallelism to its downstream flow. Use the same number of kafka partitions as kafka spouts (or a small multiple). If there are more spouts than kafka machines*kpartitions, the extra spouts will sit idle.

For CPU-bound stages, set one executor per core for the bounding stage (or one less than cores at large core count). Don't adjust the parallelism without reason -- even a shuffle implies network transfer. Shuffles don't impart any load-balancing.

For map states or persistentAggregates -- things where results are accumulated into memory structures -- allocate one stage per worker. Cache efficiency and batch request overhead typically improve with large record set sizes.

===== Concurrency Bound

In a concurrency bound problem, use very high parallelism
If possible, use a QueryFunction to combine multiple queries into a batch request.

===== Sidebar: Little's Law

* `Throughput (recs/s) = Capacity / Latency`
* you can't have better throughput than the collective rate of your slowest stage;
* you can't have better latency than the sum of the individual latencies.
    
If all records must pass through a stage that handles 10 records per second, then the flow cannot possibly proceed faster than 10 records per second, and it cannot have latency smaller than 100ms (1/10)

* with 20 parallel stages, the 95th percentile latency of your slowest stage becomes the median latency of the full set. (TODO: nail down numbers)


==== Batch Size

Set the batch size to optimize the throughput of your most expensive batch operation -- a bulk database operation, network request, or intensive aggregation. (There might instead be a natural batch size: for example the twitter `users/lookup` API call returns information on up to 100 distinct user IDs.)

===== Kafka Spout: Max-fetch-bytes

The batch count for the Kafka spout is controlled indirectly by the max fetch bytes. The resulting total batch size is at most `(kafka partitions) * (max fetch bytes)`.

For example, given a topology with six kafka spouts and four brokers with three kafka-partitions per broker, you have twelve kafka-partitions total, two per spout. When the MBCoordinator calls for a new batch, each spout produces two sub-batches (one for each kafka-partition), each into its own trident-partition. Now also say you have records of 1000 +/- 100 bytes, and that you set max-fetch-bytes to 100_000. The spout fetches the largest discrete number of records that sit within max-fetch-bytes -- so in this case, each sub-batch will have between 90 and 111 records. That means the full batch will have between 1080 and 1332 records, and 1_186_920 to 1_200_000 bytes.

===== Choosing a value

* `each()` functions should not care about batch size.
* `partitionAggregate`, `partitionPersist`, `partitionQuery` do.

Typically, you'll find that there are three regimes:

1. when it's too small, response time is flat -- it's dominated by bookeeping.
2. it then grows slowly with batch size. For example, a bulk put to elasticsearch will take about 200ms for 100 records, about 250ms for 1000 records, and about 300ms for 2000 records (TODO: nail down these numbers).
3. at some point, you start overwhelming some resource on the other side, and execution time increases sharply.

Since the execution time increases slowly in case (2), you get better and better records-per-second throughput. Choose a value that is near the top range of (2) but comfortably less than regime (3).

===== Executor send buffer size

Don't worry about this setting until most other things stabilize -- it's mostly important for ensuring that a burst of records doesn't clog the send queue.

Set the executor send buffer to be larger than the batch record count of the spout or first couple stages. Since it applies universally, don't go crazy with this value. It has to be an even power of two (1024, 2048, 4096, 8192, 16384).

==== Garbage Collection and other JVM options

Our worker JVM options:

	worker.childopts: >-
	    -Xmx2600m -Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
	    -XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
	    -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
	    -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
	    -server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
	    -Xloggc:logs/gc-worker-%ID%.log -verbose:gc
	    -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
	    -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
	    -XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
	    -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal

This sets:

* New-gen size to 1000 MB (`-XX:MaxNewSize=1000m`). Almost all the objects running through storm are short-lived -- that's what the First Rule of data stream tuning says -- so almost all your activity is here.
* Apportions that new-gen space to give you 800mb for newly-allocated objects and 100mb for objects that survive the first garbage collection pass.
* Initial perm-gen size of 96m (a bit generous, but Clojure uses a bit more perm-gen than normal Java code would), and a hard cap of 128m (this should not change much after startup, so I want it to die hard if it does).
* Implicit old-gen size of 1500 MB (total heap minus new- and perm-gens) The biggest demand on old-gen space comes from long-lived state objects: for example an LRU counting cache or dedupe'r. A good initial estimate for the old-gen size is the larger of a) twice the old-gen occupancy you observe in a steady-state flow, or b) 1.5 times the new-gen size. The settings above are governed by case (b).
* Total heap of 2500 MB (`-Xmx2500m`): a 1000 MB new-gen, a 100 MB perm-gen, and the implicit 1500 MB old-gen. Don't use gratuitously more heap than you need -- long gc times can cause timeouts and jitter. Heap size larger than 12GB is trouble on AWS, and heap size larger than 32GB is trouble everywhere.
* Tells it to use the "concurrent-mark-and-sweep" collector for long-lived objects, and to only do so when the old-gen becomes crowded.
* Enables that a few mysterious performance options
* Logs GC activity at max verbosity, with log rotation

If you watch your GC logs, in steady-state you should see

* No stop-the-world (STW) gc's -- nothing in the logs about aborting parts of CMS
* old-gen GCs should not last longer than 1 second or happen more often than every 10 minutes
* new-gen GCs should not last longer than 50 ms or happen more often than every 10 seconds
* new-gen GCs should not fill the survivor space
* perm-gen occupancy is constant

Side note: regardless of whether you're tuning your overall flow for latency or throughput, you want to tune the GC for latency (low pause times). Since things like committing a batch can't proceed until the last element is received, local jitter induces global drag.

==== Tempo and Throttling

Max-pending (`TOPOLOGY_MAX_SPOUT_PENDING`) sets the number of tuple trees live in the system at any one time.

Trident-batch-delay (`topology.trident.batch.emit.interval.millis`) sets the maximum pace at which the trident Master Batch Coordinator will issue new seed tuples. It's a cap, not an add-on: if t-b-d is 500ms and the most recent batch was released 486ms, the spout coordinator will wait 14ms before dispensing a new seed tuple. If the next pending entry isn't cleared for 523ms, it will be dispensed immediately. If it took 1400ms, it will also be released immediately -- but no make-up tuples are issued.

Trident-batch-delay is principally useful to prevent congestion, especially around startup. As opposed to a traditional Storm spout, a Trident spout will likely dispatch hundreds of records with each batch. If max-pending is 20, and the spout releases 500 records per batch, the spout will try to cram 10,000 records into its send queue.


==== Machine Sizing


==== 

* System: machines; workers/machine, machine sizing; (zookeeper, kafka sizing)
* Throttling: batch size; kafka-partitions; max pending; trident batch delay; spout delay; timeout
* Congestion: number of ackers; queue sizing (exec send, exec recv, transfer); `zmq.threads`
* Memory: Max heap (Xmx), new gen/survivor size; (queue sizes)
* Ulimit, other ntwk sysctls for concurrency and ntwk; Netty vs ZMQ transport; drpc.worker.threads;
* Other important settings: preferIPv4; `transactional.zookeeper.root` (parent name for transactional state ledger in Zookeeper); `` (java options passed to _your_ worker function), `topology.worker.shared.thread.pool.size`
* Don't touch: `zmq.hwm` (unless you are seeing unreliable network trnsport under bursty load), disruptor wait strategy, worker receive buffer size
=== Lifecycle of a Record

==== Components

* **supervisor**
  - JVM process launched on each storm worker machine. Does not execute your code -- supervises it.
  - number of workers set by number of `supervisor.slots.ports`

* **worker**
  - jvm process launched by the supervisor
  - intra-worker transport is more efficient, so run one worker per topology per machine
  - if worker dies, supervisor will restart
  
* **Coordinator** generates new transaction ID
  - figures out what kafka hosts
  - sends tuple, which influences spout to dispatch a new batch
  - each transaction ID corresponds identically to single trident batch and vice-versa
  - Transaction IDs for a given topo_launch are serially incremented globally.
  - knows about Zookeeper /transactional; so it recovers the transaction ID

* **Kafka Spout** -- suppose 6 kafka spouts (3 per worker, 2 workers), reading from 24 partitions
  - each spout would ping 4 partitions assigned to it, pulling in `max_fetch_size` bytes from each: so we would get `12 * max_fetch_size` bytes on each worker, `24 * max_fetch_size` bytes in each batch
  - Each record becomes one kafka message, which becomes exactly one tuple
  - In our case, incoming records are about 1000 bytes, and messages add a few percent of size. (4000 records takes 4_731_999 bytes, which fits in a 5_000_000 max_fetch_size request).
  - Each trident batch is assembled in parallel across all spouts
  - So trident batch size is
    - `spout_batch_kb     ~= max_fetch_size * kafka_machines * kpartitions_per_broker / 1024`
    - `spout_batch_tuples ~= spout_batch_kb * 1024 / bytes_per_record`
    - `record_bytes       ~= 1000 bytes`

* **Executor**
  - Each executor is responsible for one bolt or spout
  - so with 3 kafka spouts on a worker, there are three executors spouting



==== Storm Transport

Each executor (bolt or spout) has two disruptor queues: its 'send queue' (the individual tuples it emits) and its 'receive queue' (batches of tuples staged for processing)footnote:[It might seem odd that the spout has a receive queue, but much of storm's internal bookkeeping is done using tuples -- there's actually a regular amount of traffic sent to each spout].

===== Disruptor Queue

At the heart

===== Spout Tuple Handling

* If the spout executor's async-loop decides conditions are right, it calls the spout's `nextTuple()` method.
* The spout can then emit zero, one or many tuples, which the emitter publishes non-blocking into the spout's executor send queue (see below for details).
* Each executor send queue (spout or bolt) has an attached router (`transfer-fn`). In an infinite loop, it
  - lays claim to all messages currently in the queue (everything between its last-read position and the write head), and loads them into a local tuple-batch.
  - sorts tuples into two piles: local ones, destined for tasks on this worker; and remote ones, destined for tasks on other workers.
  - all the remote tuples are published (blocking) as a single batch into the worker's transfer queue; they'll be later sent over the network each to the appropriate worker
  - the router regroups the tuples by task, and publishes (blocking) each tuple-batch into that task's executor receive buffer.
  Note that the executor send queue holds individual _tuples_, where as the worker transfer queue and executor receive queues hold _collections of tuples_. An executor send queue size of 1024 slots with an executor receive queue size of 2048 slots means there won't ever be more than `2048 * 1024` tuples waiting for that executor to process. It's important also to recognize that, although the code uses the label `tuple-batch` for these collections of tuples, they have nothing to do with the higher-level concept of a 'Trident batch' you'll meet later.

===== Bolt Tuple Handling



===== Worker Transfer and Receive Handlers


Unlike the transfer and the executor queues, the worker's receive buffer is a ZeroMQ construct, not a disruptor queue

==== Acking In Storm

* Noah is processed, produces Ham and Shem. Ack clears Noah, implicates Ham and Shem
* Shem is processed, produces Abe. Ack clears Shem, implicates Abe
* Ham is processed, produces non;e. Ack clears Ham


* Alice does a favor for Bob and Charlie. Alice is now in the clear; Bob and Charlie owe
* 


* For every record generated, send it to the acker
* Who keeps it in a table
* For every record completed, send it to the acker
* Who removes it from the table
* Maintain tickets in a tree structure so you know what to retry

Instead,

* When the tuple tree is created, send an ack-init: the clan id along with its edge checksum
* When each tuple is successfully completed, send an ack holding two sixty-four bit numbers: the tupletree id, and the XOR of its edge id and all the edge ids it generated. Do this for each of its tupletree ids.
* The acker holds a single O(1) lookup table
    - it's actually a set of lookup tables: current, old and dead. new tuple trees are added to the current bucket; every timeout number of seconds, current becomes old, and old becomes dead -- they are declared failed and their records retried.
* The spout holds the original tuple until it receives notice from the acker. The spout won't fetch more than the max-pending number of tuples: this is to protect the spout against memory pressure , and the downstream system against congestion.



When a tuple is born in the spout,

* creates a `root-id` -- this will identify the tuple tree. Let's say it had the value `3`.
* for all the places the tuple will go, makes an `edge-id` (`executor.clj:465`)
  - set the ack tree as `{ root_id: edge_id }`. Say the tuple was to be sent to three places; it would call `out_tuple(... {3: 100})`, `out_tuple(... {3: 101})`, `out_tuple(... {3: 102})`.
* XORs all the edge_id's together to form a partial checksum: `100 ^ 101 ^ 102`.
* sends an `init_stream` tuple to the acker as `root_id, partial_checksum, spout_id`
* the tuple's `ack val` starts at zero.

When a tuple is sent from a bolt, it claims one or more anchors (the tuples it came from), and one or more destination task ids.


===== Acker Walkthrough

When a tuple is born in the spout,

* creates a `root-id` -- this will identify the tuple tree. Let's say it had the value `3`.
* for all the places the tuple will go, makes an `edge-id` (`executor.clj:465`)
  - set the ack tree as `{ root_id: edge_id }`. Say the tuple was to be sent to three places; it would call `out_tuple(... {3: 100})`, `out_tuple(... {3: 101})`, `out_tuple(... {3: 102})`.
* XORs all the edge_id's together to form a partial checksum: `100 ^ 101 ^ 102`.
* sends an `init_stream` tuple to the acker as `root_id, partial_checksum, spout_id`
* the tuple's `ack val` starts at zero.

When a tuple is sent from a bolt, it claims one or more anchors (the tuples it came from), and one or more destination task ids.

[[acker_lifecycle_simple]]
.Acker Lifecycle: Simple
[cols="1*<.<d,1*<.<d,1*<.<d",options="header"]
|=======
| Event    		 	| Tuples			    	| Acker Tree
| spout emits one tuple to bolt-0 	| noah:   `<~,     { noah: a  }>`   	|
| spout sends an acker-init tuple, seeding the ack tree with `noah: a`
                                       	|                                 	| `{ noah: a }`
| bolt-0 emits two tuples to bolt-1 anchored on `noah`. Those new tuples each create an edge-id for each anchor, which is XORed into the anchor's `ackVal` and used in the new tuple's message-id.
                                        | shem: `<~,       { noah: b  }>` + 
                                          ham:  `<~,       { noah: c  }>` + 
                                          noah: `<b^c,     { noah: a  }>` 	|
| bolt-0 acks acks `noah` using the XOR of its ackVal and tuple tree: `noah: a^b^c`. Since `a^a^b^c = b^c`, this clears off the key `a`, but implicates the keys `b` and `c` -- the tuple tree remains incomplete.
                                      	|                                    	| `{ noah: b^c }`
| bolt-1 processes `shem`, emits `abe` to bolt-2
                                       	| abe:    `<~,     { noah: d  }>` + 
                                     	  shem:   `<d,     { noah: b  }>`  	|
| bolt-1 acks `shem` with `noah: d^b`  	|                                      	| `{ noah: c^d }`
| bolt-1 processes `ham`, emits nothing	| ham:    `<~,     { noah: c  }>`	|
| bolt-1 acks `ham` with `noah: c`   	|                                   	| `{ noah: d }`
| bolt-1 processes `abe`, emits nothing	| abe:    `<~,     { noah: d  }>`	|
| bolt-1 acks `abe` with `noah: d`	|                                  	| `{ noah: 0 }`
| acker removes noah from ledger, notifies spout
                                        |                                    	| `{}`
|	|	|
| `______________________`            	| `______________________________`	| `___________________`
|=======

===== Acker

* Acker is just a regular bolt -- all the interesting action takes place in its execute method.
* it knows
  - id == `tuple[0]` (TODO what is this)
  - the tuple's stream-id
  - there is a time-expiring data structure, the `RotatingHashMap`
    - it's actually a small number of hash maps;
    - when you go to update or add to it, it performs the operation on the right component HashMap.
    - periodically (when you receive a tick tuple), it will pull off oldest component HashMap, mark it as dead; invoke the expire callback for each element in that HashMap.
* get the current checksum from `pending[id]`.

pending has objects like `{ val: "(checksum)", spout_task: "(task_id)" }`

* when it's an ACKER-INIT-STREAM
  `pending[:val] = pending[:val] ^ tuple[1]`
*


pseudocode

    class Acker < Bolt

  def initialize
	  self.ackables = ExpiringHash.new
	end

  	def execute(root_id, partial_checksum, from_task_id)
	  stream_type = tuple.stream_type
	  ackables.expire_stalest_bucket if (stream_type == :tick_stream)
	  curr = ackables[root_id]

	  case stream_type
	  when :init_stream
	    curr[:val]        = (curr[:val]	|| 0) ^ partial_checksum
	    curr[:spout_task] = from_task_id
	  when :ack_stream
	    curr[:val]        = (curr[:val]	|| 0) ^ partial_checksum
	  when :fail_stream
	    curr[:failed]     = true
	  end

	  ackables[root_id] = curr

	  if    curr[:spout_task] && (curr[:val] == 0)
	    ackables.delete(root_id)
	    collector.send_direct(curr[:spout_task], :ack_stream, [root_id])
	  elsif curr[:failed]
	    ackables.delete(root_id)
	    collector.send_direct(curr[:spout_task], :fail_stream, [root_id])
	  end

	  collector.ack # yeah, we have to ack as well -- we're a bolt
	end

    end






===== A few details

There's a few details to clarify:

First, the spout must never block when emitting -- if it did, critical bookkeeping tuples might get trapped, locking up the flow. So its emitter keeps an "overflow buffer", and publishes as follows:

* if there are tuples in the overflow buffer add the tuple to it -- the queue is certainly full.
* otherwise, publish the tuple to the flow with the non-blocking call. That call will either succeed immediately ...
* or fail with an `InsufficientCapacityException`, in which case add the tuple to the overflow buffer

The spout's async-loop won't call `nextTuple` if overflow is present, so the overflow buffer only has to accomodate the maximum number of tuples emitted in a single `nextTuple` call.



===== Code Locations

Since the Storm+Trident code is split across multiple parent directories, it can be hard to track where its internal logic lives. Here's a guide to the code paths as of version `0.9.0-wip`.

[[storm_transport_code]]
.Storm Transport Code
|=======
| Role			 	| source path				    	|
| `async-loop`		 	| `clj/b/s/util.clj`		    	|
| Spout instantiation	 	| `clj/b/s/daemon/executor.clj`  	| `mk-threads :spout`
| Bolt instantiation	 	| `clj/b/s/daemon/executor.clj`  	| `mk-threads :bolt`
| Disruptor Queue facade 	| `clj/b/s/disruptor.clj` and `jvm/b/s/utils/disruptor.java`  	|
| Emitter->Send Q logic	 	| `clj/b/s/daemon/executor.clj`  	| `mk-executor-transfer-fn`
| Router (drains exec send Q)	| `clj/b/s/daemon/worker.clj`	    	| `mk-transfer-fn`	| infinite loop attached to each disruptor queue
| Local Send Q -> exec Rcv Q 	| `clj/b/s/daemon/worker.clj`	    	| `mk-transfer-local-fn`	| invoked within the transfer-fn and receive thread
| Worker Rcv Q -> exec Rcv Q 	| `clj/b/s/messaging/loader.clj` 	| `launch-receive-thread!`	| Worker Rcv Q -> exec Rcv Q
| Trans Q -> zmq	 	| `clj/b/s/daemon/worker.clj`	    	| `mk-transfer-tuples-handler`
| `..`			 	| `clj/b/s/daemon/task.clj`	    	|
| `..`			 	| `clj/b/s/daemon/acker.clj`	    	|
| `..`			 	| `clj/b/s/`			    	|
|=======


=== More on Transport


* **Queues between Spout and Wu-Stage**: exec.send/transfer/exec.receive buffers
  - output of each spout goes to its executor send buffer
  - router batches records destined for local executors directly to their receive disruptor Queues, and records destined for _all_ remote workers in a single m-batch into this worker's transfer queue buffer.
  - ?? each spout seems to match with a preferred downstream executor
    **question**: does router load _all_ local records, or just one special executors', directly send buf=> receive buf
  - IMPLICATION: If you can, size the send buffer to be bigger than `(messages/trident batch)/spout` (i.e., so that each executor's portion of a batch fits in it).
  - router in this case recognizes all records are local, so just deposits each m-batch directly in wu-bolt's exec.receive buffer.
  - The contents of the various queues live in memory, as is their wont. IMPLICATION: The steady-state size of all the various buffers should fit in an amount of memory you can afford. The default worker heap size is fairly modest -- ??768 MB??.

* **Wu-bolt** -- suppose 6 wu-bolts (3 per worker, 2 workers)
  - Each takes about `8ms/rec` to process a batch.
  - As long as the pipeline isn't starved, this is _always_ the limit of the flow. (In fact, let's say that's what we mean by the pipeline being starved)
  - with no shuffle, each spout's records are processed serially by single wukong doohickey
  - IMPLICATION: max spout pending must be larger than `(num of wu-bolt executors)` for our use case. (There is controversy about how _much_ larger; initially, we're going to leave this as a large multiple).

* **Queues between Wu stage and State+ES stage**
  - each input tuple to wu-stage results in about 5x the number of output tuples
  - If ??each trident batch is serially processed by exactly one wukong ruby process??, each wu executor outputs `5 * adacts_per_batch`
  - IMPLICATION: size exec.send buffer to hold an wu-stage-batch's worth of output tuples.

* **Group-by guard**
  - records are routed to ES+state bolts uniquely by group-by key.
  - network transfer, and load on the transfer buffer, are inevitable here
  - IMPLICATION: size transfer buffer comfortably larger than `wukong_parallelism/workers_count`

* **ES+state bolt** -- Transactional state with ES-backed cache map.
  - each state batch gets a uniform fraction of aggregables
  - tuple tree for each initial tuple (kafka message) exhausts here, and the transaction is cleared.
  - the batch's slot in the pending queue is cleared.
  - we want `(time to go thru state-bolt) * (num of wu-bolt executors) < (time to go thru one wu-bolt)`, because we do not want the state-bolt stage to be the choking portion of flow.

* **Batch size**:
  - _larger_: a large batch will condense more in the aggregation step -- there will be proportionally fewer PUTs to elasticsearch per inbound adact
  - _larger_: saving a large batch to ES is more efficient per record (since batch write time increases slowly with batch size)
  - _smaller_: the wu-stage is very slow (8ms/record), and when the flow starts the first wave of batches have to work through a pipeline bubble. This means you must size the processing timeout to be a few times longer than the wu-stage time, and means the cycle time of discovering a flow will fail is cumbersome.
  - IMPLICATION: use batch sizes of thousands of records, but keep wukong latency under 10_000 ms.
    - initially, more like 2_000 ms

* **Transactionality**: If any tuple in a batch fails, all tuples in that batch will be retried.
  - with transactional (non-opaque), they are retried for sure in same batch.
  - with opaque transactional, they might be retried in different or shared batches.


==== Variables

	  storm_machines               --       4 ~~ .. How fast you wanna go?
	  kafka_machines               --       4 ~~ .. see `kpartitions_per_broker`
	  kpartitions_per_broker       --       4 ~~ .. such that `kpartitions_per_broker * kafka_machines` is a strict multiple of `spout_parallelism`.
	  zookeeper_machines           --       3 ~~ .. three, for reliability. These should be very lightly loaded
	  workers_per_machine          --       1 ~~ ?? one per topology per machine -- transport between executors is more efficient when it's in-worker
	  workers_count                --       4 ~~ .. `storm_machines * workers_per_machine`

	  spouts_per_worker	       --       4 ~~ .. same as `wukongs_per_worker` to avoid shuffle
	  wukongs_per_worker	       --       4 ~~ .. `cores_per_machine / workers_per_machine` (or use one less than cores per machine)
	  esstates_per_worker          --       1 ~~ .. 1 per worker: large batches distill aggregates more, and large ES batch sizes are more efficient, and this stage is CPU-light.
	  shuffle between spout and wu --   false ~~ .. avoid network transfer

	  spout_parallelism	       --       4 ~~ .. `workers_count * spouts_per_worker`
	  wukong_parallelism	       --      16 ~~ .. `workers_count * wukongs_per_worker`
	  esstate_parallelism          --       4 ~~ .. `workers_count * esstates_per_worker`

	  wu_batch_ms_target           --     800 ~~ .. 800ms processing time seems humane. Choose high enough to produce efficient batches, low enough to avoid timeouts, and low enough to make topology launch humane.
	  wu_tuple_ms                  --       8 ~~ .. measured average time for wu-stage to process an adact
	  adact_record_bytes           --    1000 ~~ .. measured average adact bytesize.
	  aggregable_record_bytes      --     512 ~~ ?? measured average aggregable bytesize.
	  spout_batch_tuples           --    1600 ~~ .? `(wu_batch_ms_target / wu_tuple_ms) * wukong_parallelism`
	  spout_batch_kb               --    1600 ~~ .. `spout_batch_tuples * record_bytes / 1024`
	  fetch_size_bytes             -- 100_000 ~~ .. `spout_batch_kb * 1024 / (kpartitions_per_broker * kafka_machines)`

	  wukong_batch_tuples          --    8000 ~~ ?? about 5 output aggregables per input adact
	  wukong_batch_kb              --      xx ~~ ?? each aggregable is about yy bytes

	  pending_ratio                --       2 ~~ .. ratio of pending batch slots to workers; must be comfortably above 1, but small enough that `adact_batch_kb * max_spout_pending << worker_heap_size`
	  max_spout_pending            --      32 ~~ .. `spout_pending_ratio * wukong_parallelism`

	  worker_heap_size_mb          --     768 ~~ .. enough to not see GC activity in worker JVM. Worker heap holds counting cache map, max_spout_pending batches, and so forth
	  counting_cachemap_slots      --   65535 ~~ .. enough that ES should see very few `exists` GET requests (i.e. very few records are evicted from counting cache)

	  executor_send_slots	       --   16384 ~~ .. (messages)  larger than (output tuples per batch per executor). Must be a power of two.
	  transfer_buffer_mbatches     --      32 ~~ ?? (m-batches) ?? some function of network latency/thruput and byte size of typical executor send buffer. Must be a power of two.
	  executor_receive_mbatches    --   16384 ~~ ?? (m-batches) ??. Must be a power of two.
	  receiver_buffer_mbatches     --       8 ~~ .. magic number, leave at 8. Must be a power of two.

	  trident_batch_ms             --     100 ~~ .. small enough to ensure continuous processing
	  spout_sleep_ms               --      10 ~~ .. small enough to ensure continuous processing; in development, set it large enough that you're not spammed with dummy transactions (eg 2000ms)

	  scheduler                    --    isol ~~ .. Do not run multiple topologies in production without this

==== Refs

* http://www.slideshare.net/lukjanovsv/twitter-storm?from_search=1