Thursday, March 19, 2015

Camel Paho connector

The incoming version of the Apache Camel (2.16) will bring Paho component which provides connector for the MQTT messaging protocol using the Eclipse Paho library. Paho is one of the most popular MQTT libraries, so if you would like to integrate it with your Java project - Camel Paho connector is a way to go.

How can I use Paho connector?

The basic URI format for the Paho connector is as follows:
paho:queueName[?options]
To me a little more concrete, the following snippet reads messages from the MQTT broker installed on the same host as the Camel router:
from("paho:some/queue").
  to("mock:test");
This one reads messages from the remote MQTT broker:
from("paho:some/queue?brokerUrl=tcp://iot.eclipse.org:1883").
  to("mock:test");
While this sends messages to the MQTT broker:
from("direct:test").
  to("paho:some/target/queue");
The complete Spring Boot based application sending message to the MQTT broker every second, is as simple as:
@SpringBootApplication
class MqttRouter extends FatJarRouter {

    @Override
    void configure() {
        from("timer://trigger").
          setBody().expression { randomUUID().toString() }.
          to("paho:topic?brokerUrl={{broker.url}}")
    }

}

Adding Paho connector to your Maven project

Paho connector is shipped in the dedicated jar which Maven users should add to their pom.xml file:
<dependency>
    <groupid>org.apache.camel</groupid>
    <artifactid>camel-paho</artifactid>
    <version>2.16.0</version>
</dependency>
Keep in mind that Paho artifacts are not hosted in the Maven Central, so you need to add Eclipse Paho repository to your POM xml file as well:
<repositories>
  <repository>
    <id>eclipse-paho</id>
    <url>https://repo.eclipse.org/content/repositories/paho-releases</url>
    <snapshots>
      <enabled>false</enabled>
    </snapshots>
  </repository>
</repositories>

Default payload type

By default Camel Paho component operates on the binary payloads extracted out of (or put into) the MQTT message:
// Receive payload
byte[] payload = (byte[]) consumerTemplate.receiveBody("paho:topic");
 
// Send payload
byte[] payload = "message".getBytes();
producerTemplate.sendBody("paho:topic", payload);
But of course Camel build-in type conversion API can perform the automatic data type transformations for you. In the example below Camel automatically converts binary payload into String (and conversely):
// Receive payload
String payload = consumerTemplate.receiveBody("paho:topic", String.class);
 
// Send payload
String payload = "message";
producerTemplate.sendBody("paho:topic", payload);

Connection options

The convention-over-configuration approach used in Camel is really handy for the most of the situations, but sometimes you would like to have more fine-grained control over the MQTT client connection. To cover such situations just add the bean of type org.eclipse.paho.client.mqttv3.MqttConnectOptions to your Camel registry. For Spring applications that would mean adding bean to your application context. The snippet below uses password-based authentication to connect to the MQTT broker:
@Bean
MqttConnectOptions connectOptions() {
  MqttConnectOptions connectOptions = new MqttConnectOptions();
  connectOptions.setUserName("henry");
  connectOptions.setPassword("secret".toCharArray());
  return connectOptions;
}
That's it. Camel automatically picks up this MqttConnectOptions bean from the registry and use it to establish connection with the MQTT broker.

What's next?

If you are interested in all the available options of the Camel Paho connector, visit the component page. If you are looking for complete example of the project that can be easily deployed into the micro-computers like Raspberry Pi check out CamelM2M MQTT quickstart. If you are wondering how fast MQTT connector can be - check out my previous blog post related to the MQTT performance on Raspberry Pi.

3 comments:

  1. Hi Henryk;

    Currently I'm testing the new Paho component in camel and I'm facing some troubles, in fact just one that I guess it is an issue in this component.

    I'm configuring the following:

    from("paho:active?brokerUrl=tcp://localhost:1883") //this connects to ActiveMQ
    .to("paho:test?brokerUrl=tcp://localhost:1884"); //this connects to Moquette

    I'm bridging from "active" topic in ActiveMQ to a supposedly "test" topic in Moquette, because when Moquette receives the publications from camel to the "test" topic that I've defined, this is not true due that Moquette is logging that the topic where camel is publishing is: "test?brokerUrl=tcp%3A%2F%2Flocalhost%3A1884"

    I've confirmed that paho is sending the topic name as "test?brokerUrl=tcp%3A%2F%2Flocalhost%3A1884" because I've created a paho subscriber by passing that strange name and the messages were delivered to my subscriber.

    Also, I've tried with WebSphereMQ and when I camel connects and publishes a messages in the "test" topic that I've defined in WMQ, nothing happens, camel does not report an issue neither the message is delivered to my subscriber.

    Do you believe this is an issue or I'm doing something wrong? Did I misunderstand something?

    ReplyDelete
  2. Hi Marce,

    This kind of bridge is supposed to work. Let's investigate this together. Can you shoot me an e-mail to hekonsek@gmail.com?

    Cheers!

    ReplyDelete
  3. Thanks Henryk I'll send you the email

    ReplyDelete