Measuring product engagement at scale

How engaged are our users in a certain segment of the population? How many users are actively using a new feature? One way to answer these questions is to compute the engagement ratio (ER) for the segment of interest, which is defined as daily active users (DAU) over monthly active users (MAU), i.e.

ER_{segment} =  \frac{DAU_{segment}}{MAU_{segment}}

Intuitively the closer the ratio is to 1, the higher the number of returning users. A segment can be an arbitrary combination of values across a set of dimensions, like operating system and activity date.

Ideally, given a set of D dimensions, pre-computing the ER for all possible combinations of values of said dimensions would ensure that user queries run as fast as possible. Clearly that’s not very efficient though; if we assume for simplicity that every dimension has k possible values, then there are \sum_{d=1}^{D}\binom{D}{d} k^d ratios.
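To get a feel for the blow-up, here is a quick back-of-the-envelope computation in Python (the choice of D = 5 dimensions with k = 10 values each is purely illustrative); note that the sum has the closed form (k + 1)^D - 1:

from math import comb

def num_ratios(D, k):
    # \sum_{d=1}^{D} \binom{D}{d} k^d == (k + 1)**D - 1
    return sum(comb(D, d) * k**d for d in range(1, D + 1))

print(num_ratios(5, 10))  # 161050 ratios for just 5 dimensions with 10 values each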

Is there a way around computing all of those ratios while still having an acceptable query latency? One could build a set of users for each value of each dimension and then, at query time, simply use set union and set intersection to compute any desired segment. Unfortunately that doesn’t work when you have millions of users, as the storage complexity is proportional to the number of items stored in the sets. This is where probabilistic counting and the HyperLogLog sketch come into play.

HyperLogLog

The HyperLogLog sketch can estimate cardinalities well beyond 10^9 with a standard error of 2% while only using a couple of KB of memory.

The intuition behind it is simple. Imagine a hash function that maps user identifiers to strings of N bits. A good hash function ensures that each bit is set with equal and independent probability. If that’s the case then the following is true:

  • 50% of hashes have the prefix 1
  • 25% of hashes have the prefix 01
  • 12.5% of hashes have the prefix 001

Intuitively, if 4 distinct values are hashed we would expect to see on average a single hash with the prefix 01, while for 8 distinct values we would expect to see one hash with the prefix 001, and so on. In other words, the cardinality of the original set can be estimated from the longest run of leading zeros among the hashed values. To reduce the variability of this single estimator, the average of K estimators can be used as the approximated cardinality, and it can be shown that the standard error of a HLL sketch is \frac{1.04}{\sqrt{K}}.
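To make the intuition concrete, the following is a minimal toy sketch in Python. It is not the algorithm from the paper: the hash function, the number of registers and the bias constant are arbitrary choices, and the small and large range corrections are omitted entirely.

import hashlib

K = 256  # number of registers (estimators); standard error ~ 1.04 / sqrt(256) ~ 6.5%

def add(registers, item):
    h = int(hashlib.sha1(item.encode()).hexdigest(), 16)  # 160-bit hash
    bucket = h & (K - 1)                    # the lowest 8 bits pick one of the K registers
    rank = 152 - (h >> 8).bit_length() + 1  # leading zeros of the remaining 152 bits, plus 1
    registers[bucket] = max(registers[bucket], rank)

def estimate(registers):
    # combine the K estimators with a bias-corrected harmonic mean
    alpha = 0.7213 / (1 + 1.079 / K)
    return alpha * K * K / sum(2.0 ** -r for r in registers)

registers = [0] * K
for i in range(100000):
    add(registers, "user%d" % i)
print(estimate(registers))  # roughly 100000, within a few percent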

The detailed algorithm is documented in the original paper, and its practical implementation and variants are covered in depth by a 2013 paper from Google.

Set operations

One of the nice properties of HLL is that the union of two HLL sketches is very simple to compute as the union of a single estimator with another estimator is just the maximum of the two estimators, i.e. the longest prefix of zeros. This property makes the algorithm trivially parallelizable which makes it well suited for map-reduce style computations.
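Concretely, if a sketch is stored as an array of K registers, as in the toy example above, merging two sketches is just an element-wise maximum:

def merge(a, b):
    # union of two HLL sketches: per register, keep the longest run observed
    return [max(x, y) for x, y in zip(a, b)]

Estimating the cardinality of the merged registers then approximates |A \cup B|.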

What about set intersection? There are two ways to compute that for e.g. two sets:

  • via the inclusion-exclusion principle, i.e. |A \cap B| = |A| + |B| - |A \cup B|
  • by estimating the Jaccard coefficient of the two sets with a MinHash sketch

It turns out that both approaches yield bad approximations when the overlap is small, which makes set intersection not particularly attractive for many use-cases. This is why we decided to use only the union operation for the HLL sketches in our datasets.

Going back to the original problem of estimating the ER for an arbitrary segment: if HLL sketches are pre-computed at the finest granularity, i.e. for every combination of values across all dimensions, then the sketch for any coarser segment can be derived using set union alone.
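Continuing the toy sketches above, this could look roughly as follows; merge and estimate are the helpers defined earlier, and the way cells are keyed is made up purely for illustration:

def union(sketches):
    out = sketches[0]
    for s in sketches[1:]:
        out = merge(out, s)  # element-wise max, as above
    return out

def engagement_ratio(daily_cells, monthly_cells, segment_cells):
    # daily_cells / monthly_cells: dicts mapping a full combination of dimension
    # values, e.g. ("Windows", "release"), to its HLL register array;
    # segment_cells: the fine-grained combinations covered by the segment
    dau = estimate(union([daily_cells[c] for c in segment_cells]))
    mau = estimate(union([monthly_cells[c] for c in segment_cells]))
    return dau / mau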

Implementation

At Mozilla we use both Spark and Presto for analytics, and even though both support HLL, their implementations are not compatible, i.e. Presto can’t read HLL sketches serialized from Spark. To that end we created a Spark package, spark-hyperloglog, and a Presto plugin, presto-hyperloglog, to extend both systems with the same HLL implementation.

Usage

Mozilla employees can access the client_count table from re:dash and run Presto queries against it to compute DAU, MAU or the ER for a certain segment. For example, this is how DAU faceted by channel and activity date can be computed:


SELECT normalizedchannel,
       activitydate,
       cardinality(merge(cast(hll AS HLL))) AS hll
FROM client_count
GROUP BY activitydate, normalizedchannel
ORDER BY normalizedchannel, activitydate DESC

The merge aggregate function computes the set union while the cardinality function returns the cardinality of a sketch. A complete example with DAU, MAU and the ER for Electrolysis (e10s) can be seen here.

By default the HLL sketches in the client_count table have a standard error of about 1.6%.

Telemetry meets SQL

In an effort to ease data access at Mozilla we started providing SQL access to our Parquet datasets through Presto. The benefit of SQL is that it’s a commonly understood language among engineers and non-engineers alike and is generally easy to pick up.

Even though we use Redshift for some of our data, other datasets store complex hierarchical data that would require unnatural transformations to fit into a typical SQL store lacking the flexibility of a nested data model (think structs, arrays & maps) such as the one provided by Parquet. Furthermore, we were looking to use a single store for our data, i.e. Parquet files on S3, which both Spark and a SQL engine could access directly.

Presto provides the best of both worlds: SQL over Parquet. Presto is an open-source distributed SQL query engine for running interactive analytic queries against various data sources. It allows querying different sources such as Hive and Cassandra, relational databases or even proprietary data stores and a single query can combine data from multiple sources.

Apache Drill offers similar features without requiring up-front schema knowledge, which is a big advantage over Presto given how painful a schema migration can be at times. Overall, though, Drill feels less mature than Presto and, unlike Presto, is not yet supported by Amazon EMR, which makes deployment & maintenance more involved. For those reasons we picked Presto as our SQL engine.

Presto supports a rich set of data types which map to Parquet ones:

Arrays
The [] operator is used to access an element of an array:

SELECT my_array[1] AS first_element

Maps
The [] operator is used to retrieve the value corresponding to a given key from a map:

SELECT name_to_age_map['Bob'] AS bob_age

Structs
Fields within a structure are accessed with the field reference operator .:

SELECT my_column.my_field

Unnesting maps and structs
The UNNEST clause is used to expand an array or a map into a relation. Arrays are expanded into a single column, and maps are expanded into two columns (key, value). UNNEST can also be used with multiple arguments, in which case they are expanded into multiple columns, with as many rows as the highest cardinality argument:


SELECT numbers, animals, n, a
FROM (
  VALUES
    (ARRAY[2, 5], ARRAY['dog', 'cat', 'bird']),
    (ARRAY[7, 8, 9], ARRAY['cow', 'pig'])
) AS x (numbers, animals)
CROSS JOIN UNNEST(numbers, animals) AS t (n, a);

which yields:

  numbers  |     animals      |  n   |  a
-----------+------------------+------+------
 [2, 5]    | [dog, cat, bird] |    2 | dog
 [2, 5]    | [dog, cat, bird] |    5 | cat
 [2, 5]    | [dog, cat, bird] | NULL | bird
 [7, 8, 9] | [cow, pig]       |    7 | cow
 [7, 8, 9] | [cow, pig]       |    8 | pig
 [7, 8, 9] | [cow, pig]       |    9 | NULL

The longitudinal view is currently accessible through Presto.  As a concrete example, this is how one could count the number of Telemetry fragments over a time range:


SELECT d, count(*)
FROM (
  SELECT substr(ssd, 1, 11) AS d
  FROM longitudinal
  CROSS JOIN UNNEST(subsession_start_date) AS s(ssd)
)
WHERE d >= '2015-11-15' AND d < '2016-03-15'
GROUP BY d
ORDER BY d ASC

Re:dash
Re:dash lets you query Presto directly from Firefox. After a query is run, a table with the result is displayed. Queries can be saved and optionally scheduled to run periodically at a given time.

table
Query editor

Different kinds of plots (e.g. bar charts, line charts, boxplots, …) can be built on top of a table; they are updated every time the query is re-run.

graph
Number of Telemetry fragments over time

Custom dashboards can be built that link to tables and plots. Users can view the dashboards and optionally access, and fork, the SQL code that powers them.

dash
A simple dashboard

Mozilla’s re:dash instance can be accessed at sql.telemetry.mozilla.org.

Longitudinal studies with Telemetry

Unified Telemetry (UT) replaced both Telemetry and FHR at the end of last year. FHR has been historically used to answer longitudinal questions, such as churn, while Telemetry has mainly been used for performance studies.

In UT-land, multiple self-contained submissions are generated for a profile over time in contrast to FHR for which submissions contained all historical records. With the new format, a typical longitudinal query on the raw data requires conceptually a big group-by on the profile ID over all submissions to recreate the history for each profile. To avoid the expensive grouping operation we are providing a longitudinal batch view of our Telemetry dataset.

The longitudinal view is logically organized as a table where rows represent profiles and columns the various metrics (e.g. startup time). Each field of the table contains a list of chronologically sorted values, one per Telemetry submission received for that profile. Even though each profile could have been represented as an array of structures with Parquet, ultimately we decided to represent it as a structure of arrays to deal with some inefficiencies in reading nested data from Spark.

The dataset is periodically regenerated from scratch, which allows us to apply non-backward-compatible changes to the schema without worrying about merging procedures.

The  dataset can be accessed from Mozilla’s Spark clusters as a DataFrame:


frame = sqlContext.sql("SELECT * FROM longitudinal")

The view contains several thousand measures, which include all histograms and a large part of our scalar metrics stored in the various sections of Telemetry submissions.

frame.printSchema()
root
 |-- profile_id: string
 |-- normalized_channel: string
 |-- submission_date: array
 |    |-- element: string
 |-- build: array
 |    |-- element: struct
 |    |    |-- application_id: string
 |    |    |-- application_name: string
 |    |    |-- architecture: string
 |    |    |-- architectures_in_binary: string
 |    |    |-- build_id: string
 |    |    |-- version: string
 |    |    |-- vendor: string
 |    |    |-- platform_version: string
 |    |    |-- xpcom_abi: string
 |    |    |-- hotfix_version: string
...

A Jupyter notebook with example queries that use the longitudinal dataset is available on Spark clusters launched from a.t.m.o.

On Monoids For Analytics

This is a short post on the elegance of using abstract algebra for analytics in Scala.

A monoid is a set T that is closed under an associative binary operation append with an identity element zero such that append(a, zero) = a . In other words, the following 3 properties apply:

  • Closure – the result of combining two elements of the set is also an element of the set:

\forall a, b \in T: append(a, b) \in T

  • Associativity – when combining more than two elements of a set the order of the pairwise combinations doesn’t matter, which makes a monoid well suited for parallel computations:

\forall a, b, c \in T: append(append(a, b), c) = append(a, append(b, c))

  • Identity – there is a special element of the set that when combined with any other element of the set yields the same element:

\exists zero \in T: \forall a \in T: append(zero, a) = append(a, zero) = a

One way to define a trait for a monoid in Scala is the following:

trait Monoid[T] {
  def zero: T
  def append(a: T, b: T): T
}

Monoids are everywhere; think of the set of natural numbers and addition or the set of strings and concatenation. Also note that the same set can have multiple “monoidal forms”; for example the set of natural numbers can have both an additive and a multiplicative monoid.

Monoids compose well; for example a tuple of monoids is itself a monoid, and as such it’s simple to define a monoid for a complex type once monoids for its constituent types exist. Scalaz and Algebird are two Scala libraries that provide monoids for data types such as List, Set, Option, Map and others. Algebird in particular, which is targeted at building aggregation systems, comes with a set of monoids useful for counting, such as DecayedValue for exponential decay, AveragedValue for averaging and HyperLogLog for approximate cardinality counting.

As a concrete example on how monoids provided by Scalaz are used in practice, suppose we have a server that receives sparse histograms from its clients:

import scalaz._
import Scalaz._

case class Histogram(var values: Map[Long, Long], sum: Int, count: Int)

We would like to aggregate histograms over a time window. It’s simple enough to write some code to do that, yet monoids offer an elegant solution. Since Scalaz provides by default an additive monoid for Map and Long, we can easily define one for Histogram as well:

implicit def histogramMonoid: Monoid[Histogram] = new Monoid[Histogram] {
  def zero = Histogram(Map(), 0, 0)
  def append(a: Histogram, b: => Histogram) = Histogram(a.values |+| b.values,
                                                        a.sum    |+| b.sum,
                                                        a.count  |+| b.count)
}

The |+| operator invokes the binary operation of the monoid defined over the type of its operands. Now that we have a monoid we can aggregate histograms, e.g.:

val h1 = Histogram(Map(10 -> 12, 100 -> 41), 53, 3)
val h2 = Histogram(Map(10 -> 21, 80 -> 14), 35, 17)
h1 |+| h2

yields:

Histogram(Map(10 -> 33, 80 -> 14, 100 -> 41), 88, 20)

Let’s say that we have different histograms for different measurements which are stored in a mapping from strings to histograms. Since there is a monoid defined over Histogram then we can easily aggregate multiple Map[String, Histogram] as well:

val m1 = Map("metric1" -> h1, "metric2" -> h2)
val m2 = Map("metric1" -> h2, "metric3" -> h1)
m1 |+| m2

yields:

Map(metric1 -> Histogram(Map(10 -> 33, 80 -> 14, 100 -> 41), 88, 20),
    metric2 -> Histogram(Map(10 -> 21, 80 -> 14), 35, 17),
    metric3 -> Histogram(Map(10 -> 12, 100 -> 41), 53, 3))

This is just the tip of the iceberg of the elegance provided by abstract algebra. If this short article caught your interest, grab a copy of Functional Programming in Scala, which is without any doubt the best functional programming book I have ever read. The book introduces a variety of abstract algebra concepts along with their implementations.

Detecting Talos regressions

 

This post is about modelling Talos data with a probabilistic model which can be applied to different use-cases, like detecting regressions and/or improvements over time.

Talos is Mozilla’s multiplatform performance testing framework written in Python that we use to run different performance tests after a push and collect their statistics.

As a concrete example, this is how the performance data of a test might look over time:

noisy

Even though there is some noise, which is exacerbated in this graph as the vertical axis doesn’t start from 0, we clearly see a shift of the distribution over time. We would like to detect such shifts as soon as possible after they happened.

Talos has been known for a while to generate, in some cases, bi-modal data points that can break our current alerting engine.

bimodal

Possible reasons for bi-modality are documented in Bug 908888. As past efforts to remove the bi-modal behavior at the source have failed we have to deal with it in our model.

The following are some notes originated from conversations with Joel, Kyle, Mauro and Saptarshi.

Mixture of Gaussians

The data can be modelled as a mixture of K Gaussians, where the parameter K could be determined by fitting models with different values of K and selecting the best one according to some criterion.

The first obstacle is to estimate the parameters of the mixture from a set of data points. Let’s state this problem formally; if you are not interested in the mathematical derivation it suffices to know that scikit-learn has an efficient implementation of it.
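For instance, fitting a two-component mixture with scikit-learn takes a couple of lines; the synthetic data below merely mimics a bi-modal Talos series:

import numpy as np
from sklearn.mixture import GaussianMixture

rng = np.random.RandomState(0)
x = np.concatenate([rng.normal(480, 5, 300),   # one mode around 480
                    rng.normal(500, 5, 200)])  # a second mode around 500

gmm = GaussianMixture(n_components=2).fit(x.reshape(-1, 1))
print(gmm.means_.ravel(), gmm.weights_)  # roughly 480/500 and 0.6/0.4, in some order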

EM Algorithm

We want to find the probability density f(x) , where f is a mixture of K Gaussians, that is most likely to have generated a given data point x :

f(x; \theta) = \sum_{k=1}^{K} p_k g(x; \mu_k, \sigma_k)

where p_k is the mixing coefficient of cluster k , i.e. the probability that a generic point belongs to cluster k , so that \sum_{k=1}^{K}p_k = 1 , and g is the probability density function of the normal distribution:

g(x; \mu_k, \sigma_k) = \frac{1}{\sqrt{2\pi}\sigma_k}e^{-\frac{(x - \mu_k)^2}{2\sigma_k^2}}

Now, given a set of N data points that are independent and identically distributed, we would like to determine the values of p_k , \mu_k and \sigma_k that maximize the log-likelihood function. Finding the maximum of a function often involves taking the derivative of a function and solving for the parameter being maximized, and this is often easier when the function being maximized is a log-likelihood rather than the original likelihood function.

\log\mathcal{L}(\theta | x_1, ..., x_N) = \log\prod_{i=1}^{N} f(x_i; \theta)

To find a maximum of \mathcal{L} , let’s compute its partial derivative with respect to \mu_k , \sigma_k and p_k . Since

\frac{\partial{g(x_i; \mu_k, \sigma_k)}}{\partial{\mu_k}} = g(x_i; \mu_k, \sigma_k) \frac{\partial}{\partial{\mu_k}} [-\frac{(x_i - \mu_k)^2}{2\sigma_k^2}] = \frac{g(x_i; \mu_k, \sigma_k) (x_i - \mu_k)}{\sigma_k^2}

then

\frac{\partial\log\mathcal{L}}{\partial{\mu_k}} = \sum_{i=1}^{N}\frac{1}{\sigma_k^2}\frac{p_kg(x_i; \mu_k, \sigma_k)}{\sum_{j=1}^{K}p_jg(x_i; \mu_j, \sigma_j)}(x_i - \mu_k)

But, by Bayes’ Theorem, \frac{p_kg(x_i; \mu_k, \sigma_k)}{\sum_{j=1}^{K}p_jg(x_i; \mu_j, \sigma_j)} is the conditional probability of selecting cluster k given that the data point x_i was observed, i.e. p(k|x_i) , so that:

\frac{\partial\log\mathcal{L}}{\partial{\mu_k}} = \sum_{i=1}^{N}\frac{p(k|x_i)}{\sigma_k^2}(x_i - \mu_k)

By applying a similar procedure to compute the partial derivative with respect to \sigma_k and p_k and finally setting the derivatives we just found to zero, we obtain:

\mu_k = \frac{\sum_{i=1}^{N}p(k|x_i)x_i}{\sum_{i=1}^{N}p(k|x_i)}

\sigma_k = \sqrt{\frac{\sum_{i=1}^{N}p(k|x_i)(x_i - \mu_k)^2}{\sum_{i=1}^{N}p(k|x_i)}}

p_k = \frac{1}{N}\sum_{i=1}^{N}p(k|x_i)

The first two equations turn out to be simply the sample mean and standard deviation of the data weighted by the conditional probability that component k generated the data point x_i .

Since the terms p(k|x_i) depend on all the parameters on the left-hand side of the expressions above, the equations are hard to solve directly, and this is where the EM algorithm comes to the rescue. It can be proven that the EM algorithm converges to a local maximum of the likelihood function when the following computations are iterated:

E Step

p^{(n)}(k|x_i) = \frac{p_k^{(n)}g(x_i; \mu_k^{(n)}, \sigma_k^{(n)})}{\sum_{j=1}^{K}p_j^{(n)}g(x_i; \mu_j^{(n)}, \sigma_j^{(n)})}

M Step

\mu_k^{(n+1)} = \frac{\sum_{i=1}^{N}p^{(n)}(k|x_i)x_i}{\sum_{i=1}^{N}p^{(n)}(k|x_i)}

\sigma_k^{(n+1)} = \sqrt{\frac{\sum_{i=1}^{N}p^{(n)}(k|x_i)(x_i - \mu_k^{(n+1)})^2}{\sum_{i=1}^{N}p^{(n)}(k|x_i)}}

p_k^{(n+1)} = \frac{1}{N}\sum_{i=1}^{N}p^{(n)}(k|x_i)

Intuitively, in the E-step the parameters of the components are assumed to be given and the data points are soft-assigned to the clusters. In the M-step we compute the updated parameters for our clusters given the new assignment.
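The two steps above translate almost verbatim into NumPy. Here is a bare-bones, purely illustrative implementation for one-dimensional data, with crude initialization and none of the numerical safeguards of a production implementation:

import numpy as np

def em_gaussian_mixture(x, K, n_iter=100):
    # x: 1-D NumPy array of data points; K: number of mixture components
    rng = np.random.RandomState(0)
    mu = rng.choice(x, K, replace=False)  # crude initialization
    sigma = np.full(K, x.std())
    p = np.full(K, 1.0 / K)

    for _ in range(n_iter):
        # E step: soft-assign each point to the clusters, i.e. compute p(k|x_i)
        dens = (p / (np.sqrt(2 * np.pi) * sigma)
                * np.exp(-(x[:, None] - mu) ** 2 / (2 * sigma ** 2)))
        resp = dens / dens.sum(axis=1, keepdims=True)

        # M step: re-estimate the parameters from the soft assignments
        nk = resp.sum(axis=0)
        mu = (resp * x[:, None]).sum(axis=0) / nk
        sigma = np.sqrt((resp * (x[:, None] - mu) ** 2).sum(axis=0) / nk)
        p = nk / len(x)

    return mu, sigma, p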

Determine K

Now that we have a way to fit a mixture of K Gaussians to our data, how do we determine K ? One way to deal with it is to fit a set of models with different values of K and select the best one according to its BIC score. Adding more components to a model will fit the data better, but doing so may result in overfitting. BIC mitigates this problem by introducing a penalty term for the number of parameters in the model.
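A sketch of that selection loop with scikit-learn, with an arbitrary candidate range for K:

from sklearn.mixture import GaussianMixture

def best_mixture(x, max_k=5):
    # x is an (n, 1) array; fit one model per candidate K and keep the lowest BIC
    models = [GaussianMixture(n_components=k).fit(x) for k in range(1, max_k + 1)]
    return min(models, key=lambda model: model.bic(x))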

Regression Detection

A simple approach to detect changes in the series is to use a rolling window and compare the distribution of the first half of the window to the distribution of the second half. Since we are dealing with Gaussians, we can use the z-statistic to compare the mean of each component in the left window to the mean of its corresponding component in the right window:

z = \frac{\mu_l - \mu_r}{\sqrt{\frac{\sigma_l^2}{n_l} + \frac{\sigma_r^2}{n_r}}}
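As an illustrative sketch, the statistic for a pair of matched components could be computed as follows; how components are matched between the two half-windows is glossed over here:

import numpy as np

def z_statistic(left, right):
    # left/right: values assigned to the matched component in each half-window
    ml, mr = np.mean(left), np.mean(right)
    vl, vr = np.var(left, ddof=1), np.var(right, ddof=1)
    return (ml - mr) / np.sqrt(vl / len(left) + vr / len(right))

A |z| above some threshold would then flag a potential shift.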

In the following plots the red dots are points at which the regression detection would have fired. Ideally the system would generate a single alert per cluster for the first point after the distribution shift.

regression1

regression2

regression3

Talos generates hundreds of different time series, some with dominating and peculiar noise patterns. As such it’s hard to come up with a generic model that solves the problem for good and represents the data perfectly.

Since the API to access this data is public, it provides an exciting opportunity for a contributor to come up with better ways of representing it. Feel free to join us on #perf if you are interested. Oh, and did I mention we are hiring a Senior Data Engineer?

Telemetry meets Parquet

In Mozilla’s Telemetry land, raw JSON pings are stored on S3 within files containing framed Heka records, which form our immutable, append-only master dataset. Reading the raw data with Spark can be slow for analyses that read only a handful of fields per record from the thousands available; not to mention the cost of parsing the JSON blobs in the first place.

We are slowly moving away from JSON to a more OLAP-friendly serialization format for the master dataset, which requires defining a proper schema. Given that we have been collecting data in big JSON payloads since the beginning of time, different subsystems have been using various, in some cases schema un-friendly, data layouts. That and the fact that we have thousands of fields embedded in a complex nested structure makes it hard to retrospectively enforce a schema that matches our current data, so a change is likely not going to happen overnight.

Until recently the only way to process Telemetry data with Spark was to read the raw data directly, which isn’t efficient but gets the job done.

Batch views

Some analyses require filtering out a considerable amount of data before the actual workload can start. To improve the efficiency of such jobs, we started defining derived streams, or so-called pre-computed batch views in lambda architecture lingo.

A batch view is regenerated or updated daily and, in a mathematical sense, it’s simply a function over the entire master dataset. The view usually contains a subset of the master dataset, possibly transformed, with the objective to make analyses that depend on it more efficient.

batch_view

As a concrete example, we ran an A/B test on Aurora 43 in which we disabled or enabled Electrolysis (E10s) based on a coin flip (note that E10s is enabled by default on Aurora). In this particular experiment we were aiming to study the performance of E10s. The experiment had a lifespan of one week per user. As some of our users access Firefox sporadically, we couldn’t just sample a few days’ worth of data and be done with it, as ignoring the long tail of submissions would have biased our results.

We clearly needed a batch view of our master dataset that contained only submissions for users that were currently enrolled in the experiment, in order to avoid an expensive filtering step.

Parquet

We decided to serialize the data for our batch views back to S3 as Parquet files. Parquet is a columnar storage format that is slowly becoming the lingua franca of the Hadoop ecosystem, as it can be read and written from e.g. Hive, Pig & Spark.

Conceptually it’s important to clarify that Parquet is just a storage format, i.e. a binary representation of the data, and it relies on object models, like the one provided by Avro or Thrift, to represent the data in memory. A set of object model converters are provided to map between the in-memory representation and the storage format.

Parquet supports efficient compression and encoding schemes, which are applied on a per-column level where the data tends to be homogeneous. Furthermore, as Spark can load Parquet files into a DataFrame, a Python analysis can potentially experience the same performance as a Scala one thanks to a unified physical execution layer.
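Reading such a view from a notebook then boils down to a one-liner. A sketch, with a made-up path and column name (sqlContext is the SQLContext available on our Spark clusters; newer Spark versions expose the same API as spark.read.parquet):

frame = sqlContext.read.parquet("s3://example-bucket/my_batch_view/v1")
frame.select("some_field").show()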

A Parquet file is composed of:

  • row groups, which contain a subset of rows – a row group is composed of a set of column chunks;
  • column chunks, which contain values for a specific column – a column chunk is partitioned in a set of pages;
  • pages, which are the smallest level of granularity for reads – compression and encoding are applied at this level of abstraction;
  • a footer, which contains the schema and some other metadata.

parquet

As batch views typically use only a subset of the fields provided in our Telemetry payloads, the problem of defining a schema for such subset becomes a non-issue.

Benefits

Unsurprisingly, we have seen speed-ups of up to a couple of orders of magnitude in some analyses and a reduction of file size by up to 8x compared to files having the same compression scheme and content (no JSON, just framed Heka records).

Each view is generated with a Spark job. In the future Heka is likely going to produce the views directly, once it supports Parquet natively.

Telemetry metrics roll-ups

Our Telemetry aggregation system has been serving us well for quite some time. As Telemetry evolved though, maintaining the codebase and adding new features such as keyed histograms has proven to be challenging. With the introduction of unified FHR/Telemetry, we decided to rewrite the aggregation pipeline with an updated set of requirements in mind.

Metrics

A ping is the data payload that clients submit to our server. The payload contains, among other things, over a thousand metrics of the following types:

  • numerical, like e.g. startup time
  • categorical, like e.g. operating system name
  • distributional, like e.g. garbage collection timings

Distributions are implemented with histograms that come in different shapes and sizes. For example, a keyed histogram represents a collection of labelled histograms. It’s not rare for keyed histograms to have thousands of possible labels. For instance, MISBEHAVING_ADDONS_JANK_LEVEL, which measures the longest blocking operation performed by an add-on, potentially has a label for each extension.

The main objective of the aggregator is to create time or build-id based aggregates by a set of dimensions:

  • channel, e.g. nightly
  • build-id or submission date
  • metric name, e.g. GC_MS
  • label, for keyed histograms
  • application name, e.g. Fennec
  • application version, e.g. 41
  • CPU architecture, e.g. x86_64
  • operating system, e.g. Windows
  • operating system version, e.g. 6.1
  • e10s enabled
  • process type, e.g. content or parent

As scalar and categorical metrics are converted to histograms during the aggregation, ultimately we display only distributions in our dashboard.

Raw Storage

We receive millions of pings each day over all our channels. A raw uncompressed ping has a size of over 100KB. Pings are sent to our edge servers and end up being stored in an immutable chunk of up to 300MB on S3, partitioned by submission date, application name, update channel, application version, and build id.

As we are currently collecting v4 submissions only on pre-release channels, we store about 700 GB per day, and that is considering only saved_session pings, as those are the ones being aggregated. Once we start receiving data on the release channel as well we are likely going to double that number.

As soon as an immutable chunk is stored on S3, an AWS lambda function adds a corresponding entry to a SimpleDB index. The index allows Spark jobs to query the set of available pings by different criteria without the need of performing an expensive scan over S3.

Spark Aggregator

A daily scheduled Spark job performs the aggregation, on the data received the day before, by the set of dimensions mentioned above. We are likely going to move from a batch job to a streaming one in the future to reduce the latency from the time a ping is stored on S3 to the time its data appears in the dashboard.

Two kinds of aggregates are produced by the aggregator:

  • submission date based
  • build-id based

Aggregates by build-id computed for a given submission date have to be added to the historical ones. As long as there are submissions coming from an old build of Firefox, we will keep receiving and aggregating data for it. The aggregation of the historical aggregates with the daily computed ones (i.e. partial aggregates) happens within a PostgreSQL database.

Database

There is only one type of table within the database which is partitioned by channel, version and build-id (or submission date depending on the aggregation type).

As PostgreSQL natively supports JSON blobs and arrays, it came naturally to express each row as just a couple of fields: a JSON object containing a set of dimensions and an array representing the histogram. Adding a new dimension in the future should be rather painless as dimensions are not represented with columns.

When a new partial aggregate is pushed to the database, PostgreSQL finds the current historical entry for that combination of dimensions, if it exists, and updates the current histogram by adding the partially aggregated histogram to it. In reality, a temporary table containing all partial aggregates is pushed to the database and then merged with the historical aggregates, but the underlying logic remains the same.

As the database is usually queried by submission date or build-id, and as there are millions of partial aggregates per day deriving from the possible combinations of dimensions, the table is partitioned the way it is to allow the upsert operation to be performed as fast as possible.

API

An inverted index on the JSON blobs makes it possible to efficiently retrieve, and aggregate, all histograms matching a given filtering criterion.

For example,


select aggregate_histograms(histograms)
from build_id_nightly_41_20150602
where dimensions @> '{"metric": "SIMPLE_MEASURES_UPTIME",
                      "application": "Firefox",
                      "os": "Windows"}'::jsonb;

retrieves a list of histograms, one for each combination of dimensions matching the where clause, and adds them together producing a final histogram that represents the distribution of the uptime measure for Firefox users on the nightly Windows build created on the 2nd of June 2015.

Aggregates are made available through a HTTP API. For example, to retrieve the aggregated histogram for the GC_MS metric on Windows for the build-id of 2015/06/15:

curl -X GET "http://aggregates.telemetry.mozilla.org/aggregates_by/build_id/channels/nightly/?version=41&dates=20150615&metric=GC_MS&os=Windows_NT"

 which returns

{"buckets":[0, ..., 10000],
 "data":[{"date":"20150615",
          "count":239459,
          "histogram":[309, ..., 5047],
          "label":""}],
 "kind":"exponential",
 "description":"Time spent running JS GC (ms)"}

Dashboard

Our intern, Anthony Zhang, did a phenomenal job creating a nifty dashboard to display the aggregates. Even though it’s still under active development, it’s already functional and thanks to it we were able to spot a serious bug in the v2 aggregation pipeline.

It comes with two views, the histogram view designed for viewing distributions of measures:

histogram

and an evolution view for viewing the evolution of aggregate values for measures over time:

evolution

As we started aggregating data at the beginning of June, the evolution plot looks rightfully wacky before that date.