Load Balancing Data Across a NiFi Cluster

Mark Payne - 

With each release of Apache NiFi, we tend to see at least one pretty powerful new application-level feature, in addition to all of the new and improved Processors that are added. And the latest release of NiFi, version 1.8.0, is no exception! Version 1.8.0 brings us a very powerful new feature, known as Load-Balanced Connections, which makes it much easier to move data around a cluster. Prior to this feature, when a user needs to spread data from one node in a cluster to all the nodes of the cluster, the best option was to use Remote Process Groups and Site-to-Site to move the data. The approach looks like this:

First, an Input Port has to be added to the Root Group. Then, a Remote Process Group has to be added to the flow in order to transfer data to that Root Group Input Port. The connection has to be drawn from the Processor to the Remote Process Group, and the Input Port has to be chosen. Next, the user will configure the specific Port within the Remote Process Group and set the Batch Size to 1 FlowFile, so that data is Round-Robin'ed between the nodes. A connection must then be made from the Root Group Input Port all the way back down, through the Process Groups, to the desired destination, so that the FlowFiles that are sent to the Remote Process Group are transferred to where they need to go in the flow. This may involve adding several more Local Input Ports to the Process Groups, to ensure that the data can flow to the correct destination. Finally, all of those newly created components have to be started.

Unfortunately, this approach has a few problems:

  • It's a lot to setup! The user doesn't just have to create the Remote Process Group, but the Input Port, any number of intermediate input ports and connections to move data from the Root Group to the desired group, and then configure and start all of this.
  • It requires an Input Port at the Root Group level and every level in between. This means that the user either needs permissions to update all of these Process Groups, or needs the assistance of someone who does. In a large, multi-tenant environment, this can become problematic.
  • This results in a disconnected graph. Rather than connecting Processor A to Processor B, we end up connecting Processor A to Remote Process Group. Then, we have a second graph that connects Input Port 1 to Processor B. Even if there are no intermediate Process Groups, it makes the graph look something like this:

    The intent of this, and the understanding of how data flows through the system, is far less clear than if we simply connect Processor A to Processor B.

  • This approach does not lend itself well to reuse. When a flow is built using this paradigm, and the flow is then versioned using the NiFi Flow Registry, the URL of the Remote Process Group becomes part of that Versioned Flow. When it is then imported into a second environment, the user has to be sure to update the URL of the Remote Process Group, or else risk sending data to the wrong cluster.
  • Remote Process Groups do not support node affinity. When using an RPG to transfer data between nodes, the protocol is designed to balance the load across the cluster pretty evenly. However, it does not allow sending "like data" to the same node.

So if using Remote Process Groups to distribute data across the cluster has all of these drawbacks, it begs the question: why did we ever design it this way to begin with? The simple answer here is that it was never designed as a mechanism to re-distribute data between nodes in the same cluster. It was designed as a mechanism to send data from one NiFi instance/cluster to another. Hence the naming Remote Process Group and Site-to-Site protocol. It still does a pretty good job of this (though, I would still love to see "Remote Input Ports" at all levels, not just the Root Group level - but hopefully that will come soon!) When we started seeing this constant need to distribute data across the cluster, the Remote Process Group became the answer because the capability already existed and it worked for most cases - even if not optimally.

But version 1.8.0 aims to fix all of these problems by allowing the data to be load-balanced across the cluster at the Connection level. Now, any time that a user decides to create a Connection between two components on the graph - or configure an existing connection - the user is given some new options in the Settings dialog:

We can now configure a Load Balance Strategy, to start. The default value is "Do Not Load Balance,"
which behaves the same as connections always have. Data stays on the node that it's on. However, we can change the value to one of several values:

  • Round Robin. Data will be spread across the cluster evenly. This will be the most commonly chosen option.
  • Partition by Attribute. This allows "like data" to be sent to the same node in the cluster. When this value is chosen, the user is able to configure the "Attribute name." Any two FlowFiles that have the same value for this attribute will be sent to the same node. This is super helpful, for example, when using a MergeContent or MergeRecord processor, and we want to ensure that data spread across the cluster is merged together based on a "customerId" field.
  • Single Node. While this will likely be the least used strategy, there are times when we want to ensure that all data in a cluster is sent to a single node. For instance, before using an EnforceOrder processor to push to a database. Or when we want to merge data together into larger batches but the data rate is fairly low, so we would rather have all the data on a single node in order to more quickly meet our minimum thresholds.

So, to make it clear how we can use these features in a full flow, let's take the following use case as an example:

We need to get data from a remote SFTP server. The data is Point-of-Sale data in CSV format and is compressed using GZIP. One of the fields in the CSV data is the Store Identifier field, "storeId." We need to use this Store Identifier to perform some enrichment, adding in the City, State, and Region where the store is located. Once we have enriched the data, we want to send the data to three destinations: HDFS for archiving, Kafka for our real-time analytics to pull from, and Elasticsearch for querying and dashboarding. We want to push to Kafka in Avro format as quickly as possible. For HDFS, we want to merge together larger batches and store as JSON. For Elasticsearch, we can also benefit from having data bundled together because their Bulk API is more efficient than sending individual messages. We can build this flow fairly simply. Our entire flow now looks like this:

The first connection, from ListSFTP to FetchSFTP uses a Load Balance Strategy of "Round Robin." We simply want to spread the data across the cluster so that we can have all nodes in the cluster working together to pull the data, decompress it, enrich it, and split it apart based on Region. The connection before MergeRecord is configured to use a Load Balance Strategy of "Partition by Attribute" and uses the "region" attribute that was added by the PartitionRecord Processor.

Now, notice the new icon on a couple of the connections:

This lets us know that the Connection is currently configured to Load Balance data across the cluster. If we hover over it, we get a little bit more information:

Now, when we start our flow, we can see the data quickly flowing. We can also see that this icon has now changed from a grey color to a blue color and the orientation is different:

This tells us that not only is Load Balancing configured for this connection but that data is currently being transferred between nodes in the cluster. This is very important information if, for example, we notice that the destination of our connection is not processing the data. If the indicator is blue, then we know that the reason the data is not being processed is probably because the data does not yet live on the correct node and is being transferred there now.

It's also important to note that if the Load Balance Strategy is changed while data is queued up in the connection, then the connection will immediately start re-balancing the data to ensure that the data goes to the correct node. For example, if "Do Not Load Balance" is configured, and then it is changed to "Round Robin," it will immediately start binning the data to go to the different nodes in the cluster, and you'll see the indicator in blue. Similarly, if the Load Balance Strategy is changed from "Round Robin" to "Partition by Attribute," the data will be partitioned and binned differently, to ensure that data ends up on the correct node. If the number of nodes in our cluster changes (i.e., a node is removed from the cluster or added to the cluster), then the data will also be re-partitioned.

Now, let's consider what happens in a failure scenario. Specifically, what happens if a node disconnects from the cluster, or if a Node A simply cannot communicate with Node B. In this case, what happens to the data that is queued up waiting to go to Node B? Will the data just queue up, or will it fail over to another node?

Well, that depends on the Load Balance Strategy configured. If "Round Robin" is used, and a failure occurs, the data will be rebalanced to another node. In this case, the data is rebalanced fairly slowly, though. Up to 1,000 FlowFiles, or 10 MB of data, will be rebalanced per second. This is done so that if the node reconnects to the cluster or is able to communicate again momentarily, that the data is not immediately redistributed to other nodes. But if that does not happen, the data is still redistributed throughout the cluster in a timely manner.

If the "Partition by Attribute" or "Single Node" strategy is used, then the data will just queue up, waiting to send, until the node is reconnected and able to communicate. This is because these strategies expect that a given piece of data go to a specific node, whereas "Round Robin" just expects that data gets spread across the cluster.

While this is a very powerful feature that will significantly improve the user experience offered, this feature also lays the groundwork for some other very exciting features. The newly added "Offload" feature makes use of this mechanism to allow a node in the cluster to be disconnected and then Offloaded. Doing this results in stopping any and all Processors on that node and distributing all data across the cluster to the still-connected nodes. This, in turn, enables easy decommissioning of a NiFi node and truly elastic clustering capabilities.

While this is just a quick walkthrough of the new feature, I hope it has piqued your interest enough to get you excited about it and get you started. As always, please don't hesitate to reach out if you have any questions or comments.