Apache Spark (Structured Streaming) ActiveMQ Connector

spark · Jan 19, 2021

What are we trying to address through this blog?

The main focus of this blog is to build an application that fetches data from an ActiveMQ broker by establishing a secure connection to a topic or a queue using JMS.

Some terminologies for context:

  1. ActiveMQ: ActiveMQ is an open-source message broker that implements ‘message-oriented middleware’ (MOM); its basic function is to send messages between different applications. ActiveMQ relays messages from sender to receiver and can connect multiple clients. It allows messages to be held in a queue instead of requiring both the sender and the receiver to be available simultaneously, so messaging can continue even if one application is temporarily unavailable. [More information: https://activemq.apache.org/ ]
  2. Spark: Apache Spark is a data processing framework that can quickly perform processing tasks on very large data sets, and can also distribute data processing tasks across multiple computers, either on its own or in tandem with other distributed computing tools. [More Information: https://spark.apache.org/ ]
  3. JMS: JMS (Java Message Service) is an API that provides the facility to create, send, and read messages. It provides loosely coupled, reliable, and asynchronous communication. A message is a piece of information: it can be text, an XML document, JSON data, an entity (Java object), etc. [More Information: https://www.oracle.com/technical-resources/articles/java/intro-java-message-service.html ]

How does ActiveMQ work?

ActiveMQ sends messages between producers (which create messages and submit them for delivery) and consumers (which receive and process messages). A message is a piece of information that can be text, an XML document, JSON data, an entity (Java object), etc.
The ActiveMQ broker routes each message through one of two types of destinations:

  1. Queue: Messages await delivery to a single consumer (a messaging domain called point-to-point).
  2. Topic: Messages are delivered to every consumer subscribed to that particular topic (a messaging domain called pub-sub).

ActiveMQ gives us the flexibility to send messages through both queues and topics using a single broker. In point-to-point messaging, the broker acts as a ‘load balancer’, routing each message from the queue to one of the available consumers in a round-robin pattern. In pub/sub messaging, the broker delivers each message to every consumer subscribed to that particular topic.

ActiveMQ is a JMS provider, which means that it implements the functionality specified in the JMS API. Client applications—producers and consumers—use the JMS API to send and receive messages. Non-JMS clients can also connect to the ActiveMQ broker via the AMQP, MQTT, and STOMP protocols.
ActiveMQ sends messages asynchronously, so consumers don’t necessarily receive messages immediately. The producer’s task of composing and sending a message is disconnected from the consumer’s task of fetching it. Because ActiveMQ uses a broker as an intermediary, producers and consumers are independent (and even unaware) of each other. As soon as a producer sends a message to a broker, its task is complete, regardless of whether or when a consumer receives the message. Conversely, when a consumer receives a message from a broker, it does so without the knowledge of the producer that created the message. This type of arrangement, in which clients function without knowledge of one another, is known as loose coupling. [For more detailed information on ActiveMQ one can refer to this git repo which I found very helpful]

activeMQ architecture

Using Spark Structured Streaming to fetch data

As described above, ActiveMQ provides us with a broker that stores the queues/topics, which in turn store the messages. To retrieve data from the broker, we need to set up a client that can securely connect to the ActiveMQ broker and fetch all the messages enqueued in the queue/topic of our interest. For the context of this blog, we will consider subscribing to a queue. Given that we will be dealing with continuous data, we will set up a Spark Structured Streaming application as our client. As mentioned on the Apache Spark website:

“Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.”

We can set up a Spark Structured Streaming read as follows:
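
A rough sketch of such a setup is shown below; the option keys mirror the parameters discussed later in this post, so treat the exact spellings as illustrative rather than authoritative.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jms-reader").getOrCreate()

// Option keys here mirror the parameters discussed later in this post.
val df = spark.readStream
  .format("com.wintersoldier.linkinJMS")          // our custom source package
  .option("brokerUrl", "tcp://localhost:61616")   // ActiveMQ broker URL
  .option("username", "admin")                    // optional credentials
  .option("password", "admin")
  .option("srcType", "queue")                     // "queue" or "topic"
  .option("srcName", "sampleQueue")               // queue/topic name
  .load()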

The format specifies the type of source the Spark application reads from, and the options are used to pass a Map of parameters needed to initiate the connection from our Spark application to the data producer (in our case, the ActiveMQ broker).

The df DataFrame represents an unbounded table containing the streaming data. Its columns depend on the source passed to the format: for example, when reading data from Kafka the columns include ‘key’ and ‘value’, while in our ActiveMQ case the columns are ‘jms-id’, ‘message-content’, ‘queue-name’, etc.

Note that df does not receive any data at load(); we are only setting up the transformation and have not yet started it. The actual data fetching begins only after calling start on the writeStream. The awaitTermination call ensures that the data-fetching process is not killed unless we explicitly terminate the application.
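
A minimal sketch of that write side, using Spark's built-in console sink:

// Nothing is fetched until start() is called on the write side.
val query = df.writeStream
  .format("console")          // print each micro-batch to stdout
  .outputMode("append")
  .start()                    // the actual data-fetching begins here

query.awaitTermination()      // keep the application alive until explicitly stopped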

Before moving further, let's have a look at build.sbt:
build.sbt
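
A representative build.sbt sketch is shown below; the Scala, Spark, and ActiveMQ versions are placeholders rather than the exact ones used in the repository.

name := "linkinJMS"
version := "0.1"
scalaVersion := "2.12.12"

libraryDependencies ++= Seq(
  "org.apache.spark"    %% "spark-sql"       % "3.0.1" % "provided",
  "org.apache.activemq"  % "activemq-client" % "5.16.0",
  "javax.jms"            % "javax.jms-api"   % "2.0.1"
)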

Clubbing JMS with Spark

This is the part where things get really messy, as there is hardly any information available on the internet regarding this topic, so bear with me. The code shown here is available at: https://github.com/WinterSoldier13/linkinJMS

As mentioned above, while consuming data through Spark we need to specify a format, and this is where we club our JMS client with Spark. We will start by creating a package; for the sake of an example, following the repo mentioned above, let's name it com.wintersoldier.linkinJMS. This package name is what gets passed as the ‘source’ to the format of readStream.

To make this package usable as a source, we need a class named DefaultSource (in DefaultSource.scala), which should extend the required trait(s) from org.apache.spark.sql.sources. For this example, let's consider only reading from a streaming source, hence our code should have:

import org.apache.spark.sql.sources.StreamSourceProvider

Thus our class will be:

private[linkinJMS] class DefaultSource extends StreamSourceProvider { ... }

As we are extending StreamSourceProvider, we need to override two methods: sourceSchema and createSource.

sourceSchema method

The above snapshot shows the sourceSchema method, which returns a (String, StructType). As we are making our application JMS-specific, instead of accepting a schema from the user we always return the JmsMessage schema.
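
A minimal sketch of such a sourceSchema, assuming a hypothetical JmsMessage case class whose fields mirror the columns mentioned earlier, could look like this (placed inside DefaultSource):

import org.apache.spark.sql.{Encoders, SQLContext}
import org.apache.spark.sql.types.StructType

// Hypothetical case class describing one received JMS message; the real repo
// may use different field names.
case class JmsMessage(jmsId: String, messageContent: String, queueName: String)

// Inside DefaultSource: ignore any user-supplied schema and always return the
// fixed JmsMessage schema.
override def sourceSchema(
    sqlContext: SQLContext,
    schema: Option[StructType],
    providerName: String,
    parameters: Map[String, String]): (String, StructType) =
  ("jms", Encoders.product[JmsMessage].schema)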

Then we define the createSource function as:

createSource method
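
A sketch of what createSource might look like; the constructor arguments of JmsStreamingSource here are an assumption:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.jms.JmsStreamingSource

// Inside DefaultSource: hand the options map to JmsStreamingSource, which does
// the actual fetching (constructor shape is an assumption).
override def createSource(
    sqlContext: SQLContext,
    metadataPath: String,
    schema: Option[StructType],
    providerName: String,
    parameters: Map[String, String]): Source =
  new JmsStreamingSource(sqlContext, parameters)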

We also club the DefaultSource class with a companion object:

DefaultSource companion object
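
A sketch of that companion object; the helper's name (createConnection) and its use of AMQConnectionFactoryProvider (covered next) are assumptions about the repo's exact shape:

import javax.jms.Connection
import org.apache.activemq.ActiveMQConnectionFactory

private[linkinJMS] object DefaultSource {
  // Turn the readStream options map into a started ActiveMQ connection.
  def createConnection(parameters: Map[String, String]): Connection = {
    val factory: ActiveMQConnectionFactory =
      AMQConnectionFactoryProvider.createConnectionFactory(parameters)
    val connection = factory.createConnection()
    connection.start()        // begin delivery of incoming messages
    connection
  }
}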

After all these changes, DefaultSource.scala will look like:

DefaultSource.scala
Okay, so at this point I have introduced two ‘unknown’ functions in the code: JmsStreamingSource() and AMQConnectionFactoryProvider().

Let’s have a look at them one by one, kicking off with the AMQConnectionFactoryProvider().

So, the basic functionality of the AMQConnectionFactoryProvider() is to create an ActiveMQ connection with the help of ActiveMQConnectionFactory(), which can be initialized in three ways:

  1. We pass a brokerUrl
  2. We pass a brokerUrl, along with a username and password, to establish a secure connection
  3. We pass nothing to create a connection on the default brokerUrl

As we see in the DefaultSource companion object, we pass a parameters variable to AMQConnectionFactoryProvider(), which is a Map[String, String] (recall from the paragraphs above that this parameters map is nothing but the options Map passed in the readStream). Hence we can write a method to handle all three cases mentioned above:

ConnectionFactoryProvider.scala
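
A sketch of such a provider; the option keys brokerUrl, username, and password, as well as the method name, are assumptions:

import org.apache.activemq.ActiveMQConnectionFactory

object AMQConnectionFactoryProvider {
  def createConnectionFactory(parameters: Map[String, String]): ActiveMQConnectionFactory = {
    val brokerUrl = parameters.get("brokerUrl")
    val username  = parameters.get("username")
    val password  = parameters.get("password")

    (brokerUrl, username, password) match {
      // Case 2: broker URL plus credentials for a secure connection
      case (Some(url), Some(user), Some(pass)) => new ActiveMQConnectionFactory(user, pass, url)
      // Case 1: broker URL only
      case (Some(url), _, _) => new ActiveMQConnectionFactory(url)
      // Case 3: nothing supplied, fall back to the default broker URL
      case _ => new ActiveMQConnectionFactory()
    }
  }
}
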
Okay, so this sorts out our AMQConnectionFactoryProvider(); now let's have a look at JmsStreamingSource(), where all the magic happens.

First of all, to store JmsStreamingSource we need to create a new package: org.apache.spark.sql.jms. This is important because the JmsStreamingSource class uses a function that is package-private to org.apache.spark.sql and therefore can't be called from outside that package.

To start off, we will extend the Source trait and then extract the necessary details from the parameters. We need to get hold of:

  1. readInterval : Specifies the wait time after each fetch
  2. clientName : An optional parameter that can be used to create a durable subscription in the case of a topic.
  3. srcType : Specifies whether our application needs to be connected to a queue or a topic.
  4. srcName : Specifies the name of the topic/queue.

After extracting this information, we create a connection to the ActiveMQ broker using the companion object we defined in DefaultSource.scala. After establishing a connection with the broker, we can create a session on top of this connection and use this Session to create a MessageConsumer that reads messages either from the queue or from the topic, as specified by the user. The code is shown below:

creating a session in JMSStreaming
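
A sketch of that wiring, assuming srcType and srcName have already been extracted from the parameters and that the companion object exposes the createConnection helper sketched earlier:

import javax.jms.{Connection, Destination, MessageConsumer, Session}

val connection: Connection = DefaultSource.createConnection(parameters)

// false = non-transacted session; CLIENT_ACKNOWLEDGE lets us acknowledge each
// message explicitly after it has been added to a batch.
val session: Session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)

// Point the consumer at either a queue or a topic, as requested by the user.
val destination: Destination =
  if (srcType.equalsIgnoreCase("topic")) session.createTopic(srcName)
  else session.createQueue(srcName)

val consumer: MessageConsumer = session.createConsumer(destination)

For a durable topic subscription, the optional clientName would typically be passed to session.createDurableSubscriber instead of createConsumer.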

Now, as our class extends Source, we need to override four functions:

  1. getOffset : Returns the current offset; for this we can use a counter variable that is incremented by 1 on each call and returned as a JmsSourceOffset.
  2. getBatch : The function that is called 'in a loop' by spark.readStream; it contains all the code needed to fetch the messages.
  3. schema : Used by the getBatch method while returning a DataFrame.
  4. stop : Invoked automatically when the application is terminated; here we should close all open sessions, followed by the open connections.

Let's have a look at the getBatch function, which returns the streaming DataFrame.
  1. First, we define an empty ListBuffer of type JmsMessage (see here) to store all the received messages.
  2. We define a boolean variable break that will help us exit the loop.
  3. We start the loop, and in each iteration get the textMessage from the JMS provider.
  4. We acknowledge the message if required.
  5. If we receive null as the textMessage, we break out of the loop; otherwise we append the message to the ListBuffer.

After coming out of the loop, we parallelize the ListBuffer into an RDD and return it as a streaming DataFrame.
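
A condensed sketch of getBatch following the five steps above; JmsMessage, consumer, readInterval, and srcName come from the surrounding class, readInterval is used here as a receive timeout in milliseconds, and the final conversion assumes Spark 3.x internals, so treat this as illustrative:

import javax.jms.TextMessage
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{DataFrame, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.Offset

override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
  val messages = new ListBuffer[JmsMessage]()              // 1. buffer for this batch
  var break = false                                        // 2. loop-exit flag

  while (!break) {
    // 3. receive(timeout) returns null if nothing arrives within readInterval milliseconds
    val textMessage = consumer.receive(readInterval).asInstanceOf[TextMessage]
    if (textMessage == null) {
      break = true                                         // 5. nothing left, stop looping
    } else {
      textMessage.acknowledge()                            // 4. acknowledge if required
      messages += JmsMessage(textMessage.getJMSMessageID, textMessage.getText, srcName)
    }
  }

  // Parallelize the buffer and wrap it as a *streaming* DataFrame; internalCreateDataFrame
  // is the package-private call that forces the org.apache.spark.sql.jms placement.
  val toRow = Encoders.product[JmsMessage].asInstanceOf[ExpressionEncoder[JmsMessage]].createSerializer()
  val rows  = messages.toList.map(m => toRow(m).copy())
  sqlContext.internalCreateDataFrame(sqlContext.sparkContext.parallelize(rows), schema, isStreaming = true)
}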

Finally, our code will look like:

JmsStreamingSource.scala

And we're done.

We can test our application by running:

sampleApp.scala
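
A hypothetical end-to-end test app; the broker URL, queue name, and option keys are placeholders:

import org.apache.spark.sql.SparkSession

object SampleApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jms-sample")
      .master("local[*]")
      .getOrCreate()

    // Read from the custom JMS source defined above
    val df = spark.readStream
      .format("com.wintersoldier.linkinJMS")
      .option("brokerUrl", "tcp://localhost:61616")
      .option("srcType", "queue")
      .option("srcName", "sampleQueue")
      .load()

    // Dump each micro-batch to the console
    df.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}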

Writing to ActiveMQ

Just as we created a Session to read messages from the MQ, we can create one to write back to the MQ; the only difference is that this time, instead of creating a MessageConsumer, we create a MessageProducer.

writeToMQ.scala
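
A sketch of a standalone producer; the broker URL and queue name are placeholders:

import javax.jms.{DeliveryMode, Session}
import org.apache.activemq.ActiveMQConnectionFactory

object WriteToMQ {
  def main(args: Array[String]): Unit = {
    val factory    = new ActiveMQConnectionFactory("tcp://localhost:61616")
    val connection = factory.createConnection()
    connection.start()

    // Non-transacted, auto-acknowledged session for producing messages
    val session     = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val destination = session.createQueue("sampleQueue")
    val producer    = session.createProducer(destination)
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

    // Send a simple text message back to the queue
    producer.send(session.createTextMessage("hello from spark-jms"))

    // Close the session first, then the connection
    session.close()
    connection.close()
  }
}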

Written by Sanjay, along with Ayush (Tookitaki)