Real-Time Stream Analytics with Google Cloud Dataflow: Common Use Cases & Patterns (Cloud Next '18)
Joining me today is Oliver from Sky, Director of Data Engineering, and we're going to be talking through real-time streaming analytics with Dataflow: common use cases and patterns. Before we dive straight into patterns, it's important to understand streaming, why it matters, and a couple of things that the Apache Beam SDK and Dataflow do to solve some of the common problems you may run into.

The data we're processing can be big: we can be processing from gigabytes to terabytes, so we may have huge amounts of data that we want to process every day. That data can also be infinitely big: we may have streaming data that is constantly arriving and never stops, and this creates problems for processing, especially with unknown delays.

One of the more difficult things with unbounded data is unknown delays. Assuming that every record has an event timestamp, we would hope to see records show up shortly after the event occurred. Looking over here: the red record happened at 8 a.m. and showed up in the system shortly after; the yellow record happened at 8 a.m. and showed up slightly delayed; but then we have this green record with an event timestamp of 8 a.m. that showed up at 2:30, which is about a six-hour delay. There are several reasons why these delays happen, and the most common case is actually mobile devices. With mobile devices, you can imagine that you get on a plane, turn your phone to airplane mode, and play a game on your device for the entire flight; when you get off and turn airplane mode off, all those events shoot up to the cloud at once, because we had no connectivity during that period.
If we were doing element-wise transforms, such as just filtering for the yellow records here, that's relatively simple: we don't care about processing time or event time, because it's a continuous transform. But it gets harder when we group data. Let's say we want to do an aggregation: we want to count the number of events that happened per user. For batch it's easy because the data is bounded: we just read the input, and as soon as there's no more data we stop processing, calculate the result, and emit it. For streaming we can't do that; you never run out of data, so there's no way of saying 'this is the time at which I output the count that I've seen.'

What people most commonly want to do is divide the data up into windows, small chunks over which they can compute aggregations and then output results. An initial approach is to use processing-time windows: essentially, when the job sees a record it looks at the wall clock, does that for a minute, and then outputs a result. But what we may really want is to group by event time: instead of viewing the data by the time on the wall clock, we group by the event timestamps within the data. This allows us to perform calculations over the event-time intervals in which the events actually occurred. Essentially, we take the input, which is all the events in processing-time order, and regroup and shuffle them into event-time windows so we can calculate meaningful analytics over our data.

I've mentioned windows, so what is a window? A window is just a function to divide data into chunks. Within the Apache Beam SDK and Dataflow, we provide several different window types to slice and dice your data in different ways.
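To make the contrast concrete, here is a minimal plain-Python sketch of event-time grouping (an illustration only, not Beam code; the function name and toy timestamps are mine). The window an event lands in depends only on its event timestamp, never on when it arrived:

```python
from collections import defaultdict

def fixed_window_counts(events, window_secs):
    """Count (event_timestamp, value) pairs per fixed event-time window.

    Windows are aligned to multiples of window_secs. Arrival order is
    irrelevant: we key on the event timestamp, not the wall clock.
    """
    counts = defaultdict(int)
    for ts, _value in events:
        window_start = ts - (ts % window_secs)  # align to window boundary
        counts[window_start] += 1
    return dict(counts)

# Events arriving out of order still land in the correct event-time window.
events = [(30, "a"), (95, "b"), (10, "c"), (70, "d")]
print(fixed_window_counts(events, 60))  # {0: 2, 60: 2}
```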
For instance, here are some of the more common ones: fixed windows and sliding windows. A fixed window you can think of as dividing time into consecutive chunks, say 1:00 to 2:00, 2:00 to 3:00, and so on; you're dividing your data up into time chunks, grouping it, and performing some aggregation on each chunk.

Sliding windows you can think of as overlapping fixed windows, so an event that happened may actually end up in multiple different windows. Say I had a sliding window that was an hour long and slides every minute: I essentially have windows of 12:01 to 1:01, 12:02 to 1:02, and so on, so I could have a lot of overlapping windows. This is useful when you're doing things like analysis of the input rate of the data, or calculating how many events I've seen in the past minute or ten minutes for a particular event type.

In Beam we also support data-dependent windows. What if I wanted to group by something that was in the event itself, for instance creating windows per user? This is what we would call sessions: every key ends up in a different window. Sessions are defined by bursts of activity: whenever there's a burst of activity, I want to group it into a window, process it, and calculate statistics over it; then, when there's no data for a period of time, a gap, I close that window and fire it off. This is a common thing you may want to do in analytics, and it's supported out of the box in Dataflow and Apache Beam.
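The two window shapes just described can be sketched in a few lines of plain Python (conceptual only, not Beam's implementation; the function names are mine): one computes which sliding windows contain an event, the other splits one key's timestamps into gap-separated sessions.

```python
def sliding_windows(ts, size, period):
    """All [start, end) windows of length `size`, starting every `period`,
    that contain event time `ts`; one event lands in size/period windows."""
    windows = []
    start = (ts // period) * period      # latest window start <= ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return list(reversed(windows))

def sessionize(timestamps, gap):
    """Split one key's event timestamps into sessions: a session closes
    whenever the quiet period since the last event exceeds `gap`."""
    sessions, current = [], []
    for ts in sorted(timestamps):
        if current and ts - current[-1] > gap:
            sessions.append(current)     # gap exceeded: fire the window
            current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions

print(sliding_windows(125, size=60, period=30))  # [(90, 150), (120, 180)]
print(sessionize([1, 2, 3, 50, 51], gap=10))     # [[1, 2, 3], [50, 51]]
```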
So I've talked a little bit about windowing semantics, and how we would divide our data up and do computations over it within a real-time analytics pipeline. But when does the answer come out? This is problematic, because you have to know when to emit the result for the data you're seeing in a window. By default, what we have is something called a watermark. The watermark tries to know when we are done with the data for a window: if the watermark says 12 o'clock, it means that, to the best of our knowledge, there is no more data left before 12 o'clock, and we can calculate the result and emit it downstream.

For external sources the watermark has to be somewhat heuristic. For Pub/Sub, for instance, we can do a read-ahead of the Pub/Sub buffer to check whether there are any more messages left before a certain time, and then know that we can safely emit the result. Going back to the mobile-phone-on-a-plane example: you could have someone with data from 11 o'clock on their phone and no internet connectivity, and obviously we can't know about that, so it becomes late data. If we look at the heuristic watermark here, you can see that while it does pretty well, it doesn't catch the 9 that arrives a lot later than we expected. The perfect watermark, on the other hand, is perfect: it knows about everything, including events that may arrive in the future.

So what happens when you have late data? In the Beam SDK, you can add triggering to your windows to indicate that you want to process late data, and there are several different ways of processing it. Using accumulating mode means that if late data shows up, I add it to my previous aggregation: for instance, if I was counting elements and had a count of 50, and late data shows up, I fire again and get back a result of 52 instead of the original 50. I can also say I want to discard fired panes: for the same count, if 50 elements show up on time and two show up late, then as opposed to recalculating the result and firing 52, I forget about the 50 I previously calculated, fire only the count of 2, and discard the previously fired panes.

In this other animation you can actually see some of our speculative and late firings. Here the heuristic watermark is firing speculatively for records that arrive before the watermark, and also catching the late records for a given period of time, so we're able to capture a lot of the data that doesn't show up when we would expect it.
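The two trigger modes can be made concrete with a small plain-Python sketch (an illustration of the semantics only; the function is mine, reusing the talk's 50-plus-2 example):

```python
def fire_panes(on_time, late, mode):
    """Illustrate accumulating vs discarding trigger semantics for a count.

    Returns the sequence of results a downstream consumer would see:
    first the on-time firing, then the late firing.
    """
    first = len(on_time)
    if mode == "accumulating":
        second = first + len(late)   # late pane includes prior elements
    elif mode == "discarding":
        second = len(late)           # late pane contains only new elements
    else:
        raise ValueError(mode)
    return [first, second]

on_time, late = list(range(50)), ["x", "y"]
print(fire_panes(on_time, late, "accumulating"))  # [50, 52]
print(fire_panes(on_time, late, "discarding"))    # [50, 2]
```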
So that was a little bit about streaming, the common challenges, and how Dataflow and the Apache Beam SDK help address some of those challenges. For the subsequent section we're going to talk about building a pipeline using some common patterns, from simple element-wise transforms to grouping, aggregations, and that type of thing.

For this we're going to use the scenario I've been alluding to previously: capturing and analyzing events for a mobile game. Let's imagine we have a mobile game, and we want to build a real-time analytics pipeline to analyze the game events that may be happening. We want our game to be popular, too, so we want to be able to handle these events at scale and do interesting things like compute aggregations over how many events we're seeing per user, or analyze the user behavior within the game app.

We'll start out with exactly-once processing. Building this pipeline from the most simple case, we may have our gaming devices publishing data into Pub/Sub, Dataflow processing that data, and BigQuery finally landing that data in our data warehouse. We use Pub/Sub because it's a reliable, globally available message delivery service, and we expect our game to be popular worldwide, so why not?

Now, if we're analyzing user behavior, duplicates entering our system may throw off our analytics: if I'm calculating counts, or doing averages, and I allow duplicates into my system, I'm going to throw off the analytics I'm doing and make them substantially less valuable.
This is also critical in other industries, such as retail or finance: imagine I have a POS system generating transaction events; I obviously wouldn't want to double-count those transactions, and the same goes for trades or holdings.

In this example, zooming in on Cloud Pub/Sub, you may think it could cause a problem, because it has at-least-once delivery semantics: inherently, by using Pub/Sub, we may receive duplicates within our system. Here the colors indicate unique keys for the records, so we have red, yellow, and green, and we may see duplicate records come out. However, when combining Pub/Sub with Dataflow, we're actually able to achieve exactly-once semantics within our pipeline, and with relatively little effort.
Zooming in a little on the records coming into our pipeline, they may look like this: attributes and a payload together comprise the message going into Pub/Sub. Attributes are key-value pairs; you can think of them as headers on the message. They allow you to inspect the message and maybe route it in different ways, or perform other operations, without having to actually deserialize the payload. The payload contains the serialized body of our message; in this case we're publishing JSON events up into our Pub/Sub topic. You'll see that we have a unique identifier within the message payload, an event ID.

The first step to processing this in an exactly-once fashion is to promote that event ID into our attributes; this allows us to instruct Dataflow to react to it within our pipeline and start deduplicating these messages. If we take a look at our example pipeline for the game events, it may look like the following: we have a Pub/Sub IO transform reading data from a subscription and passing it downstream to our transforms. The first step was to add the event ID into our attributes; now we need to instruct Dataflow to react to that event ID, and that's relatively simple. As opposed to a vanilla transform that just specifies the subscription, we add in .withIdAttribute(), and this lets Dataflow know to look at the event ID and deduplicate on it as messages come into the pipeline. Any messages duplicated as part of Pub/Sub's at-least-once delivery semantics will be deduplicated automatically within the pipeline. There are also other ways we could receive duplicate messages, such as a publisher publishing a message twice.
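As a rough plain-Python sketch of what ID-based deduplication does (conceptual only, not Dataflow's implementation; the class is mine, using the ten-minute retention window mentioned in the talk):

```python
class Deduplicator:
    """Sketch of ID-based deduplication with a bounded retention window.

    Mimics the effect of reading Pub/Sub with an ID attribute: a message
    whose event_id was seen within the last `window` seconds is dropped;
    older IDs are forgotten, so very late duplicates can slip through
    (as the talk notes) and must be handled elsewhere.
    """
    def __init__(self, window=600):
        self.window = window
        self.seen = {}  # event_id -> time we first saw it

    def accept(self, event_id, now):
        # Forget IDs that fell out of the retention window.
        self.seen = {i: t for i, t in self.seen.items()
                     if now - t <= self.window}
        if event_id in self.seen:
            return False          # duplicate within the window: drop
        self.seen[event_id] = now
        return True               # first sighting: keep

d = Deduplicator(window=600)
print(d.accept("e1", now=0))     # True  (first delivery)
print(d.accept("e1", now=60))    # False (duplicate a minute later)
print(d.accept("e1", now=3600))  # True  (window expired: not caught here)
```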
Dataflow will pick that case up as well, but only within a ten-minute window: we don't keep the event-ID deduplication table around past ten minutes. So if a publisher published a message at minute one, and then published the same duplicate message at hour one, minute one, you would have to account for that elsewhere in your pipeline, such as with a deduplication transform.

So we're achieving exactly-once semantics within our pipeline; the next thing we may want to do is think about how we deal with bad data. In order to deal with bad data coming into our system, we're going to add some things to our architecture: a Pub/Sub topic to act as a dead-letter topic, and Cloud Functions to alert on messages that end up in that dead-letter topic.

What does this look like in actuality? We have a message being published, and you can notice that this message is malformed in some way: we don't have a closing brace on our JSON object, and the event ID is no longer in the proper format; we were expecting a string, and now it doesn't have the quotes around it. By default, if this were to enter our pipeline, it's going to fail when we try to parse these messages. In streaming mode, Dataflow will actually try to reprocess those messages forever, because we don't want to lose that data: if processing fails and throws an exception, we don't want to just drop the message on the floor, so we'll continue to reprocess it forever, or until you tell us what to do with it, such as by updating your pipeline or creating a new path within your pipeline to deal with the bad data.

So what do we do? We can edit our ParDo function: each one of those boxes is a ParDo, which you can think of as almost like a map in MapReduce. A ParDo function has a processElement method, where it performs its computation.
As opposed to just performing our computation and outputting records, we wrap it in a try-catch block, and for any exception we output the failed records to our dead-letter tag. That dead-letter tag allows us to retrieve those messages later on in the pipeline and do something with them, such as output them to a persistent store. So if we go back to our pipeline: in this case we are now redirecting those messages to a different location, taking that dead-letter tag and routing it to both BigQuery and Pub/Sub.
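The shape of that try/catch ParDo can be sketched in plain Python (illustrative only; a real Beam DoFn would emit to a tagged side output rather than return two lists):

```python
import json

def process(messages, parse=json.loads):
    """Sketch of the try/except dead-letter pattern from the talk.

    Returns two lists: successfully parsed records, and failed records
    paired with their error, which a real pipeline would route to a
    dead-letter topic or table instead of retrying forever.
    """
    good, dead_letter = [], []
    for msg in messages:
        try:
            good.append(parse(msg))
        except Exception as err:
            # Keep the raw payload and the error for later inspection.
            dead_letter.append({"payload": msg, "error": str(err)})
    return good, dead_letter

good, bad = process(['{"event_id": "e1"}', '{"event_id": e2'])
print(len(good), len(bad))  # 1 1
```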
We route to BigQuery because we want our support team to be able to analyze and inspect the bad data within BigQuery, and we also output it to Pub/Sub for ease of reprocessing and for alerting within our Cloud Function.

Here is an example of a BigQuery table you might see for dead-letter processing: we store the event timestamp, the payload in bytes, an attempted conversion of the payload to a string (to show more about the payload, so someone has more context), the attributes, and the error message and stack trace. This way someone can go in, see what the problem is, and hopefully rectify it.

So we've added handling for bad data within our pipeline, but there's another case that may cause failures: schema changes. For our mobile game, I mentioned that we have partners publishing this data, and they may want to update the schema at some point; we need to react to that within our pipeline. We may need to add additional fields dynamically. We could do this non-dynamically, with coordination: we could call up the partner, coordinate a release on a given date, and take on a lot of operational overhead. But we actually want to handle this dynamically, because then we're free from the operational overhead of having to manage it.

Looking back at our messages: we may have the message on the left, the yellow one, coming in exactly in the schema we expected. We may also have the message on the right, the green one, which has an additional field: the partner may have added something like a device IP, or some other contextual information that's useful for them, and we want to capture that.
But if they added an additional field, we may fail on our insert into BigQuery, so we're going to have to account for that within our pipeline. There are a couple of features of BigQuery and Dataflow that allow us to do this fairly efficiently. BigQuery allows for schema changes that are additive within the same table: schema changes happen online and require no downtime. And the BigQuery sink within Dataflow allows for the retrieval of failed inserts. Combining all these things together, we can achieve a dynamically mutating schema within our pipeline.

If we look at our pipeline again, we may add in getFailedInserts when writing out to BigQuery; this allows us to retrieve everything that has failed insertion into BigQuery. Then we invoke a function that dynamically mutates our schema: everything that failed to insert, we inspect, add the additional field to BigQuery, and pass downstream, finally writing those mutated records out.

This lets us do it fairly efficiently. Alternatively, we could have inspected every single message entering our pipeline, but that's fairly inefficient, because you're doing an operation on a bunch of elements that may not need it: the schema change may only happen occasionally, so you're just adding overhead to your pipeline. As opposed to doing that, reacting to the failed inserts, mutating the schema dynamically, and rewriting those records out to BigQuery gives us efficient, dynamically mutating schemas within Dataflow.

So if we look at our pipeline again, how does this look? We have messages that may have a mutated schema, and essentially we've added a whole new path in which errors come out; we validate and mutate the schema, and then we re-output to BigQuery.
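The failed-insert pattern can be sketched in plain Python (a conceptual stand-in for getFailedInserts and the BigQuery sink; the function name, the set-based "schema", and the toy rows are mine):

```python
def write_with_schema_evolution(rows, table_schema, table_rows):
    """Sketch of reacting to failed inserts by widening the schema.

    `table_schema` is a set of known column names. Rows with unknown
    columns fail the first insert attempt; the schema is then mutated
    to add the missing (additive) columns, and only those failed rows
    are retried, mirroring the pattern described in the talk.
    """
    ok = [r for r in rows if set(r) <= table_schema]
    failed = [r for r in rows if not set(r) <= table_schema]
    table_rows.extend(ok)                       # first attempt succeeds
    for row in failed:
        table_schema |= set(row) - table_schema  # additive schema change
        table_rows.append(row)                  # retry now succeeds
    return table_schema

schema = {"event_id", "user_id"}
stored = []
schema = write_with_schema_evolution(
    [{"event_id": "e1", "user_id": "u1"},
     {"event_id": "e2", "user_id": "u2", "device_ip": "10.0.0.1"}],
    schema, stored)
print(sorted(schema))  # ['device_ip', 'event_id', 'user_id']
print(len(stored))     # 2
```

Note that only the rows that actually failed pay the cost of schema inspection; the common path stays untouched, which is the efficiency argument made above.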
So, without having to take on the complexity of managing changes with our partners, we're able to react to them automatically within our pipeline.

Another common use case is real-time denormalization. In order to provide a summary view in our data warehouse, we may want to look up reference data before we output events. In our mobile gaming use case, this could mean looking up reference data on the user or on the partner, adding contextual information that will help us do our analysis later.
In order to accomplish this, we'll need to add an additional datastore to our architecture; in this case we'll choose Bigtable because of the scalability and latency requirements we may have. Our game is popular: we may be processing thousands, hundreds of thousands, or millions of events per second, so we need something that can scale to that.

Before we implement the per-record lookups, we want to consider the performance of the database. If we're doing this within GCP, we're going to prefer Bigtable or Datastore for these per-record lookups, because they'll give us the performance we need; if we were to reach out to BigQuery, it would be quite a bit slower, on the order of seconds as opposed to tens of milliseconds for each individual lookup. We also want something that's scalable: Dataflow is like a cannon we shoot at GCP products, and if you put, say, a single Postgres instance behind your lookups, you can easily make that instance fall over. So we want to keep that in mind when we're doing per-record lookups.

Finally, we also want to use the right places to initialize and tear down our connections. ParDos within Dataflow have a lifecycle, and I'll speak to that lifecycle in just a second, but essentially we want to make sure we're initializing connections in the setup method of the DoFn and closing them in the teardown method. Looking at the ParDo lifecycle, this is sort of a crude representation of what it might look like: we have setup, startBundle, and processElement, and then we also have finishBundle and teardown. So where do we do our connection initialization and teardown?
We do that in setup and teardown, just like it sounds, and the reason is that they're executed once per DoFn instance. As these functions are distributed across the worker pool within your job, setup is executed once per instance, so you don't pay for a ton of connection setups and teardowns; it's rather expensive to create new connections and tear them down all the time.

startBundle and finishBundle are executed once per bundle of records. Within the pipeline, Dataflow bundles records up and processes them through each transform; this allows it to be more efficient and not have to materialize the output for each individual record. We could have done our connection handling here, but these methods are executed a lot more frequently, because there are going to be a lot more bundles within the pipeline than there are instances of the DoFn. So this would be a decent place to initialize and tear down connections, but it would not provide the performance we would like.

Finally, we have the processElement method, which is executed once per element. If you do really expensive things in processElement, this is where you can slow down your pipeline: for instance, initializing connections and tearing them down for each individual record, when you're processing records at hundreds of thousands or millions per second, is going to slow down your pipeline quite a bit. That's why it's pretty critical to set up and tear down the connections for these external lookups in the correct place within your DoFn.
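The lifecycle placement can be sketched in plain Python (not a real Beam DoFn; the class, the FakeStore stand-in for a Bigtable client, and the counter are mine, added to show how often each cost is paid):

```python
class EnrichFn:
    """Plain-Python sketch of the DoFn lifecycle placement from the talk.

    The connection is opened once per instance in setup() and closed in
    teardown(); process() only performs the cheap per-record lookup.
    """
    def __init__(self, store):
        self.store = store
        self.conn = None
        self.connects = 0   # count how often we pay the connection cost

    def setup(self):                    # once per DoFn instance
        self.connects += 1
        self.conn = self.store

    def process(self, record):          # once per element: no connect here
        record["user_name"] = self.conn.get(record["user_id"])
        return record

    def teardown(self):                 # once per DoFn instance
        self.conn = None

class FakeStore:
    def get(self, key):
        return {"u1": "alice", "u2": "bob"}.get(key)

fn = EnrichFn(FakeStore())
fn.setup()
out = [fn.process({"user_id": u}) for u in ["u1", "u2", "u1"]]
fn.teardown()
print(fn.connects)          # 1 (one connection for three records)
print(out[0]["user_name"])  # alice
```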
Zooming in on our pipeline again: we want to enrich data from an external store, and essentially we've just added a couple of methods to the enrich-messages DoFn. In the setup method we initialize the connection; we call a method to enrich the record, which calls out to our external data store; and in our teardown method we close the connection. This allows us to have efficient per-record lookups within our pipeline. If we look at what our pipeline looks like with this functionality added, we have this additional lookup here: connecting to Bigtable, doing the per-record lookup, enriching the game events with either user data or partner data, and then passing the data downstream.

Now that we've built a lot of the common functionality into our pipeline, we may want to perform analysis on the data moving through it. This is where we may want to utilize some of the windowing techniques I mentioned earlier. Here we don't need to add any additional components to our architecture, because windowing is handled within Dataflow itself; there's nothing that needs to be added.

For our analysis, as I alluded to earlier, we want to answer questions like how long the user has spent playing the game, and we may also want to know when the user stopped playing.
We may also want to do things like, for a session, group together all the events and compute and output one aggregate record that contains an array, using BigQuery's nested and repeated fields to our advantage. Here we'll actually work out how long a user has spent playing the game.

If we were to use fixed windows, we'd inherently have problems, because we may have family members using the same device: if we just grouped by fixed windows of a period of time, from, say, twelve to one, we could actually end up with different individual users within that time period, and that would throw off our analysis of the count of messages per user, and any additional analysis we're doing of the user behavior. This would give incorrect correlations. What we really want is the sessions I mentioned earlier, because with session windows we achieve what we really want: we group the events by a key, that key being a user ID for instance, and we're able to get all the events for a particular user into one window, then compute aggregations and do analysis on that window before passing the data downstream. This gives us exactly the subdivisions that we want.

Looking at our pipeline code: we window into sessions with a gap of five minutes, so essentially we look at bursts of activity, and whenever we do not see activity for five minutes, we close that window and fire it off, which triggers the aggregation downstream in our processing. After that, we key by the user ID: we take the game event coming in, get the user ID, and create a key-value pair that we can then group by. Essentially, we're keying these records before we group them, so we can get the records into the right place.
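The keying, session-splitting, and aggregation steps can be sketched end to end in plain Python (a conceptual stand-in for the Beam pipeline; the function name, the five-minute gap in seconds, the summary fields, and the toy events are mine):

```python
from collections import defaultdict

def analyze_sessions(events, gap=300):
    """Key (user_id, timestamp) events by user, split each user's events
    into sessions on a `gap`-second quiet period, then emit one summary
    record per session, roughly what would be written to BigQuery.
    """
    by_user = defaultdict(list)
    for user, ts in events:              # key by user ID
        by_user[user].append(ts)

    summaries = []
    for user, stamps in by_user.items():
        stamps.sort()
        session = [stamps[0]]
        for ts in stamps[1:]:
            if ts - session[-1] > gap:   # gap exceeded: close the session
                summaries.append({"user": user,
                                  "events": len(session),
                                  "duration": session[-1] - session[0]})
                session = []
            session.append(ts)
        summaries.append({"user": user,
                          "events": len(session),
                          "duration": session[-1] - session[0]})
    return summaries

out = analyze_sessions([("u1", 0), ("u1", 100), ("u1", 1000), ("u2", 50)])
print(len(out))  # 3 sessions: two for u1, one for u2
```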
Then we group by that key: we group the game events by the key we created, which essentially creates a grouping of each key to an array of all the records that map to that key within the session. Finally, we call a method, or class, which analyzes our events: within this class we can do the counting or the averaging of those records, we can do the behavior analysis we want to, and really any analysis that we see fit for our use case.

Looking at our updated pipeline, we have this whole new path: after converting to rows, we window into sessions, we key by the user, we group by the key, and we analyze the events, and it all happens before we pass the data downstream to BigQuery.

So in this example, we deduplicated records using exactly-once processing, we handled bad data, we handled mutating schemas, we enriched data in real time, and we windowed the records into meaningful windows for our use case and computed the analytics we wanted. Now I'm going to hand it over to Oliver to talk us through Dataflow at Sky.

Just to introduce myself, I'm Oliver, Director of Data Engineering at Sky. For those who have not heard of Sky, we're one of the leading TV companies in Europe: we've got 22 million subscribers across the UK, Italy, Germany, and Austria, and we're a quad-play provider, so we do TV, broadband, phone, and mobile services. We selected Google in 2017 primarily for its data capabilities. We spent about four months getting Google Cloud set up for Sky and designing our target architecture, and we've been up and running live for eight months now. We consider GCP to be a great data and analytics platform as a service.
Today I'm going to talk about Sky Q. Sky Q is our flagship TV platform; it's a TV box, and we think it offers one of the best TV experiences in the UK in terms of the quality of the platform and the range of content available for the box. Sky Q has a satellite tuner, video on demand, live streaming, and HD and UHD content. What we do is collect diagnostic data from Sky Q boxes in every customer's home, and that's some three million boxes currently live in the field.

We collect diagnostic data so we can understand user journeys, and we want to be able to optimize those journeys so our customers can find the TV content they love easily: that means the fewest clicks, the shortest searches, an easy-to-navigate UI, promotions, and personalization. For example, someone clicks on the home menu, goes to movies, selects a genre, finds the movie, and presses play.

Our original solution for Sky Q analytics was based on a Hadoop cluster: a fixed-size cluster, on-prem. Sky Q is a great product, really successful, but the problem was that we had to deal with lots of peaks of data. TV diagnostic data has large peaks: in the UK we don't have the weather of California, unfortunately; it rains a lot, and when it rains people go indoors, watch more TV, and generate more diagnostic events. We can also have news events and sports events, and on those days we get more diagnostic events. The problem with our fixed-size system is that we have to over-provision: effectively, if we're provisioning to the yellow line, we have a lot of capacity that's wasted. Whereas in the worst case we under-provision the system, and that means our batch processing or real-time jobs take longer, or, worst case, we lose data altogether.
The other problem we had was that we spent a lot of time tuning our Hadoop cluster. I don't know how many people in the audience run Hadoop, but our ops team spent time tuning YARN on a regular basis and dealing with managing YARN containers. We had a platform with mixed workloads, and it's very difficult to get those workloads right when things change over time.

So, not a big surprise: we moved to GCP, and our architecture is designed around Pub/Sub, Dataflow, and BigQuery. The reason we chose this is that it's Google's canonical architecture, so there's really good integration between all those components. We took the opportunity to move the pipeline from batch processing to streaming, because our modern Q boxes report back in real time over IP. The other thing is that we chose Dataflow to gain the benefits of autoscaling, a serverless architecture, and out-of-order data processing. Much like Ryan's example with mobile gaming, people turn off our TV boxes at night, and quite often we get data turning up late, so out-of-order data processing is really important to us.

The features in the Beam model are really important to us, and we also think the Beam model is becoming much more mature. We like the separation between the runner and the SDK: I think it's a good thing to be able to abstract the data pipeline itself from the infrastructure, or the runner.

Moving on to our architecture: effectively, in the ingest stage we take data in streaming mode, JSON data just like in the mobile gaming example, from our Q boxes into Pub/Sub. Separately, we take reference data in batch mode into GCS. Then, in our process stage, we process this in Dataflow: in the streaming path we clean the data, add additional attributes, assign join IDs, and sessionize the data. Using session windows, we turn a sequence of user events into distinct TV sessions, and these are enriched with session IDs, times, and the event number within the session.
session windows: we turn a sequence of user events into distinct TV sessions, and these are enriched with session IDs, start times, and each event's number within the session. We think the ability to dynamically add columns in BigQuery when our JSON schema changes is a great feature, and we make use of it. It means we can have non-breaking changes with the source system: our source-system team just adds an element to the JSON message, and that manifests itself in BigQuery as a new column. We've also got a separation between our first stage of processing and the enrichment stage. The reason we do this is that we use our granular data for multiple use cases, and we don't always want to use the enriched data in those separate use cases, so we have two stages of processing in our architecture. In the enrich stage, what we do is create this notion of Sky Q box journeys, and these box journeys add things like the customer details, information around how the planner works, and different reference data relating to the tiles on the UI. So, for example, we can see that somebody started searching for content on the box, they found Game of Thrones, they started watching Game of Thrones, and then they engaged in a big Game of Thrones TV marathon. Finally, we display all this in Tableau. We've got a number of aggregates that sit under Tableau and allow Tableau to perform effectively and give everyone really snappy performance. So again, like in the example Ryan was talking about, we make use of sessioning.
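In the real pipeline this sessionizing is done by Beam's session windows running on Dataflow; as a rough plain-Python sketch of the same idea, here is gap-based grouping that enriches each event with a session ID and its ordinal within the session. The `Event` shape and field names are illustrative assumptions, not Sky's actual schema, and the gap and cutoff are passed in as parameters:

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class Event:
    # Hypothetical diagnostic event: an event timestamp in seconds since
    # epoch plus an arbitrary payload. Enrichment fields start empty.
    timestamp: int
    payload: dict = field(default_factory=dict)
    session_id: str = ""
    event_number: int = 0


def sessionize(events, gap_seconds=4 * 3600, max_session_seconds=24 * 3600):
    """Group time-ordered events into sessions separated by `gap_seconds`
    of inactivity, with a hard cutoff at `max_session_seconds`."""
    sessions, current = [], []
    for ev in sorted(events, key=lambda e: e.timestamp):
        if current and (
            ev.timestamp - current[-1].timestamp > gap_seconds
            or ev.timestamp - current[0].timestamp > max_session_seconds
        ):
            sessions.append(current)
            current = []
        current.append(ev)
    if current:
        sessions.append(current)
    # Enrich each event with a session ID and its ordinal in the session.
    for session in sessions:
        sid = str(uuid.uuid4())
        for n, ev in enumerate(session, start=1):
            ev.session_id = sid
            ev.event_number = n
    return sessions
```

Note that a real Beam `Sessions` window handles out-of-order and late data via watermarks and triggers; this sketch simply sorts a finished batch, which only works once all the data has arrived.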
For our TV events, Sky processes sessions based on a continuous stream of diagnostic events. Each one of those sessions is calculated for an individual Sky Q box, and we do this based on a four-hour gap of inactivity. Essentially, what we're looking for is that four-hour gap of inactivity, and then everything in the window before it becomes a session. We've got a hard cutoff after 24 hours, on the basis that we think no one watches TV for more than 24 hours. We also make use of autoscaling. Autoscaling happens transparently, but interestingly, the rules for autoscaling are quite sophisticated, so don't expect autoscaling to happen immediately when there's an influx of new work to do. Sometimes it takes a while for Dataflow to scale up, and that's because it's got a sophisticated heuristic in the background, working and optimising the number of workers in the Dataflow pipelines. Essentially, it might add workers to keep up with the input rate; it knows the amount of unprocessed work in Pub/Sub, or the queue depth. It might also need to catch up with the backlog, and it's smart enough to understand the rate of processing and whether it's worth scaling up those extra workers to process that backlog of work. Sometimes it might not scale up, because it will realise that by the time you'd scaled up and added more workers, the work would be done. So what we've got now is a situation where we can process many hundreds of millions of records without having to manage any of the infrastructure, and we can cope with those peaks entirely dynamically. We're regularly processing more than 300 million messages a day, in some cases now well over 400 million. It's been a great success in the business; it only took five weeks to develop this solution from end to end. We agonised a bit about whether we should port our existing Apache Spark code over.
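The autoscaling reasoning described above can be sketched as a small decision function. To be clear, this is not Dataflow's actual algorithm, which is unpublished and far more sophisticated; every parameter name and threshold here is an assumption, chosen only to illustrate the "don't scale if the backlog will be gone before new workers are ready" behaviour the talk mentions:

```python
import math


def target_workers(backlog, input_rate, rate_per_worker, current_workers,
                   startup_s=90.0, catchup_s=600.0):
    """Illustrative backlog-aware scaling decision (NOT Dataflow's real
    heuristic). Aim to keep up with the input rate and clear the backlog
    within `catchup_s` seconds, but skip scaling when the current fleet
    would drain the backlog before new workers finish starting up."""
    # Net messages/second the current fleet gains on the backlog.
    drain = rate_per_worker * current_workers - input_rate
    if drain > 0 and backlog / drain <= startup_s:
        # Backlog will be gone before extra workers could come online.
        return current_workers
    # Throughput needed to match input AND clear the backlog in time.
    needed = input_rate + backlog / catchup_s
    return max(current_workers, math.ceil(needed / rate_per_worker))
```

For example, with no backlog and a fleet already keeping up, the function leaves the worker count alone, whereas a large backlog at a saturated input rate drives the count up.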
We could have run it on Dataproc, but we think we made the right decision not to do that and instead to rewrite everything in Dataflow, just because we've taken all
the benefit of not having to manage the cluster. We also partnered with Datatonic, who may be in the audience; Datatonic are experts in data engineering and machine learning, and they were able to get our team up to speed with Apache Beam, Pub/Sub, and BigQuery really quickly. We do A/B testing on the box software so we can determine whether, from one release of the Sky Q software to the next, we're actually seeing material benefits and improvements in the software. Typically what we do is take a small population of boxes, roll those boxes onto the new revision, and watch whether both the old and new cohorts of box users are performing as we would expect, and whether the new cohort is seeing a material benefit. In many cases we now find that we're seeing more than double-digit improvements in the performance of the UI and the user experience. A quick summary of before and after: before, we had an on-prem Hadoop cluster. We had to manage the network, the servers, and software upgrades; we had to patch everything; we had a lot to maintain. Now we've got a completely serverless, as-a-service model. We can scale in no time at all, like 90 seconds to add more workers, whereas before we were talking months to add new servers, where we might even have to put in a request to finance and deal with all of the capital-expenditure outlay. So it was high fixed costs with the existing solution, and now we just pay for what we use. One of the biggest improvements has been in development time. Before, with the fixed-size system on-premise, it was almost impossible to recreate the production workload, especially with multiple workloads running on the same system, so development took a long time, because it was very difficult to test things and replicate the production workload. Now we can replay all our messages back into Pub/Sub, through Dataflow, and into the newly developed version of our pipeline.
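The A/B comparison described earlier, old cohort versus the cohort rolled onto the new build, can be sketched in a few lines. The metric, field names, and sample values below are invented for illustration; a real analysis would also test statistical significance rather than just comparing means:

```python
from statistics import mean


def relative_improvement(control_ms, treatment_ms):
    """Fractional improvement in a lower-is-better metric (e.g. UI
    navigation time in ms) between a control cohort on the old software
    and a treatment cohort on the new build. 0.2 means 20% faster."""
    old, new = mean(control_ms), mean(treatment_ms)
    return (old - new) / old


# Hypothetical menu-navigation times per cohort, in milliseconds.
control = [420, 380, 450, 400]
treatment = [330, 310, 360, 320]
print(f"UI improvement: {relative_improvement(control, treatment):.0%}")
# prints "UI improvement: 20%"
```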
It's very much simpler to test, and that's meant that development now happens in weeks, not months. And ops are delighted: rather than spending two days a week tuning YARN, it's near-zero ops. So it's been a great success for Sky and also a great success for the business. Dan Connell, who heads up our product experience, feels that this is an extension of the product design and development capability. He says: "This capability has the power to change the way we develop our products, and has made me very proud of what we've built together." Finally, I'd just like to say a couple of thank-yous: to Datatonic, who partnered with us to build this, and also to Agile Solutions, who helped do some of the project management. So thanks to Louie and Andrew on the technical side at Datatonic, and Steven from Agile Solutions.