Real-time Twitter trending on a budget with riemann

I recently stumbled upon this article by Michael Noll which explains a strategy for computing twitter trends with Storm.

I love Storm, but not everyone has a cluster already, and computing tops is a problem that lends itself well to single-node computing: the datasets are often very simple (in the twitter trend case we store a tuple of hashtag and insertion time), so they can fit in a single box's memory while still servicing many events per second.

It turns out, riemann is a great tool for tackling this type of problem and is able to handle a huge amount of events per second while keeping a small and concise configuration.

It goes without saying that Storm will be a better performer when you are trying to compute a vast amount of data (for instance, the real twitter firehose).

Accumulating tweets

In this example we will compute twitter trends from a sample of the firehose, as provided by twitter. The tweetstream ruby library provides a very easy way to process the "sample hose", and here is a small script which extracts hashtags from tweets and publishes them to a local riemann instance:

require 'tweetstream'
require 'riemann/client'

TweetStream.configure do |config|
  config.consumer_key       = 'xxx'
  config.consumer_secret    = 'xxx'
  config.oauth_token        = 'xxx'
  config.oauth_token_secret = 'xxx'
  config.auth_method        = :oauth
end

riemann = Riemann::Client.new

TweetStream::Client.new.sample do |status|
  status.text.scan(/\s#([[:alnum:]]+)/).map{|x| x.first.downcase}.each do |tag|
    riemann << {service: tag, metric: 1.0, tags: ["twitter"], ttl: 3600}
  end
end

For each hashtag found in the sample we emit a riemann event tagged with twitter and a metric of 1.0, with the hashtag as the service.

The rationale for computing trends is as follows: sum each hashtag's occurrences over a sliding time window, and only retain the hashtags with the highest sums.

Riemann provides several facilities out of the box which can be used to implement this, most notably: the index, which stores and expires current values; by, which splits a stream per field; moving-time-window and folds/sum, which accumulate and sum events over a window; and top, which retains the highest-ranking services.

With recent changes in riemann’s top function we can use this simple configuration to compute trends:

(streams
  (let [store    (index)
        trending (top 10 (juxt :metric :time) (tag "top" store) store)]
    (by :service (moving-time-window 3600 (smap folds/sum trending)))))

Let’s break down what happens in this configuration. Incoming events are split per hashtag by (by :service ...); each hashtag's events are then accumulated over a one-hour sliding window by moving-time-window, and folds/sum collapses each window into a single event whose metric is that hashtag's count for the hour. These summed events are fed to the trending stream.

Now let’s look a bit more in-depth at what is provided by the trending stream. We are using the 4-arity version of top, so in this case: top keeps the 10 highest-ranking services, compared by (juxt :metric :time); events belonging to the current top 10 flow to (tag "top" store) and are indexed with the top tag, while events which leave the top 10 flow to the plain store and are re-indexed without it.

Fetching results

Running twitter-hose.rb against such a configuration, we can now query the index to retrieve the current top hashtags. With the ruby riemann-client gem we just retrieve the indexed elements tagged with top:

require 'riemann/client'
require 'pp'

client = Riemann::Client.new
pp client['tagged "top"']
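For a quick at-a-glance view, the returned events can be sorted by metric into a leaderboard. Here is a small sketch; Event is a stand-in for the riemann-client event objects, which also respond to service and metric, and the sample data is made up:

```ruby
# Sort query results into a simple leaderboard string per entry.
# Event stands in for riemann-client events (service + metric).
Event = Struct.new(:service, :metric)

def leaderboard(events)
  events.sort_by { |e| -e.metric }
        .each_with_index
        .map { |e, i| format("%2d. #%s (%d)", i + 1, e.service, e.metric) }
end

sample = [Event.new("clojure", 42.0), Event.new("riemann", 117.0)]
puts leaderboard(sample)
# prints " 1. #riemann (117)" then " 2. #clojure (42)"
```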

Going further

It might be interesting to play with a better comparison function than (juxt :metric :time): for instance, computing a decay factor from the event's time, applying it to the metric, and letting comparisons operate on the decayed value.

The skeleton of such a function could be:

(def decay-factor xxx)

(defn decaying [{:keys [metric time] :as event}]
  (let [now (unix-time)]
    (- metric (* (- now time) decay-factor))))

This would allow expiring old trends quicker.
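For concreteness, the arithmetic of that skeleton can be sketched in Ruby; the decay factor value here is made up:

```ruby
# An event's weight is its metric minus a penalty proportional to its
# age, so older events rank lower and fall out of the top sooner.
DECAY_FACTOR = 0.5 # hypothetical: metric units lost per second of age

def decaying(metric, event_time, now)
  metric - (now - event_time) * DECAY_FACTOR
end

now = Time.now.to_i
decaying(5.0, now, now)      # => 5.0, a fresh event keeps its full weight
decaying(5.0, now - 6, now)  # => 2.0, six seconds of age costs 3.0
```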

The full code for this example is available at:


Other applications

When transferring that problem domain to the typical datasets riemann handles, the top stream can be a great way to find outliers in a production environment: hosts with unusual CPU consumption, bursts of particular log types, and so on.

Toy scaling strategies

I’d like to advise implementers to look beyond riemann for scaling top extraction from streams, as tools like Storm are great for these use cases.

But in jest, I’ll mention that since the riemann-kafka plugin - by yours truly - allows producing and consuming to and from kafka queues, intermediate riemann cores could compute local tops and send the aggregated results over to a central riemann instance which would then determine the overall top.
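The merging step of such a setup is simple enough to sketch in Ruby, with made-up per-node counts:

```ruby
# Each intermediate core reports its local hashtag counts; the central
# instance sums them and extracts the overall top. Data is made up.
local_tops = [
  {"riemann" => 10.0, "storm" => 7.0},   # counts from node A
  {"kafka"   => 9.0,  "riemann" => 4.0}, # counts from node B
]

merged = local_tops.reduce do |a, b|
  a.merge(b) { |_tag, x, y| x + y }      # sum counts for shared tags
end

top2 = merged.sort_by { |_, count| -count }.first(2).map(&:first)
# riemann totals 14.0, kafka 9.0, storm 7.0
```

Note that merging local top-N lists is an approximation: a tag which narrowly misses every local top can still belong in the global top, so intermediate cores should forward more than N entries if accuracy matters.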

I hope this gives you a good glimpse of what riemann can provide beyond simple threshold alerts.