# On Monoids For Analytics

This is a short post on the elegance of using abstract algebra for analytics in Scala.

A monoid is a set $T$ that is closed under an associative binary operation $append$ with an identity element $zero$ such that $append(a, zero) = a$. In other words, the following 3 properties apply:

• Closure – the result of combining two elements of the set is also an elment of the set:

$\forall a, b \in T: append(a, b) \in T$

• Associativity – when combining more than two elements of a set the order of the pairwise combinations doesn’t matter, which makes a monoid well suited for parallel computations:

$\forall a, b, c \in T: append(append(a, b), c) = append(a, append(b, c))$

• Identity – there is a special element of the set that when combined with any other element of the set yields the same element:

$\exists zero \in T: \forall a \in T: append(zero, a) = append(a, zero) = a$

One way to define a trait for a monoid in Scala is the following:

trait Monoid[T] {
def zero: T
def append(a: T, b: T): T
}


Monoids are everywhere; think of the set of natural numbers and addition or the set of strings and concatenation. Also note that the same set can have multiple “monoidal forms”; for example the set of natural numbers can have both an additive and a multiplicative monoid.

Monoids compose well; for example a tuple of monoids is itself a monoid, as such it’s simple to define a monoid for a complex type once monoids for its constituents types exists. Scalaz and Algebird are two Scala libraries that provide monoids for data types such as List, Set, Option, Map and others. Algebird in particular, which is targeted at building aggregation systems, comes with a set of monoids useful for counting such as DecayedValue for exponential decay, AveragedValue for averaging and HyperLogLog for approximate cardinality counting.

As a concrete example on how monoids provided by Scalaz are used in practice, suppose we have a server that receives sparse histograms from its clients:

import scalaz._
import Scalaz._

case class Histogram(var values: Map[Long, Long], sum: Int, count: Int)


We would like to aggregate histograms over a time window. It’s simple enough to write some code to do that, yet monoids offer an elegant solution. Since Scalaz provides by default an additive monoid for Map and Long, we can easily define one for Histogram as well:

implicit def histogramMonoid: Monoid[Histogram] = new Monoid[Histogram] {
def zero = Histogram(Map(), 0, 0)
def append(a: Histogram, b: => Histogram) = Histogram(a.values |+| b.values,
a.sum    |+| b.sum,
a.count  |+| b.count)
}


Where |+| invokes the binary operator of the monoid defined over the type of its operators.  We can aggregate histograms now that we have a monoid, e.g.:

val h1 = Histogram(Map(10 -> 12, 100 -> 41), 53, 3)
val h2 = Histogram(Map(10 -> 21, 80 -> 14), 35, 17)
h1 |+| h2


yields:

Histogram(Map(10 -> 33, 80 -> 14, 100 -> 41), 88, 20)


Let’s say that we have different histograms for different measurements which are stored in a mapping from strings to histograms. Since there is a monoid defined over Histogram then we can easily aggregate multiple Map[String, Histogram] as well:

val m1 = Map("metric1" -> h1, "metric2" -> h2)
val m2 = Map("metric1" -> h2, "metric3" -> h1)
m1 |+| m2


yields:

Map(metric1 -> Histogram(Map(10 -> 33, 80 -> 14, 100 -> 41), 88, 20),
metric2 -> Histogram(Map(10 -> 21, 80 -> 14), 35, 17),
metric3 -> Histogram(Map(10 -> 12, 100 -> 41), 53, 3))


This is just the tip of the iceberg of the elegance provided by abstract algebra. If this short article caught your interest grab a copy of Functional Programming in Scala, which is without any doubt the best functional programming book I have ever read. The book introduces a variety of abstract algebra concepts with their relative implementations.

# Recommending Firefox add-ons with Spark

We are currently evaluating possible replacements for our Telemetry map-reduce infrastructure. As our current data munging machinery isn’t distributed, analyzing days worth of data can be quite a pain. Also, many algorithms can’t easily be expressed with a simple map/reduce interface.

So I decided to give Spark another try. “Another” because I have played with it in the past but I didn’t feel it was mature enough to be run in production. And I wasn’t the only one to think that apparently. I feel like things have changed though with the latest 1.1 release and I want to share my joy with you.

What is Spark?

In a nutshell, “Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.”

Spark primary abstraction is the Resilient Distributed Dataset (RDD), which one can imagine as a distributed pandas or R data frame. The RDD API comes with all kinds of distributed operations, among which also our dear map and reduce. Many RDD operations accept user-defined Scala or Python functions as input which allow average Joe to write distributed applications like a pro.

A RDD can also be converted to a local Scala/Python data structure, assuming the dataset is small enough to fit in memory. The idea is that once you chopped the data you are not interested in, what you are left with fits comfortably on a single machine. Oh and did I mention that you can issue Spark queries directly from a Scala REPL? That’s great for performing exploratory data analyses.

The greatest strength of Spark though is the ability to cache RDDs in memory. This allows you to run iterative algorithms up to 100x faster than using the typical Hadoop based map-reduce framework! It has to be remarked though that this feature is purely optional. Spark works flawlessly without caching, albeit slower. In fact in a recent benchmark Spark was able to sort 1PB of data 3X faster using 10X fewer machines than Hadoop, without using the in-memory cache.

Setup

A Spark cluster can run in standalone mode or on top of YARN or Mesos. To the very least for a cluster you will need some sort of distributed filesystem, e.g. HDFS or NFS. But the easiest way to play with it though is to run Spark locally, i.e. on OSX:


brew install spark
spark-shell --master "local[*]"



The above commands start a Scala shell with a local Spark context. If you are more inclined to run a real cluster, the easiest way to get you going is to launch an EMR cluster on AWS:


aws emr create-cluster --name SparkCluster --ami-version 3.3 --instance-type m3.xlarge \
--instance-count 5 --ec2-attributes KeyName=vitillo --applications Name=Hive \
--bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark



Then, once connected to the master node, launch Spark on YARN:


yarn-client /home/hadoop/spark/bin/spark-shell --num-executors 4 --executor-cores 8 \
--executor-memory 8g —driver-memory 8g



The parameters of the executors (aka worker nodes) should obviously be tailored to the kind of instances you launched. It’s imperative to spend some time understanding and tuning the configuration options as Spark doesn’t automagically do it for you.

Now what?

Time for some real code. Since Spark makes it so easy to write distributed analyses, the bar for a Hello World application should be consequently be much higher. Let’s write then a simple, albeit functional, Recommender Engine for Firefox add-ons.

In order to do that, let’s first go over quickly the math involved. It turns out that given a matrix of the rankings of each user for each add-on, the problem of finding a good recommendation can be reduced to matrix factorization problem:

The model maps both users and add-ons to a joint latent factor space of dimensionality $F$. Both users and add-ons are thus seen as vectors in that space. The factors express latent characteristics of add-ons, e.g. if a an add-on is related to security or to UI customization. The ratings are then modeled as inner products in that space, which is proportional to the angle of the two vectors. The closer the characteristics of an add-on align to the preferences of the user in the latent factor space, the higher the rating.

But wait, Firefox users don’t really rate add-ons. In fact the only information we have in Telemetry is binary: either a user has a certain add-on installed or he hasn’t. Let’s assume that if someone has a certain add-on installed, he probably likes that add-on. That’s not true in all cases and a more significant metric like “usage time” or similar should be used.

I am not going to delve into the details, but having binary ratings changes the underlying model slightly from the conceptual one we have just seen. The interested reader should read this paper. Mllib, a machine learning library for Spark, comes out of the box with a distributed implementation of ALS which implements the factorization.

Implementation

Now that we have an idea of the theory, let’s have a look at how the implementation looks like in practice. Let’s start by initializing Spark:


val sc = new SparkContext(conf)



As the ALS algorithm requires tuples of (user, addon, rating), let’s munge the data into place:


val ratings = sc.textFile("s3://mreid-test-src/split/").map(raw => {
val parsedPing = parse(raw.substring(37))
(parsedPing \ "clientID", parsedPing \ "addonDetails" \ "XPI")
}).filter{
// Remove sessions with missing id or add-on list
case (JNothing, _) => false
case (_, JNothing) => false
case (_, JObject(List())) => false
case _ => true
}.map{ case (id, xpi) => {
}}.filter{ case (id, addonList) => {
// Remove sessions with empty add-on lists
}}.flatMap{ case (id, addonList) => {
// Create add-on ratings for each user
}}



Here we extract the add-on related data from our json Telemetry pings and filter out missing or invalid data. The ratings variable is a RDD and as you can see we used the distributed map, filter and flatMap operations on it. In fact it’s hard to tell apart vanilla Scala code from the distributed one.

As the current ALS implementation doesn’t accept strings for the user and add-on representations, we will have to convert them to numeric ones. A quick and dirty way of doing that is to hash the strings:


// Positive hash function
def hash(x: String) = x.hashCode & 0x7FFFFF

val hashedRatings = ratings.map{ case(u, a, r) => (hash(u), hash(a), r) }.cache



We are nearly there. To avoid overfitting, ALS uses regularization, the strength of which is determined by a parameter $\lambda$. As we don’t know beforehand the optimal value of the parameter, we can try to find it by minimizing the mean squared error over a pre-defined grid of $\lambda$ values using k-fold cross-validation.


// Use cross validation to find the optimal number of latent factors
val folds = MLUtils.kFold(hashedRatings, 10, 42)
val lambdas = List(0.1, 0.2, 0.3, 0.4, 0.5)
val iterations = 10
val factors = 100 // use as many factors as computationally possible

val factorErrors = lambdas.flatMap(lambda => {
folds.map{ case(train, test) =>
val model = ALS.trainImplicit(train.map{ case(u, a, r) => Rating(u, a, r) }, factors, iterations, lambda, 1.0)
val usersAddons = test.map{ case (u, a, r) => (u, a) }
val predictions = model.predict(usersAddons).map{ case Rating(u, a, r) => ((u, a), r) }
val ratesAndPreds = test.map{ case (u, a, r) => ((u, a), r) }.join(predictions)
val rmse = sqrt(ratesAndPreds.map { case ((u, a), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean)

(model, lambda, rmse)
}
}).groupBy(_._2)
.map{ case(k, v) => (k, v.map(_._3).reduce(_ + _) / v.length) }



Finally, it’s just a matter of training ALS on the whole dataset with the optimal $\lambda$ value and we are good to go to use the recommender:


// Train model with optimal number of factors on all available data
val model = ALS.trainImplicit(hashedRatings.map{case(u, a, r) => Rating(u, a, r)}, factors, iterations, optimalLambda._1, 1.0)

def recommend(userID: Int) = {
val top = predictions.top(10)(Ordering.by[Rating,Double](_.rating))
}

recommend(hash("UUID..."))



I omitted some details but you can find the complete source on my github repository.

To submit the packaged job to YARN run:


spark-submit --class AddonRecommender --master yarn-client --num-executors 4 \



So what?

Question is, how well does it perform? The mean squared error isn’t really telling us much so let’s take some fictional user session and see what the recommender spits out.

For user A that has only the add-on Ghostery installed, the top recommendations are, in order:

• NoScript
• Web of Trust
• Symantec Vulnerability Protection
• Better Privacy
• LastPass
• DuckDuckGo Plus
• HTTPS-Everywhere
• Lightbeam

One could argue that 1 out of 10 recommendations isn’t appropriate for a security aficionado. Now it’s the turn of user B who has only the Firebug add-on installed:

• Web Developer
• FiddlerHook
• Greasemonkey
• ColorZilla
• User Agent Switcher
• McAfee
• RealPlayer Browser Record Plugin
• FirePHP
• Session Manager

There are just a couple of add-ons that don’t look that great but the rest could fit the profile of a developer. Now, considering that the recommender was trained only on a couple of days of data for Nightly, I feel like the result could easily be improved with more data and tuning, like filtering out known Antivirus, malware and bloatware.