# Detecting Talos regressions

This post is about modelling Talos data with a probabilistic model which can be applied to different use-cases, like detecting regressions and/or improvements over time.

Talos is Mozilla’s multiplatform performance testing framework, written in Python, that we use to run performance tests and collect their statistics after every push.

As a concrete example, this is how the performance data of a test might look over time:

Even though there is some noise, which is exacerbated in this graph as the vertical axis doesn’t start from 0, we clearly see a shift of the distribution over time. We would like to detect such shifts as soon as possible after they happen.

It has been known for a while that Talos data can in some cases be bi-modal, which can break our current alerting engine.

Possible reasons for bi-modality are documented in Bug 908888. As past efforts to remove the bi-modal behavior at the source have failed, we have to deal with it in our model.

The following notes originated from conversations with Joel, Kyle, Mauro and Saptarshi.

Mixture of Gaussians

The data can be modelled as a mixture of $K$ Gaussians, where the parameter $K$ could be determined by fitting models for several values of $K$ and selecting the best one according to some criterion.

The first obstacle is estimating the parameters of the mixture from a set of data points. Let’s state this problem formally; if you are not interested in the mathematical derivation, it suffices to know that scikit-learn has an efficient implementation of it.

EM Algorithm

We want to find the probability density $f(x)$, where $f$ is a mixture of $K$ Gaussians, that is most likely to have generated a given set of data points:

$f(x; \theta) = \sum_{k=1}^{K} p_k g(x; \mu_k, \sigma_k)$

where $\theta = \{p_k, \mu_k, \sigma_k\}_{k=1}^{K}$ collects the parameters, $p_k$ is the mixing coefficient of cluster $k$, i.e. the probability that a generic point belongs to cluster $k$, so that $\sum_{k=1}^{K}p_k = 1$, and $g$ is the probability density function of the normal distribution:

$g(x; \mu_k, \sigma_k) = \frac{1}{\sqrt{2\pi}\,\sigma_k}e^{-\frac{(x - \mu_k)^2}{2\sigma_k^2}}$

Now, given a set of $N$ independent and identically distributed data points, we would like to determine the values of $p_k$, $\mu_k$ and $\sigma_k$ that maximize the likelihood of the data. Finding the maximum of a function usually involves taking its derivatives and setting them to zero, and this is often easier for the log-likelihood, which turns the product over the data points into a sum, than for the likelihood itself:

$\log\mathcal{L}(\theta | x_1, \ldots, x_N) = \log\prod_{i=1}^{N} f(x_i; \theta) = \sum_{i=1}^{N}\log\sum_{k=1}^{K} p_k g(x_i; \mu_k, \sigma_k)$

To find a maximum of $\log\mathcal{L}$, let’s compute its partial derivatives with respect to $\mu_k$, $\sigma_k$ and $p_k$. Since

$\frac{\partial g(x_i; \mu_k, \sigma_k)}{\partial \mu_k} = g(x_i; \mu_k, \sigma_k) \frac{\partial}{\partial \mu_k}\left[-\frac{(x_i - \mu_k)^2}{2\sigma_k^2}\right] = \frac{g(x_i; \mu_k, \sigma_k)(x_i - \mu_k)}{\sigma_k^2}$

then

$\frac{\partial\log\mathcal{L}}{\partial\mu_k} = \sum_{i=1}^{N}\frac{1}{\sigma_k^2}\frac{p_k g(x_i; \mu_k, \sigma_k)}{\sum_{j=1}^{K}p_j g(x_i; \mu_j, \sigma_j)}(x_i - \mu_k)$

But, by Bayes’ theorem, $\frac{p_k g(x_i; \mu_k, \sigma_k)}{\sum_{j=1}^{K}p_j g(x_i; \mu_j, \sigma_j)}$ is the conditional probability of selecting cluster $k$ given that the data point $x_i$ was observed, i.e. $p(k|x_i)$, so that:

$\frac{\partial\log\mathcal{L}}{\partial\mu_k} = \sum_{i=1}^{N}\frac{p(k|x_i)}{\sigma_k^2}(x_i - \mu_k)$

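By applying a similar procedure to compute the partial derivatives with respect to $\sigma_k$ and $p_k$ (the latter subject to the constraint $\sum_{k=1}^{K}p_k = 1$, which can be enforced with a Lagrange multiplier), and finally setting all the derivatives to zero, we obtain: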

$\mu_k = \frac{\sum_{i=1}^{N}p(k|x_i)x_i}{\sum_{i=1}^{N}p(k|x_i)}$

$\sigma_k = \sqrt{\frac{\sum_{i=1}^{N}p(k|x_i)(x_i - \mu_k)^2}{\sum_{i=1}^{N}p(k|x_i)}}$

$p_k = \frac{1}{N}\sum_{i=1}^{N}p(k|x_i)$

The first two equations turn out to be simply the sample mean and standard deviation of the data weighted by the conditional probability that component $k$ generated the data point $x_i$.

Since the terms $p(k|x_i)$ depend on all the parameters appearing on the left-hand side of the expressions above, the equations can’t be solved in closed form, and this is where the EM algorithm comes to the rescue. It can be proven that, when the following computations are iterated, the likelihood never decreases and the EM algorithm converges to a stationary point of the likelihood function, in practice usually a local maximum:

###### E Step

$p^{(n)}(k|x_i) = \frac{p_k^{(n)}g(x_i; \mu_k^{(n)}, \sigma_k^{(n)})}{\sum_{j=1}^{K}p_j^{(n)}g(x_i; \mu_j^{(n)}, \sigma_j^{(n)})}$

###### M Step

$\mu_k^{(n+1)} = \frac{\sum_{i=1}^{N}p^{(n)}(k|x_i)x_i}{\sum_{i=1}^{N}p^{(n)}(k|x_i)}$

$\sigma_k^{(n+1)} = \sqrt{\frac{\sum_{i=1}^{N}p^{(n)}(k|x_i)(x_i - \mu_k^{(n+1)})^2}{\sum_{i=1}^{N}p^{(n)}(k|x_i)}}$

$p_k^{(n+1)} = \frac{1}{N}\sum_{i=1}^{N}p^{(n)}(k|x_i)$

Intuitively, in the E-step the parameters of the components are assumed to be given and the data points are soft-assigned to the clusters. In the M-step we compute the updated parameters for our clusters given the new assignment.
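
To make the two steps concrete, here is a minimal pure-Python sketch of EM for a univariate mixture. It illustrates the update equations above and is not scikit-learn’s implementation (in practice you would reach for `sklearn.mixture.GaussianMixture`); the function names, the fixed iteration count, the `min_sigma` floor and the synthetic data are our own assumptions.

```python
import math
import random

def normal_pdf(x, mu, sigma):
    """Gaussian density g(x; mu, sigma)."""
    return math.exp(-((x - mu) ** 2) / (2 * sigma ** 2)) / (math.sqrt(2 * math.pi) * sigma)

def em_step(xs, mus, sigmas, ps, min_sigma=1e-6):
    """One EM iteration; returns the updated (mus, sigmas, ps)."""
    k, n = len(mus), len(xs)
    # E step: responsibilities p(k | x_i) under the current parameters.
    resp = []
    for x in xs:
        weights = [ps[j] * normal_pdf(x, mus[j], sigmas[j]) for j in range(k)]
        total = sum(weights)
        resp.append([w / total for w in weights])
    # M step: responsibility-weighted means, standard deviations and mixing coefficients.
    new_mus, new_sigmas, new_ps = [], [], []
    for j in range(k):
        r = [resp[i][j] for i in range(n)]
        nk = sum(r)
        mu = sum(ri * x for ri, x in zip(r, xs)) / nk
        var = sum(ri * (x - mu) ** 2 for ri, x in zip(r, xs)) / nk
        new_mus.append(mu)
        new_sigmas.append(max(math.sqrt(var), min_sigma))  # avoid collapsing to zero width
        new_ps.append(nk / n)
    return new_mus, new_sigmas, new_ps

def fit_gmm(xs, mus, sigmas, ps, n_iter=50):
    """Iterate EM from the given initial parameters for n_iter steps."""
    for _ in range(n_iter):
        mus, sigmas, ps = em_step(xs, mus, sigmas, ps)
    return mus, sigmas, ps

# Synthetic bi-modal sample (not Talos data): 300 points around 100, 200 around 120.
random.seed(42)
xs = [random.gauss(100, 2) for _ in range(300)] + [random.gauss(120, 2) for _ in range(200)]
mus, sigmas, ps = fit_gmm(xs, mus=[90.0, 130.0], sigmas=[10.0, 10.0], ps=[0.5, 0.5])
```

The initial means are deliberately placed on either side of the two modes; since EM only finds a local optimum, it is common to run it from several initializations and keep the best fit.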

Determine K

Now that we have a way to fit a mixture of $K$ Gaussians to our data, how do we determine $K$? One way is to fit a model for each candidate value of $K$ and select the one with the best (lowest) BIC score. Adding more components to a model will fit the data better but may result in overfitting; BIC counters this by introducing a penalty term for the number of parameters in the model.
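
A minimal sketch of the selection step, assuming the maximized log-likelihood of each candidate model is already available (e.g. from an EM fit). It uses the standard definition $\mathrm{BIC} = m\ln N - 2\ln\hat{\mathcal{L}}$, where $m = 3K - 1$ counts $K$ means, $K$ standard deviations and $K - 1$ free mixing coefficients for a univariate mixture; the log-likelihood values below are made up for illustration, not Talos results.

```python
import math

def n_free_params(k):
    """K means, K standard deviations and K - 1 free mixing coefficients."""
    return 3 * k - 1

def bic(log_likelihood, k, n):
    """BIC of a K-component univariate mixture fitted to n points; lower is better."""
    return n_free_params(k) * math.log(n) - 2 * log_likelihood

def select_k(log_likelihoods, n):
    """Return the K with the lowest BIC from a {K: maximized log-likelihood} dict."""
    return min(log_likelihoods, key=lambda k: bic(log_likelihoods[k], k, n))

# Made-up maximized log-likelihoods for K = 1, 2, 3 on N = 200 points.
log_likelihoods = {1: -520.0, 2: -470.0, 3: -468.0}
best_k = select_k(log_likelihoods, n=200)  # 2: K = 3 fits slightly better but pays a larger penalty
```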

Regression Detection

A simple approach to detect changes in the series is to use a rolling window and compare the distribution of the first half of the window to the distribution of the second half. Since we are dealing with Gaussians, we can use the z-statistic to compare the mean $\mu_l$ of each component in the left window to the mean $\mu_r$ of its corresponding component in the right window, where $\sigma_l$ and $\sigma_r$ are the components’ standard deviations and $n_l$ and $n_r$ the number of points assigned to them:

$z = \frac{\mu_l - \mu_r}{\sqrt{\frac{\sigma_l^2}{n_l} + \frac{\sigma_r^2}{n_r}}}$
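
The window comparison can be sketched for the single-component case, where each half-window is treated as one Gaussian sample; with a mixture, the same statistic would be computed per component. The half-window size of 10 and the |z| > 3 alert threshold are assumptions chosen for illustration, not values from our alerting setup.

```python
import math
from statistics import mean, stdev

def z_statistic(left, right):
    """z-statistic comparing the means of two samples, as in the formula above."""
    se = math.sqrt(stdev(left) ** 2 / len(left) + stdev(right) ** 2 / len(right))
    return (mean(left) - mean(right)) / se

def detect_shifts(series, half_window=10, threshold=3.0):
    """Return every index i where series[i-h:i] and series[i:i+h] differ with |z| > threshold."""
    alerts = []
    for i in range(half_window, len(series) - half_window + 1):
        left = series[i - half_window:i]
        right = series[i:i + half_window]
        if abs(z_statistic(left, right)) > threshold:
            alerts.append(i)
    return alerts
```

On a step change, a detector like this fires on several consecutive windows around the shift, which is why some post-processing is needed to emit a single alert per shift.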

In the following plots the red dots are points at which the regression detection would have fired. Ideally the system would generate a single alert per cluster for the first point after the distribution shift.

Talos generates hundreds of different time series, some with dominant and peculiar noise patterns. As such, it’s hard to come up with a generic model that solves the problem for good and represents the data perfectly.

Since the API to access this data is public, it provides an exciting opportunity for a contributor to come up with better ways of representing it. Feel free to join us on #perf if you are interested. Oh and, did I mention we are hiring a Senior Data Engineer?

# Telemetry meets Parquet

In Mozilla’s Telemetry land, raw JSON pings are stored on S3 within files containing framed Heka records, which form our immutable, append-only master dataset. Reading the raw data with Spark can be slow for analyses that read only a handful of fields per record from the thousands available, not to mention the cost of parsing the JSON blobs in the first place.

We are slowly moving away from JSON to a more OLAP-friendly serialization format for the master dataset, which requires defining a proper schema. Given that we have been collecting data in big JSON payloads since the beginning of time, different subsystems have been using various, in some cases schema-unfriendly, data layouts. That, and the fact that we have thousands of fields embedded in a complex nested structure, make it hard to retrospectively enforce a schema that matches our current data, so the change is not going to happen overnight.

Until recently the only way to process Telemetry data with Spark was to read the raw data directly, which isn’t efficient but gets the job done.

Batch views

Some analyses require filtering out a considerable amount of data before the actual workload can start. To improve the efficiency of such jobs, we started defining derived streams, or so-called pre-computed batch views in Lambda Architecture lingo.

A batch view is regenerated or updated daily and, in a mathematical sense, it’s simply a function over the entire master dataset. The view usually contains a subset of the master dataset, possibly transformed, with the objective of making analyses that depend on it more efficient.

As a concrete example, we ran an A/B test on Aurora 43 in which we disabled or enabled Electrolysis (E10s) based on a coin flip (note that E10s is enabled by default on Aurora). In this particular experiment we were aiming to study the performance of E10s. The experiment had a lifespan of one week per user. As some of our users access Firefox sporadically, we couldn’t just sample a few days’ worth of data and be done with it, as ignoring the long tail of submissions would have biased our results.

We clearly needed a batch view of our master dataset that contained only submissions for users that were currently enrolled in the experiment, in order to avoid an expensive filtering step.

Parquet

We decided to serialize the data for our batch views back to S3 as Parquet files. Parquet is a columnar storage format that is slowly becoming the lingua franca of Hadoop’s ecosystem, as it can be read and written by, e.g., Hive, Pig and Spark.

Conceptually, it’s important to clarify that Parquet is just a storage format, i.e. a binary representation of the data, and it relies on object models, like the ones provided by Avro or Thrift, to represent the data in memory. A set of object model converters is provided to map between the in-memory representation and the storage format.

Parquet supports efficient compression and encoding schemes, which are applied on a per-column level where the data tends to be homogeneous. Furthermore, as Spark can load Parquet files into a DataFrame, a Python analysis can potentially achieve the same performance as a Scala one thanks to a unified physical execution layer.
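
As a toy illustration of why per-column encoding pays off, the sketch below applies run-length encoding to the same records laid out row by row and column by column. Parquet’s real encodings (dictionary encoding and a run-length/bit-packing hybrid, among others) are more sophisticated; the records here are hypothetical.

```python
from itertools import groupby

def run_length_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(values)]

# Hypothetical records: (os, channel, startup time in ms), sorted by os.
rows = [("Windows_NT", "aurora", 800 + i) for i in range(6)] + \
       [("Darwin", "aurora", 900 + i) for i in range(2)]

# Row-oriented layout: fields of different records are interleaved.
row_stream = [field for row in rows for field in row]
# Column-oriented layout: all values of one field are stored together.
columns = [list(column) for column in zip(*rows)]

row_runs = len(run_length_encode(row_stream))                   # 24: no two neighbours are equal
column_runs = [len(run_length_encode(col)) for col in columns]  # [2, 1, 8]
```

The low-cardinality `os` and `channel` columns collapse to a couple of runs, while the interleaved row layout offers nothing to compress this way.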

A Parquet file is composed of:

• row groups, which contain a subset of rows – a row group is composed of a set of column chunks;
• column chunks, which contain values for a specific column – a column chunk is partitioned into a set of pages;
• pages, which are the smallest level of granularity for reads – compression and encoding are applied at this level of abstraction;
• a footer, which contains the schema and some other metadata.

As batch views typically use only a subset of the fields provided in our Telemetry payloads, the problem of defining a schema for such subset becomes a non-issue.

Benefits

Unsurprisingly, we have seen speed-ups of up to a couple of orders of magnitude in some analyses and a reduction in file size of up to 8x compared to files having the same compression scheme and content (no JSON, just framed Heka records).

Each view is generated with a Spark job. As a concrete example, E10sExperiment.scala generates the batch view for the E10s experiment cited above. Views can be easily added as most of the boilerplate code is taken care of. In the future Heka is likely going to produce the views directly, once it supports Parquet natively.

# Telemetry metrics roll-ups

Our Telemetry aggregation system has been serving us well for quite some time. As Telemetry evolved, though, maintaining the codebase and adding new features such as keyed histograms proved to be challenging. With the introduction of unified FHR/Telemetry, we decided to rewrite the aggregation pipeline with an updated set of requirements in mind.

Metrics

A ping is the data payload that clients submit to our server. The payload contains, among other things, over a thousand metrics of the following types:

• numerical, e.g. startup time
• categorical, e.g. operating system name
• distributional, e.g. garbage collection timings

Distributions are implemented with histograms that come in different shapes and sizes. For example, a keyed histogram represents a collection of labelled histograms. It’s not rare for keyed histograms to have thousands of possible labels. For instance, MISBEHAVING_ADDONS_JANK_LEVEL, which measures the longest blocking operation performed by an add-on, potentially has a label for each extension.

The main objective of the aggregator is to create time or build-id based aggregates by a set of dimensions:

• channel, e.g. nightly
• build-id or submission date
• metric name, e.g. GC_MS
• label, for keyed histograms
• application name, e.g. Fennec
• application version, e.g. 41
• CPU architecture, e.g. x86_64
• operating system, e.g. Windows
• operating system version, e.g. 6.1
• whether e10s is enabled
• process type, e.g. content or parent

As scalar and categorical metrics are converted to histograms during the aggregation, ultimately we display only distributions in our dashboard.
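
One way to picture this conversion is a one-hot histogram per observation, which can then be summed like any other histogram. This is a minimal sketch under assumptions: the bucket layout, the function names and the sample values are hypothetical, and the actual aggregator may bucket values differently.

```python
from bisect import bisect_right

def scalar_to_histogram(value, bucket_lower_bounds):
    """One-hot histogram: a single count in the bucket whose lower bound is the
    largest one <= value (values below the first bound go to bucket 0)."""
    counts = [0] * len(bucket_lower_bounds)
    index = max(bisect_right(bucket_lower_bounds, value) - 1, 0)
    counts[index] += 1
    return counts

def categorical_to_histogram(value, categories):
    """One-hot histogram over a fixed list of category labels."""
    counts = [0] * len(categories)
    counts[categories.index(value)] += 1
    return counts

def sum_histograms(histograms):
    """Aggregate histograms that share the same bucket layout."""
    return [sum(bucket) for bucket in zip(*histograms)]

# Hypothetical bucket layout for a scalar measured in ms.
buckets = [0, 1, 10, 100, 1000]
startup_times = [5, 50, 500, 7]
aggregate = sum_histograms(scalar_to_histogram(t, buckets) for t in startup_times)
# [0, 2, 1, 1, 0]: 5 and 7 fall in [1, 10), 50 in [10, 100), 500 in [100, 1000)
```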

Raw Storage

We receive millions of pings each day over all our channels. A raw uncompressed ping has a size of over 100KB. Pings are sent to our edge servers and end up being stored in an immutable chunk of up to 300MB on S3, partitioned by submission date, application name, update channel, application version, and build id.

As we are currently collecting v4 submissions only on pre-release channels, we store about 700 GB per day, and that’s considering only saved_session pings, as those are the ones being aggregated. Once we start receiving data on the release channel as well, we are likely going to double that number.

As soon as an immutable chunk is stored on S3, an AWS Lambda function adds a corresponding entry to a SimpleDB index. The index allows Spark jobs to query the set of available pings by different criteria without having to perform an expensive scan over S3.

Spark Aggregator

A Spark job scheduled to run daily aggregates the data received the day before by the set of dimensions mentioned above. We are likely going to move from a batch job to a streaming one in the future to reduce the latency from the time a ping is stored on S3 to the time its data appears in the dashboard.

The aggregator produces two kinds of aggregates:

• submission date based
• build-id based

Aggregates by build-id computed for a given submission date have to be added to the historical ones. As long as there are submissions coming from an old build of Firefox, we will keep receiving and aggregating data for it. The aggregation of the historical aggregates with the daily computed ones (i.e. partial aggregates) happens within a PostgreSQL database.

Database

There is only one type of table within the database, which is partitioned by channel, version and build-id (or submission date, depending on the aggregation type).

As PostgreSQL natively supports JSON blobs and arrays, it came naturally to express each row as just a pair of fields, one being a JSON object containing a set of dimensions and the other being an array representing the histogram. Adding a new dimension in the future should be rather painless, as dimensions are not represented with columns.

When a new partial aggregate is pushed to the database, PostgreSQL finds the current historical entry for that combination of dimensions, if it exists, and updates the current histogram by adding the partially aggregated histogram to it. In reality, a temporary table containing all partial aggregates is pushed to the database and then merged with the historical aggregates, but the underlying logic remains the same.
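
The upsert logic can be sketched in memory, with a Python dict standing in for the table and a canonical JSON string standing in for the jsonb dimensions column. This illustrates the merge semantics only; the function names are hypothetical and the real merge happens in SQL.

```python
import json

def dimensions_key(dimensions):
    """Canonical key for a dimensions object, so key order doesn't matter."""
    return json.dumps(dimensions, sort_keys=True)

def merge_partial_aggregates(historical, partials):
    """Upsert: add each partial histogram to the historical one with the same
    dimensions, or insert it if that combination of dimensions is new."""
    for dimensions, histogram in partials:
        key = dimensions_key(dimensions)
        current = historical.get(key)
        if current is None:
            historical[key] = list(histogram)
        else:
            historical[key] = [a + b for a, b in zip(current, histogram)]
    return historical
```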

As the database is usually queried by submission date or build-id, and as there are millions of partial aggregates per day deriving from the possible combinations of dimensions, the table is partitioned the way it is to allow the upsert operation to be performed as fast as possible.

API

An inverted index on the JSON blobs allows us to efficiently retrieve, and aggregate, all histograms matching given filtering criteria.

For example,


```sql
select aggregate_histograms(histograms)
from build_id_nightly_41_20150602
where dimensions @> '{"metric": "SIMPLE_MEASURES_UPTIME",
                      "application": "Firefox",
                      "os": "Windows"}'::jsonb;
```

retrieves a list of histograms, one for each combination of dimensions matching the where clause, and adds them together producing a final histogram that represents the distribution of the uptime measure for Firefox users on the nightly Windows build created on the 2nd of June 2015.
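
For flat dimension objects like these, the filter-and-sum can be mimicked in a few lines of Python. This sketch only covers top-level key/value containment (jsonb’s `@>` also handles nested objects and arrays), and `aggregate_matching` is a hypothetical stand-in for the database’s `aggregate_histograms`.

```python
def contains(dimensions, criteria):
    """Flat-object analogue of jsonb containment (dimensions @> criteria):
    every key/value pair of criteria must appear in dimensions."""
    return all(key in dimensions and dimensions[key] == value
               for key, value in criteria.items())

def aggregate_matching(rows, criteria):
    """Sum, bucket by bucket, the histograms of all (dimensions, histogram)
    rows whose dimensions contain the criteria; [] if nothing matches."""
    matching = [histogram for dimensions, histogram in rows
                if contains(dimensions, criteria)]
    return [sum(bucket) for bucket in zip(*matching)]
```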

Aggregates are made available through an HTTP API. For example, to retrieve the aggregated histogram for the GC_MS metric on Windows for build-ids of 2015/06/15:

```shell
curl -X GET "http://aggregates.telemetry.mozilla.org/aggregates_by/build_id/channels/nightly/?version=41&dates=20150615&metric=GC_MS&os=Windows_NT"
```

which returns

```
{"buckets":[0, ..., 10000],
 "data":[{"date":"20150615",
          "count":239459,
          "histogram":[309, ..., 5047],
          "label":""}],
 "kind":"exponential",
 "description":"Time spent running JS GC (ms)"}
```

Dashboard

Our intern, Anthony Zhang, did a phenomenal job creating a nifty dashboard to display the aggregates. Even though it’s still under active development, it’s already functional and thanks to it we were able to spot a serious bug in the v2 aggregation pipeline.

It comes with two views, the histogram view designed for viewing distributions of measures:

and an evolution view for viewing the evolution of aggregate values for measures over time:

As we started aggregating data at the beginning of June, the evolution plot looks rightfully wacky before that date.

# Spark best practices

We have been running Spark for a while now at Mozilla and this post is a summary of things we have learned about tuning and debugging Spark jobs.

#### Spark execution model

Spark’s simplicity makes it all too easy to ignore its execution model and still manage to write jobs that eventually complete. With larger datasets, understanding what happens under the hood becomes critical to reducing run-time and avoiding out-of-memory errors.

Let’s start with our good old friend, word count, as an example:

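```python
# `sc` is the SparkContext provided by the pyspark shell or notebook.
# collect() returns a plain Python list of (word, count) pairs to the driver.
counts = sc.textFile("input.txt")\
    .flatMap(lambda line: line.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y, 3)\
    .collect()
```
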
RDD operations are compiled into a Directed Acyclic Graph of RDD objects, where each RDD points to the parent it depends on:

At shuffle boundaries, the DAG is partitioned into so-called stages that are going to be executed in order, as shown in figure 2. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

To organize data for the shuffle, Spark generates sets of tasks – map tasks to organize the data and reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations. Operations within a stage are pipelined into tasks that can run in parallel, as shown in figure 3.
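
The following pure-Python simulation mirrors the word-count job: stage 1 runs one map task per input partition, pipelining `flatMap` and `map` and writing its output into one shuffle bucket per reducer, and stage 2 runs one reduce task per shuffle partition (3, as passed to `reduceByKey`). Python’s built-in `hash` stands in for Spark’s partitioner; the function names are hypothetical and this is not how Spark is implemented internally.

```python
from collections import defaultdict

def map_task(lines, num_reducers):
    """Stage 1 task for one input partition: flatMap and map are pipelined, and
    each word's count goes to the shuffle bucket chosen by hashing the word
    (counts are pre-combined per bucket, as reduceByKey does)."""
    buckets = [defaultdict(int) for _ in range(num_reducers)]
    for line in lines:
        for word in line.split():
            buckets[hash(word) % num_reducers][word] += 1
    return buckets

def reduce_task(shuffle_blocks):
    """Stage 2 task: merge this reducer's block fetched from every map task."""
    counts = defaultdict(int)
    for block in shuffle_blocks:
        for word, n in block.items():
            counts[word] += n
    return dict(counts)

def word_count(partitions, num_reducers=3):
    """Run all map tasks (stage 1), then one reduce task per shuffle partition (stage 2)."""
    map_outputs = [map_task(lines, num_reducers) for lines in partitions]
    result = {}
    for r in range(num_reducers):
        result.update(reduce_task([output[r] for output in map_outputs]))
    return result
```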

Stages, tasks and shuffle writes and reads are concrete concepts that can be monitored from the Spark UI. The UI can be accessed from the driver node on port 4040, as shown in figure 4.

#### Best practices

Spark UI

Running Spark jobs without the Spark UI is like flying blind. The UI allows you to monitor and inspect the execution of jobs. To access it remotely, a SOCKS proxy is needed, as the UI also connects to the worker nodes.

Using a proxy management tool like FoxyProxy lets you automatically filter URLs based on text patterns and limit the proxy settings to domains that match a set of rules. The browser add-on automatically handles turning the proxy on and off when you switch between viewing websites hosted on the master node and those on the Internet.

Assuming that you launched your Spark cluster with the EMR service on AWS, type the following command to create a proxy:

```shell
ssh -i ~/mykeypair.pem -N -D 8157 hadoop@ec2-...-compute-1.amazonaws.com
```

Finally, import the following configuration into FoxyProxy:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<foxyproxy>
<proxies>
<proxy name="emr-socks-proxy" notes="" fromSubscription="false" enabled="true" mode="manual" selectedTabIndex="2" lastresort="false" animatedIcons="true" includeInCycle="true" color="#0055E5" proxyDNS="true" noInternalIPs="false" autoconfMode="pac" clearCacheBeforeUse="false" disableCache="false" clearCookiesBeforeUse="false" rejectCookies="false">
<matches>
<match enabled="true" name="*ec2*.amazonaws.com*" pattern="*ec2*.amazonaws.com*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
<match enabled="true" name="*ec2*.compute*" pattern="*ec2*.compute*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
<match enabled="true" name="10.*" pattern="http://10.*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
<match enabled="true" name="*10*.amazonaws.com*" pattern="*10*.amazonaws.com*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
<match enabled="true" name="*10*.compute*" pattern="*10*.compute*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
<match enabled="true" name="*localhost*" pattern="*localhost*" isRegEx="false" isBlackList="false" isMultiLine="false" caseSensitive="false" fromSubscription="false" />
</matches>
</proxy>
</proxies>
</foxyproxy>
```

Once the proxy is enabled, you can open the Spark UI by visiting localhost:4040.

Use the right level of parallelism

Clusters will not be fully utilized unless the level of parallelism for each operation is high enough. Spark automatically sets the number of partitions of an input file according to its size and for distributed shuffles, such as groupByKey and reduceByKey, it uses the largest parent RDD’s number of partitions. You can pass the level of parallelism as a second argument to an operation. In general, 2-3 tasks per CPU core in your cluster are recommended. That said, having tasks that are too small is also not advisable as there is some overhead paid to schedule and run a task.

As a rule of thumb, tasks should take at least 100 ms to execute; you can ensure that this is the case by monitoring the task execution latency from the Spark UI. If your tasks take considerably longer than that, keep increasing the level of parallelism, by a factor of, say, 1.5, until performance stops improving.

Reduce working set size

Sometimes you will get terrible performance or out-of-memory errors because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc.) build a hash table within each task to perform the grouping, which can often be large.

Even though those tables spill to disk, getting to the point where the tables need to be spilled increases the memory pressure on the executor, incurring the additional overhead of disk I/O and increased garbage collection. If you are using pyspark, the memory pressure will also increase the chance of Python running out of memory.

The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller.

Avoid groupByKey for associative operations

Both reduceByKey and groupByKey can be used for the same purposes but reduceByKey works much better on a large dataset. That’s because Spark knows it can combine output with a common key on each partition before shuffling the data.

In reduce tasks, key-value pairs are kept in a hash table that can spill to disk, as mentioned in “Reduce working set size“. However, the hash table flushes out the data to disk one key at a time. If a single key has more values than can fit in memory, an out of memory exception occurs. Pre-combining the keys on the mappers before the shuffle operation can drastically reduce the memory pressure and the amount of data shuffled over the network.
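
A back-of-the-envelope sketch of the difference: counting the records each approach ships across the shuffle for a word-count-style job, with partitions given as lists of lines. It is an idealized model that ignores serialization and spilling, and the function names are hypothetical.

```python
from collections import Counter

def records_shuffled_by_group_by_key(partitions):
    """groupByKey: every (word, 1) pair crosses the network."""
    return sum(len(line.split()) for lines in partitions for line in lines)

def records_shuffled_by_reduce_by_key(partitions):
    """reduceByKey: values are combined per key within each partition first,
    so each partition ships one record per distinct key."""
    return sum(len(Counter(word for line in lines for word in line.split()))
               for lines in partitions)

partitions = [["a a a b", "a b"], ["a a c"]]
grouped = records_shuffled_by_group_by_key(partitions)    # 9 records
reduced = records_shuffled_by_reduce_by_key(partitions)   # 4 records
```

The gap grows with the number of repeated keys per partition, which is exactly the case where groupByKey puts a single key’s values under memory pressure.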

Avoid reduceByKey when the input and output value types are different

Consider the job of creating a set of strings for each key:

```python
rdd.map(lambda p: (p[0], {p[1]}))\
    .reduceByKey(lambda x, y: x | y)\
    .collect()
```

Note how the input values are strings and the output values are sets. The map operation creates lots of small temporary objects, one set per record. A better way to handle this scenario is to use aggregateByKey:

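```python
def seq_op(xs, x):
    # Add a single value to the set accumulated so far for this key.
    xs.add(x)
    return xs

def comb_op(xs, ys):
    # Merge the sets built for the same key on different partitions.
    return xs | ys

rdd.aggregateByKey(set(), seq_op, comb_op).collect()
```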
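
To make the roles of the zero value, `seq_op` and `comb_op` explicit, here is a pure-Python emulation of `aggregateByKey` over in-memory partitions. It is a sketch of the semantics, not PySpark’s code, although PySpark also copies the zero value (via `copy.deepcopy`) before folding, which is what makes mutating it in `seq_op` safe.

```python
import copy

def seq_op(xs, x):
    xs.add(x)
    return xs

def comb_op(xs, ys):
    return xs | ys

def aggregate_by_key(partitions, zero_value, seq_func, comb_func):
    """Emulate aggregateByKey over a list of partitions of (key, value) pairs."""
    # Map side: fold each partition's values into a fresh copy of zero_value per key.
    per_partition = []
    for pairs in partitions:
        acc = {}
        for key, value in pairs:
            if key not in acc:
                acc[key] = copy.deepcopy(zero_value)
            acc[key] = seq_func(acc[key], value)
        per_partition.append(acc)
    # Reduce side (after the shuffle): merge the accumulators for the same key.
    result = {}
    for acc in per_partition:
        for key, value in acc.items():
            result[key] = comb_func(result[key], value) if key in result else value
    return result
```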


Avoid the flatMap-join-groupBy pattern

When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.

The spark.executor.memory option, which determines the amount of memory to use per executor process, applies to the JVM only. If you are using pyspark, you can’t set that option to the total amount of memory available on an executor node, as the JVM might eventually use all the available memory, leaving nothing behind for Python.

Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program, like a static lookup table, consider turning it into a broadcast variable.

Cache judiciously

Just because you can cache an RDD in memory doesn’t mean you should blindly do so. Depending on how many times the dataset is accessed and the amount of work involved in doing so, recomputation can be faster than the price paid by the increased memory pressure.

It should go without saying that if you only read a dataset once there is no point in caching it; it will actually make your job slower. The size of cached datasets can be seen from the Spark UI.

Don’t collect large RDDs

When a collect operation is issued on an RDD, the dataset is copied to the driver, i.e. the master node. An out-of-memory error will be thrown if the dataset is too large to fit in the driver’s memory; take or takeSample can be used to retrieve only a capped number of elements instead.

Minimize amount of data shuffled

A shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. As illustrated in figure 3, each reducer in the second stage has to pull data across the network from all the mappers.

As of Spark 1.3, shuffle files are not cleaned up from Spark’s temporary storage until Spark is stopped, which means that long-running Spark jobs may consume all available disk space. This is done so that shuffles don’t have to be re-computed.

Know the standard library

Avoid re-implementing functionality that already exists, as your version is almost guaranteed to be slower.

Use dataframes

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a pandas DataFrame in Python.

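```python
from pyspark.sql import Row

# `pings`, `get_pings_properties` and `sqlContext` come from the Telemetry
# analysis notebook environment.
props = get_pings_properties(pings,
                             ["environment/system/os/name"],
                             only_median=True)

frame = sqlContext.createDataFrame(props.map(lambda x: Row(**x)))
frame.groupBy("environment/system/os/name").count().show()
```
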
yields:

```
environment/system/os/name count
Darwin                     2368
Linux                      2237
Windows_NT                 105223
```

Before any computation on a DataFrame starts, the Catalyst optimizer compiles the operations that were used to build the DataFrame into a physical plan for execution. Since the optimizer generates JVM bytecode for execution, pyspark users will experience the same high performance as Scala users.

# A glance at unified FHR/Telemetry

Lots is changing in Telemetry land. If you occasionally run data analyses with our Spark infrastructure, you might want to keep reading.

Background

The Telemetry and FHR collection systems on desktop are in the process of being unified. Both systems will be sending their data through a common data pipeline which has some features of both the current Telemetry pipeline and the Cloud Services one that we use to ingest server logs.

The goals of the unification are to:

• avoid measuring the same metric in multiple systems on the client side;
• reduce the latency from the time a measurement occurs until it can be analyzed on the server;
• increase the accuracy of measurements so that they can be better correlated with factors in the user environment such as the specific build, enabled add-ons, and other hardware or software characteristics;
• use a common data pipeline for client telemetry and service log data.

The unified pipeline is currently sending data for Nightly, Aurora and Beta. The classic FHR and Telemetry pipelines are going to keep sending data at the very least until the new unified pipeline has been fully validated. The plan is to land this feature in Firefox 40 on the release channel. We’ll also continue to respect existing user preferences: if the user has opted out of FHR or Telemetry, we’ll continue to respect that for the equivalent data sets. Similarly, the opt-out and opt-in defaults will remain the same for equivalent data sets.

Data format

A Telemetry ping, stored as a JSON object on the client, encapsulates the data sent to our backend. The main differences between the new unified Telemetry ping format (v4) and the classic Telemetry one (v2) are that:

• multiple ping types are supported beyond the classic saved-session ping, like the main ping;
• pings have a common top-level which contains basic information shared between types, like build-id and channel;
• pings have an optional environment field which consists of data that is expected to be characteristic for performance and other behavior.

From an analysis point of view, the most important addition is the main ping which includes the very same histograms and other performance and diagnostic data as the v2 saved-session pings. Unlike in “classic” Telemetry though, there can be multiple main pings during a single session. A main ping is triggered by different scenarios, which are documented by the reason field:

• aborted-session: periodically saved to disk and deleted at shutdown – if a previous aborted session ping is found at startup it gets sent to our backend;
• environment-change: generated when the environment changes;
• shutdown: triggered when the browser session ends;
• daily: a session split triggered every 24 hours at local midnight; this is needed to make sure we keep receiving data also from clients that have very long sessions.

Data access through Spark

Once you connect to a Spark-enabled IPython notebook launched from our self-service dashboard, you will be presented with a new tutorial based on the v4 dataset. The v4 data is fetched through the get_pings function by passing “v4” as the schema parameter. The following parameters are valid for the new data format:

• app: an application name, e.g. “Firefox”;
• channel: a channel name, e.g. “nightly”;
• version: the application version, e.g. “40.0a1”;
• build_id: a build id or a range of build ids, e.g. “20150601000000” or (“20150601000000”, “20150610999999”);
• submission_date: a submission date or a range of submission dates, e.g. “20150601” or (“20150601”, “20150610”);
• doc_type: ping type, e.g. “main”, set to “saved_session” by default;
• fraction: the fraction of pings to return, set to 1.0 by default.

Once you have an RDD, you can further filter the pings down by reason. There is also a new experimental API that returns the history of submissions for a subset of profiles, which can be used for longitudinal analyses.
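
A minimal sketch of the reason filter, assuming each ping has already been parsed into a Python dict and that main pings carry the reason under `payload/info/reason`; `has_reason` is a hypothetical helper, not part of the analysis library.

```python
def has_reason(ping, reason):
    """True if the ping's payload/info/reason field equals `reason`.
    The field path is an assumption; pings missing it never match."""
    info = ping.get("payload", {}).get("info", {})
    return info.get("reason") == reason

# Hypothetical, heavily trimmed main pings.
sample_pings = [
    {"payload": {"info": {"reason": "daily"}}},
    {"payload": {"info": {"reason": "shutdown"}}},
    {"payload": {}},
]
daily = [p for p in sample_pings if has_reason(p, "daily")]
```

On an RDD, the same predicate plugs into `filter`, e.g. `pings.filter(lambda p: has_reason(p, "daily"))`.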

# Strata + Hadoop World @ London

The Strata conference in London has just ended and I came back with the feeling that in a few years the conference might as well be called Strata+Spark World. A quick look at the schedule reveals the sheer amount of Spark-related talks. Most people I have spoken to have played with it but are not yet using it in production, mostly because over the years they have developed a considerable amount of tooling around Hadoop MapReduce that makes it hard to jump ship. That said, it’s hard to miss where the industry is moving: Spark is currently the most active Apache project and it’s experiencing exponential growth.

On Wednesday, Peter Wendell spoke about what’s coming in the next versions of Spark. What’s interesting for us at Mozilla is a set of improvements in sight, like metrics visualization for Spark Streaming and various performance improvements in memory and CPU usage. We are not using Spark Streaming at the moment, as we have our very own stream processing system, Heka, but we might consider it for a few particular use cases.

Lots of work is going into Spark Dataframes, which one might think of as a distributed equivalent of a pandas dataframe. The current API isn’t stable yet but it looks very promising. When leveraging the dataframe API, Python jobs can be as fast as Scala ones thanks to a unified physical execution layer.

I feel that Dataframes work best with simple scalar types (unlike our Telemetry histograms) and when most of the needed operations are available through the DSL. For that reason we are probably not going to adopt the Dataframe API for the time being and will keep using the RDD API. That said, we could exploit Dataframes in the future by representing each Telemetry histogram with a single average value.

On Thursday, Costin Leau gave a nice talk about Elasticsearch with a hands-on demo. I have been thinking for a while about using Kibana, a visualization platform for Elasticsearch, for Telemetry, and seeing it in action convinced me to give it a shot.

The main issue with unleashing Elasticsearch on our dataset is that we rolled our own histogram representation, which is not going to play ball with Elasticsearch. That said, as the goal of this dashboard would be to give an overview of our data, nothing is stopping us from representing histograms with a single summary estimate, like the median. The dashboard could empower anyone in the organization to slice, dice and visualize Telemetry data without writing a single line of code, while Spark would remain the tool for non-trivial analyses and our existing dashboard would keep displaying the full distributions for a predetermined set of aggregations.
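Collapsing a histogram into one number, whether a mean for Dataframes or a median for Elasticsearch, is straightforward. The sketch below assumes a simplified histogram given as parallel lists of bucket lower bounds and counts (a stand-in, not the exact Telemetry format) and places every sample at its bucket’s lower bound, so both estimates are coarse.

```python
def histogram_mean(buckets, counts):
    """Approximate mean, placing every sample at its bucket's lower bound."""
    total = sum(counts)
    if total == 0:
        return None  # empty histogram: no meaningful estimate
    return sum(b * c for b, c in zip(buckets, counts)) / total


def histogram_median(buckets, counts):
    """Lower bound of the bucket that contains the median sample."""
    total = sum(counts)
    if total == 0:
        return None
    running = 0
    for bucket, count in zip(buckets, counts):
        running += count
        if running >= total / 2:
            return bucket


buckets = [0, 10, 20, 40]
counts = [1, 5, 3, 1]
mean_estimate = histogram_mean(buckets, counts)      # 15.0
median_estimate = histogram_median(buckets, counts)  # 10
```

Either value can then be stored as a plain numeric field, the kind of simple scalar that Dataframes and Elasticsearch handle well.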

# Dashboards made simple with Spark and Plotly

In my previous post about our new Spark infrastructure, I went into the details of how to launch a Spark cluster on AWS to perform custom analyses on Telemetry data. Sometimes, though, one needs to rerun an analysis on a recurring basis over a certain timeframe, usually to feed data into dashboards of various kinds. We are going to roll out a new feature that allows users to upload an IPython notebook to the self-serve data analysis dashboard and run it on a schedule. The notebook will be executed periodically at the chosen frequency, and the result will be made available as an updated IPython notebook.

To schedule a Spark job:

1. Visit the analysis provisioning dashboard at telemetry-dash.mozilla.org and sign in using Persona with an @mozilla.com email address.
2. Click “Schedule a Spark Job”.
3. Enter some details:
• The “Job Name” field should be a short descriptive name, like “chromehangs analysis”.
• Set the number of workers of the cluster in the “Cluster Size” field.
• Set a schedule frequency using the remaining fields.

Once a new scheduled job is created, it will appear in the top listing of the scheduling dashboard. When the job runs, its result will be made available as an IPython notebook, viewable by clicking on the “View Data” entry of your job.

As I briefly mentioned at the beginning, periodic jobs are typically used to feed data to dashboards. Writing dashboards for a custom job isn’t very pleasant, and in the past I wrote a simple tool to help with that. It turns out, though, that thanks to IPython one doesn’t necessarily need to write a dashboard from scratch but can simply reuse the notebook as the dashboard itself! I mean, why not? That might not be good enough for management-facing dashboards, but it’s acceptable for ones aimed at engineers.

In fact, with IPython we are not limited to matplotlib’s static charts at all. Thanks to Plotly, it’s easy to generate interactive plots that let you:

• Check the x and y coordinates of every point on the plot by hovering with the cursor.
• Zoom in on the plot and resize lines, points and axes by clicking and dragging the cursor over a region.
• Pan by holding the shift key while clicking and dragging.
• Zoom back out to the original view by double-clicking on the plot.

Plotly comes with its own API, but if you already have a matplotlib-based chart, it’s trivial to convert it into an interactive plot. As a concrete example, I updated my Spark Hello World example with a Plotly chart.

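import matplotlib.pyplot as plt
import plotly.plotly as py  # legacy (pre-4.0) Plotly online API

# `frame` is the pandas DataFrame from the Spark Hello World example.
fig = plt.figure(figsize=(18, 7))
frame["WINNT"].plot(kind="hist", bins=50)
plt.title("startup distribution for Windows")
plt.ylabel("count")
plt.xlabel("log(firstPaint)")
# Convert the matplotlib figure into an interactive Plotly chart.
py.iplot_mpl(fig, strip_style=True)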


As you can see, just a single extra line of code is needed for the conversion.

As WordPress doesn’t support iframes, you are going to have to click on the image and follow the link to see the interactive plot in action.