Database partitioning

We’ve been experimenting lately with database partitioning (in version 2.3.2-dev, make sure to update your kettle.jar to the latest snapshot). In our context, database partitioning means that we divide our data over a cluster of several databases.

A typical way of doing this is to divide the customer_id by the number of hosts in the cluster and take the remainder. If the remainder is 0, you store the data on the first host in the cluster; if it is 1, on the second; if 2, on the third, and so on.
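To make that remainder rule concrete, here is a minimal Java sketch (the class and method names are made up for illustration; this is not the actual Kettle code):

    // Minimal illustration of remainder-based ("rest of division") partitioning.
    // Hypothetical names, not the actual Kettle implementation.
    public class RestOfDivisionExample {
        // Returns the zero-based index of the host/partition a row goes to.
        static int partitionFor(long customerId, int numberOfHosts) {
            return (int) (customerId % numberOfHosts);
        }

        public static void main(String[] args) {
            int hosts = 5;
            for (long customerId : new long[] { 100, 101, 102, 103, 104, 105 }) {
                System.out.println("customer_id " + customerId
                    + " -> host " + partitionFor(customerId, hosts));
            }
        }
    }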

This sort of thing is what we’ve been implementing in Kettle for the last couple of weeks. The reasoning is simple: if one database is not up to the task, split the load over 2 or 5 or 10 databases on any number of hosts. (Now imagine all the PCs at work running an in-memory database :-))
Besides small changes to the Kettle transformation engine, we also added meta-data that allows you to specify the information the engine needs to do this. Let’s take a look at a sample transformation:

Database Partitioning Sample

As you can see from the image above, we have added some text at the top of the partitioned steps. “P” indicates that the step is partitioned; “x5” is the number of step copies that will be launched, one for each of the 5 partitions. The field name we partition on is shown between brackets. Finally, the partitioning schema name is shown.

Let’s take a look at what the transformation does:

  1. It performs a “SELECT * FROM T2” on a cluster. In this case that means that the same statement is executed 5 times, on 5 different databases/hosts.
  2. It adds a sequence “new_id” in a partitioned way: using 5 different step copies.
  3. We re-partition the data and store it on a different cluster with 3 hosts, called “Cluster with 3”.
  4. We store the data in 3 text files using the new internal variable ${Internal.Step.Partition.ID} (see the example below).
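For example (assuming the partition IDs in “Cluster with 3” are named P1, P2 and P3 — the actual IDs depend on what you enter in the partitioning schema), a text file name such as

    /tmp/partitioned_output_${Internal.Step.Partition.ID}.txt

would expand to three separate files, /tmp/partitioned_output_P1.txt, /tmp/partitioned_output_P2.txt and /tmp/partitioned_output_P3.txt: one per partitioned step copy.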

Now, how do we enter the meta-data for this?

Well, to start off, we added a new tab to the database dialog:

Reference database connection

As you can see, not much has changed: the information entered in the usual place (hostname/database/port) is our reference database. This is the database you explore and reference in general. However, all DDL is now executed against all the hosts in this cluster, defined on the clustering tab:

Database clustering

As you can see, we can add the hosts that participate in the cluster as well as the database name and port. Note that I don’t have a lot of machines to play with, so I’m just using 5 different databases on the same host.
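Conceptually, the DDL behavior boils down to something like the sketch below: the same statement is sent to every database in the cluster. This is plain JDBC with made-up URLs and credentials (5 databases on one host, as in my test setup), not the actual Kettle code:

    // Conceptual sketch: run the same DDL against every database in the cluster.
    // URLs, database names and credentials are made up for illustration.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class DdlOnClusterExample {
        public static void main(String[] args) throws Exception {
            String ddl = "CREATE TABLE T2 ( customer_id BIGINT, name VARCHAR(100) )";
            String[] jdbcUrls = {
                "jdbc:mysql://localhost:3306/part1",
                "jdbc:mysql://localhost:3306/part2",
                "jdbc:mysql://localhost:3306/part3",
                "jdbc:mysql://localhost:3306/part4",
                "jdbc:mysql://localhost:3306/part5"
            };
            for (String url : jdbcUrls) {
                try (Connection con = DriverManager.getConnection(url, "user", "password");
                     Statement stmt = con.createStatement()) {
                    stmt.execute(ddl); // the same DDL goes to every partition database
                }
            }
        }
    }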

The next thing we need to do is create a partitioning schema for the transformation. You can enter the partitioning schema in the transformation settings dialog (CTRL-T):

Schema editor

The “Get partitions” button allows you to copy over the partition IDs you specified in one of the partitioned database connections.

With this information we can now configure the steps we want to partition. Right-click on the step in the graphical view:

Partitioning step menu

This will allow you to select the partitioning method:

Partitioning method

As you can see, we only support “Rest of division” at the moment, but in the future (with your help) we could implement any kind of partitioning, including mirroring, etc. In the case of the rest-of-division partitioning method, we need the field name to take the ID from:

Partitioning field

Finally, we need to specify the partitioning schema: this allows the step to “map” onto the correct partitioning schema. Also, when the partitioning schema of the previous step is different from that of the next step, we need to “re-partition”: we calculate a new partition instead of sticking with the old one.

Partition schema selection
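As mentioned above, re-partitioning in this example simply means re-applying the remainder rule with the partition count of the new schema. A tiny, illustrative sketch (not Kettle’s internals):

    // Illustration only: going from a 5-partition schema to the 3-partition
    // schema "Cluster with 3" means the target partition must be recalculated.
    public class RepartitionExample {
        public static void main(String[] args) {
            int oldPartitions = 5;
            int newPartitions = 3;
            for (long newId : new long[] { 7, 8, 9, 10 }) {
                System.out.println("new_id " + newId
                    + ": old partition " + (newId % oldPartitions)
                    + " -> new partition " + (newId % newPartitions));
            }
        }
    }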

That’s all there is to enabling database partitioning in Kettle.

At this time, we are experimenting with this new engine and would very much like to get feedback and test results. Let us know what you think and whether you can see other uses for these new features. Nothing is definite yet and there is plenty of room for improvement.

Until next time,

Matt

5 comments

  • Hi Matt!

    This is a great feature! I’m very busy at the moment, but I will try and find some time to test this.

    Keep it up, Kettle Rocks Bigtime!

  • Thanks for the comment Roland.

    One thing I would like to mention is that, in addition to this, we will also be adding new steps to support this new way of processing data. The first step is ready and is called “Sorted Merge”. It merges sorted streams of data on the sorting key. This step might have other uses outside of the DB-partitioning logic as well, but it allows you to read from the cluster and keep the results sorted.
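    For what it’s worth, the core idea behind such a sorted merge is a classic k-way merge on the sort key. A small, self-contained sketch of that idea (not the actual “Sorted Merge” step code) could look like this:

        // K-way merge of already-sorted streams on the sorting key.
        // Conceptual sketch only, not the Kettle "Sorted Merge" step itself.
        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.Iterator;
        import java.util.List;
        import java.util.PriorityQueue;

        public class SortedMergeExample {

            // Pairs an iterator with the value it currently points at.
            static class Head implements Comparable<Head> {
                final Iterator<Integer> rest;
                final int value;
                Head(int value, Iterator<Integer> rest) { this.value = value; this.rest = rest; }
                public int compareTo(Head other) { return Integer.compare(value, other.value); }
            }

            static List<Integer> merge(List<List<Integer>> sortedStreams) {
                PriorityQueue<Head> queue = new PriorityQueue<Head>();
                for (List<Integer> stream : sortedStreams) {
                    Iterator<Integer> it = stream.iterator();
                    if (it.hasNext()) queue.add(new Head(it.next(), it));
                }
                List<Integer> result = new ArrayList<Integer>();
                while (!queue.isEmpty()) {
                    Head smallest = queue.poll(); // always take the smallest head value
                    result.add(smallest.value);
                    if (smallest.rest.hasNext()) {
                        queue.add(new Head(smallest.rest.next(), smallest.rest));
                    }
                }
                return result;
            }

            public static void main(String[] args) {
                List<List<Integer>> streams = Arrays.asList(
                    Arrays.asList(1, 4, 9),  // e.g. sorted output from partition 1
                    Arrays.asList(2, 3, 10), // partition 2
                    Arrays.asList(5, 6, 7)); // partition 3
                System.out.println(merge(streams)); // [1, 2, 3, 4, 5, 6, 7, 9, 10]
            }
        }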

    All the best,

    Matt

  • Tom

    That’s great!

  • Soichi Hayashi

    Thank you for this article. I understand now that the partitioning is really created to store data in multiple databases – thus the name “database partitioning”. I believe the way you have implemented partitioning allows users to do far more, though. For example, if you sort hash-partitioned data, you can then run processes on that data to do a lot of interesting things, such as removing duplicates. I think having “database” in the name is somewhat awkward and gives the impression that partitioning cannot be used for other purposes.

    Also, as of the current GA version of Pentaho, I cannot use partitioning on a step that is clustered across multiple slave servers. Before I submit a bug report, I’d like to know if this is by design, and whether it is indeed impossible to assign each partition to a different slave server to increase performance.

  • Hi Soichi,

    What you say is true; however, a simple distribution would also allow you to do deduplication in parallel, so I’m not really sure you need partitioning for that. I’ll certainly keep a name change in mind though. (Originally it was called a “partitioning schema”, BTW.)

    As far as mixing partitioning and clustering is concerned: that works, but you should have #partitions >= #slave-servers-in-a-cluster.

    Otherwise, obviously, it becomes hard to distribute the partitions over the slave servers. The assignment is automatic. If that doesn’t work, you should file a bug report with a use case. If you want to individually/manually assign partitions to slave servers, file a feature request with a use case.
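    To see why, imagine the assignment being a simple round-robin (just an assumption for the sake of illustration, not necessarily the exact algorithm Kettle uses):

        // Illustration only: round-robin assignment of partitions to slave servers.
        // This is an assumed distribution strategy, not necessarily Kettle's.
        public class PartitionAssignmentExample {
            public static void main(String[] args) {
                String[] slaves = { "slave1", "slave2", "slave3" };
                String[] partitions = { "P1", "P2", "P3", "P4", "P5" };
                for (int i = 0; i < partitions.length; i++) {
                    System.out.println(partitions[i] + " -> " + slaves[i % slaves.length]);
                }
                // With fewer partitions than slave servers, some slaves would get
                // no partition (and therefore no work) at all.
            }
        }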

    All the best,

    Matt