Integrating Apache NiFi with Apache Kafka
A couple of weeks ago, Joey Echeverria wrote a fantastic blog post about how to get started with Apache NiFi (Incubating), available at http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/. In it, Joey outlines how to quickly build a simple dataflow that automatically picks up any data from the /dropbox directory on your computer and pushes the data to HDFS. He then goes on to use the ExecuteStreamCommand Processor to make use of the Kite SDK’s ability to push CSV data into Hadoop.
In this blog post, we will expand upon Joey’s example to build a dataflow that’s a bit more complicated and illustrate a few additional features of Apache NiFi. If you haven’t already read Joey’s blog, we would recommend you read it before following along here, as this post assumes that you have a very basic understanding of NiFi, such as what NiFi is and how to add a Processor to the graph.
We will continue with the MovieLens dataset, this time using the "MovieLens 10M" dataset, which contains "10 million ratings and 100,000 tag applications applied to 10,000 movies by 72,000 users." However, rather than downloading this dataset and placing the data that we care about in the /dropbox directory, we will use NiFi to pull the data directly from the MovieLens site.
To do this, we first drag a Processor onto the graph. When we do this, we are given the option of choosing many different types of Processors. We know that we want to retrieve data from an HTTP address, so we will search for Processors of interest by choosing the "by tag" option for the filter and typing "http":
This narrows the list of Processors to just a few. We can quickly see the GetHTTP Processor. Clicking on
the Processor shows a description of the Processor just below: "Fetches a file via HTTP." This sounds like
a good fit, so we click the "Add" button to add it to the graph. Next, we need to configure the Processor to
pull the data that we are interested in. Right-clicking the Processor and clicking the "Configure" menu
option provides us a dialog with a "Properties" tab. Here, we can enter the URL to fetch the data from
(http://files.grouplens.org/datasets/movielens/ml-10m.zip) and enter the filename to use when downloading it:
We can leave the other properties as they are for the sake of this blog post. In a real environment,
you may have an interest in changing timeout values, providing authentication, etc. We do, however, want to look at the
"Scheduling" tab. By default, Processors are scheduled to run as fast as they can. While this is great for most high-volume
dataflows, we wouldn't be very good clients if we hit the website nonstop, and I doubt that those
responsible for the site would appreciate it very much. To avoid this, we go to the "Scheduling" tab and change the
"Run Schedule" setting from "0 sec" to something more reasonable, say "10 mins". This way, we will check for new data
from the website every 10 minutes. If the ETag or the Last-Modified date has changed, then we will pull the data again.
Otherwise, we will skip the download.
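The "only pull when something changed" behavior described above maps onto standard HTTP conditional requests. The following is a minimal Python sketch of that idea, not NiFi's actual implementation; the function names `conditional_headers` and `should_download` are hypothetical and exist only for illustration.

```python
from typing import Dict, Optional


def conditional_headers(etag: Optional[str] = None,
                        last_modified: Optional[str] = None) -> Dict[str, str]:
    """Build the request headers for a conditional GET from values saved on the last fetch."""
    headers: Dict[str, str] = {}
    if etag:
        # The server answers 304 Not Modified if its current ETag still matches.
        headers["If-None-Match"] = etag
    if last_modified:
        # The server answers 304 if the resource has not changed since this date.
        headers["If-Modified-Since"] = last_modified
    return headers


def should_download(status_code: int) -> bool:
    """200 means new content was returned; 304 means our previous copy is still current."""
    return status_code == 200


# Example values are made up for illustration.
print(conditional_headers(etag='"abc123"',
                          last_modified="Mon, 05 Jan 2015 10:00:00 GMT"))
```

On the first poll there is nothing saved yet, so no conditional headers are sent and the full file is downloaded. Every later poll sends the saved values, and a 304 response costs the server almost nothing.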
Next, we know that the data is in ZIP format, based on the URL. We will want to unzip the data to interact with
each file individually. Again, we can drag a Processor to the graph and filter by the tag "zip." We see that the
UnpackContent processor can handle ZIP files. We add it to the graph and configure its Properties as we did for
GetHTTP. This time, there is only one property: Packing Format. We change it to "zip" and click "Apply."
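Conceptually, unpacking turns one archive into one item per contained file. As a rough illustration in plain Python (not how UnpackContent is implemented), the sketch below does the same with the standard `zipfile` module. The archive contents are a tiny made-up stand-in, not the real dataset.

```python
import io
import zipfile
from typing import Dict


def unpack_zip(archive_bytes: bytes) -> Dict[str, bytes]:
    """Return one entry per file in the archive, keyed by its path inside the ZIP."""
    unpacked: Dict[str, bytes] = {}
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # directory entries carry no content of their own
            unpacked[info.filename] = archive.read(info)
    return unpacked


# Build a small stand-in archive in memory (hypothetical contents).
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("ml-sample/movies.dat", "1::Toy Story (1995)::Animation\n")
    archive.writestr("ml-sample/README.html", "<html></html>")

files = unpack_zip(buffer.getvalue())
print(sorted(files))
```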
Next, we need to send the data from GetHTTP to UnpackContent. We drag a Connection between the Processors and
include the "success" relationship, as that is the only relationship defined by GetHTTP. Now that we've unzipped
the data, the README tells us that the actual data included in the zip is in 3 files: movies.dat, ratings.dat,
and tags.dat. We don't really care about the rest of the data in the zip file, so we will use a RouteOnAttribute
to throw it out. We'll configure our RouteOnAttribute with the following properties:
We will connect the "success" relationship of UnpackContent to RouteOnAttribute. We don't care about the original
FlowFile - we only want the unpacked data. And if the Unpack fails, then there's not much we can do, so we will
configure UnpackContent and in the Settings tab choose to Auto-terminate the "original" and "failure" relationships.
Likewise, we don't care about anything routed to the "unmatched" relationship on the RouteOnAttribute, so we will
Auto-terminate that relationship as well.
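The routing decision itself is simple: each FlowFile goes to the relationship whose rule it satisfies, or to "unmatched" if none apply. The Python sketch below models that logic under the assumption that each rule compares the FlowFile's filename attribute against one of the three file names. In NiFi, each rule would be an Expression Language predicate; something like `${filename:equals('movies.dat')}` is one plausible form, offered here as an assumption rather than the exact values used in this flow.

```python
import posixpath

# Hypothetical mapping of relationship name -> the file name it should match.
ROUTES = {
    "movies": "movies.dat",
    "ratings": "ratings.dat",
    "tags": "tags.dat",
}


def route(filename: str) -> str:
    """Return the name of the relationship a file with this name is routed to."""
    base = posixpath.basename(filename)  # ignore any directory prefix from the archive
    for relationship, wanted in ROUTES.items():
        if base == wanted:
            return relationship
    return "unmatched"  # auto-terminated in our flow, so this data is dropped


for name in ["movies.dat", "ratings.dat", "README.html"]:
    print(name, "->", route(name))
```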
Now, we can just decide what to do with the "movies", "ratings", and "tags" relationships. For the sake of this
post, let's go ahead and push all of this data to HDFS with a PutHDFS Processor. We add a PutHDFS Processor and
configure it as Joey's blog instructs. We then drag a Connection from RouteOnAttribute to PutHDFS and choose
all three of these relationships.
For PutHDFS, once we have successfully sent the data, there is nothing else to do, so we Auto-terminate the "success"
relationship. However, if there's a problem sending the data, we don't want to lose this data. Instead, we connect
PutHDFS's "failure" relationship back to the PutHDFS Processor. Now, if we fail to send the data we will keep
retrying until we are successful.
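Looping "failure" back into the same Processor amounts to a retry loop. Here is a minimal Python sketch of that pattern. The `send_with_retry` and `make_flaky_sender` helpers are hypothetical, and the optional `max_attempts` cap is an addition for safety in the sketch; the flow described above simply retries indefinitely.

```python
import time
from typing import Callable, Optional


def send_with_retry(send: Callable[[bytes], bool],
                    payload: bytes,
                    delay_seconds: float = 0.0,
                    max_attempts: Optional[int] = None) -> int:
    """Call send(payload) until it reports success; return the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        if send(payload):
            return attempts
        if max_attempts is not None and attempts >= max_attempts:
            raise RuntimeError(f"giving up after {attempts} attempts")
        time.sleep(delay_seconds)  # brief pause before the next try


def make_flaky_sender(failures_before_success: int) -> Callable[[bytes], bool]:
    """Hypothetical sender that fails a fixed number of times, then succeeds."""
    remaining = {"failures": failures_before_success}

    def send(payload: bytes) -> bool:
        if remaining["failures"] > 0:
            remaining["failures"] -= 1
            return False
        return True

    return send


attempts = send_with_retry(make_flaky_sender(2), b"1::Toy Story (1995)::Animation")
print(attempts)
```

The trade-off of an unbounded retry is that data is never lost but can pile up in the queue while the destination is down, so it is worth watching queue sizes on looped connections.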
Now, we're ready to send some data to Kafka. Let's assume that we want only the data in the "movies.dat" file to
go to Kafka. We can drag a Processor onto the graph and filter by the tag "kafka." We see two Processors: GetKafka
and PutKafka. Since we want to send the data, we will use PutKafka. To send the movies data, we simply draw a Connection
from the RouteOnAttribute Processor to PutKafka and choose only the "movies" relationship. NiFi will take care of cloning
the FlowFile in a way that's very efficient so that no data is actually copied.
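To see why cloning can be cheap, it helps to think of a FlowFile as a small record of attributes plus a pointer to content that is stored once. The Python sketch below is a simplified model of that idea, not NiFi's internal classes; `ContentClaim`, `FlowFile`, and `clone` are names chosen for illustration.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class ContentClaim:
    """Stand-in for a pointer to bytes stored once in a content repository."""
    data: bytes


@dataclass
class FlowFile:
    attributes: Dict[str, str]
    claim: ContentClaim
    flowfile_id: str = field(default_factory=lambda: str(uuid.uuid4()))


def clone(original: FlowFile) -> FlowFile:
    """Create a new FlowFile that copies the attributes but shares the same content."""
    return FlowFile(attributes=dict(original.attributes), claim=original.claim)


movies = FlowFile({"filename": "movies.dat"}, ContentClaim(b"...large payload..."))
for_kafka = clone(movies)

# Two distinct FlowFiles, one underlying content object.
print(for_kafka.flowfile_id != movies.flowfile_id, for_kafka.claim is movies.claim)
```

In this model, sending "movies" to both PutHDFS and PutKafka duplicates only the small attribute map, while the payload bytes exist once.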
We configure PutKafka by Auto-terminating "success" and looping "failure" back to itself as with PutHDFS. Now let's look
at the Properties tab. We have to choose a Kafka Topic to send the data to and a list of 1 or more Kafka servers to send to.
We'll set the Known Brokers to "localhost:9092" (assuming this is running on the same box as Kafka) and set the Kafka Topic
to "movies". Since the data is a delimited text file, we know that it holds one record per line. By default, NiFi will send the entire contents of
a FlowFile to Kafka as a single message. However, we want each line in our file to be a new message on the Kafka Topic.
We accomplish this by setting the "Message Delimiter" property to "\n". At this point, our flow should look something like this:
And now we're ready to start our flow!
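Before starting, it may help to picture what the "Message Delimiter" setting does to the movies data. The Python sketch below splits a FlowFile's content on the delimiter so that each line becomes its own message. The `split_messages` helper is hypothetical, the sample lines are illustrative records using the "::" field separator found in the MovieLens .dat files, and skipping empty segments (such as the one after a trailing newline) is an assumption of this sketch rather than documented PutKafka behavior.

```python
from typing import List


def split_messages(content: bytes, delimiter: bytes = b"\n") -> List[bytes]:
    """Split FlowFile content into one message per delimited segment, skipping empties."""
    return [segment for segment in content.split(delimiter) if segment]


# Illustrative sample content, one record per line.
sample = (
    b"1::Toy Story (1995)::Adventure|Animation|Children|Comedy|Fantasy\n"
    b"2::Jumanji (1995)::Adventure|Children|Fantasy\n"
)

messages = split_messages(sample)
for message in messages:
    print(message.decode("utf-8"))
```

Without a delimiter, the whole file would arrive on the topic as one large message, which is rarely what downstream consumers expect.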
We can press Ctrl+A to select all and then click the Start button in the middle toolbar to start everything running. While the flow is running, we can right-click on the canvas and select "Refresh Status" to update all of the stats shown on the Processors. Then we will see in the top-right-hand corner of GetHTTP a "1" indicating that there is currently a single task running for this Processor. Once this task is finished, the data will be routed to the UnpackContent Processor, then the RouteOnAttribute Processor. At this point, some of the data will be dropped, because RouteOnAttribute will route it to "unmatched," which we Auto-terminated. The rest will go to the PutHDFS Processor, while only the "movies.dat" file will also be sent to the PutKafka Processor. If we keep clicking "Refresh Status" as the data is processing, we will see each of these steps happening.
We've now successfully set up a dataflow with Apache NiFi that pulls the largest of the available MovieLens datasets, unpacks the zipped contents, discards the unwanted data, routes all of the pertinent data to HDFS, and finally sends a subset of this data to Apache Kafka.
In Part Two of this series, we will look at how we can consume data from Kafka using NiFi, as well as how we can see what data we've pulled and what we've done with that data. Until then, please feel free to leave any questions, comments, or feedback in the Comments section.