Thursday, March 19, 2015

Camel Paho connector

The upcoming version of Apache Camel (2.16) will bring the Paho component, which provides a connector for the MQTT messaging protocol using the Eclipse Paho library. Paho is one of the most popular MQTT libraries, so if you would like to integrate it with your Java project, the Camel Paho connector is the way to go.

How can I use Paho connector?

The basic URI format for the Paho connector is as follows:
paho:queueName[?options]
To be a little more concrete, the following snippet reads messages from the MQTT broker installed on the same host as the Camel router:
from("paho:some/queue").
  to("mock:test");
This one reads messages from the remote MQTT broker:
from("paho:some/queue?brokerUrl=tcp://iot.eclipse.org:1883").
  to("mock:test");
While this sends messages to the MQTT broker:
from("direct:test").
  to("paho:some/target/queue");
A complete Spring Boot based application that sends a message to the MQTT broker every second is as simple as:
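import org.apache.camel.spring.boot.FatJarRouter
import org.springframework.boot.autoconfigure.SpringBootApplication

import static java.util.UUID.randomUUID

@SpringBootApplication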
class MqttRouter extends FatJarRouter {

    @Override
    void configure() {
        from("timer://trigger").
          setBody().expression { randomUUID().toString() }.
          to("paho:topic?brokerUrl={{broker.url}}")
    }

}

Adding Paho connector to your Maven project

The Paho connector ships in a dedicated jar, which Maven users should add to their pom.xml file:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-paho</artifactId>
    <version>2.16.0</version>
</dependency>
Keep in mind that Paho artifacts are not hosted in Maven Central, so you need to add the Eclipse Paho repository to your pom.xml file as well:
<repositories>
  <repository>
    <id>eclipse-paho</id>
    <url>https://repo.eclipse.org/content/repositories/paho-releases</url>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>

Default payload type

By default, the Camel Paho component operates on binary payloads extracted from (or put into) the MQTT message:
// Receive payload
byte[] payload = (byte[]) consumerTemplate.receiveBody("paho:topic");
 
// Send payload
byte[] payload = "message".getBytes();
producerTemplate.sendBody("paho:topic", payload);
But of course Camel's built-in type conversion API can perform automatic data type transformations for you. In the example below, Camel automatically converts the binary payload into a String (and vice versa):
// Receive payload
String payload = consumerTemplate.receiveBody("paho:topic", String.class);
 
// Send payload
String payload = "message";
producerTemplate.sendBody("paho:topic", payload);
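
To make the conversion less magical, here is roughly what it amounts to in plain Java. This is only a sketch: the PayloadConversion class and its two helper methods are hypothetical names of mine, and UTF-8 is an assumed charset, not necessarily the one Camel picks in your setup.

```java
import java.nio.charset.StandardCharsets;

class PayloadConversion {

    // String -> binary payload (assumption: UTF-8 encoding)
    static byte[] toPayload(String body) {
        return body.getBytes(StandardCharsets.UTF_8);
    }

    // Binary payload -> String (assumption: UTF-8 encoding)
    static String fromPayload(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = toPayload("message");
        System.out.println(payload.length + " bytes, decoded back to: " + fromPayload(payload));
    }
}
```

If your messages contain non-ASCII characters, make sure both sides of the topic agree on the charset.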

Connection options

The convention-over-configuration approach used in Camel is really handy in most situations, but sometimes you would like to have more fine-grained control over the MQTT client connection. To cover such situations, just add a bean of type org.eclipse.paho.client.mqttv3.MqttConnectOptions to your Camel registry. For Spring applications that means adding the bean to your application context. The snippet below uses password-based authentication to connect to the MQTT broker:
@Bean
MqttConnectOptions connectOptions() {
  MqttConnectOptions connectOptions = new MqttConnectOptions();
  connectOptions.setUserName("henry");
  connectOptions.setPassword("secret".toCharArray());
  return connectOptions;
}
That's it. Camel automatically picks up this MqttConnectOptions bean from the registry and uses it to establish the connection with the MQTT broker.

What's next?

If you are interested in all the available options of the Camel Paho connector, visit the component page. If you are looking for a complete example project that can be easily deployed to microcomputers like the Raspberry Pi, check out the CamelM2M MQTT quickstart. If you are wondering how fast the MQTT connector can be, check out my previous blog post on MQTT performance on the Raspberry Pi.

Friday, March 13, 2015

Raspberry Pi 2 and Camel: The MQTT client performance

Raspberry Pi 2 comes armed with a 900 MHz quad-core ARM Cortex-A7 and 1 GB of memory. This is pretty powerful hardware for hobbyist Internet of Things applications, but it's still relatively slow compared to industrial-grade microcomputers. As soon as my very own Raspberry Pi 2 was shipped to me, I started to wonder how fast this car-key-sized computer can be.


Let's test MQTT client

Together with a friend of mine, I decided to create a simple proof of concept demonstrating how fast Raspberry Pi 2 can be in a typical field device or gateway scenario. The field device scenario is the Raspberry Pi acting as an edge node collecting information from the sensors...


The gateway scenario is when the Raspberry Pi collects messages from other microcomputers, controllers or sensors...


In both cases the RPi is very likely to send the messages to an external MQTT message broker for further analysis.

Testing conditions

For the purpose of our tests we decided to stick to the first scenario, i.e. the Raspberry Pi collecting events and sending them directly to the MQTT broker. We used SSH to copy a small Apache Camel application, bootstrapped with Spring Boot, to the Pi. In the Camel router we generated test events using the internal Camel timer (emulating a sensor read), and for each event collected we sent a message to an external ActiveMQ MQTT broker. The Camel Paho component was chosen as the MQTT client. In order to decouple event collection from the MQTT sending process, we used an in-memory SEDA queue.
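
If the idea of decoupling via an in-memory queue is new to you, the plain-Java sketch below shows the general pattern. It is not how Camel implements SEDA and it is not our benchmark code: the QueueDecouplingSketch class, the run method and the counter standing in for the MQTT publish are all hypothetical, just to illustrate a producer that only enqueues while a pool of consumer threads drains the queue.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueDecouplingSketch {

    // Enqueues `events` messages and drains them with `consumers` worker threads.
    // Returns the number of messages the workers handled.
    static int run(int events, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger sent = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(events);
        ExecutorService workers = Executors.newFixedThreadPool(consumers);

        for (int i = 0; i < consumers; i++) {
            workers.execute(() -> {
                try {
                    while (true) {
                        queue.take();           // blocks until an event is available
                        sent.incrementAndGet(); // placeholder for the slow broker publish
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // worker was asked to stop
                }
            });
        }

        // The "sensor" side only enqueues, so it never waits for the broker.
        for (int i = 0; i < events; i++) {
            queue.add(UUID.randomUUID().toString());
        }

        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for the consumers");
            }
            return sent.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            workers.shutdownNow(); // interrupts workers blocked in take()
        }
    }

    public static void main(String[] args) {
        System.out.println("Messages handled: " + run(1000, 15));
    }
}
```

Note that the queue in this sketch is unbounded, so a producer that is much faster than the consumers will simply pile up messages in memory.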

Importantly, we decided to test MQTT performance at QoS level 2. QoS level 2 guarantees exactly-once message delivery. It provides the highest level of reliability, but consumes more client resources (Raspberry Pi processing power and memory in this particular case).

The broker itself didn't perform any action on the messages it received. There was no subscriber registered to the MQTT topic we sent messages to. We started a dockerized ActiveMQ 5.11 as the message broker (the Docker image is a part of the Fabric8 project):

docker run -p 1883:1883 -e AMQ_MQTT_PORT=1883 -it fabric8/fabric8-mq:2.0.29

We took advantage of the Java UUID API to create test messages. Each message was a random UUID, 36 bytes long. Such a message size may seem small, but events generated by field sensors are usually not much larger - the majority of IoT solutions generate a huge number of small messages.
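
If you want to double-check the message size yourself, the small sketch below generates one such payload and prints its length. The TestMessage class and the randomPayload method are hypothetical names used only for this illustration. The text form of a UUID is 32 hex digits plus 4 hyphens, all ASCII, hence the 36 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

class TestMessage {

    // Builds one test payload: the text form of a random UUID encoded as bytes.
    static byte[] randomPayload() {
        return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = randomPayload();
        System.out.println(payload.length + " bytes: " + new String(payload, StandardCharsets.UTF_8));
    }
}
```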

The application code 

Apache Camel in conjunction with Spring Boot makes a pretty powerful tool for M2M solutions. For example, to create the application for our tests, all the Groovy code we had to write is the snippet presented below:

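import org.apache.camel.spring.boot.FatJarRouter
import org.springframework.boot.autoconfigure.SpringBootApplication

import static java.util.UUID.randomUUID

@SpringBootApplication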
class MqttProducerGateway extends FatJarRouter {

    @Override
    void configure() {
        // Read events from the sensors
        from("timer://mockSensor").
                setBody().expression { randomUUID().toString() }.
                to("seda://events") // Enqueue the events in the in-memory queue

        from("seda://events?concurrentConsumers={{broker.consumers:15}}").
                to("paho:topic?brokerUrl={{broker.url}}")
    }

}

That's it! We packaged that code as a fat jar to make deployment via SSH easier. The application is configured from the command line just before the execution of the tests:

java -Dbroker.url=tcp://192.168.1.6 -jar camel-mqtt-benchmark.jar 

First run: 3 consumers sending messages to the MQTT broker

In the first benchmark we used 3 concurrent consumer threads, reading messages from the in-memory SEDA queue and sending them to the MQTT broker. The application performed pretty well (up to ~315 messages per second) until the Paho client got overwhelmed by the messages produced by the timer. When the number of messages waiting to be processed became too large, the performance of the gateway started to decrease. The interesting point here is that many messages were produced while not so many were consumed - that's why we considered increasing the number of concurrent consumers in the next benchmarking session.


Second run: 15 consumers sending messages to the MQTT broker

We decided to increase the number of consumers reading messages from the in-memory SEDA queue to 15. After that change the route performed really well (up to ~580 messages per second) until the Paho client got overwhelmed by the messages produced by the timer. Once again, when the number of messages waiting to be processed became too large, the performance of the gateway started to decrease.

Can we do better?

The biggest problem so far seems to be that Paho slows down when we generate too many messages compared to the consumption rate. By adding a throttler to the test running 15 consumers, we managed to keep the processing rate at the level of 700 messages per second. By tuning the consumer settings and reducing the QoS we might increase the number of messages processed by the Raspberry Pi even more. I will describe how we used the Camel throttler to get a stable 700 messages per second in a separate article in the future, as this awesome Camel feature deserves more attention.
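
Until then, here is the general idea of throttling in plain Java. To be clear, this is not the Camel throttler and not the configuration we used in the benchmark: FixedWindowThrottle is a hypothetical class that simply lets at most N messages pass per time window and rejects the rest, while a real throttler may delay messages instead of rejecting them. The clock is passed in as a parameter only to make the sketch easy to test.

```java
import java.util.function.LongSupplier;

class FixedWindowThrottle {

    private final int maxPerWindow;
    private final long windowMillis;
    private final LongSupplier clock; // source of "now" in milliseconds
    private long windowStart;
    private int usedInWindow;

    FixedWindowThrottle(int maxPerWindow, long windowMillis, LongSupplier clock) {
        this.maxPerWindow = maxPerWindow;
        this.windowMillis = windowMillis;
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    // Returns true if a message may pass now, false if the current window is already full.
    synchronized boolean tryAcquire() {
        long now = clock.getAsLong();
        if (now - windowStart >= windowMillis) {
            // A new window has started: reset the counter.
            windowStart = now;
            usedInWindow = 0;
        }
        if (usedInWindow < maxPerWindow) {
            usedInWindow++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Allow at most 700 messages per second, as in the benchmark target rate.
        FixedWindowThrottle throttle = new FixedWindowThrottle(700, 1000, System::currentTimeMillis);
        int passed = 0;
        for (int i = 0; i < 2000; i++) {
            if (throttle.tryAcquire()) {
                passed++;
            }
        }
        System.out.println(passed + " of 2000 messages passed the throttle");
    }
}
```

The point of the sketch is only that the producer side gets capped, so the queue in front of the MQTT client cannot grow faster than the client can drain it.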

Raspberry Pi 2 results TL;DR;

Raspberry Pi 2 is really fast! For such a small and cheap ($35) device, the performance of the unit is really impressive. You can send almost 700 small QoS 2 messages (36 bytes each) per second from a Raspberry Pi 2 gateway to the MQTT server.

If you plan to run the Paho MQTT client on the RPi 2, remember to:
  • enqueue messages in an internal in-memory queue and use at least 15 concurrent threads to process them (otherwise Paho or sensor IO operations may become a bottleneck)
  • not let sensors put too many messages into the queue, otherwise the overall performance of the gateway decreases significantly. Consider using the Camel throttler to limit the number of messages sent to the queue.