Real-time streaming data aggregation

Dear Kettle users,

Most of you usually use a data integration engine to process data in a batch-oriented way.  Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly or hourly workloads.  Sometimes folks run micro-batches of work every minute or so.  However, it's lesser known that our beloved transformation engine can also be used to stream data indefinitely (never ending) from a source to a target.  This sort of data integration is sometimes referred to as "streaming", "real-time", "near real-time", "continuous" and so on.  Typical examples of situations where you have a never-ending supply of data that needs to be processed the instant it becomes available are JMS (Java Message Service), RDBMS log sniffing, on-line fraud analysis, web or application log sniffing or, of course… Twitter!  Since Twitter is easily accessed, examples of its usage pop up all over the place, and in this blog post too we'll use this service to demonstrate how Pentaho Data Integration handles streaming data.

Here’s what we want to do:

  1. Continuously read all the tweets that are being sent on Twitter.
  2. Extract all the hash-tags used
  3. Count the number of hash-tags used in a one-minute time-window
  4. Report on all the tags that are being used more than once
  5. Put the output in a browser window, continuously updated every minute.

This is a fairly generic example, but the same logic can be applied to other fields like JMS, HL7, log sniffing and so on.  It differs from the excellent work that Vincent from Open-BI described earlier this week on his blog in two respects: his Talend job finishes where ours never ends, and ours does time-based aggregation in contrast to aggregation over a finite data set.

Also note that in order for Kettle to fully support multiple streaming data sources, we would have to implement support for "windowed" (time-based) joins and other nifty things.  We've seen very little demand for this sort of requirement in the past, perhaps because people don't know it's possible with Kettle.  In any case, if you currently need full streaming data support, have a look at SQLStream; they can help you.  SQLStream was co-founded by Pentaho's Julian Hyde of Mondrian fame.

OK, let’s see how we can solve our little problem with Kettle instead…

1. Continuously read all the tweets that are being sent on Twitter.

For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:

http://stream.twitter.com/1/statuses/sample.json?delimited=length

Since the format of the output is never-ending and specific in nature, I wrote a small "User Defined Java Class" script.  (The delimited=length parameter makes Twitter prefix each JSON message with its length, which is why the code below skips everything before the first opening brace.)

// Imports (these go at the top of the UDJC code window; the HTTP classes
// come from the Commons HttpClient 3.x library that ships with PDI):
//
import java.io.BufferedInputStream;
import java.io.InputStream;
import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.PostMethod;
import org.pentaho.di.cluster.SlaveConnectionManager;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
  HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
  client.setTimeout(10000);
  client.setConnectionTimeout(10000);

  // Authenticate with the USERNAME and PASSWORD transformation parameters...
  //
  Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
  client.getState().setCredentials(AuthScope.ANY, creds);
  client.getParams().setAuthenticationPreemptive(true);

  HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");

  // Execute the request and keep reading from the response...
  //
  BufferedInputStream bufferedInputStream = null;
  try {
    int result = client.executeMethod(method);
    if (result != 200) {
      throw new KettleException("Received HTTP response code " + result + " from the Twitter service");
    }

    // The never-ending response body...
    //
    InputStream inputStream = method.getResponseBodyAsStream();
    bufferedInputStream = new BufferedInputStream(inputStream, 1000);

    StringBuffer bodyBuffer = new StringBuffer();
    int opened = 0;
    int c;
    while ((c = bufferedInputStream.read()) != -1 && !isStopped()) {
      char ch = (char) c;
      bodyBuffer.append(ch);
      if (ch == '{') opened++; else if (ch == '}') opened--;
      if (ch == '}' && opened == 0) {
        // One complete JSON block: pass it on!
        //
        Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
        String jsonString = bodyBuffer.toString();

        // Skip the length prefix that precedes the first brace...
        //
        int startIndex = jsonString.indexOf("{");
        if (startIndex < 0) startIndex = 0;

        r[0] = jsonString.substring(startIndex);
        putRow(data.outputRowMeta, r);

        bodyBuffer.setLength(0);
      }
    }
  } catch (Exception e) {
    throw new KettleException("Unable to get tweets", e);
  } finally {
    // Just close the stream: calling reset() here would throw
    // "Resetting to invalid mark" since no mark was ever set.
    //
    if (bufferedInputStream != null) {
      try {
        bufferedInputStream.close();
      } catch (Exception e) {
        // Ignore: we're shutting down anyway.
      }
    }
    method.releaseConnection();
  }

  setOutputDone();
  return false;
}

As the experienced UDJC writers among you will notice: this step never ends as long as the twitter service keeps sending more data.  Depending on the stability and popularity of Twitter, that can be "a very long time".

You could improve the code further to re-connect to the service every time it drops away.  Personally, I would not do this.  I would rather have the transformation terminate with an error (as it is implemented now), send an alert (e-mail, database, SNMP) and re-start the transformation in a loop in a job.  That way you have a trace in case Twitter dies for a few hours.

2. Extract all the hash-tags used

First we'll parse the JSON returned by the twitter service, extract the first 5 hash-tags from each message, split these up into 5 rows and count the tags…
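The extraction itself is done with regular Kettle steps in the example transformation.  Just to make the logic concrete, here is a small plain-Java sketch of what the hash-tag extraction boils down to; the HashTagExtractor class and its regular expression are my own illustration, not code from the shipped example:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HashTagExtractor {

  // A hash-tag is a '#' followed by word characters.  Note that \w is
  // ASCII-only by default, so accented tags like "malhação" would need
  // a broader pattern or the UNICODE_CHARACTER_CLASS flag (Java 7+).
  private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

  // Return at most 'max' hash-tags found in the tweet text.
  public static List<String> extractHashTags(String tweetText, int max) {
    List<String> tags = new ArrayList<String>();
    Matcher matcher = HASHTAG.matcher(tweetText);
    while (matcher.find() && tags.size() < max) {
      tags.add(matcher.group(1));
    }
    return tags;
  }

  public static void main(String[] args) {
    // Each tag becomes its own row downstream, ready for the "Group by" step.
    for (String tag : extractHashTags("Loving #kettle and #pentaho! #ETL", 5)) {
      System.out.println(tag);
    }
  }
}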

3. Count the number of hash-tags used in a one-minute time-window

The counting is easy: you can simply use a "Group by" step.  However, how can we aggregate in a time-based fashion without too much tinkering?  Well, we now have the "Single Threader" step, which has an option to aggregate in a time-based manner, so we might as well use it:

This step simply accumulates all records in memory until 60 seconds have passed and then performs one iteration of the single-threaded execution of the specified transformation.  This is a special execution method that doesn't use the typical parallel engine.  Another cool thing about this engine is that the records that enter it during the time-window can be grouped and sorted without the transformation being restarted every minute.
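To make the time-window idea concrete, here is a rough plain-Java sketch of the accumulate-and-flush pattern; this is only an illustration of the concept, the real work is done declaratively by the "Single Threader" and "Group by" steps:

import java.util.HashMap;
import java.util.Map;

public class MinuteWindow {

  private final Map<String, Integer> counts = new HashMap<String, Integer>();
  private long windowStart = System.currentTimeMillis();

  // Called for every incoming hash-tag row.
  public void add(String tag) {
    Integer count = counts.get(tag);
    counts.put(tag, count == null ? 1 : count + 1);

    // After 60 seconds, emit the aggregated counts and start a fresh
    // window, the way the Single Threader runs one iteration per minute.
    // (A real implementation would also flush on a timer, not only when
    // a row happens to arrive.)
    if (System.currentTimeMillis() - windowStart >= 60000L) {
      for (Map.Entry<String, Integer> row : counts.entrySet()) {
        System.out.println(row.getKey() + ";" + row.getValue());
      }
      counts.clear();
      windowStart = System.currentTimeMillis();
    }
  }
}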

4. Report on all the tags that are being used more than once

The filtering is done with a simple "Filter Rows" step.  However, thanks to the magic of the "Single Threader" step, we can sort the rows descending by the tag occurrence count within that one-minute time-window.  It's also interesting to note that if you have huge amounts of data, you can easily parallelize the work by starting multiple copies of the Single Threader step and/or with some clever data partitioning.  In our case we could partition by hash-tag or re-aggregate the aggregated data.
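Again purely as an illustration, sorting and filtering the aggregated counts amounts to something like this in plain Java; in the transformation itself the "Sort rows" and "Filter Rows" steps do this declaratively:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class TagReport {

  // Keep only tags used more than once, sorted descending by count.
  public static List<Map.Entry<String, Integer>> report(Map<String, Integer> counts) {
    List<Map.Entry<String, Integer>> rows = new ArrayList<Map.Entry<String, Integer>>();
    for (Map.Entry<String, Integer> entry : counts.entrySet()) {
      if (entry.getValue() > 1) { // the "Filter Rows" condition: count > 1
        rows.add(entry);
      }
    }
    Collections.sort(rows, new Comparator<Map.Entry<String, Integer>>() {
      public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
        return b.getValue().compareTo(a.getValue()); // descending by count
      }
    });
    return rows;
  }
}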

5. Put the output in a browser window, continuously update every minute.

As shown in an earlier blog post, we can do this quite easily with a "Text File Output" step.  However, we also want to print a small header and a separator between each minute's data, which we handle with two small scripting steps:

The script to print the header looks like this (the null check ensures the header is only written once: the script runs for every row, but the variable keeps its value between rows):

var out;
if (out == null) {
  out = _step_.getTrans().getServletPrintWriter();
  out.println("'Real-time' twitter hashtag report, minute based");
  out.flush();
}

The separator printed between each minute's output is simple too:

if (nr == 1) {
  var out = _step_.getTrans().getServletPrintWriter();
  out.println("============================================");
  out.println();
  out.flush();
}

You can execute this transformation on a Carte instance (4.2.0) and see the following output:

'Real-time' twitter hashtag report, minute based
=================================================

nr;hashtag;count;from;to
1;tatilmayonezi;5;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
2;AUGUST6THBUZZNIGHTCLUB;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
3;teamfollowback;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
4;ayamzaman;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
5;dnd;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
6;follow;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
7;malhação;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
8;rappernames;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
9;thingswelearnedontwitter;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
=================================================

1;ska;5;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
2;followplanetjedward;4;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
3;chistede3pesos;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
4;NP;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
5;rappernames;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
6;tatilmayonezi;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
7;teamfollowback;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
8;AvrilBeatsVolcano;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
9;CM6;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
10;followme;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
11;Leão;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
12;NewArtists;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
13;OOMF;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
14;RETWEET;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
15;sougofollow;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
16;swag;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
17;thingswelearnedontwitter;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000

...

For reference, I used the following URL to start the streaming report:

http://cluster:cluster@127.0.0.1:8282/kettle/executeTrans/?trans=%2Fhome%2Fmatt%2Ftest-stuff%2FTwitter Stream%2FRead a twitter stream.ktr&USERNAME=MyTwitterAccount&PASSWORD=MyPassword

I placed the complete example over here in case you want to try this yourself on PDI/Kettle version 4.2.0-RC1 or later. One thing you could add to make it even cooler: have this transformation send an e-mail every time a certain hash-tag gets used more than 15 times in a given minute.  That sort of alerting support gives you easy access to emerging new trends, events and memes.

For reference, take a look at this earlier blog post of mine, where I describe the internal cleanup mechanisms in Kettle that prevent our transformation from ever running out of memory or resources.

Until next time,

Matt

21 comments

  • Matt,

    Fantastic work, thanks for sharing!
    I love it, and you gave me ideas!!!
    And thanks for quoting my article.
    I will quickly adapt my last Talend (quick 'n dirty) jobs into a more robust Kettle version.

    Keep up the good work.

    Many thanks
    Vincent

  • Hello, very interesting post, Matt.

    Now, I was wondering: would you be able to do almost the same thing in "server" mode, deployed on the Carte server specifically?

    So I mean: listen on some Carte servlet for POST and GET requests for some event and process them on the fly, just like you are doing now.
    Would such a thing be possible? Wouldn't that be interesting?

    Thanks.

  • Rodrigo Haces

    This actually works GREAT. My only question is: why am I not getting the feed live? It waits for the first 3 one-minute stream batches before I see a value in my browser, and I don't see everything: I only see the first 2 minutes and after that I'm 1 minute behind.


  • Hi Rodrigo,

    I think most browsers don't like the streaming nature of the data coming through. I tried briefly to have the sample generate HTML but that didn't take, since there's never a closing tag, I guess.
    Perhaps it would’ve been better to have created something fancy in JavaScript but I really didn’t have the time.
    Perhaps others can have fun with that. :-)

    Cheers,

    Matt

  • Hi Raphaël,

    At the moment, the Carte servlets are not yet pluggable. It probably would be a good idea to extend our plugin architecture to support servlets for the Carte/DI servers.

    However, you can already write plugins for the Pentaho BI server so perhaps it would be easier to have a service running over there.

    All that being said, creating the steps to expose or read out a web service is reasonably trivial, as you can see from this example.

    Good luck!

    Matt

  • Tribhuwan

    Hi Matt
    I have downloaded this example and am trying to run it in PDI 4.2 RC1. First it was throwing an error that Twitter.Status was not found, so I placed the twitter jar file in the PDI lib directory. Now while running it throws the
    error "Cannot compare types 'long' and 'java.math.BigInteger'".
    Can you please suggest what is going wrong? It would be very helpful for me.

  • Tribhuwan, the username and password that you need to specify for the webservice (your twitter username and password) are defined as parameters of the transformation. You can specify them in the execution dialog.

    http://wiki.pentaho.com/display/EAI/Named+Parameters

  • Johannes Dollen

    Nice article!
    @Matt: maybe you could add the “import” lines at the top of the User Defined Java Class.

    I actually preferred to use the "statuses/filter" method of the Twitter Streaming API, so I changed some lines:
    PostMethod method = new PostMethod("https://stream.twitter.com/1/statuses/filter.json");
    method.addParameter("track", "");

    To make it all work, I had to comment out the next line:
    // bufferedInputStream.reset();
    …else it failed with "resetting to invalid mark" errors (because there is no mark and no need to reset).

    - Could anyone maybe help me out with handling timeout => reconnect/rerun steps in Kettle?
    In the example above there's a timeout of 10 seconds. Twitter sends a keep-alive newline every 30 seconds, but what if the server is not reachable?

  • Jegordon

    Hi to all,

    Does anyone know if there is any limit on the time/posts you can get from these web services? I don't know why, but after a while my transformation crashes.

    Thanks a lot

    jegordon

  • René

    Hi,
    I have a question: how can I load an .asp page? I'm not interested in the output; in the code, I execute some SQL.
    I need to program a job or consumer that calls that web page. Is there any way to do it with ETL or something in Pentaho?
    Thanks.

  • Hi PDI/Kettle enthusiasts, I'm trying this but it fails at the first step. Does Twitter now use https, or has it disabled this web service?

  • It's https:// indeed.

  • Hey Matt,
    could you post the whole example?
    Really cool stuff!

  • var out = _step_.getTrans().getServletPrintWriter();
    out.println("");
    out.println("Hello World!");
    out.println("???????????");
    out.println("");

    The above does not work…. Results in:

    Hello World!
    ???????????

    I am assuming it's something in the encoding of the print writer. I've gone so far as setting -Dfile.encoding=UTF8 and tried to reverse engineer the print writer, but to no avail. I deal with Japanese, French, German, and other import formats. Let me know if this is a bug or not.

  • It seems your blog also does not support UTF-8. Please see this gist:

    https://gist.github.com/4366869

  • Terrance, the bug you linked to was resolved in 4.2.0. That’s why I prefer to keep defect and support discussions in JIRA and on the Kettle forum, not on this blog.

  • Franziska

    Hi you all,

    I'm currently looking into how to stream Twitter data with Kettle and was wondering whether this code is supposed to still work? It somehow seems to me that it doesn't.

  • I think Twitter is on https:// these days, but please, like I said over and over again: post questions and problems on the forum and in our case tracking system.

  • Thiago

    Hi. Thanks for sharing.

    I get this error. Any idea?

    2013/07/02 11:28:25 – Get tweets streaming.0 – ERROR (version 4.4.0-stable, build 17588 from 2012-11-21 16.02.21 by buildguy) : Unexpected error
    java.lang.NullPointerException
    at Processor.processRow(Processor.java:79)
    at org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClass.processRow(UserDefinedJavaClass.java:1181)
    at org.pentaho.di.trans.step.RunThread.run(RunThread.java:50)
    at java.lang.Thread.run(Unknown Source)

  • A. Loutfi

    Hello Matt,

    I used this post to create a stream for tweets and it worked.
    But now I have a null pointer exception.

    Is this change caused by the retirement of API 1.0?

    Can you please help me with this issue?