An overview of Mozilla’s Data Pipeline

This post describes the architecture of Mozilla’s data pipeline, which is used to collect Telemetry data from our users and logs from various services. One of the cool perks of working at Mozilla is that most of what we do is out in the open, and because of that I can do more than just show you a diagram of our architecture with some arrows on it; I can point you to the code, scripts & configuration that underlie it!

To make the examples concrete, the following description is centered around the collection of Telemetry data. The same tool-chain is used, though, to collect, store and analyze data coming from disparate sources, such as service logs.

center.jpeg

Firefox

There are different APIs and formats to collect data in Firefox, all suiting different use cases:

  • histograms – for recording multiple data points;
  • scalars – for recording single values;
  • timings – for measuring how long operations take;
  • events – for recording time-stamped events.

These are commonly referred to as probes. Each probe must declare the collection policy it conforms to: either opt-out (aka base telemetry) or opt-in (aka extended telemetry). When adding a new measurement, data reviewers carefully inspect the probe and eventually approve the requested collection policy:

  • opt-out (base telemetry) data is collected by default from all Firefox users; users may choose to turn this data collection off in preferences;
  • opt-in (extended telemetry) data is collected from users who explicitly express a choice to help with Firefox development; this includes all users who install pre-release & testing builds, plus release users who have explicitly checked the box in preferences.

A session begins when Firefox starts up and ends when it shuts down. As a session could be long-running and last weeks, it gets sliced into smaller logical units called subsessions. Each subsession generates a batch of data containing the current state of all probes collected so far, i.e. a main ping, which is sent to our servers. The main ping is just one of the many ping types we support. Developers can create their own ping types if needed.
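To give a rough idea of what such a ping contains, here is a heavily simplified sketch of a main ping as a Python structure; the real schema is much richer and the field names below are only indicative:

# Illustrative only: a heavily simplified "main" ping.
# The actual schema has many more fields and is documented in-tree.
main_ping = {
    "type": "main",
    "clientId": "00000000-0000-0000-0000-000000000000",
    "environment": {
        "build": {"version": "53.0a1", "buildId": "20170101030203"},
    },
    "payload": {
        "info": {"reason": "daily", "subsessionCounter": 3},
        "histograms": {"GC_MS": {"sum": 1234, "values": {"28": 7, "33": 2}}},
        "scalars": {"browser.engagement.total_uri_count": 42},
    },
}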

Pings are submitted via an API that performs an HTTP POST request to our edge servers. If a ping fails to be submitted successfully (e.g. because of a missing internet connection), Firefox will store the ping on disk and retry sending it until the maximum ping age is exceeded.
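As a sketch of what a submission looks like on the wire, something along these lines would do; the edge URL scheme and headers below are an assumption for illustration, not the actual client implementation:

import json
import uuid
import requests

# Hypothetical sketch of a ping submission; the real client code lives inside
# Firefox and also handles compression, retries and on-disk persistence.
edge = "https://incoming.telemetry.mozilla.org"   # assumed edge endpoint
doc_id = str(uuid.uuid4())
url = "{}/submit/telemetry/{}/main/Firefox/nightly/53.0a1/20170101030203".format(edge, doc_id)

response = requests.post(url, data=json.dumps({"type": "main", "payload": {}}),
                         headers={"Content-Type": "application/json"})
response.raise_for_status()   # a 2xx status means the edge accepted the ping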

Kafka

HTTP submissions coming in from the wild hit a load balancer and then an NGINX module. The module accepts data via an HTTP request, which it wraps in a Hindsight protobuf message and forwards to two places: a Kafka cluster and a short-lived S3 bucket (landfill) which acts as a fail-safe in case there is a processing error and/or data loss within the rest of the pipeline. The deployment scripts and configuration files of NGINX and Kafka live in a private repository.

The data from Kafka is read by the Complex Event Processors (CEP) and the Data Warehouse Loader (DWL), both of which use Hindsight.

Hindsight

Hindsight, an open source stream processing system developed by Mozilla as Heka’s successor, is useful for a wide variety of tasks, such as:

  • converting data from one format to another;
  • shipping data from one location to another;
  • performing real time analysis, graphing, and anomaly detection.

Hindsight’s core is a lightweight data processing kernel written in C that controls a set of Lua plugins executed inside a sandbox.

The CEP are custom plugins that are created, configured and deployed from a UI and that produce real-time plots, like the number of pings matching certain criteria. Mozilla employees can access the UI and create/deploy their own custom plugins in real time without interfering with the plugins that are already running.

sample.jpeg
CEP – a custom plugin in action

The DWL is composed of a set of plugins that transform, convert & finally shovel pings into S3 for long term storage. In the specific case of Telemetry data, an input plugin reads pings from Kafka, pre-processes them and sends batches to S3, our data lake, for long term storage. The data is compressed and partitioned by a set of dimensions, like date and application.
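To make the partitioning concrete, the objects in the data lake end up under S3 prefixes keyed on those dimensions; the bucket and prefix below are made up for illustration:

import boto3

# List one day's worth of objects for a given application & channel.
# Bucket and prefix names are illustrative; the real layout differs in detail.
s3 = boto3.client("s3")
response = s3.list_objects_v2(
    Bucket="example-telemetry-data-lake",
    Prefix="telemetry/20170101/main/Firefox/nightly/",
)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])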

The data has traditionally been serialized to Protobuf sequence files, which contain some nasty “free-form” JSON fields. Hindsight recently gained the ability to dump data directly to Parquet, though.

The deployment scripts and configuration files of the CEP & DWL live in a private repository.

Spark

Once the data reaches our data lake on S3 it can be processed with Spark. We provide a portal (ATMO) that allows Mozilla employees to create their own Spark cluster, pre-loaded with a set of libraries & tools like Jupyter, NumPy, SciPy, pandas, etc., and an API to conveniently read data stored in Protobuf form on S3 into a Spark RDD using an ORM-like interface. Behind the scenes we use EMR to create Spark clusters, which are then monitored by ATMO.
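For instance, reading a slice of raw main pings into an RDD from an ATMO cluster looks roughly like the following; this assumes the python_moztelemetry Dataset interface, and the filter names are indicative only:

from moztelemetry.dataset import Dataset

# Read a 1% sample of raw "main" pings for a single submission date into an
# RDD; the filter dimensions mirror the partitioning of the data lake.
# `sc` is the SparkContext provided by the ATMO cluster.
pings = (Dataset.from_source("telemetry")
         .where(docType="main",
                submissionDate="20170101",
                appUpdateChannel="nightly")
         .records(sc, sample=0.01))

print(pings.count())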

ui.jpeg
ATMO – monitoring clusters

ATMO is mainly used to write custom ad-hoc analyses; since our users aren’t necessarily data engineers/scientists, we chose Python as the main supported language to interface with Spark. From ATMO it’s also possible to schedule periodic notebook runs and inspect the results from a web UI.

As mentioned earlier, most of our data lake contains data serialized to Protobuf with free-form JSON fields. Needless to say, parsing JSON is terribly slow when ingesting TBs of data per day. A set of ETL jobs, written in Scala by Data Engineers and scheduled with Airflow, creates Parquet views of our raw data.
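The production jobs are written in Scala, but the core idea is simply to project the free-form JSON into typed columns and write them out partitioned; a PySpark sketch of that idea, with made-up column names, looks like this:

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("example-parquet-view").getOrCreate()

# Pretend `pings` is an RDD of parsed ping dicts (see the Dataset example above).
# Project the handful of fields the view needs into typed columns...
rows = pings.map(lambda p: Row(client_id=p["clientId"],
                               submission_date=p["meta"]["submissionDate"],
                               app_version=p["environment"]["build"]["version"]))
df = spark.createDataFrame(rows)

# ...and write them out as Parquet, partitioned by date, so that downstream
# queries can prune partitions instead of parsing JSON.
(df.write.mode("overwrite")
   .partitionBy("submission_date")
   .parquet("s3://example-telemetry-parquet/example_view/v1/"))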

A dedicated Spark job feeds daily aggregates to a Postgres database, which powers an HTTP service for easily retrieving faceted roll-ups. The service is mainly used by TMO, a dashboard that visualizes distributions and time series, and by cerberus, an anomaly detection tool that detects changes in the distributions and alerts developers. Originally the sole purpose of the Telemetry pipeline was to feed data into this dashboard, but in time its scope and flexibility grew to support more general use cases.

foo.jpeg
TMO – timeseries

Presto & re:dash

We maintain a couple of Presto clusters and a centralized Hive metastore to query Parquet data with SQL. The Hive metastore provides a universal view of our Parquet datasets to both the Spark and Presto clusters.

Presto, and other databases, sit behind a re:dash service (STMO) which provides a convenient & powerful interface to query SQL engines and build dashboards that can be shared within the company. Mozilla maintains its own fork of re:dash to iterate quickly on new features, but as good open source citizens we push our changes upstream.
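Most of this querying happens through STMO’s web UI, but the same Presto cluster can also be hit programmatically; here is a sketch using PyHive, where the host, table and column names are assumptions:

from pyhive import presto  # PyHive's DB-API interface to Presto

# Hostname, table and column names below are illustrative only.
conn = presto.connect(host="presto.example.internal", port=8080)
cur = conn.cursor()
cur.execute("""
    SELECT submission_date, count(distinct client_id) AS dau
    FROM main_summary
    WHERE submission_date >= '20170101'
    GROUP BY submission_date
    ORDER BY submission_date
""")
for submission_date, dau in cur.fetchall():
    print(submission_date, dau)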

img4.jpeg
STMO – who doesn’t love SQL?

HBase

We use HBase to store the history of pings by client. It’s a relatively new addition and it provides support for fast needle-in-haystack queries.

Is that it?

No, not really. For example, the DWL pushes some of the Telemetry data to Redshift and Elasticsearch but those tools satisfy more niche needs. The pipeline ingests logs from services as well and there are many specialized dashboards out there I haven’t mentioned.

There is a vast ecosystem of tools for processing data at scale, each with its pros & cons. The pipeline grew organically and we added new tools as new use cases came up that we couldn’t solve with our existing stack. There are still scars left from that growth, though, which require some effort to get rid of, like ingesting data from schema-less formats.

Telemetry meets HBase

At the end of November AWS announced that HBase on EMR now supports S3 as a data store. That’s great news because it means one doesn’t have to keep around an HDFS cluster with 3x replication, which is not only costly but also comes with its own operational burden.

At the same time we had some use cases that could have been addressed with a key-value store and this seemed like a good opportunity to give HBase a try.

What is HBase?

HBase is an open source, non-relational, distributed key-value store which traditionally runs on top of HDFS. It provides a fault-tolerant, efficient way of storing large quantities of sparse data using column-based compression and storage.

In addition, it provides fast lookup of data thanks to indexing and an in-memory cache. HBase is optimized for sequential write operations, and is highly efficient for batch inserts, updates, and deletes. HBase also supports cell versioning, so one can look up and use several previous versions of a cell or a row.
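To illustrate what point lookups and versioned reads look like in practice, here is a sketch using the happybase Python client; the host, table and column names are made up, and our actual access layer is the API shown at the end of this post:

import happybase

# Connect via Thrift; host, table and column family names are illustrative.
connection = happybase.Connection("hbase-thrift.example.internal")
table = connection.table("example_pings")

# Point lookup: fetch the latest cell values for a single row key.
row = table.row(b"00000000-0000-0000-0000-000000000000")

# Versioned read: fetch up to 3 previous versions of a single cell.
versions = table.cells(b"00000000-0000-0000-0000-000000000000",
                       b"cf:payload", versions=3)
for value in versions:
    print(len(value), "bytes")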

The system can be imagined as a distributed log-structured merge tree and is ultimately an open source implementation of Google’s BigTable whitepaper. An HBase table is partitioned horizontally into so-called regions, each of which contains all rows between the region’s start and end key. Region Servers are responsible for serving regions, while the HBase master handles region assignments and DDL operations.

A region server has:

  • a BlockCache, which serves as an LRU read cache;
  • a BucketCache (EMR version only), which caches reads on local disk;
  • a WAL, stored on HDFS, which records writes not yet persisted to HDFS/S3;
  • a MemStore per column family (a collection of columns); a MemStore is a write cache which, once it has accumulated enough data, is flushed to a store file;
  • store files, which store rows as sorted key-values on HDFS/S3.
hbase-files.png
HBase architecture with HDFS storage

This is just a 10000-foot overview of the system and there are many articles out there that go into important details, like store file compaction.

hbase_s3.png
EMR’s HBase architecture with S3 storage and BucketCache

One nice property of HBase is that it guarantees linearizable consistency, i.e. if operation B started after operation A successfully completed, then operation B must see the system in the same state as it was on completion of operation A, or a newer state. That’s relatively easy to guarantee since each row can only be served by one region server.

Why isn’t Parquet good enough?

Many of our datasets are stored on S3 in Parquet form. Parquet is a great format for typical analytical workloads where one needs all the data for a particular subset of measurements. On the other hand, it isn’t really optimized for finding needles in haystacks; partitioning and sorting can help alleviate this issue only so much.

As some of our analysts have the need to efficiently access the telemetry history for a very small and well-defined sub-population of our user base (think of test-pilot clients before they enrolled), a key-value store like HBase or DynamoDB fits that requirement splendidly.

HBase stores and compresses data per column family, unlike Parquet which does so per column. That means the system will read far more data than is actually needed if only a small subset of columns is required during a full scan. And no, you can’t just have a column family for each individual column, as column families are flushed in concert. Furthermore, unlike Parquet, HBase doesn’t have a concept of types: both the key and the value are just bytes, and it’s up to the user to interpret those bytes accordingly.

It turns out that Mozilla’s telemetry data was once stored in HBase! If you knew that, you have been around Mozilla much longer than I have. That approach was later abandoned as keeping mostly unutilized data around in HDFS was costly, and typical analytical workloads involving large scans were slow.

Wouldn’t it be nice to have the best of both worlds: efficient scans and fast look-ups? It turns out there is one open source system currently being developed that aims to fill that gap. Apache Kudu provides a combination of fast inserts/updates and efficient columnar scans to enable multiple real-time analytic workloads across a single storage layer. I don’t think it’s ready for prime time just yet though.

What about DynamoDB?

DynamoDB is a managed key-value store. Leaving aside operational costs, it’s fair to wonder how much it differs in terms of pricing for our example use case.

The data we are planning to store has a compressed size of about 200 GB (~1.2 TB uncompressed) per day and consists of 400 million key-value pairs of about 3 KB each, uncompressed. As we are planning to keep the data around for 90 days, the total size of the table would amount to 18 TB.

HBase costs

Let’s say the machines we want to use for the HBase cluster are m4.xlarge, which have 16 GB of RAM. As suggested in Hortonworks’ HBase guidelines, each machine could ideally serve about 50 regions. By dividing the table into, say, 1000 regions, each region would have a size of about 18 GB, which is still within the recommended maximum region size. Since each machine can serve about 50 regions and we have 1000 regions, our cluster should ideally have a size of 20 nodes.

Using on-demand EMR prices, the cluster would have a monthly cost of:

20 \mathrm{\ nodes\ } \times 30 \mathrm{\ day} \times \frac{24 \ \mathrm{hour}}{\mathrm{day}} \times \frac{0.239\$ + 0.060 \$}{\mathrm{hour} \times \mathrm{node}} = 4306 \$

This is an upper bound as reserved or spot instances cost less.

The daily batch job that pushes data to HBase uses 5 c3.4xlarge machines and takes 5 hours, so it would have a monthly cost of:

5 \mathrm{\ nodes\ } \times 30 \mathrm{\ day} \times \frac{5 \mathrm{\ hour}}{\mathrm{day}} \times \frac{0.840\$ + 0.210 \$}{\mathrm{hour} \times \mathrm{node}} = 788 \$

Keeping about 18 TB of data on S3 will cost 378 $ per month at 0.021 $ per GB. Note that this doesn’t include the price of requests, which is rather difficult to calculate, albeit low.

In total we have a cost of about 5500 $ per month for the HBase solution.
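The back-of-the-envelope arithmetic above fits in a few lines of Python, using the same on-demand EMR and S3 rates:

# Sanity check of the HBase back-of-the-envelope numbers above.
table_size_gb = 18000                        # 200 GB/day compressed * 90 days
regions = 1000
regions_per_node = 50                        # guideline for 16 GB of RAM
nodes = regions / regions_per_node           # -> 20

cluster = nodes * 30 * 24 * (0.239 + 0.060)  # ~4306 $/month
batch_job = 5 * 30 * 5 * (0.840 + 0.210)     # ~788 $/month
s3_storage = table_size_gb * 0.021           # ~378 $/month

print(cluster + batch_job + s3_storage)      # ~5471, i.e. about 5500 $/month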

DynamoDB costs

DynamoDB’s pricing is based on the desired request throughput the system needs to sustain. The throughput is measured in capacity units. Let’s assume that one write request per second corresponds to 3 write capacity units, as one unit of write capacity is limited to items of up to 1 KB in size and we are dealing with items of about 3 KB. Let’s also assume that we want to use a batch job, equivalent to the one used for HBase, to push the data into the store, which means we need enough write capacity to shovel 400 M pings in 5 hours:

\frac{\mathrm{sec} \times 3\ \mathrm{write\ unit}}{\mathrm{\ write}} \times \frac{400 * 10^6 \mathrm{\ write}}{5 \mathrm{\ hour}} \times \frac{\mathrm{hour}}{3600 \mathrm{\ sec}} \times \frac{0.0065\ \$}{\mathrm{hour} \times 10 \times \mathrm {write \ unit}} \times\frac{5\ \mathrm{hour}}{\mathrm{day}} = 217 \mathrm{\ \$/day}

which amounts to about 6510 $ a month. Note that this is just the cost to push the data in; it doesn’t consider the cost of reads throughout the day.

The cost of the storage, assuming the compression ratio is the same as with HBase, is:

\frac{0.25\  \$}{\mathrm{GB}} \times 18000 \ \mathrm{GB} = 4500 \ \$

Finally, if we also consider the cost of the batch job (788 $), we have a total spend of about 11800 $ per month.
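And the equivalent check for the DynamoDB estimate:

# Sanity check of the DynamoDB back-of-the-envelope numbers above.
writes = 400e6                               # key-value pairs pushed per day
wcu_per_write = 3                            # ~3 KB items, 1 KB per write unit
load_hours = 5                               # duration of the daily batch job

wcu = writes * wcu_per_write / (load_hours * 3600)   # ~66,667 write units
ingest = wcu / 10 * 0.0065 * load_hours * 30         # ~6500 $/month
storage = 18000 * 0.25                               # 4500 $/month
batch_job = 5 * 30 * 5 * (0.840 + 0.210)             # ~788 $/month, as above

print(ingest + storage + batch_job)                  # ~11788, i.e. about 11800 $/month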

In conclusion, the HBase solution is cheaper and more flexible. For example, one could keep historical data around on S3 without having an HBase cluster serving it until it’s really needed. The con is that HBase isn’t automagically managed, and as such it requires operational effort.

How do I use this stuff?

We have created a mirrored view in HBase of the main summary dataset which is accessible through a Python API. The API allows one to retrieve the history of main pings for a small subset of client ids sorted by activity date:

# Retrieve the main ping history for a list of client ids from HBase;
# `sc` is the SparkContext provided by the ATMO cluster.
view = HBaseMainSummaryView()
history = view.get(sc, ["00000000-0000-0000-0000-000000000000"])

We haven’t yet decided if this is something we want to support and keep around; we will make that decision once we have a better understanding of how useful it is to our analysts.