Strata + Hadoop World @ London

The Strata conference in London has just ended and I came back with the feeling that in a few years the conference might as well be called Strata+Spark World. A quick look at the schedule reveals the sheer number of Spark-related talks. Most people I have spoken to have played with it but are not using it in production yet, mostly because over the years they have developed a considerable amount of tooling around Hadoop MapReduce that makes it hard to jump ship. That said, it’s hard to miss where the industry is moving: Spark is currently the most active Apache project and it’s experiencing exponential growth.

Patrick Wendell spoke on Wednesday about what’s coming in the next versions of Spark. What’s relevant for us at Mozilla is that there are some interesting improvements in sight, like metrics visualization for Spark Streaming and various performance improvements in terms of memory and CPU usage. We are not using Spark Streaming at the moment as we have our very own stream processing system, Heka, but we might consider it for a few particular use cases.

Lots of work is going into Spark DataFrames, which one might think of as a distributed equivalent of a pandas DataFrame. The current API isn’t stable yet but it looks very promising. When leveraging the DataFrame API, Python jobs can be as fast as Scala ones thanks to a unified physical execution layer.
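To give a feel for what that looks like from Python, here is a minimal sketch of a DataFrame aggregation in PySpark; the sqlContext, the toy data and the column names are purely illustrative assumptions and have nothing to do with our Telemetry setup.

from pyspark.sql import Row, SQLContext
from pyspark.sql.functions import avg

# Assumes an existing SparkContext `sc`; the data and column names are made up.
sqlContext = SQLContext(sc)
rows = sc.parallelize([Row(os="WINNT", startup=1200),
                       Row(os="Linux", startup=800),
                       Row(os="Darwin", startup=950)])
frame = sqlContext.createDataFrame(rows)

# The DSL compiles down to the same physical plan regardless of the host language.
frame.filter(frame.startup > 0) \
     .groupBy("os") \
     .agg(avg("startup").alias("avg_startup")) \
     .show()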

I feel that DataFrames work well with simple scalar types, unlike our Telemetry histograms, and when most of the operations needed are available through the DSL. For that reason we are probably not going to adopt the DataFrame API for the time being and will keep using the RDD API. That said, we could exploit DataFrames in the future by representing Telemetry histograms with a single average value, if we are willing to trade data fidelity for speed.

There was a nice talk about Elasticsearch by Costin Leau on Thursday with a hands-on demo. I have been thinking for a while about using Kibana, a visualization platform for Elasticsearch, for Telemetry, and seeing it in action convinced me to give it a shot.

The main issue with unleashing Elasticsearch on our dataset is that we rolled our own histogram representation, which is not going to play ball with Elasticsearch. That said, as the goal of this dashboard would be to give an overview of our data, nothing is stopping us from representing histograms with a single summary statistic, like the median. The dashboard could empower anyone in the organization to slice, dice and visualize Telemetry data without writing a single line of code, while using Spark for non-trivial analyses and our existing dashboard to display the full distributions for a predetermined set of aggregations.
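As a rough illustration of the idea, reducing a Telemetry-style histogram to a single summary statistic takes only a handful of lines; the bucket layout below is a simplification for the example, not the real Telemetry format.

def approximate_median(histogram):
    # `histogram` maps a bucket's lower bound to its count, e.g. {0: 10, 50: 120}.
    # Return the lower bound of the bucket that contains the median sample.
    buckets = sorted(histogram.items())
    total = sum(count for _, count in buckets)
    cumulative = 0
    for lower_bound, count in buckets:
        cumulative += count
        if cumulative >= total / 2.0:
            return lower_bound

print(approximate_median({0: 10, 50: 120, 100: 80, 500: 5}))  # -> 50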

Dashboards made simple with Spark and Plotly

In my previous post about our new Spark infrastructure, I went into the details of how to launch a Spark cluster on AWS to perform custom analyses on Telemetry data. Sometimes though one needs to rerun an analysis recurrently over a certain timeframe, usually to feed data into dashboards of various kinds. We are going to roll out a new feature that allows users to upload an IPython notebook to the self-serve data analysis dashboard and run it on a schedule. The notebook will be executed periodically with the chosen frequency and the result will be made available as an updated IPython notebook.

To schedule a Spark job:

  1. Visit the analysis provisioning dashboard at telemetry-dash.mozilla.org and sign in using Persona with an @mozilla.com email address.
  2. Click “Schedule a Spark Job”.
  3. Enter some details:
    • The “Job Name” field should be a short descriptive name, like “chromehangs analysis”.
    • Upload your IPython notebook containing the analysis.
    • Set the number of workers of the cluster in the “Cluster Size” field.
    • Set a schedule frequency using the remaining fields.

Once a new scheduled job is created it will appear in the listing at the top of the scheduling dashboard. When the job has run, its result will be made available as an IPython notebook, visible by clicking on the “View Data” entry for your job.

As I briefly mentioned at the beginning, periodic jobs are typically used to feed data to dashboards. Writing a dashboard for a custom job isn’t very pleasant, and in the past I wrote some simple tools to help with that. It turns out though that, thanks to IPython, one doesn’t necessarily need to write a dashboard from scratch but can simply re-use the notebook as the dashboard itself! I mean, why not? That might not be good enough for management-facing dashboards, but it’s acceptable for ones aimed at engineers.

In fact, with IPython we are not limited to matplotlib’s static charts. Thanks to Plotly, it’s easy enough to generate interactive plots which allow you to:

  • Check the x and y coordinates of every point on the plot by hovering with the cursor.
  • Zoom in on the plot and resize lines, points and axes by clicking and dragging the cursor over a region.
  • Pan by holding the shift key while clicking and dragging.
  • Zoom back out to the original view by double clicking on the plot.

Plotly comes with its own API, but if you already have a matplotlib-based chart then it’s trivial to convert it to an interactive plot. As a concrete example, I updated my Spark Hello World example with a Plotly chart.

import matplotlib.pyplot as plt
import plotly.plotly as py  # conventional alias for Plotly's plotting module

# `frame` is the pandas DataFrame built in the Hello World notebook.
fig = plt.figure(figsize=(18, 7))
frame["WINNT"].plot(kind="hist", bins=50)
plt.title("startup distribution for Windows")
plt.ylabel("count")
plt.xlabel("log(firstPaint)")
py.iplot_mpl(fig, strip_style=True)

As you can see, aside from the imports, just a single extra line of code (the final py.iplot_mpl call) is needed for the conversion.

[Figure: startup distribution for Windows]

As WordPress doesn’t support iframes, you are going to have to click on the image and follow the link to see the interactive plot in action.

Add-on hangs: a brief summary

I have blogged about clustering BHR hangs before. This post is dedicated to the add-on side of things.

In a way BHR (Background Hang Reporter) could be seen as a distributed profiler: each user runs a local profiler that samples the stack of a thread only when that thread has been hanging for over N milliseconds. The stacks are then sent to our backend infrastructure through a Telemetry submission.

Imagine profiling Firefox for an hour. At the end of the hour you would like to determine the impact of one of your installed add-ons on the browsing experience. That’s not simple at all: the add-on might have been in use for only a fraction of your session, say 20 seconds. In that fraction it might have slowed down the browser significantly though. Since you are collecting hangs for the whole session, that signal might eventually be dominated by noise. This means that in most cases, add-on hangs are not going to look like a big deal once aggregated.

I aggregated the stacks and partitioned them into browser-specific and add-on-specific ones. For a given add-on Foo, I computed for each session the ratio of hangs of Foo over the number of hangs of both Firefox and Foo. Finally, I averaged those ratios. This final number gives an idea of the average proportion of hangs due to Foo that a user of that add-on can expect to find in a session.
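In rough Python terms, the aggregation boils down to something like the following sketch; the per-session data layout here is hypothetical and does not reflect the actual BHR format.

def addon_hang_ratio(sessions, addon):
    # Each session is assumed to look like
    # {"browser_hangs": 42, "addon_hangs": {"Foo": 3, ...}} -- a made-up layout.
    ratios = []
    for session in sessions:
        addon_hangs = session["addon_hangs"].get(addon, 0)
        total = session["browser_hangs"] + addon_hangs
        if total > 0:
            ratios.append(float(addon_hangs) / total)
    return sum(ratios) / len(ratios) if ratios else 0.0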

That’s not the whole story though: one can imagine scenarios where an add-on triggers an asynchronous workload in the browser, like garbage collection, which will not be attributed to the add-on itself. In an admittedly less common, but still plausible, scenario an add-on could improve the performance of the browser, reducing the number of browser-specific hangs and thereby increasing the ratio. In general I tend to see the final number as a lower bound and, even though it’s not precise, it can help identify bottlenecks.

Among the most popular add-ons, the ones with a high ratio are:

  1. Lastpass (12%)
  2. Hola Better Internet (11%)
  3. Avira Antivirus (10%)
  4. noscript (9%)
  5. Adblock (9%); note that this is not Adblock Plus

LastPass, for instance, has a hang ratio of about 12%, which means that if you had just LastPass installed and no other add-ons, on average about 12% of the hangs you would experience would likely be due to LastPass. That’s a lot, and the main culprit seems to be the logic that scans for input fields when the document changes dynamically. That said, I am glad I can’t see any popular add-on with a shockingly high ratio of hangs.

The numbers shouldn’t be used to mark an add-on as bad or good; they are based on a fallible heuristic and they are meant to give us the tools to prioritize which add-ons we should keep an eye on.

Next-gen Data Analysis Framework for Telemetry

The easier it is to get answers, the more questions will be asked

In that spirit, Mark Reid and I have been working for a while now on a new analysis infrastructure to make it as easy as possible for engineers to get answers to data-related questions.

Our shiny new analysis infrastructure is based primarily on IPython and Spark. I have blogged about Spark before and even gave a short tutorial on it at our last workweek in Portland (slides and tutorial); IPython might be something you are not familiar with unless you have a background in science. In a nutshell, it’s a browser-based notebook with support for code, text, mathematical expressions, inline plots and other rich media.

[Figure: An IPython notebook in all its glory]

The combination of IPython and Spark lets you write data analyses interactively from a browser and seamlessly parallelize them over multiple machines, thanks to a rich API with over 80 distributed operators! It’s a huge leap forward in terms of productivity compared to traditional batch-oriented map-reduce frameworks. An IPython notebook contains both the code and the product of the execution of that code, like plots. Once executed, a notebook can simply be serialized and uploaded to Github. Then, thanks to nbviewer, it can be visualized and shared among colleagues.

The issue with sharing just the end product of an analysis is that it’s all too easy for bugs to creep in or for wrong assumptions to be made. If your end result is a plot, how do you test it? How do you know that what you are looking at actually reflects the truth? Having the code side by side with its evaluation allows more people to inspect it and streamlines the review process.

This is what you need to do to start your IPython backed Spark cluster with access to Telemetry data:

  1. Visit the analysis provisioning dashboard at telemetry-dash.mozilla.org and sign in using Persona with an @mozilla.com email address.
  2. Click “Launch an ad-hoc Spark cluster”.
  3. Enter some details:
    • The “Cluster Name” field should be a short descriptive name, like “chromehangs analysis”.
    • Set the number of workers for the cluster. Please keep in mind to use resources sparingly; use a single worker to write and debug your job.
    • Upload your SSH public key.
  4. Click “Submit”.
  5. A cluster will be launched on AWS preconfigured with Spark, IPython and some handy data analysis libraries like pandas and matplotlib.

Once the cluster is ready, you can tunnel IPython through SSH by following the instructions on the dashboard, e.g.:

ssh -i my-private-key -L 8888:localhost:8888 hadoop@ec2-54-70-129-221.us-west-2.compute.amazonaws.com

Finally, you can launch IPython in Firefox by visiting http://localhost:8888.

Now what? Glad you asked. In your notebook listing you will see a Hello World notebook. It’s a very simple analysis that produces the distribution of startup times faceted by operating system for a small fraction of Telemetry submissions; let’s quickly review it here.

We start by importing a telemetry utility to fetch pings and some commonly needed libraries for analysis: a json parser, pandas and matplotlib.

import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
from moztelemetry.spark import get_pings

To execute a block of code in IPython, a.k.a. a cell, press Shift-Enter. While a cell is being executed, a gray circle will appear in the upper right corner of the notebook. When the circle is full, your code is being executed by the IPython kernel; when only the border of the circle is visible, the kernel is idle and waiting for commands.

Spark exploits parallelism across all the cores of your cluster. To see the degree of parallelism you have at your disposal, simply evaluate:


sc.defaultParallelism

Now, let’s fetch a set of telemetry submissions and load it in a RDD using the get_pings utility function from the moztelemetry library:

pings = get_pings(sc,
                  appName="Firefox",
                  channel="nightly",
                  version="*",
                  buildid="*",
                  submission_date="20141208",
                  fraction=0.1)

That’s pretty much self-documenting. The fraction parameter, which defaults to 1, selects a random subset of the selected submissions. This comes in handy when you first write your analysis and don’t need to load lots of data to test and debug it.

Note that both the buildid and submission_date parameters also accept a tuple specifying, inclusively, a range of dates, e.g.:


pings = get_pings(sc,
                  appName="Firefox",
                  channel="nightly",
                  version="*",
                  buildid=("20141201", "20141202"),
                  submission_date=("20141202", ""20141208"))

Let’s do something with those pings. Since we are interested in the distribution of the startup time of Firefox faceted by operating system, let’s extract the needed fields from our submissions:

def extract(ping):
    ping = json.loads(ping)
    os = ping["info"]["OS"]
    startup = ping["simpleMeasurements"].get("firstPaint", -1)
    return (os, startup)

cached = pings.map(lambda ping: extract(ping)).filter(lambda p: p[1] > 0).cache()

As the Python API closely matches the Scala one, I suggest having a look at my older Spark tutorial if you are not familiar with Spark. Another good resource is the set of hands-on exercises from AMP Camp 4.

Now, let’s collect the results back and stuff them into a pandas DataFrame. This is a very common pattern: once you have reduced your dataset to a manageable size with Spark, you collect it back on your driver (aka the master machine) and finalize your analysis with statistical tests, plots and whatnot.

import numpy as np  # for the log transform below

grouped = cached.groupByKey().collectAsMap()

frame = pd.DataFrame({x: np.log(pd.Series(list(y))) for x, y in grouped.items()})
frame.boxplot()
plt.ylabel("log(firstPaint)")
plt.show()

[Figure: Startup distribution by OS]

Finally, you can save the notebook, upload it to Github or Bugzilla and visualize it on nbviewer; it’s that simple. Here is the nbviewer-powered Hello World notebook. I warmly suggest that you open a bug report on Bugzilla for your custom Telemetry analysis and ask me or Vladan Djeric to review it. Mozilla has been doing code reviews for years and with good reason; why should data analyses be different?

Congrats, you just completed your first Spark analysis with IPython! If you need any help with your custom job feel free to drop me a line in #telemetry.

A/B test for Telemetry histograms

A/B tests are a simple way to determine the effect caused by a change in a software product against a baseline, i.e. version A against version B. An A/B test is essentially an experiment that randomly assigns a control or experiment condition to each user. It’s an extremely effective method to ascertain causality, which is hard, at best, to infer with statistical methods alone. Telemetry comes with its own A/B test implementation, Telemetry Experiments.

Depending on the type of data collected and the question asked, different statistical techniques are used to verify if there is a difference between the experiment and control version:

  1. Does the rate of success of X differ between the two versions?
  2. Does the average value of Y differ between the two versions?
  3. Does the average time to event Z differ between the two versions?

Those are just the most commonly used methods.

The frequentist statistical hypothesis testing framework is based on a conceptually simple idea: assuming that we live in a world where a certain baseline hypothesis (null hypothesis) is valid, what’s the probability of obtaining the results we observed? If the probability is very low, i.e. under a certain threshold, we gain confidence that the effect we are seeing is genuine.

To give you a concrete example, say I have reason to believe that the average battery duration of my new phone is 5 hours but the manufacturer claims it’s 5.5 hours. If we assume the average battery duration is indeed 5.5 hours (null hypothesis), what’s the probability of measuring an average duration that is 30 minutes or more lower? If that probability is small enough, say under 5%, we “reject” the null hypothesis. Note that there are many things that can go wrong with this framework and one has to be careful in interpreting the results.
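With an actual sample of measured durations at hand, the textbook version of this test is a one-sample t-test, which scipy provides out of the box; the measurements below are made up for the sake of the example.

import numpy as np
from scipy import stats

# Hypothetical battery duration measurements, in hours.
measurements = np.array([5.1, 4.8, 5.3, 4.9, 5.0, 4.7, 5.2, 4.9])

# Null hypothesis: the true mean is 5.5 hours, as claimed by the manufacturer.
t_statistic, p_value = stats.ttest_1samp(measurements, 5.5)
print(p_value)  # reject the null hypothesis if the (two-sided) p-value is below 0.05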

Telemetry histograms are a different beast though. Each user submits his or her own histogram for a certain metric; the histograms are then aggregated across all users for version A and version B. How do you determine if there is a real difference or if what you are looking at is just due to noise? A chi-squared test would seem the most natural choice, but on second thought its assumptions are not met, as the entries in the aggregated histograms are not independent from each other. Luckily, we can avoid having to sit down and come up with a new mathematically sound statistical test. Meet the permutation test.

Say you have a sample of metric M for users of version A and a sample of metric M for users of version B. You measure a difference d between the means of the samples. Now you assume there is no difference between A and B, randomly shuffle entries between the two samples and compute the difference of the means again. You do this again, and again, and again… What you end up with is a distribution D of the differences of the means for all the reshuffled samples. Finally, you compute the probability of getting the original difference d, or a more extreme value, by chance, and there you have it: a newborn hypothesis test!
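For plain means, a minimal sketch of the procedure in Python (a sketch only, not the code we actually use) could look like this:

import numpy as np

def permutation_test_means(xs, ys, num=10000):
    # Probability of seeing a difference of means at least as extreme as the
    # observed one, assuming xs and ys come from the same distribution.
    observed = abs(xs.mean() - ys.mean())
    pooled = np.concatenate([xs, ys])
    k = 0
    for _ in range(num):
        np.random.shuffle(pooled)
        shuffled = abs(pooled[:len(xs)].mean() - pooled[len(xs):].mean())
        k += observed <= shuffled
    return k / float(num)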

Going back to our original problem of comparing aggregated histograms for the experiment and control groups: instead of means we have aggregated histograms, and instead of computing a difference we consider a distance; everything else remains the same as in the previous example:


import numpy as np
import pandas as pd

def mc_permutation_test(xs, ys, num):
    # xs and ys are the per-user histograms of the two groups.
    n, k = len(xs), 0
    h1 = xs.sum()
    h2 = ys.sum()

    diff = histogram_distance(h1, h2)
    zs = pd.concat([xs, ys])
    zs.index = np.arange(0, len(zs))

    for j in range(num):
        zs = zs.reindex(np.random.permutation(zs.index))
        h1 = zs[:n].sum()
        h2 = zs[n:].sum()
        k += diff < histogram_distance(h1, h2)

    return k / float(num)
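Note that the snippet assumes a histogram_distance helper, which I haven’t shown; any sensible distance between the aggregated histograms will do. A minimal sketch, assuming the histograms are pandas Series over the same buckets, could normalize them to unit mass and take the Euclidean distance:

import numpy as np

def histogram_distance(h1, h2):
    # Euclidean distance between the two histograms after normalizing them;
    # just one of many reasonable choices.
    p = h1 / float(h1.sum())
    q = h2 / float(h2.sum())
    return np.sqrt(((p - q) ** 2).sum())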

Most statistical tests were created at a time when there were no [fast] computers around, but nowadays churning through a Monte-Carlo permutation test is not a big deal and one can easily run such a test in a reasonable amount of time.

ClientID in Telemetry submissions

A new functionality landed recently that allows Telemetry sessions to be grouped by profile ID. Being able to group sessions by profile turns out to be extremely useful for several reasons. For instance, as some users tend to generate an enormous number of sessions daily, analyses tend to be skewed towards those users.

Take uptime duration: if we just consider the uptime distribution of all sessions collected in a certain timeframe on Nightly, we get a distribution with a median duration of about 15 minutes. But that number isn’t really representative of the median uptime of our users. If we group the submissions by Client ID and compute the median uptime duration for each group, we can build a new distribution that is more representative of the general population:

[Figure: distribution of median uptime duration per Client ID]
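In terms of the RDD API, the per-client grouping boils down to something like the following sketch; the ping field names are simplified stand-ins for the real payload layout, not the actual schema.

import numpy as np

# `pings` is assumed to be an RDD of parsed submissions exposing a client ID
# and an uptime measurement; the field names below are illustrative only.
median_uptime_per_client = pings \
    .map(lambda ping: (ping["clientID"], ping["simpleMeasurements"]["uptime"])) \
    .groupByKey() \
    .mapValues(lambda uptimes: np.median(list(uptimes)))

per_client_distribution = median_uptime_per_client.values().collect()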

And we can repeat the exercise for the startup duration, which is expressed in ms:

[Figure: distribution of median startup duration per Client ID]

Our dashboards are still based on the session distributions, but it’s likely that we will provide both session-based and user-based distributions in our next-gen Telemetry dashboard.

edit:

Please keep in mind that:

  • Telemetry does not collect privacy-sensitive data.
  • You do not have to trust us, you can verify what data Telemetry is collecting in the about:telemetry page in your browser and in aggregate form in our Telemetry dashboards.
  • Telemetry is an opt-in feature on release channels and a feature you can easily disable on other channels.
  • The new Telemetry Client ID does not track users, it tracks Telemetry performance & feature-usage metrics across sessions.

A Telemetry API for Spark

Check out my previous post about Spark and Telemetry data if you want to find out what all the fuss about Spark is. This post is a step-by-step guide on how to run a Spark job on AWS and use our simple Scala API to load Telemetry data.

The first step is to start a machine on AWS using Mark Reid’s nifty dashboard, in his own words:

  1. Visit the analysis provisioning dashboard at telemetry-dash.mozilla.org and sign in using Persona (with an @mozilla.com email address as mentioned above).
  2. Click “Launch an ad-hoc analysis worker”.
  3. Enter some details. The “Server Name” field should be a short descriptive name, something like “mreid chromehangs analysis” is good. Upload your SSH public key (this allows you to log in to the server once it’s started up).
  4. Click “Submit”.
  5. A Ubuntu machine will be started up on Amazon’s EC2 infrastructure. Once it’s ready, you can SSH in and run your analysis job. Reload the webpage after a couple of minutes to get the full SSH command you’ll use to log in.

Now connect to the machine and clone my starter project template:


git clone https://github.com/vitillo/mozilla-telemetry-spark.git
cd mozilla-telemetry-spark && source aws/setup.sh

The setup script will install Oracle’s JDK, among other bits. Now we are finally ready to give Spark a spin by launching:

sbt run

The command will run a simple job that computes the operating system distribution for a small number of pings. It will take a while to complete as sbt, an interactive build tool for Scala, downloads all the required dependencies for Spark on the first run.


import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import org.json4s._
import org.json4s.jackson.JsonMethods._

import Mozilla.Telemetry._

object Analysis{
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("mozilla-telemetry").setMaster("local[*]")
    implicit val sc = new SparkContext(conf)
    implicit lazy val formats = DefaultFormats

    val pings = Pings("Firefox", "nightly", "36.0a1", "*", ("20141109", "20141110")).RDD(0.1)

    val osdistribution = pings.map(line => {
      // Skip the submission's key prefix and extract the OS field from the JSON payload.
      ((parse(line.substring(37)) \ "info" \ "OS").extract[String], 1)
    }).reduceByKey(_+_).collect

    println("OS distribution:")
    osdistribution.foreach(println)

    sc.stop()
  }
}

If the job completed successfully, you should see something like this in the output:

OS distribution:
(WINNT,4421)
(Darwin,38)
(Linux,271)

To start writing your job, simply customize src/main/scala/main.scala to your needs. The Telemetry Scala API allows you to define an RDD for a Telemetry filter:

val pings = Pings("Firefox", "nightly", "36.0a1", "*", ("20141109", "20141110")).RDD(0.1)

The above statement retrieves a 10% sample of all Telemetry submissions of Firefox received on the 9th and 10th of November for any build-id of Nightly 36. The last 2 parameters of Pings, i.e. build-id and date, accept either a single value or a tuple specifying a range.

If you are interested in learning more, there is going to be a Spark & Telemetry tutorial at MozLandia next week! I will briefly go over the data layout of Telemetry and how Spark works under the hood, and finally jump into a hands-on interactive analysis session with real data. No prerequisites are required in terms of Telemetry, Spark or distributed computing.

Time: Friday, December 5, 2014, 1 PM
Location: Belmont Room, Marriott Waterfront 2nd, Mozlandia

Update
I have uploaded the slides and the tutorial.