Spark Integration in Apache Phoenix
Today's blog is brought to you by our latest committer and the developer behind the Spark integration in Apache Phoenix, Josh Mahonin, a Software Architect at Interset.
PageRank with Phoenix and Spark
The Phoenix SQL interface provides a lot of great analytics capabilities on top of structured HBase data. Some tasks however, such as machine learning or graph analysis, are more efficiently done using other tools like Apache Spark.
Since the Phoenix 4.4.0 release, the phoenix-spark module allows us to expose Phoenix tables as RDDs or DataFrames within Spark. From there, that same data can be used with other tools within Spark, such as the machine learning library MLlib, the graph engine GraphX, or Spark Streaming.
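For example, here's a minimal sketch of loading a table as a DataFrame, assuming a hypothetical Phoenix table TABLE1 with columns ID and COL1; the phoenixTableAsDataFrame method comes from the phoenix-spark import, and the walkthrough below uses the RDD variant of the same API:

import org.apache.phoenix.spark._

// Load the hypothetical table TABLE1 as a DataFrame; sqlContext is the
// SQLContext predefined in spark-shell, and zkUrl points at ZooKeeper
val df = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("localhost"))
df.show()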
This example makes use of the Enron email test set from Stanford Network Analysis Project, and executes the GraphX implementation of PageRank on it to find interesting entities. It then saves the results back to Phoenix.
Note that runnable source code is also available on GitHub.
Prerequisites
- Phoenix 4.4.0+
- Spark 1.3.0+ (the phoenix-client JAR must be on the Spark driver classpath; see the setup guide and the configuration sketch below)
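As a minimal sketch of that classpath configuration, assuming Spark reads $SPARK_HOME/conf/spark-defaults.conf and using an example JAR path (substitute the version and location from your own installation):

# In $SPARK_HOME/conf/spark-defaults.conf; the JAR path below is an example only
spark.driver.extraClassPath /path/to/phoenix-4.4.0-client.jar
spark.executor.extraClassPath /path/to/phoenix-4.4.0-client.jar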
Load sample data
Log in to a node with the Apache Phoenix binaries available. I'll use localhost as the Phoenix URL, but you may need to adjust it for your local environment.
cd /path/to/phoenix/bin
./sqlline.py localhost
Once in the SQLLine console, we'll create the table to hold the input data, and the destination table for the PageRank results:
CREATE TABLE EMAIL_ENRON(
    MAIL_FROM BIGINT NOT NULL,
    MAIL_TO BIGINT NOT NULL
    CONSTRAINT pk PRIMARY KEY(MAIL_FROM, MAIL_TO));

CREATE TABLE EMAIL_ENRON_PAGERANK(
    ID BIGINT NOT NULL,
    RANK DOUBLE
    CONSTRAINT pk PRIMARY KEY(ID));
Use 'ctrl+d' to exit SQLLine.
Download the file enron.csv.gz to a local directory, such as /tmp. We'll extract it, then use 'psql.py' to load the CSV data:
gunzip /tmp/enron.csv.gz
./psql.py -t EMAIL_ENRON localhost /tmp/enron.csv
When finished, you should see the output:
CSV Upsert complete. 367662 rows upserted
Interactive analysis with spark-shell
Log in to a node with Spark installed. Note that the phoenix-client JAR must be on the Spark driver classpath (see the prerequisites above).
cd /path/to/spark/bin
./spark-shell
Once you're in the Spark shell, you can type or paste the code below into the interactive session:
import org.apache.spark.graphx._
import org.apache.phoenix.spark._

// Load the Phoenix table as an RDD of column -> value maps
val rdd = sc.phoenixTableAsRDD(
  "EMAIL_ENRON", Seq("MAIL_FROM", "MAIL_TO"), zkUrl = Some("localhost"))

// Convert to an RDD of (source, destination) VertexId tuples
val rawEdges = rdd.map { e =>
  (e("MAIL_FROM").asInstanceOf[VertexId], e("MAIL_TO").asInstanceOf[VertexId])
}

// Create a graph with default edge weights
val graph = Graph.fromEdgeTuples(rawEdges, 1.0)

// Run PageRank until it converges within the given tolerance
val pr = graph.pageRank(0.001)

// Save the (vertex ID, rank) pairs back to Phoenix
pr.vertices.saveToPhoenix(
  "EMAIL_ENRON_PAGERANK", Seq("ID", "RANK"), zkUrl = Some("localhost"))
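Before saving, you can optionally sanity-check the results from the shell; as a small sketch, using the standard RDD 'top' action on the vertex RDD:

// Optional: print the five highest-ranked (vertex ID, rank) pairs
pr.vertices.top(5)(Ordering.by(_._2)).foreach(println)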
Once finished, you can exit spark-shell with 'ctrl+d'.
Results
On your Phoenix node, open SQLLine again:
cd /path/to/phoenix/bin
./sqlline.py localhost
Let's run a query that gives us the top-ranked entities from the PageRank results:
SELECT * FROM EMAIL_ENRON_PAGERANK ORDER BY RANK DESC LIMIT 5;
+------------------------------------------+------------------------------------------+
| ID | RANK |
+------------------------------------------+------------------------------------------+
| 5038 | 497.2989872977676 |
| 273 | 117.18141799210386 |
| 140 | 108.63091596789913 |
| 458 | 107.2728800448782 |
| 588 | 106.11840798585399 |
+------------------------------------------+------------------------------------------+
Although this dataset has the email addresses removed, if you're curious, you can find the results of a similar analysis here. If you're familiar with the Enron case, some of those names will ring a bell.
Conclusion
Although this example is fairly trivial, it shows the capabilities, as well as the succinctness, of using Phoenix and Spark together to run complex algorithms across arbitrarily large datasets. In my experience, the methods shown here extend quite well to other "big data" problems such as community detection, clustering, and anomaly detection. There are likely many other applicable problem domains as well.
Thanks for reading!