What is it like to be a Data Engineer at Mozilla?

When people ask me what I do at Mozilla and I tell them I work with data, more often than not their reaction is something like “Wait, what? What kind of data could Mozilla possibly have?” But let’s start from the beginning of my journey. Even though I have worked on data systems since my first internship, I didn’t start my career at Mozilla officially as a Data Engineer. I was hired to track, investigate, and fix performance problems in the Firefox browser; an analyst of sorts, given my background in optimizing large scientific applications for the ATLAS experiment at the LHC.

The thing with applications for which you have some control over how they are used is that you can “just” profile your known use cases and go from there. Profiling a browser is an entirely different story. Every user might stress the browser in different ways and you can’t just sample stacks and indiscriminately collect data from your users without privacy and performance consequences. If that wasn’t enough, there are as many different hardware/software configurations as there are users. Even with the same configuration, a different driver version can make a huge difference. This is where Telemetry comes in: an opt-in data acquisition system that collects general performance metrics, like garbage collection timings, and hardware information, like the GPU model, across our user base. In case you are wondering what data I am talking about, why don’t you have a look? You are just one click away from about:telemetry, a page that shows the data that Telemetry collects.

Imagine receiving Terabytes of data with millions of payloads per day, each containing thousands of complex hierarchical measurements, and being tasked to make sense of that data, to find patterns in it. How would you do it? Step 1 is getting the data; how hard can it possibly be, right? In order to do that, I had to run a script to launch an EC2 instance, ssh into it, and from there run a custom, non-distributed, MapReduce (MR) system written in Python. This was a couple of years ago; you might be wondering why anyone would use a custom MR system instead of something like Hadoop back then. It turns out that it was simple enough to support the use cases the performance team had at the time and when something went wrong it was easy to debug. Trust me, debugging a full-fledged distributed system can be painful.

The system was very simple: the data was partitioned on S3 according to a set of dimensions, like submission date, and the MR framework allowed the user to define which values to use for each dimension. My first assignment was to get back the ratio of users that had a solid state disk. Sounds simple enough, yet it took me a while to get the data I wanted out of the system. The debug cycle was very time consuming; I had to edit my job, try to run it and, if something went wrong, start all over again. An analysis for a single day could take hours to complete and, as is often the case with data, if something that was supposed to be a string turned out to be an integer because of data corruption, and I failed to handle that case, the whole thing exploded.

There weren’t many people running Telemetry analyses back then; it was just too painful. In time, as more metrics were added and I tried to answer more questions with data, I decided to step back for a moment and try something new. I played around with different systems like Pig, mrjob, and Spark. Though Spark has a rich API with far more operators than map & reduce, what ultimately sold me was the fact that it had a read–eval–print loop. If your last command failed, you could just retry it without having to rerun the whole job from the beginning! I initially used a Scala API to run my analyses but I soon started missing some of the Python libraries I had grown used to for analysis, like pandas, scipy and matplotlib; the Scala counterparts just weren’t on par with them. I wanted to bring others on board and convincing someone to write analyses in a familiar language was a lot easier. So I built a tiny wrapper around Spark’s Python API which lets users run queries on our data from a Jupyter notebook, and in time the number of people using Telemetry for their analytics needs grew.

I deliberately ignored many design problems that had to be solved. What storage format should be used? How should the data be partitioned? How should the cluster(s) be provisioned; EMR, ECS or something else entirely? How should recurrent jobs be scheduled?

Telemetry is one of the most complex data sources we have, but there are many others, like the server logs from a variety of services. Others were solving problems similar to the ones I faced, but with different approaches, so we joined forces. Fast-forward a couple of years and Telemetry has changed a lot. A Heka pipeline pre-processes and validates the Telemetry data before dumping it on S3. Scala jobs convert the raw JSON data into more manageable derived Parquet datasets that can then be read by Spark with Python, Scala or R. The scope of Telemetry has expanded to drive not only performance needs but business ones as well. Derived datasets can be queried with SQL through Presto, and re:dash allows us to build and share dashboards among peers. Jobs can be scheduled and monitored with Airflow. Telemetry-based analyses are being run at all levels of the organization: no longer are a few people responsible for answering every imaginable data-related question; there are interns running statistically rigorous analyses on A/B experiments and Senior Managers building retention plots for their products. Analyses are being shared as Jupyter notebooks and peer-reviewed on Bugzilla. It’s every engineer’s responsibility to know how to measure how well their own feature or product is performing.

There is this popular meme about how big data is like teenage sex: everyone talks about it, nobody really knows how to do it, everyone thinks everyone else is doing it, so everyone claims they are doing it. The truth is that there are only so many companies out there that can claim a user base of millions of users. This is one of the things I love most about Mozilla: we aren’t as big as the giants, but we aren’t small either. As an engineer, I get to solve challenging problems with the agility, flexibility and freedom of a startup. Our engineers work on interesting projects and there is a lot of room for trying new things, disrupting the status quo and pushing our comfort zone.

What’s the life of a Data engineer at Mozilla nowadays? How do we spend our time and what are our responsibilities?

Building the data pipeline

We build, maintain, monitor and deploy our data infrastructure and make sure that it works reliably and efficiently. We strive to use the smallest number of technologies that can satisfy our needs and work well in concert.

What kind of data format (e.g. Avro, Parquet) and compression codec (e.g. Snappy, LZO) should be used for the data? Should the data be stored on HDFS or S3? How should it be partitioned and bucketed? Where should the metadata (e.g. partition columns) be stored? Should the data be handled in real-time or in batch? Should views be recomputed incrementally or from scratch? Does summary data need to be accurate or can probabilistic data structures be used? Those are just some of the many decisions a data engineer is expected to take.

We contribute to internal projects. Heka, for example, a stream processing system, is used in a multitude of ways: from loading and parsing log files to performing real-time analysis, graphing, and anomaly detection.

At Mozilla, we love open source. When we built our stream processing system, we open sourced it. When different data systems like Spark or Presto need some custom glue to make them play ball, we extend them to fit our needs. Everything we do is out in the open; many of our discussions happen either on Bugzilla or on public mailing lists. Whatever we learn we share with the outside world.

Supporting other teams

Engineers, managers, and executives alike come to us with questions that they would like to answer with data but don’t yet quite know how to. We try to work out which metrics can best answer their questions, assuming they can be answered in the first place. If the required metrics are missing, we add them to our pipeline. This could mean adding instrumentation to Firefox or, if the desired metrics can be derived from simpler ones that are available in the raw data, extending our derived datasets.

Once the data is queryable, we teach our users how to access it with e.g. Spark, Presto or Heka, depending on where the dataset is stored and what their requirements are. We learn from common patterns and inefficiencies by supporting our users’ analysis needs. For example, if a considerable number of queries need to group a dataset in a certain way, we might opt to build a pre-grouped version of the dataset to improve performance.
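
As a hedged illustration of such a pre-grouped dataset (the dataset DataFrame, the grouping column and the S3 path below are made up for the example), the grouping can be materialized once with Spark and written back as Parquet so that subsequent analyses read the smaller derived table instead:

daily_counts = (dataset                      # an existing Spark DataFrame, assumed for illustration
                .groupBy("submission_date")  # the grouping that many analyses repeat
                .count())

# Persist the derived table so analyses can read it directly.
daily_counts.write.parquet("s3://example-bucket/derived/daily_counts/v1/")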

We strive to review many of our users’ analyses. It’s far too easy to make mistakes when analyzing data since the final deliverables, like plots, can look reasonable even in the presence of subtle bugs.

Doing data science

We build alerting mechanisms that monitor our data to warn us when something is off. From representing time-series with hierarchical statistical models to using SAX for anomaly detection, the fun never stops.

We create dashboards for general use cases, like displaying daily aggregates partitioned by a set of dimensions or tracking engagement ratios of products & features.

We validate hypotheses with A/B tests. For example, one of the main features we are going to ship this year, Electrolysis, requires a clear understanding of its performance impact before we ship it. We designed and ran A/B tests to compare, with statistically sound methods, a wide variety of performance metrics with and without Electrolysis enabled.

There is also no shortage of analyses that are more exploratory in nature. For example, we recently noticed a small spike in crashes whenever a new release is shipped. Is it a measurement artifact due to crashes that happened during the update? Or maybe there is something off that causes Firefox to crash after a new version is installed, and if so what is it? We don’t have an answer to this conundrum yet.

The simpler it is to get answers, the more questions will be asked.

This has been my motto for a while. Being data driven doesn’t mean having a siloed team of wizards and engineers that poke at data. Being data driven means that everyone in the company knows how to access the data they need in order to answer the questions they have; that’s the ultimate goal of a data engineering team. We are getting there, but there is still a lot to do. If you have read this far, are passionate about data and have the relevant skills, check out our jobs page.

Telemetry meets SQL

In an effort to ease data access at Mozilla we started providing SQL access to our Parquet datasets through Presto. The benefit of SQL is that it’s a commonly understood language among engineers and non-engineers alike and is generally easy to pick up.

Even though we use Redshift for some of our data, there are datasets that store complex hierarchical data which would require unnatural transformations to fit into a typical SQL store that doesn’t support the flexibility of a nested data model (think structs, arrays & maps) such as the one provided by Parquet. Furthermore, we were looking to use a single store for our data, i.e. Parquet files on S3, which both Spark and a SQL engine could access directly.

Presto provides the best of both worlds: SQL over Parquet. Presto is an open-source distributed SQL query engine for running interactive analytic queries against various data sources. It allows querying different sources such as Hive and Cassandra, relational databases or even proprietary data stores and a single query can combine data from multiple sources.

Apache Drill offers similar features without requiring up-front schema knowledge, which is a big advantage over Presto given how painful a schema migration can be at times. Overall, though, Drill feels less mature than Presto and, unlike Presto, is not yet supported by Amazon EMR, which makes deployment & maintenance more involved. For those reasons we picked Presto as our SQL engine.

Presto supports a rich set of data types which map to Parquet ones:

Arrays
The [] operator is used to access an element of an array:

SELECT my_array[1] AS first_element

Maps
The [] operator is used to retrieve the value corresponding to a given key from a map:

SELECT name_to_age_map['Bob'] AS bob_age

Structs
Fields within a structure are accessed with the field reference operator .:

SELECT my_column.my_field

Unnesting maps and structs
The UNNEST clause is used to expand an array or a map into a relation. Arrays are expanded into a single column, and maps are expanded into two columns (key, value). UNNEST can also be used with multiple arguments, in which case they are expanded into multiple columns, with as many rows as the highest cardinality argument:


SELECT numbers, animals, n, a
FROM (
  VALUES
    (ARRAY[2, 5], ARRAY['dog', 'cat', 'bird']),
    (ARRAY[7, 8, 9], ARRAY['cow', 'pig'])
) AS x (numbers, animals)
CROSS JOIN UNNEST(numbers, animals) AS t (n, a);

which yields:

  numbers  |     animals      |  n   |  a
-----------+------------------+------+------
 [2, 5]    | [dog, cat, bird] |    2 | dog
 [2, 5]    | [dog, cat, bird] |    5 | cat
 [2, 5]    | [dog, cat, bird] | NULL | bird
 [7, 8, 9] | [cow, pig]       |    7 | cow
 [7, 8, 9] | [cow, pig]       |    8 | pig
 [7, 8, 9] | [cow, pig]       |    9 | NULL

The longitudinal view is currently accessible through Presto.  As a concrete example, this is how one could count the number of Telemetry fragments over a time range:


SELECT d, count(*)
FROM (
  SELECT substr(ssd, 1, 11) AS d
  FROM longitudinal
  CROSS JOIN UNNEST(subsession_start_date) AS s(ssd)
)
WHERE d >= '2015-11-15' AND d < '2016-03-15'
GROUP BY d
ORDER BY d ASC

Re:dash
Re:dash allows you to query Presto directly from Firefox. After a query is run, a table with the result is displayed. Queries can be saved and optionally scheduled to run periodically at a given time.

[Figure: Query editor]

Different kinds of plots (e.g. bar charts, line charts, boxplots, …) can be built on top of a table and are updated every time the query is re-run.

[Figure: Number of Telemetry fragments over time]

Custom dashboards can be built that link to tables and plots. Users can view the dashboards and optionally inspect and fork the SQL code that powers them.

[Figure: A simple dashboard]

Mozilla’s re:dash instance can be accessed at sql.telemetry.mozilla.org.

Longitudinal studies with Telemetry

Unified Telemetry (UT) replaced both Telemetry and FHR at the end of last year. FHR has been historically used to answer longitudinal questions, such as churn, while Telemetry has mainly been used for performance studies.

In UT land, multiple self-contained submissions are generated for a profile over time, in contrast to FHR, where each submission contained all historical records. With the new format, a typical longitudinal query on the raw data conceptually requires a big group-by on the profile ID over all submissions to recreate the history of each profile. To avoid that expensive grouping operation we are providing a longitudinal batch view of our Telemetry dataset.
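
Conceptually, the grouping that the longitudinal view saves us from looks something like the following sketch, where submissions is a hypothetical RDD of parsed pings:

histories = (submissions
             .map(lambda ping: (ping["clientId"], [ping]))  # key each submission by its profile ID
             .reduceByKey(lambda a, b: a + b))               # collect every submission of a profile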

The longitudinal view is logically organized as a table where rows represent profiles and columns the various metrics (e.g. startup time). Each field of the table contains a list of chronologically sorted values, one per Telemetry submission received for that profile. Even though each profile could have been represented as an array of structures with Parquet, ultimately we decided to represent it as a structure of arrays to deal with some inefficiencies in reading nested data from Spark.

The dataset is periodically regenerated from scratch, which allows us to apply non-backward-compatible changes to the schema and not worry about merging procedures.

The dataset can be accessed from Mozilla’s Spark clusters as a DataFrame:


frame = sqlContext.sql("SELECT * FROM longitudinal")

The view contains several thousand measures, which include all histograms and a large part of our scalar metrics stored in the various sections of Telemetry submissions.

frame.printSchema()
root
 |-- profile_id: string
 |-- normalized_channel: string
 |-- submission_date: array
 |    |-- element: string
 |-- build: array
 |    |-- element: struct
 |    |    |-- application_id: string
 |    |    |-- application_name: string
 |    |    |-- architecture: string
 |    |    |-- architectures_in_binary: string
 |    |    |-- build_id: string
 |    |    |-- version: string
 |    |    |-- vendor: string
 |    |    |-- platform_version: string
 |    |    |-- xpcom_abi: string
 |    |    |-- hotfix_version: string
...
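
As a small, hedged example of working with this structure-of-arrays layout (reusing the frame defined above and assuming a recent enough Spark for the size() SQL function), one can compute the number of submissions received per profile:

per_profile = frame.selectExpr("profile_id",
                               "normalized_channel",
                               "size(submission_date) AS n_submissions")
per_profile.groupBy("normalized_channel").avg("n_submissions").show()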

A Jupyter notebook with example queries that use the longitudinal dataset is available on Spark clusters launched from a.t.m.o.

Telemetry meets Parquet

In Mozilla’s Telemetry land, raw JSON pings are stored on S3 within files containing framed Heka records, which form our immutable, append-only master dataset. Reading the raw data with Spark can be slow for analyses that read only a handful of fields per record from the thousands available; not to mention the cost of parsing the JSON blobs in the first place.

We are slowly moving away from JSON to a more OLAP-friendly serialization format for the master dataset, which requires defining a proper schema. Given that we have been collecting data in big JSON payloads since the beginning of time, different subsystems have been using various, in some cases schema-unfriendly, data layouts. That, and the fact that we have thousands of fields embedded in a complex nested structure, makes it hard to retrospectively enforce a schema that matches our current data, so a change is likely not going to happen overnight.

Until recently the only way to process Telemetry data with Spark was to read the raw data directly, which isn’t efficient but gets the job done.

Batch views

Some analyses require filtering out a considerable amount of data before the actual workload can start. To improve the efficiency of such jobs, we started defining derived streams, or so-called pre-computed batch views in lambda architecture lingo.

A batch view is regenerated or updated daily and, in a mathematical sense, it’s simply a function over the entire master dataset. The view usually contains a subset of the master dataset, possibly transformed, with the objective to make analyses that depend on it more efficient.

[Figure: Batch view]

As a concrete example, we have run an A/B test on Aurora 43 in which we disabled or enabled Electrolysis (E10s) based on a coin flip (note that E10s is enabled by default on Aurora). In this particular experiment we were aiming to study the performance of E10s. The experiment had a lifespan of one week per user. As some of our users access Firefox sporadically, we couldn’t just sample a few days’ worth of data and be done with it, as ignoring the long tail of submissions would have biased our results.

We clearly needed a batch view of our master dataset that contained only submissions for users that were currently enrolled in the experiment, in order to avoid an expensive filtering step.
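
A minimal, hypothetical sketch of such a batch-view job is shown below; pings is assumed to come from get_pings, and the enrollment check, projected fields and S3 path are illustrative rather than the actual experiment plumbing:

from pyspark.sql import Row
import ujson as json

def to_row(raw):
    ping = json.loads(raw)
    experiment = ping.get("environment", {}).get("addons", {}).get("activeExperiment", {})
    return Row(client_id=ping.get("clientId"),
               branch=experiment.get("branch"),
               first_paint=ping.get("payload", {}).get("simpleMeasurements", {}).get("firstPaint"))

rows = pings.map(to_row).filter(lambda row: row.branch is not None)  # keep enrolled profiles only
sqlContext.createDataFrame(rows).write.parquet("s3://example-bucket/e10s_experiment/v1/")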

Parquet

We decided to serialize the data for our batch views back to S3 as Parquet files. Parquet is a columnar storage format that is slowly becoming the lingua franca of the Hadoop ecosystem, as it can be read and written from e.g. Hive, Pig & Spark.

Conceptually it’s important to clarify that Parquet is just a storage format, i.e. a binary representation of the data, and it relies on object models, like the one provided by Avro or Thrift, to represent the data in memory. A set of object model converters are provided to map between the in-memory representation and the storage format.

Parquet supports efficient compression and encoding schemes, which are applied on a per-column level where the data tends to be homogeneous. Furthermore, as Spark can load Parquet files into a DataFrame, a Python analysis can potentially experience the same performance as a Scala one thanks to a unified physical execution layer.

A Parquet file is composed of:

  • row groups, which contain a subset of rows – a row group is composed of a set of column chunks;
  • column chunks, which contain values for a specific column – a column chunk is partitioned in a set of pages;
  • pages, which are the smallest level of granularity for reads – compression and encoding are applied at this level of abstraction;
  • a footer, which contains the schema and some other metadata.

[Figure: Parquet file structure]

As batch views typically use only a subset of the fields provided in our Telemetry payloads, the problem of defining a schema for such subset becomes a non-issue.
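
Reading such a view back with Spark is then straightforward; a hedged sketch that reuses the hypothetical dataset from the previous section (the exact reader API depends on the Spark version):

view = sqlContext.read.parquet("s3://example-bucket/e10s_experiment/v1/")
view.printSchema()
view.where("first_paint > 0").groupBy("branch").count().show()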

Benefits

Unsurprisingly, we have seen speed-ups of up to a couple of orders of magnitude in some analyses and a reduction in file size of up to 8x compared to files with the same compression scheme and content (no JSON, just framed Heka records).

Each view is generated with a Spark job. In the future Heka is likely going to produce the views directly, once it supports Parquet natively.

Telemetry metrics roll-ups

Our Telemetry aggregation system has been serving us well for quite some time. As Telemetry evolved though, maintaining the codebase and adding new features such as keyed histograms has proven to be challenging. With the introduction of unified FHR/Telemetry, we decided to rewrite the aggregation pipeline with an updated set of requirements in mind.

Metrics

A ping is the data payload that clients submit to our server. The payload contains, among other things, over a thousand metrics of the following types:

  • numerical, e.g. startup time
  • categorical, e.g. operating system name
  • distributional, e.g. garbage collection timings

Distributions are implemented with histograms that come in different shapes and sizes. For example, a keyed histogram represents a collection of labelled histograms. It’s not rare for keyed histograms to have thousands of possible labels. For instance, MISBEHAVING_ADDONS_JANK_LEVEL, which measures the longest blocking operation performed by an add-on, potentially has a label for each extension.
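
To give a rough idea of the shape of the data, a keyed histogram looks more or less like the following simplified Python structure; the field names are a sketch, not the exact ping schema:

MISBEHAVING_ADDONS_JANK_LEVEL = {
    "addon1@example.com": {"values": {"0": 12, "1": 3, "2": 0}, "sum": 18},  # one histogram per label
    "addon2@example.com": {"values": {"0": 7}, "sum": 0},
}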

The main objective of the aggregator is to create time- or build-id-based aggregates by a set of dimensions:

  • channel, e.g. nightly
  • build-id or submission date
  • metric name, e.g. GC_MS
  • label, for keyed histograms
  • application name, e.g. Fennec
  • application version, e.g. 41
  • CPU architecture, e.g. x86_64
  • operating system, e.g. Windows
  • operating system version, e.g. 6.1
  • e10s enabled
  • process type, e.g. content or parent

As scalar and categorical metrics are converted to histograms during the aggregation, ultimately we display only distributions in our dashboard.
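
Conceptually, the aggregation boils down to keying each histogram by its tuple of dimensions and summing bucket counts per key. A rough sketch, where histograms_by_dimensions is a hypothetical RDD of (dimension_tuple, bucket_counts) pairs rather than the production job:

aggregates = histograms_by_dimensions.reduceByKey(
    lambda a, b: [x + y for x, y in zip(a, b)])  # bucket-wise sum of two histograms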

Raw Storage

We receive millions of pings each day over all our channels. A raw uncompressed ping has a size of over 100KB. Pings are sent to our edge servers and end up being stored in an immutable chunk of up to 300MB on S3, partitioned by submission date, application name, update channel, application version, and build id.

As we are currently collecting v4 submissions only on pre-release channels, we store about 700 GB per day, and that’s considering only saved_session pings, as those are the ones being aggregated. Once we start receiving data on the release channel as well, we are likely going to double that number.

As soon as an immutable chunk is stored on S3, an AWS Lambda function adds a corresponding entry to a SimpleDB index. The index allows Spark jobs to query the set of available pings by different criteria without having to perform an expensive scan over S3.

Spark Aggregator

A daily scheduled Spark job performs the aggregation, on the data received the day before, by the set of dimensions mentioned above. We are likely going to move from a batch job to a streaming one in the future to reduce the latency from the time a ping is stored on S3 to the time its data appears in the dashboard.

Two kinds of aggregates are produced by the aggregator:

  • submission date based
  • build-id based

Aggregates by build-id computed for a given submission date have to be added to the historical ones. As long as there are submissions coming from an old build of Firefox, we will keep receiving and aggregating data for it. The aggregation of the historical aggregates with the daily computed ones (i.e. partial aggregates) happens within a PostgreSQL database.

Database

There is only one type of table within the database, partitioned by channel, version and build-id (or submission date, depending on the aggregation type).

As PostgreSQL natively supports JSON blobs and arrays, it came naturally to express each row as just a couple of fields, one being a JSON object containing a set of dimensions and the other being an array representing the histogram. Adding a new dimension in the future should be rather painless as dimensions are not represented with columns.

When a new partial aggregate is pushed to the database, PostgreSQL finds the current historical entry for that combination of dimensions, if it exists, and updates the current histogram by adding the partially aggregated histogram to it. In reality, a temporary table containing all partial aggregates is pushed to the database and then merged with the historical aggregates, but the underlying logic remains the same.
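
In Python pseudocode, the underlying upsert logic looks roughly like this (a conceptual illustration, not the actual SQL):

def upsert(historical, dimensions, partial_histogram):
    current = historical.get(dimensions)  # dimensions is e.g. a (metric, os, ...) tuple
    if current is None:
        historical[dimensions] = list(partial_histogram)
    else:
        historical[dimensions] = [h + p for h, p in zip(current, partial_histogram)]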

As the database is usually queried by submission date or build-id and as there are millions of partial aggregates per day deriving from the possible combinations of dimensions, the table is partitioned the way it is to allow the upsert operation to be performed as fast as possible.

API

An inverted index on the JSON blobs allows us to efficiently retrieve, and aggregate, all histograms matching given filtering criteria.

For example,


select aggregate_histograms(histograms)
from build_id_nightly_41_20150602
where dimensions @> '{"metric": "SIMPLE_MEASURES_UPTIME",
                      "application": "Firefox",
                      "os": "Windows"}'::jsonb;

retrieves a list of histograms, one for each combination of dimensions matching the where clause, and adds them together producing a final histogram that represents the distribution of the uptime measure for Firefox users on the nightly Windows build created on the 2nd of June 2015.

Aggregates are made available through an HTTP API. For example, to retrieve the aggregated histogram for the GC_MS metric on Windows for build-ids of 2015/06/15 and 2015/06/16:

curl -X GET "http://aggregates.telemetry.mozilla.org/aggregates_by/build_id/channels/nightly/?version=41&dates=20150615&metric=GC_MS&os=Windows_NT"

 which returns

{"buckets":[0, ..., 10000],
 "data":[{"date":"20150615",
          "count":239459,
          "histogram":[309, ..., 5047],
          "label":""}],
 "kind":"exponential",
 "description":"Time spent running JS GC (ms)"}

Dashboard

Our intern, Anthony Zhang, did a phenomenal job creating a nifty dashboard to display the aggregates. Even though it’s still under active development, it’s already functional and thanks to it we were able to spot a serious bug in the v2 aggregation pipeline.

It comes with two views: the histogram view, designed for viewing distributions of measures:

[Figure: Histogram view]

and an evolution view, for viewing the evolution of aggregate values for measures over time:

[Figure: Evolution view]

As we started aggregating data at the beginning of June, the evolution plot looks rightfully wacky before that date.

A glance at unified FHR/Telemetry

Lots is changing in Telemetry land. If you do occasionally run data analyses with our Spark infrastructure you might want to keep reading.

Background

The Telemetry and FHR collection systems on desktop are in the process of being unified. Both systems will be sending their data through a common data pipeline which has some features of both the current Telemetry pipeline as well as the Cloud Services one that we use to ingest server logs.

The goals of the unification are to:

  • avoid measuring the same metric in multiple systems on the client side;
  • reduce the latency from the time a measurement occurs until it can be analyzed on the server;
  • increase the accuracy of measurements so that they can be better correlated with factors in the user environment such as the specific build, enabled add-ons, and other hardware or software characteristics;
  • use a common data pipeline for client telemetry and service log data.

The unified pipeline is currently sending data for Nightly, Aurora and Beta. The classic FHR and Telemetry pipelines are going to keep sending data at least until the new unified pipeline has been fully validated. The plan is to land this feature in the Firefox 40 release. We’ll also continue to respect existing user preferences. If the user has opted out of FHR or Telemetry, we’ll continue to respect that for the equivalent data sets. Similarly, the opt-out and opt-in defaults will remain the same for equivalent data sets.

Data format

A Telemetry ping, stored as JSON object on the client, encapsulates the data sent to our backend. The main differences between the new unified Telemetry ping format (v4) and the classic Telemetry one (v2) are that:

  • multiple ping types are supported beyond the classic saved-session ping, like the main ping;
  • pings have a common top-level which contains basic information shared between types, like build-id and channel;
  • pings have an optional environment field which consists of data that is expected to be characteristic for performance and other behavior.

From an analysis point of view, the most important addition is the main ping which includes the very same histograms and other performance and diagnostic data as the v2 saved-session pings. Unlike in “classic” Telemetry though, there can be multiple main pings during a single session. A main ping is triggered by different scenarios, which are documented by the reason field:

  • aborted-session: periodically saved to disk and deleted at shutdown – if a previous aborted session ping is found at startup it gets sent to our backend;
  • environment-change: generated when the environment changes;
  • shutdown: triggered when the browser session ends;
  • daily: a session split triggered at 24-hour intervals at local midnight; this is needed to make sure we keep receiving data also from clients that have very long sessions.
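
To give a rough idea of the overall structure, here is a heavily simplified sketch of a v4 main ping as a Python dictionary; real pings contain many more fields and the values below are placeholders:

ping = {
    "type": "main",
    "clientId": "<uuid>",
    "application": {"channel": "nightly", "buildId": "20150601000000"},
    "environment": {"system": {"os": {"name": "Windows_NT"}}},
    "payload": {
        "info": {"reason": "shutdown"},
        "histograms": {},
        "simpleMeasurements": {},
    },
}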

Data access through Spark

Once you connect to a Spark-enabled IPython notebook launched from our self-service dashboard, you will be prompted with a new tutorial based on the v4 dataset. The v4 data is fetched through the get_pings function by passing “v4” as the schema parameter. The following parameters are valid for the new data format:

  • app: an application name, e.g.: “Firefox”;
  • channel: a channel name, e.g.: “nightly”;
  • version: the application version, e.g.: “40.0a1”;
  • build_id: a build id or a range of build ids, e.g.: “20150601000000” or (“20150601000000”, “20150610999999”);
  • submission_date: a submission date or a range of submission dates, e.g.: “20150601” or (“20150601”, “20150610”);
  • doc_type: ping type, e.g.: “main”; set to “saved_session” by default;
  • fraction: the fraction of pings to return; set to 1.0 by default.

Once you have an RDD, you can further filter the pings down by reason, as shown in the sketch below. There is also a new experimental API that returns the history of submissions for a subset of profiles, which can be used for longitudinal analyses.
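
Putting it together, here is a hedged sketch that uses the parameters listed above; whether the returned pings are parsed dictionaries or raw JSON strings depends on the library version, so the filter may need adjusting:

from moztelemetry.spark import get_pings

pings = get_pings(sc,
                  schema="v4",
                  app="Firefox",
                  channel="nightly",
                  version="40.0a1",
                  submission_date=("20150601", "20150610"),
                  doc_type="main",
                  fraction=0.01)

# Keep only the main pings generated at shutdown.
shutdown_pings = pings.filter(
    lambda ping: ping.get("payload", {}).get("info", {}).get("reason") == "shutdown")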

Next-gen Data Analysis Framework for Telemetry

The easier it is to get answers, the more questions will be asked

In that spirit, Mark Reid and I have been working for a while now on a new analysis infrastructure to make it as easy as possible for engineers to get answers to data-related questions.

Our shiny new analysis infrastructure is based primarily on IPython and Spark. I have blogged about Spark before and even gave a short tutorial on it at our last workweek in Portland (slides and tutorial); IPython might be something you are not familiar with unless you have a background in science. In a nutshell, it’s a browser-based notebook with support for code, text, mathematical expressions, inline plots and other rich media.

[Figure: An IPython notebook in all its glory]

The combination of IPython and Spark lets you write data analyses interactively from a browser and seamlessly parallelize them over multiple machines, thanks to a rich API with over 80 distributed operators! It’s a huge leap forward in terms of productivity compared to traditional batch-oriented map-reduce frameworks. An IPython notebook contains both the code and the product of the execution of that code, like plots. Once executed, a notebook can simply be serialized and uploaded to Github. Then, thanks to nbviewer, it can be visualized and shared among colleagues.

The issue with sharing just the end product of an analysis is that it’s all too easy for bugs to creep in or wrong assumptions to be made. If your end result is a plot, how do you test it? How do you know that what you are looking at actually reflects the truth? Having the code side by side with its evaluation allows more people to inspect it and streamlines the review process.

This is what you need to do to start your IPython backed Spark cluster with access to Telemetry data:

  1. Visit the analysis provisioning dashboard at telemetry-dash.mozilla.org and sign in using Persona with an @mozilla.com email address.
  2. Click “Launch an ad-hoc Spark cluster”.
  3. Enter some details:
    • The “Cluster Name” field should be a short descriptive name, like “chromehangs analysis”.
    • Set the number of workers for the cluster. Please keep in mind to use resources sparingly; use a single worker to write and debug your job.
    • Upload your SSH public key.
  4. Click “Submit”.
  5. A cluster will be launched on AWS preconfigured with Spark, IPython and some handy data analysis libraries like pandas and matplotlib.

Once the cluster is ready, you can tunnel IPython through SSH by following the instructions on the dashboard, e.g.:

ssh -i my-private-key -L 8888:localhost:8888 hadoop@ec2-54-70-129-221.us-west-2.compute.amazonaws.com

Finally, you can launch IPython in Firefox by visiting http://localhost:8888.

Now what? Glad you asked. In your notebook listing you will see a Hello World notebook. It’s a very simple analysis that produces the distribution of startup times faceted by operating system for a small fraction of Telemetry submissions; let’s quickly review it here.

We start by importing a telemetry utility to fetch pings and some commonly needed libraries for analysis: a json parser, numpy, pandas and matplotlib.

import ujson as json
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from moztelemetry.spark import get_pings

To execute a block of code in IPython, aka a cell, press Shift-Enter. While a cell is being executed, a gray circle will appear in the upper right border of the notebook. When the circle is full, your code is being executed by the IPython kernel; when only the border of the circle is visible, the kernel is idle and waiting for commands.

Spark exploits parallelism across all the cores of your cluster. To see the degree of parallelism you have at your disposal, simply evaluate:


sc.defaultParallelism

Now, let’s fetch a set of Telemetry submissions and load them into an RDD using the get_pings utility function from the moztelemetry library:

pings = get_pings(sc,
                  appName="Firefox",
                  channel="nightly",
                  version="*",
                  buildid="*",
                  submission_date="20141208",
                  fraction=0.1)

That’s pretty much self-documenting. The fraction parameter, which defaults to 1, selects a random subset of the matching submissions. This comes in handy when you first write your analysis and don’t need to load lots of data to test and debug it.

Note that both the buildid and submission_date parameters also accept a tuple specifying, inclusively, a range of dates, e.g.:


pings = get_pings(sc,
                  appName="Firefox",
                  channel="nightly",
                  version="*",
                  buildid=("20141201", "20141202"),
                  submission_date=("20141202", "20141208"))

Let’s do something with those pings. Since we are interested in the distribution of the startup time of Firefox faceted by operating system, let’s extract the needed fields from our submissions:

def extract(ping):
    ping = json.loads(ping)
    os = ping["info"]["OS"]
    startup = ping["simpleMeasurements"].get("firstPaint", -1)
    return (os, startup)

cached = pings.map(lambda ping: extract(ping)).filter(lambda p: p[1] > 0).cache()

As the Python API closely matches the Scala one, I suggest having a look at my older Spark tutorial if you are not familiar with Spark. Another good resource is the set of hands-on exercises from AMP Camp 4.

Now, let’s collect the results back and stuff them into a pandas DataFrame. This is a very common pattern: once you reduce your dataset to a manageable size with Spark, you collect it back on your driver (aka the master machine) and finalize your analysis with statistical tests, plots and whatnot.

grouped = cached.groupByKey().collectAsMap()

frame = pd.DataFrame({x: np.log(pd.Series(list(y))) for x, y in grouped.items()})
frame.boxplot()
plt.ylabel("log(firstPaint)")
plt.show()

[Figure: Startup distribution by OS]

Finally, you can save the notebook, upload it to Github or Bugzilla and visualize it on nbviewer; it’s that simple. Here is the nbviewer-powered Hello World notebook. I warmly suggest that you open a bug report on Bugzilla for your custom Telemetry analysis and ask me or Vladan Djeric to review it. Mozilla has been doing code reviews for years and with good reason; why should data analyses be different?

Congrats, you just completed your first Spark analysis with IPython! If you need any help with your custom job feel free to drop me a line in #telemetry.