Hyperspace: An Indexing Subsystem for Apache Spark

Today I will be covering the background of the project, the vision, and some of the concepts that help you understand what’s happening inside this project and why it’s something you should care about. My colleague here will then showcase some real-world examples to hopefully convince you, by the end of this talk, that Hyperspace is an awesome Spark add-on.

So before we dive into anything technical, let’s start with the most foundational question: what is an index? I can give you the most obvious answer from the textbook. In fact, I just lifted this out of a textbook: in databases, an index is a data structure that improves the speed of data retrieval. In other words, it gives you query acceleration at the cost of additional writes and storage space. But if you’re anything like me, a real-world analogy is tremendously useful for understanding what an index really is, so let’s take another crack at it. Ever since we were kids, you might remember going to the back of your textbook to figure out where exactly a particular key phrase appears, and at that point you might have come across the index at the back of the book. For instance, if I wanted to find where the phrase "nested loop join" appears, I would quickly go to the section with every word starting with the letter N, look up "nested loop join", and immediately see that it appears between pages 718 and 722. And that’s an index for you. In short, you can think of an index as a shortcut for accelerating some of your queries.

So let’s begin with the overview of Hyperspace. We have some pretty broad goals in Hyperspace. To begin with, our first and primary goal is to be agnostic to data format. Our second goal is to offer a path towards low-cost index metadata management. What do we mean by this?
Well, we want to make sure that all of the index contents, as well as the associated metadata, are stored on the lake, and that this does not assume any other service to operate correctly. Our third goal is to enable multi-engine interop. Fourth, we want to enable an extensible indexing infrastructure; in other words, what you see here today is more of a beginning than an end. And finally, we want to make sure that we satisfy all security, privacy, and compliance requirements.

At the very bottom, as you can see here, there is a data lake, and on the data lake there are datasets that you already have, potentially structured, like Parquet, or even unstructured, like CSV or TSV. The biggest assumption, as I laid out in my earlier slide, is that the index also lives on the data lake, both its contents and its metadata. Our first area of investment is the indexing infrastructure. This is pretty important: you can think of it as the layer that has no intelligence, but provides you with the raw APIs to do everything you can with Hyperspace. One of the first pieces of work we’re investigating is the index creation and maintenance APIs, including things like log management APIs: how do you ensure that you can provide concurrency across multiple engines on the same index? Once we build all of the index creation and maintenance APIs, next comes the complementary piece of the indexing infrastructure, which is the query infrastructure. This work deals with how you ensure that an optimizer is index-aware. Today’s optimizer in Spark is not really aware of what an index is, right?
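What does being index-aware mean in practice? At a minimum, for each scan in a query the optimizer has to decide whether some index can stand in for the original data. The plain-Python sketch below models only that one decision, under simplified, assumed rules: the index is active, was built on the same source, is keyed on a column the query filters or joins on, and stores every column the query references. `IndexEntry`, `Scan`, `pick_index`, and the index names are hypothetical illustrations, not Hyperspace’s API or its actual rewrite rules.

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Tuple


@dataclass(frozen=True)
class IndexEntry:
    """Hypothetical index metadata: source dataset, key columns, stored columns."""
    name: str
    source: str
    indexed_columns: Tuple[str, ...]
    included_columns: Tuple[str, ...]
    state: str = "ACTIVE"


@dataclass(frozen=True)
class Scan:
    """Hypothetical summary of what a query needs from one source dataset."""
    source: str
    predicate_columns: Tuple[str, ...]  # columns used in filters or join keys
    output_columns: Tuple[str, ...]     # columns the query projects


def pick_index(scan: Scan, indexes: Sequence[IndexEntry]) -> Optional[str]:
    """Return the name of an index that can replace the source scan, or None.

    Simplified rules, assumed for illustration: the index must be ACTIVE, built
    on the same source, keyed on a column the query filters or joins on, and
    it must store every column the query references.
    """
    needed = set(scan.predicate_columns) | set(scan.output_columns)
    for idx in indexes:
        stored = set(idx.indexed_columns) | set(idx.included_columns)
        if (idx.state == "ACTIVE"
                and idx.source == scan.source
                and idx.indexed_columns[0] in scan.predicate_columns
                and needed <= stored):
            return idx.name
    return None  # no applicable index: keep scanning the original data


indexes = [
    IndexEntry("empIndex", "employees", ("deptId",), ("empName",)),
    IndexEntry("deptIndex", "departments", ("deptId",), ("deptName",)),
]

# Both sides of "employees JOIN departments ON deptId SELECT empName, deptName":
print(pick_index(Scan("employees", ("deptId",), ("empName",)), indexes))     # empIndex
print(pick_index(Scan("departments", ("deptId",), ("deptName",)), indexes))  # deptIndex
# Asking for a column the index does not store means it cannot cover the scan:
print(pick_index(Scan("departments", ("deptId",), ("deptName", "location")), indexes))  # None
```

When `pick_index` returns a name, a rewrite rule can swap the source scan for a scan of that index; when it returns `None`, the plan stays as it was. Because the decision happens inside the optimizer, the user’s query text never has to change.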
So as part of this work, what we’re trying to enable is a transparent query rewrite in the background: if there is an index and we figure out that it will potentially provide some kind of acceleration, then we transparently rewrite the query. One of the most elegant benefits you get with this is that users do not have to change a single line of code, other than setting a configuration in Spark. And finally, do we want to do all of this manually? Obviously not. I think the holy grail of the world of indexing is index recommendation, which takes a user’s query workload and recommends the top-k indexes they can build in order to expect some kind of acceleration in their workload.

Let me take a moment to show you how simple using Hyperspace is going to be. Today, we offer Hyperspace in three languages: Scala, Python, and .NET. This slide shows you the APIs in Scala. The first set of APIs we provide is the usage APIs, and as expected, with the usage APIs you can create, delete, restore, vacuum, and rebuild indexes. The second set of APIs that we offer is the smart APIs. If you’ve used Spark, then you probably already know about df.explain, or dataframe.explain: it takes your query and can produce the parsed logical plan, the analyzed plan, the optimized plan, and the physical plan. Hyperspace.explain is built with a similar philosophy. What do I mean by that? It takes your query and gives you a very cool diff that shows which indexes it has used and how the plan ended up changing. The two other APIs that we plan on introducing in a couple of weeks are the What-If API, which allows you to hypothetically test whether an index will benefit your query, and the final holy grail of the smart APIs, the recommend API, which takes a workload from you and provides a list of top-k index recommendations that you can then go ahead and build in the background.

One question that might be lingering in your mind at this stage is: as I’m using all of these indexes, where exactly are they getting created? The cool thing, as I laid out in the goals and vision of the project, is that the indexes live on the lake. Why is this important? It’s very interesting because if you look at your file system, like HDFS, ADLS, or AWS S3, imagine you have your data laid out as I’ve shown at the bottom: it’s basically just some path to data files. The moment you start creating an index, you’ll observe that the create index API takes a name for the index, and once you provide that name, it creates a folder, and the folder contains all of the information, in a self-describing way, to point back to the original dataset that you have indexed. Why is this important?
You need to capture pretty much every detail of the original dataset you considered at the time of indexing, because you want to be able to detect whether the dataset changed since you created the index, to ensure that you’re not providing any incorrect results. And I’m sure you must have noticed at this stage the presence of something called a Hyperspace log. It’s in a different color, so it’s probably something of interest. The Hyperspace log is a progress log, or operation log, which is the heart of our entire design. When you initially create an index, it captures the create operation. If, let’s say, the underlying data ended up changing, then you might want to run a refresh. Once the refresh completes, the index is moved back into an active state, and the log points to the new index directories, in this case index directory two and index directory three. What you’ll notice is: hey, wait a second, why are we seeing index directories one, two, and three? That’s because we maintain different snapshots and versions of the index to enable multi-reader and multi-writer concurrency.

Having the index on the lake provides several benefits, to name a few. One, index scans now scale. The second big benefit is that our index today is laid out in an open format, and that open format is Parquet. If you’ve been around the Spark community, then you probably already know that there’s close to a decade’s worth of investment in making sure Parquet is a highly optimized format, and you don’t need to worry about that anymore: whatever optimizations exist for Parquet, you automatically leverage as part of the indexes themselves. And finally, as you can tell, we have enabled a serverless access protocol. What do I mean by this?
It’s simple: Hyperspace does not depend on any external service. So before we move on to my colleague’s demo, I’d like to introduce Azure Synapse Analytics, which is a product that Microsoft released recently, and Hyperspace is built into Azure Synapse Analytics out of the box. So now I’ll switch over to my colleague, Terry Kim, who’s going to show you the end-to-end experience of what Hyperspace looks like and how you can use it in production.

– [Terry] Okay. In this demo, I’ll go over the Hyperspace APIs for creating and using indexes. I’ll be using Azure Synapse Analytics, which comes with Hyperspace out of the box, and we’ll share this notebook after the session. Okay, let’s get started. First, I’m going to create two data frames, one for employees and one for departments. And here are the contents of the data frames.
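The demo’s centerpiece is a join of these two data frames on department ID. The plain-Python sketch below, using hypothetical sample rows rather than the demo’s actual data, illustrates why indexing both sides on that column helps: each covering index keeps only the key and the columns the query needs, hash-partitioned into the same number of buckets and sorted within each bucket (the talk later describes the index as already pre-shuffled), so matching buckets can be merge-joined directly with no shuffle or sort at query time. `build_covering_index`, `merge_join`, `join_with_indexes`, and `NUM_BUCKETS` are hypothetical names, and Python’s built-in `hash` stands in for whatever hash function a real engine uses.

```python
from collections import defaultdict

NUM_BUCKETS = 4  # hypothetical; both indexes must use the same bucket count

# Hypothetical sample rows standing in for the demo's two data frames.
employees = [
    {"empId": 7369, "empName": "SMITH", "deptId": 20},
    {"empId": 7499, "empName": "ALLEN", "deptId": 30},
    {"empId": 7782, "empName": "CLARK", "deptId": 10},
    {"empId": 7788, "empName": "SCOTT", "deptId": 20},
]
departments = [
    {"deptId": 10, "deptName": "ACCOUNTING", "location": "NEW YORK"},
    {"deptId": 20, "deptName": "RESEARCH", "location": "DALLAS"},
    {"deptId": 30, "deptName": "SALES", "location": "CHICAGO"},
]


def bucket_of(key, num_buckets=NUM_BUCKETS):
    # Python's hash() stands in for the engine's partitioning hash.
    return hash(key) % num_buckets


def build_covering_index(rows, key, included, num_buckets=NUM_BUCKETS):
    """Copy the key plus included columns, bucket by key, sort each bucket.

    This is the partition-and-sort work a join would otherwise repeat at
    query time; here it is paid once, at index build time.
    """
    buckets = defaultdict(list)
    for row in rows:
        entry = (row[key],) + tuple(row[c] for c in included)
        buckets[bucket_of(row[key], num_buckets)].append(entry)
    return {b: sorted(entries) for b, entries in buckets.items()}


def merge_join(left, right):
    """Merge two lists already sorted on their first field (the join key)."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lkey, rkey = left[i][0], right[j][0]
        if lkey < rkey:
            i += 1
        elif lkey > rkey:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with
            # every left entry carrying the same key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lkey:
                j_end += 1
            while i < len(left) and left[i][0] == lkey:
                out.extend(left[i][1:] + right[k][1:] for k in range(j, j_end))
                i += 1
            j = j_end
    return out


def join_with_indexes(left_index, right_index):
    """Bucket b can only match bucket b, so no data moves between buckets."""
    result = []
    for b in sorted(set(left_index) & set(right_index)):
        result.extend(merge_join(left_index[b], right_index[b]))
    return result


# SELECT empName, deptName FROM employees JOIN departments USING (deptId)
emp_index = build_covering_index(employees, "deptId", ["empName"])
dept_index = build_covering_index(departments, "deptId", ["deptName"])
print(join_with_indexes(emp_index, dept_index))
```

In a real engine, each bucket pair would be handled by a separate task in parallel; the point of the sketch is only that the expensive partition-and-sort step moved from query time to index build time, paid for with the extra writes and storage from the textbook definition.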

The employees data frame has employee ID, employee name, and department ID. The departments data frame has department ID, department name, and location. These are the two data frames I’ll be using throughout this demo. Now that we have data frames to work with, let me create a Hyperspace object, which has methods for creating and managing indexes. Next, I’m going to create a few indexes. To create an index, you need an IndexConfig object where you specify the name, the indexed columns, and the included columns. Indexed columns are the columns used for joins or filters; included columns are the other columns used for select or project. Once you have this IndexConfig object, you can just call hyperspace.createIndex and pass the data frame you want to build the index from, along with the config object. In this example, I’m going to create one index for the employees data frame and another one for the departments data frame. The reason I chose department ID as an indexed column is that I’ll be joining the employees data frame with the departments data frame on this column. We also have an API for listing the indexes we created: you call hyperspace.indexes.show, and this displays all the indexes that have been created. These are the two indexes we just created, and the output also has more information about each index, such as indexed columns, included columns, index location, and so on. The object returned by indexes is a Spark data frame, so if you want, you can perform any data frame operations on it. Now let’s start utilizing the indexes we just created. Before doing that, I’m going to disable broadcast join, because the indexes we created are applicable to sort-merge join. This is the join query we will be executing: I’m joining the employees data frame with the departments data frame on department ID and selecting employee name and department name. I’ll be showing the physical plan and then the results of the join. And this is the output. This is the physical plan of joining the two data frames, so you would see sort merge join and
the sort and exchange stages. And this is the result of the join. Now finally, let’s enable Hyperspace. How do we do it? We just call spark.enableHyperspace, and what it does is inject the Hyperspace optimizer rules and start utilizing the available indexes. We will be executing exactly the same join query, and this is the output. However, I think it’s a little hard to see what really changed after enabling Hyperspace, and that’s where the explain API comes into play. You call hyperspace.explain and pass the data frame, and it gives you an overview of what changes if Hyperspace is enabled. And this is the output from the join query that we just executed: this is the physical plan with Hyperspace on, this is the physical plan with Hyperspace off, and the highlighted portion is the difference. You see that sort and exchange have been removed from the physical plan with Hyperspace on. It also lists the indexes that are used, and these are the two indexes we created in this demo. And finally, it shows you the physical operator stats. For example, what it’s saying is that with Hyperspace disabled there were two shuffle exchange physical nodes, but with Hyperspace enabled the number of these nodes goes to zero, and the same is true for the sort physical operator. Hyperspace also works with SQL commands, and this is equivalent to the join using the data frame APIs: it joins these two tables on department ID, and it gives you the same result, as expected. Okay, let me switch gears a little bit and talk about some of the APIs for managing indexes. First is delete index. If you want to delete your index, you call hyperspace.deleteIndex and pass the name of the index you want to delete. This is a soft delete, meaning that it does not remove the index from the file system. If I look at the output here, you’ll see that the state has changed from active to deleted. And once the index goes to
the deleted state, it’s not going to be picked up by Hyperspace, so this API is useful if you want to disable some of your indexes temporarily. Next is restore index. This API is used if you want to bring back a deleted index; let me show you the output.

So if you perform restore index, it changes the state of this index from deleted to active, and once the index is in the active state, it will be used by Hyperspace once again. Next is vacuum. Vacuum is a hard delete operation, meaning that it physically removes the index from the file system. The prerequisite is that you first have to delete the index, so this is the sequence: you first delete the index, and then you vacuum it. And here is the output: the index goes from the active state to the deleted state, and when you vacuum the index, it is removed from the file system. And last but not least, we have a refresh index API. Suppose we have new employees who were added to our original data. Now that the index and the data are out of sync, Hyperspace is smart enough not to use the index, because it would give you a wrong result. However, you can use the refresh index API to rebuild the index, and once the data and the index are in sync, Hyperspace will use the index again. And that’s the end of the demo, so we can switch back to the slides.

– Thank you so much, Terry. Wasn’t that demo exciting?
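The lifecycle Terry just walked through, combined with the operation log and versioned index directories described earlier, behaves like a small state machine. The plain-Python sketch below models it under stated assumptions: `IndexLog`, the state names (`ACTIVE`, `DELETED`, `VACUUMED`), the `indexes/<name>/vN` directory layout, and the file-fingerprint staleness check are all hypothetical, chosen only to mirror the behavior described in the talk, and do not reflect Hyperspace’s actual log format.

```python
import hashlib


def fingerprint(files):
    """Hypothetical signature of the source data: hash of (path, size, mtime)."""
    digest = hashlib.sha256()
    for path, size, mtime in sorted(files):
        digest.update(f"{path}|{size}|{mtime}\n".encode())
    return digest.hexdigest()


class IndexLog:
    """Append-only operation log for one index (simplified, hypothetical model)."""

    def __init__(self, name, source_files):
        self.name = name
        self.entries = []  # operations are only ever appended, never rewritten
        self._append("create", "ACTIVE", version=1, source=fingerprint(source_files))

    @property
    def latest(self):
        return self.entries[-1]

    @property
    def state(self):
        return self.latest["state"]

    def index_dir(self):
        # Each build or refresh writes a new versioned directory on the lake.
        return f"indexes/{self.name}/v{self.latest['version']}"

    def delete(self):
        """Soft delete: the index data stays on the lake but is ignored."""
        self._require("ACTIVE")
        self._append("delete", "DELETED")

    def restore(self):
        """Bring a soft-deleted index back into use."""
        self._require("DELETED")
        self._append("restore", "ACTIVE")

    def vacuum(self):
        """Hard delete: only allowed after a soft delete."""
        self._require("DELETED")
        self._append("vacuum", "VACUUMED")

    def refresh(self, source_files):
        """Rebuild against the current source data into a new version."""
        self._require("ACTIVE")
        self._append("refresh", "ACTIVE",
                     version=self.latest["version"] + 1,
                     source=fingerprint(source_files))

    def usable_for(self, source_files):
        """Only use the index if it is active and the source data is unchanged."""
        return (self.state == "ACTIVE"
                and self.latest["source"] == fingerprint(source_files))

    def _require(self, expected):
        if self.state != expected:
            raise ValueError(f"{self.name}: expected {expected}, found {self.state}")

    def _append(self, op, state, version=None, source=None):
        prev = self.entries[-1] if self.entries else {}
        self.entries.append({
            "op": op,
            "state": state,
            "version": version if version is not None else prev.get("version"),
            "source": source if source is not None else prev.get("source"),
        })


# Usage: mirror the demo's sequence of operations.
files = [("data/employees/part-0.parquet", 1024, 1)]
log = IndexLog("empIndex", files)
log.delete()   # soft delete: state goes to DELETED
log.restore()  # back to ACTIVE
files_v2 = files + [("data/employees/part-1.parquet", 512, 2)]  # new employees
print(log.usable_for(files_v2))  # False: data changed after indexing
log.refresh(files_v2)
print(log.usable_for(files_v2), log.index_dir())  # True indexes/empIndex/v2

dept_log = IndexLog("deptIndex", [("data/departments/part-0.parquet", 256, 1)])
dept_log.delete()
dept_log.vacuum()  # allowed only because the index was soft-deleted first
print(dept_log.state)  # VACUUMED
```

Because every operation is appended rather than overwritten, a reader can always find the last recorded version, which is the kind of property that helps support the multi-reader, multi-writer concurrency mentioned earlier; a real implementation would also need atomic log writes, which this sketch omits.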
I’m itching to actually try this out. In fact, I’ll give you pointers later in my presentation on how you can try Hyperspace on your own machines today. Let’s start with the covering index. A covering index, in other words, creates a copy of the original data in a different sort order, so there’s no black magic. Like I mentioned at the beginning of my talk, an index provides query acceleration at the cost of additional writes and storage. So let’s take one example just to make the concept concrete. Imagine a table with three columns, and you’re trying to execute a query. This query is super simple: it says SELECT B WHERE A = red. The only way to execute this query would be a full scan over the table, and this full scan takes linear time. Obviously, I’m not considering optimizations like min-max, but I’ve tried to simplify just to get the concept across. Now, you can build a covering index on column A that includes column B. When you re-execute the same query, SELECT B WHERE A = red, you can resort to a binary search, and this takes logarithmic time. While the concept is pretty simple, it aids tremendously in things like filter acceleration. Not convinced that it works? Well, let’s go into something more complex. Imagine now that you’re joining two different tables, table A and table B, on column A, and you are running the query SELECT B, C, joining on column A from both tables. Without an index, Spark will decide to shuffle both sides of the join into the same number of partitions. And remember that the data is not sorted, so as step two, Spark ends up sorting both sides before it goes ahead and does a merge in step three. With the right index built, the Hyperspace extensions that we have built enable the Spark optimizer to pick the right index. And remember, the index is
already pre-shuffled and presorted, so Spark can happily move straight to step two, which is the merge step. I’m pretty sure you must have noticed by now that the shuffle got eliminated, and since the shuffle is potentially the most expensive step in a query, this query can now scale to much larger datasets and potentially run even faster. Now, having learned these two concepts, let me switch back to my colleague to show you some deep dives into real-world complex workloads and hopefully convince you that we can get some really nice acceleration.

– [Terry] Okay, now I’ll do a deep dive on two queries from TPC-DS, which is one of the well-known big data benchmarks, and I’ll show you how much Hyperspace can speed up those queries. First, this is Query 93, which is a fairly simple query: you are joining two fact tables and a dimension table, followed by an aggregation. Now let’s take a look at how the execution plan looks in the Spark UI. This is the plan executed by Spark, and as you can see, this is the join of two fact tables, followed by a join with the dimension table, followed by an aggregation. For the first join, we see that about 140 gigabytes of data were shuffled and sorted. This is a good opportunity to utilize Hyperspace covering indexes to eliminate the shuffle. Now, the window on your right is the plan executed by Spark with the right set of indexes created. As you can see, the shuffle and sort have been removed, but the rest of the plan remains the same. Let’s compare the execution time. This benchmark was run with scale factor 1,000, and the data is stored in Parquet. The execution time has gone down significantly, from six minutes to 32 seconds, giving you more than a 10X speedup. Next, let’s take a look at Query 64. This is a fairly complex query, with multiple joins and aggregations across about 15 different tables. Let’s take a look at the Spark UI for the executed plan. Let me zoom out a bit to show you the overall plan, and sure enough, it’s a complex one. If I scroll down a bit, I see shuffle and sort of about 110 gigabytes of data: 50 gigabytes of shuffle, 110 gigabytes of shuffle, 50 gigabytes of shuffle, and so on. After creating and applying the indexes, this is the new plan on your right; I’ll make it full screen. The big shuffles and sorts have been removed here, here, here, and here. This is an interesting scenario: if you cannot create a covering index on one side of the join, because that side involves aggregations, joins, and so on, you can still have a covering index on the other side, if applicable, and it can eliminate the shuffle for that side. Finally, let’s compare the execution time: it went down from 10 minutes to 3.4 minutes, giving you about a 3X speedup. So we just took a brief look at the speedups in two of the TPC-DS queries. Next, we’ll talk about the improvements we observed across all of the TPC queries.

– Thank you so much, Terry. As you’ve seen, Terry has just demonstrated Hyperspace acceleration on a couple of non-trivial queries. I’m pretty sure that the moment he showed you the whole plan zoomed out, at least I zoned out, in fact. Thanks a lot, Terry; now you’ve made me even more curious to try this out. Now that he has shown you a deep dive into two queries, let me talk about Hyperspace performance in general. The compute configuration that we used is on the left, for anybody who’s interested in reproducing some of our benchmarks. At the top is a graph that shows a workload evaluation derived from TPC Benchmark H, or TPC-H for short. The
scale factor we used is 1,000, and the evaluation was performed using Apache Spark 2.4. We will add support for Apache Spark 3.0 subsequently. And remember one very interesting thing: all of the base tables of TPC-H are laid out in Parquet, and as you already know, Parquet is a highly efficient columnar format. All of the acceleration that I’m about to show you, in this workload as well as the next one, is on top of Parquet data. In the graph here, the X-axis shows the TPC-H query number (TPC-H has 22 queries, and that’s what you see here), the Y-axis shows the duration it took to execute each query, and the dotted line indicates the gain we achieved by using the indexes. I want you to observe this and just digest how much gain we’re able to get. Obviously, you can see a spectrum of gains: gains as low as about 1.1X, but also gains as high as 9X. Now let’s look at the second workload, derived from the same TPC benchmark family, TPC Benchmark DS, or TPC-DS for short. For brevity, we just show the top 20 queries; the TPC-DS workload consists of a total of 103 queries. And what you can notice here is a similar pattern: again, there are no regressions, and in fact, there are gains of up to 11X to be had. The overall benchmark is really key because it conveys one other important aspect, which is that we don’t regress on any of the queries. So if you look at both TPC-H and TPC-DS, one thing that will immediately stand out is that we’re getting pretty high overall gains of 2X and 1.8X, respectively.

I’m also pretty thrilled to announce that we are open-sourcing Hyperspace as of today. I agree it’s version 0.1, but it’s a start. To recap, Hyperspace is an extensible indexing subsystem for Apache Spark. It’s a simple add-on; we have not changed anything in Spark core, and rest assured, it’s the same technology that powers the indexing engine inside Azure Synapse Analytics. It works out of the box with Apache Spark 2.4, and support for Apache Spark 3.0 is going to come in the next few weeks. And we offer Hyperspace in three languages, no compromises, so you can choose whatever language you feel comfortable with. Give us feedback on what you’d like to see in upcoming versions, and we’ll build it for you. Today you can access, download, build, and use Hyperspace from one of these two URLs; they both take you to the same place. Hyperspace is currently on GitHub, so feel free to explore the code base and give us suggestions on what you’d like to see. Having said that, I would also like to take a brief moment to thank everybody who helped us reach this point.

In this slide, I’m going to discuss a couple of areas that we are planning on investing in, which are probably also areas of opportunity where we could collaborate. The first area is metadata management and lifecycle: do you like what you’re seeing? Did we capture enough metadata, for instance? The second area of investment is indexing enhancements. What we demonstrated today is indexing on immutable datasets, or static data. How would we do the same thing for incremental indexing? What are the things that will change? What are the performance implications? How would you add support for updatable structures like Delta Lake or Hudi, for instance? The third area of investment is optimizer enhancements. Like I mentioned, this is a very humble beginning in this space; we are by no means experts at optimizers, and this is where we could definitely use your help in making sure that we’re taking the right decisions while optimizing queries. One area of active investment we’re exploring is query explainability: why did you use a particular index?
Why did you not use a particular index? The fourth area of investment is more index types; more on this in the next slide. The fifth one is documentation and tutorials. Indexing is not a silver bullet. In all honesty, we would like to tell you that indexes are always going to work, but I can’t really tell you that; you should take care while using indexes, making sure that they really accelerate your workload. There are lots and lots of questions you need to ask yourself before you decide to build an index, and this is what we want to capture as part of our best practices and gotchas, along with running more reproducible experiments for our community. And finally, the holy grail. When my awesome colleague Terry was showing you some of these cool demos, and I was showing you the performance evaluation across all of these complex queries, I’m sure one thought must have crossed your mind: wait a second, how did you decide which indexes to build in the first place? That was done through a prototype index recommendation engine that we have implemented over the last few months. It’s quite simple today: it takes your workload and recommends the top-k indexes that you can go ahead and build. It’s not perfect, but we plan on open-sourcing it over the next few weeks. Let me answer the final question, which is what types of indexes we’re going to build together. Today in our talk, we covered one type of index, the covering index, but in Hyperspace it’s very important to remember that the term index is used extremely broadly, to refer to a more generic class called a derived dataset. In short, a derived dataset, as the name states, is derived from the original dataset, and it aids in providing hints to the optimizer to accelerate the query. That’s the simple definition. So what are some other types of derived datasets that can be built together?
Today we only have covering indexes, but we plan on implementing more types, like the chunk elimination index, which you can use to do point lookups, or materialized views, which are an utterly complex area but also very, very useful, especially when you have canned queries that multiple users are running. And finally, we also plan to potentially provide statistics and maintain them, so that a cost-based optimizer can utilize them to give you higher query acceleration.

With that, I’d like to conclude my talk by mentioning a couple of key things. Number one, today I’m super thrilled to announce the open-sourcing of Hyperspace, a technology that we have built as part of Azure Synapse Analytics. It’s only version 0.1, so it’s obviously not ready for you to use in production; it’s just a humble attempt at giving something back to the community. Point number two, the technology was built without modifying a single line of Spark core, which gives you tremendous power in terms of putting it on your own Spark clusters without having to worry about deploying Spark itself. And because the technology is also deployed as part of Azure Synapse Analytics, rest assured that there’s a tremendous amount of product investment going into it. Point number three, we’ve demonstrated some very preliminary acceleration on some of the key workloads. And of course, I’d like to emphasize that this is more of a beginning: we might be making some mistakes, and there’s definitely scope for improvement. It’s not perfect, and that’s where we really need your guidance in shaping Hyperspace into something that our users would really love and use in their day-to-day workloads. And last but not least, I have the URL right here at the bottom right. The whole Hyperspace project is on GitHub today, so you can try it out, and if you run into any issues, please feel free to open an issue on GitHub. On behalf of my entire team, I look forward to seeing you folks on our GitHub page and to collaborating with all of you. Thank you so much for your time, thank you for tuning in, and have a great rest of the day.