So hi everyone, welcome to our talk about migration to Spark. I'm Tomer Koren, a senior software engineer at Microsoft, and with me is Roy Levine, a senior researcher at Microsoft. Both of us work on a product called Azure Security Center, which is mainly focused on threat detection in the cloud.

Today we have several goals. The first is to tell you about a migration we did recently for our production system: we migrated our big data platform from a legacy technology to Spark. The second goal is to present a methodological approach for accomplishing this migration task. The last goal is to explain how we measure quality and how we optimize performance, and how we define both.

A little bit of background, to give some context about Azure Security Center and threat detection in the cloud. As you may know, a lot of organizations used to have their servers on-premises, and recently they started migrating this load into the cloud. As you probably know, on the internet there are bad people, attackers, who constantly attack these resources, running brute-force attacks and trying to steal data. For that reason we want our customers to be safe, and we have a product called Azure Security Center which protects workloads both in the cloud and on-premises. One of the key ideas of the product is to protect customers by notifying them about security threats, so one of the components we are responsible for provides detection analytics: whenever there is an attack, we notify the customer with alerts. We can notify customers about incoming brute-force attacks, SQL brute-force attacks, and a lot of other threats.

The key engine behind all of these analytics is a big data platform, and in this slide you can see the back-end detection pipeline architecture. On the left you see that we process several data sources of various sizes; the biggest of them contains around four terabytes of events per hour, which is a lot. We then process all this information to create a second level of data that we call processed data, and on top of that we have several detection pipelines. Some of our pipelines use supervised models: there is usually a feature-extraction component, then a classification step, and then alerts are produced. Other pipelines run anomaly detection. In those pipelines the structure is more complicated: we need to calculate a state every hour, store the state, eventually build a long time series based on previous states, run the anomaly detection engine, and then send alerts. Some of the components in this diagram are reusable; for example, the alert publisher is used both by the anomaly-detection and the classification-based pipelines.
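To make that hourly, stateful anomaly-detection flow a bit more concrete, here is a minimal PySpark sketch of one slice of such a pipeline. Everything in it, the paths, the column names, and the `detect_anomalies` step, is an illustrative assumption rather than the actual production code.

```python
# Hedged sketch of the hourly, stateful anomaly-detection flow described above.
# Paths, schemas and detect_anomalies() are illustrative assumptions only.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

def run_hourly_slice(slice_hour: str) -> None:
    # 1. Compute this hour's state from the processed events.
    events = spark.read.parquet(f"/data/processed/{slice_hour}")
    state = (events
             .groupBy("resource_id")
             .agg(F.count("*").alias("event_count"))
             .withColumn("slice_hour", F.lit(slice_hour)))

    # 2. Store the hourly state so that future slices can read it.
    state.write.mode("overwrite").parquet(f"/data/state/{slice_hour}")

    # 3. Build a long time series from all previously stored states.
    history = spark.read.parquet("/data/state/*")

    # 4. Run the anomaly-detection engine over the series and publish alerts
    #    (the engine itself is out of scope for this sketch).
    # alerts = detect_anomalies(history)
    # alerts.write.mode("append").parquet("/data/alerts")
```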
Okay, so now that we know about the general pipeline, let's talk about the challenges we faced during the migration from the legacy platform to Spark. You may expect a migration process to be a straightforward task, but in reality things become a little bit complicated. So what are the challenges when migrating between different technologies, in our case from Cosmos, Microsoft's internal big data platform, to Spark?

First of all, you have to maintain the semantics: for a given input, both systems need to produce the same output. In addition, we need to meet real-time constraints: some of our pipelines run on an hourly basis, some at other frequencies, and every pipeline needs to finish in time; we cannot let an hourly pipeline run for more than 60 minutes. The third challenge is, of course, cost: we cannot just spend as much as we want.

So let's focus on the challenges. As for semantics, the first thing you do when you start migrating code is to rewrite the UDFs. In our case most of our UDFs were written in C#, and we had to translate them to Python. In addition, we had to rewrite all the transformations: in our scenario the legacy transformations were written in U-SQL, and we had to convert them to PySpark. This, of course, may lead to a lot of bugs, as we discovered. As an example, we have here the same transformation written twice, once in U-SQL and once in PySpark. You can see that the two languages are quite different, and some operations may require more code in one than in the other.

Another thing I want to mention here is the order-by. As you can see, both versions use a window function, and the window function has an order-by expression. Because these are different technologies, when several records share the same timestamp the selected order can differ: with row_number, each row may get a different row number depending on the platform, so U-SQL may assign one number and PySpark another. This can become a problem because some of our features select, say, the top 5 or top 10 rows, and then the rows that get selected are different, which affects our semantics.

Another point I want to mention is different data types. In Spark we of course use Python, while in our previous system we used C#. In this table you see two basic operations applied to two numbers, number one and number two; in some cases a number is null, in other cases it is zero, and we tried all the combinations. You can see that in Spark you get a lot of nulls when a number is null or when dividing by zero, while in Cosmos you get different values, in particular infinity and minus infinity. This difference matters because later in the pipeline we usually replace infinity with a high number and minus infinity with a low number; when we get null instead, we cannot decide what to do, and there is a chance the results will differ. And again, because we are changing platforms, the machine learning libraries also change: on Cosmos, the legacy platform, we used a library called TLC, while in PySpark we use scikit-learn, so there are differences in the algorithms as well.
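To make two of these gaps concrete, here is a minimal PySpark sketch; the column names, the toy data, and the HIGH/LOW sentinel values are made up for the example, not our real feature schema. It shows how row_number over a window becomes reproducible only once an explicit tiebreaker column is added to the ordering, and how null or divide-by-zero results can be mapped back to the sentinels the legacy code produced.

```python
# Hedged sketch of two of the semantic gaps discussed above; column names and
# sentinel values are illustrative, not the real feature schema.
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("10.0.0.1", "2021-01-01 10:00:00", 5.0, 0.0),
     ("10.0.0.1", "2021-01-01 10:00:00", None, 2.0),   # same timestamp -> tie
     ("10.0.0.2", "2021-01-01 11:00:00", 4.0, 2.0)],
    ["source_ip", "event_time", "num1", "num2"])

# 1. Ordering only by the timestamp is not deterministic when timestamps tie,
#    so "top k per key" can differ between platforms. Adding a stable
#    tiebreaker column makes row_number reproducible.
w = Window.partitionBy("source_ip").orderBy("event_time", "num2")
top1 = df.withColumn("rn", F.row_number().over(w)).filter("rn = 1")

# 2. Division in Spark yields null for null operands and for division by zero,
#    while the legacy platform produced +/-infinity. If downstream logic
#    expects the legacy sentinels, map the nulls explicitly (mapping a null
#    operand to LOW here is just one possible convention).
HIGH, LOW = 1e9, -1e9
fixed = top1.withColumn(
    "ratio",
    F.when(F.col("num2") == 0, F.lit(HIGH))            # legacy: +infinity
     .otherwise(F.coalesce(F.col("num1") / F.col("num2"), F.lit(LOW))))

fixed.show()
```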
Another thing we had in our scenario was a change to the schema. In the legacy system, for example, we describe a network IP connection between a source and a destination. We also tried, whenever we could, to map the host name for a specific IP, so we added a field called host name and another field that says whether the host name belongs to the source IP or to the destination. Because we only had one field for the host name, in cases where both sides had a mapping, as in this example where both the source and the destination are resolved, we had to duplicate the row: one row with the value true and a second row with false. In Spark we decided to allocate two fields for the host name, so for the scenario I just mentioned we have both mappings in one row. Overall, instead of three rows in the legacy system we have two rows in Spark. Another thing that is important to mention: we also had a field called host ID; the host ID and the host name were related to each other, and we decided to drop it in Spark. Later, when we did some investigation, we realized that about one percent of the time there is a mismatch between the host ID and the host name, which also led to semantics problems.

Then again there are the real-time constraints. Because we have several pipelines that run on an hourly basis, some of them over a lot of data, we need to make sure that once we implement them the overall runtime is less than 60 minutes, in order to avoid accumulating latency. You may think some of these challenges can be mitigated by adding more and more servers. That may be true, but when you add more and more servers the cost increases dramatically, and then management is not very happy, so you need to stay under some predefined threshold. Specifically in our system, because we were migrating an existing system, we had a predefined budget that we could not simply exceed. Now Roy is going to introduce the formulation.

Thank you, Tomer. I'd like to formalize the problem a little bit and formalize our solution. If we go back to the detection pipeline that Tomer presented earlier, we can see that we have many different detections. One way to go about it is to look at each detection as one big monolith and then try to migrate that, but that's not really going to work, because the detections have a lot of code, we're talking thousands of lines of complex code. There is no realistic way to just rewrite an entire detection, compare the results, and hope they will be the same. So instead of doing that, we look at each detection and at the components it is built out of. These components are the sub-parts the detection requires, things like feature engineering, classification, and state management. We can take each one of these components and migrate it separately: for each Cosmos component we want to create an equivalent Spark component, and then we need to thoroughly test these components. To make sure we don't have any problems, we feed the Cosmos data inputs into the Cosmos component to generate its outputs. We could then take those same inputs, feed them into the Spark feature engineering component in this case, generate its outputs, and compare the two. Unfortunately it turns out this is not that simple, because, as Tomer mentioned earlier, the schemas don't match, so there is a schema mismatch and no way to just take those inputs and feed them into Spark. What we need is a translator that translates the inputs for the Spark component, which then produces the outputs, and then we can compare the outputs using some kind of comparator function that we also need to write correctly.
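As a hedged illustration of what such a translator can look like for the host-name schema change described a moment ago, here is a small PySpark sketch; the field names are simplified stand-ins, not the real production schema. It pivots the legacy layout, one host-name column plus a source/destination flag and a duplicated row, into the new layout with two host-name columns and a single row per connection.

```python
# Hedged sketch of a schema translator for the host-name example; the real
# production schemas contain many more fields than the ones shown here.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Legacy layout: one host_name column plus a flag saying which side it maps
# to, so a connection with both sides resolved appears as two rows.
legacy = spark.createDataFrame(
    [("10.0.0.1", "10.0.0.2", "web01", True),
     ("10.0.0.1", "10.0.0.2", "db01",  False)],
    ["source_ip", "dest_ip", "host_name", "is_source_host"])

# New layout: two host-name columns and a single row per connection.
translated = (legacy
    .groupBy("source_ip", "dest_ip")
    .agg(
        F.max(F.when(F.col("is_source_host"), F.col("host_name")))
         .alias("source_host_name"),
        F.max(F.when(~F.col("is_source_host"), F.col("host_name")))
         .alias("dest_host_name")))

translated.show()   # -> one row: (10.0.0.1, 10.0.0.2, web01, db01)
```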
With the translator and the comparator in place, we can validate each of the components, make sure they pass our tests, and confirm that they actually produce the same semantics.

So how do we manage all of this together? We have many, many components, each from a different detection, and some components are reused. And what about the connections between these components? They are connected in some way; you can't just test each component separately and hope everything is going to work, because you need to connect everything together and then test it as a whole. That part requires validation too, and it also requires us to specify how to connect the different components. To manage all that, we introduce the framework we used for this, a framework called SciFlow, and I'll explain how it works.

We begin by introducing a multi-transformer. A multi-transformer basically represents a single component. This component receives as input a multi-data item, which is basically a set of data items; each data item can be either a DataFrame or a pre-trained model that is fed into the multi-transformer. In turn, the multi-transformer generates a new multi-data item, and this new multi-data item can then be tested to make sure it matches what we had in the Cosmos component. We can connect different multi-transformers with one another to create a DAG, a directed acyclic graph, of multi-transformers, which can represent an entire detection. The detection itself also receives multi-data items and produces new multi-data items, so in fact, through composition, the detection itself is just another multi-transformer. The idea of a DAG is not new; Spark uses it as well, but Spark builds a DAG over very primitive operations such as select, join, filter, and so on. Here we use the concept of a DAG over higher-level objects, objects that make sense at a higher level of abstraction, things like feature engineering connected to model training or model prediction. That way, when we look at the DAG, we can make sense of it and understand what it does in a logical sense. Our multi-transformers can also be stateful, because some of our components are based on historical values, like the anomaly detection Tomer mentioned: it needs to read data from a previous slice and compute a new slice, which it will read again in the next iteration; in that sense it is stateful.
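SciFlow itself is an internal framework, so here is only a hedged sketch of the abstraction in Python; the class and method names are illustrative, not the actual SciFlow API.

```python
# Hedged sketch of the multi-transformer idea; the names and signatures are
# illustrative only, not the actual SciFlow API.
from abc import ABC, abstractmethod
from typing import Dict, List, Union
from pyspark.sql import DataFrame

# A multi-data item: a named set of data items, each one a DataFrame or a
# pre-trained model object.
MultiDataItem = Dict[str, Union[DataFrame, object]]

class MultiTransformer(ABC):
    """A single logical component (feature engineering, classification, ...)."""

    @abstractmethod
    def transform(self, inputs: MultiDataItem) -> MultiDataItem:
        ...

class FeatureEngineering(MultiTransformer):
    def transform(self, inputs: MultiDataItem) -> MultiDataItem:
        events: DataFrame = inputs["events"]
        features = events.groupBy("resource_id").count()     # toy feature
        return {"features": features}

class Detection(MultiTransformer):
    """Composition: a whole detection is itself a multi-transformer."""

    def __init__(self, stages: List[MultiTransformer]):
        self.stages = stages        # DAG nodes in a topological order

    def transform(self, inputs: MultiDataItem) -> MultiDataItem:
        data = dict(inputs)
        for stage in self.stages:
            data.update(stage.transform(data))   # outputs feed later stages
        return data
```

In the same spirit, a stateful component would read its previous slice as one of the data items in `inputs` and emit the new slice as one of its outputs.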
Okay, so I presented SciFlow. Let's look at what would happen if we wanted to do all of this with a simple notebook approach instead: create many notebooks in Databricks, maybe one notebook per detection, and run that. First, in terms of deployment: how would you deploy that? You would need to deploy these notebooks in some way onto the cluster, maybe one cluster or multiple clusters. With SciFlow, all you need to do is take the DAG you created and deploy it to the cluster, which is easier. You can also unit test: with SciFlow you develop everything inside an IDE, so you can do unit testing in a rather simple way, just the way unit testing is done in normal programming; you create a standalone Spark session and run your unit tests. There is shared utility code, which is easier to manage in a repository, versus having to upload a wheel file and then use it in notebooks. I think the most important thing SciFlow provides is structure, the way the different components are connected with one another. In SciFlow this is very simple; you can even see it using the UI that Spark provides, whereas with notebooks you need some sort of framework to say how the different notebooks connect with one another, or how the flow in one large notebook actually works; it is difficult to understand code containing thousands of lines. Finally, in terms of typing: you don't really have any schema checks when you connect different notebooks in some custom way, whereas in SciFlow every edge is checked by the SciFlow engine to make sure the schemas match properly.

So it turns out that when we want to validate components, it's not as simple as you might originally think. You have the output of Spark and the output of Cosmos and you want to compare them, so you want to write this comparator, but then there are a few challenges; recall the ones Tomer presented. For example, some of the ML models are legacy models that are not available to us in PySpark: this is C# code, we can't run it inside PySpark, so the models generate slightly different results. It doesn't mean there is an error; it just means the results are not entirely the same. Furthermore, some of the semantics are not completely deterministic, for example selecting the top k where some values could be equal, as mentioned. Some schema translations can also result in outputs that are not always identical, and randomly generated IDs (GUIDs) cause a problem too when you just want to compare results syntactically. So how do we deal with all this, and how much should we really invest in this comparator logic to make sure parity is full and accurate? We try to achieve high parity, but if parity is not perfect we can go to the final output and look at the alerts generated by the legacy Cosmos component and the alerts generated by Spark. We can look at the resources we generate the alerts on and compare them; if they are the same resources, it means that even though our results are not syntactically identical, the alerts we produced were good, so we're fine, and we can mark that component as done and move forward. When the results are not perfect, we can use measures like Jaccard similarity, precision, and recall, and decide based on these values. For example, for detections that are known to be a bit noisier we can focus more on precision than on recall, say by setting a higher threshold to make sure we have high precision, maybe at the expense of some recall. If the values are low, or just too low no matter what the threshold is, then we probably need to keep hunting bugs that may still exist in the component.
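As a hedged sketch of that last comparison step, here is plain Python over the sets of resources each side alerted on; the resource names and the threshold are made-up examples, not our production values.

```python
# Hedged sketch of the alert-level comparator; names and thresholds are
# illustrative examples only.
def compare_alerted_resources(legacy_resources: set, spark_resources: set) -> dict:
    both = legacy_resources & spark_resources
    union = legacy_resources | spark_resources
    return {
        "jaccard":   len(both) / len(union) if union else 1.0,
        "precision": len(both) / len(spark_resources) if spark_resources else 1.0,
        "recall":    len(both) / len(legacy_resources) if legacy_resources else 1.0,
    }

metrics = compare_alerted_resources(
    legacy_resources={"vm-1", "vm-2", "vm-3"},
    spark_resources={"vm-2", "vm-3", "vm-4"})

# For a noisy detection we might gate mostly on precision and accept some
# recall loss; the exact thresholds are a per-detection judgment call.
assert metrics["precision"] >= 0.5      # example gate, not a production value
```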
Thanks, Roy. Now that we've talked about the metrics, let's talk about tuning performance. So we've checked and verified that our two data streams are equal and the correctness threshold was achieved, but what about the running time? In this step what we do is simply measure the running time on actual data: we feed real, new data into the pipeline and start measuring.

At the beginning there were some pipelines where, the first time we ran them, we suddenly saw that they took far too much time. Sometimes I had to go to sleep and wake up in the morning just to find out that the overall run had taken 14 hours, which is way above our target. In that case I had to start debugging the problem. Since it's very hard, by looking at the physical plan, to understand exactly which component causes the high latency, and since there is no tool I know of that provides profiling for PySpark code, we had to debug each component separately. For example, let's assume we want to start investigating the feature-engineering part; we are trying to find the culprit. What we do first is divide the feature engineering into sub-transformations: we select several transformations, group them together into logical groups, and then materialize the result of each group to disk. Let's say the first group takes 20 minutes, which is already unacceptable given our expectations; we then continue with the second group of transformations and suddenly see that it takes 13 hours. In that case we have managed to find the problematic area. Now that we've found the problem, we need to start solving it and work on optimizing that component to reduce the running time.

What worked best for us when tuning was a few things. First, in the anomaly detection we had a pipeline that aggregates a lot of hourly slices and reads them at a resolution of one day or one month. If we read 30 days of hourly slices, there are a lot of files, because each hour has many partitions, and the total number of partitions and files to read becomes very large. So we created a daily process that aggregates the hourly slices into one daily stream: we added a daily aggregator that produces the daily stream once a day, and this led to an improvement in some of our analytics. Another optimization that worked well for us was modifying the default partitioning of the cluster. Let's assume our cluster has 300 cores in total; we decided to set the partition count to the number of cores multiplied by three, so for the cluster I mentioned it would be around 1,000 partitions. Those values worked well both for the shuffle partitions and for the general partitioning. Another thing we used was broadcasting, when a UDF had a large signature and was used frequently; in some cases we had several UDFs that run one after another, and once we used broadcasting for them we saw a significant improvement. Another thing that was very good for us was caching: for DataFrames that are used more than once it is very wise to cache them, but be careful. In the beginning we cached a lot of DataFrames without freeing the memory, and we got a lot of out-of-memory and spill errors that were very hard to debug. Eventually we limited the number of cached DataFrames, and in addition we used unpersist to release a DataFrame immediately after the last time we need it, just to free the memory and avoid memory failures.
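As a hedged sketch of these knobs together, here is what the configuration and caching pattern can look like; the core count, paths, the lookup table, and the UDF are examples, and the broadcast shown is one reading of the "broadcast with UDFs" point, namely broadcasting the large data a UDF uses.

```python
# Hedged sketch of the tuning knobs discussed above; the core count, paths and
# the UDF's lookup data are examples, not our exact production settings.
from pyspark.sql import SparkSession, functions as F

TOTAL_CORES = 300                        # cluster size in this example
spark = (SparkSession.builder
         # Roughly 3 partitions per core worked well, both for the shuffle
         # partitions and for the default parallelism.
         .config("spark.sql.shuffle.partitions", TOTAL_CORES * 3)
         .config("spark.default.parallelism", TOTAL_CORES * 3)
         .getOrCreate())

daily = spark.read.parquet("/data/daily/2021-01-02")   # pre-aggregated stream

# Broadcast large data used by a frequently called UDF once per executor,
# instead of shipping it inside every task.
ip_to_host = {"10.0.0.1": "web01", "10.0.0.2": "db01"}   # toy lookup
bc = spark.sparkContext.broadcast(ip_to_host)
resolve_host = F.udf(lambda ip: bc.value.get(ip), "string")
enriched = daily.withColumn("host_name", resolve_host(F.col("source_ip")))

# Cache only DataFrames that are reused, and unpersist them as soon as they
# are no longer needed, to avoid out-of-memory failures and spill errors.
enriched.cache()
enriched.groupBy("source_ip").count().write.mode("overwrite").parquet("/out/per_source")
enriched.groupBy("host_name").count().write.mode("overwrite").parquet("/out/per_host")
enriched.unpersist()
```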
So that was the tuning of performance, and now let's summarize our talk. In this talk we presented the general challenges you usually face when you come to migrate a large-scale big data pipeline from a legacy technology to Spark. Let's remember them: preserving the semantics, the real-time constraints, and of course cost. In addition, we introduced SciFlow, a framework we use to increase reusability and avoid connectivity bugs; it allows us to separate the code into separate components and make good progress in the migration. We also talked about different validation strategies and how to compare the data, and at the end I talked about reducing the overall running time through optimization: once you optimize the code you can use fewer and weaker machines, which reduces the overall cost of the system. So this was the lecture. I hope you learned a lot, that we managed to convey the message, and that you enjoyed it. Thank you everyone for your time.