An overview of Mozilla’s Data Pipeline

This post describes the architecture of Mozilla’s data pipeline, which is used to collect Telemetry data from our users and logs from various services. One of the cool perks of working at Mozilla is that most of what we do is out in the open and because of that I can do more than just show you some diagram with arrows of our architecture; I can point you to the code, script & configuration that underlies it!

To make the examples concrete, the following description is centered around the collection of Telemetry data. The same tool-chain is used to collect, store and analyze data coming from disparate sources though, such as service logs.

center.jpeg

Firefox

There are different APIs and formats to collect data in Firefox, all suiting different use cases:

  • histogramsfor recording multiple data points;
  • scalars – for recording single values;
  • timings – for measuring how long operations take;
  • events – for recording time-stamped events.

These are commonly referred to as probes. Each probe must declare the collection policy it conforms to: either opt-out (aka base telemetry) or opt-in (aka extended telemetry). When adding a new measurement data-reviewers carefully inspect the probe and eventually approve the requested collection policy:

  • opt-out (base telemetry) data is collected by default from all Firefox users; users may choose to turn this data collection off in preferences;
  • opt-in (extended telemetry) data is collected from users who explicitly express a choice to help with Firefox development; this includes all users who install pre-release & testing builds, plus release users who have explicitly checked the box in preferences.

A session begins when Firefox starts up and ends when it shuts down. As a session could be long-running and last weeks, it gets sliced into smaller logical units called subsessions. Each subsession generates a batch of data containing the current state of all probes collected so far, i.e. a main ping, which is sent to our servers. The main ping is just one of the many ping types we support. Developers can create their own ping types if needed.

Pings are submitted via an API that performs a HTTP POST request to our edge servers. If a ping fails to successfully submit (e.g. because of missing internet connection), Firefox will store the ping on disk and retry to send it until the maximum ping age is exceeded.

Kafka

HTTP submissions coming in from the wild hit a load balancer and then an NGINX module. The module accepts data via a HTTP request which it wraps in a Hindsight protobuf message and forwards to two places: a Kafka cluster and a short-lived S3 bucket (landfill) which acts as a fail-safe in case there is a processing error and/or data loss within the rest of the pipeline. The deployment scripts and configuration files of NGINX and Kafka live in a private repository.

The data from Kafka is read from the Complex Event Processors (CEP) and the Data Warehouse Loader (DWL), both of which use Hindsight.

Hindsight

Hindsight, an open source stream processing software system developed by Mozilla as Heka’s successor, is useful for a wide variety of different tasks, such as:

  • converting data from one format to another;
  • shipping data from one location to another;
  • performing real time analysis, graphing, and anomaly detection.

Hindsight’s core is a lightweight data processing kernel written in C that controls a set of Lua plugins executed inside a sandbox.

The CEP are custom plugins that are created, configured and deployed from an UI which produce real-time plots like the number of pings matching a certain criteria.  Mozilla employees can access the UI and create/deploy their own custom plugin in real-time without interfering with other plugins running.

sample.jpeg
CEP – a custom plugin in action

The DWL is composed of a set of plugins that transform, convert & finally shovel pings into S3 for long term storage. In the specific case of Telemetry data, an input plugin reads pings from Kafka, pre-processes them and sends batches to S3, our data lake, for long term storage. The data is compressed and partitioned by a set of dimensions, like date and application.

The data has traditionally been serialized to Protobuf sequence files which contain some nasty “free-form” JSON fields. Hindsight gained recently the ability to dump data directly in Parquet form though.

The deployment scripts and configuration files of the CEP & DWL live in a private repository.

Spark

Once the data reaches our data lake on S3 it can be processed with Spark. We provide a portal (ATMO) that allows Mozilla employees to create their own Spark cluster pre-loaded with a set of libraries & tools, like jupyter, numpy, scipy, pandas etc., and an API to conveniently read data stored in Protobuf form on S3 in a Spark RDD using a ORM-like interface. Behind the scenes we use EMR to create Spark clusters, which are then monitored by ATMO.

ui.jpeg
ATMO – monitoring clusters

ATMO is mainly used to write custom ad-hoc analyses; since our users aren’t necessary data engineers/scientists we chose Python as the main supported language to interface with Spark. From ATMO it’s also possible to schedule periodic notebook runs and inspect the results from a web UI.

As mentioned earlier, most of our data lake contains data serialized to Protobuf with free-form JSON fields. Needless to say, parsing JSON is terribly slow when ingesting TBs of data per day. A set of ETL jobs, written in Scala by Data Engineers and scheduled with Airflow, create Parquet views of our raw data.

A dedicated Spark job feeds daily aggregates to a Postgres database which powers a HTTP service to easily retrieve faceted roll-ups. The service is mainly used by TMO, a dashboard that visualizes distributions and time-series, and cerberus, an anomaly detection tool that detects and alerts developers of changes in the distributions. Originally the sole purpose of the Telemetry pipeline was to feed data into this dashboard but in time its scope and flexibility grew to support more general use-cases.

foo.jpeg
TMO – timeseries

Presto & re:dash

We maintain a couple of Presto clusters and a centralized Hive metastore to query Parquet data with SQL. The Hive metastore provides an universal view of our Parquet dataset to both Spark and Presto clusters.

Presto, and other databases, are behind a re:dash service (STMO) which provides a convenient & powerful interface to query SQL engines and build dashboards that can be shared within the company. Mozilla maintains its own fork of re:dash to iterate quickly on new features, but as good open source citizen we push our changes upstream.

img4.jpeg
STMO – who doesn’t love SQL?

HBase

We use HBase to store the history of pings by client. It’s a relatively new addition and it provides support for fast needle-in-haystack queries.

Is that it?

No, not really. For example, the DWL pushes some of the Telemetry data to Redshift and Elasticsearch but those tools satisfy more niche needs. The pipeline ingests logs from services as well and there are many specialized dashboards out there I haven’t mentioned.

There is a vast ecosystem of tools for processing data at scale, each with their pros & cons. The pipeline grew organically and we added new tools as new use-cases came up that we couldn’t solve with our existing stack. There are still scars left from that growth though which require some effort to get rid of, like ingesting data from schema-less format.

Telemetry meets HBase (again)

At the end of November AWS announced that HBase on EMR supported S3 as data store. That’s great news because it means one doesn’t have to keep around an HDFS cluster with 3x replication, which is not only costly but it comes with its own operational burden.

At the same time we had some use cases that could have been addressed with a key-value store and this seemed like a good opportunity to give HBase a try.

What is HBase?

HBase is an open source, non-relational, distributed key-value store which traditionally runs on top of HDFS. It provides a fault-tolerant, efficient way of storing large quantities of sparse data using column-based compression and storage.

In addition, it provides fast lookup of data thanks to indexing and in-memory cache. HBase is optimized for sequential write operations, and is highly efficient for batch inserts, updates, and deletes. HBase also supports cell versioning so one can look up and use several previous versions of a cell or a row.

The system can be imagined as a distributed log-structured merge tree and is ultimately an open source implementation of Google’s BigTable whitepaper. A HBase table is partitioned horizontally in so called regions, which contains all rows between the region’s start and end key. Region Servers are responsible to serve regions while the HBase master handles region assignments and DDL operations.

A region server has:

  • a BlockCache which serves as a LRU read cache;
  • a BucketCache (EMR version only), which caches reads on local disk;
  • a WAL, used to store writes not yet persisted to HDFS/S3 and stored on HDFS;
  • a MemStore per column family (a collection of columns); a MemStore is a write cache which, once it accumulated enough data, is written to a store file;
  • a store file stores rows as sorted key values on HDFS/S3;
hbase-files.png
HBase architecture with HDFS storage

This is just a 10000-foot overview of the system and there are many articles out there that go into important details, like store file compaction.

hbase_s3.png
EMR’s HBase architecture with S3 storage and BucketCache

One nice property of HBase is that it guarantees linearizable consistency, i.e. if operation B started after operation A successfully completed, then operation B must see the the system in the same state as it was on completion of operation A, or a newer state. That’s easy to do since each row can only be served by one region server.

Why isn’t Parquet good enough?

Many of our datasets are stored on S3 in Parquet form. Parquet is a great format for typical analytical workloads where one needs all the data for a particular subset of measurements. On the other hand, it isn’t really optimized for finding needles in haystacks; partitioning and sorting can help alleviate this issue only so much.

As some of our analysts have the need to efficiently access the telemetry history for a very small and well-defined sub-population of our user base (think of test-pilot clients before they enrolled), a key-value store like HBase or DynamoDB fits that requirement splendidly.

HBase stores and compresses the data per column-family, unlike Parquet which does the same per column. That means the system will read way more data than it is actually needed if only a small subset of columns is read during a full scan. And no, you can’t just have a column family for each individual column as column families are flushed in concert. Furthermore, HBase doesn’t have a concept of types unlike Parquet. Both the key and the value are just bytes and it’s up to the user to interpret those bytes accordingly.

It turns out that Mozilla’s telemetry data was once stored in HBase! If you knew that then you have been around at Mozilla much longer than I have. That approach was later abandoned as keeping around mostly un-utilized data in HDFS was costly and typical analytical workloads involving large scans were slow.

Wouldn’t it be nice to have the best of both worlds: efficient scans and fast look-ups? It turns out there is one open system out there currently being developed that aims to feel that gap. Apache Kudu provides a combination of fast inserts/updates and efficient columnar scans to enable multiple real-time analytic workloads across a single storage layer. I don’t think it’s ready for prime time just yet though.

What about DyamoDB?

DymanoDB is a managed key value store. Leaving aside operational costs, it’s a fair question to wonder how much it differs in terms of pricing for our example use case.

The data we are planning to store has a compressed size of about 200 GB (~ 1.2 TB uncompressed) per day and it consists of 400 Millions key-value pairs of about 3 KB each uncompressed. As we are planning to keep around the data for 90 days, the total size of the table would amount to 18 TB.

HBase costs

Let’s say the machines we want to use for the HBase cluster are m4.xlarge which have 16 GB of RAM. As suggested in Hortonwork’s HBase guidelines, each machine could ideally serve about 50 regions. By dividing the the table into say 1000 regions, each region would have a size of 18 GB, which is still in the recommended maximum region size. Since each machine can serve about 50 regions, and we have 1000 regions, it means our cluster should ideally have a size of 20.

Using on-demand EMR prices the cluster would have a monthly cost of:

20 \mathrm{\ nodes\ } \times 30 \mathrm{\ day} \times \frac{24 \ \mathrm{hour}}{\mathrm{day}} \times \frac{0.239\$ + 0.060 \$}{\mathrm{hour} \times \mathrm{node}} = 4306 \$

This is an upper bound as reserved or spot instances cost less.

The daily batch job that pushes data to HBase uses 5 c3.4xlarge machines and takes 5 hours, so it would have a monthly cost of:

5 \mathrm{\ nodes\ } \times 30 \mathrm{\ day} \times \frac{5 \mathrm{\ hour}}{\mathrm{day}} \times \frac{0.840\$ + 0.210 \$}{\mathrm{hour} \times \mathrm{node}} = 788 \$

To keep around about 18 TB of data on S3 we will need 378 $ at 0.021 $ per GB. Note that this doesn’t include the price for the requests which is rather difficult to calculate, albeit low.

In total we have a cost of about 5500 $ per month for the HBase solution.

DynamoDB costs

DynamoDB’s pricing is based on the desired request throughput the system needs to have. The throughput is measured in capacity units. Let’s assume that one write request per second corresponds to 3 write capacity units as one unit of write capacity is limited to items of up to 1 KB in size and we are dealing with items of about 3 KB in size. Let’s also assume that we want to use a batch job, equivalent to the one used for HBase, to push the data into the store. Which means that we want enough write capacity to shovel 400 M pings in 5 hours:

\frac{\mathrm{sec} \times 3\ \mathrm{write\ unit}}{\mathrm{\ write}} \times \frac{400 * 10^6 \mathrm{\ write}}{5 \mathrm{\ hour}} \times \frac{\mathrm{hour}}{3600 \mathrm{\ sec}} \times \frac{0.0065\ \$}{\mathrm{hour} \times 10 \times \mathrm {write \ unit}} \times\frac{5\ \mathrm{hour}}{\mathrm{day}} = 217 \mathrm{\ \$/day}

which amounts to about 6510 $ a month. Note that this is just the cost to push the data in and it’s not considering the cost for reads all day around.

The cost of the storage, assuming the compression ratio is the same as with HBase, is:

\frac{0.25\  \$}{\mathrm{GB}} \times 18000 \ \mathrm{GB} = 4500 \ \$

Finally, if we consider also the cost of the batch job (788 $) we have a total spending of about 11800 $ per month.

In conclusion the HBase solution is cheaper and more flexible. For example, one could keep around historical data on S3 and not have an HBase cluster serving it until it’s really needed. The con is that HBase isn’t automagically managed and as such it requires operational effort.

How do I use this stuff?

We have created a mirrored view in HBase of the main summary dataset which is accessible through a Python API. The API allows one to retrieve the history of main pings for a small subset of client ids sorted by activity date:

view = HBaseMainSummaryView()
history = view.get(sc, ["00000000-0000-0000-0000-000000000000"])

We haven’t yet decided if this is something we want to support and keep around and we will make that decision once we have an understanding of the usefulness it provides to our analysts.

A martingale approach to detect changes in histograms

This post is co-authored by Alessio Placitelli.

If a user has opted into submitting performance data to Mozilla, the Telemetry system will collect various measures of Firefox performance, hardware, usage and customizations and submit it to Mozilla. Telemetry histograms are the preferred way to track numeric measurements such as timings.

The histogram below is taken from Firefox’s about:telemetry page. It shows a histogram used for tracking plugin shutdown times and the data collected over a single Firefox session. The timing data is grouped into buckets where the height of the blue bars represents the number of items in each bucket. The tallest bar, for example, indicates that there were 63 plugin shutdowns lasting between 129 ms and 204 ms.

sampleHistogram.png

The previous metric is just one of the many the Firefox browser collects and sends over to our pipeline, which aggregates metrics by build and submission date and makes the roll ups available through the telemetry.js v2 API as time series:

plugin.jpegDetecting changes over time is crucial to alert developers when a build regressed a performance measure. There is an ad-hoc system in place at the moment but it’s hard to tune and adapt to new measurement types.

Individual histograms can be represented as vectors. What we ultimately want to test is if a sequence of vectors, i.e. a time series for a particular measure, is  independent and identically distributed. As traditional statistical approaches to test this property are inappropriate for high dimensional data we tried to address this problem using martingales.

Martingales?

A martingale is a model of a fair game where knowledge of past events never helps predict the mean of the future winnings and only the current event matters. In particular, a martingale is a sequence of random variables for which, at a particular time in the realized sequence, the expectation of the next value in the sequence is equal to the present observed value even given knowledge of all prior observed values.

\displaystyle \mathbf{E}(|X_N|) < \infty

\displaystyle \mathbf{E}(X_{n+1} | X_1, ..., X_n) = X_n

Before we describe how a martingale can be used to detect changes in time series, we introduce the concept of strangeness measure to assess how much an observation differs from other ones. Say we have a set of vectors S and we want to determine how different a new vector \mathbf{x} is from S . One way to do that is to compute the average vector \mathbf{\bar{s}} from S and take the euclidean distance of \mathbf{x} from \mathbf{\bar{s}} . The euclidean distance acts as a strangeness measure in this case.

The martingale change detection algorithm proceeds in two steps. Observations are presented one by one in an online fashion. For each new observation o_n , its strangeness score a_n is determined and consequently its p-value p_n computed given the scores of past observations:

\displaystyle p_n = \frac{\# \{i: a_i > a_n \} + \theta_n \#\{i: a_i = a_n\}}{n} 

where \theta_n is a random number from [0, 1] . It can be shown that if all observations are i.i.d. then the p-values are uniformly distributed in [0, 1] .

In the second step a family of martingales is introduced, indexed by \epsilon \in [0, 1] , and referred to as the randomized power martingale:

\displaystyle M_n^{(\epsilon)} = \prod_{i=1}^{n}(\epsilon p_i^{\epsilon - 1})

Where the p_is are the p-values discussed above, with the initial martingale M_0^{(\epsilon)} = 1 . If and when the martingale grows larger than a set threshold \lambda > 0, the assumption that there is no change in the time series can be rejected based on Doob’s inequality:

P(\mathrm{max}\ M_k \ge \lambda) = \frac{1}{\lambda}

This inequality means that it is unlikely for any M_k to have a high value.

Implementation

The implementation is relatively straightforward but we had to tweak it to adapt it to our use case. To highlight the salient points we will use only a few measurements to do so but bear in mind that they apply more generally to other measures as well.

Let’s start by checking how the change detection algorithm, with the cosine distance as strangeness measure, performs on a time series we know didn’t change. GC_MS tracks the time spent running the Javascript garbage collector in ms:

gc_ms_1.jpeg

As shown in the plots above, the histogram exhibits a weekly seasonality due to the fact that less data is collected during the weekends. That seasonality can cause the martingale to grow when it really shouldn’t which ultimately could lead to false positives.

Since we mostly care about the shape of the histogram we will try normalizing the histogram first:

gc_ms_.jpeg

As the plots show, the martingale now grows less than in the previous case as we reduced the noise in the time series by eliminating the seasonality.

Notice also how the martingale grows initially since it hasn’t seen yet many observations. That can cause to yield a higher number of false positives under some circumstances.  To avoid it we will force the martingale to not change for the first N = 5 points so that the algorithm has a chance to learn how the strangeness distribution looks like.

gc-3.jpeg

Now we will test the change detection algorithm on a histogram known to have changed in the given time period.

TELEMETRY_ARCHIVE_CHECKING_OVER_QUOTA_MS measures the time in ms it takes to check if the telemetry archive is over-quota:

archive.jpeg

The heatmap shows clearly that there was a change on the 7th of October. The problem now is that that the martingale doesn’t grow fast enough and tends to stay close to 1 and as such Doob’s inequality is hardly applicable. This is due to the fact that we have only few data points available.

One way to deal with the martingale not growing fast enough is “to cheat” and use spline interpolation to generate more points.

archive-3.jpeg

Now we can spot a nice spike in the martingale plot! Note also how the martingale is reset to 1 when it crosses the configurable change threshold which in this case is set to 20.

The proposed algorithm catches many changes that are easily spotted by a human but also some that are debatable. Ideally we want to reduce the number of false positives as much as possible even at the cost of missing some real changes. A high number of false positives could cause developers to loose trust, and ultimately ignore, the detected changes.

places-3.jpeg

It’s very hard to see any change at all in the heatmap for PLACES_KEYWORDS_COUNT. What’s going on? It appears that bin 0, where most of the distribution is concentrated, is mostly stable until it isn’t; in other words a slight change of that bin can cause the martingale to grow very fast:

bin0.jpeg

One way to deal with the issue is to increase the change threshold which in turn can cause to miss some real changes. A better way to deal with it is to check how the histogram bins are affected before and after a detected change and ignore it if the weight difference is so small that even if it is statistically significant it isn’t practically of relevance.

Without further ado let’s see some other changes caught by the algorithm in the given time period:

minor3.jpeg
Note how the martingale grows slightly around the 3rd of October where there was a temporary change.
geo-3.jpeg
This histogram is rather noisy and we will have to blacklist it.
plugins-3.jpeg
Bins 0 and 1 have been practically inverted on the 6th of October.
telem.jpeg
Note how the martingale grew during the first change around the 28th of September and then slowly started decreasing after the time series nearly went back to its original form.

And now a few histograms for which no change was detected:

thumbnail.jpeg
This histogram is way too noisy and the martingale plot reflects it.
nochange.jpeg
This histogram didn’t change at all.

Evaluation

Since our time-series are unlabeled we were forced to evaluate the performance and tune the parameters of our anomaly detection algorithm manually. To do so we used the number of changes found and precision to tune our model as we wanted to catch as many anomalies as possible, but more importantly, keep the number of false positives to a very small number (one digit percentage). Ideally if we had a labelled set of all changes we could just cross-validate our model. Unfortunately that’s not the case though.

We might consider in the future to use Mechanical Turk to label a set of time-series but as it currently stands we don’t have the time to do so ourselves as the number of histograms is extremely high  (in the order of several thousands) and labeling anomalies requires not only to look at the data but also to consider known performance regressions/changes of the Firefox browser.

Differential Privacy for Dummies

Technology allows companies to collect more data and with more detail about their users than ever before. Sometimes that data is sold to third parties, other times it’s used to improve products and services.

In order to protect users’ privacy, anonymization techniques can be used to strip away any piece of personally identifiable data and let analysts access only what’s strictly necessary. As the Netflix competition in 2007 has shown though, that can go awry. The richness of data allows to identify users through a sometimes surprising combination of variables like the dates on which an individual watched certain movies. A simple join between an anonymized datasets and one of many publicly available, non-anonymized ones, can re-identify anonymized data.

Aggregated data is not much safer either under some circumstances! For example, say we have two summary statistics: one is the number of users, including Frank, that watch one movie per day and the other is the number of users, without Frank, that watch one movie per day. Then, by comparing the counts, we could tell if Frank watches one movie per day.

Differential Privacy to the rescue

Differential privacy formalizes the idea that a query should not reveal whether any one person is present in a dataset, much less what their data are. Imagine two otherwise identical datasets, one with your information in it, and one without it. Differential Privacy ensures that the probability that a query will produce a given result is nearly the same whether it’s conducted on the first or second dataset. The idea is that if an individual’s data doesn’t significantly affect the outcome of a query, then he might be OK in giving his information up as it is unlikely that the information would be tied back to him. The result of the query can damage an individual regardless of his presence in a dataset though. For example, if an analysis on a medical dataset finds a correlation between lung cancer and smoking, then the health insurance cost for a particular smoker might increase regardless of his presence in the study.

More formally, differential privacy requires that the probability of a query producing any given output changes by at most a multiplicative factor when a record (e.g. an individual) is added or removed from the input. The largest multiplicative factor quantifies the amount of privacy difference. This sounds harder than it actually is and the next sections will iterate on the concept with various examples, but first we need to define a few terms.

Dataset

We will think of a dataset d as being a collection of records from an universe U . One way to represent a dataset d is with a histogram in which each entry d_i represents the number of elements in the dataset equal to u_i \in U . For example, say we collected data about coin flips of three individuals, then given the universe U = \{head, tail\} , our dataset d would have two entries: d_{head} = i and d_{tail} = j , where i + j = 3 . Note that in reality a dataset is likely to be an ordered lists of rows (i.e. a table) but the former representation makes the math a tad easier.

Distance

Given the previous definition of dataset, we can define the distance between two datasets x, y with the l_1 norm as:

\displaystyle ||x - y||_1 = \sum\limits_{i=1}^{|U|}  |x_i - y_i|

Mechanism

A mechanism is an algorithm that takes as input a dataset and returns an output, so it can really be anything, like a number, a statistical model or some aggregate. Using the previous coin-flipping example, if mechanism C counts the number of individuals in the dataset, then C(d) = 3 . In practice though we will specifically considering randomized mechanisms, where the randomization is used to add privacy protection.

Differential Privacy

A mechanism M satisfies \epsilon differential privacy if for every pair of datasets x, y such that ||x - y||_1 \leq 1 , and for every subset S \subseteq \text{Range}(M) :

\displaystyle \frac{Pr[M(x) \in S]}{Pr[M(y) \in S]} \leq e^{\epsilon}

What’s important to understand is that the previous statement is just a definition. The  definition  is  not  an  algorithm,  but  merely  a  condition that must be satisfied by a mechanism to claim that it satisfies \epsilon differential privacy. Differential privacy allows researchers to use a common framework to study algorithms and compare their privacy guarantees.

Let’s check if our mechanism C satisfies 1 differential privacy. Can we find a counter-example for which:

\displaystyle \frac{Pr[C(x) \in S]}{Pr[C(y) \in S]} \leq e

is false? Given x, y such that ||x - y||_1 = 1 and ||x||_1 = k , then:

\displaystyle \frac{Pr[C(x) = k]}{Pr[C(y) = k]} \leq e

i.e. \frac{1}{0} \leq e , which is clearly false, hence this proves that mechanism C doesn’t satisfy 1 differential privacy.

Composition theorems

A powerful property of differential privacy is that mechanisms can easily be composed. These require the key assumption that the mechanisms operate independently given the data.

Let d be a dataset and g an arbitrary function. Then, the sequential composition theorem asserts that if M_I(d) is \epsilon_i differentially private, then M(d) = g(M_1(d), M_2(d), ..., M_N(d)) is \sum_{i=1}^{N}\epsilon_i differentially private. Intuitively this means that given an overall fixed privacy budget, the more mechanisms are applied to the same dataset, the more the available privacy budget for each individual mechanism will decrease.

The parallel composition theorem asserts that given N partitions of a dataset d , if for an arbitrary partition d_i , M_i(d_i) is \epsilon differentially private, then M(d) = g(M_1(d_1), M_2(d_2), ..., M_N(d_N)) is \epsilon differentially private. In other words, if a set of \epsilon differentially private mechanisms is applied to a set of disjoint subsets of a dataset, then the combined mechanism is still \epsilon differentially private.

The randomized response mechanism

The first mechanism we will look into is “randomized response”, a technique developed in the sixties by social scientists to collect data about embarrassing or illegal behavior. The study participants have to answer a yes-no question in secret using the following mechanism M_R(d, \alpha, \beta) :

  1. Flip a biased coin with probability of heads \alpha ;
  2. If heads, then answer truthfully with d ;
  3. If tails, flip a coin with probability of heads \beta and answer “yes” for heads and “no” for tails.

In code:

def randomized_response_mechanism(d, alpha, beta):
    if random() < alpha:
        return d
    elif random() < beta:
        return 1
    else:
        return 0

Privacy is guaranteed by the noise added to the answers. For example, when the question refers to some illegal activity, answering “yes” is not incriminating as the answer occurs with a non-negligible probability whether or not it reflects reality, assuming \alpha and \beta are tuned properly.

Let’s try to estimate the proportion p of participants that have answered “yes”. Each participant can be modeled with a Bernoulli variable X_i which takes a value of 0 for “no” and a value of 1 for “yes”. We know that:

\displaystyle P(X_i = 1) = \alpha p + (1 - \alpha) \beta

Solving for p yields:

\displaystyle p = \frac{P(X_i = 1) - (1 - \alpha) \beta}{\alpha}

Given a sample of size n , we can estimate P(X_i = 1)  with \frac{\sum_{i=1}^{i=n} X_i}{n} .  Then, the estimate \hat{p} of p is:

\displaystyle \hat{p} = \frac{\frac{\sum\limits_{i=1}^{i=n} X_i}{n} - (1 - \alpha) \beta}{\alpha}

To determine how accurate our estimate is we will need to compute its standard deviation. Assuming the individual responses X_i are independent, and using basic properties of the variance,

\displaystyle \mathrm{Var}(\hat{p}) = \mathrm{Var}\biggl({\frac{\sum_{i=1}^{i=n} X_i}{n \alpha}}\biggr) = {\frac{\mathrm{Var}(X_i)}{n \alpha^2}}

By taking the square root of the variance we can determine the standard deviation of \hat{p} . It follows that the standard deviation s is proportional to \frac{1}{\sqrt{n}} , since the other factors are not dependent on the number of participants. Multiplying both \hat{p} and s by n yields the estimate of the number of participants that answered “yes” and its relative accuracy expressed in number of participants, which is proportional to \sqrt{n} .

The next step is to determine the level of privacy that the randomized response method guarantees. Let’s pick an arbitrary participant. The dataset d is represented with either 0 or 1 depending on whether the participant answered truthfully with a “no” or “yes”. Let’s call the two possible configurations of the dataset respectively d_{no} and d_{yes} . We also know that ||d_i - d_j||_1 \leq 1 for any i, j . All that’s left to do is to apply the definition of differential privacy to our randomized response mechanism M_R(d, \alpha, \beta) :

\displaystyle \frac{Pr[M_R(d_{i}, \alpha, \beta) = k]}{Pr[M_R(d_{j}, \alpha, \beta) = k]} \leq e^{\epsilon}

The definition of differential privacy applies to all possible configurations of i, j \text{ and } k , e.g.:

\displaystyle \begin{gathered}\frac{Pr[M_R(d_{yes}, \alpha, \beta) = 1]}{Pr[M_R(d_{no}, \alpha, \beta) = 1]} \leq e^{\epsilon} \\\ln \left( {\frac{\alpha + (1 - \alpha)\beta}{1 - (\alpha + (1 - \alpha)\beta)}} \right) \leq \epsilon \\\ln \left( \frac{\alpha + \beta - \alpha \beta}{1 - (\alpha + \beta - \alpha \beta)} \right) \leq \epsilon\end{gathered}

The privacy parameter \epsilon can be tuned by varying \alpha \text{ and } \beta . For example, it can be shown that the randomized response mechanism with \alpha = \frac{1}{2} and \beta = \frac{1}{2} satisfies \ln{3} differential privacy.

The proof applies to a dataset that contains only the data of a single participant, so how does this mechanism scale with multiple participants? It follows from the parallel composition theorem that the combination of \epsilon differentially private mechanisms applied to the datasets of the individual participants is \epsilon differentially private as well.

The Laplace mechanism

The Laplace mechanism is used to privatize a numeric query. For simplicity we are going to assume that we are only interested in counting queries f , i.e. queries that count individuals, hence we can make the assumption that adding or removing an individual will affect the result of the query by at most 1.

The way the Laplace mechanism works is by perturbing a counting query f with noise distributed according to a Laplace distribution centered at 0 with scale b = \frac{1}{\epsilon} ,

\displaystyle Lap(x | b) = \frac{1}{2b}e^{- \frac{|x|}{b}}

Then, the Laplace mechanism is defined as:

\displaystyle M_L(x, f, \epsilon) = f(x) + Z

where Z is a random variable drawn from Lap(\frac{1}{\epsilon}) .

In code:

def laplace_mechanism(data, f, eps):
    return f(data) + laplace(0, 1.0/eps)

It can be shown that the mechanism preserves \epsilon differential privacy. Given two datasets x, y such that ||x - y||_1 \leq 1 and a function f which returns a real number from a dataset, let p_x denote the probability density function of M_L(x, f, \epsilon) and p_y the probability density function of M_L(y, f, \epsilon) . Given an arbitrary real point k ,

\displaystyle \frac{p_x(k)}{p_y(k)} = \frac{e^{- \epsilon |f(x) - k|}}{e^{- \epsilon |f(y) - k|}} =

\displaystyle e^{\epsilon (|f(x) - k| - |f(y) - k|)} \leq

\displaystyle e^{\epsilon |f(x) - f(y)|}

by the triangle inequality. Then,

\displaystyle e^{\epsilon |f(x) - f(y)|} \leq e^\epsilon

What about the accuracy of the Laplace mechanism? From the cumulative distribution function of the Laplace distribution it follows that if Z \sim Lap(b) , then Pr[|Z| \ge t \times b] = e^{-t} . Hence, let k = M_L(x, f, \epsilon) and \forall \delta \in (0, 1] :

\displaystyle Pr \left[|f(x) - k| \ge \ln{\left (\frac{1}{\delta} \right)} \times \frac{1}{\epsilon} \right] =

\displaystyle Pr \left[|Z| \ge \ln{\left (\frac{1}{\delta} \right)} \times \frac{1}{\epsilon} \right] = \delta

where Z \sim Lap(\frac{1}{\epsilon}) . The previous equation sets a probalistic bound to the accuracy of the Laplace mechanism that, unlike the randomized response, does not depend on the number of participants n .

Counting queries

The same query can be answered by different mechanisms with the same level of differential privacy. Not all mechanisms are born equally though; performance and accuracy have to be taken into account when deciding which mechanism to pick.

As a concrete example, let’s say there are n individuals and we want to implement a query that counts how many possess a certain property p . Each individual can be represented with a Bernoulli random variable:

participants = binomial(1, p, n)

We will implement the query using both the randomized response mechanism M_R(d, \frac{1}{2}, \frac{1}{2}) , which we know by now to satisfy \ln{3} differential privacy, and the Laplace mechanism M_L(d, f, \ln{3}) which satisfies \ln{3} differential privacy as well.

def randomized_response_count(data, alpha, beta):
    randomized_data = randomized_response_mechanism(data, alpha, beta)
    return len(data) * (randomized_data.mean() - (1 - alpha)*beta)/alpha

def laplace_count(data, eps):
    return laplace_mechanism(data, np.sum, eps)

r = randomized_response_count(participants, 0.5, 0.5)
l = laplace_count(participants, log(3))

Note that while that while M_R is applied to each individual response and later combined in a single result, i.e. the estimated count, M_L is applied directly to the count, which is intuitively why M_R is noisier than M_L . How much noisier?  We can easily simulate the distribution of the accuracy for both mechanisms with:

def randomized_response_accuracy_simulation(data, alpha, beta, n_samples=1000):
    return [randomized_response_count(data, alpha, beta) - data.sum()
           for _ in range(n_samples)]

def laplace_accuracy_simulation(data, eps, n_samples=1000):
    return [laplace_count(data, eps) - data.sum()
            for _ in range(n_samples)]

r_d = randomized_response_accuracy_simulation(participants, 0.5, 0.5)
l_d = laplace_accuracy_simulation(participants, log(3))

As mentioned earlier, the accuracy of M_R grows with the square root of the number of participants:

index
Randomized Response Mechanism Accuracy

while the accuracy of M_L is a constant:

index
Laplace Mechanism Accuracy

You might wonder why one would use the randomized response mechanism if it’s worse in terms of accuracy compared to the Laplace one. The thing about the Laplace mechanism is that the private data about the users has to be collected and stored, as the noise is applied to the aggregated data. So even with the best of intentions there is the remote possibility that an attacker might get access to it.  The randomized response mechanism though applies the noise directly to the individual responses of the users and so only the perturbed responses are collected! With the latter mechanism any individual’s information cannot be learned with certainty, but an aggregator can still infer population statistics.

That said, the choice of mechanism is ultimately a question of which entities to trust. In the medical world, one may trust the data collectors (e.g. researchers), but not the general community who will be accessing the data. Thus one collects the private data in the clear, but then derivatives of it are released on request with protections. However, in the online world, the user is generally looking to protect their data from the data collector itself, and so there is a need to prevent the data collector from ever accumulating the full dataset in the clear.

Real world use-cases

The algorithms presented in this post can be used to answer simple counting queries. There are many more mechanisms out there used to implement complex statistical procedures like machine learning models. The concept behind them is the same though: there is a certain function that needs to be computed over a dataset in a privacy preserving manner and noise is used to mask an individual’s original data values.

One such mechanism is RAPPOR, an approach pioneered by Google to collect frequencies of an arbitrary set of strings. The idea behind it is to collect vectors of bits from users where each bit is perturbed with the randomized response mechanism.  The bit-vector might represent a set of binary answers to a group of questions, a value from a known dictionary or, more interestingly, a generic string encoded through a Bloom filter. The bit-vectors are aggregated and the expected count for each bit is computed in a similar way as shown previously in this post. Then, a statistical model is fit to estimate the frequency of a candidate set of known strings. The main drawback with this approach is that it requires a known dictionary.

Later on the approach has been improved to infer the collected strings without the need of a known dictionary at the cost of accuracy and performance. To give you an idea, to estimate a distribution  over  an  unknown  dictionary  of  6-letter  strings without knowing  the  dictionary,  in  the  worst  case,  a sample size in  the  order  of  300  million  is required; the sample size grows quickly as the length of the strings increases. That said, the mechanism consistently finds the most frequent strings which enable to learn the dominant trends of a population.

Even though the theoretical frontier of differential privacy is expanding quickly there are only a handful implementations out there that, by ensuring  privacy without the need for a trusted third party like RAPPOR, suit well the kind of data collection schemes commonly used in the software industry.

References

Best practices for peer-reviewing a data analysis

Writing good code is hard, writing a good analysis is harder. Peer-review is an essential tool to fight repetitive errors, omissions and more generally divulge knowledge. I found the use of a checklist to be invaluable to help me remember the most important things I should watch out for during a review. It’s far too easy to focus on few details and ignore others which might be catched (or not) in a successive round.

I don’t religiously apply every bullet point of the following checklist to every analysis, nor is this list complete; more items would have to be added depending on the language, framework, libraries, models, etc. used.

  • Is the question the analysis should answer clearly stated?
  • Is the best/fastest dataset that can answer the question being used?
  • Do the variables used measure the right thing (e.g. submission date vs activity date)?
  • Is a representative sample being used?
  • Are all data inputs checked (for the correct type, length, format, and range) and encoded?
  • Do outliers need to be filtered or treated differently?
  • Is seasonality being accounted for?
  • Is sufficient data being used to answer the question?
  • Are comparisons performed with hypotheses tests?
  • Are estimates bounded with confidence intervals?
  • Should the results be normalized?
  • If any statistical method is being used, are the assumptions of the model met?
  • Is correlation confused with causation?
  • Does each plot communicate an important piece of information or address a question of interest?
  • Are legends and axes labelled and do the they start from 0?
  • Is the analysis easily reproducible?
  • Does the code work, i.e. does it perform its intended function?
  • Is there a more efficient way to solve the problem, assuming performance matters?
  • Does the code read like prose?
  • Does the code conform to the agreed coding conventions?
  • Is there any redundant or duplicate code?
  • Is the code as modular as possible?
  • Can any global variables be replaced?
  • Is there any commented out code and can it be removed?
  • Is logging missing?
  • Can any of the code be replaced with library functions?
  • Can any debugging code be removed?
  • Where third-party utilities are used, are returning errors being caught?
  • Is any public API commented?
  • Is any unusual behavior or edge-case handling described?
  • Is there any incomplete code? If so, should it be removed or flagged with a suitable marker like ‘TODO’?
  • Is the code easily testable?
  • Do tests exist and do they actually test that the code is performing the intended functionality?

What is it like to be a Data Engineer at Mozilla?

When people ask me what I do at Mozilla and I tell them I work with data, more often than not their reaction is something like “Wait, what? What kind of data could Mozilla possibly have?” But let’s start from the beginning of my journey. Even though I have worked on data systems since my first internship, I didn’t start my career at Mozilla officially as a Data Engineer. I was hired to track, investigate, and fix performance problems in the Firefox browser; an analyst of sorts, given my background in optimizing large scientific applications for the ATLAS experiment at the LHC.

The thing with applications for which you have some control over how they are used is that you can “just” profile your known use cases and go from there. Profiling a browser is an entirely different story. Every user might stress the browser in different ways and you can’t just sample stacks and indiscriminately collect data from your users without privacy and performance consequences. If that wasn’t enough, there are as many different hardware/software configurations as there are users. Even with the same configuration, a different driver version can be make a huge difference. This is where Telemetry comes in, an opt-in data acquisition system that collects general performance metrics like garbage collection timings and hardware information, like the GPU model, across our user-base. In case you are wondering what data I am talking about, why don’t you have a look? You are just one click away from about:telemetry, a page that shows the data that Telemetry collects.

Imagine receiving Terabytes of data with millions of payloads per day, each containing thousands of complex hierarchical measurements, and being tasked to make sense of that data, to find patterns in it, how would you do it? Step 1 is getting the data, how hard can it possibly be, right? In order to do that, I had to fire a script to launch an EC2 instance to which I had to ssh into and from which I could run a custom, non-distributed, MapReduce (MR) system written in Python. This was a couple of years ago; you might be wondering why anyone would use a custom MR system instead of something like Hadoop back then. It turns out that it was simple enough to support the use cases the performance team had at the time and when something went wrong it was easy to debug. Trust me, debugging a full-fledged distributed system can be painful.

The system was very simple, the data was partitioned on S3 according to a set of dimensions, like submission date, and the MR framework allowed the user to define the which values to use for each dimension. My first assignment was to get back the ratio of users that had a solid state disk. Sounds simple enough, yet it took me a while to get the data I wanted out of the system. The debug cycle was very time consuming; I had to edit my job, try to run it and if something went wrong start all over again. An analysis for a single day could take hours to complete and, as it’s often the case with data, if something that was supposed to be a string turned out be an integer because of data corruption, and I failed to handle that case, the whole thing exploded.

There weren’t many people running Telemetry analyses back then; it was just too painful. In time, as more metrics were added and I tried to answer more questions with data, I decided to step back for a moment and try something new. I played around with different systems like Pig, mrjob, and Spark. Though Spark has a rich API with far more operators than map & reduce, what ultimately sold me was the fact that it had a read–eval–print loop. If your last command failed, you could just retry it without having to rerun the whole job from the beginning! I initially used a Scala API to run my analyses but I soon started missing some of the Python libraries I grew used to for analysis, like pandas, scipy and matplotlib; the Scala counterpart just weren’t on par with it. I wanted to bring others on board and convincing someone to write analyses in a familiar language was a lot easier. So I built a tiny wrapper around Spark’s Python API which lets user run queries on our data from a Jupyter notebook and in time the number of people using Telemetry for their analytics needs grew.

I deliberately ignored many design problems that had to be solved. What storage format should be used? How should the data be partitioned? How should the cluster(s) be provisioned; EMR, ECS or something else entirely? How should recurrent jobs be scheduled?

Telemetry is one of the most complex data sources we have but there are many others, like the server logs from a variety of services. Others were solving similar problems as the ones I faced but with different approaches, so we joined forces. Fast-forward a couple of years and Telemetry has changed a lot. A Heka pipeline pre-processes and validates the Telemetry data before dumping it on S3. Scala jobs convert the raw JSON data in more manageable derived Parquet datasets that can then be read by Spark with Python, Scala or R. The scope of Telemetry has expanded to drive not only performance needs but business ones as well. Derived datasets can be queried with SQL through Presto and re:dash allows us to build and share dashboard among peers. Jobs can be scheduled and monitored with Airflow. Telemetry based analyses are being run at all level of the organization: no longer are few people responsible to answer every imaginable data related question: there are interns running statistically rigorous analyses on A/B experiments and Senior Managers building retention plots for their products. Analyses are being shared as Jupyter notebooks and peer-reviewed on Bugzilla. It’s every engineer’s responsibility to know how to measure how well its own feature or product is performing.

There is this popular meme about how Big data is like teenage sex: everyone talks about it, nobody really knows how to do it, everyone thinks everyone else is doing it, so everyone claims they are doing it. The truth is that there are only so many companies out there that can claim a user base with millions of users. This is one of the thing I love the most about Mozilla, we aren’t big as the giants but we aren’t small either. As an engineer, I get to solve challenging problems with the agility, flexibility and freedom of a startup. Our engineers work on interesting projects and there is a lot of room for trying new things, disrupting the status quo and pushing our comfort zone.

What’s the life of a Data engineer at Mozilla nowadays? How do we spend our time and what are our responsibilities?

Building the data pipeline

We build, maintain, monitor and deploy our data infrastructure and make sure that it works reliably and efficiently. We strive to use the smallest number of technologies that can satisfy our needs and work well in concert.

What kind of data format (e.g. Avro, Parquet) and compression codec (e.g. Snappy, LZO) should be used for the data? Should the data be stored on HDFS or S3? How should it be partitioned and bucketed? Where should the metadata (e.g. partition columns) be stored? Should the data be handled in real-time or in batch? Should views be recomputed incrementally or from scratch? Does summary data need to be accurate or can probabilistic data structures be used? Those are just some of the many decisions a data engineer is expected to take.

We contribute to internal projects. Heka for example, a stream processing system, is used in a multitude of ways: from loading and parsing log files to performing real time analysis, graphing, and anomaly detection.

At Mozilla, we love open source. When we built our stream processing system, we open sourced it. When different data systems like Spark or Presto need some custom glue to make them play ball, we extend them to fit our needs. Everything we do is out in the open; many of our discussions happen either on Bugzilla and on public mailing lists. Whatever we learn we share with the outside world.

Supporting other teams

Engineers, managers, and executives alike come to us with questions that they would like to answer with data but don’t know yet quite how to. We try to understand which are the best metrics that can answer their questions, assuming they can be answered in the first place. If the metrics required are missing then we add them to our pipeline. This could mean adding instrumentation to Firefox or, if the desired metrics can be derived from simpler ones that are available in the raw data, extending our derived datasets.

Once the data is queryable, we teach our users how to access it with e.g. Spark, Presto or Heka, depending on where the dataset is stored and what their requirements are. We learn from common patterns and inefficiencies by supporting our users’ analysis needs. For example, if a considerable amount queries require to group a dataset in a certain way, we might opt to build a pre-grouped version of the dataset to improve the performance.

We strive to review many of our users’ analyses. It’s far too easy to make mistakes when analyzing data since the final deliverables, like plots, can look reasonable even in the presence of subtle bugs.

Doing data science

We build alerting mechanisms that monitor our data to warn us when something is off. From representing time-series with hierarchical statistical models to using SAX for anomaly detection, the fun never stops.

We create dashboards for general use cases, like the display of daily aggregates partitioned by a set of dimensions or to track engagement ratios of products & features.

We validate hypotheses with A/B tests. For example, one of the main features we are going to ship this year, Electrolysis, requires a clear understanding of its performance impact before shipping it. We designed and ran A/B tests to compare with statistically sound methods a vast variety of performance metrics with and without Electrolysis enabled.

There is also no shortage of analyses that are more exploratory in nature. For example, we recently noticed a small spike in crashes whenever a new release is shipped. Is it a measurement artifact due to crashes that happened during the update? Or maybe there is something off that causes Firefox to crash after a new version is installed, and if so what is it? We don’t have an answer to this conundrum yet.

The simpler it is to get answers, the more questions will be asked.

This has been my motto for a while. Being data driven doesn’t mean having a siloed team of wizards and engineers that poke at data. Being data driven means that everyone in the company knows how to access the data they need in order to answer the questions they have; that’s the ultimate goal of a data engineering team. We are getting there but there is still a lot to do. If you have read so far, are passionate about data and have relevant skills, check out our jobs page.

Measuring product engagment at scale

How engaged are our users for a certain segment of the population? How many users are actively using a new feature? One way to answer that question is to compute the  engagement ratio (ER) for that segment, which is defined as daily active users (DAU) over monthly active users (MAU), i.e.

ER_{segment} =  \frac{DAU_{segment}}{MAU_{segment}}

Intuitively the closer the ratio is to 1, the higher the number of returning users. A segment can be an arbitrary combination of values across a set of dimensions, like operating system and activity date.

Ideally, given a set of D dimensions, pre-computing the ER for all possible combinations of values of said dimensions would ensure that user queries run as fast as possible. Clearly that’s not very efficient though; if we assume for simplicity that every dimension has k possible values, then there are \sum_{d=1}^{D}\binom{D}{d} k^d ratios.

Is there a way around computing all of those ratios while still having an acceptable query latency? One could build a set of users for each value of each dimension and then at query time simply use set union and set intersection to compute any desired segment. Unfortunately that doesn’t work when you have millions of users as the storage complexity is proportional to the number of items stored in the sets. This is where probabilistic counting and the HyperLogLog sketch comes into play.

HyperLogLog

The HyperLogLog sketch can estimate cardinalities well beyond 10^9 with a standard error of 2% while only using a couple of KB of memory.

The intuition behind is simple. Imagine a hash function that maps user identifiers to a string of N bits. A good hash function ensures that each bit has the same probability of being flipped. If that’s the case then the following is true:

  • 50 % of hashes have the prefix 1
  • 25 % of hashes have the prefix 01
  • 12.5% of hashes the prefix 001

Intuitively, if 4 distinct values are hashed we would expect to see on average one single hash with a prefix of 01 while for 8 distinct values we would expect to see one hash with a prefix of 001 and so on. In other words, the cardinality of the original set can be estimated from the longest prefix of zeros of the hashed values. To reduce the variability of this single estimator, the average of K estimators can be used as the approximated cardinality and it can be shown that the standard error of a HLL sketch is \frac{1.04}{sqrt(K)}.

The detailed algorithm is documented in the original paper, and its practical implementation and variants are covered in depth by a 2013 paper from Google.

Set operations

One of the nice properties of HLL is that the union of two HLL sketches is very simple to compute as the union of a single estimator with another estimator is just the maximum of the two estimators, i.e. the longest prefix of zeros. This property makes the algorithm trivially parallelizable which makes it well suited for map-reduce style computations.

What about set intersection? There are two ways to compute that for e.g. two sets:

It turns out that both approaches yield bad approximations when the overlap is small, which makes set intersection not particularly attractive for many use-cases, which is why we decided to use only the union operation for HLL sketches for our datasets.

Going back to the original problem of estimating the ER for an arbitrary segment, by computing the HLL sketches for all combinations of values across all dimensions the HLL sketch for any segment can be derived using set union.

Implementation

At Mozilla we use both Spark and Presto for analytics and even though both support HLL  their implementation is not compatible, i.e. Presto can’t read HLL sketches serialized from Spark. To that end we created a Spark package, spark-hyperloglog, and a Presto plugin, presto-hyperloglog, to extend both systems with the same HLL implementation.

Usage

Mozilla employees can access the client_count table from re:dash and run Presto queries against it to compute DAU, MAU or the ER for a certain segment. For example, this is how DAU faceted by channel and activity date can be computed:


SELECT normalizedchannel,
       activitydate,
       cardinality(merge(cast(hll AS HLL))) AS hll
FROM client_count
GROUP BY activitydate, normalizedchannel
ORDER BY normalizedchannel, activitydate DESC

The merge aggregate function computes the set union while the cardinality function returns the cardinality of a sketch. A complete example with DAU, MAU and ER for the electrolysis engagement ratio can be seen here.

By default the HLL sketches in the client_count table have a standard error of about 1.6%.