Tagging and Processing Data in Real… Hari Shreedharan (Cloudera) Siddhartha Jain (Salesforce Inc)

Hi, my name is Siddhartha Jain, and I actually don't work for Cloudera; I work at Salesforce, on an application we've been building for some time to do log processing and data normalization at large scale. That's what we're going to talk about today.

I work for the Trust team at Salesforce, and we collect lots of unstructured logs. We've tried different off-the-shelf products and some open source products to do the basics: take unstructured data, structure it, and make it useful for analysts for search and other use cases. Each time, with both the commercial products and the open source ones, we ran into more and more issues at scale. With the stack we put together with Spark and Kafka, we've resolved most of those issues, and we're at a point where the application looks fairly stable.

We want to process and normalize log streams, so streaming is the key here for us; it's basically a massive regex-matching engine. We currently peak at about 100K events per second, and we want an architecture, a design, that can scale to a million events per second. We want to be able to just add more data streams into the pipeline without having to retool in terms of software or configuration; we may have to add hardware, but that should be the only factor. As with most apps, we want horizontal scalability and fault tolerance. We don't want a massive monolithic system in the back end that says you cannot have more than 12 cores or 20 cores or a hundred cores; we want to be able to scale out horizontally, essentially without limit. Within the stack we want plug-and-play between the different tiers, and we want to be able to bring down the tiers that do processing, for maintenance or crashes or anything of the sort, and still have the pipeline either pick up from where it left off or continue to function.

Some broad goals in terms of use cases. A basic use case is to be able to search through all the data we collect; that can be something like Elasticsearch or Solr or any technology that lets us casually search the data. There are compute use cases: data science, or even very simple cases of aggregating data, looking at it, and doing some kind of alerting. And there's a special use case, which is enriching the data: as streams of data come in, we want to look up key values from threat feeds coming from other sources, internal and external, and join all of that data together. This gets really interesting, because you're doing really massive joins, either at compute time or at query time.

In terms of user expectations, or tolerance, due to some internal compliance reasons the maximum delay is 5 minutes, although in the stack we have up now the lag is less than 30 seconds for the most part, even when we peak at 100K events per second. And as I said before, we want to be able to scale to a million events per second. Another important thing for us is that across the entire data set, across use cases, we want to maintain a consistent data dictionary, so that when frontline analysts look at the data they can use that schema to talk to the data scientists in the back end, and when the data scientists develop models they know exactly what the frontline analysts see. That allows collaboration between very tactical frontline analysts and the data scientists in the back end who are doing much deeper dives.
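[Editor's note: as an illustration of the enrichment use case described above, here is a minimal sketch, not Salesforce's actual code, of joining a stream of normalized events against a broadcast threat-feed lookup. The `enrich` helper, the `src_ip` and `threat_tag` field names, and the map-based event representation are all hypothetical.]

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream

// Illustrative only: enrich each normalized event with a tag looked up from a
// threat feed. `events` is assumed to be a DStream of key/value maps produced
// by the normalization step; `threatFeed` is a small lookup table loaded from
// an internal or external feed and broadcast to the executors.
def enrich(sc: SparkContext,
           events: DStream[Map[String, String]],
           threatFeed: Map[String, String]): DStream[Map[String, String]] = {
  val feed = sc.broadcast(threatFeed)
  events.map { event =>
    // Join on a hypothetical "src_ip" field: tag the event if the IP appears
    // in the threat feed, otherwise pass it through unchanged.
    event.get("src_ip").flatMap(ip => feed.value.get(ip)) match {
      case Some(tag) => event + ("threat_tag" -> tag)
      case None      => event
    }
  }
}
```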

OK, so I'm Hari Shreedharan. I've been at Cloudera for about three and a half years, and I'm still at Cloudera. When Salesforce told us they were looking at developing this app and came up with a couple of candidate solutions, we tried to investigate and play devil's advocate, to figure out what was wrong with each of them. I'm also an Apache Flume PMC member, so I obviously had a bias towards using Flume. We then started investigating Storm and Spark Streaming; we don't support Storm ourselves, so most of that work was done by the Salesforce folks and I won't claim any credit for it. But we did investigate doing this in several different ways.

The first one was to use Flume. Flume is a pretty pluggable system: you can take each piece of Flume and replace it with your own. Because most of the pieces we required were already available in Flume, we just picked up the syslog source, the file channel (the file channel is a durable write-ahead log, but it's local, not a distributed write-ahead log), a custom interceptor, and a sink to HDFS. A custom interceptor is basically a piece of code that receives all the data and processes it, and because most of the processing was on individual pieces of data, we could have gone this way. The only problem was that if a node running this Flume agent failed and you lost the disk holding that data, you had a problem: the file channel writes to a single disk, and unless you spend time and money on RAID or on storage that's already replicated for you, you will lose that data. Since data loss was not acceptable at any point (I cannot lose data even if I lose a disk), this was not really a viable solution, though it could have been used if we had ignored that one edge case where data could be lost or delayed.

With Storm that issue wasn't there, because you're not storing a whole lot of data anywhere, but the problem with Storm was that the Storm API is really, really low level. Writing a Storm bolt and a Storm spout and all that is pretty low level, and there's no real API available to do transformations for you, so they ended up having to write a whole lot of code themselves. Even the save-to-HDFS API available in Storm was pretty barren; they had to take the Flume code and port it over to Storm to make it work. I don't think they finished the whole thing before saying, hey, this is not actually going to work.

The third one was to use Spark Streaming. I started working with Siddhartha's team when they were deploying the Spark Streaming implementation. In their implementation they have something that pushes data into Kafka, so Kafka is the first point Spark Streaming pulls the data from; Spark doesn't talk to syslog directly, because talking to syslog is a pain in itself. Spark Streaming pulls the data out of Kafka, and the Spark app, which we'll talk about now, computes certain aspects of the data, tags it, and then writes it back to Kafka.

Writing back to Kafka was one of the pieces of interest. They wrote their own piece of code that does this write-back to Kafka, but it's also something that we, from a Cloudera point of view, have heard from multiple customers: they want to write data back to Kafka. So we implemented this and put it up on the Cloudera GitHub; if people want to use it, there's a library out there. We have not yet tried it in Salesforce's specific scenario, but we have tested it, so writing back to Kafka from a Spark DStream or RDD works fairly well, provided you have enough capacity on your Kafka cluster. To and from Kafka there were other ways to push the data to the storage systems we're going to talk about: Elasticsearch, HDFS, and so on.
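[Editor's note: the library on the Cloudera GitHub isn't shown in the talk; the following is a minimal hand-rolled sketch of writing a DStream back to Kafka using Kafka's standard Java producer API. The topic name, broker string, and the `writeToKafka` helper are placeholders.]

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

// Write each (key, json) record back to a Kafka topic. A producer is created
// per partition per batch here for simplicity; a production writer library
// would pool or reuse producers per executor instead.
def writeToKafka(jsonStream: DStream[(String, String)], topic: String, brokers: String): Unit = {
  jsonStream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      try {
        records.foreach { case (key, json) =>
          producer.send(new ProducerRecord[String, String](topic, key, json))
        }
      } finally {
        producer.close() // flushes any outstanding sends
      }
    }
  }
}
```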

That was a quick overview of the solution stacks; I'll hand it over to Siddhartha to continue.

Thanks. So yeah, basically, between those three: I did the entire end-to-end build of the Flume pipeline, and then we figured out that scalability and redundancy were an issue. With Storm, I got as far as reading the entire getting-started page, and that was way too long, because I'm not a developer. Then I looked at the Spark one, and the Spark page said: download the tarball, run a daemon, invoke the Spark shell. The REPL that Spark and Scala give you helped a lot in just trying out code and seeing what works, and these were early days; I was playing with Spark Streaming back in 0.9. So that was one of the things that factored into the decision.

This is an overview of the entire stack, or the core of the stack, that we've built so far and productionized. At the bottom, we force everything, all the data we collect, into syslog. It provides very reliable transport, and it's something that a lot of devices, especially operating systems and network devices, natively speak; if an application or device doesn't speak it, we put in connectors to transform the data into a syslog message. We aggregate all of this unstructured data, without doing any processing on it, into a tier of syslog daemons that then push the data directly into Kafka. All of this data goes into a topic, call it the unstructured topic. The Spark Streaming app picks the data up from the unstructured topic, does the massive regex matching, and spits out JSON. The JSON goes to a different topic in Kafka, and that's where all the consumer tiers hang off; in this case the consumer tiers are basically the ELK stack and the entire Hadoop stack.

This ensures consistency in the data dictionary, and it means end users and data scientists don't have to take raw data, do their own normalization for each use case, and then squirrel it away in their own little location on HDFS or MySQL or someplace. It's a very consistent way of looking at and consuming the data. It also takes the computation load off Logstash and Flume; those are purely transport engines here. And I can plug and play a lot of technologies: I can hang a graph processing engine off the Kafka bus, I can replace Kafka itself if some new technology comes along, and if Spark gets old or I take a fancy to Storm I can replace the Spark app. I can do all of these things in parallel while maintaining the existing production pipeline; it's just a matter of switching or forking pipelines.

This is what a typical data flow looks like, or the life of a single event as it goes through the pipeline. At the top, an unstructured message comes in via syslog. We feed it into the Kafka topic, which just stores the raw message as is. The Spark application has a configuration file with a whole bunch of regexes and some tagging information specific to each regex. We do the massive regex matching and then spit out the JSON to not one but multiple topics: each message, once it's processed, goes into a normalized, structured topic; in addition, the regex that it matched has its own topic, so all messages that match a specific regex go to that topic; and then, across the data, each key has its own topic, so all the values for that key get posted to those topics. Think of it more as a streaming table. That allows applications that plug into Kafka to not have to consume from the firehose; they can just sip whatever specific keys or specific messages they're looking for.
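[Editor's note: a simplified sketch of the per-message step described above, matching a raw syslog line against configured regexes and deciding which topics the tagged result should be published to. The `Pattern` case class, the topic naming scheme, and the `tag` helper are hypothetical; the actual Salesforce configuration format is not public.]

```scala
import scala.util.matching.Regex

// One configured pattern: a named regex, names for its capture groups, and
// static tags to attach to any message it matches.
case class Pattern(name: String, regex: Regex, fields: Seq[String], tags: Map[String, String])

// Try each configured regex against the raw syslog line. On the first match,
// build a field map (to be serialized as JSON downstream) from the capture
// groups plus the static tags, and return the Kafka topics to publish to:
// the normalized firehose, a per-regex topic, and one topic per extracted key.
def tag(raw: String, patterns: Seq[Pattern]): Option[(Map[String, String], Seq[String])] = {
  patterns.view
    .flatMap { p =>
      p.regex.findFirstMatchIn(raw).map { m =>
        val fields = p.fields.zipWithIndex.map { case (f, i) => f -> m.group(i + 1) }.toMap ++ p.tags
        val topics = "normalized" +: s"regex.${p.name}" +: fields.keys.toSeq.map(k => s"key.$k")
        (fields, topics)
      }
    }
    .headOption
}
```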

OK, so we've been running this stack for some time now, and here are some lessons we learned over a few months. We currently peak at about 100K events per second, which aggregates to about 3 billion events a day. The end-to-end delay, from the moment we receive a message in syslog to the time it shows up in something like Elasticsearch, is typically around 20 seconds, which is fairly good, and it's a factor of how much hardware you throw at the problem. The bottlenecks we've seen, as I think TD was talking about earlier, come down to making sure all the tiers have parallelism built into them. Right now we have only two syslog instances writing into Kafka, and that tends to be a bottleneck, basically because I'm waiting for more hardware. On the Spark side we have on the order of a hundred executors, a couple of hundred gigs of RAM across those executors, and a few hundred CPUs, but those are massively oversubscribed.

Something interesting we ran into: until 1.1, the inherent partitioning of data across executors didn't quite work as advertised, so we had to use concurrent jobs in Spark Streaming to keep up with the volume of data coming in; otherwise we would see massive scheduling delays that would just keep climbing until they crashed the application. I'm glad to say that's been fixed as of 1.3. We still have to restart the application every few days, but that's a lot better than earlier, when we had to restart it every few hours.

In this Spark Streaming app there were certain choices we needed to make that affected the quality of the job in production. When we had it running for a long time we started seeing multiple issues. Sometimes data was not getting processed; it just looked like it got stuck and we didn't know what had happened. Sometimes stuff was getting processed but really slowly: we had a batch interval of five seconds, but each batch would take several seconds more than that to process, eventually leading to a massive scheduling delay, and over several hours you would have a massive number of batches pending and data never getting processed in time. The 20-second, 30-second delays started increasing to one minute, two minutes, eventually several minutes. So we had to start looking at various ways to fix this.

The first one was concurrent jobs versus union-and-repartition. Before I go into the details: this was using the Kafka receiver-based API, not the receiver-less one Cody talked about yesterday, which pulls data from all the Kafka partitions in parallel on different executors. This one was based on the receiver, so the data was still being pulled in by a single receiver, and we ran multiple receivers to pull the data in. At that point we tried concurrent jobs versus unioning the DStreams and then repartitioning, and we ended up having to repartition the data to make sure all of the executors were being used once we received it, because partitioning in the receiver-based Spark Streaming API is based on when your data comes in rather than how many partitions the data is being pulled from; it's not a true indicator of parallelism. We had to apply some repartitioning logic to ensure we wouldn't end up underutilizing the cluster.
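[Editor's note: a minimal sketch of the receiver-based approach just described, assuming the Spark 1.x `spark-streaming-kafka` integration. The topic name, ZooKeeper quorum, consumer group, and counts are placeholders.]

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based consumption: start several receivers, union them into one
// DStream, then repartition so the work is spread over all executors instead
// of piling up on the nodes where the receivers happen to run.
def unstructuredStream(ssc: StreamingContext,
                       zkQuorum: String,
                       groupId: String,
                       numReceivers: Int,
                       numPartitions: Int) = {
  val streams = (1 to numReceivers).map { _ =>
    KafkaUtils.createStream(ssc, zkQuorum, groupId, Map("unstructured" -> 1))
  }
  // Drop the Kafka message keys and keep the raw syslog payloads.
  ssc.union(streams).map(_._2).repartition(numPartitions)
}
```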
The second one was reading from and writing to Kafka. Writing data back to Kafka is tricky, because the producer API in Kafka hides a lot of detail from you, and you don't realize until you actually use it that if you keep using the same producer instance, it can be sticky to a single partition: you end up sending all of your data to one partition rather than in parallel to the different partitions of the same topic. Things like that affected our processing delays, because the data would come in, we'd process it, and we'd write it back to Kafka within the batch; the longer it took to write to Kafka, the higher the scheduling delay, and when the scheduling delay starts increasing it becomes more and more painful, because we had no idea why it was increasing. So a large part of our problems were how we did our I/O and how we did writes to the external systems, rather than anything internal to Spark.
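[Editor's note: the stickiness described above is characteristic of sending unkeyed messages with the older 0.8-era producer, which could pin a long-lived producer to one partition. The sketch below shows one generic way around it, explicit round-robin over the topic's partitions; it is illustrative, not the fix Salesforce used, and the `RoundRobinWriter` class is hypothetical.]

```scala
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Spread unkeyed writes explicitly across a topic's partitions, so a
// long-lived, reused producer still fans its data out.
class RoundRobinWriter(brokers: String, topic: String) {
  private val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)
  private val counter = new AtomicLong(0)
  private val numPartitions = producer.partitionsFor(topic).size()

  def send(value: String): Unit = {
    // Explicit partition overrides whatever the default partitioner would do.
    val partition: Integer = (counter.getAndIncrement() % numPartitions).toInt
    producer.send(new ProducerRecord[String, String](topic, partition, null, value))
  }

  def close(): Unit = producer.close()
}
```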

It wasn't just the application or the way the application was written; it was the APIs of the external systems. With fifty to sixty percent probability I can say most people will use Spark Streaming with Kafka, so these are some lessons we learned when we started using Kafka: if you're using producers and keeping them around for a long time, they can end up writing to the same partitions over and over, and if you're using Kafka consumers to pull data from Kafka, you probably want to do it on multiple machines. Either use the receiver-less API, so that it's already parallel for you, or, if you're using the receiver-based API, make sure you have many receivers running and pulling data in parallel.

The other aspect was that we also needed to scale the Kafka cluster with the amount of data and the amount of processing we did. As the Spark cluster size increased, Kafka started becoming a bottleneck: Kafka wouldn't receive the data fast enough. Part of it was that we didn't have enough partitions, part of it was that producers would get sticky, and part of it was that we just needed more Kafka nodes to support all the partitions we were adding. So the Kafka cluster has to scale together with the amount of data and the amount of parallel processing you're doing on the Spark side.

Now, what were the issues we faced? Thankfully, none of them were inherent to Spark Streaming; it was either bad configuration or us not using the available APIs smartly enough. If we used a single receiver there were problems, but we knew that only after we did it, so we had to add more receivers. None of these things were Spark's own fault, but they are things that people who productionize Spark Streaming are likely to run into, and things you learn from experience rather than from a book, unless it's in the Learning Spark book, which I didn't write but will eventually read at some point, Patrick.

Going from the pre-1.0 releases to 1.3 was a big change, not just in stability and ease of use but also in the growing number of configuration variables. In Spark 0.9 there were configuration variables, but not a whole lot; from 1.0 through 1.1, 1.2, and 1.3, the write-ahead log was added, then the parallel Kafka API was added, and the number of config knobs increased so much that we had to fine-tune each one of them to figure out how it works. When we played around with the configuration, even minor changes affected the way the application performed, so sometimes it was difficult to tell whether a configuration change had fixed an issue or whether we had been doing something wrong somewhere else. Configuration can be hard, especially when you have so many options available on the Kafka side, on the Spark side, and then again on the Spark Streaming side.
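[Editor's note: for reference, the receiver-less ("direct") Kafka API mentioned above, added in Spark 1.3, looks roughly like this. The broker string and topic name are placeholders.]

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct Kafka stream: no receivers, one Spark partition per Kafka partition,
// so parallelism follows the topic layout instead of depending on how many
// receivers were started.
def directUnstructuredStream(ssc: StreamingContext, brokers: String) = {
  val kafkaParams = Map("metadata.broker.list" -> brokers)
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("unstructured"))
    .map(_._2) // keep just the message payload
}
```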
Then YARN. I don't think we mentioned it, but we were running all of this on YARN, and YARN caused some headaches for us. The first one was log aggregation. YARN does log aggregation, which means that when the app is done it copies all the logs to HDFS. The only problem is that it copies the logs only when the app is actually done, not while the app is running. For an app like this, which is really long-running, that starts filling up disks and causing issues on executor nodes, because everything we log gets written to a local disk rather than to HDFS, and the nodes become more and more overloaded. This was one of the issues we faced, and there is still no real solution for it: YARN log aggregation cannot yet copy data periodically. I think they're working on it, but for now this is one of the things we're weighing, whether we should even turn on YARN log aggregation, to make sure data gets copied and doesn't crash the cluster.

Then there's yarn-client versus yarn-cluster mode. Again, I have no idea why we have a problem here; it's supposed to work, both modes are supposed to be identical, but this app works in yarn-client mode and not in cluster mode. The moment we start it in cluster mode, we get a bunch of YARN errors and it dies. We haven't spent time investigating it; to be fair, we figured it works in client mode, so let's just run it like that and look at cluster mode later, eventually, when we get the time. It's probably, again, a small configuration issue somewhere, but it is one of the issues we face and we really don't have a solution for yet.

The other issues are very subjective; they're not issues for everyone, only in some cases.

The first thing is that this is a distributed app, and it's really difficult to debug in an IDE. Has anyone here tried to debug a Spark app in an IDE? You'd probably find it really difficult, because when you're running on a cluster, versus local mode, there are so many things you can't see to figure out what's happening. The difficulty of debugging is not just a Spark issue; it's a general distributed-systems issue. The second aspect was how we debug bottlenecks and failures. It's almost always really difficult to find out why things failed or where the bottlenecks are in the whole flow; with so many moving components, YARN, Kafka, Spark, Spark Streaming, it was very difficult to find out when things were getting slow and why. Sometimes the scheduling delays were just because processing was taking a long time because executors were oversubscribed; sometimes it was because the Kafka writes were really slow. Figuring out which one it was was quite difficult, and the problem is that, again, you can't just run an IDE, so we had to do a lot of logging to figure it out, at which point YARN log aggregation got in the way. So one problem following another made it more and more difficult to identify what the issues were.

Then we learned a very important lesson, and I'm fairly sure many people in this room agree with the last point: if something gets fixed in a future version of Spark, say you upgrade from 1.3 to 1.4 and suddenly your problem goes away, don't bother trying to figure out why it wasn't working in 1.3. Around a thousand commits go in between 1.3 and 1.4, and you're probably not going to figure out which one fixed it. I work on Spark, I commit, I contribute code to Spark, and I can't figure out how half of these things got fixed, so I just give up: oh, it works in 1.4, fine, just upgrade to 1.4. If you find that it's fixed, just be happy with it.

All of this is open source, I mean all of this is based on open source, so if you have problems you can always go and look at the code. The best part is that, being pluggable, you can add more open source pieces; you can add Cassandra, for example. That's the whole aspect of things we liked about Spark and Spark Streaming, and the reason we chose this: everything is pluggable. Most of this was integration and plumbing; we didn't end up writing fifty thousand lines of code to get it working. It was just getting the plumbing right: talking to Kafka right, deploying it right, making sure it's configured right. And it all runs on hardware you can just order, not fancy hardware built specifically for this use case, just hardware you can order and deploy in a data center somewhere. So the whole aspect of getting stuff done was much easier, considering that most of this was taking pieces and fitting them together so they work with each other.

OK, that's pretty much what we have. If you have any questions, apart from confidential stuff which Salesforce won't share with you, anything else? One quick comment: it's not open source yet, and I cannot give you a very concrete timeline, but we are looking to open source all the work, including several other enhancements and other consumer apps that go on top of the stack. OK, I think there are no more questions, so we're done. Thank you, thank you very much.