Getting Syslog Events to HBase

Bryan Bende - 
bbende@gmail.com
@bbende


In the Apache NiFi 0.4.0 release there are several new integration points,
including processors for interacting with Syslog and HBase. In this post we'll
demonstrate how to use NiFi to receive messages from Syslog over UDP
and store those messages in HBase.

The flow described in this post was created using Apache NiFi 0.4.0,
rsyslog 5.8.10, and Apache HBase 1.1.2.

Setting up Syslog

In order for NiFi to receive syslog messages, rsyslog needs to forward messages
to a port that NiFi will be listening on. Forwarding is configured in
rsyslog.conf, which is located in /etc on most Linux distributions.

Edit rsyslog.conf and add the following line:

*.* @localhost:7780

This tells rsyslog to forward all messages over UDP to localhost port 7780.
A double '@@' can be used to forward over TCP.
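If you want to exercise the NiFi side of the flow without going through rsyslog, you can send a datagram to the same port yourself. The following is a minimal sketch that builds a traditional (RFC 3164-style) line of the form <PRI>TIMESTAMP HOSTNAME TAG: MSG and sends it over UDP. The function names, the "test" tag, and the user/notice priority are illustrative assumptions, not something rsyslog or NiFi requires.

```python
import socket
from datetime import datetime
from typing import Optional

# RFC 3164 codes (assumed for this example): facility 1 = user-level, severity 5 = notice.
USER_FACILITY = 1
NOTICE_SEVERITY = 5


def format_rfc3164(message: str, hostname: str = "localhost",
                   tag: str = "test", when: Optional[datetime] = None) -> bytes:
    """Build a minimal RFC 3164-style line: <PRI>TIMESTAMP HOSTNAME TAG: MSG."""
    when = when or datetime.now()
    priority = USER_FACILITY * 8 + NOTICE_SEVERITY  # PRI = facility * 8 + severity = 13
    # Single-digit days are space-padded, e.g. "Dec  1".
    timestamp = f"{when:%b} {when.day:>2} {when:%H:%M:%S}"
    return f"<{priority}>{timestamp} {hostname} {tag}: {message}".encode("utf-8")


def send_udp(payload: bytes, host: str = "localhost", port: int = 7780) -> None:
    """Send one datagram to host:port, the same transport rsyslog uses for '@host:port'."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (host, port))
```

With ListenSyslog running on port 7780, calling send_udp(format_rfc3164("hello from python")) should produce one FlowFile, just as a message forwarded by rsyslog would.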

Restart rsyslog for the changes to take effect:

/etc/init.d/rsyslog restart
Shutting down system logger:                               [  OK  ]
Starting system logger:                                    [  OK  ]

Setting up HBase

In order to store the syslog messages, we'll create an HBase table called
'syslog' with one column family called 'msg'. From the command line enter the
following:

hbase shell
create 'syslog', {NAME => 'msg'}

Configure an HBase Client Service

The HBase processors added in Apache NiFi 0.4.0 use a controller service to
interact with HBase. This allows the processors to remain unchanged when the
HBase client changes. Combined with the class-loader isolation provided by
NARs, it also allows a single NiFi instance to interact with HBase instances
of different versions at the same time.

The HBase Client Service can be configured either by providing paths to external
configuration files, such as hbase-site.xml, or by entering several
properties directly on the service. For this example we will take the
latter approach. From the Controller Services configuration window in NiFi,
add an HBase_1_1_2_ClientService with the following configuration (adjusting
values appropriately for your system):

client-service-config.jpg

After configuring the service, enable it so that processors can use it:

client-service-enabled.jpg

Building the Dataflow

The dataflow we are going to build will consist of the following components:

  • ListenSyslog for receiving syslog messages over UDP
  • UpdateAttribute for renaming attributes and creating a row id for HBase
  • AttributesToJSON for creating a JSON document from the syslog attributes
  • PutHBaseJSON for inserting each JSON document as a row in HBase

The overall flow looks like the following:

syslog-hbase-flow.jpg

Let's walk through the configuration of each processor.

ListenSyslog

config-listensyslog.jpg

Set the Port to the same port that rsyslog is forwarding messages to,
in this case 7780. Leave all other properties at their default values.

With a Max Batch Size of "1" and Parse Messages set to "true", each syslog
message will be emitted as a single FlowFile, with the content of the
FlowFile being the original message, and the results of parsing the message
stored as FlowFile attributes.

The attributes we will be interested in are:

  • syslog.priority
  • syslog.severity
  • syslog.facility
  • syslog.version
  • syslog.timestamp
  • syslog.hostname
  • syslog.sender
  • syslog.body
  • syslog.protocol
  • syslog.port
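To make the parsing step concrete, here is a rough sketch of how a traditional syslog line could be split into the attributes listed above. The regular expression, the function name, and the exact values are our own approximation for illustration; ListenSyslog's actual parser may handle more formats and edge cases differently. The priority-to-severity/facility arithmetic (severity = PRI mod 8, facility = PRI div 8) is standard RFC 3164.

```python
import re
from typing import Dict, Optional

# Approximate RFC 3164 layout: <PRI>MMM dd HH:mm:ss HOSTNAME BODY
# Illustrative assumption only; not the pattern ListenSyslog uses internally.
RFC3164 = re.compile(
    r"^<(?P<priority>\d{1,3})>"
    r"(?P<timestamp>[A-Z][a-z]{2} [ \d]\d \d{2}:\d{2}:\d{2}) "
    r"(?P<hostname>\S+) "
    r"(?P<body>.*)$"
)


def parse_syslog(message: str, sender: str, port: int,
                 protocol: str = "UDP") -> Optional[Dict[str, str]]:
    """Return syslog.* attributes for a parseable message, or None if it does not match."""
    match = RFC3164.match(message)
    if match is None:
        return None
    priority = int(match["priority"])
    return {
        "syslog.priority": str(priority),
        "syslog.severity": str(priority % 8),   # low three bits of PRI
        "syslog.facility": str(priority // 8),  # remaining bits of PRI
        "syslog.version": "",                   # RFC 3164 lines carry no version field
        "syslog.timestamp": match["timestamp"],
        "syslog.hostname": match["hostname"],
        "syslog.sender": sender,
        "syslog.body": match["body"],
        "syslog.protocol": protocol,
        "syslog.port": str(port),
    }
```

For example, parse_syslog("<13>Dec 10 19:20:15 localhost root: this is a test message", "/127.0.0.1", 7780) yields a severity of "5" and a facility of "1", with the tag left at the start of the body.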

UpdateAttribute

config-updateattr.jpg

The attributes produced by ListenSyslog all start with "syslog.", which keeps
them nicely namespaced in NiFi. However, we are going to use these attribute
names as column qualifiers in HBase, and we don't need the prefix there since
the columns will already be in a syslog table.

Add a property for each syslog attribute to remove the prefix, and use the
Delete Attributes Expression to remove the original attributes. In addition,
create an id attribute of the form "timestamp_uuid", where timestamp is the
timestamp of the syslog message converted to a long (milliseconds since the
epoch), and uuid is the uuid of the FlowFile in NiFi. This id attribute will
be used as the row id in HBase.
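The renaming step can be modeled as a plain dictionary transformation. This is a sketch of the behavior described above, not UpdateAttribute's implementation; the Delete Attributes Expression value syslog\..* is an assumed example, since the exact value used in the flow is only shown in the screenshot.

```python
import re
from typing import Dict

PREFIX = "syslog."
# Assumed value for UpdateAttribute's Delete Attributes Expression (a regex over attribute names).
DELETE_PATTERN = re.compile(r"syslog\..*")


def strip_syslog_prefix(attributes: Dict[str, str]) -> Dict[str, str]:
    """Copy each syslog.* attribute to an unprefixed name, then delete the originals."""
    updated = dict(attributes)
    # Read values from the incoming attributes, not from the partially updated copy.
    for name, value in attributes.items():
        if name.startswith(PREFIX):
            updated[name[len(PREFIX):]] = value
    # Drop every attribute whose full name matches the delete expression.
    return {name: value for name, value in updated.items()
            if not DELETE_PATTERN.fullmatch(name)}
```

Attributes without the prefix, such as the FlowFile's uuid, pass through unchanged, which is what later lets the id expression reference ${uuid}.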

The expression language for the id attribute is:

${syslog.timestamp:toDate('MMM d HH:mm:ss'):toNumber()}_${uuid}
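The syslog timestamp has no year, so it is worth checking what number the expression actually produces. As far as we know, toDate() is backed by Java's SimpleDateFormat, which fills in 1970 when the pattern has no year field. The sketch below mirrors that assumption in Python and also assumes NiFi's JVM runs in UTC; under those assumptions it reproduces the row key prefix 29704815000 that appears in the HBase scan output later in this post (Dec 10 19:20:15, 1970, UTC). The function names are hypothetical.

```python
import calendar
from datetime import datetime

# Assumption: like Java's SimpleDateFormat (which we believe backs NiFi's toDate()),
# a pattern without a year resolves to 1970, so the year is supplied explicitly.
DEFAULT_YEAR = 1970


def syslog_timestamp_millis(timestamp: str) -> int:
    """Epoch milliseconds for an RFC 3164 timestamp such as 'Dec 10 19:20:15' (UTC assumed)."""
    parsed = datetime.strptime(f"{DEFAULT_YEAR} {timestamp}", "%Y %b %d %H:%M:%S")
    return calendar.timegm(parsed.timetuple()) * 1000


def row_id(timestamp: str, flowfile_uuid: str) -> str:
    """Python analogue of ${syslog.timestamp:toDate('MMM d HH:mm:ss'):toNumber()}_${uuid}."""
    return f"{syslog_timestamp_millis(timestamp)}_{flowfile_uuid}"
```

Because every timestamp lands in 1970, the numeric prefix orders rows by month, day, and time of day but not by year; appending the FlowFile uuid keeps keys unique when two messages share the same second.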
  

AttributesToJSON

config-attrstojson.jpg

Set the Destination to "flowfile-content" so that the JSON document
replaces the FlowFile content, and set Include Core Attributes to
"false" so that the standard NiFi attributes are not included.

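Conceptually, AttributesToJSON with these settings turns the FlowFile's attribute map into a flat JSON object. The sketch below shows that idea; CORE_ATTRIBUTES is an assumed, partial list used only for illustration, since the real set of core attributes is defined by NiFi itself.

```python
import json
from typing import Dict

# Assumed, partial list of NiFi core attribute names, for illustration only.
CORE_ATTRIBUTES = {"filename", "path", "uuid"}


def attributes_to_json(attributes: Dict[str, str], include_core: bool = False) -> str:
    """Serialize FlowFile attributes as a flat JSON object (the new FlowFile content)."""
    selected = {name: value for name, value in attributes.items()
                if include_core or name not in CORE_ATTRIBUTES}
    return json.dumps(selected, sort_keys=True)
```

Note that the uuid attribute itself is excluded, but the id attribute built from it is a regular attribute and therefore ends up in the JSON document.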
PutHBaseJSON

config-puthbasejson.jpg

Select the HBase Client Service we configured earlier, and set the
Table Name and Column Family to "syslog" and "msg" to match the table we
created. In addition, set the Row Identifier Field Name to "id" to instruct
the processor to use the id field from the JSON document as the row id.
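A rough model of what PutHBaseJSON does with each document: one field becomes the row key, and every other field becomes a cell under the configured column family. This sketch only handles flat JSON with simple values. The scan output later in this post shows no msg:id column, so the sketch leaves the row id field out of the cells; the function name is hypothetical.

```python
import json
from typing import Dict, Tuple


def json_to_put(document: str, row_field: str = "id",
                family: str = "msg") -> Tuple[str, Dict[str, str]]:
    """Split a flat JSON object into a row key and 'family:qualifier' -> value cells."""
    fields = json.loads(document)
    row_key = str(fields.pop(row_field))  # the row id field is not written as a column
    cells = {f"{family}:{name}": str(value) for name, value in fields.items()}
    return row_key, cells
```

For a document like {"id": "29704815000_...", "body": "...", "port": "7780"}, this produces one row with msg:body and msg:port cells, matching the shape of the scan output.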

Verifying the Flow

From a terminal we can send a test message to syslog using the logger utility:

logger "this is a test message"

Using the HBase shell we can inspect the contents of the syslog table:

hbase shell
hbase(main):002:0> scan 'syslog'
ROW                                          COLUMN+CELL
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:body, timestamp=1449775215481,
  value=root: this is a test message
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:hostname, timestamp=1449775215481,
  value=localhost
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:port, timestamp=1449775215481,
  value=7780
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:protocol, timestamp=1449775215481,
  value=UDP
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:sender, timestamp=1449775215481,
  value=/127.0.0.1
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:timestamp, timestamp=1449775215481,
  value=Dec 10 19:20:15
29704815000_84f91b21-d35f-4a24-8e0e-aaed4a521c13 column=msg:version, timestamp=1449775215481,
  value=
1 row(s) in 0.1120 seconds

Performance Considerations

In some cases the volume of syslog messages being pushed to ListenSyslog may
be very high. There are several options to help scale the processing,
depending on the use case.

Concurrent Tasks

ListenSyslog has a background thread reading messages as fast
as possible and placing them on a blocking queue to be de-queued and processed
by the onTrigger method of the processor. By increasing the number of
concurrent tasks for the processor, we can scale up the rate at which messages
are processed, ensuring new messages can continue to be queued.
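The reader-thread/queue design described above can be illustrated with a toy model: one thread fills a queue and a configurable number of tasks drain it. This is a simplified analogy written for this post, not NiFi's code; in particular, the None stop signal is our own device for ending the model cleanly.

```python
import queue
import threading
from typing import List, Optional


def run_listener_model(messages: List[str], concurrent_tasks: int) -> List[str]:
    """Toy model of ListenSyslog: one reader thread fills a queue, N tasks drain it."""
    buffer: "queue.Queue[Optional[str]]" = queue.Queue()
    processed: List[str] = []
    lock = threading.Lock()

    def reader() -> None:
        # Stands in for the background thread that reads messages off the socket.
        for message in messages:
            buffer.put(message)
        for _ in range(concurrent_tasks):
            buffer.put(None)  # one stop signal per task

    def task() -> None:
        # Stands in for one concurrent onTrigger invocation de-queuing messages.
        while True:
            message = buffer.get()
            if message is None:
                return
            with lock:
                processed.append(message)  # real work (parsing, creating FlowFiles) goes here

    threads = [threading.Thread(target=reader)]
    threads += [threading.Thread(target=task) for _ in range(concurrent_tasks)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return processed
```

Raising concurrent_tasks adds more consumers of the same queue, which is the effect of increasing Concurrent Tasks on the processor; completion order across tasks is not guaranteed.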

Parsing

One of the more expensive operations during the processing of a message is
parsing the message in order to provide the attributes. Parsing is
controlled through the Parse Messages property on the processor and can be
turned off in cases where the attributes are not needed and the original
message just needs to be delivered somewhere.

Batching

In cases where parsing the messages is not necessary, an additional option is
batching many messages together into a single FlowFile during one call to
onTrigger. This is controlled through the Max Batch Size property, which
defaults to "1". Batching is appropriate when individual messages are not
needed, such as when storing the messages in HDFS, where you want them
batched into appropriately sized files.
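A batch can be thought of as draining up to a maximum number of queued messages in one pass and joining them into one piece of content. The sketch below assumes a newline between messages; the function name and the delimiter choice are illustrative assumptions rather than a description of ListenSyslog internals.

```python
import queue
from typing import List


def drain_batch(buffer: "queue.Queue[str]", max_batch_size: int) -> str:
    """Take up to max_batch_size queued messages without blocking; join them with newlines."""
    batch: List[str] = []
    while len(batch) < max_batch_size:
        try:
            batch.append(buffer.get_nowait())
        except queue.Empty:
            break  # fewer messages than the batch size are waiting
    return "\n".join(batch)
```

With five queued messages and a batch size of 2, three calls produce contents of two, two, and one message, so each FlowFile carries more data per onTrigger call.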

ParseSyslog

In addition to parsing messages directly in ListenSyslog, there is also a
ParseSyslog processor. An alternative to the flow described in this post
would be to have ListenSyslog produce batches of 100 messages at a time,
followed by SplitText, followed by ParseSyslog. The advantage of this approach
is that we can scale the different components independently, and take
advantage of backpressure between processors.
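The split-then-parse alternative can be pictured as two stages connected by a bounded queue, where the bound plays the role of a connection's back pressure threshold. This is a conceptual sketch only: the "parse" step is a placeholder that extracts the PRI value, and the function name and max_queued parameter are hypothetical.

```python
import queue
import threading
from typing import List, Optional


def split_then_parse(batches: List[str], max_queued: int) -> List[str]:
    """Two-stage model: split batched content into lines, then 'parse' each line.

    The bounded queue stands in for a NiFi connection with back pressure: the
    splitter blocks on put() while max_queued lines are waiting downstream.
    """
    connection: "queue.Queue[Optional[str]]" = queue.Queue(maxsize=max_queued)
    parsed: List[str] = []

    def splitter() -> None:
        # Like SplitText producing one line per split.
        for content in batches:
            for line in content.splitlines():
                connection.put(line)  # blocks while the connection is full
        connection.put(None)  # end of input

    def parser() -> None:
        # Placeholder "parse": keep the PRI value between '<' and '>'.
        while (line := connection.get()) is not None:
            parsed.append(line.partition(">")[0].lstrip("<"))

    stages = [threading.Thread(target=splitter), threading.Thread(target=parser)]
    for stage in stages:
        stage.start()
    for stage in stages:
        stage.join()
    return parsed
```

In NiFi the two stages would be separate processors with their own concurrent task settings, so a slow parser slows the splitter through back pressure instead of letting work pile up without limit.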

Summary

At this point you should be able to get your syslog messages ingested into
HBase and can experiment with different configurations. The template for this flow can be found
here.

We would love to hear any questions, comments, or feedback that you may have!

Learn more about Apache NiFi and feel
free to leave comments here or e-mail us at dev@nifi.apache.org.