How Twitter Migrated its On-Prem Analytics to Google Cloud (Cloud Next '18)
Hello everyone. Today Steve and I will present how we at Twitter migrated one of our main revenue data pipelines for analytics into Google Cloud. My name is Fabian, and I'm the tech lead for the revenue data pipelines team. My team handles the ingestion, transformation, and aggregation of raw revenue data; we then ingest that data into various data stores, where it gets picked up by various downstream systems. With me is Steve.

Hi everybody, I'm Steve. I'm the tech lead on the revenue data analytics team at Twitter. We basically pick up right where Fabian's team leaves off: we handle storing and querying all of the analytics data that's been processed by his pipelines.

So let's step back for a second and talk about advertiser analytics at Twitter in general and the things we do. As Fabian mentioned, we take raw impression and engagement events. An impression is someone viewing an ad; engagements are people actually interacting with an ad in some way. We take those events from a variety of sources — client devices, internal systems, a lot of different things — and run them through various pipelines that filter and normalize that data. We then pre-aggregate that normalized data. Why do we pre-aggregate? Mainly for performance reasons: the scale of the data we're running on is so high that it would be infeasible to aggregate on the query side, so instead we do it at ingestion time. The data is fairly highly dimensional, and some of these dimensions have a lot of cardinality, which has caused problems with off-the-shelf systems — that's why we had to build our own pipelines. We take the results of these aggregations and store them in different data stores, and the choice of storage is generally based on the query patterns we observe.
So let's talk about the scale of what we're doing here, so everyone has an idea. Every ad impression and engagement event we have flows through this system — that's something on the scale of about 20 terabytes of log data a day, which breaks down to around a hundred thousand events a second that we're processing. On the serving side — actually taking this data and using it in a useful way — it's somewhere around five thousand queries a second that we're serving. These range from simple queries like "give me the last week of spend for an advertiser" to "give me the last three years of data broken down by geography or impressions," so query complexity spans a very wide range. We have an SLO of around a one-second p99 latency on these queries; typically we run at around 300 milliseconds, and that's what we target.

Before we go into the system we are building and have built, let's look at our legacy system and how analytics aggregations have been working at Twitter for the past few years.
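As a rough sanity check on those scale figures (assuming the numbers above: ~20 TB of logs per day and ~100,000 events per second), a quick back-of-the-envelope calculation:

```python
# Back-of-the-envelope check of the scale figures quoted above.
# Assumes ~20 TB of log data per day and ~100,000 events per second.

TB = 10**12                       # decimal terabytes
bytes_per_day = 20 * TB
seconds_per_day = 24 * 60 * 60    # 86,400

ingest_rate = bytes_per_day / seconds_per_day     # bytes per second
avg_event_size = ingest_rate / 100_000            # bytes per raw event

print(f"~{ingest_rate / 10**6:.0f} MB/s sustained ingest")
print(f"~{avg_event_size / 1000:.1f} KB per raw event on average")
```

So the pipeline sustains roughly 230 MB/s of raw log ingest around the clock, with raw events averaging a couple of kilobytes each.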
The legacy system is a lambda architecture consisting of a stream aggregation system and a batch processing system. The flow for both of these is: raw logs are ingested by one of the aggregation systems, then aggregated and stored in a key-value store. On the serving side, when advertisers want to query how well their ads are doing, they hit our serving front end, and that front end reaches out to the key-value stores and fetches the data they're requesting.

Our stream aggregation pipeline is approximate. That means we're subsampling some of the data — we have too much data to aggregate on the fly in the current system. Also, our current streaming system only supports at-most-once delivery, which means some data gets dropped. So this is not a good system to bill people from; in general it's just an approximation of the overall data. That's where the batch processing system comes in. Just for reference: the streaming system has an end-to-end latency, from ingestion to serving, of about a minute or two. On the batch layer we process all of the data, without any sampling, every four hours, for four hours of data — so the batch system lags about four to eight hours behind. The batch system is authoritative and persists the data into a persistent key-value store. When the serving front end queries for data, it first tries to fetch it from the persistent key-value store; if it's not available there, it falls back to the ephemeral key-value store that holds the approximate data.

There are some obvious disadvantages to running a system like this. Number one: we have two very different technology stacks.
We have a stream aggregation system and a batch aggregation system that use very different technologies, and that's a lot of operational overhead. Another problem is that while business logic is shared between the two systems, it is not identical. For example, the sampling of data is only implemented in the stream aggregation, which makes it very difficult to validate the stream aggregations against the batch aggregations — so there's some validation overhead there. But the biggest problem we're facing is that this system is about five to six years old and has reached its design capacity. We're reaching a point where we can't just throw resources at our current architecture in order to scale it up, and in order to keep up with Twitter's growth we need to build a new system.

So what problems are we actually trying to solve? Our project goals are to reduce operational complexity and fundamentally decouple business logic and aggregation from data storage and query execution. We would also like to increase data accessibility, really making our data accessible and inspectable at every part of the pipeline.
And then we need to plan ahead and have a system that can run at five to ten times our current scale, so we have some headroom and don't have to go through this exercise again anytime soon.

This is what we envision the future looks like. We want to build a kappa architecture: a first-class, authoritative stream aggregation system, with multiple storage backends that support different use cases. As a quick walkthrough: raw logs are pulled in by a stream aggregation system, which then writes the aggregated data into BigQuery and Bigtable. The Bigtable data store serves the serving front end — very similar to our existing use case — where our advertisers get their data at very low latency and high throughput. On the BigQuery side, we want to enable internal use cases and, for future development purposes, allow access to the aggregated data (and to some extent the raw data) at a very low level through a very flexible query layer. SQL is a great language that lets you slice and dice data however you want, and that's one of the primary reasons we want to start using BigQuery for this use case too.

So building this streaming-first system is great, but we need to find a way to migrate our legacy system into an environment like this. Our current plan — what we are building right now and have built — looks like this: we ingest raw logs into HDFS; we then push those HDFS logs into BigQuery using an internal tool; and then we use BigQuery as a data warehouse and as a processing engine, ingesting the data into Bigtable, where it is served to our clients through the serving front end.
So why are we doing the batch layer first? There are a couple of good reasons for that. Number one: our legacy batch layer is the only system that actually implements the full business logic and has all the data available, so in order to validate what we've built, we needed something we could validate against. Furthermore, validating data in BigQuery is much easier than validating through a streaming system. Using BigQuery really allows us to inspect and validate the data at any step in the process, and it makes the discovery and implementation of business logic much, much simpler — it allows very fast iteration on implementing business logic.
So let's look at how we are ingesting data into BigQuery right now. Our raw logs are stored on HDFS, serialized as Thrift; we generate about a terabyte of raw logs an hour. We built an internal Hadoop library called BQ Blaster — I think there was a presentation about it earlier today that went into a bit more detail. That tool takes the serialized Thrift data, transcodes it into Avro, and writes those Avro files to GCS (Google Cloud Storage). From there — still as part of the MapReduce job, at the end of the write — it kicks off a BigQuery load job, one load job per partition, so we can load a lot of partitions, and a lot of data, in parallel. We're using BigQuery load jobs instead of the streaming API because load jobs are cheaper to run, and we're not really worried about end-to-end latency for the batch ingestion.

So why are we using BigQuery at all? I mentioned it a couple of times before: we're using BigQuery here as a data warehouse, and it's a very performant data warehouse that scales very well. It's capable of ingesting terabytes of data an hour. All data is queryable and inspectable at all times — because it has a SQL front end, you can write queries against it just like any other SQL database, get results, and really interact with huge amounts of data. BigQuery inherently has a very good separation of storage and compute, which makes scaling very easy and gives you a lot of control over what you scale and how. The SQL interface allows a very fast, iterative development process, and it enforces separation of concerns — you're really forced to separate your business logic from the rest of your system.
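The per-partition load step can be sketched like this. This is an illustration, not the internal BQ Blaster code; the `$YYYYMMDD` partition-decorator syntax is BigQuery's standard way of addressing a single partition of a day-partitioned table, and the dataset, table, and GCS path below are hypothetical:

```python
# Sketch of kicking off one BigQuery load job per partition, as described
# above. Illustrative only — not BQ Blaster itself. The spec mirrors the
# fields a BigQuery load job configuration takes.

from datetime import date

def load_job_spec(dataset: str, table: str, day: date, avro_uris: list) -> dict:
    """Build the configuration for loading one partition's Avro files."""
    partition = f"{table}${day:%Y%m%d}"          # BigQuery partition decorator
    return {
        "destinationTable": f"{dataset}.{partition}",
        "sourceUris": avro_uris,                 # Avro files staged on GCS
        "sourceFormat": "AVRO",
        # Replacing the whole partition makes re-running the job idempotent:
        "writeDisposition": "WRITE_TRUNCATE",
    }

# One job per partition; many such jobs can run in parallel.
spec = load_job_spec(
    "revenue", "raw_events", date(2018, 7, 24),
    ["gs://example-bucket/raw_events/2018-07-24/*.avro"],  # hypothetical path
)
print(spec["destinationTable"])   # revenue.raw_events$20180724
```

Because each job targets exactly one partition, the partitions are independent, which is what allows loading a terabyte an hour in parallel.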
We're not using BigQuery in this implementation only as a data warehouse — we're also using it as a processing engine. In this case we're replacing some of our MapReduce jobs that run in our Twitter environment. The way we do this: after we ingest the raw logs into BigQuery, we schedule jobs that run a set of SQL queries, which output the aggregated data into another BigQuery table. These scheduled (or triggered) jobs run in a scheduler — internally in our Mesos environment, or something like Google Cloud Composer, which is managed Airflow. Whenever new raw logs are available, these jobs get kicked off: they first normalize the data using a SQL statement, then aggregate the data, also using SQL, and the output of those statements is ingested into the aggregated-data table. The very nice thing here is that the data never actually leaves BigQuery, so these jobs are very lightweight — no data needs to flow through them.

Here's an example of how this works. As I mentioned before, these processing jobs first normalize our raw logs. Say we have raw logs with a timestamp and four columns. The timestamp is a column as well, but it's special in the sense that we partition on timestamps, and we'll transform it for aggregation purposes. The first thing we do in the normalization process is figure out which columns we actually use — in this example, the aggregated data doesn't use column 3, so it gets filtered out. The next step is that we differentiate between different types of columns.
We differentiate between dimensions and metrics. A dimension, when you write a SQL query, is everything that you group by — it's your GROUP BY clause. A metric is on the aggregation side: in this specific case the metric is column 4, so it gets aggregated — it becomes a SUM of column 4. So our full aggregation process, simplified, is: filter out the columns we don't need, figure out what our dimensions and metrics are, and then aggregate the metrics. The aggregations are defined in SQL, obviously. We have two different types of aggregations in this example: a sum aggregation, which sums the metrics from column 4 together, and a count aggregation, which doesn't take an input and only counts the number of times the group key was matched. One interesting thing here is that we truncate the timestamp, dropping the minutes in this case, which means we get hourly aggregated data. So that's a simplified version of what's going on inside one of those normalization-and-aggregation jobs.

To recap what's happening in the BigQuery processing: we read all of the raw logs from BigQuery, normalize and aggregate the data, and write it back to an aggregated table — all inside BigQuery. Both extraction and transformation are modeled as separate SQL queries: one SQL query for filtering and normalization, and one SQL query for aggregation. This gives us a clear separation between normalizing and aggregating — and obviously also from ingestion, storage, and serving, because those are done on a completely different technology stack. We're using DML — think of a SQL MERGE statement — to idempotently write new data into our aggregated table, which makes our jobs idempotent: if a job fails in the middle of writing, we can just run it again and don't have to worry about double-counting certain aggregates. And one of the really nice things is that SQL is used here as a modeling tool for our business logic. When we move forward to a pure streaming framework — and a lot of streaming frameworks, Beam, Kafka Streams, do support SQL now — even if we can't directly reuse the SQL we came up with, it's our distilled business logic that we can then implement in a streaming framework. That gives us a very clear path forward.
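The two SQL steps described above — normalization (drop unused columns, truncate the timestamp) followed by aggregation (GROUP BY the dimensions, SUM and COUNT the metrics) — can be sketched in plain code. In the real pipeline both steps are SQL statements executed inside BigQuery; the column names here are made up for illustration:

```python
# Plain-Python sketch of the two-step normalize/aggregate job described
# above. In the real pipeline both steps are SQL run inside BigQuery;
# column names are hypothetical.

from collections import defaultdict
from datetime import datetime

raw_logs = [
    {"ts": datetime(2018, 7, 24, 10, 5),  "col1": "US", "col2": "web", "col3": "x", "col4": 3},
    {"ts": datetime(2018, 7, 24, 10, 42), "col1": "US", "col2": "web", "col3": "y", "col4": 5},
    {"ts": datetime(2018, 7, 24, 11, 1),  "col1": "DE", "col2": "ios", "col3": "z", "col4": 2},
]

def normalize(row):
    """Drop unused col3 and truncate the timestamp to the hour."""
    return {
        "ts": row["ts"].replace(minute=0, second=0, microsecond=0),
        "col1": row["col1"],   # dimension
        "col2": row["col2"],   # dimension
        "col4": row["col4"],   # metric
    }

def aggregate(rows):
    """GROUP BY (ts, col1, col2); compute SUM(col4) and COUNT(*)."""
    groups = defaultdict(lambda: {"sum_col4": 0, "count": 0})
    for r in map(normalize, rows):
        key = (r["ts"], r["col1"], r["col2"])
        groups[key]["sum_col4"] += r["col4"]
        groups[key]["count"] += 1
    return groups

agg = aggregate(raw_logs)
# The two rows sharing the 10:00 hour collapse into one aggregate:
print(agg[(datetime(2018, 7, 24, 10), "US", "web")])  # {'sum_col4': 8, 'count': 2}
```

The hourly truncation is what turns individual events into fixed-granularity time-series points, which is the shape the serving layer stores.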
I'm going to hand this over to Steve, who's going to talk about Bigtable and serving.

All right, thank you Fabian — he did an awesome job describing how we get our aggregate data into BigQuery. We're going to talk now about how we take that data from BigQuery and load it into Bigtable in a way we can use to serve from our serving front end. In the process of investigating technologies for this, we looked through a few of them, and one that really stood out was a project called Apache Beam, along with Google Cloud Dataflow, which is the runner for it. Apache Beam comes basically prepackaged with a lot of really useful — they're called transforms in its lingo — that we can use to read data from BigQuery and write to Bigtable. So it immediately removed a large amount of development work we would have had to do to get data out of one system and into the other — the busy work we didn't want to do anyway. This allowed us to focus our effort on actually writing the transformation steps and not worry about just moving data around. Additionally, a nice thing about Dataflow is that it's fully hosted in Google, which means the data locality is really good: we're reading from BigQuery, which is obviously in GCP, and writing to Bigtable, which is in the same place. We don't have to pull large amounts of data down into our own data center and then write it back into Google — we can do everything right in the cloud. The way this works is that we've written a transform that takes a configuration file defined as YAML and uses it to interpret the data coming out of BigQuery. Fabian mentioned the idea of dimensions and aggregations before — this YAML file basically defines the mapping of those to the BigQuery columns.
We take those and convert them into Bigtable mutations. For people unfamiliar with Bigtable, a mutation is basically a row key plus some cells you're going to do something with — delete, insert, update, things like that. In our case we've made really sure that all of the writes we do are totally idempotent. What this basically means is we could run one of these load jobs over and over again without corrupting the state of the data in Bigtable — we don't have to worry about "oh, this load job ran twice, now we've double-counted all of the data in Bigtable" or anything like that. It allows us to be really confident that our Bigtable data exactly matches our BigQuery data, so there's never a question of "why is this data different?" when we're investigating data discrepancies.
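The idempotence comes from deterministic writes: the same input row always produces the same row key, column qualifier, and cell value, so replaying a load sets the same cells to the same values rather than incrementing anything. A minimal sketch — the field names and key layout here are invented for illustration:

```python
# Sketch of why the Bigtable writes described above are idempotent: each
# aggregate maps deterministically to a "set cell" mutation (overwrite),
# never to an increment. Field names and key layout are hypothetical.

def to_mutation(aggregate: dict):
    """Turn one aggregated row into a deterministic set-cell mutation."""
    row_key = f"{aggregate['advertiser_id']}#{aggregate['hour']}".encode()
    qualifier = aggregate["metric"].encode()
    value = str(aggregate["value"]).encode()
    return (row_key, qualifier, value)

def apply(table: dict, mutation) -> None:
    """'Set cell' semantics: overwrite the cell, don't add to it."""
    row_key, qualifier, value = mutation
    table.setdefault(row_key, {})[qualifier] = value

table = {}
m = to_mutation({"advertiser_id": 42, "hour": "2018072410",
                 "metric": "impressions", "value": 8})

apply(table, m)
apply(table, m)   # replaying the load job changes nothing

print(table)      # {b'42#2018072410': {b'impressions': b'8'}}
```

Had the write been an increment instead of a set, the replay would have doubled the count — which is exactly the failure mode this design rules out.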
So why Bigtable? For people unfamiliar with it, Bigtable is a high-performance, highly scalable NoSQL store. It works on a sorted row-key design: your row key is kind of like your primary key in SQL lingo, and it's sorted lexicographically. Inside a row you have columns; the columns don't have to be the same per row or anything — a column is more just a tag on your data inside the row. Additionally, Bigtable has a really advanced read API that allows us to construct pretty intricate queries to send to it and to push a lot of the predicates and filtering we would otherwise have to do on our end down to the actual Bigtable instances. This reduces the amount of data we have to read out of Bigtable and pushes some of that compute into the Bigtable instances themselves.

So now let's talk about how we actually serve this data now that we've stored it. To do that, I want to talk a little bit about Bigtable itself and the primitives we have to work with — this isn't anything specific to what we've done, just a little primer. Again, the row key: you can think of it as your clustered index, your primary key; it's sorted lexicographically as I mentioned. Your row key design is probably the most important decision when you're working with Bigtable, because it will influence how scalable your instance is going to be and how much you can scale horizontally — if you choose a bad row key, you basically won't have great performance. Within a row key we have column families, and within a column family you have column qualifiers. A column family is basically a group of column qualifiers, and a column qualifier you can think of as a tag on your actual data.
The data itself is just binary blobs — it's up to you to define how to serialize your data and how you want to interpret it; you can do whatever you want there. The tuple of row key, column family, and column qualifier defines a cell, which is your primitive unit of storage within Bigtable — when we operate on things, we generally operate on cells.

So now let's talk about how we've taken those primitives and turned them into a useful storage schema for our time-series analytics data. We'll start with the row key. We choose the row key based on the dimensions we've seen queries most commonly filter on. Again, this is important to get right: if you get it wrong, you're basically going to have a very poorly performing query system, but if you get it right, you can really tune for very high-performance queries. I mentioned we have a configuration file — that configuration defines which dimensions go into the row key and how they get serialized. Something else interesting to point out: Fabian mentioned how we do timestamp truncation to set our single-data-point granularity. We also do something interesting here — instead of storing a single aggregation per row, we actually store a large number of time series within a single row. In our example, with a one-hour-granularity data point, we might store an entire week's worth of data in one row, which allows us to compress those values together with compression algorithms that are well tuned for time-series data. This has given us probably a two-to-three-x decrease in the data we actually have to store in Bigtable, which has obviously been great for cost but also really good for performance, since we have to read less data out of the Bigtable instance.
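That row-pivoting idea can be sketched as follows: the row key carries the high-traffic dimensions plus the timestamp truncated to the week, while the hour offset within the week becomes the column qualifier — so a week of hourly points for one advertiser lands in a single row. The key layout below is invented for illustration:

```python
# Sketch of packing a week of hourly data points into one Bigtable row,
# as described above. The key layout is hypothetical.

from datetime import datetime, timedelta

def row_key_and_qualifier(advertiser_id: int, ts: datetime):
    # Truncate to the start of the week -> one row per advertiser-week.
    week_start = (ts - timedelta(days=ts.weekday())).replace(
        hour=0, minute=0, second=0, microsecond=0)
    row_key = f"{advertiser_id}#{week_start:%Y%m%d}".encode()
    # Hour offset within the week (0..167) becomes the column qualifier.
    hour_offset = int((ts - week_start).total_seconds() // 3600)
    return row_key, str(hour_offset).encode()

# Two hourly points in the same week map to the same row, different cells:
k1, q1 = row_key_and_qualifier(42, datetime(2018, 7, 23, 0))   # Monday 00:00
k2, q2 = row_key_and_qualifier(42, datetime(2018, 7, 24, 10))  # Tuesday 10:00
print(k1 == k2, q1, q2)   # True b'0' b'34'
```

Keeping neighboring hours in one row is also what makes the time-series compression effective: adjacent values tend to be similar, so they compress well together.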
Dimensions we store mostly in the row key, because those are going to be what we're filtering on most of the time, but we can also store them in columns for less frequently accessed data. And then finally the aggregations — as mentioned, we store them as compressed blobs within the row, with a lot of them pivoted into a single row.
So now that we have data in Bigtable, we need to serve it somehow. Really early on in this project, we realized there was a kind of common theme across the company: people were building very specific services to query just the data they wanted, and they were spending a lot of time doing the same thing over and over again. So one of the things we thought about was how we could build a more generic query API — one that can query any data set, so developers don't have to build and run their own services just to query one piece of data. We went out and built a more generic query API designed around querying time-series data and doing analytics on it. In building that, we also added support for multiple backend data stores. We mentioned BigQuery and Bigtable as data stores we support; we also use Druid internally to store some of our data. Doing this basically lets our users make their own trade-offs between cost and performance requirements. You can imagine that for internal analytics we might not need to worry that much about performance — maybe five seconds is fine for most queries, and we're only running a couple of queries a minute. Druid is pretty good for that and is cheap to run internally. BigQuery is great for huge datasets of multiple petabytes, and we can run queries on them pretty fast, in a couple of minutes — maybe it's okay if users wait a little while for that; the throughput is really high, but the latencies are really high too, and we can't run a lot of queries concurrently. And then finally, Bigtable is our premier data store, the one we recommend most users of the system use.
It's very low latency and very high throughput — we can run many thousands of QPS against it without really any performance impact.

So let's come back to Bigtable and talk about how we actually take the queries our serving front end gets, turn them into Bigtable queries, and then turn those into useful results. To give an idea of what a query looks like: we've talked about dimensions a couple of times — a query has a filter on it, and that filter is a boolean expression over a few dimensions. Maybe I want a particular advertiser account ID, a time range, a couple of other things. We take those input queries and, using the configuration we talked about in the YAML file, build a set of scans over the row keys we want to look at, based on those dimensions. We can also take predicates that didn't map perfectly onto the row key and push them down as filters into Bigtable. We take those scans and filters, batch them up into a few different requests, and send them out in parallel to Bigtable. As data starts coming back to our instances — the Bigtable API is a streaming API, so we get data as it's read by the Bigtable servers — we can begin doing in-memory aggregation of the data being returned. In Bigtable we're storing the lowest-level data we have, but the queries that come in might not be at that level: maybe someone wants the sum of all impressions for an advertiser over a year, and we're storing it hourly, so obviously we need to aggregate it on our end into a single data point. That's what we do here: every request that's sent out turns into an in-memory aggregation stream in our service. When all of those partitions complete and are returned, we do one more pass of aggregation in memory.
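That final merge step works because sums are associative: each parallel scan reduces its streamed rows to a partial aggregate, and the partials can be combined in one more pass into the single result the client asked for. A minimal illustration with made-up data:

```python
# Sketch of the two-level in-memory aggregation described above: each
# parallel Bigtable scan is reduced to a partial sum as rows stream in,
# and the partials are merged in a final pass before returning.

from collections import Counter

def aggregate_stream(rows):
    """Per-scan aggregation: reduce streamed cells to partial sums."""
    partial = Counter()
    for metric, value in rows:
        partial[metric] += value
    return partial

# Results streamed back by three parallel scans (hypothetical data):
scan_results = [
    [("impressions", 10), ("engagements", 1)],
    [("impressions", 25)],
    [("impressions", 7), ("engagements", 2)],
]

partials = [aggregate_stream(rows) for rows in scan_results]

final = Counter()
for p in partials:          # one more pass over the partial aggregates
    final.update(p)

print(dict(final))          # {'impressions': 42, 'engagements': 3}
```

Because each partial is small regardless of how many rows a scan touched, the final pass stays cheap even for queries spanning years of hourly data.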
Then we return the full result set to our client. This system is basically what powers all of the advertiser analytics we show to our clients and our API users — it's one of the larger-scale systems we're running here for analytics.

To recap, let's talk about the end-to-end system we've built and how it matches the objectives we set out to achieve. A really important part is that the data pipeline we've built is really easy to inspect end to end, because most of it is in SQL. We — and that means developers at Twitter, business analysts, QA people — anyone can write SQL queries and execute them against the system to see how the data looks in different parts of the pipeline.
That makes it really easy to debug the business logic. Because we're using SQL and configuration to isolate the business logic, it really forces people to separate concerns, which is important: you don't have your serialization leaking into your query system and your query system leaking into your business logic — everything is well partitioned and isolated. And then finally, BigQuery and Bigtable are very performant and highly scalable, which lets us be confident that in a couple of years we can scale this system up to 10x or 20x if we need to, without being back at the drawing board redesigning these systems again.

One more look back at what we presented. We came up with the idea of migrating the batch layer first into the new backend stores, with the idea that we eventually want to move this over to a streaming system. So how far away are we actually from that? If you look at this slide next to the next-generation streaming system, there's really not much of a difference. The big difference is that we'll move the processing out of BigQuery into a stream aggregation system — and that processing is now completely defined by a set of SQL queries, which we can either use directly in a stream aggregation system, or which at least give us very precise requirements for what needs to be implemented. At the same time, we already have the data stores and the data format figured out, so it's going to be very easy for us to validate whatever we implement in a streaming system. For the streaming system itself, we're looking at a lot of different technologies — obviously Dataflow and Beam are among them — but there's still work to be done.

That's it. Thank you.