Hello, everyone this is my own one cell I work. In a data infer group at uber and I am co-presenting. With makgulee, boo, yang he's also working data in fryer to work here. We are to present Xia's, ubers. Highly, scalable. And distributed, shuffle as a service. For. Uber we, are a group or company we, are 15, billion trips we completed over 15 billion trips we, are 18, million, tips per day we, are in six continents, we. Are in 69 countries and 10,000 cities we. Have, 103. Million active, monthly users and, 5. Million, active, drivers. So. Data, and ml. Actually. Is the. Backbone. For uber. To. Do lot, of stuff there, are a lot of use cases which, uses, data and ml platforms. And, and. Provide, user, experience, the. Many of them like uber eats ETA, self-driving, vehicles, forecasting. Maps they're, many I'm, going to talk about few, in the next few slides. So. Eta. Is right so eta is a very important. Use. Case for uber, so when you open an app you, see my cars are three, minutes away and when, you click. Right. Then it will probably give, you a route and it will give you a time. That how much time this will take all these, things comes, from the ETA functionality. ETS. Are generated, by route based algorithms, there, are many many models, ml models which crunches, lot of lot of lots of data and produce. These. Eat years and. These ETS, are. If. There is any error in the ETS those take as a feedback, and then again B process you know mine X right right so this is very very important, use case and it's been powered for ML and data platforms. So. The next use case I'll talk about the, driver writer match so. Once. You open an app and then you, click. The write. Then, there is a. In a supply right and then people, so. Then actually. At. The runtime machine. Learning models predict, if you're. Gonna make a ride if, you may not gonna make a ride what are the what are the cars which are nearby, you and, then they match, between rider and and that's all being driven by a Maryland it up. Platforms. So. This is very very important use case for uber. The. Another, business. I know business, Cooper, have which is uber eats which is growing, very, fast given, this. So. We. All the, eats functionality. Is being driven by data and ml models the, models used, for ranking of the restaurants, delivered. Delivering, times search. Ranking. There, are hundreds, and hundreds of models which gets, load. Which gets predicted, and renders its homepage, when. What, are the different. Dishes you want to like what, are the so, that eats apps and, the more models. Predictor, and, this has been run at the runtime and then they train the model from the data and ml platforms, this has been a very very big use case for. So. There is another very, interesting use, case is self-driving, car so we have uber, has a self-driving car unit, we, where, we we. Are we trying to make. Self-driving, cars and the software, for that so, for that we need a very. Very big data animal, models to predict the routes to predict the signs, to predict the pedestrians. And. And. Then. Visualize, the whole path right so this is a big, very, big use case I will say for, date animal platform. So. Let's, talk about little bit on what. How oberst it attacks the play so, there are tier data leaks, we. Have in-memory databases. Like, you know and Ares, GB and then, we use HDFS. For. The start and warm storage and then, we have the, archival. For the disaster, recovery purposes, which, we do to the, on, the left side we see there are mobile event. Appearance. And there. Are which, comes from your apps device. Telemetry. Device information. Micro. Services events database, events, third-party, feeds and burn so, there are lot of lots of events which comes which. Are being, emitted in the overstock. So. There is a ensued, all these events pipe through Kafka and then, they go to. The, tiered Italy. Then. There, is a compute, fabric, on top of this tiered italic which, is yarn. And missiles. And plus. Peloton, so we have this pellet and internal, scheduler, so. We use yarn and missus purse peloton as a compute, fabric, for the whole uber. These. Complete, fabric, powers, stream. Processing like. Fling fling mostly run onion and then, batch processing, where most of the spark applications. They is MapReduce, all these things runs on top of this computer. On. Top of it there, is a, real-time. Three aggregated, Curie engines there is a talk where engines and there is a complex batch variants pre, aggregated, real-time, for engines or athina eggs ad hoc interactive, presto, Vertica and then there is a hive query, engine on top.
On. Top of it we use many m bi and. Analytics tools. And dashboards. So we have piper you work which is you. Work. Which. Is our workflow. Engine and then, we have these dashboards built, on top of it then there are ad hoc queries which. We run on query builder as well as there are tools like gears W and W, so. Now, let's, talk about the ogres ml stack so, we there. Are many stages of, four. Machine, on right, there. Is a data preparation which, we used stream. Processing, for. Stream processing we huge Fling which powers. Through Kafka all the events come through Kafka, and for, the batch processing, the, events we use high spark, high, one spark and spark. And I, want this and these, things comes from data leak from HT difference, once. Data preparation is done then we do start to prototyping. Prototyping. We you mostly use Jupiter, notebooks, and spark magic, for. The training we use thought of. Technologies. They tensorflow pi torch XZ buspar, common, right. So we train the model and then, at the real-time prediction, for inference we use real-time prediction, service for, the do type inference and then for batch production jobs also we. Have feature. Store model store and metric store and this is all part of a Michelangelo platform, which is our ml platform at work. So. Let's talk about app a chips park at uber. So. Apache. Spark is a primary primarily. Analytics. Execution, engine which. We use at work most, of the batch applications or, most of the batch workloads, which is like 95%, of the batch work done on top of Apache spark, Apache. Spark strands of yarn and, peloton. And measles both we, use and. For the x4 shuffle, we, use external, shuffle service. In. Both, of the platforms, let's, talk about how. External. Shuffle service. Works in few words. So. We all know we have a mapper and the reducers, mapper. Runs under the executors, once mapper tries to put, some data on to the disk or the map output they, create some partitions, and those partitions was written. Through. To load local, disk in some files through local shuffle service right this is what this diagram shows on, top of it and then once we once a reducer, task wants to we can read all the mapper output, the reducer task will go to each machine where it knows where, where the mapper ran and then, reduce a task will read each and every mapper and for one partition, and then, aggregate all the data at the reducer side and produce outputs based on the business logic right, this is how Cole. Shuffle. Paradigm. Or hole shuffle service works where. Shuffle service has been for. Writing the data mapper write the data and, read, the data through shuffle steps right. So. There are some challenges, by. Using external shuffle service and some of them are. Here. So one, is the SSD wearing out issue so. What happens is we are using SSDs, on our computer. Fabric and the. And, the, problem, is there. Is a concept, in SSDs, which is called e WP d and which is mostly, one what it means is you can write and read, from the disk one. Times a day, however. In. Our in uber we have so much workloads. So, we actually write, around. 3.5. X of 4x times a day, the, whole disk which, actually, reduces the life of the disk to, 1/3. Or 1/4 right, so, this, which should be bad. Within, 3, to 4 years is actually getting bad 8 to 12 months so, that is a big issue we had to so, we have to keep on changing the SSDs, or disks, or replace, the hardware so, that's a big cost for us so this is one of the major issue which we are facing second. Is reliability. External. Shuffle surveys is. Basically. Depends upon the local disk. Space and many. Consecutive, and then there is no then. There is no, isolation. Of disk space or i/o so, if there are many applications which, goes and runs on top of it and one, application, is more chatty than other then, it. Will fill up the disk and most, of the application, which is running on the machines, will, fail so this, this one this way this is one of the major reliability. Concerns, for us from. From. Russia, for service perspective. And. From, compute. Engine perspective we are migrating towards, coupon it is so, for coupon it is dynamic allocation we, need an external service, the. Mean are the one of the main reason also is the colocation, so we wanted to co-locate. State, and state place in batch workloads. Together on the same machine for. Doing so we need to remove ions from the Machine and from removing i/o we need to we move shuffle. Data from, the local machine by, that stateless, and - Kendra, work together so, this is one of them other. Reason, we wanted to also his. Spend. Our time on something which can be solving, of these issues. So. We actually tried different approaches, before. We coming up with the come up with the. Larger. Architecture, so, one, of the DS one of the things which we did is we actually, extracted.
Out The shuffle manager and we, wrote and we actually plugged in lot of new, lot. Of external. Storage like anaphase, HDFS, and we, write through synchronous, writes the. NFS was try to Explo HDFS. Was actually. 5x loads, then. We actually tried different approach we, try to do some semi. Synchronous, rights on the shuffle manager, what. It does is it tries that pools and tried to write parallely in two different files however. That also is, not performant. As we thought so, that was that experiment, also for X. So. What. We tried is now we said, okay these things are not working so let's do streaming rights let's try streaming right so we wrote a shuffle service a very base, version of shuffle service and see if the streaming rates will perform so, and, the back end we used, HDFS. For the first so streaming rights from HDFS, it was like 1.5, times, lower which. Was better but not at the same what we wanted and then, we tried, streaming, rights on the local disk which, had, same. Performance. Similar. Performance, F external, shuffle, service we. Tried to change something which were talked about in the next slides but. Streaming. Rights on the local actually, helped us. So. In, the remote shuffle service what we actually. Zero. Down is streaming. Rights on the local storage however. We, had to change. Many many things to get to the similar performance, we, actually change the whole MapReduce. Paradigm, how. MapReduce, today work so. Mapper produces output in each local machine and reducers gets from all the machines they get the data so, that has to be changed and then we, actually. Streamline. The whole process so we record. A stream we'll go to the shuffle service and directly go to the disk so we streaming to the disk technically, which. Actually, gave us lot of performance, improvements, and and, there is one more important, thing that there is no spill, now from the executor side which. Is which. Was actually one of the main, reason, for slowness, so now because of there is no spill and the performance, is really better. So. Let's take a let's look at the. Enlarged. Harka texture what is how remote shuffle service looks like and the, upper, upper. Side of the diagram. There is a host which is running executors, which has map tasks and there. Is a shuffle manager which is plugged into our, remote. Shuffle service and which is been abstracted out from the shuffle manager from this part so, all what happens is this the host will these, map tasks will know which partition they are writing to so, the MapReduce paradigm what, we change it to is all, the map tasks will. Write. For, the same partition, to, the same srimad shuffle server. So. There. Is a discovery. Is there, is a server discovery, where we'll, talk about it later but the server discovery, will know that for this partition. For. All the map tasks for this application will go to the same server so all the mappers, will write to, the same shuffle.
Server For. The same partition and those, but and those data will be righted written, sequentially. And then, reducer. Side we, will, know that all, for, this partition the data is in this particular remote, Scheffer server and then, the reducer, side those. Data will be copied. In from so the reducer will go to one map, shuffle, server and treat. The data sequential, and it doesn't need to go to the all the machines so, this is the change in the paradigm where mappers writing for same partition to a machine and reducer. Only going to one machine that. Actually. Improved us a lot of performance, in. The shuffle server side we, have partition, errs which actually. And partition. The data into, different files, and. We will talk about this into the next few, slides so. Let's, talk about and so let's do deep dive and our colleague. The Bo will now take it over and he, will go. From there, thank. You Thank, You Mia hey, everyone, my name is Bose I work in the deli in frantic nuba so, I will go through some details about how we design and implement the, remote shuffle service. In. Outside. Yeah. We did a lot of experiments, and, iterations. So. We summarize, our some. Design principle, as followings, the. First one is a scale out horizontally, yeah. With no yes, per has many, executors, we may have thousands, of machine. In Custis so we want to the. Whole system could scare out very, easily and. Because. We store data in a remote server and, for. The network the, latency, matters, it's, not feeble so we need to walk, around network, latency, using, a lot of techniques, and at. The same time the network, bandwidth, is very high so. We don't focus on the bandwidth too much we, instead. We focus on the latent or not and. Also. We do performance. Very seriously, we we. Did a lot of problems of demise a nation and. We. Find the. Major select applications and succeed in our environment, so we focus on this application. To optimize for, our. Most. Cases and, for. Failure cases we, rely. On spark, and a yarn and, we try to. To. Support the. Recovery. Ok. We were talked about this in history. Three. Groups the first one scare out. You. Know we achieved, horizontal. Scalable by. Make. Each. Travel. Server work individually, to, not depend each other and the. Whole cluster has hundreds. Of travel servers so different, application, can share, different. Can. Share some. Chapel. Service and they. Can also use different chapel so it's pretty flexible and. Because. We don't have shared, state among, different, chapel servers we, can just add a new server without any bottleneck. And. Here is one, example how, we share. How. We put different chapel, server for. Us about application, and, we know application. Had different, and mappers and reducers. Also. Its data had, different partitions. These. Are all kind of factors that we need to consider when, we assign shuffles over to different, tasks. So. In this example we, have three chapel servers well have four mappers, and the. Each member, will connect to. Each. Chapel, server, because. When the metal send data to the chapel server the mapper have data for different partition, and the, different partition reside on different, shuffle servers so, basically each mapper, will.
Decide. Based. On the key which. Partition. The data belongs, to the matter will send data to, that Chapel. Server which hosted the petition, that's. Why each matter will connect to, every. Chapel server now, for the reducer sy is pretty simple because. All, the partition data reside, on a single. Shop or server so the reducer, only connects to that Java server and a download the whole file directory. So, we. Simplified. The reducer design a lot. So. Next time we will give some generic. In. Calculation. Or how many network, connection, we, we have and. We use some number for example have M mappers and our. Reducer, and we have s servers so. The total connections, in Memphis I is M, multiple. S, connections. And. The. For reducer size is our connections, so. It's kind of, different. From the spark to know Java service in spec so much shop so is the the, Maasai is kind. Of simple it's right to the local and. For. Reducer, it. Will connect to different. Executors. To read data so there will be a lot of connections. In the reducer side in in. The spark external, Chavez OS and here. We kind of in. The mesial have more connection, in the register you have des connections, but. The overall connections, is kind of similar. Yeah. Now we will talk about networking. Latency. We, use nettie in, the server side and we know Nettie is a very high-performance, a single, server, framework, so it catch you very high. Speed when transferring, data. And. We use two thread groups. In. The server, side. The first asura group is to accept. Stock the connection the. Second. The third group. Is to restock, the data and the right to. File so. They, won't impact, each other and they can work. In parallel so. This. Virtual design is. Because. When, we do load test we find sometimes if people if. There. Are a lot of arcane writing a lot of data it use up the, second square who read, go for naught, and, in. This design we still have another threat group which can accept. A new connection so it can, still running, and for. Network protocol, we use a binary, network protocol, design, the by ourselves we catch a very high, efficiency, encoding and also, we can do compression. In. Our. Side and. Yeah. I think we will have talked about that later, with. These are more details yeah. Yeah. We write tend, to disco file we write to the, OS cloud directory, we do not do application level bartering and you. You, know as hey this is very. Fast because OS has its own buffer, so, we do not need another buffer and. We do zero copy this is a common technique used by many. Servers we do that as well so we transfer the data from. The shampoo server to the original side without without. Any, user. Level, memory copy, and. We before, we write and read data, sequential. There's no random disk IO it. Is very fast. You. Know we mentioned we, use binary. Network. Protocols we do compression, in the client side so. The shuffle kind will compress and decompress data. It. Reduce, the network, size and also it reduces the CPU usage on the server side so the server does not need to involve. In the compression, tea conversion, is kind of seen and. Also. It's potentially, will support client-side encryption so, we do incubation from the client side and. Duty. Pressure from kinda as well so the server side again, does no need to involve so, it makes the whole system pretty. Secure. Another. Technique. We use is we pearl. Solar. - and the network i/o in. Spark shovel, it will, sterilize data, each. Travel record basically is a Java object so you need to be, sure like two bytes, so. Into. Civilization lot, and a civilization, text time so. We use a with. Acabo this two step we use a background thread to. Read, the serialize data and send it to the server side so they can run.
Side, By side don't block each other it's improved, performance, in, our. Production. You. Also use connection, for it is pretty simple just we use connection, when possible. This. Is common we don't talk about. Not yet okay, so, we do, a very very. Seedy, of merging our sites I want to share that as well. Yeah. We do asynchronous travel, data commit so. This is coming. From the. Observation. We find so, Formica tasks when. We stream. Data to the server side and. The. Server side need to flush the data to make it the persistence, so, the flashing takes time and we. Don't want map tasks to wait for the flashing of region. So. We again, we decouple these two slabs two, steps so. The magnet Haskell will. Tell. The server side is finished sending, data and the. Server side will do the flash we call it commit as well in. The background. So. The, next market ask Allah to no need to wait for the previous, map, tasks to flush data so. This brings, some issue in the reduces I the other reducer, need, to know okay all the data is flashed before it to read data so, in the reducer. Side we, do clearly, data availability, before. It's, fish data so, it asks us about hey whether the data is available if, it available, it will get data to the server side if not it will wait, a little bit and the retry, and. The fish net again. Yeah. We also support for tolerance in our design. Yeah. The first thing we use do keep her as so, very. Recovery. And. To. Health check so. It's, pretty common yeah. And. We do data replicas for the method tasks so we need the right data is, a right to. Multiple. Server. At. The same time so if one saw is done it, can still write data, for. The reducer, because. We have different reticles the reducer just pick up replicas. And read from that again, when, so it's done it will switch to another, replica. And. The. Suicide has some local states yeah, we measure we do not have centralized, the shared state but, each shopper server to have the local state or normally to maintain, the. Witch market has is finished, or is a communion so, this kind of local. State we, do flash in batches, we try to avoid a small. File, flash again, so, we do with, flash all these data in badges so normally when, a sample stage your finish we flash the state. So. We we. Keep our. Flash, operation, as minimum as possible. Yeah. We will talk about some production, status in, our current environment. The. First thing is our, hope. Remote. I/o service is compatible, with open, source Apache spark so, you do not need to change anything when used when you use it or you do not need, to change your internal. Spark code to use it it. You can set the, shuffle, man you plug into our. Removable. Manager. Last name so it were just enunciated a. Flooding. System, in. Our side we used the. Spock map status, and map, output tracker, to. To. Maintain some state. For. Example the, sharpest, of a connection information, so we embed that in the map status, so the reducer, can retrieve that when, it's a try, to connect the in servers. Yeah. We do a lot of metrics and, booba. Had m3, open-source. Matrix. Library so he used that and, we. Find that some very important metrics in our site is a network connection file, descriptor, and dyskinesia. So, we monitor these metrics very closely. We. Do a lot of tests to make it production.
Quality. So we do unit tests and also we do stress. Tests so. We we, have a special, tour it is randomly, generate, data and the wrong random. Map tasks it will also random, cure servers, your, in time so, we can test a lot of educators, so this have us adult to find a lot of issues, also. With sample, production. Of Hiva queries and convert, this query to stock application, so, we can get, our real production, load and then test, our system. This. Is currently running our production. How much data we have I seen the, graph shows we, may have around five hundred terabytes. Sharpening, every day so this not whole. Amount. Of data now we have even more data, than this is running, for eight months so far and so. Far everything is good and we. Have hundreds. Of thousand applications running every day and the. Most importantly, we find. A job tendency, it's very similar with the sparks. Mojo so. We do not introduce any integrate, to creation for the server. And. Also. Because we remove the spool files so actually, reduced, file, IO dramatically. By aha. Yeah. Hopefully we were all my sauce it very soon. Used. To have a lot of work to. Create our room at the, photo range we want to support high bounce back as well and. Yeah. We we, want to add a quota so, it can grow well in multi-tenancy, environment, also, of course load, balancing is important, we have more time we can fine-tune the, low dependency. Also. Inspect community there is a discussion, about new. Shuffle metadata, API that. Is to, support all kinds of different remote, travel service so, which come out we want to integrate with that as well. Yeah. That's all so thank you everyone and if. We have time we can do Q&A. You.
2020-08-14