Saturday, May 14, 2011

Talend Cache Management

Background : Talend Open Studio can read/write from/to many different sources, so it is generally easy for a good data integration architect to design solutions that "cache" recordsets in database tables, temporary files, etc.
However, this adds extra steps, and sometimes even extra architecture components such as a database, increasing the complexity of the job itself.
Sometimes it would be handy to have a simple in-memory buffer to store temporary information, perhaps populated incrementally over a number of iterations. Or it would be nice to be able to dump this buffer to disk and re-load it whenever needed.

The solution

A couple of components that handle the cache management tasks, both in memory and on disk.
They are easy to use in any Talend Data Integration job, allowing temporary or persistent data storage. Cache files and memory buffers can be loaded incrementally using loops.
It is also possible to "init" a cache from a previously stored file and incrementally append new records.
Storing data in memory can quickly use up the Java heap, especially because this release of the routines is not yet optimized. Currently there is no data compression involved; this could be included in a future release. I successfully managed to load a 6 million record table (4 fields) in memory, but you should account for concurrent processes that may also make heavy use of the heap.

Current version

Version : 0.2
Release Date : May 11 2011
Status : Beta


The job tCacheDemo demonstrates the usage of the two components.

The Cache Load Loop subJob loads the cache buffer in memory and generates a set of cache files.
To demonstrate the incremental load capabilities, a loop is used to process a data source multiple times (in this example an RSS feed; any recordset would work).
tLoop_1 This step simply performs 100 iterations.

tRSSInput_1 A sample data source returning 10 records. In the demo job it is configured to read an XML file "news.rss", a local copy of a Google News RSS feed.

tCacheOutput_1 This component is set to cache data both on disk and in memory. The global buffer variable name is an arbitrary name that was set to "cache".
The Cache file name is set to context.baseDir+"/cache"+((Integer)globalMap.get("tLoop_1_CURRENT_VALUE"))+".dat" to demonstrate the ability to change the file name at each iteration. Finally, the "append to file" option is unchecked to reset the cache files at each run. Feel free to play around with these settings.
This component is a "DATA_AUTOPROPAGATE" component, meaning that the same flow that arrives in input can also be retrieved in output, allowing the component to be used as a "middle" step in a flow.

The "Record count" subJob simply uses a tJava component to output the two record counters available in the tCacheOutput component, one being the number of records processed in the current iteration and the other being the number of records stored in the memory cache.

tJava_1 Simply outputs a string to the console; the code is
System.out.println("Iteration records : "+((Integer)globalMap.get("tCacheOutput_1_NB_LINE"))+" Record in memory cache : "+((Integer)globalMap.get("tCacheOutput_1_NB_CACHE_LINE")) );

Finally, the Cache read subJob is activated once all the iterations have completed (via a "subjob OK" trigger link originating from tLoop_1) and starts the output to the tLogRow component. In a real-world application this would be connected to a destination table, a tBufferOutput or another flow consumer.

tCacheInput_1 By setting this component to read from the memory "cache" buffer, all the records stored with the 100 iterations are returned.
An optional check is set to remove from memory the buffer once it is read. You can decide to leave data in memory if you need to process it again (also with another tCacheInput component).
Alternatively, the cache source can be set to one of the generated cache files.


  • Download tCacheInput

  • Download tCacheOutput

  • Download sample Job. Remember to configure the baseDir context variable; this will be the location for the cache files and the sample input data.

  • Download sample data


    Thursday, May 5, 2011

    Autoincrement key in PostgreSQL and MySQL

    I'm switching from MySQL to PostgreSQL and was wondering how I can do autoincrement values. I saw in the PostgreSQL docs a datatype "serial". This is equivalent to AUTO_INCREMENT in MySQL.

    CREATE TABLE foo (
    id SERIAL,
    bar varchar);
    INSERT INTO "foo" (bar) values ('blah');
    INSERT INTO "foo" (bar) values ('blah');
    SELECT * FROM foo;
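    To fetch the value just generated, PostgreSQL lets you read the implicit sequence that SERIAL creates (named foo_id_seq by the tablename_column_seq convention), or use a RETURNING clause on the INSERT itself. A minimal sketch:
    INSERT INTO "foo" (bar) VALUES ('blah') RETURNING id;
    SELECT currval('foo_id_seq');
    currval() is evaluated per session, so concurrent inserts from other connections do not affect the value it returns.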
    The AUTO_INCREMENT attribute can be used to generate a unique identity for new rows:
    CREATE TABLE animals (
         id MEDIUMINT NOT NULL AUTO_INCREMENT,
         name CHAR(30) NOT NULL,
         PRIMARY KEY (id)
    );
    INSERT INTO animals (name) VALUES
         ('dog'),('cat'),('penguin'),('lax'),('whale'),('ostrich');
    SELECT * FROM animals;
    Which returns:
    +----+---------+
    | id | name    |
    +----+---------+
    |  1 | dog     |
    |  2 | cat     |
    |  3 | penguin |
    |  4 | lax     |
    |  5 | whale   |
    |  6 | ostrich |
    +----+---------+
    No value was specified for the AUTO_INCREMENT column, so MySQL assigned sequence numbers automatically.
    You can also explicitly assign NULL or 0 to the column to generate sequence numbers. 
    You can retrieve the most recent AUTO_INCREMENT value with the LAST_INSERT_ID() SQL function or the mysql_insert_id() C API function. 
    These functions are connection-specific, so their return values are not affected by another connection which is also performing inserts. 
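    As a minimal illustration of the per-connection behaviour described above:
    INSERT INTO animals (name) VALUES ('wolf');
    SELECT LAST_INSERT_ID();
    The SELECT returns the id just assigned to 'wolf' on this connection, regardless of inserts performed concurrently on other connections.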
    Use a large enough integer data type for the AUTO_INCREMENT column to hold the maximum sequence value you will need. 
    When the column reaches the upper limit of the data type, the next attempt to generate a sequence number fails. 
    For example, if you use TINYINT, the maximum permissible sequence number is 127. 
    For TINYINT UNSIGNED, the maximum is 255. 
    Note: for a multiple-row insert, LAST_INSERT_ID() and mysql_insert_id() actually return the AUTO_INCREMENT key from the first of the inserted rows. 
    This enables multiple-row inserts to be reproduced correctly on other servers in a replication setup. 
    For MyISAM and BDB tables you can specify AUTO_INCREMENT on a secondary column in a multiple-column index. 
    In this case, the generated value for the AUTO_INCREMENT column is calculated as MAX(auto_increment_column) + 1 WHERE prefix=given-prefix. 
    This is useful when you want to put data into ordered groups.  
    CREATE TABLE animals (
        grp ENUM('fish','mammal','bird') NOT NULL,
        id MEDIUMINT NOT NULL AUTO_INCREMENT,
        name CHAR(30) NOT NULL,
        PRIMARY KEY (grp,id)
    );
    INSERT INTO animals (grp,name) VALUES
        ('mammal','dog'),('mammal','cat'),
        ('bird','penguin'),('fish','lax'),
        ('mammal','whale'),('bird','ostrich');
    SELECT * FROM animals ORDER BY grp,id;
    Which returns:
    +--------+----+---------+
    | grp    | id | name    |
    +--------+----+---------+
    | fish   |  1 | lax     |
    | mammal |  1 | dog     |
    | mammal |  2 | cat     |
    | mammal |  3 | whale   |
    | bird   |  1 | penguin |
    | bird   |  2 | ostrich |
    +--------+----+---------+
    In this case (when the AUTO_INCREMENT column is part of a multiple-column index), AUTO_INCREMENT values are reused if you delete the row with the biggest AUTO_INCREMENT value in any group. 
    This happens even for MyISAM tables, for which AUTO_INCREMENT values normally are not reused.
    If the AUTO_INCREMENT column is part of multiple indexes, MySQL will generate sequence values using the index that begins with the AUTO_INCREMENT column, if there is one. 
    For example, if the animals table contained indexes PRIMARY KEY (grp, id) and INDEX (id), MySQL would ignore the PRIMARY KEY for generating sequence values.
    As a result, the table would contain a single sequence, not a sequence per grp value.
    To start with an AUTO_INCREMENT value other than 1, you can set that value with CREATE TABLE or ALTER TABLE, like this:
    mysql> ALTER TABLE tbl AUTO_INCREMENT = 100;
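    The CREATE TABLE form sets the starting value when the table is created, for example (illustrative table name):
    CREATE TABLE tbl (
        id INT NOT NULL AUTO_INCREMENT,
        PRIMARY KEY (id)
    ) AUTO_INCREMENT = 100;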