Today's blog is brought to you by our latest committer and the developer behind the Spark integration in Apache Phoenix, Josh Mahonin, a Software Architect at Interset.

PageRank with Phoenix and Spark

The Phoenix SQL interface provides a lot of great analytics capabilities on top of structured HBase data. Some tasks, however, such as machine learning or graph analysis, are more efficiently done using other tools, like Apache Spark.

Since the Phoenix 4.4.0 release, the phoenix-spark module allows us to expose Phoenix tables as RDDs or DataFrames within Spark. From there, that same data can be used with other tools within Spark, such as the machine learning library MLlib, the graph engine GraphX, or Spark Streaming.
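
As a quick taste, the table used later in this post can be exposed as a DataFrame in just a few lines. This is a minimal sketch, assuming a Zookeeper quorum at localhost and the EMAIL_ENRON table created in the next section:

import org.apache.phoenix.spark._

// Expose the Phoenix table as a Spark DataFrame (Spark 1.3+)
val df = sqlContext.phoenixTableAsDataFrame("EMAIL_ENRON", Seq("MAIL_FROM", "MAIL_TO"), zkUrl = Some("localhost"))
df.show()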

This example makes use of the Enron email test set from Stanford Network Analysis Project, and executes the GraphX implementation of PageRank on it to find interesting entities. It then saves the results back to Phoenix.

Note that runnable source code is also available on GitHub.

Prerequisites

  • Phoenix 4.4.0+
  • Spark 1.3.0+ (ensure the phoenix-client JAR is in the Spark driver
    classpath; see the setup guide and the configuration sketch below)
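
One way to make the JAR available, assuming a standard Spark installation (the paths below are illustrative; adjust them to your Phoenix install), is to add it to conf/spark-defaults.conf:

spark.driver.extraClassPath /path/to/phoenix/phoenix-client.jar
spark.executor.extraClassPath /path/to/phoenix/phoenix-client.jar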

Load sample data

Log in to a node with the Apache Phoenix binaries available. I will use localhost to refer to the Phoenix URL, but you may need to adjust for your local environment.

cd /path/to/phoenix/bin
./sqlline.py localhost

Once in the SQLLine console, we'll create the tables to hold the input data, as well as the destination table for the PageRank results.

CREATE TABLE EMAIL_ENRON(
    MAIL_FROM BIGINT NOT NULL, 
    MAIL_TO BIGINT NOT NULL 
    CONSTRAINT pk PRIMARY KEY(MAIL_FROM, MAIL_TO));
CREATE TABLE EMAIL_ENRON_PAGERANK(
    ID BIGINT NOT NULL, 
    RANK DOUBLE 
    CONSTRAINT pk PRIMARY KEY(ID));

Use 'ctrl+d' to exit SQLLine.

Download and extract the file enron.csv.gz to a local directory, such as /tmp. We'll use 'psql.py', with the -t option specifying the target table, to load the CSV data.

gunzip /tmp/enron.csv.gz
./psql.py -t EMAIL_ENRON localhost /tmp/enron.csv

When finished, you should see the output:
CSV Upsert complete. 367662 rows upserted
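
If you'd like to sanity-check the load, you can reopen SQLLine and count the rows; the result should match the number of rows upserted:

SELECT COUNT(*) FROM EMAIL_ENRON;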

Interactive analysis with spark-shell

Log in to a node with Spark installed. Note that the phoenix-client JAR must be available in the Spark driver classpath.

cd /path/to/spark/bin
./spark-shell

Once you're in the Spark shell, you can type or paste the code below into the interactive shell.

import org.apache.spark.graphx._
import org.apache.phoenix.spark._

// Load the Phoenix table as an RDD of column -> value maps
val rdd = sc.phoenixTableAsRDD("EMAIL_ENRON", Seq("MAIL_FROM", "MAIL_TO"), zkUrl = Some("localhost"))

// Convert to an RDD of VertexId tuples
val rawEdges = rdd.map{ e => (e("MAIL_FROM").asInstanceOf[VertexId], e("MAIL_TO").asInstanceOf[VertexId]) }

// Create a graph with default edge weights
val graph = Graph.fromEdgeTuples(rawEdges, 1.0)

// Run page rank
val pr = graph.pageRank(0.001)
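// Note: 0.001 is the convergence tolerance, i.e. iteration stops once the
// ranks change by less than that amount. For a fixed number of iterations,
// GraphX also provides graph.staticPageRank(numIter)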

// Save to Phoenix
pr.vertices.saveToPhoenix("EMAIL_ENRON_PAGERANK", Seq("ID", "RANK"), zkUrl = Some("localhost"))

Once finished, you can exit spark-shell with 'ctrl+d'.

Results

On your Phoenix node, open SQLLine again.

cd /path/to/phoenix/bin
./sqlline.py localhost

Let's run a query that gives us the top-ranked entities from the PageRank results.

SELECT * FROM EMAIL_ENRON_PAGERANK ORDER BY RANK DESC LIMIT 5;

+------------------------------------------+------------------------------------------+
|                    ID                    |                   RANK                   |
+------------------------------------------+------------------------------------------+
| 5038                                     | 497.2989872977676                        |
| 273                                      | 117.18141799210386                       |
| 140                                      | 108.63091596789913                       |
| 458                                      | 107.2728800448782                        |
| 588                                      | 106.11840798585399                       |
+------------------------------------------+------------------------------------------+
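
Since the results live in a Phoenix table, they can also be pulled straight back into Spark for further analysis. Here's a minimal sketch, assuming the same localhost Zookeeper quorum as above (the mean-rank computation is purely illustrative):

import org.apache.phoenix.spark._

// Load the PageRank results back as an RDD of column -> value maps
val pageranks = sc.phoenixTableAsRDD("EMAIL_ENRON_PAGERANK", Seq("ID", "RANK"), zkUrl = Some("localhost"))

// For example, compute the average rank across all entities
val meanRank = pageranks.map(r => r("RANK").asInstanceOf[Double]).mean()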

Although this dataset has the email addresses removed, if you're curious, you can find the results of a similar analysis here. If you're familiar with the Enron case, some of those names will ring a bell.

Conclusion

Although this example is fairly trivial, it shows both the capabilities and the succinctness of using Phoenix and Spark together to run complex algorithms across arbitrarily large datasets. In my experience, the methods shown here extend quite well to other "big data" problems such as community detection, clustering, and anomaly detection. There are likely many other applicable problem domains as well.

Thanks for reading!