Tuesday, February 21, 2012

Integration by examples: Consuming InputStreams with Camel

Imagine the following scenario - you need to integrate your system with an external stream of CSV data.

Your system needs to read CSV records from a legacy ERP application. The administrator of the legacy ERP system uses a CRON task to check for new records in her application. The task is executed every 10 minutes. New records are inserted at the beginning of a text file located on the same file system on which the ERP application is deployed.


At the end of the day the text file with the exported CSV records is erased. Until then, however, the file may grow to a few gigabytes or more. The administrator of the ERP system has heard about SOA, so she exposed the CSV file with the Apache HTTP server :) .

Your task is to read the stream of the CSV file using the HTTP protocol. You need to read the stream line by line (i.e. record by record) and add each record to a processing queue. Try to stay as up to date with the ERP application as possible - which implies that you have to add a CSV record to the processing queue as soon as you read it from the stream.

Good and bad ideas

The following question may come to your mind - why can't I read the entire file into memory, analyze it and repeat these steps in a loop? You should drop this idea because of its significant drawbacks:
  • reading a file as large as a few gigabytes over the network (especially over the Internet) in one go is extremely slow.
  • you waste the memory of your ESB installation, because you need to hold the entire file in memory.
  • you block the processing of CSV records that have already been downloaded until the entire file is fetched and available for parsing. Then you parse that file and flood your system with a swarm of messages containing the CSV records.
What you want to do instead is read the HTTP input stream line by line and immediately send each received CSV record to the processing queue. If the stream ends or is closed, you reopen it and scan it for new records (remember that the CSV file is updated regularly and you want to stay up to date with it).
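The core of this pattern is simple: open the stream, read it line by line, and hand each record over to a queue the moment it arrives. A minimal plain-Java sketch of the idea - the CSV contents are simulated with an in-memory stream here; in the real scenario the InputStream would come from the HTTP connection to the ERP export:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LineByLineConsumer {

  public static void main(String[] args) throws IOException, InterruptedException {
    // Simulated CSV stream; in the real scenario this would be the
    // InputStream of the HTTP connection to the exported file.
    InputStream csvStream = new ByteArrayInputStream(
        "1,John,100\n2,Jane,200\n".getBytes(StandardCharsets.UTF_8));

    // The processing queue - each element is a single CSV record.
    BlockingQueue<String> processingQueue = new LinkedBlockingQueue<>();

    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(csvStream, StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        // Each record goes to the queue as soon as it is read -
        // no need to wait for the rest of the stream.
        processingQueue.put(line);
      }
    }

    System.out.println(processingQueue.size() + " records queued");
  }
}
```

Doing this robustly (reconnects, retries, encoding, threading) is exactly the boilerplate that Camel takes off your hands.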

Possible solution using Camel

The Camel Stream Component has been designed to solve problems like the one described above. Just below this paragraph you can see sample code demonstrating how you could use this component to consume our hypothetical stream of CSV data.



import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class StreamConsumerTest {

  public static void main(String[] args) throws Exception {
    // Create Camel context.
    CamelContext camelContext = new DefaultCamelContext();
    // Configure the routing.
    camelContext.addRoutes(new RouteBuilder() {

      @Override
      public void configure() throws Exception {
        // Read CSV stream from the given URL...
        from("stream:url?url=http://erp.legacy.com/export&scanStream=true&retry=true").
          // ...and send records to the processing queue.
          to("seda:processingQueue");
        // Read records from the processing queue
        // and do something with them.
        from("seda:processingQueue").to("log:out");
      }

    });
    // Start Camel context.
    camelContext.start();
    // Demonstrate example for 30 seconds.
    TimeUnit.SECONDS.sleep(30);
    // Stop Camel context.
    camelContext.stop();
  }

}

The scanStream and retry options passed to the Stream Component tell it to continuously monitor the stream. If the connection is broken or the stream ends, Camel will reopen it in order to continue retrieving the most recent records from the CSV file.
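The reopen-on-end behaviour that scanStream and retry give you could be sketched in plain Java roughly as follows - the openStream() method here is a stand-in for opening the HTTP connection, and the loop is bounded only so the example terminates (Camel keeps reopening indefinitely):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class ReopeningReader {

  // Stand-in for opening the HTTP connection to the exported file;
  // each call simulates a fresh snapshot of the stream.
  private static BufferedReader openStream(int attempt) {
    String data = "record-from-attempt-" + attempt + "\n";
    return new BufferedReader(new InputStreamReader(
        new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)),
        StandardCharsets.UTF_8));
  }

  public static void main(String[] args) throws IOException {
    int records = 0;
    // Reopen the stream after it ends - this is what Camel does for
    // you when scanStream=true and retry=true are set.
    for (int attempt = 1; attempt <= 3; attempt++) {
      try (BufferedReader reader = openStream(attempt)) {
        String line;
        while ((line = reader.readLine()) != null) {
          records++; // send to the processing queue in the real solution
        }
      }
    }
    System.out.println("consumed " + records + " records over 3 attempts");
  }
}
```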

As soon as a line of data (a CSV record in our case) is read from the stream, Camel passes it to the SEDA processing queue. In our simple example we just read the records from the queue and send them to the logger in order to print the results on the screen.
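In a real integration you would replace the log:out endpoint with something that actually processes each record - typically starting by splitting the line into fields. A minimal sketch (the three-column record layout is an assumption, and the naive split does not handle quoted fields containing commas):

```java
import java.util.Arrays;

public class CsvRecordParser {

  public static void main(String[] args) {
    // A record as it would arrive on the processing queue.
    String record = "2,Jane,200";

    // Naive split - good enough for simple records without
    // quoted fields containing commas.
    String[] fields = record.split(",");

    System.out.println(Arrays.toString(fields));
  }
}
```

For anything beyond trivial records, consider Camel's CSV data format instead of splitting by hand.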

That's the complete solution for our integration problem. Indeed, Camel is as concise as it is powerful :) .

What else can you do with the Stream Component?

Need to tune the Stream Component to your particular needs? Then take a peek at the options below.

You can tell Camel to fetch multiple lines at once and group them into a single message.
// read 10 lines at once
from("stream:url?url=http://example.com&groupLines=10").to("seda:processingQueue");

You can (and should) explicitly choose the encoding of the stream instead of relying on the JVM default.
// decode the stream as UTF-8
from("stream:url?url=http://example.com&encoding=UTF-8").to("seda:processingQueue");

You can delay the initial read from the stream.
// wait 5 seconds before reading the stream
from("stream:url?url=http://example.com&delay=5000").to("seda:processingQueue");

You can control the interval between stream read attempts.
// wait one second between read attempts when scanning the stream
from("stream:url?url=http://example.com&scanStream=true&scanStreamDelay=1000").to("seda:processingQueue");