Yieldmo’s mission is to elevate the digital advertising experience for consumers, advertisers, and publishers by harnessing the power of the massive data set that lives behind our consumer-friendly ad formats. This data set goes well beyond clicks, capturing the countless micro-interactions consumers have with our ads: gestures, swipes, scrolls, tilts, time spent, and more. On any given day, we collect over a billion data points, amounting to a processing payload of roughly 280 GB per day.
Starting in late December 2017, Yieldmo saw a huge increase in the number of ad requests it received, driven largely by new partnerships, new features, and revisions to our business model. Our ETL (Extract Transform Load) platform started to show cracks. The average latency until data became queryable went from 5 minutes to 45 minutes, and at its worst exceeded 9 hours. Yikes! Welcome, late-night pipeline patches and a myriad of operational issues.
When our phones auto-updated our ‘Favorite Contacts’ from our significant others to the on-call software, we decided enough was enough.
In this post, I will detail how switching our data persistence strategy from ETL to ELT (Extract Load Transform) has prepared us for ad traffic growth of roughly 20x over our current ingest load, all on the same hardware, while reducing our average latency from 45 minutes (at 2x traffic) down to 15 seconds. I will also cover the design considerations that nudged us in the general direction of ELT, the prototypes we evaluated to help us achieve our goal, and how we settled on our final choice.
The following diagram briefly outlines our architecture:
Every ad request, and its corresponding user engagement, is captured by a series of web servers. These web servers encode the details of each interaction as protobuf (Protocol Buffers) and persist them into a series of topics in our Kafka cluster. We used a data transformation tool called Pentaho to read this data from Kafka (one consumer process per topic), apply business rules (such as fraud detection), and normalize the protobuf data into CSV files that could subsequently be loaded into Snowflake (via S3).
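To make the ETL shape of this pipeline concrete, here is a minimal sketch of the per-record transform step. The field names (`ad_id`, `gesture`, `duration_ms`, `fraud_score`) and the fraud rule are hypothetical stand-ins, since the real protobuf schema and business rules are not shown in this post; the point is that rules run per record, inline, before anything is loaded.

```python
import csv
import io

def normalize_event(event):
    """ETL-style transform: apply business rules inline, then flatten to a CSV row.
    All field names here are hypothetical; the real schema is protobuf-defined."""
    # Example business rule: drop events flagged as likely fraud BEFORE loading.
    if event.get("fraud_score", 0.0) > 0.9:
        return None
    buf = io.StringIO()
    csv.writer(buf).writerow([event["ad_id"], event["gesture"], event["duration_ms"]])
    return buf.getvalue().strip()

row = normalize_event(
    {"ad_id": "a1", "gesture": "swipe", "duration_ms": 320, "fraud_score": 0.1}
)
print(row)  # a1,swipe,320
```

Because every record must pass through this logic before it can land in the warehouse, the transform sits on the critical path of ingestion.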
At this point, a few limitations, I imagine, scream out to the data engineers in the house:
Having to serially process all the data for a topic was a consequence of applying business rules during transformation. This had the side effect that the recovery pipeline (in case of any catastrophic failure) could not run in parallel with the standard production pipeline. It also meant that there was no scale-out strategy: if the volume of data quadrupled (likely to happen before the end of the year), we would struggle to keep up with the increased data volume.
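The scale-out strategy the old design lacked is exactly what Kafka's partition model provides: a topic's partitions can be spread across the members of a consumer group, so adding consumers adds parallelism. This toy simulation (not real Kafka code) illustrates the round-robin style of assignment; with one consumer per topic, every partition funnels through a single process.

```python
def assign_partitions(num_partitions, consumers):
    """Round-robin partition assignment, mimicking how a Kafka consumer
    group spreads a topic's partitions across its members."""
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

# The old design: one consumer per topic, so one process owns all 12 partitions.
print(assign_partitions(12, ["c0"]))
# Scale out: add consumers, and the same 12 partitions are rebalanced among them.
print(assign_partitions(12, ["c0", "c1", "c2"]))  # 4 partitions each
```

Once per-record business rules are out of the load path, nothing forces a single serial consumer, and both the production and recovery pipelines can consume in parallel.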
After being repeatedly hazed by the existing system, it was time to re-architect. We set the following mandatory goals for any new strategies we were evaluating:
We also set the following criteria as good or great to have:
Given the weaknesses listed above, one of the first things that became apparent to us was the immediate need to switch from an ETL (Extract Transform Load) system to an ELT (Extract Load Transform) system. This would mean that data would be persisted to a data store as-is, before we applied any business logic or transformations to it. We evaluated many prototypes, including:
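The ELT split can be sketched in a few lines. In this hedged sketch, the load step is a pure pass-through, and the business rule from earlier moves into a declarative SQL transform that runs inside the warehouse after loading. The table and column names (`raw_engagements`, `fraud_score`, etc.) are hypothetical.

```python
# ELT sketch: the load step does no per-record business logic.
def load_raw(events):
    """Load: persist events exactly as received -- no filtering, no flattening.
    Stand-in for copying raw records into a warehouse landing table."""
    return list(events)

# Transform: expressed as SQL, executed inside the warehouse (e.g. Snowflake),
# decoupled from ingestion. Names below are illustrative, not our real schema.
TRANSFORM_SQL = """
CREATE OR REPLACE VIEW clean_engagements AS
SELECT ad_id, gesture, duration_ms
FROM raw_engagements
WHERE fraud_score <= 0.9;  -- business rule applied after load, not before
"""

raw = load_raw([{"ad_id": "a1", "fraud_score": 0.95}, {"ad_id": "a2", "fraud_score": 0.1}])
print(len(raw))  # 2 -- nothing is dropped or reshaped at load time
```

Because the load path is dumb and fast, it parallelizes trivially, and changing a business rule means re-running SQL over data already in the warehouse rather than replaying the ingest pipeline.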
After extensive research, and given the nature of our constraints, we finally settled on Spark Streaming on Amazon EMR. Here is a quick summary that sheds some light on our decision-making criteria.
From a transition perspective, our simplest option would perhaps have been to leverage the Kafka high-level consumer API and deploy a new set of consumers on the EC2 instances where our existing ETL tool (Pentaho) was installed. This option had the following pros:
However, it also came with the following disadvantages:
Estimated Cost of Operation:
6 * M4.2XL (Kafka Cluster) + 3 * M4.2XL (Kafka Consumer Instance) ~ $$
Spark Streaming had always been a strong contender to replace our existing ETL infrastructure. It was extremely attractive that adding new nodes to the cluster would trigger a rebalance of the Kafka partitions amongst the individual consumers without any manual intervention (push-button scaling!). Leveraging spark-ec2 or Flintrock to deploy the cluster would have the following advantages:
- Provisioning a cluster is an extremely simple exercise, given the rich API
- Zero software licensing costs (or so I believe)
- Scaling out and scaling in are relatively simple
Estimated Cost of Operation:
6 * M4.2XL (Kafka Cluster) + 1 * M4.2XL (Spark master node) +3 * M4.2XL (Spark slave nodes) ~ $$
Given the limitations of the systems listed above, and the luck we have had with managed services such as Snowflake and Looker in the past, we decided to give EMR a chance. It won the battle on the following counts:
- Provisioning a cluster was an extremely simple task
- Scaling in and scaling out were push-of-a-button tasks
- EMR also supports auto scaling based on CloudWatch metrics, meaning it can add new nodes to the cluster if, for any reason, the streaming process runs low on resources
- Amazon EMR provides fast Amazon S3 connectivity using the Amazon EMR File System (EMRFS). In our testing, we found writes to S3 to be much more reliable using EMR vs. a self-provisioned cluster on EC2
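As an illustration of that CloudWatch-driven auto scaling, here is a sketch of the kind of policy one might pass to boto3's `emr.put_auto_scaling_policy`. The capacities, thresholds, and rule name are hypothetical, not the values we run in production; the rule adds core nodes when available YARN memory stays low.

```python
# Sketch of an EMR auto-scaling policy (hypothetical thresholds), shaped for
# boto3's emr.put_auto_scaling_policy. EMR adds nodes to the instance group
# when the CloudWatch metric YARNMemoryAvailablePercentage dips below 15%.
scale_out_policy = {
    "Constraints": {"MinCapacity": 3, "MaxCapacity": 12},
    "Rules": [
        {
            "Name": "ScaleOutOnLowMemory",
            "Action": {
                "SimpleScalingPolicyConfiguration": {
                    "AdjustmentType": "CHANGE_IN_CAPACITY",
                    "ScalingAdjustment": 2,  # add two nodes at a time
                    "CoolDown": 300,         # seconds before scaling again
                }
            },
            "Trigger": {
                "CloudWatchAlarmDefinition": {
                    "ComparisonOperator": "LESS_THAN",
                    "EvaluationPeriods": 1,
                    "MetricName": "YARNMemoryAvailablePercentage",
                    "Namespace": "AWS/ElasticMapReduce",
                    "Period": 300,
                    "Statistic": "AVERAGE",
                    "Threshold": 15.0,
                    "Unit": "PERCENT",
                }
            },
        }
    ],
}
```

A matching scale-in rule (e.g. triggered when available memory is comfortably high) would typically sit alongside this one.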
Estimated Cost of Operation:
6 * M4.2XL (Kafka Cluster) + 1 * M4.2XL (Spark master node) + 3 * M4.2XL (Spark slave nodes) + 4 * M4.2XL (EMR cost which is close to nothing) ~ $$
We have had great success with an Amazon Kinesis pipeline in a different workflow here at Yieldmo, so it was also a contender. It has all the great features that come with a managed service, such as auto scaling, fault tolerance, etc. However, we realized it would be a non-trivial exercise to get this to work because:
In short, a lot of teams would need to make changes, and the clock was ticking.
Estimated Cost of Operation:
Nature of data: (2,500 records/sec @ 3 KB/record) + (6,000 records/sec @ 1 KB/record); Kinesis Data Stream + Kinesis Firehose ~ $$$$
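A quick back-of-envelope calculation shows why this traffic profile drives Kinesis cost: using the published Kinesis Data Streams per-shard ingest limits (1 MB/s or 1,000 records/s, whichever binds first) against the numbers above gives the minimum shard count.

```python
import math

# Per-shard ingest limits from the Kinesis Data Streams service quotas.
SHARD_MB_PER_SEC = 1.0
SHARD_RECORDS_PER_SEC = 1000

# Workload from the post: 2,500 rec/s at 3 KB plus 6,000 rec/s at 1 KB.
throughput_kb = 2500 * 3 + 6000 * 1   # 13,500 KB/s
records_per_sec = 2500 + 6000         # 8,500 rec/s

shards_by_bytes = math.ceil(throughput_kb / 1000 / SHARD_MB_PER_SEC)     # 14
shards_by_records = math.ceil(records_per_sec / SHARD_RECORDS_PER_SEC)   # 9
shards = max(shards_by_bytes, shards_by_records)
print(shards)  # 14 -- byte throughput, not record count, is the binding limit
```

Fourteen shards, billed around the clock, plus Firehose delivery charges on top, is what pushes this option into $$$$ territory relative to the EC2-based alternatives.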
The following is a summary table of all the candidates we evaluated, based on our decision-making criteria:
** Requires pre-processing using Lambda, as Kinesis cannot infer protobuf schema
The resulting architecture looks like this:
The switch in architecture to Spark Streaming resulted in the following improvements:
To summarize, switching from ETL to ELT helped us design a distributed pipeline that is set to scale better as the business grows. The infrastructure is a lot simpler to monitor for anomalies, and requires little to no maintenance once set up.