Stream Processing: NiFi and Spark

Mark Payne - markap14@hotmail.com


Without doubt, Apache Spark has become wildly popular for processing large quantities of data. One of the key features that Spark provides
is the ability to process data in either a batch processing mode or a streaming mode with very little change to your code. Batch processing is typically performed by reading data from HDFS.
There have been a few different articles posted about using Apache NiFi (incubating) to publish data to HDFS.
This article does a great job of explaining how to accomplish this.

In many contexts, though, operating on the data as soon as it is available can provide great benefits. In order to provide the right data as quickly as possible,
NiFi has created a Spark Receiver, available in the 0.0.2 release of Apache NiFi.
This post will examine how we can write a simple Spark application to process data from NiFi and how we can configure NiFi to expose the data to Spark.

Incorporating the Apache NiFi Receiver into your Spark application is pretty easy. First, you'll need to add the Receiver to your application's POM:

  
    org.apache.nifi
    nifi-spark-receiver
    0.0.2-incubating
  

That's all that is needed in order to be able to use the NiFi Receiver. So now we'll look at how to use the Receiver in your code.

The NiFi Receiver is a Reliable Java Receiver. This means that if we lose a node after it pulls the data from NiFi, the data will not be lost. Instead, another node will simply pull and process the data. In order to create a NiFi Receiver, we need to first create a configuration that tells the Receiver where to pull the data from. The simplest form is to just tell the config where NiFi is running and which Port to pull the data from:

SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
  .url("http://localhost:8080/nifi")
  .portName("Data For Spark")
  .buildConfig();

To briefly explain the terminology here, NiFi refers to its mechanism for transferring data between clusters or instances as Site-to-Site. It exposes options for pushing data or pulling,
so that the most appropriate approach can be used for each situation. Spark doesn't supply a mechanism to have data pushed to it - instead, it wants to pull data from other sources.
In NiFi, this data can be exposed in such a way that a receiver can pull from it by adding an Output Port to the root process group. For Spark, we will use this same mechanism -
we will use this Site-to-Site protocol to pull data from NiFi's Output Ports. In order for this to work, we need two pieces of information: the URL to connect to NiFi and the name of
the Output Port to pull data from.

If the NiFi instance to connect to is clustered, the URL should be that of the NiFi Cluster Manager. In this case, the Receiver will contact the Cluster
Manager to determine which nodes are in the cluster and will start pulling data from all of them. By communicating with the Cluster Manager, the Receiver also
determines which nodes have the most data backed up and pulls from those nodes more heavily than the others. This information is refreshed periodically,
so the Receiver adjusts automatically as nodes join or leave the cluster or become more or less bogged down.
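
The load-aware behavior described above can be pictured with a small, self-contained sketch. This is not the Receiver's actual implementation: the class WeightedNodeSelection, the method pullShares, and the node names and queue counts are all hypothetical, and a simple proportional weighting is assumed purely for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: NOT the NiFi Receiver's real algorithm.
class WeightedNodeSelection {

    // Given the amount of data queued on each node, return the fraction of
    // pulls that a client might direct at each node (proportional weighting).
    static Map<String, Double> pullShares(Map<String, Integer> queuedByNode) {
        long total = 0;
        for (int queued : queuedByNode.values()) {
            total += queued;
        }
        Map<String, Double> shares = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : queuedByNode.entrySet()) {
            // With nothing queued anywhere, spread the pulls evenly.
            double share = (total == 0)
                ? 1.0 / queuedByNode.size()
                : (double) entry.getValue() / total;
            shares.put(entry.getKey(), share);
        }
        return shares;
    }

    public static void main(String[] args) {
        // Made-up node names and queue counts.
        Map<String, Integer> queued = new LinkedHashMap<>();
        queued.put("node-1", 600);
        queued.put("node-2", 300);
        queued.put("node-3", 100);
        System.out.println(pullShares(queued));
    }
}
```

Re-running a calculation like this on a schedule is one way to picture how the shares would shift as nodes join, leave, or fall behind.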

Next, we need to tell the Receiver which Port to pull data from. Since NiFi can have many different Output Ports, we need to provide either a Port Identifier
or a Port Name. If desired, we can also configure communications timeouts; SSL information for secure data transfer, authentication, and authorization;
compression; and preferred batch sizes. See the JavaDocs for the SiteToSiteClient.Builder for more information.

Once we have constructed this configuration object, we can now create the Receiver:

 SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
 
 // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
 // the specified Port
 JavaReceiverInputDStream<NiFiDataPacket> packetStream =
     ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));

This gives us a JavaReceiverInputDStream of type NiFiDataPacket.
The NiFiDataPacket is a simple construct that packages an arbitrary byte array with a map of Key/Value pairs (referred to as attributes) that correspond to the data.
As an example, we can process the data without paying attention to the attributes:

 // Map the data from NiFi to text, ignoring the attributes
 JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
   @Override
   public String call(final NiFiDataPacket dataPacket) throws Exception {
     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
   }
 });

Or we can make use of the attributes:

 // Extract the 'uuid' attribute
 JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
   @Override
   public String call(final NiFiDataPacket dataPacket) throws Exception {
     return dataPacket.getAttributes().get("uuid");
   }
 });
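
To try out this mapping logic without a running Spark cluster or NiFi instance, a plain-Java sketch like the following may help. SimplePacket is a hypothetical stand-in that exposes the same two accessors used above (getContent() and getAttributes()); it is not the NiFi class, and the attribute values are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class PacketMappingDemo {

    // Same logic as the first map() call: decode the content as UTF-8 text.
    static String toText(SimplePacket packet) {
        return new String(packet.getContent(), StandardCharsets.UTF_8);
    }

    // Same logic as the second map() call: read the 'uuid' attribute.
    static String toUuid(SimplePacket packet) {
        return packet.getAttributes().get("uuid");
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("uuid", "example-uuid");             // made-up value
        attributes.put("filename", "notification.txt");     // made-up value
        SimplePacket packet = new SimplePacket(
            "hello from NiFi".getBytes(StandardCharsets.UTF_8), attributes);

        System.out.println(toText(packet));
        System.out.println(toUuid(packet));
    }
}

// Hypothetical stand-in for NiFiDataPacket: a byte array plus attributes.
class SimplePacket {
    private final byte[] content;
    private final Map<String, String> attributes;

    SimplePacket(byte[] content, Map<String, String> attributes) {
        this.content = content;
        this.attributes = attributes;
    }

    byte[] getContent() {
        return content;
    }

    Map<String, String> getAttributes() {
        return attributes;
    }
}
```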

So now we have our Receiver ready to pull data from NiFi. Let's look at how we can configure NiFi to expose this data.

First, NiFi has to be configured to allow site-to-site communication. This is accomplished by setting the nifi.remote.input.socket.port property
in the nifi.properties file to the desired port to use for site-to-site (if this value is changed, it will require a restart of NiFi for the changes to take effect).
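
As a quick sanity check before restarting NiFi, the property can be read back with plain Java. The sketch below is only an illustration: the class SiteToSitePortCheck and the port value 10880 are made up, and a blank value is treated here as "not configured".

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class SiteToSitePortCheck {

    // Returns the configured site-to-site port, or -1 if the property is
    // missing or blank (treated in this sketch as "not configured").
    static int siteToSitePort(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("nifi.remote.input.socket.port", "").trim();
        return value.isEmpty() ? -1 : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        // Hypothetical excerpt of nifi.properties; 10880 is a made-up port.
        String excerpt = "nifi.remote.input.socket.port=10880\n";
        System.out.println("site-to-site port: " + siteToSitePort(excerpt));
    }
}
```

In practice the text would come from the nifi.properties file itself rather than from an inline string.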

Now that NiFi is set up to allow site-to-site, we will build a simple flow to feed data to Spark. We will start by adding two GetFile processors to the flow. One will
pick up from /data/in/notifications and the other will pick up from /data/in/analysis:

[Image: GetFile]

Next, let's assume that we want to assign different priorities to the data that is picked up from each directory. Let's assign a priority of "1" (the highest priority)
to data coming from the analysis directory and assign a lower priority to the data from the notifications directory. We can use
the UpdateAttribute processor to do this. Drag the Processor onto the graph and configure it. In the Settings tab, we set the name to "Set High Priority". In the
Properties tab, we have no properties available. However, there's a button in the top-right of the dialog that says "New Property." Clicking that button lets us
add a property with the name priority and a value of 1. When we click OK, we see it added to the table:

[Image: Set Priority]

Let's click Apply and add another UpdateAttribute processor for the data coming from the notifications directory. Here, we will add a property
with the name priority but give it a value of 2. After configuring these processors and connecting the GetFile processors to them,
we end up with a graph that looks like this:

[Image: Connect GetFile and UpdateAttribute]

Now, we want to combine all of the data into a single queue so that we can prioritize it before sending the data to Spark. We do this by adding a Funnel to the graph
and connecting both of the UpdateAttribute processors to the Funnel:

[Image: With Funnel]

Now we can add an Output Port that our Spark Streaming application can pull from. We drag an Output Port onto our graph. When prompted for a name, we will name it
Data For Spark, as this is the port name that we configured in our Spark Streaming application. Once we connect the Funnel to the Output Port, we have a graph
like this:

[Image: Before Running]

We haven't actually told NiFi to prioritize the data yet, though. We've simply added an attribute named priority. To prioritize the data based on that, we can
right-click on the connection that feeds the Output Port and choose Configure. From the Settings tab, we can drag the PriorityAttributePrioritizer from the list of Available Prioritizers
to the list of Selected Prioritizers:

[Image: Prioritize]
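
To build some intuition for what prioritizing on this attribute means, here is a rough plain-Java sketch. It only approximates the idea that a lower priority number is served first; it is not NiFi's prioritizer code, and the Item class, item names, and the simplification that every item carries a numeric priority are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class PriorityOrderingDemo {

    // Hypothetical queued item: a name plus its 'priority' attribute value.
    static final class Item {
        final String name;
        final String priority;

        Item(String name, String priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    // Drain the items so that a lower numeric 'priority' value comes out first.
    // Items with equal priority come out in no particular order.
    static List<String> drainInPriorityOrder(List<Item> items) {
        PriorityQueue<Item> queue = new PriorityQueue<>(
            Comparator.comparingLong((Item item) -> Long.parseLong(item.priority)));
        queue.addAll(items);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().name);
        }
        return order;
    }

    public static void main(String[] args) {
        // Made-up items: one per directory, tagged as in the flow above.
        List<Item> items = Arrays.asList(
            new Item("file from notifications", "2"),
            new Item("file from analysis", "1"));
        System.out.println(drainInPriorityOrder(items));
    }
}
```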

Once we click Apply, we're done. We can start all of the components and we should see the data start flowing to our Spark Streaming application:

[Image: Running]

Now any data that appears in our /data/in/notifications or /data/in/analysis directory will make its way to our streaming application!

By using NiFi's Output Port mechanism, we are able to create any number of different named Output Ports, as well. This allows you, as a NiFi user, to choose
exactly which data gets exposed to Spark. Additionally, if NiFi is configured to be secure, each Output Port can be configured to provide the data to only the hosts and users that
are authorized.

Let's consider, though, that this data has significant value for processing in a streaming fashion as soon as we have the data, but it may also be of value to a batch processing
analytic. This is easy to handle, as well. We can add a MergeContent processor to our graph and configure it to merge data into 64-128 MB TAR files. Then, when we have a
full TAR file, we can push the data to HDFS. We can configure MergeContent to make these bundles like so:

[Image: Merge configuration]

We can then send the merged files to PutHDFS and auto-terminate the originals. We can feed all the data from our Funnel to this MergeContent processor, and this will
allow us to dual-route all data to both HDFS (bundled into 64-128 MB TAR files) and to Spark Streaming (making the data available as soon as possible with very low latency):

[Image: hdfs and spark streaming]
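
The effect of a 64-128 MB bundle range can be illustrated with a toy calculation. This sketch is not how MergeContent is implemented; the class BundleSizingDemo, the greedy arrival-order grouping, and the file sizes are all assumptions chosen to show how a minimum and maximum size interact.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BundleSizingDemo {

    static final long MB = 1024L * 1024L;

    // Group file sizes (in bytes) in arrival order. A bundle is closed as soon
    // as it holds at least minBytes; a file that would push the current bundle
    // past maxBytes starts a new bundle instead. The final bundle may be
    // smaller than minBytes, and a single oversized file becomes its own bundle.
    static List<Long> bundleSizes(List<Long> fileSizes, long minBytes, long maxBytes) {
        List<Long> bundles = new ArrayList<>();
        long current = 0;
        for (long size : fileSizes) {
            if (current > 0 && current + size > maxBytes) {
                bundles.add(current);
                current = 0;
            }
            current += size;
            if (current >= minBytes) {
                bundles.add(current);
                current = 0;
            }
        }
        if (current > 0) {
            bundles.add(current);
        }
        return bundles;
    }

    public static void main(String[] args) {
        // Made-up incoming file sizes.
        List<Long> files = Arrays.asList(40 * MB, 40 * MB, 40 * MB, 40 * MB, 10 * MB);
        for (long bundle : bundleSizes(files, 64 * MB, 128 * MB)) {
            System.out.println((bundle / MB) + " MB bundle");
        }
    }
}
```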

We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at dev@nifi.incubator.apache.org.