From HDFS to S3: Migrate Pinterest Apache Spark Clusters
Hi. Everyone my name is gene yaw along with Daniel, today we like to take you along with the journey of migrating, Pinterest spark, raster. A. Brief, introduction, about ourselves my, name is Jean yow I'm a tech lead an interest, for ice team and. Previously, I had spent many great, years and fists and who another. My work was focused on building our skilled distributed, computing systems. And. This is Daniel died I'm, a tech leader at Pinterest tech team, I am. A PMC, member for Apache hype and a pig and before. That I was working in crud era and Hutton works and Yahoo today. Is. Talk we, will cover following topics first. We will share the high-level overview of our big data platform learn, the performance, of spark joy in this platform, next. Is the s3. Eventually, transistor model we have been dealing with then. We will talk about difference, between, primary storage each device and s3 the, scheduling, system methods, in Europe will, conclude our talk with, house bar has been used in interest. First. Let's, take a high-level overview of, our, big data platform in Pinterest. From. Search to eyes which, are learning to recommendations nearly. Every team within Pinterest, relies, on the Big Data Platform in some way or another the. Big Data Platform team is responsible, for providing stable reliable, and efficient ecosystems. For. Engineers. And analysis. To, manage their and transform, their data, Pinterest. Has been building our own in-house, big data from, leveraging, open source projects, for, just we are we we, use our HTS for storage, misses and Aurora for resource management as, scheduling. Unscheduled. Spark -, depressed, Oh for data processing, Kafka. Or logging. The. Big Data Platform and, pinterest, is constantly, evolving in order to fulfill the our changing, needs of our users and, do and to, stay up to date with the latest cutting-edge. Technologies. Or what last year our team has been working on migrating, to our next generation big data platforms, as shown, in this chart from, the high level the most significant. Changes are changing. Out result, measurements. And a scheduling system for methods we are switching. Our memory storage from HTML to x3 now. The Magnusson. Has finished with great improvements. In many, fronts from performance, to expectancy. From, the ability, to cost, savings we, will share our our wounds and learns in this talk. As. Any Big Data Platform performance. Always plays, one of the most important, roles we, have been striving to. Provide a platform that, could run jobs often mentally and efficiently.
As. We were building our next-generation big, data upon improving. Performance and, efficiency it one of the most important, goals we, can't improve anything if we don't know what's the bottleneck of our. Own cluster so. The first thing we did was to identify performance. Bottleneck, of our old, cluster. We. Took a data-driven process. To find a bottleneck of our cluster by, looking, through all the dashboards, metrics, we had both internally, and externally we. Had identified the, system wide for humans both life-form. Cluster, is local, active io at, rowing. In this chart not having enough nobody style, opens those down spark jobs do to stop shuttle which, in turn makes the job and the whole workflow slow which, in turn makes the aisle resource even more scarce and I. See you can see this, is a performance, downward, spiral, that no one wants to see in their system to. Understand. The program more, we, need to first understand, why love I always think women for spark performance, so, local. Deeds are typically used by several different ways spyrius. Travel data to local aid school and then, spy will also read locative to stop travel data for reducers, so, by also spills data to local disk will memory, is not big enough for, some operations, like solving. Let's take a look at these problems through a simple example this. Is a simple aggravation, Corey essentially. What it does is to coordinate able, to, get these things to IDs, and a max value of each ID. The. Core execution, will have metaphase, shuffle, phase and a reducer, phase, let's. Say we have 19 mappers and 19, reducers each mapper, raised one can have input force assays and store, the shuffle data into a local disk now. It's a shuffle phase the, coop shadow is to move the same idea the same abuser. So each reducer, could you could, compute the max of alyou of each ID in, order to do so each reducer, will talk to all the 90 matters and asked all the IDS this. Reducer is responsible. For so, in total all men, kiill dos attacks to all 19. My breast there, will be 80 81. Meaning, connections. One. Machine can run multiple my purse at same time so if we zoom in and only focus on one map one king in this case this, machine has 30 members all, those, 13 members share the same local disk of the machine one, may provides, the shuffle, data ones then reading 98 hands on 90 reducers, so, in total survey. Mappers, need. 270k. IO bridges, you're, in those operations. I can run same time because, each micro, process is roughly, the same amount of data and finished, them around same time that's, too many operations for, machine. Into short time frame. So. How. Do we optimize this job you know I'm on caster. We. Have done a lot to optimize, our jobs one, of the most helpful of monition, we did was to, kill. The number of mappers and reducers in. Order to combine small data operations in the big ones here. Are here, here, is the as an example we, reduce the number of post matters and reduces, from 92, straightly. Again. If we zoom in and only focus on one machine in, this case this one machine has only ten. All those 10 members share, that same logo disk of this machine one, might provide the travel data ones ingredients. Relay times for streaky, reducers, so, in total spot, will deuce 30k. Local, disk operations, which. Nine times better than before and one. Thing to notice here is that reducing. Number of mappers and reducers also, reduces, the power parallelism, of the job if. That, is too low job, only able to fully utilize all, the resources which. Could reverse, a promise. Last. Year we spend a whole month to off might one of our most of grateful jobs the, average, optimization. Result was awesome, with 4 times improvement but. It required. A tremendous amount, of manually those, tuning yura don't scale very well with, data volume increased and they, couldn't be easily applied on a job no. So, not having enough local disk i/o how to make spur jobs pretty hard to optimize and, having. System-wide, for comments but not really, put a heavy toll on every, job running is faster. New. Trustor so when we were building our new faster, we were determined, to move remove. The cluster wise performance, bottleneck, so. That, each team will focus more, on during, impact products, instead, of dealing with performance issues.
With. The performance, pond a caiman we do multiple, runs of testing, trying, to find the past Hardware taught English students for SPARC workload, eventually. We had settled on easy to type r15 in local, disk optimized, which which, brought, the local disk i/o per second, from, street age who are walking 40k. Our. Jobs really loved the new cluster, especially, the new ec2, instance, time jobs. Have seen 25%. Of run time improvement, on average that's without any extra, resource or tuning workshop. Of heavy job even called 35%. Implement, from Naga - down, to 57. Minutes out. Of all the cluster, wise infamous, we have done of mining, locally style was definitely the biggest contributor. If. There's, only one thing that I hope you could remember after this talk that is mattered before up might as well stay in that premature. Optimization is, the root of all evil in. Our case we did investigations. And tests with. Our own walk loading on Cranston so we didn't guess the Ottoman or instead, numbers. Told us that local, disk IO is the bottleneck of our cluster so, switching, to local, disk optimized, instance, tab it's a perfect, chain for our workers do. The test with your own workload in your own cluster identify, both leg of the world class and try to optimize match. In. Terms of optimizations. There, are three levels you can to cross the level spark level and job level so, you can off my expert at a cluster level, to. Benefit, all the workloads, running in this castle our example, is improving, local disk i/o as given as 25, percent of humans boost on average, for, all the jobs without any extra resource for tuning next. A the spark level seems, like queueing a member of our mappers, producers CPU. Memory doing. So could yield a great performance, improvement, without, knowing much about the penis logic, of the job so. Last panelist. Is a job level so. You can go and understand, business logic of the job and try to simplify that I perhaps removing. One unnecessary. Joining, or aggravation. Next. I'd like to talk about a three consistency. Model, we had to work with for the next generation, cluster. One. Big, change forward new cluster, is switch, in the primary storage, from, each device to s3, those. Two storage systems, of our difference consistent, model HDFS. Is the file system that is strong consistent, changes. Are immediately, what available on, the, contrary as reads the object storage so is that is eventually consistent there's. No guarantee, or change is immediately available to the client, this might cause meaningful, issue without either even, know it. Here. Is an example of read after write consistency. Off for hdbuzz and ice cream in, this chart from left to right it's the time line from top to bottom underwriter. Clients each device reader, HD value either an s3 reader, in. This example rather. Clans sent color to green to both each device and s3 at t1 for. Constant, really H device read client. At, any, given time after write complete, they always read the color as green so. I eventually consist, in the reader as real reader Matt region long at t2, the, green at history the now just at you for eventually, they, will always return the red color green.
What. Does this means Burke here's, one example a spare job rise out five files in the same folder but. I will another spark spark. Job trying to reduce, those files, shortly, after creation, as, we may only written four files instead, of five in. This case, the. Reader spot a job doesn't even know it missed one file so. The, next question is how often does this happen in, general, the chance of, inconsistency. Of listing. Of an i-strip folder, is super, super small mine. 3 it, arrests s regime had conducted a test showed. The chance of inconsistent. Inconsistent. Happens. Less than 110 million times we, also contacted s3. Consistency, it has internally. The chance even lies than 1, out of 50 meaning. When. We were designing our, solution, to this problem, there were a lot to consider and balance two, most important, ones are read consistency. And read consistency. Right, consistency, is whether, the job could write output consistently. With our minced duplicated. Or corrupted. Files really. Consistent, is whether the job could read files in of order the more or less dinah story. And, the, consideration, is also continuous, as. You. Can see here that's a lot to consider. So. We have compared, a lot of different kinds of solutions reading. From simple ones to really complex, ones I won't, take too much time to cover each one of them the, quick takeaways, as raised now that HTS and there. Are a lot to consider when you want to switch from HTS, lastly especially. How to deal with the consistent, model change you. Can use this table later as a reference, to making your own decision, when it comes to dealing with s3, consistency, models. To. Solve this issue, we took a hub a. Hybrid. Solution that, combines three, different tools we. Use ice weak emitter to improve, write writer, consistency. Then, you will cover the difference between different committers later in this talk we. Also, use novel fire monitor, to make sure mission-critical. Jobs, who miss any files, when reading in. Parallel, to this, we. Widely. Used in a body frameworks, to ensure that data a host high data quality standard, this. Helps us to prevent, data issue introduced, by all different kind of reasons. Including, ice cream this. Approach works well for our chronic jobs as don't under, and require a tough effort, to implement, however. This, approach needs, workflow owners to be aware of this issue and they need to adopt this those, two in there were closed for long, term we, are looking into implementing. A sophisticated. Solution, to, solve this promises, matically, such, as aspirin, from Netflix, and data Lake from a, derelict. From thoroughbreds. With. That Daniel, will talk about difference, between, twelve out feminists or reduce each, the best and ice cream okay. Thanks Shane now, let's compare the performance, between, HDFS. And s3, in, our cast, HDFS. And s3 achieve. Similar throughput, however. Metadata. Operation. Is much slower on s3, especially. For more as remo, operation, is, is essentially. A copy and it's an delete, unfortunately. We. Use a lot of new operations in spark. This, is especially obvious for. Applications, with, simple logic but produce, a lot of output, for. Example one. Of our spark, streaming, application, which, persists, kafka, stream to, the storage on. HDFS. The. Macro pipes runtime, is 13, seconds by RS 3 it has increased, the to 55, seconds, most. Of the time the, application is, doing nothing but moving the files are wrong this, cost of macro batch high up. Let's. Take a closer look at what, are those new operations, the. First to, move operations are, common, to every spark, application, that. Is after, tasks finished successfully. Committed. Tasks will move far from. Task, attempt, to folder to the tasks folder then. After. Job finished successfully. Committed. Job will move it again to, the job output folder, this. Is necessary, because we don't want the result of a failed job or task, be visible. In. Addition, some, application. Need. More, more operation, for, example in. A dynamic, partition, table, insertion, spark. Who will save the job output to. A staging, folder, before, would output files to. The hive, table location, this. Is because, spark. Need to figure out which, party things are newly, added by. The spark job so, it can add the patching metadata, accordingly. However. We. Cannot afford those, more operations, we. Need to find some am addition, there. Are several existing. Solutions, for, the first two more operations, file. Output committed. Algorithm, to skip. That job level, more. Operation, it moves. Output, table from track attempt, to folder directory. To, the job output in, commit tasks so. We don't need to move, it again in commit, job, this, removed, the job level more operation, the, task level more operation, is still there however. Compared. To job level more operation, Pascal.
Level Happen in parallel so it is lesser a problem, further. Inside. Pinterest, we, already, have a director. Output committee used. In MapReduce, which also. Removed, a task level more operation. Direct. Output, committee write, the file, to job, output directory. For, every successful, task, attempt. However. The. First three approach, suffers. Some degree of debt correctness, issue for. Far output, committee algorithm, to the job fair and the clean up is not done properly, successful. Task output, will, leave, there. So, downstream. Job will, read incomplete, result for. The wraptor ad-hoc committee, file. If job. Fair and clean up is not done properly. Downstream. Job will read income, will, will. Read incomplete, or even, corrupted, output. A better. Solution is Netflix, as free Committee which. Uses, s3. Multipath, upload, API to. Upload files, directly. To job output location without, leaving, incomplete. Or corrupted. Output. Even, if the job fair we. Also realize, as a more. Sophisticated solution. Such as Netflix, iceberg. Which. Is the successor of as freakometer, however. The, focus of iceberg, is to provide a transactional. Support which, we don't really need in our use cases and, also. S3. A committer. From reason how to distribution. However. It is tightly integrated, with regard. Which. Adding, extra operational. Complexity. So. We decided to adopt a streak emitter which. Solve our need but, still simple, enough. As. Recommittal. Make, use of atreus. S3, multi-part, upload API. It consists. Of three stages. First. Use. A new to invoke initiate. Multi-part. Upload to instantiate. Then. Multiple. Upload, the path API which. Actually. Upload, a file to s3. Finally. A lightweight. Complete, a multi-part, upload. Well, taya will, tell it appears, to, weaving, the pieces into a complete path and make. It visible if, anything. Goes wrong user. Will invoke about, matpat, upload, to, remove the, clutch of uploads. There. Are two, operational. Tricks. For, multi-part, upload API. Work, first. Unfinished. The pod files are simply. In a separate staging, area of the, s3 bucket until, complete. A multi-part, upload or, about. A multi-part, upload is invoked, it is. Not uncommon, spark, a Caucasian, die, in the middle so, neither complete, or about, multi-part. Upload is invoked, user. Cannot, see, partial, upload using. The usual ESRI, command but. ADA browsers, to charge you for the partial uploads so. It is important, to set, up a like s3 lifecycle. Policy to. Remove partial, uploads after, certain time. Second. About. Multipath, uploads, require. A separate, s3 permission, we, need to explicitly. Grant. The permission which, we often miss in practice. In. Task, as recommittal. Will write the output to. The local disk after, tasks succeed, inside. Commit, task as. Recommittal. Will invoke upload, part API, to. Upload that output, from, local disk to s3, however. The. Output is not visible yet as, recommittal. Will invoke, complete multi-part, upload later. In, committed, drop after all tasks finished, successfully. This. Two-phase, commit like approach, we, are ensure there's, no corrupted. Or incomplete. File if the, application fail and in. The meantime we.
Don't Need to move any files, in addition. As, recommit, I also, upload, path pass imperio. So, we can achieve better throughput. Their, HDFS. Still. We, have one more more operation, remaining, for, dynamic, partition, insertion, it, is required, because we, need a way to track the, new Fogg folders, added. Up by the spec application, and add. Partition. Metadata to hang meta stop for, those folders only for. Example, the, spark application. Produced. A new, folder, es, equal to 2000 28th January 12 in, the staging directory, by. Listing, the staging, directory, spark. Can pick out the, new folder, the s equals to 2 some contagion, or 12 and add. A path from metadata, to hang Modesto, allow. Is moving, the folder, to table, this earth to the table location, to. Get rid of this, move operation, we change as Rita. Meeta to, write a table, level tracking file to, record, the partition. Generated. By stock job, after. Job finish. We. Can read the tracking information we, can read the tracking file to find the out of the same information. However. If multiple. Spark applications, are writing, to the table at the same time there. Might be conflict. I'm tracking. File we. Haven't, found a general solution yet, instead. We, enable, it in. Certain applications. Which we know there's, only one application, writing. Through the table at any given time. With. The removal, of more. Operations, and increased. Upload, a throughput, the, macro battery runtime decreased. From 55, seconds, to, 11 seconds which. Is even better the HDFS. Switching, to s3 we, also encounter, some s3 specific, issue rate. Limit 503. Arrow is, a common, one AWS. Support can partition the s3 bucket and prefix. They. Can even set up a cron, job to. Touching a future, prefix. Which doesn't, exist. Yet, however. We, don't feel this solution is a reliable and the, increment, our own, Pascal level and a job level expression, pakka, of logic, in s3, committer, we. Also make the parallelism, of pad, file uploaded, tunable, so, user can tune their job to, maximize, the. Throughput and in, the meantime avoid. Exceed, Ezreal limit. Here. Is a list of improvements, we, made on, top of Netflix. As freakometer. Besides. Rate limit and the perio up a lot of power. Pareo. Pad file uploads we. Just mentioned we. Also add the e-tag, check for, Mata Tata upload to, make sure the integrity, of the table we. Fix the thread for eating, for, long running applications. We. Also remove, the local output. Generated, by the task as soon, as upload. Party is finished so. We can save the, local disk usage. The. Biggest benefit, by switching, to s3 is the cost saving, we. Reduce the cost by 80%, compared. To HDFS, in the, meantime as, we actually but as I all the. Claimed a 0 for 3, is for nine for. Availability, and 11.
9 For durability, in, comparison. Our HDFS. Can only achieve 3 9 availability. In our current. Support level and, the. Difference, of durability. Is even higher. Besides. The new cluster, is also different, in. Resource, scheduler. The other, cluster, is using missiles and the, new cluster, is using young. There. Are couple things we miss in missiles first. We. Can manage every, service, inside. Missiles, but, in young long-running. Service, such. As zookeeper. Hi metastar, are, separate, and entities. Misses. Plus the job as a measuring system, Aurora, provides. A lot of functionality, it. Can support simple, workflow long-running. Job in a cron job very well in. Supports, roaring, restart. And it has, built-in, application. Health check all. Of those are missing, inya and we have to rely on external workflow. System, such, as quinoa and airflow. Anderson, hang the customized, scripts. To, achieve those. There. Are couple, things we like in young it, provides, global view of all running applications, it has better queue, management for. Organization. Isolation. And more. Importantly, our other. Clusters. Are, already, using yarn. With. Better visibility and, acute, isolation, we, can use a resource more, aggressive. Aggressively. With, young the. Computer, cluster, cost less compared. To missiles. Before. We end the talk I have, another two slides to, show the current status of, spark. Adoption, inside Pinterest. We. Are still in early, stage, among. All lot of Hadoop, cluster, study, represent. 12%. Of total, vehicle, usage we. Have a few, spark, streaming, use cases but mostly, it is for batch processing. We are actively. Migrating. Our high. Workload juice box Eco in, the meantime we, are looking at ways to migrate. Our, cascading. Scouting job to, spark we. Are adopting dr.. Element, for spark in our code review process we. Integrate, a stock, element, with our internal metric, system, and also, include several features, from spark lines as a. Side effect, dr., element, increased, the load of spark, history, server significantly. And we, are in the process to, improve the history, server performance, if, you. Have interests, to work with us are any, of those please. Talk with us offline. You.
2020-08-29 16:10