Clustering & partitioning

Let’s have a quick look at some clustering examples in the new 3.0 engine:

Clustering combined with multiple copies of a step

This example runs all the steps of the transformation in clustered mode. That means that there are 4 slave transformations running in parallel with each other.
The interesting part is, first of all, that the “Fixed Input” step runs in parallel, each copy reading a certain part of the file.
The second thing to mention is that we now allow you to run multiple copies of a single step on a cluster. In this example we run 3 copies of a step per slave transformation, so in total there are 4 x 3 = 12 copies of the sort step working in parallel.

IMPORTANT: this is a test transformation; a real-world sorting exercise would also include a “Sorted Merge” step to keep the data sorted. I was too lazy to redo the screenshots though :-)

The clustered transformations now also support logging to databases:

Slave transformations logging to database

As you can see, we added the name of the transformation to the dialog so that you can show the log of all the executed slave transformations.
This feature makes it easier to inspect the execution log on the remote server after the run.

It is now also possible to use the execution stats in a JavaScript job entry (previously called Evaluation):

Evaluating runtimes

As such, you can validate that the number of input rows is the same as the number of output rows. The numbers returned are totals: the sum of the stats of all the slave transformations.
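
As a quick illustration, the check boils down to something like this minimal sketch against Kettle’s Result class (the accessor names below are my assumption of the API, so treat them with care):

```java
import org.pentaho.di.core.Result;

// Minimal sketch: verify that the previous transformation read as many rows
// as it wrote, summed over all slave transformations. The Result accessors
// below are assumed; the exact names may differ per Kettle version.
public class RowCountCheck {
    public static boolean rowCountsMatch(Result previousResult) {
        long linesInput  = previousResult.getNrLinesInput();   // total rows read, all slaves
        long linesOutput = previousResult.getNrLinesOutput();  // total rows written, all slaves
        return previousResult.getNrErrors() == 0 && linesInput == linesOutput;
    }
}
```

The JavaScript expression in the job entry expresses the same comparison; it simply evaluates to true or false to decide which hop the job follows.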

To finish up for today, here is a sample where we partition data in a certain fashion:

Clustering and partitioning

This sample reads a file on the master, then sends it off to 2 slave servers (over TCP/IP sockets).
The data then needs to be partitioned. That means that data from one slave server needs to go to all the others. For this, Nx(N-1) sockets are opened for each partition that runs on the slave servers. In this case we have N=2 slave servers and 2 partitions running: 2x(2-1) = 2 sockets per partition, times 2 partitions, so we open 4 extra sockets to make sure the data can find its way to the correct partition.
In the picture above, the “Text File Output” and “Get the partition Nr” steps are partitioned using the same partition schema and, as such, the data stays in the same “swimming lane” without any performance loss.
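
To make the socket arithmetic concrete, here is a minimal sketch (an illustrative helper of my own, not part of the Kettle code base):

```java
// Minimal sketch of the re-partitioning socket arithmetic described above;
// an illustrative helper, not part of the Kettle code base.
public class RepartitionSockets {
    /** Extra sockets per partition when N slave servers re-partition: N x (N - 1). */
    static int socketsPerPartition(int nrSlaveServers) {
        return nrSlaveServers * (nrSlaveServers - 1);
    }

    public static void main(String[] args) {
        int slaves = 2;      // slave servers in the example above
        int partitions = 2;  // partitions running on those slaves
        // 2 x (2 - 1) = 2 sockets per partition, times 2 partitions = 4 extra sockets
        System.out.println(partitions * socketsPerPartition(slaves));
    }
}
```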

The Dummy Step (note that the icon has changed since I took the screenshot) is doing the re-partitioning here. If you open the generated slave transformations you will see the form that these transformations take:

A typical slave transformation

A slave transformation sample using 20 extra sockets to re-partition the data.

We now also correctly set a number of internal variables for your convenience. This allows you to read/write the data from/to partitioned files, for example by putting the partition number in the filename.

The Dx2 or Dx10 means that the partitioning is dynamic: 2 or 10 partitions per slave server in the cluster:

Dynamic partitioning

The advances in 3.0 shown above make sure that the Kettle engine lets you scale beyond a single box. In fact, it will allow you to get the maximum out of your favorite disk subsystem by attacking your data in parallel.

Until next time,

Matt

P.S. Don’t you just love these new icons ;-)

9 comments

  • Joe Montana

    Really, the previous icons were terrific… too bad!

  • Well, this is open source software. If you want, you can always put them back. In fact, we split up all GUI related code into a separate jar file called kettle-ui-3.0.jar. Simply changing the icons in there will do.
    We even went further. You can have your own Look&Feel manager plugged in, change the location of the icons, put them in your own jar files, etc.
    So you see, there is still a bit of hope for all you people (like me) who had their sense of taste ruined by the old icons…

    Matt

  • Good job Matt. A very interesting insight! I can’t wait for the new version when it gets released, plus I have to admit I do love the icons!

    Question is… did you create them yourself? If not, I need to know where you got them. The Java is starting to come along! (even if I can’t get Tomcat working on Ubuntu for some reason)

    All the best,

    Harris

  • Hi Harris,

    Obviously I didn’t create them. You saw the old icons, you know what we would end up with in that case.
    No, Wes Brown, the Pentaho Pixel Manager, created them.

    Cheers Highlander!

    Matt

  • Great job Matt. First impressions are important.
    The new icons and the icon plugin surely make a great impression.

    PDI looks more professional.

    Thanks Matt.
    Thanks Wes Brown.

  • Shane

    Matt,
    I am very curious to learn more about the partitioning capabilities of Pentaho but I am having trouble finding documentation that really explains how it is implemented. In my case I want to use partitioning as a means to leverage the multiple cores in my server, not for clusters of servers or even database partitions. A few questions:

    1. Does Pentaho support partitioning data by specific keys?
    In other tools this is necessary especially when using key-based components/steps. How does Pentaho handle such steps with partitioned data?

    2. Can you use multiple partitioning schemes within one transformation, repartitioning as necessary?
    Again going back to the key-based example, the keys that you are using at the beginning of a transformation aren’t always the keys that you need to use at the end.

    3. Are other partitioning methods supported? Round-Robin? Expression? Range?

    Thanks!
    Shane

  • Hi Shane,

    If you just want to partition for divide-and-conquer reasons, I suggest you simply use the standard mod partitioning method that we offer out of the box.
    This method takes an ID (or even the hash code of a String, Date, etc.) and takes that number modulo the total number of step copies that you have in the cluster.
    That total is the number of slave servers in the cluster (cluster schema dialog) times the number of step copies to launch per partition (partition dialog).
    With this system, if you are running on 8 dual-core servers, you can perhaps launch 4 partitions (step copies) per slave server, or 32 copies in total.
    That enables very good parallelism in the transformation.
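
    In other words, the out-of-the-box method is a plain modulo over the key or its hash code. A minimal sketch of the idea (my illustration, not the actual ModPartitioner source):

    ```java
    // Minimal sketch of mod partitioning: map a key to one of nrPartitions
    // step copies. Illustrative only, not the actual ModPartitioner source.
    public class ModPartitionExample {
        /** The key can be a Long ID, a String, a Date... anything with a hash code. */
        static int partitionNr(Object key, int nrPartitions) {
            int hash = (key == null) ? 0 : key.hashCode();
            // mask the sign bit so the result is never negative
            return (hash & Integer.MAX_VALUE) % nrPartitions;
        }

        public static void main(String[] args) {
            // e.g. 8 slave servers x 4 step copies each = 32 partitions in total
            System.out.println(partitionNr("customer-1234", 32)); // a number in [0, 31]
        }
    }
    ```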

    So, the answer to 1. is indeed, you can partition by a specific key.

    The answer to 2. is also yes. This functionality was implemented as part of the 3.0 release and also works across a cluster. If you have 5 partitions in step A going to step B with 20 partitions, we open 5x20 = 100 data conduits (remote step sockets) to allow such re-partitioning.

    As for 3.: I know that this is largely undocumented, but you can write your own partitioning method plugins that basically allow you to do anything (see also: ModPartitioner and the Partitioner interface). Please note that we don’t consider round-robin a partitioning method per se, as it doesn’t really do active partitioning. However, it is the standard way of distributing the data across the nodes; you don’t need to do anything to get it.
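
    As a minimal sketch of what such a plugin could compute, here is a hypothetical range partitioner (my own illustration; the exact Partitioner interface to implement depends on the version, so only the partitioning logic is shown):

    ```java
    // Hypothetical range partitioning logic, the kind of thing a custom
    // Partitioner plugin could compute (see ModPartitioner and the
    // Partitioner interface for the real plugin contract).
    public class RangePartitionExample {
        // Upper bounds of each range; keys above the last bound land in an overflow partition.
        private final long[] upperBounds = { 1000L, 10000L, 100000L };

        /** Returns a partition number in [0, upperBounds.length]. */
        int getPartition(long key) {
            for (int i = 0; i < upperBounds.length; i++) {
                if (key <= upperBounds[i]) {
                    return i;
                }
            }
            return upperBounds.length; // overflow partition for the largest keys
        }

        public static void main(String[] args) {
            RangePartitionExample p = new RangePartitionExample();
            System.out.println(p.getPartition(42L));       // -> 0
            System.out.println(p.getPartition(50000L));    // -> 2
            System.out.println(p.getPartition(2000000L));  // -> 3
        }
    }
    ```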

    All the best,

    Matt

  • Hi there,

    I am just playing around with the partitioning option. Can I partition on more than 1 field? I tried to enter 2 string fields country_business and country_private separated by ‘,’ as key but this did not work:

    Unable to find partitioning field name [country_business,country_private] in the output row…[country_business String], [country_private String]

    Cheers,

    Ulrich

  • Sorry Ulrich, at the moment we only support one field. However, it can be any data type.
    Perhaps you can concatenate the strings to get where you want to be.
    And although it involves some hacking, we also support custom data partitioning types.