Indexing Tweets with NiFi and Solr

Bryan Bende

This post will cover how to use Apache NiFi to pull in the public stream of tweets from the Twitter API, identify
specific tweets of interest, and deliver those tweets to Solr for indexing. The example developed here was built against
Apache NiFi 0.3.0 and Apache Solr 5.3.

You will also need to create a Twitter application in order to access the Twitter API. A good set of instructions for
doing that can be found in the article "How to Create a Twitter App in 8 Easy Steps".

Setting up Solr

For this example we will start Solr in cloud mode and create a tweets collection based on the data-driven configuration provided by Solr.
The data-driven configuration allows Solr to create fields on the fly from the incoming data, which makes it a good starting point for quick prototyping.
The following two commands start Solr and create the collection:

./bin/solr start -c
./bin/solr create_collection -c tweets -d data_driven_schema_configs -shards 1 -replicationFactor 1

At this point we should have a running Solr instance on port 8983, with an embedded ZooKeeper on port 9983. Navigate
to http://localhost:8983/solr/#/~cloud in your browser to verify Solr
is running and the tweets collection was created successfully.

Building the Dataflow

The dataflow we are going to build in NiFi will consist of the following processors:

  • GetTwitter for accessing the Twitter API and producing FlowFiles containing JSON Tweets
  • EvaluateJsonPath for extracting values from JSON documents into FlowFile attributes
  • RouteOnAttribute for making decisions about incoming FlowFiles based on their attributes
  • MergeContent for merging many JSON documents into a single document
  • PutSolrContentStream for streaming JSON documents to Solr

The overall flow connects these processors in the order listed, from GetTwitter through to PutSolrContentStream.


Let's walk through the configuration of each processor.



For GetTwitter, set the endpoint to the Sample Endpoint and fill in the credentials of the Twitter application you created earlier.

For this example we are going to specify a language of "en" to get only English tweets.



Next, we use EvaluateJsonPath to extract the text and language of each tweet into FlowFile attributes so that we can make decisions based on these values later. Set the Destination property to "flowfile-attribute" and add two user-defined properties using the New Property icon in the top right. The name of each property becomes the FlowFile attribute name, and its value is the JSON path we would like to extract from the tweet. This lets us access these values later through the NiFi Expression Language, for example ${twitter.text} or ${twitter.lang}.
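As a rough illustration of what this step produces, here is a minimal Python sketch that pulls two top-level fields out of a tweet's JSON and stores them as string attributes. The attribute names come from the text above. The JSON paths `$.text` and `$.lang`, the helper name `extract_attributes`, and the choice to map a missing field to an empty string are all assumptions made for this example, and the sketch only understands top-level paths rather than full JSONPath.

```python
import json

# Attribute name -> JSON path. The attribute names are from the post;
# the paths "$.text" and "$.lang" are assumed for illustration.
ATTRIBUTE_PATHS = {
    "twitter.text": "$.text",
    "twitter.lang": "$.lang",
}


def extract_attributes(content, attribute_paths):
    """Return string attributes for simple top-level paths like "$.text"."""
    document = json.loads(content)
    attributes = {}
    for name, path in attribute_paths.items():
        key = path[2:]  # drop the leading "$." of a top-level path
        value = document.get(key)
        # In this sketch a missing or null field becomes an empty string.
        attributes[name] = "" if value is None else str(value)
    return attributes


tweet = '{"id": 1, "text": "Hello NiFi", "lang": "en"}'
print(extract_attributes(tweet, ATTRIBUTE_PATHS))
```

A document that is not a real tweet (for example one with no text field) ends up with an empty twitter.text attribute in this sketch, which is the case the routing step needs to filter out.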



At this point we want to ensure that we only index the tweets we are interested in. In RouteOnAttribute, we add a user-defined property called "tweet" and specify as its value an Expression Language statement that combines two conditions.


The first part of this expression filters out all tweets that have an empty message. This occurs frequently, because some of the JSON documents coming through the endpoint are not actual tweets.
The second part of the expression ensures that we only select English tweets. We already set GetTwitter to filter on "en", but if we hadn't, the language could be used here to route different languages to different relationships.
Any tweet matching both conditions is routed to the "tweet" relationship, named after the property we defined, and anything else is routed to the "unmatched" relationship. We can auto-terminate the "unmatched" relationship so that the non-matching FlowFiles are discarded.
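The routing decision described above can be sketched in Python as follows. This is not the NiFi expression itself, only an illustration of the two conditions. The function name `route` is hypothetical, and treating whitespace-only text as empty is an assumption of this sketch.

```python
def route(attributes):
    """Return the relationship name for a FlowFile's attributes."""
    text = attributes.get("twitter.text", "")
    lang = attributes.get("twitter.lang", "")
    # Condition 1: the tweet has a non-empty message.
    has_text = text.strip() != ""
    # Condition 2: the tweet is in English.
    is_english = lang == "en"
    return "tweet" if has_text and is_english else "unmatched"


print(route({"twitter.text": "Hello NiFi", "twitter.lang": "en"}))
print(route({"twitter.text": "", "twitter.lang": "en"}))
```

Routing other languages to their own relationships would amount to adding more named conditions of the same shape, one per language.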



In this example we could send the JSON documents from RouteOnAttribute directly to Solr, but in most real-world scenarios accessing the Solr cluster requires network communication, and we will likely get better performance by batching multiple JSON documents into a single request to reduce the number of requests sent over the network. The MergeContent processor was built with this scenario in mind, and can merge FlowFiles based on a number of criteria, such as the number of FlowFiles or their age. MergeContent also performs merges in a streaming manner, so it can merge a significant number of FlowFiles without exceeding memory constraints.

To configure MergeContent, set a Minimum and Maximum Number of Entries, as well as a Max Bin Age to trigger a merge in cases where no new data has come in for a period of time. Also set the Delimiter Strategy to "Text", and specify the Header as "[", the Footer as "]", and the Demarcator as ",". This wraps the incoming documents in a single JSON array, giving us one large JSON document composed of many incoming documents.
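The effect of the Header, Footer, and Demarcator settings can be shown with a small Python sketch. The function name `merge_documents` and the sample documents are hypothetical; the point is only that the three delimiters turn a batch of JSON objects into one valid JSON array.

```python
import json


def merge_documents(documents, header="[", footer="]", demarcator=","):
    """Join JSON documents into one string wrapped by header and footer."""
    return header + demarcator.join(documents) + footer


docs = ['{"id": 1, "text": "first"}', '{"id": 2, "text": "second"}']
merged = merge_documents(docs)

# The merged content parses as a JSON array with one element per input.
parsed = json.loads(merged)
print(merged)
print(len(parsed))
```

Without the header and footer the merged content would be a comma-separated run of objects, which is not valid JSON on its own.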



Configure PutSolrContentStream to point to the Solr instance we started earlier by setting the Solr Type to "Cloud" and the Solr Location to the embedded ZooKeeper (localhost:9983). Also specify a Commit Within of "1000" (milliseconds) so that incoming documents are committed within one second (this may not be needed if you configure autoCommit settings in your solrconfig.xml).

Any user-defined properties are sent to Solr on the request as key/value pairs, so we can use them to tell Solr how to transform the incoming JSON document into a Solr document. A good explanation of how this JSON-to-Solr mapping works can be found here, and an explanation of the user-defined properties can be found here.

For this example we will provide the following mappings:

  • split=/ to treat each child JSON document that we merged together as individual Solr documents
  • id:/id to map the id field of the JSON to the id field in the Solr schema (already defined)
  • twitter_text_t:/text to map the text field of the JSON to a dynamic field of type text_general in Solr
  • twitter_username_s:/user/name to map the user of the tweet to a dynamic string field in Solr
  • twitter_created_at_s:/created_at to map the creation date string to a dynamic string field in Solr
  • twitter_timestamp_ms_tl:/timestamp_ms to map the timestamp to a dynamic trie-long field in Solr

The naming conventions of the fields allow us to leverage dynamic fields in Solr. A dynamic field uses a suffix on the field name to indicate the field type, so the "_t" on "twitter_text_t" tells Solr that this field maps to the text_general field type. It is possible to provide no field mappings at all, send the JSON to Solr as-is, and let Solr guess the field types on the fly. However, in that case all of the fields would be added as multiValued fields, and multiValued fields can't be used in a sort clause, which would prevent us from sorting on the timestamp. So we opt for the dynamic fields here.
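To make the mappings above concrete, the following Python sketch applies them to one sample tweet and prints the flat document that results. It only imitates the effect of the mappings for simple slash-separated paths and is not how Solr implements them. The helper names `resolve` and `to_solr_document` and the sample tweet values are hypothetical.

```python
# Field mappings from the list above, written as "solr_field:/json/path".
FIELD_MAPPINGS = [
    "id:/id",
    "twitter_text_t:/text",
    "twitter_username_s:/user/name",
    "twitter_created_at_s:/created_at",
    "twitter_timestamp_ms_tl:/timestamp_ms",
]


def resolve(document, path):
    """Follow a simple slash-separated path such as /user/name."""
    value = document
    for key in path.strip("/").split("/"):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def to_solr_document(tweet, mappings):
    """Build a flat field -> value dict, skipping paths that do not resolve."""
    solr_doc = {}
    for mapping in mappings:
        field, path = mapping.split(":", 1)
        value = resolve(tweet, path)
        if value is not None:
            solr_doc[field] = value
    return solr_doc


# Made-up sample tweet containing only the fields used by the mappings.
sample_tweet = {
    "id": 42,
    "text": "Hello NiFi",
    "user": {"name": "Example User"},
    "created_at": "Mon Sep 28 12:00:00 +0000 2015",
    "timestamp_ms": "1443441600000",
}
print(to_solr_document(sample_tweet, FIELD_MAPPINGS))
```

Each resulting field holds a single value, which is what lets the timestamp field be used for sorting later.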

It is also worth noting that this processor has two failure relationships. The regular "failure" relationship is for failures that generally require some intervention to resolve, such as an invalid document being submitted. The "connection_failure" relationship is for failures related to communication issues between NiFi and Solr. A typical approach in a production scenario would be to route the "connection_failure" relationship back to the processor itself so that it retries during connection failures, and to route the "failure" relationship to a PutFile processor that writes out failed documents for later inspection.


At this point you should be able to hit Start on your processors and start seeing tweets flowing to Solr. Happy indexing!
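One way to check that documents are arriving is to query the collection and sort on the timestamp field. The Python sketch below only builds the query URL and does not contact Solr. It assumes Solr is running on localhost:8983 with the tweets collection created earlier; the function name `build_select_url` and the default row count are hypothetical.

```python
from urllib.parse import urlencode


def build_select_url(base_url, collection, rows=5):
    """Build a select URL returning the newest tweets first."""
    params = {
        "q": "*:*",  # match all documents
        "sort": "twitter_timestamp_ms_tl desc",  # newest first
        "rows": rows,
        "wt": "json",
    }
    return f"{base_url}/{collection}/select?{urlencode(params)}"


url = build_select_url("http://localhost:8983/solr", "tweets")
print(url)
```

Opening the printed URL in a browser, or fetching it with any HTTP client, should return the most recent tweets once the flow has been running for a few seconds.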

A template of the flow created in this example can be found here. We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel free to leave comments here or e-mail us at