I was looking for a way to process events over a sliding window. This is very common in Complex Event Processing (CEP), which can roughly be described as querying events with a time dimension added to the query.
There are some great products and libraries out there, namely the excellent Oracle CEP, JBoss Drools Fusion, Esper, and the very useful ClojureWerkz EEP (Embedded Event Processing). Apart from EEP, all of them are very complete: they offer a DSL for querying and aggregating event streams in a very intuitive way. But I was looking for something veeeery simple, like EEP.
EEP comes with some basic window buffers, based either on size or on time. The catch with EEP's time-based windows is that they are basically a buffer that waits for a time interval to elapse and then sends all of its entries to a handler. That makes them a poor fit for questions like: “How much did my e-commerce site sell in the last 4 minutes?”
To answer that, we can't just accumulate events for 4 minutes, send all of them to a handler, and empty the buffer. If we do, consecutive queries to the buffer return wrong values. For example:
Suppose each tn marks one elapsed minute. See? If the window closes at t3, you sum the buffer and get 30, but if you ask for the total sold one minute later, you get 0! That is not what I wanted, so I decided to write my own implementation using Meltdown.
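To make the difference concrete, here is a small Python sketch comparing what a tumbling buffer still holds with a true sliding-window total. It is not EEP's or Meltdown's API: `sliding_total` and `tumbling_total` are hypothetical helpers, timestamps are in minutes, and the sale amounts are made up.

```python
# Hypothetical helpers; timestamps are in minutes, amounts are made-up sales.
Event = tuple[float, float]  # (timestamp, amount)


def sliding_total(events: list[Event], now: float, window: float) -> float:
    """Sum of the amounts whose timestamp falls in (now - window, now]."""
    return sum(amount for t, amount in events if now - window < t <= now)


def tumbling_total(events: list[Event], now: float, window: float) -> float:
    """What a buffer that is emptied every `window` minutes still holds at `now`:
    only the events that arrived since the current bucket started."""
    bucket_start = (now // window) * window
    return sum(amount for t, amount in events if bucket_start <= t <= now)


sales = [(0.5, 12.0), (1.5, 8.0), (2.5, 20.0)]

print(sliding_total(sales, now=4, window=4))   # 40.0: all three sales
print(sliding_total(sales, now=5, window=4))   # 28.0: the sale at 0.5 has slid out
print(tumbling_total(sales, now=5, window=4))  # 0: the buffer was emptied at minute 4
```

The tumbling buffer did hand all 40 to its handler at minute 4, but any query after the flush only sees what arrived since then. The sliding version drops each event individually once it is older than the window.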
Meltdown is the Clojure API for Reactor, an asynchronous message-passing and stream-processing library. The concepts behind it are very simple, and if you have tried something like Go channels, core.async, or Pulsar, there is no mystery. Just abstract away the underlying details and start writing code.
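Reactor itself is asynchronous and far richer than this, but the core message-passing idea fits in a few lines. The toy Python bus below only illustrates that idea. `EventBus`, `on`, and `notify` are hypothetical names, dispatch here is synchronous, and none of it is Meltdown's or Reactor's actual API.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any], None]


class EventBus:
    """Toy, synchronous message bus: handlers subscribe to a key, and
    notify() delivers an event to every handler registered for that key."""

    def __init__(self) -> None:
        self._handlers: defaultdict[str, list[Handler]] = defaultdict(list)

    def on(self, key: str, handler: Handler) -> None:
        self._handlers[key].append(handler)

    def notify(self, key: str, event: Any) -> None:
        # .get avoids creating an empty entry for keys nobody listens to.
        for handler in self._handlers.get(key, []):
            handler(event)


received = []
bus = EventBus()
bus.on("prices", received.append)
bus.notify("prices", 9.99)
bus.notify("orders", 1)  # no subscriber, so the event is silently dropped
print(received)  # [9.99]
```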
OK, enough. Let's see some code (the comments in the code explain everything):
I could have embedded the ticker channel in the slide function, or made it a record implementing some protocol, etc. But this is simple and good enough for now.
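For readers who don't use Clojure, here is a minimal Python sketch of a ticker-driven sliding window. It is not the Meltdown implementation. `SlidingSum`, `add`, `slide`, and `total` are hypothetical names, with `slide` only echoing the text above. The sketch assumes events arrive in timestamp order and that some timer calls `slide(now)` periodically. The window covers (now - window, now].

```python
import threading
from collections import deque


class SlidingSum:
    """Hypothetical sliding-window sum over (timestamp, amount) events.

    Keeps only events in (now - window, now] plus a running total, so reading
    the total is O(1). Assumes events are added in timestamp order."""

    def __init__(self, window: float) -> None:
        self.window = window
        self._events: deque = deque()  # (timestamp, amount), oldest on the left
        self._total = 0.0
        # add() and the ticker usually run on different threads.
        self._lock = threading.Lock()

    def _evict(self, now: float) -> None:
        # Caller must hold the lock. Drop events that have slid out of the window.
        while self._events and self._events[0][0] <= now - self.window:
            _, amount = self._events.popleft()
            self._total -= amount

    def add(self, timestamp: float, amount: float) -> None:
        with self._lock:
            self._events.append((timestamp, amount))
            self._total += amount
            self._evict(timestamp)

    def slide(self, now: float) -> None:
        """Meant to be called on every tick, even when no events arrive."""
        with self._lock:
            self._evict(now)

    def total(self) -> float:
        with self._lock:
            return self._total


# Deterministic usage with explicit clock values (minutes) instead of a real ticker.
sales_window = SlidingSum(window=4)
for t, amount in [(0.5, 12.0), (1.5, 8.0), (2.5, 20.0)]:
    sales_window.add(t, amount)
sales_window.slide(4.0)
print(sales_window.total())  # 40.0
sales_window.slide(5.0)
print(sales_window.total())  # 28.0: the sale at 0.5 expired
```

Because expired events always sit at the left end of the deque, eviction is amortized O(1) per event. That only holds if events arrive in order. Out-of-order events would need a sorted structure instead.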
There is no mystery here, but the combination of Meltdown's clean interface and Clojure's concurrency support certainly makes this kind of construction very straightforward.
Notice, however, that the longer your window interval, the bigger your buffer. And the faster you add events to the price reactor, the faster your buffer grows. That does not seem to be a problem for Reactor, though.
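As a back-of-the-envelope check, assume events arrive at a roughly steady average rate λ. A window of length W then holds about λ·W events at any moment, so the buffer grows linearly with both the rate and the window. Here is the estimate with a hypothetical rate of 50 sales per second and the 4-minute window from the e-commerce question:

```latex
N \approx \lambda W = 50\ \tfrac{\text{events}}{\text{s}} \times 240\ \text{s} = 12\,000\ \text{events}
```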
That is all. CEP is very useful in fields such as pattern and behavior detection and real-time analysis, for example fraud detection. It pays off to study it and look for opportunities to build smart solutions with it.