Functional reads over Accumulo
Table structure is a common area of discussion among all types of Accumulo users. In the relational database realm, there was often a straightforward layout that most users could agree was ideal for storing and querying a given dataset. Data was identified by its schema: some fixed set of columns, where each value within a column shared a given characteristic. One of the big pushes behind the "NoSQL" movement was the growing pain of representing evolving data within a static schema. Applications like Accumulo replaced that notion with a more flexible layout where the columns vary per row, but this flexibility often sparks debates about how data is "best" stored, and those debates often end without a clear-cut winner.
In general, I've found that it's difficult for new Accumulo users to move beyond the basic concept of GETs and PUTs of some value for a key. Rightfully so: it's analogous to a spreadsheet, where you get or update the cell in a given row and column. There's a big difference, however, in that the spreadsheet runs on your local desktop while Accumulo runs across many machines, so it doesn't really make sense to use Accumulo as you would a spreadsheet application. Personally, I've developed a functional-programming-inspired model which I tend to follow when implementing applications against Accumulo. The model encourages simple, efficient and easily testable code, mainly as a product of modeling the client interactions against Accumulo's APIs.
Read APIs
Accumulo has two main classes for reading data from a table: the Scanner and the BatchScanner. Both accept Range(s) which limit the data read from the table based on a start and stop Key; only data that falls within those Keys is returned to the client. The reason we have two "types" of classes for reading data is that a Scanner returns data from a single Range in sorted order, whereas the BatchScanner accepts multiple Ranges and returns the data unordered. In terms of Java language specifics, both the Scanner and the BatchScanner are also Iterables, returning a Java Iterator that can be easily passed to some other function, transformation or for-loop.
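For the concrete mechanics, a minimal sketch against the 1.x Connector API might look like the following; the table name, range bounds, and thread count are all illustrative:

```java
import java.util.Arrays;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

public class ReadExamples {
  // A sorted, single-Range read with a Scanner
  static void scanSorted(Connector conn) throws Exception {
    Scanner scanner = conn.createScanner("mytable", Authorizations.EMPTY);
    scanner.setRange(new Range("row_a", "row_m")); // only Keys within [row_a, row_m]
    for (Entry<Key,Value> entry : scanner) {       // Scanner is an Iterable
      System.out.println(entry.getKey() + " => " + entry.getValue());
    }
  }

  // An unordered, parallel read over many Ranges with a BatchScanner
  static void scanUnordered(Connector conn) throws Exception {
    BatchScanner bs = conn.createBatchScanner("mytable", Authorizations.EMPTY, 8); // 8 query threads
    bs.setRanges(Arrays.asList(new Range("row_a"), new Range("row_q", "row_z")));
    for (Entry<Key,Value> entry : bs) {            // results arrive in no guaranteed order
      System.out.println(entry.getKey() + " => " + entry.getValue());
    }
    bs.close(); // releases the BatchScanner's thread pool
  }
}
```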
Having both a sorted, synchronous stream and an unsorted stream of Key-Value pairs arriving from many servers in parallel allows a variety of algorithms to be implemented against Accumulo. Both constructs keep the origin of the data transparent to the client and encourage light-weight processing of those results on the client.
Accumulo Iterators
One notable feature of Accumulo is the SortedKeyValueIterator interface, or, more succinctly, Accumulo Iterators. Typically, these iterators run inside of the TabletServer process and perform much of the heavy lifting. Iterators are used to implement a breadth of internal features such as merged file reads, visibility label filtering, versioning, and more. However, users also have the ability to leverage this server-side processing mechanism to deploy their own custom code.
One interesting detail about these iterators is that each has an implicit source which provides the data it operates on. That source is also a SortedKeyValueIterator, which means the "local" SortedKeyValueIterator can use its own API on its data source. With this implicit hierarchy, Iterators act in concert with each other in some fixed order - they are stackable. The order in which Iterators are constructed, controlled by each Iterator's priority, determines the order of the stack. An Iterator uses its "source" Iterator to read data, performs some operation, and then passes the result on (the next consumer could be the client or another Iterator). The design behind Iterators deserves its own blog post; the key concept here is that Iterators are best designed to be as stateless as possible: transformations, filters, or aggregations that always net the same results given the same input.
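As a small illustration of such a stateless operation, Accumulo ships a Filter base class that handles the stacking mechanics; a subclass only decides whether to accept each Key-Value pair. A minimal sketch, where the option name is made up for the example:

```java
import java.io.IOException;
import java.util.Map;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.hadoop.io.Text;

// A stateless server-side filter: given the same input, it always nets the same output.
public class ColumnFamilyFilter extends Filter {
  public static final String FAMILY_OPTION = "family"; // hypothetical option name
  private Text family;

  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
      IteratorEnvironment env) throws IOException {
    super.init(source, options, env); // wires up the implicit "source" iterator
    family = new Text(options.get(FAMILY_OPTION));
  }

  @Override
  public boolean accept(Key k, Value v) {
    // Pass the pair up the stack only when the column family matches
    return k.getColumnFamily().equals(family);
  }
}
```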
Functional Influences
In practice, these two concepts mesh very well with each other. Data read from a table can be thought of as a "stream" produced by some number of operations on the server. For a Scanner, this stream is backed by one tablet at a time to preserve the sorted order of the table. In the case of the BatchScanner, reads happen in parallel across many tablets from many tabletservers, with the client receiving data from many distinct hosts at once. Likewise, the Scanner and BatchScanner APIs encourage stateless processing of this data by presenting it as a Java Iterator. Exposing explicit batches of Key-Value pairs would instead encourage blocking processing of each batch, which would run counter to the server-side processing model. The result is a more seamless implementation paradigm on both the client and the server.
When we take a step back from Object-Oriented Java and start to think about applications in a Functional mindset, it becomes clear how these APIs encourage functional-esque code. We are less concerned about mutability and encapsulation, and more concerned about stateless operations over some immutable data. Modeling our client code like this also encourages parallelism, since stateless operations are much simpler to apply in a multi-threaded environment.
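Because a Scanner or BatchScanner is just an Iterable, its entries also plug directly into Java 8 Streams. A sketch, assuming the scanner configured earlier and values that are ASCII-encoded longs:

```java
import java.util.Map.Entry;
import java.util.stream.StreamSupport;

// ...given a Scanner (or BatchScanner) already configured as in the earlier sketch...
long total = StreamSupport.stream(scanner.spliterator(), false)
    .map(Entry::getValue)                          // stateless transformation
    .mapToLong(v -> Long.parseLong(v.toString()))  // assumes ASCII-encoded long values
    .sum();                                        // terminal reduction on the client
```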
Practical Application
I started out talking about schemas and table layouts, which might seem a bit unrelated to this discussion of the functional influences in the Accumulo API. Any decision made about table structure, though, must take into account the queries that will be run over the underlying data. As a practical application of what might otherwise seem like pontification, let's consider a hypothetical system that processes clickstream data using Accumulo.
Clickstream data refers to logs of users visiting a website, typically collected to understand usage patterns. If a website is thought of as a directed graph, where an anchor on one page linking to another page is an edge in that graph, a user's actions on that website can be thought of as a "walk" over that graph. In managing a website, it's typically very useful to understand usage patterns of your site: What page is most common? Which links are most commonly clicked? What changes to a page make users act differently?
Now, let's abstractly consider that we store this clickstream data in Accumulo. Let's not go into specifics, but say we retain the typical row-with-columns idea: each row, keyed by a globally unique identifier, represents some user visiting a page on your website. Each column contains some information about that visit: the user who is visiting the website, the page they're visiting, the page they came from, the web-browser user-agent string, etc. Say you're the owner of this website, and you recently made a modification to your website which added a prominent link to some new content on the front page. You want to know how many people are visiting your new content through this new link, so we want to answer the question "how many times was our new link on the index page clicked by any user?" For the purposes of this example, let's assume we don't have any index tables which might help us answer this query more efficiently.
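To make this hypothetical layout a bit more concrete, writing one such record might look like the sketch below; the table name, column families, and qualifiers are all illustrative, not a prescribed schema:

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

public class ClickstreamWriter {
  // One row per page view, keyed by a globally unique identifier
  static void recordClick(Connector conn) throws Exception {
    BatchWriter writer = conn.createBatchWriter("clickstream", new BatchWriterConfig());
    Mutation m = new Mutation(new Text(UUID.randomUUID().toString()));
    m.put(new Text("click"), new Text("user"), value("user-1234"));
    m.put(new Text("click"), new Text("page"), value("/new-content"));
    m.put(new Text("click"), new Text("referrer"), value("/index.html"));
    m.put(new Text("click"), new Text("useragent"), value("Mozilla/5.0"));
    writer.addMutation(m);
    writer.close();
  }

  static Value value(String s) {
    return new Value(s.getBytes(StandardCharsets.UTF_8));
  }
}
```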
Let's think about this query in terms of stateless operations and performing as much of a reduction in data returned to the client as possible. We have a few basic steps:
- Filter: Ignore all clickstream events that are not for the index page.
- Filter: Ignore all clickstream events that are not for the given anchor.
- Aggregation: Only a sum of the occurrences is needed, not the full record.
The beauty in using Accumulo is that all three of these operations can be performed inside of the tablet server process without returning any unnecessary data to the client. Unwanted records can be easily skipped, while each record that matches our criteria is reduced to a single "+1" counter. Instead of returning each full record to the client, the tablet server can combine these counts together and simply return a sum to the client for the Range of data that was read.
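Sketching what that might look like: assume a hypothetical custom iterator, ClickCountingIterator, which implements both filters and emits a partial sum in place of the matching records. Configuring it on a scan is just an IteratorSetting; the class and option names below are invented for the example:

```java
import org.apache.accumulo.core.client.IteratorSetting;

// ClickCountingIterator is hypothetical: a custom SortedKeyValueIterator that
// drops non-matching records and emits a single partial sum per tablet.
IteratorSetting counting = new IteratorSetting(30, "clickCount",
    "com.example.ClickCountingIterator");
counting.addOption("page", "/index.html");    // step 1: only index-page events
counting.addOption("anchor", "/new-content"); // step 2: only clicks on the new link
```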
The other perk of thinking about the larger problem in discrete steps is that it is easily parallelized. Assuming we have many tablet servers hosting the tablets which make up our clickstream table, we can easily run this query in parallel across them all using the BatchScanner. Additionally, because we've reduced our initial problem from a large collection of records to a stream of partial sums, we've drastically reduced the amount of work that must be performed on our (single) client. Each key-value pair returned by a server is a partial sum which can be combined with the others in a very lightweight operation (in both memory and computation) as each result is made available. The client then has the simple task of performing one summation. We took a hard problem and performed an extreme amount of heavy lifting server-side while performing next-to-no computation in our client, which is great for web applications or thin clients.
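Putting it together, the client side of this query might be as small as the following sketch, reusing the hypothetical IteratorSetting from above:

```java
import java.util.Collections;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

// Run the query in parallel across all tablets hosting the table
BatchScanner bs = conn.createBatchScanner("clickstream", Authorizations.EMPTY, 10);
bs.setRanges(Collections.singleton(new Range())); // an empty Range covers the whole table
bs.addScanIterator(counting);                     // the IteratorSetting from above
long total = 0;
for (Entry<Key,Value> e : bs) {
  // each entry is one tablet's partial sum; the client only adds them together
  total += Long.parseLong(e.getValue().toString());
}
bs.close();
```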
Tiered Computation
This type of algorithm, a multi-stage computation, becomes very common when working with Accumulo because of the ability to push large amounts of computation to each tablet server. Tablet servers can compute aggregations, filters and/or transformations very "close" to the actual data, returning some reduced view of the data being read. Even when some function is very efficient, computing it over large data sets can still be extremely time-consuming. Eliminating unwanted data as early as possible can often matter more than even the most optimal algorithm, due to the orders-of-magnitude difference between the speed of CPU and that of disk and network.
It's important to remember that this idea isn't new, though. The above model is very reminiscent of the MapReduce paradigm, just applied with different constraints. The set of problems efficiently solvable by MapReduce is also a super-set of what is possible with one representation of data stored in Accumulo. Nor is this a recommendation to replace MapReduce with Accumulo Iterators; Iterators are not a complete replacement for MapReduce (a tool is rarely a 100% "better" replacement for another). In fact, Accumulo Iterators are often used as another level of computation to make an existing MapReduce job more efficient, typically through the AccumuloInputFormat.
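For instance, a sketch of wiring the same hypothetical iterator into a MapReduce job through the 1.x AccumuloInputFormat; connection setup (instance, principal, token) is omitted for brevity:

```java
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.mapreduce.Job;

// Feed pre-filtered, pre-aggregated data into a MapReduce job rather than raw records
Job job = Job.getInstance(conf, "clickstream-analysis"); // conf: a Hadoop Configuration
job.setInputFormatClass(AccumuloInputFormat.class);
AccumuloInputFormat.setInputTableName(job, "clickstream");
AccumuloInputFormat.addIterator(job, counting); // the hypothetical IteratorSetting from earlier
```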
We've identified a category of problems - a function applied to a batch of key-value pairs that reduces the complexity of a question asked over a distributed dataset - which the features and APIs of Accumulo lend themselves to solving in an efficient and simple manner. Leveraging Accumulo for these computations requires foresight into the types of questions that are to be asked of a dataset, the structure of the dataset within Accumulo, and the reduction of a larger problem into discrete functions, each applied to the dataset by an Accumulo Iterator.