I recently stumbled upon this article http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/ by Michael Noll which explains a strategy for computing twitter trends with Storm.
I love Storm, but not everyone already has a cluster, and I think computing tops is a problem that lends itself well to single-node computing. The datasets are often very simple (in the twitter trend case we store a tuple of hashtag and time of insertion), so they can fit in a single box’s memory while still servicing many events per second.
It turns out that riemann is a great tool for tackling this type of problem: it can handle a huge number of events per second while keeping a small and concise configuration.
It goes without saying that Storm will perform better when you need to process a vast amount of data (for instance, the real twitter firehose).
Accumulating tweets
In this example we will compute twitter trends from a sample of the firehose, as provided by twitter. The tweetstream ruby library provides a very easy way to process the “sample hose”. Here is a small script which extracts hashtags from tweets and publishes them to a local riemann instance:
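require 'tweetstream'
require 'riemann/client'

# Twitter API credentials (placeholders)
TweetStream.configure do |config|
  config.consumer_key       = 'xxx'
  config.consumer_secret    = 'xxx'
  config.oauth_token        = 'xxx'
  config.oauth_token_secret = 'xxx'
  config.auth_method        = :oauth
end

# Connects to a riemann instance on localhost by default
riemann = Riemann::Client.new

TweetStream::Client.new.sample do |status|
  # Extract hashtags preceded by whitespace and lowercase them
  status.text.scan(/\s#([[:alnum:]]+)/).map { |x| x.first.downcase }.each do |tag|
    # One event per hashtag occurrence, expiring after an hour
    riemann << { service: tag, metric: 1.0, tags: ["twitter"], ttl: 3600 }
  end
end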
For each hashtag found in a tweet from the sample stream, we emit a riemann event tagged with twitter and a metric of 1.0; the service is the hashtag itself.
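To make the extraction step concrete, here is a minimal sketch that runs without a twitter account or a riemann instance. The helper name extract_tags and the sample tweet are made up for illustration; the regular expression and the event fields are the ones from the script above.

```ruby
# Hypothetical helper wrapping the same regular expression as the script.
def extract_tags(text)
  text.scan(/\s#([[:alnum:]]+)/).map { |x| x.first.downcase }
end

# Made-up tweet text, for illustration only.
tags = extract_tags("#Storm is great, so is #Riemann and #kafka2")

# The events the script would publish, built as plain hashes here.
events = tags.map do |tag|
  { service: tag, metric: 1.0, tags: ["twitter"], ttl: 3600 }
end
```

Note that the leading \s means a hashtag at the very start of a tweet (#Storm here) is not picked up; only hashtags preceded by whitespace are.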
Computing trends in riemann
The rationale for computing trends is as follows:
- Keep a moving time window of an hour
- Compute per-tag counts
- Sort by computed count, then by time
- Keep the top N events
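The four steps above can be sketched in a few lines of plain Ruby. This is only an in-memory illustration of the idea, not how riemann implements it; the method name trending, the window keyword and the sample events are all assumptions made for the example.

```ruby
# events: array of {tag:, time:} hashes; now: current unix time;
# n: how many trends to keep; window: size of the moving window in seconds.
def trending(events, now, n, window: 3600)
  # 1. Keep a moving time window of an hour
  recent = events.select { |e| e[:time] > now - window }

  # 2. Compute per-tag counts, remembering the latest time each tag was seen
  counts = recent.group_by { |e| e[:tag] }.map do |tag, es|
    { tag: tag, count: es.size, time: es.map { |e| e[:time] }.max }
  end

  # 3 and 4. Sort by count, then by time, and keep the top n
  counts.max_by(n) { |t| [t[:count], t[:time]] }
end

# Made-up events; the last one falls outside the window.
events = [
  { tag: "storm",   time: 100 },
  { tag: "riemann", time: 200 },
  { tag: "riemann", time: 300 },
  { tag: "kafka",   time: 400 },
  { tag: "old",     time: -5000 }
]
top = trending(events, 500, 2)
```

Recomputing everything on each call is obviously wasteful; the point of a stream processor is to maintain these counts incrementally as events arrive.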
Riemann provides several facilities out of the box which can be used to implement this, most notably:
- The top stream, which separates events into two streams: top and bottom
- The moving-time-window stream
With recent changes in riemann’s top function, we can use this simple configuration to compute trends:
(let [store    (index)
      trending (top 10 (juxt :metric :time) (tag "top" store) store)]
  (streams
    (by :service (moving-time-window 3600 (smap folds/sum trending)))))
Let’s break down what happens in this configuration.
- We create an index and a trending stream which keeps the top 10 trending hashtags; we’ll get back to this one later
- For each incoming event, we split on service (the hashtag), and then sum all occurrences in the last hour
- This generates an event whose metric is the number of occurrences in an hour, which gets sent to trending
Now let’s look a bit more in-depth at what is provided by the trending stream. We are using the 4-arity version of top, so in this case:
- We want to compute the top 10 (first argument)
- We compare and sort events using (juxt :metric :time). juxt yields a vector, which is the result of applying each of its arguments to its input. For an input event {:metric 1.0 :time 2} our function will yield [1.0 2]. We leverage the fact that vectors implement the Comparable interface and thus will correctly sort events by metric, then time
- We send events belonging to the top 10 to the stream (tag "top" store)
- We send events not belonging to the top 10, or bumped from the top 10, to the stream store
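The ordering trick is not specific to Clojure. As a rough Ruby analogue (not riemann code), an array of [metric, time] plays the role of the vector returned by juxt, since Ruby arrays also compare element by element. The sort_key lambda and the sample events below are made up for the illustration.

```ruby
# Ruby analogue of (juxt :metric :time): build a [metric, time] key per event.
sort_key = ->(event) { [event[:metric], event[:time]] }

# Made-up events: "a" and "c" tie on metric and differ only by time.
events = [
  { service: "a", metric: 1.0, time: 2 },
  { service: "b", metric: 3.0, time: 1 },
  { service: "c", metric: 1.0, time: 5 }
]

# Arrays compare element by element, so ties on metric fall back to time.
ranked = events.sort_by(&sort_key).reverse
```

Here "b" ranks first on metric alone, and "c" beats "a" only because its time is more recent.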
Fetching results
Running twitter-hose.rb against such a configuration, we can now query the index to retrieve the current trends. With the ruby riemann-client gem we just retrieve the indexed elements tagged with top:
require 'riemann/client'
require 'pp'
client = Riemann::Client.new
pp client['tagged "top"']
Going further
It might be interesting to play with a better comparison function than (juxt :metric :time). For instance, we could compute a decay factor from the time, apply it to the metric, and let comparisons be done on this output.
The skeleton of such a function could be:
(def decay-factor xxx)

(defn decaying [{:keys [metric time] :as event}]
  (let [now (unix-time)]
    ;; subtract a penalty proportional to the age of the event
    (- metric (* (- now time) decay-factor))))
This would allow expiring old trends quicker.
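To get a feel for the effect, here is the same linear decay sketched in Ruby with made-up numbers. The decay factor of 0.01 is an arbitrary value picked for the example, not a recommendation, and the current time is passed in explicitly so the result is reproducible.

```ruby
# Linear decay: the older the event, the larger the penalty on its metric.
# decay_factor defaults to an arbitrary illustrative value.
def decaying(event, now, decay_factor: 0.01)
  event[:metric] - (now - event[:time]) * decay_factor
end

now = 1_000
fresh = { service: "new", metric: 40.0, time: 990 } # seen 10 seconds ago
stale = { service: "old", metric: 45.0, time: 100 } # seen 900 seconds ago

# Rank by decayed score, highest first.
ranked = [fresh, stale].sort_by { |e| decaying(e, now) }.reverse
```

With these numbers the stale hashtag has the higher raw count, but its age penalty drops it below the fresh one.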
The full code for this example is available at:
Other applications
When transferring that problem domain to the typical datasets riemann handles, the top stream can be a great way to find outliers in a production environment, for instance in terms of CPU consumption or bursts of particular log types.
Toy scaling strategies
I’d like to advise implementers to look beyond riemann for scaling top extraction from streams, as tools like Storm are great for these use cases.
But in jest, I’ll mention that since the riemann-kafka plugin - by yours truly - allows producing and consuming to and from kafka queues, intermediate riemann cores could compute local tops and send the aggregated results over to a central riemann instance which would then determine the overall top.
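The merge step on the central instance could look roughly like the following Ruby sketch. It says nothing about how the riemann-kafka plugin works; merge_tops, the node names and the counts are all hypothetical, and each intermediate node is assumed to report its local top as a hash of hashtag to count.

```ruby
# Sum the counts reported by each node, then keep the overall top n.
def merge_tops(local_tops, n)
  totals = Hash.new(0)
  local_tops.each do |top|
    top.each { |tag, count| totals[tag] += count }
  end
  totals.max_by(n) { |_tag, count| count }.to_h
end

# Made-up local tops from two intermediate nodes.
node_a = { "storm" => 12, "riemann" => 9 }
node_b = { "riemann" => 7, "kafka" => 10 }

overall = merge_tops([node_a, node_b], 2)
```

One caveat: this is only exact if all events for a given hashtag are routed to the same node, or if every node reports more entries than the final top needs. Otherwise a hashtag sitting just below the cutoff on every node can be missed.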
I hope this gives you a good glimpse of what riemann can provide beyond simple threshold alerts.