Dataflow: A Unified Model for Batch and Streaming Data Processing

Okay, perfect. So the last talk of the day is by Frances Perry from Google. We've talked about streaming today, we've talked about batch analytics, and now let's hear about how we combine the two. Here's Frances Perry.

Thank you. I'm Frances Perry, I'm a tech lead on the Dataflow team at Google, and today we're going to build a Dataflow big data pipeline to process mobile gaming logs. The Dataflow model is a unified model, so it can express both batch and streaming computations in the same model.

Here's the running example we'll use today. You've got a really awesome, addictive new mobile game where users do some sort of silly repetitive task over and over again to earn points. They're doing this all over the world and sending the score information back home to our system. The users are organized into teams, and they earn points as a team. Some teams are geographically distributed, like this green giraffe team; other teams are more geographically clumped, so the purple monkeys and the red pandas both have key locations. All these users are earning scores and sending those events back, and our goal today is to write some interesting computations about that game usage. For example, we might want to calculate the top scorers, or the score per team at any given point. The computations we're going to write are both traditional batch use cases and more real-time streaming use cases, and we want to be able to use our transformations in both modes. Because our game might have an offline mode where users play without an active data connection, we're going to use custom timestamps and handle late data, so we can talk about events when they occurred, not when they actually entered the system.

Before we get to the code behind that demo, we have to talk a little bit about the various shapes our data might take. We'll use those shapes to see how these different types of data have motivated the evolution of big data processing at Google. With that background we'll dive into the specifics of the Dataflow model, and then we'll finally have enough information to get to the demo.

So, data shapes. Here's a big collection of events: each event is a user, the user's team, and the number of points they just earned. This collection is pretty small, but hopefully our game takes off and becomes really popular, so the collection gets very large. It might get so large that we start using a repetitive structure to track it: we have our logs from Tuesday, from Wednesday, from Thursday. But really this type of repetitive structure is just a cheap way to represent a continuous data source. Once our game launches and people begin playing, they don't stop; that data just keeps coming in. Because we're dealing with a distributed system, however, the events are unordered as they come in, and we have to deal with delays. If at 8 o'clock we're looking at the data that comes in slightly after 8 o'clock, we might see an event from a user that happened at 8 o'clock, so that event came into the system almost immediately. Here's a second event that was slightly more delayed, perhaps because of a little network congestion. There are events, however, that can be hours and hours delayed. That could be a one-off thing like a fiber cut across the Atlantic, or it could be a regular usage mode, like a user in airplane mode playing our game on a trans-Pacific flight, or somebody camping in the woods with no cell reception, playing our game in their tent.

So we have to be able to handle this unordered, infinite data, and when we do, there are a few tensions that come into play. If we want our results to be complete, for example tracking exactly all the users that played our game today, then we have to wait for those late events: we have to wait for all those flights to land and all those campers to return to civilization. Waiting that long introduces way too much latency, and we don't want that. We want results as soon as we can get them, and in fact we might even want speculative results as the day progresses; we want to know what the sum is so far that day. But the more data we buffer and the more often we process it, the higher the resource cost, so cost also comes into play. The right trade-offs between these three knobs depend on the use case, so let's dig into a few in particular.

Say we have a billing pipeline: we're calculating how much to charge our advertisers for the ads we've shown on their behalf this month. In this case money is involved, and we want to make sure we're getting complete data so we charge them for every single ad we've shown, so completeness is important. At the same time, low latency isn't: this is a monthly billing pipeline to generate credit card bills, so if it takes a couple of hours to run at the end of the month, we don't really care. And low cost isn't critical either, because this is what makes our company money, so we'll put in the resources we need to make it work. Now, the type of algorithm we'd use here, a traditional batch algorithm, would also work really well in a streaming fashion if we wanted to provide our advertisers with a live running cost estimate of how much they've spent as the month goes on. At that point completeness is less important, because this is just an estimate; we're just trying to make sure they're not drastically taken by surprise at the end of the month. Low latency becomes more important, because now we want to see the effects of things happening in the system, and cost may change a little bit too.

In a different domain we might have an abuse detection pipeline, to see when users are abusing our system. Here completeness, having all the data, probably isn't important; most abuse detection algorithms are heuristic-based anyway, so if we're missing an event or two we'll probably be okay. But low latency is the one that's really important, because we want to block users who are spamming our system very quickly. The same type of algorithm, which is traditionally a streaming algorithm, might be applied in a batch way if we want to research the right algorithm: we could take a backlog of historical data and run our abuse detection algorithm over it multiple times, trying out different permutations to see how we should tweak it to catch more spammers. Now low cost is important, because we want to try lots and lots of iterations, and low latency no longer matters, because we're just doing research and playing around. So as you can see, these knobs and how they relate depend very much on the use case, and these use cases and knobs have really shaped how we've done data processing at Google.

Let's start with a brief overview of the papers we've published on data processing at Google. Today we're going to look in particular at MapReduce, FlumeJava, and MillWheel, and how those three systems have evolved into the system we call Dataflow.

So, good old MapReduce. You have your collection of data, and at some point you chunk it up into pieces; each piece is handed out to a different mapping machine. Each mapping machine has an element-wise transformation that it applies to all the elements in its chunk: there goes the first machine, the second, the third, and so on, and each machine can do this independently of all the other machines. Then each mapping machine distributes its results among the reducing machines; this is the shuffle phase, where data is shuffled around between machines. Finally, each reducing machine applies the reduce function to get its final output, and those outputs together make up the results of our MapReduce computation.

MapReduce is a really powerful hammer, so we started using it internally for every possible big data nail we could find. If you take any complex algorithm, you can cram it into a map-shuffle-reduce structure, if you're willing to distort it completely beyond recognition. But you can do it, and we're engineers, so we did. That's also why we developed FlumeJava. FlumeJava is a higher-level API that builds on top of MapReduce: in Flume you use high-level, intuitive primitives to describe the shape of your computation, and the system itself transforms that DAG of computations into the underlying MapReduce structure. That way you still get the efficiency of a hand-coded, hand-tuned MapReduce, but you have the ease of use of dealing with just your algorithm, not the constraints of the underlying system.

Let's look at some of the batch patterns you could do in MapReduce or Flume. We can take our collection of data and restructure it, perhaps filtering it, aggregating it, or otherwise transforming it. We could do this repetitively if we had that repetitive log structure: every night we run a batch job to transform today's logs into the output shape we want. We could divide each collection of data up, partitioning or subdividing it, for example into time-based windows, so we could take Tuesday's logs and discover the top user per hour in those logs. The drawback, because this is a batch system, is that we're going to wait until the nightly run to figure out who the most active user was at 1:00 a.m., even though we had that data long ago.
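To make that map, shuffle, reduce structure concrete, here is a minimal in-memory sketch in plain Java. It is not the MapReduce or Flume API, just an illustration; the user and team names are made up, and the per-team sum matches the talk's running example.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MiniMapReduce {
  public static void main(String[] args) {
    // Raw events in the talk's "user,team,score" shape (names are hypothetical).
    List<String> events = Arrays.asList(
        "arya,RedPandas,5", "ben,GreenGiraffes,3", "cora,RedPandas,7");

    // Map phase: an element-wise transformation of each event into a (team, score) pair.
    Stream<Map.Entry<String, Integer>> mapped = events.stream().map(line -> {
      String[] parts = line.split(",");
      return new AbstractMap.SimpleEntry<>(parts[1], Integer.parseInt(parts[2]));
    });

    // Shuffle phase: group the values by key (team), as the shuffle does across machines.
    Map<String, List<Integer>> shuffled = mapped.collect(Collectors.groupingBy(
        Map.Entry::getKey,
        Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

    // Reduce phase: aggregate each key's values, here by summing the scores per team.
    Map<String, Integer> reduced = new HashMap<>();
    shuffled.forEach((team, scores) ->
        reduced.put(team, scores.stream().mapToInt(Integer::intValue).sum()));

    System.out.println(reduced); // e.g. {GreenGiraffes=3, RedPandas=12}
  }
}
```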

Another common pattern is sessions. A session captures a burst of user activity: if a user is playing our game, you might see them continually scoring points during a commercial break, and then their show resumes, they put the game away, and all of a sudden there's a real gap in their activity. Sessions can be computed in a traditional MapReduce style, and here you see it done. However, you'll notice that some users, like Jose, have an artificial break in their session: our logs rolled over at midnight, and Jose was interacting with our game from 11:00 p.m. to 1:00 a.m., but because of that artificial break in the middle we were unable to see that it was one full session. So these patterns are very powerful and MapReduce can do a lot, but a few things are lacking: that low latency is missing, and so is the ability to handle a truly continuous data source without inserting these artificial breaks.

That's why people started using streaming systems for continuous, real-time processing, and at Google that looks like a system called MillWheel. MillWheel is our framework for building low-latency data processing systems. Again, the user gives us a graph of computations they'd like to perform, and MillWheel spins up the resources, hosts each computation on nodes, and handles pushing data through the system. MillWheel can handle many of these same patterns. An element-wise transformation, say taking a stream of data and filtering it, works very naturally in a streaming system: as data flows through, you apply your element-wise transformation. Aggregations, however, get a little trickier. An aggregation usually does something like gather up all the related keys in the collection, but when your data source is infinite, you'd be waiting a really long time to get that first result, because you never know whether you'll get more values for the same key. So instead you have to window your data into finite-sized chunks, and that way you can make progress chunk by chunk.

The easiest way to do that is to use the processing time of the system: every five minutes, take whatever data arrived in the last five minutes, aggregate it, toss it on down the line, and grab the next five minutes' worth of data. However, as we saw in our gaming example, there's a difference between the arrival time in the system and the actual time the event was generated. More often than not, what we want is event-time-based windowing, where we look at the semantic properties of each element when deciding which time-based window to put it in. For example, here the red element arrives in the system just before 12:30, and it goes into the 12:00 to 1:00 window because it arrived right about when it occurred. The green element, however, shows up in the system around 12:45 but is actually from 11:15; there was just a delay in it getting to us, so we move it back into the 11:00 to noon window. This kind of event-time-based pattern also works for sessions: we build up user sessions based on the actual event time when the user did the action, not the time the element arrived in the system.

No matter how we choose to divide things up into finite, time-based chunks, we need to make the difference between event time and processing time concrete. In the graphs we'll show today, event time goes along the x-axis in yellow, and processing time, the time when the elements arrived in the system, is on the y-axis in blue. If the system were perfect and events arrived as soon as they occurred, data would follow that dashed line. But it doesn't: there are arbitrary delays, or skew, that cause data to lag slightly in processing time, and that skew can change over time. We refer to that line as the watermark. The watermark is a description of event-time progress; it basically says, at this point I don't expect any elements older than this to arrive in the system. Ideally watermarks would be perfect and we'd know exactly how far through our data we were, but that's hard to do. Some things, like log files, might give us a relatively good heuristic; log files tend to be monotonically increasing, so it's unlikely we'll see data out of order. But something like our distributed gaming system definitely has data out of order, so there's no way to get a perfect watermark, and we have to use a heuristic. As soon as we use a heuristic, we have to understand the trade-offs we're making. If our watermark moves too slowly, we're waiting for elements longer than we need to, and our results are delayed. If our watermark moves too fast, some data is going to come in after the watermark has already passed.
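To make the distinction between event time, processing time, and the watermark a bit more concrete, here is a small, hypothetical plain-Java sketch (not the Dataflow API): it assigns an event to a fixed event-time window by its timestamp, and checks it against a heuristic watermark to decide whether it is late. The specific times mirror the green-element example above.

```java
import java.time.Duration;
import java.time.Instant;

public class EventTimeBasics {
  /** Start of the fixed window that a given timestamp falls into. */
  static Instant windowStart(Instant timestamp, Duration windowSize) {
    long size = windowSize.toMillis();
    return Instant.ofEpochMilli((timestamp.toEpochMilli() / size) * size);
  }

  public static void main(String[] args) {
    Duration oneHour = Duration.ofHours(1);

    // An event that happened at 11:15 but only arrived in the system at 12:45.
    Instant eventTime = Instant.parse("2015-09-01T11:15:00Z");   // hypothetical date
    Instant arrivalTime = Instant.parse("2015-09-01T12:45:00Z");

    // Windowing by event time puts it back in the 11:00 window; windowing by
    // processing time would have lumped it in with the 12:00 arrivals instead.
    System.out.println("event-time window:      " + windowStart(eventTime, oneHour));
    System.out.println("processing-time window: " + windowStart(arrivalTime, oneHour));

    // A heuristic watermark says "I don't expect elements older than this anymore",
    // so an element whose event time is behind the watermark is late data.
    Instant watermark = Instant.parse("2015-09-01T12:30:00Z");
    System.out.println("late relative to the watermark: " + eventTime.isBefore(watermark));
  }
}
```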

Say we thought we were done seeing data for the 11-to-12 o'clock window and we've moved on, but then in comes a late, straggling element. That data is late: we've already processed its friends and sent them on down the pipeline, so we have to decide what to do with that element. All these trade-offs relate back to the watermark. If we have late data, our results won't be complete, but if we make sure we wait for all that late data, we won't get the latency we want. These trade-offs depend on your use case, and as we saw earlier, a lot of the algorithms we deal with make sense in both modes: we had the billing pipeline, where the same algorithm could be used for live cost estimates, and we had the abuse detection streaming job that we might also want to use for historical backfill. Engineers don't like inefficiency, and we really don't want to write the same system twice; that makes us really mad. That's why we developed the Dataflow model.

The Dataflow model is essentially the evolution of MapReduce with its powerful batch processing, Flume with its clean API, and MillWheel with its stream processing. When you use Dataflow, there are four main questions to keep in mind. The first is what transformations you're actually doing to your data: are you transforming it, aggregating it, filtering it, that kind of pipeline structure. The second is where in event time: how element timestamps should affect processing, so if an element has a semantically meaningful timestamp, how does that affect where we aggregate it? The third is when in processing time: how the arrival time in the system affects things, so what should late data do versus early data. And finally, how results should be updated as we get more and more data for the same window: as soon as we start getting late data we're adding more things to a result we already computed, so should we accumulate results, discard repeated results, and so on.

Now we're going to dig into each of these questions for a couple of slides. The first thing is to figure out what we're computing, what our big data pipeline looks like. Like in Flume, we use Dataflow to build a graph of the transformations we want to perform on our data. The nodes in the graph are the transformations themselves, and the edges are PCollections, or parallel collections: really big collections of elements. You build up as much of the pipeline as you can, and then that pipeline is executed as a single unit in the system; this lets the system do things like function composition and other transformations on the graph to execute it as efficiently as possible.

Those PCollections on the edges are homogeneous, typed collections of data, and they can be either bounded or unbounded in size, so we accept both fixed-size data sets and infinite streams. Every element in a PCollection has an implicit timestamp. It could be the event time, the timestamp when the event happened in our game; if an element doesn't have a semantically meaningful timestamp, say I'm counting words in the works of William Shakespeare, then we just use the arrival time in the system. PCollections are initially created from backing data storage: you've got a file system or a database or something you want to read from. Many of these data sources actually have good interpretations that are both bounded and unbounded. Take a file system: we could read a specific file pattern, which gives us a fixed data set to process, or we could point at a directory and treat it as an unbounded data source, where some other part of the system is continually dumping log files into that directory, and as they arrive we continue to stream them through our pipeline.

But the most fun way to create a PCollection is by transforming an existing PCollection. There's an entire library of building blocks that let you build up these data processing transformations, and they come in three main types. The first is an element-wise transformation; we call this ParDo, as in you do the same thing in parallel over and over again. This is basically both the map and the reduce function in a MapReduce-style algorithm: every input element can be individually transformed into elements of the output collection, and because the elements can be processed independently, the system has a lot of flexibility in how it chooses to parallelize this part of the computation. Aggregating transformations, on the other hand, require merging different elements of the collection together into a single output element: something like combining, joining, or grouping.
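As a rough illustration of what an element-wise ParDo looks like in code, here is a DoFn that parses a raw "user,team,score" line into a team/score key-value pair. It is written with the Apache Beam class names that the open-sourced Dataflow SDK grew into, so treat the exact packages and annotations as an approximation rather than what was on the slides.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Element-wise transform: one raw log line in, one (team, score) pair out. */
public class ParseEventFn extends DoFn<String, KV<String, Integer>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String[] parts = c.element().split(",");            // "user,team,score"
    if (parts.length == 3) {
      c.output(KV.of(parts[1], Integer.parseInt(parts[2])));
    }
    // Malformed lines are simply dropped here; the demo later counts them instead.
  }
}

// Applied to a PCollection<String> of raw lines (rawEvents is assumed to exist):
//   PCollection<KV<String, Integer>> teamScores =
//       rawEvents.apply(ParDo.of(new ParseEventFn()));
```

Because each element is handled independently, the runner is free to parallelize this step however it likes, which is exactly the flexibility described above.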

The final type of PTransform we'll talk about is a composite PTransform, which is just one that's made up of a subgraph of other PTransforms. This extensibility lets us build a library of useful transforms out of existing primitives.

Now let's dig into some pseudocode. This is pseudocode because Java is so verbose that I had to tweak some of the names to make it fit on the slide, but other than that, this is the same code you'll see in the demo. We start with a raw collection of events, those user/team/score triples flowing in from all the mobile devices. We take each of those elements and process it independently into a slightly more structured form, extracting the team and the score to get a key-value pair of team ID and points. Then we take that giant collection and sum the points per key, which basically gives us a team ID and the sum of all the scores that went with that key.

As we walk through this code, we'll use this running picture to help us understand what's going on. This is a look at one specific team and all the points that came in for that team. Again, we've got event time, the time the user actually scored the points, along the x-axis, and processing time, when the event arrived in the system, on the y-axis. If elements arrived in the system right when they were created, they'd follow that dashed line, which would go right through the origin if the axes weren't offset. Some elements, like the 3, come in pretty quickly: it happened just before 12:07 and arrived in the system just after 12:07. Some elements, however, are much more delayed: the 9 happened just after 12:01, but we didn't receive it until after 12:08. That user might have been playing our game somewhere with a spotty internet connection, on the subway, in an elevator, so there was a slight delay before the event could be sent to our system.

Next we're going to evaluate this in a traditional batch style, and I'll explain what's happening as things animate. We have all the events at the beginning, so we're processing everything together, which is why the rectangle spans the entire width of the graph. The white line represents processing time, and as processing time advances you can see we accumulate elements into intermediate state; that's the bold number chasing the white line. When we've finally seen all the elements and are done, the box turns blue, showing that we've emitted the result: we just keep summing those elements and finally emit 51.

This is great, but notice that we're treating all this data as a single chunk: we're requiring that we have all the data at once and then processing it all together. More likely, since we're building towards streaming, we'll need to window our data. That's the second question in the Dataflow model, the where in event time. Windowing is what divides a data set, infinite or not, into event-time-based chunks. You can window batch data just fine; you can take yesterday's logs and calculate the top user per hour. But it's more common to window infinite data, because when you aggregate infinite data you need some sort of time-based windowing to make sure you can actually make progress as data keeps coming in.

I've got three different windowing strategies up there. One of the easiest to understand is fixed windows: every five minutes, every hour, every day, every month. Sliding windows are something like "every hour, give me the last 24 hours' worth of data," so you have these repeated, overlapping windows. And session windows again capture those bursts of user activity, the period of time when each user was active.

So now let's take that pseudocode and window our transformation. We take that same sum-per-key, which sums the scores per team, and put it into fixed two-minute windows. Once we do that, if we execute our job in batch again, you'll see that we now compute four different results: a window from 12:00 to 12:02, one from 12:02 to 12:04, and so on, and we only sum the elements within each of those two-minute windows. But this is still a batch engine: we're still waiting for all the data, executing everything together, and emitting all the results at the end. The obvious thing we start to want here is lower latency for that first window.
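Putting the what and the where together, the windowed version of that pseudocode comes out roughly like this, again in Beam-style names as a stand-in for the slide's pseudocode, and reusing the ParseEventFn sketched earlier.

```java
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class WindowedTeamScores {
  /** rawEvents: the unparsed "user,team,score" lines read from some source. */
  static PCollection<KV<String, Integer>> sumPerTeam(PCollection<String> rawEvents) {
    return rawEvents
        // What: parse each line into a (team, score) pair, element-wise.
        .apply(ParDo.of(new ParseEventFn()))
        // Where: divide event time into fixed two-minute windows.
        .apply(Window.<KV<String, Integer>>into(
            FixedWindows.of(Duration.standardMinutes(2))))
        // What, again: sum the scores per team within each window.
        .apply(Sum.integersPerKey());
  }
}
```

Run over a bounded input this still behaves like the windowed-batch picture: all four windows are computed, they just aren't emitted until the end.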

That's where the when comes in: how the arrival time in the system should affect the processing we do. We use a notion called triggers to control that. Triggers tell us when to start producing output for a window; instead of waiting until the entire computation is done for all the data, we might start emitting some windows earlier than others. One of the most common ways to trigger is relative to the watermark, and again, the watermark is a heuristic about event-time progress that tells us what to expect about our progress through the data source. So we take that code and add a trigger so that we start emitting each window as the watermark passes it. This is actually the default, so I wouldn't need to write it, but I put it here for clarity. At that point we've got the same algorithm, with the watermark drawn in red, and you can see that as each window passes the watermark we emit its result: we get 5 for that first window and emit it at 12:06, well before we emit the result for our last window.

There are two things here that still aren't ideal. The first one is obvious: element number 9 just got left out in the cold. That really late data element, which came in after we thought we were done seeing elements for that window, wasn't aggregated into our results at all. The other drawback is that we're waiting until the watermark, and there are times when we want speculative, early results as we go. We can solve both of those problems by customizing the trigger a little more: now we're saying we'd like to trigger each window at the watermark, but also do early firings every minute to give speculative data, and a late firing every time we see an element after the watermark. Adjusting our graph again, you'll see that in the first window we output a corrected sum of 14 after we see element 9, and in the second window we get partial values of 7 and 14 before our final result of 22 at the watermark.

Here, as there are multiple firings per window, we've chosen to accumulate them, but that's not always the choice we want to make. So the final one of the four questions in the model is figuring out how to deal with repeated results: as we get more and more results for the same window, what should we do with them? Say we had an early element of 3 that we emitted as an early firing, then at the watermark two more elements, 5 and 1, and finally a late element of 2. What should the sum be in each of our firings? If we sum everything, the true total is 11, but there are a few different ways we could expose that. We could choose to emit only the sum of new elements as we go: the first time we emit just the 3; at the watermark we sum the new elements, 5 and 1, and emit 6, but we don't re-add the 3 into the mix. Another option is to accumulate, as we did in the previous example, where we keep emitting the running total. The drawback is that, depending on who is consuming this data, the consumer may not be smart enough to update the results appropriately; it may not be able to tell that 3 and 11 are refinements of the same result, and may think they're separate results. The most fun option is accumulating-and-retracting mode: every time we emit a new value we emit the cumulative total, and we also retract the old value ("backsies," as my team likes to call it). So we emit 3 the first time, then 9 along with "oops, never mind, remove that 3," and we continue in that pattern, so at the end the total observed is just 11.

All right, so we're going to take our running example and update it to use accumulating-and-retracting mode, and while we're at it we'll go for broke and change our windowing from fixed windows to session windows. Now we're computing bursts of user activity, triggering every minute, at the watermark, and every time we see late data, accumulating as we go and retracting the old result whenever we have a new, updated one. This is particularly exciting for session windows because a session window is looking for a continuous chunk of user activity: if we see this chunk here and that chunk there, and a late data element comes in in the middle, all of a sudden we might realize it's actually one chunk. So at this point the width of our windows starts changing, which kind of blows your mind. You can see the windows here changing size as they accumulate new elements, and when we retract things they turn red and we withdraw that result.
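The when and the how from these slides become a windowing configuration along these lines. The two-minute windows and one-minute early firings match the talk's pictures, the one-hour allowed lateness is borrowed from the later streaming demo, and the class names are Beam-style approximations. Retractions are described above but not expressed here, since they were not part of the open-source SDK as far as I know.

```java
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class TriggeredTeamScores {
  static PCollection<KV<String, Integer>> speculativeAndLate(
      PCollection<KV<String, Integer>> teamScores) {
    return teamScores
        .apply(Window.<KV<String, Integer>>into(
                FixedWindows.of(Duration.standardMinutes(2)))
            // When: fire at the watermark (the default), plus speculative early
            // firings every minute and a late firing for each element that
            // arrives after the watermark has passed.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1)))
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
            // How long to keep a window around so late data can still be added.
            .withAllowedLateness(Duration.standardHours(1))
            // How: each firing emits the running total for the window so far.
            .accumulatingFiredPanes())
        .apply(Sum.integersPerKey());
  }
}
```

Swapping accumulatingFiredPanes() for discardingFiredPanes() gives the "only the new elements" behavior from the 3-then-6-then-2 example.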

So there you go. That's the real power of the Dataflow model: by tuning these four parameters (what we're computing, where in event time, when in processing time, and how results relate as they're refined) we've covered a vast number of standard use cases. First we did traditional batch, then windowed batch; then we added a trigger to emit results at the watermark and got streaming; then we made our triggers slightly more sophisticated and got streaming with speculative and late data; and finally we did streaming with retractions.

So that's the Dataflow model, but a model is no fun unless there's a system to back it. Google Cloud Dataflow is part of Google's Cloud Platform, which has all sorts of things in it: raw compute, raw storage, and higher-level tools for web developers and data analysts. Dataflow is one of the tools in that arsenal, and it's both an SDK for writing programs and a fully managed service for executing those programs on the Cloud Platform. The SDKs are open source: you can check out the Java SDK on GitHub right now, and the Python SDK is still in the works, but we'll get it out there as soon as we can. You use these SDKs to construct that data processing graph, which says what computation you want to perform. Once you've done that, you can run the graph in a few different ways. There's a runner that lets you run it locally on your laptop, which is great for developing, playing around with small data, and writing unit tests, which we all do. You can run it on the fully managed Dataflow service on Google Cloud Platform, or you can use one of the open-source runners to run it in a third-party environment, so if you happen to have a Spark cluster or a Flink cluster in your back pocket, you can run your Dataflow program there. If you choose to run it at Google on our fully managed service, we do all sorts of good stuff for you: we take that graph structure and optimize it to be as efficient as possible; we manage the lifecycle of all your compute resources, spinning up VMs as needed and resizing the pool of VMs, growing or shrinking it depending on how your data processing is going; we do smart task rebalancing to make sure those resources are used efficiently; and we have a number of tools like a monitoring UI and integration with cloud logging, so you can easily search logs across multiple machines.

All right, now let's actually see this. We're going to start by building a batch computation to process these logs. Over here, I start by creating a pipeline; this tells the system I'm going to start building that processing graph of computations I want to perform. In this case we read some static data from a text file, and the data looks like "user,team,points". We take each string and apply a transformation to parse it into a slightly more structured value, getting out just the team and the score. Then we call our score-per-team transform. This is one that I created up here; it's a composite transformation, and a silly one, because it only has one thing inside it, but I did it to illustrate the point. This is a transformation that I wrote, and it could have any amount of complexity inside; here all it does is take that input team-and-score collection, apply the same sum we've been using all along, and return the result. Once we have that result, we format it into a string we can print, and finally we write it out to a text file. At this point we've built the pipeline graph but haven't executed anything, so we ask the pipeline to run. Where it runs is again configured by command-line flags, so it could be on my laptop, in Google's cloud, or on Spark.

I'll show you what it looked like when I ran it this morning on Google Cloud Platform. Here's the Dataflow monitoring UI for this job. You can see the graph structure with those five operations. This was a batch job that took two minutes, most of which is actually starting up the VMs for a small job like this. First it parses the events, then it does that score-per-team transform, and this is the composite transform that I wrote, so I might want to look into its structure.
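Here is a rough reconstruction of that batch pipeline, including the deliberately thin composite transform. Beam-style names again; the file paths are made-up placeholders, and ParseEventFn is the parser sketched earlier, so none of this is a copy of the demo code itself.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BatchTeamScores {
  /** Composite transform: a subgraph with just the sum inside, to illustrate the idea. */
  static class ScorePerTeam
      extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
    @Override
    public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>> input) {
      return input.apply(Sum.integersPerKey());
    }
  }

  public static void main(String[] args) {
    // Which runner executes this (local, the Dataflow service, Spark, ...) comes
    // from command-line flags; nothing in the graph is specific to one of them.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(TextIO.read().from("gs://my-bucket/game-logs/*.csv"))        // hypothetical path
        .apply(ParDo.of(new ParseEventFn()))                             // parse "user,team,score"
        .apply(new ScorePerTeam())                                       // the composite sum
        .apply(MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Integer> kv) -> kv.getKey() + ": " + kv.getValue()))
        .apply(TextIO.write().to("gs://my-bucket/output/team-scores"));  // hypothetical path

    p.run();  // nothing executes until the pipeline is run
  }
}
```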

Inside it's pretty simple; it was just that sum-per-key. But it turns out sum-per-key is itself a library function that's a composite operation, so I can dive into that too, and there are the GroupByKey and the Combine that underlie it. What this is doing is letting you interact with your graph as you wrote it, not as the system executes it. Often the system has to take that graph and turn it into a more efficient structure, and if we let you monitor that structure directly, it wouldn't look anything like what you wrote and you'd be really confused. So we're pretty excited about how this lets you keep interacting with your program at the same level you wrote it at.

Let's see what else we can point out here. My data had three parse errors that I added on purpose, so I have a running counter of errors. If this job were live and actually took more than two minutes to run, we'd be able to watch that counter as the job progresses. I can click on steps and learn about how many elements they processed, the estimated size, that kind of stuff. And here's a quick snapshot of the output: we've got the team names and the total score for each team. We haven't done any windowing here or anything special; we've just taken a fixed, bounded collection of data from a file and transformed it into an output file.

Now let's go back and start reusing this score-per-team transform. We're going to take the same transform that we wrote and used in a batch pipeline, and use it in a streaming pipeline. Here's a different pipeline. Again we create a pipeline, but this time the first thing we do is read from Pub/Sub, which is Google's messaging system, and we say we want to use custom timestamps: as events are injected into Pub/Sub they're tagged with the time they occurred, and when we read them back out we can use that same event time instead of the arrival time. Then we use that same parse function to parse our elements into structured data, and now we have a collection of key-value pairs of team and score.

We're going to do two different things to this collection: window it in two different ways and write it out to two different places. The first thing is to use fixed windows that are five minutes long and allow data to be late by up to an hour, so any data that comes in up to an hour after we think the window has closed will still be processed by the system, and as we process more and more results for the same window we accumulate, keeping a running total of the scores. After setting the windowing we use that same transform from the previous pipeline, and then we write the results out to BigQuery, which is Google's analytics service; basically, you can use SQL to query it. The second thing we do with the scored events is window them into sessions: now we're looking for bursts of activity separated by a gap of five minutes. Here we're looking across the whole team. If these teams are geographically clumped, you'll probably see the red pandas all online at the same time of day until they all go to bed, and then that team goes quiet and there are no more events from it, so we're trying to identify those usage patterns. In this case we're not going to handle late data, just to show that we can choose to drop it if we want, and again we use that same score-per-team transform.

All right, at this point let's go see this pipeline. Whew, live demos are fun. Let me just refresh this real quick. Here's this live pipeline, running right now. You can see it's the same structure we had before: reading from Pub/Sub, parsing the elements, then windowing two different ways and calling that same score-per-team transform. This one's a streaming job; it's been running for two and a half hours and is processing roughly 1,800 elements per second from the really interesting fake data I'm generating somewhere else. We can dig into some of these steps a little. For example, the system lag is 9 seconds, which tells us there are elements in the system that have been there for nine seconds without being processed; as long as this number stays very low, the system is keeping up with the data coming in. The other thing it tells us is the watermark, which is 4:37. I don't have a clock I can see, but that should be right around now, and this is the point before which we think we're no longer going to see data, so we don't expect elements with event times much earlier than that. We actually will see some slightly earlier elements, because I'm injecting some late data into this pipeline on purpose.
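And here is a sketch of that streaming side, reusing ParseEventFn and the ScorePerTeam composite from the earlier sketches. The topic name and timestamp attribute are hypothetical placeholders, the BigQuery writes are only indicated in comments, and the class names are again the Beam-style approximation rather than the demo's actual code.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class StreamingTeamScores {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read events from Pub/Sub, using a custom attribute as the event timestamp so
    // that downstream windowing is in event time rather than arrival time.
    PCollection<KV<String, Integer>> scores =
        p.apply(PubsubIO.readStrings()
                .fromTopic("projects/my-project/topics/game-events")   // hypothetical topic
                .withTimestampAttribute("event_time"))                 // hypothetical attribute
            .apply(ParDo.of(new ParseEventFn()));

    // Branch 1: five-minute fixed windows, accepting data up to an hour late and
    // accumulating, so each late update is a new running total for the window.
    scores
        .apply("FixedWindows", Window.<KV<String, Integer>>into(
                FixedWindows.of(Duration.standardMinutes(5)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardHours(1))
            .accumulatingFiredPanes())
        .apply("FixedScorePerTeam", new BatchTeamScores.ScorePerTeam());
        // ...then format each (team, total) as a row and write it out with
        // BigQueryIO (omitted here).

    // Branch 2: session windows with a five-minute gap, dropping late data, which
    // is the default when no allowed lateness is configured.
    scores
        .apply("SessionWindows", Window.<KV<String, Integer>>into(
            Sessions.withGapDuration(Duration.standardMinutes(5))))
        .apply("SessionScorePerTeam", new BatchTeamScores.ScorePerTeam());
        // ...and likewise write the per-session totals to a second BigQuery table.

    p.run();
  }
}
```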

Let's go query the output tables we're writing to. Here I am in the BigQuery UI, and you can see I've got the two tables we were writing: the fixed-window data and the sessions data. The first thing I'll do is look at the fixed-windows table and try to find an element that came in late, so we can see how it updates. Here we see two rows for the same window, the window that started at 4:30, ended at 4:35, and was processed at 4:37, and we have two updates for the orange dog team: the first came in on time and the second came in late (it must have been only slightly late, because the rounding doesn't show it). You can see we had 43 points and then an update to 45 points, so two more points came in late. Somebody scored an extra two points while they were in an elevator, we finally got that event, and we updated our results with it. The second table is the sessions table; let's get a fresh set of results there. These are those bursts of team activity: there was a session for the magenta cat team that started around 3:48, and they all went offline at 4:39, so that session was 51 minutes long and now that team has gone to bed.

So that's the demo. What we did is take this mobile gaming application, with logs and user events streaming in from all sorts of distributed devices, and write a reusable PTransform to process that data; in this case it was a very simple score-per-team, but it could have been a much more complex aggregation or computation. Then we took that same PTransform and ran it in multiple modes, both batch and streaming. When we ran it in streaming, we used event time, not processing time, so we could talk about when events occurred rather than when they arrived in the system, and we could easily toggle the windowing and late-data settings.

That's what I've got for you today. If you'd like to learn more about Cloud Dataflow, we just published a paper at VLDB last month, so you can go read that wonderful twelve-page, small-font academic paper. If you'd rather get started by getting your hands on it, you can grab the Dataflow SDK for Java off GitHub and play with it: run it on your local machine, or on the Spark cluster you've got in the backyard. Or you can use a free trial and start running things at scale on Google's Cloud Platform. All right, thank you. Any questions? We have 15 minutes for questions.

Thank you for the session, very informative. Does this have any integration with Storm currently? It does not. We have a runner that runs on Spark in batch mode, but we don't have any streaming open-source runners that I know of yet. Not on Spark Streaming: I think some folks have looked at it, but Spark Streaming's windowing semantics aren't quite as powerful as ours, so we wouldn't be able to translate our whole model onto Spark Streaming.

Hi, if we're writing into systems like BigQuery, how easy is it to update the data? Do you actually update it, or do you just append the new value? So the question was whether we're appending to BigQuery. In BigQuery's case we are just appending. It depends on the data sink you're using and what its properties are; if you had one that supported updates, you could do that. Cloud Bigtable just came out on Google Cloud Platform, and we can write updates to Bigtable and overwrite rows.

What data sinks and sources does the SDK support out of the box? We support most of the Google Cloud Platform sinks and sources out of the box. We're never going to be able to support every source and sink there is, so we have custom APIs that let you teach us about new sources and sinks. The reason you have to do something special is that we want to be able to parallelize these things efficiently: if you want to give us a new batch source, for example, you have to tell us how to split it into chunks, which is pretty standard, but for our really powerful autoscaling you also have to teach us how to estimate progress through those chunks and split them further, because our system will be fine-tuning how data is balanced across machines. Once you teach us that about your source, you can use it with Dataflow. Thank you.

For the example where you broke things up by session, the team scores, were the events coming in actually from different players that all belong to the same team? They were, yeah. Okay, thanks.

I was wondering, that UI that you showed, was it also powered by Dataflow, to show the progress of Dataflow? That was the UI for our managed service, so whenever you choose to run your Dataflow pipeline at Google on our service, you'll be able to see that UI showing progress. I guess my question is, do you power the analytics for Dataflow with Dataflow? Not yet; we haven't bootstrapped to that point.

For events which don't have a timestamp, where you apply a timestamp on the servers themselves, what do you use as the timestamp for those events? You can't use the system clock; it will vary across servers. There we use the arrival time in the system, but usually in that case you're not really windowing. If you're doing word count over Shakespeare, you're using what we call the global window, which covers all time, and then it doesn't really matter what the timestamps are, because we won't be using them. So the event time is then based on the system clock? Yeah. Okay.

You showed us a program, but you also showed us a SQL-ish thing. How much of those really rich semantics is captured by SQL, in particular things like the watermark? None of that. Basically, in that final transform where I wrote my PCollection out to BigQuery, I grabbed the key, the value, and some of the metadata, and wrote those explicitly to columns in my BigQuery table, just to be able to show them to you during the demo.

There's a question on the other side. Hi, if you have a batch job and a streaming job, is the user exposed to tuning topologies, or in batch the number of mappers and reducers, parallelism, things like that? On the Dataflow service, no: we're going to autoscale those resources for you and figure out the right targets. Okay.

I have one up front here. When you are doing a window, what happens if one of those nodes crashes? Do you do any persistence for that? The persistence model and the retries depend on which runner you're using. In the batch runner we're actually doing windowing based on a composite key, so it uses the normal MapReduce fault-tolerance mechanism of retrying an entire bundle. In streaming we also retry at the bundle level, so a small batch of elements gets retried, but there we're also using persistent disk and a LevelDB under the hood to keep the intermediate state, so if a machine crashes we don't lose all the data. In addition to that, does it provide any kind of reliability for the messages coming in, like guaranteed processing? Yes, we do exactly-once processing, modulo any side effects that get introduced when we retry a bundle. Okay, thank you.

I have one more. In the morning we learned a bit about backpressure methods, which are very prominent in stream processing, like rate-limiting the streams whenever you get an overflow, whereas that doesn't matter in batch processing. Where do you cover that in your streaming model, or how do you handle it? So, when the data is coming in too fast for the system? There the hope is that it will autoscale: we'll throw more VMs at it for you on our service and be able to handle that. That particular algorithm is still in progress, but it should be out very soon.

Okay, any more questions? Cool, let's thank Frances Perry for an amazing talk. Thank you. And thank you everyone for coming to Data at Scale. This was our last talk, and we do have office hours right now; Frances and the other speakers will be available there. Thank you.