Making Apache Spark Better with Delta Lake

Hi everybody, welcome to our webinar today, Making Apache Spark Better with Delta Lake. Before we get started with today's presentation, we wanted to go over a few housekeeping items to ensure that you have the best possible experience. Please note that your audio connections will be muted for the webinar, for everyone's viewing comfort. If you have any concerns or questions, please pose them in the question panel or chat; we encourage you to use this time to ask as many questions as you like and clarify any doubts you may have on today's topic. Our key presenter today, Michael Armbrust, is the original creator of Spark SQL and Structured Streaming, and one of the primary creators of Delta Lake. He is a principal engineer at Databricks. So without any further delay, take it away, Michael.

Thank you, Denny. I'm super excited to be here today to talk about how you can make Apache Spark better by using Delta Lake. Before I jump into that, though, I want to start by talking about the concept of a data lake: why so many people are excited about it, and also why there are a lot of challenges when they try to set these things up. So first of all, what is a data lake, and what does it mean to me? The promise of a data lake is basically this: organizations have a lot of data. It might be carefully curated customer data in your OLTP system, it might be raw click streams coming from your web server, or it might be unstructured data coming from a bunch of sensors. The promise of a data lake is that you can take all of that and just dump it in the data lake. That's actually really powerful when you compare it to a traditional database, because in a traditional database you have to start by coming up with a schema and doing a lot of cleaning. This is often called schema-on-write. What a data lake allows you to do is forgo that process and just start by collecting everything, because sometimes you don't know why data is valuable until much later, and if you haven't stored it, you've already lost it. With a data lake, it's just a bunch of files out in a file system (it could be S3, HDFS, or Azure Blob Storage), and you can dump everything there and come back and look at it later. The idea is that once you've collected it all, you can actually get insights from it: you can do data science and machine learning, you can build powerful tools for your business like recommendation engines or fraud detection algorithms, and you can even do crazy things like trying to cure cancer using genomics and DNA sequencing. However, I've seen this story many, many times, and typically what happens is that, unfortunately, the data at the beginning is garbage, so the data you store in your data lake is garbage, and as a result you get garbage out of the more advanced processes you try to run at the end. So why does that happen? Why is it so difficult to get quality and reliability out of these data lakes, and what does a typical project look like? I want to walk you through a story that I've seen happen over and over again at many organizations when they sit down and try to extract insights from their data. It typically goes something like this, and this is what's considered cutting edge today, but before Delta Lake.
So a pretty common pattern is that you've got a stream of events coming into some system like Apache Kafka, and your mission is to do two things. You need to do streaming analytics, so you can know what's going on in real time in your business, and you also want to do AI and reporting, where you can look at a longer period of time, do a longitudinal analysis, and actually look at the history and the trends and make predictions about the future. So how are we going to do this? Step one: I sit down at my computer.

I know that Spark has really good APIs for reading from Apache Kafka. You can use DataFrames, Datasets, and Spark SQL to process the data, do aggregations and time windows and all kinds of things, and come up with your streaming analytics. So we start with that, and off the bat it's working pretty well. But this brings us to challenge number one: historical queries. Kafka is great for real-time analysis, but it can only store a day or a week's worth of data; you don't want to be storing years and years worth of data in Kafka. So we've got to solve this problem as well: real time is really good for what's happening at this moment, but it's not so good for looking at trends historically. I've been reading a lot of blog posts, and a pretty common pattern here is this thing called the lambda architecture, which, as far as I can tell, basically means you do everything twice. You have one real-time pipeline that is doing an approximation and giving you exactly what's happening at this very moment, and you have another pipeline, which is maybe a little more curated and runs a little more slowly, but it's archiving all of that data into your data lake. So that's step number one: if we want to solve this historical query problem, we're going to set up the lambda architecture on top of vanilla Apache Spark. Once I've got all that data in the data lake, the idea is that I can run Spark SQL queries over it as well, and then I can do AI and reporting. That was a bit of extra work and some extra coordination, but fortunately Spark has unified APIs for batch and streaming, so it's possible to get it set up. But that brings us to challenge number two. Like I said before, data in the real world is often messy: some team upstream from you changes the schema without telling you, and now you have problems. So a pattern that I see here is that you need to add validations. You need to write extra Spark SQL programs that just check to make sure your assumptions about the data are correct, and if they're wrong, they send off an email so you can correct it. Of course, because we've done the lambda architecture, we have to do validations in two different places, but that's something we can do with Spark, and so now we've set up validation to handle the messy data.
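To make the first step concrete, a rough sketch of that streaming-analytics job might look like the following. The broker address, topic name, and JSON payload schema are invented for illustration, and the Kafka source needs the spark-sql-kafka package on the classpath.

```python
# A minimal sketch of streaming analytics straight from Kafka with
# Structured Streaming (hypothetical topic, broker, and payload).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StringType, StructType, TimestampType

spark = SparkSession.builder.appName("streaming-analytics").getOrCreate()

payload = (StructType()
           .add("event_type", StringType())
           .add("timestamp", TimestampType()))

events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "events")
          .load()
          .select(from_json(col("value").cast("string"), payload).alias("e"))
          .select("e.*"))

# Count events per type in five-minute windows, the kind of real-time
# aggregation described above.
counts = (events
          .withWatermark("timestamp", "10 minutes")
          .groupBy(window(col("timestamp"), "5 minutes"), col("event_type"))
          .count())

# Writing to the console is a stand-in for a real dashboard or sink.
(counts.writeStream
       .outputMode("update")
       .format("console")
       .start())
```

Under the lambda architecture described above, you would end up maintaining a second, batch version of much of this logic for the archival path.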

That, unfortunately, brings us to challenge number three: mistakes and failures. Those validations are great, but sometimes you forget to put one in place, or there's a bug in your code, or, even harder, your code just crashes in the middle because you're running on EC2 and your spot instances died, and now you have to worry about how to clean that up. The real problem with these distributed systems and distributed file systems is that if a job crashes in the middle, it leaves garbage results out there that need to be cleaned up, so you're forced to do all of the reasoning about correctness yourself; the system isn't giving you much help. A pretty common pattern is that, rather than working on an entire table at a time (because if something goes wrong you'd have to recompute the entire table), people break it up into partitions: a different folder for each day, or hour, or week, whatever granularity makes sense for your use case, with a lot of scripting built around it so that recomputation is easy. If one of those partitions gets corrupted for any reason, whether it was a mistake in my code or just a job failure, I can delete that entire directory and reprocess the data for that one partition from scratch. By building this partitioning and reprocessing engine, I can now handle these mistakes and failures. It was a little bit of extra code to write, but now I can sleep safe and sound knowing that this is going to work. That brings us to challenge number four, though: updates. It's very easy to add data, but it's very difficult to do point updates, to change data in a data lake correctly. You may need to do this for GDPR reasons, you may need to do retention or anonymization, or you might just have errors in the data. So now you end up writing a whole other class of Spark jobs that do updates and merges, and this can be very difficult. Typically, because it's so difficult, what I see people do, rather than individual updates, which would be very cheap, is that whenever they get a set of DSRs (data subject requests), say once a month, they copy the entire table, removing anybody who has asked to be forgotten under GDPR. They can do it, but it's another Spark job to run, and it's very costly. And there's a subtlety here that makes it extra difficult: if you modify a table while somebody is reading it to generate a report, they're going to see inconsistent results, and that report will be wrong.

So you'll need to be very careful to schedule these modifications to avoid any conflicts while people are reading. These are all problems that people can solve (you do the updates at night and run your reports during the day, or something like that), and so now we've got a mechanism for doing updates. However, the problem is that this has become really complicated, and that means you're wasting a lot of time and money solving systems problems rather than doing what you really want to be doing, which is extracting value from your data. The way I look at it, these are all distractions of the data lake that prevent you from actually accomplishing the job at hand. To summarize what I think they are: a big one is no atomicity. When you run a distributed computation and the job fails in the middle, you still have partial results out there; it's not all or nothing. Atomicity means that when a job runs, it either completely finishes correctly or, if anything goes wrong, it completely rolls back and nothing happens, so you no longer leave your data in a corrupt state that requires you to tediously build tools for manual recovery. Another key problem is that there's no quality enforcement. It's up to you, in every job, to manually check the quality of the incoming data against all of your assumptions. There's no help from the system, like invariants in a traditional database, where you can say this column is required, or this must have this schema; all of that is left up to you as the programmer. And finally, there's no control for consistency or isolation. This means you can really only do one write operation to any data lake table at a time, and it makes it very difficult to mix streaming and batch, or to do operations while people are reading from the table. These are all things you would expect from your data storage system: you would want to be able to do these things, and people should always be able to see a consistent snapshot automatically.
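To make that pain concrete, the kind of monthly "copy the whole table to forget users" job described above might look roughly like this. The paths, the column name, and the staging approach are all invented for illustration.

```python
# A rough sketch of the pre-Delta whole-table rewrite for GDPR requests.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("gdpr-monthly-rewrite").getOrCreate()

# This month's data subject requests (users who asked to be forgotten).
to_forget = spark.read.json("/data/dsr_requests").select("user_id")

events = spark.read.parquet("/data/events")

# Keep only events from users who have NOT asked to be forgotten.
cleaned = events.join(to_forget, on="user_id", how="left_anti")

# Write the rewritten copy to a staging location. Swapping it in for the
# live table is the dangerous part: there is no atomic way to replace the
# old directory, so a report that reads the table mid-swap can see partial
# or inconsistent data.
cleaned.write.mode("overwrite").parquet("/data/events_rewritten")
```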

So let's take a step back now and look at what this process looks like with Delta Lake instead. The idea of Delta Lake is that we take this relatively complicated architecture, where a lot of the correctness and other concerns were left up to you, manually writing Spark programs, and we change it to something where you're thinking only about data flow: you bring in all of the data from your organization and flow it through, continually improving the quality until it's ready for consumption. The hallmarks of this architecture are, first of all, that Delta Lake brings full ACID transactions to Apache Spark. What this means is that every Spark job will now complete either the entire job or nothing at all; people who are reading and writing at the same time are guaranteed to see consistent snapshots; and once something is written out, it's definitely written out and will not be lost. Those are the hallmarks of ACID, and this allows you to focus on your actual data flow rather than thinking about all of these extra systems problems and solving the same known problem over and over again. Another key aspect of Delta Lake is that it's based on open standards and it's open source. It's a full Apache license, no silly Commons Clause or anything like that; you can take it and use it for whatever application you want, completely for free. Personally, that would be really important to me if I were storing petabytes of data: data has a lot of gravity, there's a lot of inertia when you collect a lot of it, and I wouldn't want to put it in some black box where it's very difficult for me to extract it. This means you can store massive amounts of data without worrying about lock-in. It's not only open source, it's also based on open standards. I'll talk about this in more detail later in the talk, but underneath the covers Delta is actually storing your data in Parquet, so you can read it with other engines, and there's a growing community around Delta Lake building native support into them. Worst-case scenario, if you decide you want to leave Delta Lake, all you need to do is delete the transaction log and it just becomes a normal Parquet table. And finally, Delta Lake is deeply powered by Apache Spark, so if you've got existing Spark jobs, whether they're streaming or batch, you can easily convert them to get all of the benefits of Delta without rewriting those programs from scratch. I'm going to talk about exactly what that looks like later in the talk.

But now I want to take this picture and simplify it a little, to talk about some of the other hallmarks that I see of the Delta Lake architecture, and where I've seen people be very successful. First, I want to zoom in on this idea of data quality levels. These are not fundamental to Delta Lake; they're classes of data quality that people use in a variety of systems, but I've seen people be very successful with this pattern alongside the features of Delta. The idea is that as you bring data into the data lake, rather than trying to make it perfect all at once, you incrementally improve the quality of your data until it's ready for consumption, and I'll talk about why I think that's a really powerful pattern that can help you be more productive. Starting at the beginning is your bronze-level data. This is a dumping ground for raw data. You'll notice it's still on fire, and I actually think that's a good thing, because the core idea is that if you capture everything without doing a lot of munging or parsing on it, there's no way you can have bugs in your parsing and munging code: you're keeping everything from the beginning. You can often keep a year's worth of retention here, and I'll talk a little bit about why I think that's really important. This means you can collect everything; you don't have to spend a bunch of time ahead of time deciding what data is going to be valuable and what isn't. You can figure that out as you go, as you do your analysis. Moving on from bronze, we get to silver-level data. This is data that is not yet ready for consumption; it's not a report that you're going to give to your CEO. But I've already done some cleanup: I've filtered out one particular event type, and I've parsed some JSON and given it a better schema.

Or maybe I've joined and augmented different data sets, so that I've got all the information I want in one place. You might ask: if this data isn't ready for consumption, why am I creating a table and taking the time to materialize it? There are actually a couple of reasons. One is that these intermediate results are often useful to multiple people in your organization, so by creating these silver-level tables, where you've taken your domain knowledge and cleaned the data up, you're allowing them to benefit from that work automatically, without having to do it themselves. But the more interesting and more subtle point is that it can also really help with debugging. When there's a bug in my final report, being able to query those intermediate results is very powerful, because I can actually see what data produced the bad results and where in the pipeline things went wrong. That's a good reason to have multiple hops in your pipeline. Finally, we move on to the gold class of data. This is clean data that's ready for consumption: business-level aggregates that actually describe how things are running and how things are working, almost ready for a report. Here you start using a variety of different engines. Like I said, Delta Lake already works very well with Spark, and there's also a lot of interest in adding support for Presto and others, so you can do your streaming analytics and your AI and reporting on it as well. So now I want to talk about how people actually move data through the Delta Lake, through these different quality classes. One of the patterns that I see over and over again is that streaming is a really powerful concept here, and before I go too deep into streaming, I want to correct some misconceptions that I often hear. One thing people usually think when they hear streaming is that it has to be super fast, and it has to be really complicated because you want it to be really fast. Spark actually does support that mode if that's the application you have: there's continuous processing, where you continually poll the server for new data while holding onto a core, and it supports millisecond latencies. But that's not the only application where streaming makes sense. Streaming, to me, is really about incremental computation. It's about a query that I want to run continuously as new data arrives. Rather than thinking about this as a bunch of discrete jobs and putting all of the management of those jobs on me or on some workflow engine, streaming takes that away. You write a query once; you say, I want to read from the bronze table, do these operations, and write to the silver table; and you just run it continuously. You don't have to think about the complicated bits of which data is new, which data has already been processed, how to process that data and commit it downstream transactionally, or how to checkpoint state so that if the job crashes and restarts you don't lose your place in the stream. Structured Streaming takes care of all of these concerns for you, so rather than being more complicated, I think it can actually simplify your data architecture.

Streaming in Apache Spark also has this really nice cost-latency trade-off that you can tune. At the far end, you can use continuous processing mode: you hold on to those cores for streaming persistently, and you can get millisecond latency. In the middle zone, you can use micro-batch. The nice thing about micro-batch is that you can have many streams on the cluster, time-multiplexing the cores: you run a really quick job, give up the core, and someone else comes in and runs theirs. With this you can get seconds-to-minutes latency. This is the sweet spot for many people, because it's very hard to tell whether one of your reports is up to date within the last minute, but you do care whether it's up to date within the last hour. Finally, there's trigger-once mode in Structured Streaming. If you have a job where data only arrives once a day or once a week or once a month, it doesn't make any sense to have that cluster up and running all the time, especially in the cloud, where you can give it up and stop paying for it. Structured Streaming has a feature for this use case called trigger once: rather than running the job continuously, any time your data arrives you boot it up with trigger once, it reads any new data that has arrived, processes it, commits it downstream transactionally, and shuts down. This gives you the benefits of streaming, the ease of coordination, without the costs traditionally associated with an always-running cluster. Now, of course, streams are not the only way to move data through a Delta Lake; batch jobs are very important as well. Like I mentioned before, you may have GDPR corrections that you need to make, or you may have change data capture coming from some other system, where a set of updates is coming from your operational store and you just want to reflect it in your Delta Lake. For this we have upserts, and of course we also support standard insert and delete and those kinds of commands. The really nice thing about Delta Lake is that it supports both of these paradigms, so you can use the right tool for the right job and seamlessly mix streaming and batch without worrying about correctness or coordination. One final pattern I want to talk about is this idea of recomputation. When you have an early table that keeps all of your raw results with very long retention (years' worth of the original data), and you use streaming between the different nodes of your Delta Lake data graph, it's very easy to do recomputation. You might want to recompute because there was a bug in your code, or because there's some new thing you've decided you want to extract. The really nice thing is that, because of the way streaming works, this is very simple.
To give you a mental model for how Structured Streaming works in Apache Spark: the basic model is that a streaming query should always return the same results as a batch query over the same data. What that means is that when you start a new stream against a Delta table, it starts by taking a snapshot of that table at the moment the stream started. You do a backfill operation where you process all of the data in that snapshot, breaking it up into nice little chunks and checkpointing your state along the way, committing it downstream. When you get to the end of the snapshot, we switch to tailing the transaction log and only processing new data that has arrived since the query started. What this means is that you get the same result as though you had run the query at the end anyway, but with significantly less work than running it over and over again from scratch. So if you want to do recomputation under this model, all you need to do is clear out the downstream table, create a new checkpoint, and start the stream over, and it will automatically process from the beginning of time and catch up to where we are today.
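As a rough sketch of what one of these hops looks like in practice: the table paths, checkpoint location, and parsing logic below are invented for illustration, and the bronze table is assumed to have a raw_json string column.

```python
# A minimal bronze -> silver hop as a streaming query on Delta tables.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructType, TimestampType

spark = SparkSession.builder.appName("bronze-to-silver").getOrCreate()

schema = (StructType()
          .add("event_type", StringType())
          .add("user_id", StringType())
          .add("timestamp", TimestampType()))

bronze = spark.readStream.format("delta").load("/delta/events_bronze")

silver = (bronze
          .select(from_json(col("raw_json"), schema).alias("event"))
          .select("event.*")
          .where(col("event_type") == "page_view"))  # keep one event type

(silver.writeStream
       .format("delta")
       .option("checkpointLocation", "/delta/_checkpoints/events_silver")
       .trigger(once=True)  # or processingTime="1 minute" for micro-batch
       .start("/delta/events_silver"))
```

To recompute the silver table from scratch under this model, you would clear out /delta/events_silver and the checkpoint directory and start the same query again.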

So that's a pretty powerful pattern for correcting mistakes and doing other things. Now that we've gone over the high level, I want to talk about some specific use cases where Delta Lake has been instrumental in both reducing cost and easing the management of using Apache Spark on top of these data lakes. A little bit of history first: Delta Lake is actually two years old. We've had it inside of Databricks for the last two years as a proprietary solution, with some of our largest customers using it. I'm going to talk in particular about Comcast, but also Riot Games, Jam City, and NVIDIA, a bunch of big names that have been using it for years. About two months ago, at Spark Summit, we decided to open source it so that everybody, even people running on-prem or in other environments, could get access to the power of Delta Lake. So I want to talk about one particular use case that I thought was really cool: Comcast. Their problem is that they have set-top boxes around the world, and in order to understand how people interact with their programming, they need to sessionize this information: you watch this TV show, you change the channel, you go over here, you go back to this other TV show. With that, they can create better content by understanding how people consume it. As you can imagine, Comcast has many subscribers, so there are petabytes of data. Before Delta Lake, they were running this on top of Apache Spark, and the problem was that the job to do the sessionization was so big that the Spark scheduler would just tip over. So rather than run one job, what they actually had to do was take that one job and partition it by user ID: they take the user ID, hash it, mod it by ten, and break the work into ten different jobs, and then they run each of those jobs independently. That means ten times the overhead in coordination: you need to make sure they're all running, you need to pay for all of those instances, and you need to handle failures in ten times as many jobs. That's pretty complicated. The really cool part of the story is what happened when they switched this to Delta.

They were able to switch a bunch of those manual processes to streaming and dramatically reduce their cost, bringing it down to one job running on one tenth of the hardware. So they're now computing the same thing, but with ten times less overhead and ten times less cost. That's a pretty powerful example of what Delta's scalable metadata can bring to Apache Spark, and I'll talk later about exactly how that works. But before I get into that, I want to show you how easy it is to get started with Delta Lake if you're already using Apache Spark. Getting started is trivial: Delta Lake is published on Spark Packages, and that's all you need to install it on your Spark cluster. If you're using PySpark, you can just do --packages and then the Delta coordinate; if you're using the Spark shell, same thing. If you're building a Java or Scala JAR and want to depend on Delta, all you need to do is add a Maven dependency. Changing your code is equally simple: if you're using the DataFrame reader and writer in Spark SQL, all you need to do is change the data source from parquet or json or csv, or whatever you're using today, to delta. Everything else should work the same; the only difference is that now everything will be scalable and transactional, which, as we saw before, can be very powerful. Everything I've talked about so far has been mostly about systems problems of correctness: if my job crashes, I don't want it to corrupt the table; if two people write to the table at the same time, I want them both to see consistent snapshots. But data quality is actually more than that. You can write code that runs correctly but has a bug in it and get the wrong answer. That's why we're expanding the notion of data quality to let you declaratively describe your quality constraints. This is work that's coming in the next quarter or so, but the idea is that we allow you to specify, in a single place, the layout and constraints of your Delta Lake. First, you can see some important things, like where the data is stored.
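(Before going further into the data quality features, here is a minimal sketch of that getting-started change from a couple of paragraphs back. The package version matches the releases discussed in this talk, and the paths are invented for illustration.)

```python
# Launch PySpark with Delta Lake pulled from Spark Packages:
#
#   pyspark --packages io.delta:delta-core_2.11:0.3.0

# Writing: the same DataFrame writer, just a different format string.
df = spark.read.json("/data/raw/events")          # hypothetical source
df.write.format("delta").save("/delta/events")    # was: .format("parquet")

# Reading works the same way.
events = spark.read.format("delta").load("/delta/events")
events.groupBy("event_type").count().show()
```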

You can optionally turn on strict schema checking. Delta Lake has two different modes here, and I often see people use both of them as they move through their data quality journey. In the earlier tables, you'll use schema inference: maybe you just read a bunch of JSON and put it into Delta Lake exactly as it is. We have nice tools here that will automatically perform safe schema migrations: if you're writing data into Delta Lake, you can flip on the mergeSchema flag, and it will automatically add new columns that appear in the data to the table, so you can capture everything without spending a bunch of time writing DDL. Of course, we also support standard strict schema checking, where you create a table with a schema, reject any data that doesn't match that schema, and use alter table to change the schema. I often see that used further down the road, in the gold-level tables, where you really want strict enforcement of what's going in. Then you can register tables in the Hive metastore (that support is coming soon) and add human-readable descriptions, so people coming to the table can see things like: this data comes from this source, it is parsed in this way, it is owned by this team. That's the kind of extra human context you can use to understand which data will get you the answers you want. And finally, the feature I'm most excited about is this notion of expectations. An expectation allows you to take your notion of data quality and actually encode it into the system. For example, I can say I expect this table to have a valid timestamp, and I can say what it means to be a valid timestamp for me and my organization: I expect the timestamp to be there, and I expect that it happened after 2012, because my organization started in 2012, so if you see data from, say, 1970, due to a date parsing error, we know that's incorrect and we want to reject it. For those of you who are familiar with traditional databases, this sounds a lot like an invariant, where you can say not null or other constraints on a table. But there's a subtle difference. The idea of invariants is that you can say things about tables, and if one of those invariants is violated, the transaction is aborted and automatically fails. I think the problem with big data, and why invariants alone are not enough, is that if you stop processing every single time you see something unexpected, especially in those earlier bronze tables, you're never going to process anything, and that can really hurt your agility.

So the cool thing about expectations is that we have a notion of tunable severity. We do support the fail-stop behavior, which you might want to use on a table that your finance department is consuming, because you don't want them to ever see anything that is incorrect. But we also have weaker modes, where you can just monitor how many records are valid and how many are failing to parse, and alert at some threshold. Or, even more powerful, we have a notion of data quarantining, where you can say: any record that doesn't meet my expectations, don't fail the pipeline, but also don't let it go through; quarantine it over here in another table so I can come look at it later and decide what I need to do to remediate the situation. This allows you to continue processing without corrupting downstream results with an invalid record. Like I said, this is a feature we're actively working on now; stay tuned to GitHub for more on it. I think it fundamentally changes the way you think about data quality with Apache Spark and with your data lake. So now that I've been over the high level of what Delta is and why you should care about it, I want to go into the nitty-gritty details of how Delta actually works, because it sounds almost too good to be true that we can bring full ACID transactions into a distributed system like Apache Spark and still maintain good performance. First of all, let's start by looking at what a Delta table looks like when it's actually stored on disk. To those of you who already have a data lake, this should look really familiar: it's just a directory stored in your file system (S3, HDFS, Azure Blob Storage, ADLS) with a bunch of Parquet files in it. There's one extra bit that is very important: we also store a transaction log, and inside the transaction log there are different table versions.
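As a rough picture of that on-disk layout (the partition column and file names are just illustrative; in practice the commit files are zero-padded version numbers):

```
/delta/events/
  _delta_log/
    00000000000000000000.json   <- table version 0: a commit containing actions
    00000000000000000001.json   <- table version 1
    ...
  date=2019-01-01/
    part-00000-...-c000.snappy.parquet
  date=2019-01-02/
    part-00001-...-c000.snappy.parquet
```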

I'll talk about those table versions in a moment. We still store the data in partition directories, but that's actually mostly for debugging; there are also modes of Delta where we can work directly with the storage system in the most optimal way. For example, on S3 they recommend that if you're going to be writing a lot of data regularly, rather than creating date partitions, which create hot spots of temporal locality, you randomly hash-partition instead, and because of the power of Delta's metadata we can do that as well. And finally there are the standard data files, which are just normal encoded Parquet that can be read by any system out there. So what is actually in those table versions, and how do we reason about the current state of a table? Each table version has a set of actions that apply to the table and change it in some way, and the current state of the table at this moment is the result of the sum of all of those actions. So what kind of actions am I talking about? For one example, we can change the metadata: we can say this is the name of the table, this is the schema of the table, we can add a column, we can set the partitioning. So one action you can take is to change the metadata. The other actions are adding a file and removing a file. We write out a Parquet file, and then, to actually make it visible in the table, it needs to also be added to the transaction log; I'll talk about why that extra level of indirection is a really powerful trick in a moment. Another detail is that when we add files into Delta, we can keep a lot of optional statistics about them: in some versions we can keep the min and max value of every column, which we can use for data skipping or to quickly compute aggregate values over the table. And finally, you can remove data from the table by removing the file. Again, this is a lazy operation, and that level of indirection is really powerful: when we remove a file from the table, we don't necessarily delete the data immediately, which allows us to do other cool things like time travel. The result of summing all of these actions is that you end up with the current metadata, a list of files, and some details like the list of transactions that have committed and the protocol version we're at. So how does this get us ACID, those nice properties of transactional databases? One detail is that when we create these table versions, we store them as ordered, atomic units called commits. Like I said before, we create version 0 of the table by creating the file 0.json, and the idea is that when Delta constructs that file in the file system, we use the underlying atomic primitives. On S3, in order to guarantee atomicity, all you need to do is upload to the system: you start your upload by declaring that you expect to upload this many bytes, and unless you actually successfully upload that many bytes, S3 won't accept the write, so you're guaranteed to get either the whole file or none of it.
On other systems, like Azure or HDFS, what we'll do is create a temporary file with the whole contents and then do an atomic rename, so that the entire file is either created or not. Then you can have successive versions: in version zero we added these two files, and in version one we removed them and put in a third. For example, you could be doing compaction here, where you atomically take those two files and compact them into one larger file. Another important detail is that we want atomicity for each of these commits, but we also want serializability: we want everybody to agree on the order of changes to the table, so we can correctly do things like merge into for change data capture, and other things that require this property. In order to agree on these changes even when there are multiple writers, we need a property called mutual exclusion: if two people try to create the same version of a Delta table, only one of them can succeed. To make this a little more concrete, user one could write version zero of the table, and user two could write version one, but if they both try to write version two, one of them can succeed and the other one must get an error message saying, sorry, your transaction didn't go through.

Now you're probably saying: wait a second, if it fails any time two people do something at once, it sounds like I'm wasting a lot of time and a lot of work, and that sounds like a lot of complexity for me. Fortunately, this is where we use a third cool trick called optimistic concurrency. The idea of optimistic concurrency is that when you perform an operation on the table, you optimistically assume it's going to work, and if you get a conflict, you check to see whether that conflict actually matters to you; if it doesn't, you're allowed to optimistically try again. In most cases it turns out that the transactions don't overlap, and you're able to remediate the conflict automatically. To give you a concrete example, let's say we have two users, and both of them are streaming into the same table. When both of them begin their streaming write, they start by reading the version of the table at that moment: they both read in version zero, and they read the schema of the table so they can make sure the data they're appending has the correct format. Then they write out some data files for the contents of the stream that are going to be recorded in this batch, and they record what was read from and what was written to the table. Now they both try to commit; in this case user one wins the race and user two loses. But what user two will do is check to see whether anything has changed, and because the only thing they read about the table was the schema, and the schema has not changed, they're allowed to automatically try again. This is all hidden from you as the developer; it happens automatically under the covers, and in the end both commits succeed. Now, the final trick we have here is for the fact that tables can have massive amounts of metadata.

Those of you who have tried to put millions of partitions into the Hive metastore are probably familiar with this problem: once you get to those data sizes, the metadata itself can be the thing that brings the system down. We have a trick for this: we already have a distributed processing system capable of handling massive amounts of data, so we just use Spark. We take the transaction log with its set of actions, read it in with Spark, and encode it as a checkpoint in Parquet. A checkpoint is basically the entire state of the table at some version, so when you're reading the transaction log, rather than having to read the entire log, you can start from a checkpoint and then apply any subsequent changes that happened after it. And this itself can be processed with Spark. So when you come to a massive table that has millions of files and ask a question like "how many records were added yesterday?", we run two different Spark jobs: the first one queries the metadata and figures out which files are relevant to yesterday and gets back that list of files, and the second one actually processes those files and does the count. By doing this in two phases, we drastically reduce the amount of data that needs to be processed: we only look at the files that are relevant to the query, and we use Spark to do that filtering. Before we end and go to questions, I want to talk a little bit about the roadmap. Like I said, while this project has been around for a couple of years, it has only recently been open sourced, and we have a pretty exciting roadmap for the rest of the year. Basically, our goal is for the open source Delta Lake project to be fully API compatible with what's available inside of Databricks, so the roadmap for the rest of the quarter is essentially open sourcing a lot of cool features that we have.

A couple of weeks ago we released version 0.2.0, which added support for reading from S3 and also from Azure Blob Storage and Azure Data Lake. This month we are planning a 0.3.0 release that will add Scala APIs for update, delete, merge, and vacuum, with Python APIs following shortly. For the rest of this quarter we have a couple of things on our plan: we want to add full DDL support, that is, create table and alter table, and we want to give you the ability to store Delta tables in the Hive metastore, which I think is very important for data discovery in different organizations. We also want to take those DML commands, update, delete, and merge, and hook them into the Spark SQL parser so you can use standard SQL to do those operations as well. Moving forward, let us know what you want. If you're interested in doing more, I recommend you check out our website at delta.io. It has a high-level overview of the project, a quick-start guide on how to get started, and links to GitHub, where you can watch the progress, see our roadmap, and submit your own issues about where you think the project should go. I definitely encourage you to do that. With that, I think we'll move over to questions, so let me pull those up and see what we've got.

The first question is: will the material and recording be available afterwards? For that, I'm hoping Denny can let us know. Denny, are you here?

I am, no problem at all. So yes, just as a quick callout: for everybody who signed up for this webinar, we will send out both the slides and the recording. It takes about 12 to 24 hours for the process to complete, so you should be receiving the email either later today or early tomorrow.

Awesome, thank you very much. So yeah, all of that should be there, and you can check it out later; there are also videos on YouTube, so do stay tuned for more about Delta Lake. Moving on to other questions: the first one is, does Delta Lake add any performance overhead? That's a really interesting question that I want to break down.

First of all, Delta Lake is designed to be a high-throughput system. Each individual operation has a little bit of overhead, because rather than just writing out the files, we need to write out the files and also write to the transaction log, and that adds a couple of seconds to your Spark job. The important thing is that we designed Delta to be massively parallel and very high throughput: you get a couple of seconds added to your Spark job, but that is mostly independent of the size of the job. So what Delta Lake is really good at is ingesting trillions of records, or petabytes or gigabytes of data; what it is not good at is inserting individual records. If you run one record per Spark job, there will be a lot of overhead. The trick is to use Delta in the places where Spark makes the most sense, which is relatively large jobs spread across lots of machines, and in those cases the overhead is negligible.

The next question is: since it has ACID properties, will my system be highly available as well? That's a really good question I want to unpack a little. Delta is designed specifically to take advantage of the cloud and its nice properties. To me, there are a couple of nice properties of the cloud: it's very scalable, you can put tons of data into S3 and it just handles it, and it's generally pretty highly available, so you can always read data from S3 no matter where you are; if you really care, there are even things like replication, where you can replicate your data to multiple regions. Delta plays very nicely with that. Reading from a Delta table should be very highly available, because it's really just the availability of the underlying storage system. Now, for writes, those of you who are familiar with the CAP theorem might be saying, wait a second: when we think about consistency, availability, and partition tolerance, Delta chooses consistency. If you cannot talk to the central coordinator (on S3 that might be your own service that you're running; on Azure they've taken the consistency approach and we use an atomic operation there), the system will pause. The nice thing is that, because of the optimistic concurrency mechanism, that doesn't necessarily mean you lose the whole job you might have been running for hours; it just means you'll have to wait until you're able to talk to that service. So in terms of reads, it's very highly available; in terms of writes, we choose consistency; but in general that still works out pretty well.

The next question was: do you keep all levels of data? I want to clarify the idea behind bronze, silver, and gold. Not everybody keeps the raw data around, and not everybody keeps all of the data; you might have retention requirements that say you're only allowed to keep two years of data.
So really, I think it's up to you to decide what data makes sense to hold on to. The only thing I would say is that the nice thing about data lakes, and how Delta applies to them in general, is that you're empowered to hold on to the raw data, and as much of it as you want.

There are no technical limitations preventing you from keeping all the data, and as a result many organizations that I work with do keep everything they are legally allowed to keep, for a very long time, and only remove it when they have to get rid of it.

The next question is: what do you write that logic in? Are we able to write logic in Scala? Delta Lake plugs into all of the existing APIs of Apache Spark, which means you can use any of them. If you're a Scala programmer, you can use Scala; if you're a Java programmer, that works as well; we also have bindings in Python; and if you're an analyst and you don't want to program at all, we also support pure SQL. The underlying engine is written in Scala, and Delta is also written in Scala, but your logic can be written in whatever language you're comfortable with. This is another case of the right tool for the right job: personally, I do a lot of my work in Scala, but when I need to make graphs I switch over to Python and use matplotlib. Delta still gives me the ability to filter through massive amounts of data, shrink it down to something that fits in pandas, and then do some graphing with it.

The next question is: is Presto part of Delta Lake, or is it only Spark? That's a great question, and it's something that's evolving pretty quickly right now, so I'll tell you both where we're at and where we're going. Right now, there's a feature inside of Databricks that we're working on open sourcing, which allows Delta writers to write out things called manifest files; those let you query a Delta table in a consistent way from Presto or Athena or any of these other Presto-based systems. Beyond that, we're working closely with Starburst, one of the companies behind Presto, to build a native connector for Presto. We've also got active interest from the Hive community and the Scalding community, so there's a bunch of interest in building connectors. Today the core of Delta is built on Spark, but the really powerful thing about open source and open standards is that anybody can integrate with it, and we at the project are committed to growing that ecosystem and working with anybody. So if you're a committer on one of those projects, please join our mailing list, join our Slack channel, check it out, and let us know how we can help you build these additional connectors.

Next question: can we experiment with Delta Lake in the Community Edition of Databricks? Yes, you can. Delta Lake is available in Community Edition; check it out, everything should be there, and let us know what you think.

The next question is: can Delta tables be queried with Hive? Basically the same answer as for Presto: there's active interest in the community in building that support. It's not available today, but it's definitely something we'd like to build.

Next question: how does Delta Lake handle slowly changing dimensions, going from raw to gold? That's a good question, and there's actually a blog post on databricks.com; if you google "slowly changing dimensions Delta," it walks you through all of the details.
But really, the short answer is that with the merge operator, plus the power of Spark, it's actually pretty easy to build all of the different types of slowly changing dimensions, and the magic thing that Delta adds on top of Spark to enable this is those transactions.
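(As a rough illustration, a simple type 1 style upsert using the SQL DML available inside Databricks at the time might look like the following. The table and column names are invented, and both customers and customer_updates are assumed to be registered tables or views.)

```python
# Hypothetical type 1 upsert: overwrite matching rows, insert new ones.
spark.sql("""
  MERGE INTO customers AS target
  USING customer_updates AS source
  ON target.customer_id = source.customer_id
  WHEN MATCHED THEN
    UPDATE SET address = source.address, updated_at = source.updated_at
  WHEN NOT MATCHED THEN
    INSERT (customer_id, address, updated_at)
    VALUES (source.customer_id, source.address, source.updated_at)
""")
```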

Modifying a table in place would be incredibly dangerous without transactions; Delta makes it possible and therefore enables this type of use case.

The next one was: we usually deal with Azure, and we'd like to know whether Delta Lake behaves any differently when it's running on Azure Event Hubs instead of Kafka. I'm going to answer this a little more generally. I talked about one of the powerful things about Delta being its integration with Spark, and one of the big reasons for that is that I view Spark as the skinny waist of the big data ecosystem: there are Spark connectors for almost every big data system in the world. So if Spark can read from it, it works with Delta Lake. Event Hubs in particular has both a native connector that plugs into Spark data sources and a Kafka API that works with Spark's Kafka source, so you can very easily read from Event Hubs and do all the stuff I talked about today, using Event Hubs instead of Kafka. And really, that applies to any system Spark can read from. To answer the Azure part a little more: Delta is fully supported on Azure, including ADLS (we recently improved our support for ADLS Gen2), so it's available both for you to download and as part of Azure Databricks out of the box.

The next question is: what exactly is the Scala API for the DML commands like update? Does it look like spark.sql, where you pass in a string that does the update? The answer is that we're actually going to support both. If you go to the GitHub repository, I believe this code has already been merged, so you can see the Scala API; if not, there's a design doc that covers the details on the ticket for adding update. The idea is that there will be a Scala function called update that you can use programmatically, without having to do string interpolation, and there will also be a SQL way to do it, where you create a SQL string and pass that in. Again, you use the language you're most comfortable with, the one that's already part of your toolkit, and Delta should work with it automatically.

The next question was: does Delta Lake work with HDFS? Yes, it fully works with HDFS. HDFS has all of the primitives we need, so you don't need anything extra. What I mean by that is that HDFS supports an atomic rename that fails if the destination already exists, so as long as you're running a new enough version of HDFS (and it's not even that new), it should work automatically. If you check out the getting started guide in the Delta docs at delta.io, it lists all the storage systems we support and the details of what you need to do to set them up.

Next question: are update and delete at the single row or record level? There are two parts to this answer. Yes, Delta does allow you to do fine-grained, individual row updates, so you don't necessarily have to do your updates or your deletes at the partition level.
If you do them at the partition level, though — deletes, for example — those are significantly more efficient, because we can just drop the metadata; we don't have to do any manual rewriting. If it's not at the partition level, if you're doing a fine-grained, single-row update or delete, what we'll do is find the relevant Parquet files, rewrite them, and commit the adds and deletes to make that operation happen; that's the transaction that does it. So it is supported, but it does involve rewriting individual files. What I'll say here is that Delta is definitely not designed to be an OLTP system.
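As an illustration of that difference, here is a minimal sketch using the SQL DML form discussed earlier; the events table, its date partition column, and the userId column are hypothetical.

    // Partition-aligned delete: if `date` is the partition column, Delta can
    // drop whole files via metadata, with no data rewriting.
    spark.sql("DELETE FROM events WHERE date < '2019-01-01'")

    // Fine-grained delete: Delta finds the Parquet files containing matching
    // rows, rewrites them without those rows, and commits the swap atomically.
    spark.sql("DELETE FROM events WHERE userId = 42")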

You should not use it if you have lots of individual row updates, but we do support that fine-granularity use case.

Do we know exactly when the Scala APIs for Delta Lake will be available? There are a couple of answers to that. Delta Lake reading and writing, in both streaming and batch, already works in Scala; that's available today. If you're talking specifically about update, delete, and merge, I believe most of that code has already been put into the repository, so if you download it and build it yourself, it's there. We're hoping to make the release in July, so hopefully this month; that will be the next release containing those extra Scala APIs.

The next question was about data quality: can we have any other field for validation purposes apart from a timestamp? Yes. The expectations we talked about before are just general SQL expressions, so any expectation you can encode in SQL is allowed. In that example it was a very simple comparison against a specific date, but it can be anything you want; it could even be a UDF that checks the quality of the data. The important thing here is that we let you attach these as properties of your data flow, rather than manual validations you need to remember to do on your own, and that enforces them globally across everybody using the system.

Does Delta Lake support merging from a DataFrame instead of a temporary table? Yes: once the Scala and Python APIs are available, you can pass in a DataFrame (there's a sketch of what that looks like a bit further down). Today, inside of Databricks, the only thing available is SQL DML, and in that case you do need to register it as a temporary table. But like I said, stay tuned; at the end of the month we'll have a release with the Scala APIs, and then you'll be able to pass in a DataFrame yourself.

And since I've seen this question a couple of times, I'll answer it one more time: we support both ADLS Gen1 and Gen2, although Gen2 is going to be faster because we have some extra optimizations there.

The next one: in the checkpointing example, is the Spark job computing the Delta Lake checkpoint internal, or is it required to be handwritten? That's a great question. When you're using streaming to read from or write to a Delta table, or both, if you're just using it between two Delta tables, the checkpointing is handled by Structured Streaming, so you don't need to do any extra work to construct that checkpoint; it's built into the engine.
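For instance, a Delta-to-Delta stream only needs a checkpointLocation; the offset and commit tracking described next is managed for you. The table paths here are placeholders.

    // Incrementally move data between two Delta tables; Structured Streaming
    // records its progress in the checkpoint directory automatically.
    spark.readStream
      .format("delta")
      .load("/delta/events_bronze")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "/delta/events_silver/_checkpoints")
      .start("/delta/events_silver")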

The way Structured Streaming works in Spark is that for every source and every sink there's a contract that lets us do that checkpointing automatically. The source needs to be able to say, "I'm processing the data from here to here," and those notions of where you are in the stream, which we call offsets, need to be serializable; we just store them in the checkpoint. We basically use the checkpoint as a write-ahead log: we say batch number 10 is going to be this data, we attempt to process batch number 10, and then we write it to the sink. The guarantee here is that the sink must be idempotent: it must only accept batch number 10 once, and if we try to write it twice due to a failure, it must reject that and just skip over it. By putting all of these constraints together, you actually get exactly-once processing with automatic checkpointing, without you needing to do any extra work.

Great question here: why not use polyglot persistence and use an RDBMS for storing the ACID transactions? We actually tried this; in fact, one of the early versions of Delta used MySQL. The problem is that MySQL is a single machine, so just getting the list of files out for a large table can become the bottleneck. Whereas when you store this metadata in a form that Spark itself can natively process, you can leverage Spark to do that processing. There's nothing stopping you from implementing the Delta transaction protocol on top of another storage system; in fact, there's a pretty long conversation on the GitHub repository right now going back and forth about what it would take to build a FoundationDB version of Delta. That's certainly possible, but in our initial scalability testing we found Spark was the fastest way to do this, at least out of the systems we tested, and that's why we decided to do it that way.

Another question: does that mean we don't need DataFrames and can do all transformations on Delta Lake instead? I would say no. You can use update, delete, and merge without any actual DataFrame code, with pure SQL, but really this is right tool for the right job. Delta Lake integrates deeply with Spark DataFrames, and personally I find that to be a very powerful tool for doing transformations.
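To connect that back to the earlier question about merging from a DataFrame rather than a temporary table, this is roughly the shape the programmatic merge takes in the Delta Lake Scala API as it shipped in the open-source releases; the updatesDF DataFrame, the table path, and the column names are placeholders.

    import io.delta.tables.DeltaTable

    // Merge a source DataFrame directly into a Delta table, no temp view needed.
    DeltaTable.forPath(spark, "/delta/dim_customers").as("t")
      .merge(updatesDF.as("s"), "t.customerId = s.customerId")
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()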

It's kind of like SQL++, because you have all these relational concepts embedded in a full programming language, and I think that can be a very productive way to write your data pipelines.

How does Delta Lake handle newer versions of Spark? Delta Lake requires Spark 2.4.3, which is a pretty recent release, and that's because there were bugs in earlier versions of Spark that prevented data sources from plugging into it correctly. In general we're working on Spark compatibility; one of our core projects for this quarter is making sure that everything in Delta plugs into nice, public, stable APIs of Spark, so we can work with multiple versions in the future.

One more question: does Delta Lake support ORC? That's actually a really good question that I get quite a bit. There's again a discussion on GitHub about adding that support, so I encourage you to go check it out and vote on that issue if it's important to you. There are kind of two answers to this: one is that the Delta Lake transaction protocol, the thing that actually goes in the transaction log, actually does support…
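On the Spark version point above, here is a minimal build sketch for pulling Delta Lake into a Spark 2.4.x project; the version numbers are illustrative only, so check delta.io or Maven Central for the current release.

    // build.sbt (sketch): Delta Lake alongside Spark 2.4.3.
    // Versions shown are examples, not a recommendation from the webinar.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"  % "2.4.3" % "provided",
      "io.delta"         %% "delta-core" % "0.3.0"
    )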
