Record-Oriented Data with NiFi
Record-Oriented Data with NiFi
Intro - The What
Apache NiFi is being used by many companies and organizations to power their data distribution
needs. One of NiFi's strengths is that the framework is data agnostic. It doesn't care what type
of data you are processing. There are processors for handling JSON, XML, CSV, Avro, images and
video, and several other formats. There are also several general-purposes processors, such as
RouteText and CompressContent. Data can be hundreds of bytes or many gigabytes. This makes NiFi
a powerful tool for pulling data from external sources; routing, transforming, and aggregating it;
and finally delivering it to its final destinations.
While this ability to handle any arbitrary data is incredibly powerful, we often see users working
with record-oriented data. That is, large volumes of small "records," or "messages," or "events."
Everyone has their own way to talk about this data, but we all mean the same thing. Lots of really
small, often structured, pieces of information. This data comes in many formats. Most commonly,
we see CSV, JSON, Avro, and log data.
While there are many tasks that NiFi makes easy, there are some common tasks that we can do
better with. So in version 1.2.0 of NiFi, we released a new set of Processors and Controller Services,
for working with record-oriented data. The new Processors are configured with a Record Reader
and a Record Writer Controller Service. There are readers for JSON, CSV, Avro, and log data. There
are writers for JSON, CSV, and Avro, as well as a writer that allows users to enter free-form text.
This can be used to write data out in a log format, like it was read in, or any other custom textual format.
The Why
These new processors make building flows to handle this data
simpler. It also means that we can build processors that accept any data format without having to
worry about the parsing and serialization logic. Another big advantage of this approach is that we
are able to keep FlowFiles larger, each consisting of multiple records, which results in far better
performance.
The How - Explanation
In order to make sense of the data, Record Readers and Writers need to know the schema that is associated
with the data. Some Readers (for example, the Avro Reader) allow the schema to be read from the data itself.
The schema can also be included as a FlowFile attribute. Most of the time, though, it will be looked up by
name from a Schema Registry. In this version of NiFi, two Schema Registry implementations exist: an Avro-based
Schema Registry service and a client for an external Hortonworks Schema Registry.
Configuring all of this can feel a bit daunting at first. But once you've done it once or twice,
it becomes rather quick and easy to configure. Here, we will walk through how to set up a local Schema Registry
service, configure a Record Reader and a Record Writer, and then start putting to use some very powerful
processors. For this post, we will keep the flow simple. We'll ingest some CSV data from a file and then
use the Record Readers and Writers to transform the data into JSON, which we will then write to a new directory.
The How - Tutorial
In order to start reading and writing the data, as mentioned above, we are going to need a Record Reader
service. We want to parse CSV data and turn that into JSON data. So to do this, we will need a CSV Reader
and a JSON Writer. So we will start by clicking the "Settings" icon on our Operate
palette.
This will allow us to start configuring our Controller Services. When we click the 'Add' button in the
top-right corner, we see a lot of options for Controller Services:
Since we know that we want to read CSV data, we can type "CSV" into our Filter box in the top-right corner
of the dialog, and this will narrow down our choices pretty well:
We will choose to add the CSVReader service and then configure the Controller Service. In the Properties tab,
we have a lot of different properties that we can set:
Fortunately, most of these properties have default values that are good for most cases, but you can choose which
delimiter character you want to use, if it's not a comma. You can choose whether or not to skip the first line,
treating it as a header, etc. For my case, I will set the "Skip Header Line" property to "true" because my data contains a header line
that I don't want to process as a record. The first properties are very important though. The "Schema Access Strategy" is used
to instruct the reader on how to obtain the schema. By default, it is set to "Use String Fields From Header." Since
we are also going to be writing the data, though, we will have to configure a schema anyway. So for this demo, we will
change this strategy to "Use 'Schema Name' Property." This means that we are going to lookup the schema from a Schema
Registry. As a result, we are now going to have to create our Schema Registry. If we click on the "Schema Registry"
property, we can choose to "Create new service...":
We will choose to create an AvroSchemaRegistry. It is important to note here that we are reading CSV data and writing
JSON data - so why are we using an Avro Schema Registry? Because this Schema Registry allows us to convey the schema
using the Apache Avro Schema format, but it does not imply anything
about the format of the data being read. The Avro format is used because it is already a well-known way of storing
data schemas.
Once we've added our Avro Schema Registry, we can configure it and see in the Properties tab that it has no properties
at all. We can add a schema by adding a new user-defined property (by clicking the 'Add' / 'Plus' button in the top-right
corner). We will give our schema the name "demo-schema" by using this as the name of the property. We can then type or paste
in our schema. For those unfamiliar with Avro schemas, it is a JSON formatted representation that has a syntax like the
following:
{ "name": "recordFormatName", "namespace": "nifi.examples", "type": "record", "fields": [ { "name": "id", "type": "int" }, { "name": "firstName", "type": "string" }, { "name": "lastName", "type": "string" }, { "name": "email", "type": "string" }, { "name": "gender", "type": "string" } ] }
Here, we have a simple schema that is of type "record." This is typically the case, as we want multiple fields. We then
specify all of the fields that we have. There is a field named "id" of type "int" and all other fields are of type "string."
See the Avro Schema documentation for more information.
We've now configured our schema! We can enable our Controller Services now.
We can now add our JsonRecordSetWriter controller service as well. When we configure this service, we see some familiar
options for indicating how to determine the schema. For the "Schema Access Strategy," we will again use the "Use 'Schema Name' Property,"
which is the default. Also note that the default value for the "Schema Name" property uses the Expression Language to reference
an attribute named "schema.name". This provides a very nice flexibility, because now we can re-use our Record Readers and Writers
and simply convey the schema by using an UpdateAttribute processor to specify the schema name. No need to keep creating Record
Readers and Writers. We will set the "Schema Registry" property to the AvroSchemaRegistry that we just created and configured.
Because this is a Record Writer instead of a Record Reader, we also have another interesting property: "Schema Write Strategy."
Now that we have configured how to determine the data's schema, we need to tell the writer how to convey that schema to the next
consumer of the data. The default option is to add the name of the schema as an attribute. We will accept the default. But we
could also write the entire schema as a FlowFile attribute or use some strategies that are useful for interacting with the Hortonworks
Schema Registry.
Now that we've configured everything, we can apply the settings and start our JsonRecordSetWriter as well. We've now got all of our
Controller Services setup and enabled:
Now for the fun part of building our flow! The above will probably take about 5 minutes, but it makes laying out the flow super
easy! For our demo, we will have a GetFile processor bring data into our flow. We will use UpdateAttribute to add a "schema.name"
attribute of "demo-schema" because that is the name of the schema that we configured in our Schema Registry:
We will then use the ConvertRecord processor to convert the data into JSON. Finally, we want to write the data out using PutFile:
We still need to configure our ConvertRecord processor, though. To do so, all that we need to configure in the Properties
tab is the Record Reader and Writer that we have already configured:
Now, starting the processors, we can see the data flowing through our system!
Also, now that we have defined these readers and writers and the schema, we can easily create a JSON Reader, and an
Avro Writer, for example. And adding additional processors to split the data up, query and route the data becomes very
simple because we've already done the "hard" part.
Conclusion
In version 1.2.0 of Apache NiFi, we introduced a handful of new Controller Services and Processors that will make managing
dataflows that process record-oriented data much easier. I fully expect that the next release of Apache NiFi will have several
additional processors that build on this. Here, we have only scratched the surface of the power that this provides to us.
We can delve more into how we are able to transform and route the data, split the data up, and migrate schemas. In the meantime,
each of the Controller Services for reading and writing records and most of the new processors that take advantage of these
services have fairly extensive documentation and examples. To see that information, you can right-click on a processor and
click "Usage" or click the "Usage" icon on the left-hand side of your controller service in the configuration dialog.
From there, clicking the "Additional Details..." link in the component usage will provide pretty significant documentation.
For example, the JsonTreeReader provides a wealth of knowledge in its
Additional Details.