This article was published as a part of the Data Science Blogathon
Overview of Apache Calcite
Making your own SQL database or running SQL queries against a NoSQL database seems to be a very daunting task. And if we are talking about a distributed database, then the complexity increases many times over. Fortunately, Apache Calcite, an open-source framework, can help you do this fairly easily.
Long ago, Apache Ignite was a distributed cache. In this large distributed "hash map" you could put something by key and take something out. This all worked quickly, but many users wanted to do more with their data in Ignite: for example, run some kind of analytics, and in general everything we are used to doing with the help of SQL.
At one point, the Apache Ignite developers had the idea of adding SQL to Apache Ignite. The first attempt was made with the small, lightweight H2 database, written in Java, which was taught to work with Ignite.
It looked like this: each node of the Ignite cluster ran an H2 instance inside itself. When a SQL query came to Ignite, it first went to H2, which parsed it, optimized it, and started executing it, but not against the data stored in H2 itself: Ignite learned to feed its own data to H2.
This is how it worked within a single node. But since Ignite is a distributed system, and there can be many nodes, the next step was to execute SQL in a distributed manner.
Let's consider this challenge using an example query: find all employees receiving exactly the average salary. Suppose there is a small cluster of three nodes: two data nodes and one client. Each of them has an H2 instance, and Apache Ignite would always split each request (for example, calculating the average salary for an enterprise) into two parts:
The first is a map query, which is executed on the nodes where the data is located. The global average cannot be derived from the per-node averages, so the map query first calculates the sum of salaries and the number of employees on each node. The results are sent to the node where the request originated (in this case, the client), where they are processed by the reduce query.
The reduce query produces the result of the entire distributed query. That is, it sums up the salary totals and employee counts that came from all nodes and divides one by the other to get the average salary across the cluster, which is what the problem asked for.
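Schematically, for the average-salary part the split might look like this (a sketch; the exact queries Ignite generates may differ):

Map (runs on every data node):   SELECT SUM(salary), COUNT(*) FROM emps
Reduce (runs on the client):     sum all the partial SUMs, sum all the partial COUNTs, divide one by the other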
But this scheme had its drawbacks: not every query can be split so that it executes in a distributed fashion in two passes. Our example query, finding all employees with exactly the average salary, really consists of two queries: a subquery that computes the average, and an outer query that filters by it. In Apache Ignite, each of these would in turn have to be split into two parts (Map and Reduce), giving four parts in total, and Apache Ignite, unfortunately, cannot execute this, because only one Map and only one Reduce are possible.
It would be possible to improve the old engine, but the drawback of this approach is that H2, on which SQL in Apache Ignite is based, was never intended for distributed execution. Trying to improve it would be like polishing a hack, which is rarely a good idea.
Writing your own SQL engine from scratch would also be an interesting task; engineers enjoy solving challenging problems. But this one, very laborious and long, with an uncertain outcome, could stretch over man-decades of work. And users need results now.
The third option was to bolt ready-made SQL machinery onto Apache Ignite: figuratively speaking, take PostgreSQL, pull the optimizer out of it, and somehow screw it into the current engine.
Apache Calcite: a framework for creating SQL databases
But, fortunately, PostgreSQL did not have to be gutted, because a ready-made solution was found: Apache Calcite, which positions itself as a framework for building SQL databases. This means you can take any NoSQL database, add Apache Calcite on top, and effectively get a SQL database.
Apache Calcite is a fairly mature product: its history goes back to the early 2000s, and it has been an Apache project since 2013. Although Calcite is not widely known to a general audience, it is used by a huge number of vendors, both open-source database projects and commercial vendors.
The usage pattern for Apache Calcite is pretty simple: you hand it a query, and it parses and optimizes it. As a result, you get a query plan in the form of a relational expression, which you then execute. In fact, this plan can be executed anywhere, not only in Apache Ignite:
How to use Apache Calcite
All that remains is to teach Apache Calcite to produce the plan you need, and your system to execute that plan. Let's now see how to do this.
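As a minimal sketch of this usage pattern, here is Calcite's Frameworks API driving the parse, validate, and convert steps. The setup is illustrative: a real system would register its own schema and tables instead of the empty root schema used here.

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class CalciteDemo {
  public static void main(String[] args) throws Exception {
    // Root schema; a real application would register its tables here.
    SchemaPlus rootSchema = Frameworks.createRootSchema(true);

    FrameworkConfig config = Frameworks.newConfigBuilder()
        .defaultSchema(rootSchema)
        .build();

    Planner planner = Frameworks.getPlanner(config);

    // Parse: SQL text -> SqlNode (abstract syntax tree).
    SqlNode ast = planner.parse("SELECT 1 + 1");

    // Validate and convert: SqlNode -> RelNode (relational expression).
    SqlNode validated = planner.validate(ast);
    RelNode rel = planner.rel(validated).project();

    // Print the plan that the rest of the system would execute.
    System.out.println(RelOptUtil.toString(rel));
  }
}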
How Apache Calcite Works
Internally, Apache Calcite consists of a huge number of components, but three are used most often: the parser, the converter, and the optimizer.
Some projects plug in their own parser or optimizer, but usually all three are taken together, because in Calcite they are already tuned to each other and nothing needs to be reworked.
Parser
When a SQL query arrives, the parser turns it into an abstract syntax tree, or SqlNode in Apache Calcite terms.
This is similar to how we parsed sentences at school: subject, predicate, and so on. Here, by the same principle, the parser picks out the columns, the operation (SELECT or UPDATE), which tables to take the data from, what to join, and under what conditions.
Converter
At this stage, the tree is validated for correctness and converted into a relational expression, or RelNode in Calcite terms.
This is a rather important transition from declarative SQL, where we say "give us data sorted this way, from such and such tables, joined so and so," to an imperative relational expression that says "to answer this request, first scan the employees and departments tables, then join them, filter out unnecessary rows, and keep only the columns that were requested."
In principle, this is already a plan that could be executed. But there are nuances: this plan is not optimal, because the Filter sits at the top. We would join the tables in full and only then filter out unnecessary data. It is better to filter first, discarding the extra rows, and only then join what remains.
It's the same with Project. Instead of dragging all the columns from the very bottom of the tree to the top, wasting memory on unnecessary ones, we can read only the columns we need. And this is where the optimizer comes in to improve the relational expression.
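Schematically, the rewrite the optimizer is after looks like this (table and column names are illustrative):

Before push-down:

Project (name)
  Filter (dept = 10)
    Join
      Scan (emps)
      Scan (depts)

After pushing Filter down:

Project (name)
  Join
    Filter (dept = 10)
      Scan (emps)
    Scan (depts)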
Optimizer and its rules
Optimizers in Apache Calcite are configured using rules. A rule specifies a modification of the relational expression that rewrites the tree into an equivalent one. A rule in Calcite is an ordinary Java class with two parts:
- Pattern: what the rule matches;
- Transformation: what the rule does with the tree.
Figuratively speaking, the optimizer runs through the tree looking for the patterns of the rules configured in it. When it finds a match (for example, Filter over Join), it invokes the transformation, and the Filter moves under the Join.
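Here is a skeleton showing the shape of the two parts in code. This is only a sketch: the real push-down logic lives in Calcite's built-in CoreRules.FILTER_INTO_JOIN, and the class name here is hypothetical.

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;

public class FilterIntoJoinSketch extends RelOptRule {

  public FilterIntoJoinSketch() {
    // Pattern: a Filter sitting on top of a Join.
    super(operand(Filter.class, operand(Join.class, any())));
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    Filter filter = call.rel(0); // the matched Filter
    Join join = call.rel(1);     // the Join underneath it
    // Transformation: build an equivalent subtree with the filter's
    // condition pushed below the join and hand it to the planner:
    // call.transformTo(equivalentSubtree);
  }
}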
Apache Calcite has two optimizers. HepPlanner is a heuristic, rule-based optimizer. Its advantage is speed: it applies rules as long as it keeps finding matches. But the rules must be chosen carefully so that they do not cancel each other out; if two rules contradict each other (for example, one moves a filter down and the other moves it up), the planner will loop forever. HepPlanner is used for transformations that are always (or almost always) beneficial, such as query decorrelation or pushing down Filter and Project.
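A minimal sketch of wiring up HepPlanner, assuming rel is the RelNode produced by the converter (the particular rules chosen here are illustrative):

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

public class HeuristicOptimization {
  // Applies a fixed set of always-beneficial rewrites to a plan.
  static RelNode optimize(RelNode rel) {
    HepProgram program = new HepProgramBuilder()
        .addRuleInstance(CoreRules.FILTER_INTO_JOIN) // push Filter below Join
        .addRuleInstance(CoreRules.PROJECT_MERGE)    // merge adjacent Projects
        .build();
    HepPlanner planner = new HepPlanner(program);
    planner.setRoot(rel);
    return planner.findBestExp();
  }
}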
But there is also a more interesting optimizer: the cost-based VolcanoPlanner. Cost-based means that a certain cost is assigned to each tree. This is a scalar value: the lower the cost, the more optimal the tree is for us and the easier it is to execute. The optimizer's job is to find the tree with the smallest cost.
VolcanoPlanner does not rewrite the tree in place like the heuristic optimizer does. Instead, it stores all modifications of the query tree in a special structure called the MEMO.
The MEMO works as follows. Say we have some tree (here, Filter and Project), and another operator appears (say, Join) that is equivalent to the Project.
Since the Join produces exactly the same rows and the same result as the Project, VolcanoPlanner puts both of them into the same equivalence class, a Set, and on request returns everything in that set. Filter, accordingly, has its own set, and then it does not matter which specific node becomes its child: we say its child is the set itself. In theory, any node in that set could serve as the data provider for Filter.
Since the optimizer knows the cost of each node (say, conditionally 10 for Project and 30 for Join) and remembers the best cost for each set, in our example Project wins.
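Schematically, the MEMO at this point contains something like this (using the conditional costs from above):

Set#1 = { Project (cost 10), Join (cost 30) }   // equivalent expressions; best cost 10
Set#2 = { Filter (input = Set#1) }              // the child is a set, not a concrete node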
When all the rules have been applied and the tree is optimized, VolcanoPlanner chooses a winner from each set and builds a new tree out of them, which becomes the optimal plan:
Note that the new Project is in the same set as the Filter. This is because VolcanoPlanner's rules maintain an invariant: they rewrite the tree into an equivalent construction, that is, into something that produces the same result.
The bottom Filter fell into a different set because it does not produce the same result as the set on the left with the Project. The set on the left produces data that is projected but not filtered, and the set on the right the other way around. Therefore, they are in different equivalence classes.
In addition, VolcanoPlanner is used for physical planning.
VolcanoPlanner: physical planning
Physical planning in Apache Calcite is also cost-based optimization. A Join of two tables is a logical expression, but there are many algorithms for executing it: for example, HashJoin, MergeJoin, NestedLoopJoin. Physical planning is what decides which algorithm to use for a particular Join.
If the system can execute HashJoin, a HashJoinRule is added to Apache Calcite that rewrites the original tree into one whose top node is a HashJoin. If the system can execute MergeJoin, another rule rewrites the logical Join into a specific physical implementation with MergeJoin. Since MergeJoin requires its inputs to be sorted by the join keys, at this point a sort of the inputs has to be added as well.
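A rough sketch of cost-based planning with VolcanoPlanner. Calcite's built-in Enumerable convention stands in here for a system's own physical operators (such as the HashJoinRule and MergeJoinRule just mentioned), and the sketch assumes the RelNode's cluster was created with a VolcanoPlanner:

import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.EnumerableRules;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

public class CostBasedOptimization {
  // Lets the cost-based planner search the MEMO for the cheapest plan.
  static RelNode optimize(RelNode rel) {
    VolcanoPlanner planner = (VolcanoPlanner) rel.getCluster().getPlanner();
    planner.addRelTraitDef(ConventionTraitDef.INSTANCE);

    // Logical rewrites and physical implementations compete on cost.
    planner.addRule(CoreRules.FILTER_INTO_JOIN);
    planner.addRule(EnumerableRules.ENUMERABLE_JOIN_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
    planner.addRule(EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);

    // Ask for a plan in a physical convention; the planner explores the
    // equivalence sets and picks the winner with the lowest cost.
    RelTraitSet desired = rel.getTraitSet().replace(EnumerableConvention.INSTANCE);
    planner.setRoot(planner.changeTraits(rel, desired));
    return planner.findBestExp();
  }
}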
When physical planning is complete, the optimizer evaluates each plan by assigning a cost to every node: one node is more expensive, another is cheaper. As a result, each tree receives a final cost:
In our example, HashJoin won, even though MergeJoin itself is cheaper (it is a simple operation with no hash tables to build). It lost because of the expensive sorting at its inputs.
You might think that HashJoin will always beat MergeJoin in the optimizer, but in reality it does not. When the data is already sorted, for example when it comes from sorted indexes, MergeJoin wins.
Of course, the optimizer must somehow understand whether to insert the sort or not. To do this, there are so-called traits in Apache Calcite, where you can say that the left input of the join must be sorted by x and the right one by y.
Accordingly, the sets on the left and right will contain both plainly scanned data followed by a sort, and index scans that need no sorting. Apache Calcite will calculate the cost again and choose the winner. In our example, the index scans can win, and we avoid the expensive sort.
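A sketch of demanding sortedness via a trait, assuming the collation trait definition is registered with the planner (the column ordinal is illustrative; a real rule would use the actual join keys):

import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;

public class SortednessTrait {
  // Demands that a join input be sorted by column 0.
  static RelNode requireSorted(RelOptPlanner planner, RelNode input) {
    RelTraitSet sorted = input.getTraitSet()
        .replace(RelCollations.of(new RelFieldCollation(0)));
    // The planner may satisfy the trait with an explicit Sort, or with a
    // source that is already ordered (e.g. an index scan), whichever is cheaper.
    return planner.changeTraits(input, sorted);
  }
}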
This is how a ready-made plan is obtained. To deliver the result to the user, it is now the turn of the runtime, which understands and executes the plan.
How to make a runtime
A prepared query in the form of a plan is like a program in its own little programming language. One way to execute it is to interpret it, turning each node of the tree into an iterator.
For example, the Scan iterator is an entity that reads data from a table and passes it on. The Filter iterator takes data from below, from Scan, and applies a predicate to it: if the predicate returns true, the row is passed on; if not, the next row is fetched. The Project iterator simply keeps only the columns that were in the query.
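A toy sketch of such an iterator runtime in plain Java (rows are Object arrays; this is not Calcite's actual interpreter):

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class IteratorRuntime {

  // Scan: reads rows straight from a table (here, an in-memory list).
  static Iterator<Object[]> scan(List<Object[]> table) {
    return table.iterator();
  }

  // Filter: pulls rows from its child and passes on only those
  // that satisfy the predicate.
  static Iterator<Object[]> filter(Iterator<Object[]> child, Predicate<Object[]> predicate) {
    return new Iterator<Object[]>() {
      Object[] pending;

      public boolean hasNext() {
        while (pending == null && child.hasNext()) {
          Object[] row = child.next();
          if (predicate.test(row)) {
            pending = row;
          }
        }
        return pending != null;
      }

      public Object[] next() {
        if (!hasNext()) throw new NoSuchElementException();
        Object[] row = pending;
        pending = null;
        return row;
      }
    };
  }

  // Project: keeps only the requested column positions of each row.
  static Iterator<Object[]> project(Iterator<Object[]> child, int... columns) {
    return new Iterator<Object[]>() {
      public boolean hasNext() { return child.hasNext(); }

      public Object[] next() {
        Object[] row = child.next();
        Object[] out = new Object[columns.length];
        for (int i = 0; i < columns.length; i++) {
          out[i] = row[columns[i]];
        }
        return out;
      }
    };
  }
}

For instance, project(filter(scan(emps), r -> (int) r[1] > 100), 0) would read the table, keep the rows whose second column exceeds 100, and return only the first column.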
There are other options. You can use code generation: turn the tree into code in some language and execute that. But code generation can take a long time (seconds) for a large, complex query. A combination of the two approaches can speed things up.
For example, some systems first build a tree of iterators and in the background try to compile the query with code generation. The query starts executing in the interpreter, and when the generated code is ready, execution smoothly switches over to it, since it usually runs faster.
Sometimes additional hardware is used for execution, such as a GPU or FPGA. Vector processor instructions are also often used, because they allow data to be processed very quickly, especially when it is stored in columnar form.
But what about a distributed system like Apache Ignite? So far, our ready-made plan from the example can only be executed on one node. For a simple query like Filter over Scan, the result must be sent to the client application, for example to the JDBC driver, which runs on the client node, while the lower Scan part of the tree executes somewhere in the cluster, on the nodes where the data lives. That is, different parts of the tree must be executed in different places in the system.
In our example, Scan is always executed on the data nodes, the client application waits for data on the client node, and Filter can also be executed on the client. To move data around, Apache Calcite has the Exchange operator. It does not change or filter the data; it changes the distribution of the data in the cluster:
But this plan is not the best option. We send absolutely all the data that we have in the cluster, and only then do we filter it on the client. It makes more sense to filter on the cluster rather than on the client. Therefore, the cost-based optimizer must take into account the data distribution.
How to tell Apache Calcite about distributed data
This is also done using traits. Sortedness is a trait, a physical property saying the data is ordered in a certain way. Data distribution is also a trait: it says the data is spread across the cluster in some way. The optimizer in Calcite knows how to work with these traits.
Here the top node has the Single distribution trait, meaning it executes on only one node. The bottom node has the Hash distribution trait, meaning the data is partitioned by a hash of the primary key. The optimizer's job is to determine which trait the Filter should have and where to plug in an Exchange.
To put it simply, the optimizer tries to insert Exchange wherever it can: for example, after the filter, as here, or before the filter:
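In code, distribution traits work much like collation traits; here is a sketch using Calcite's built-in RelDistributions, assuming the distribution trait definition is registered with the planner (the column ordinal is illustrative):

import java.util.List;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;

public class DistributionTrait {
  // Demand the whole result on one node (e.g. the client). To satisfy
  // the trait, the planner inserts an Exchange where needed.
  static RelNode gatherOnSingleNode(RelOptPlanner planner, RelNode rel) {
    RelTraitSet single = rel.getTraitSet().replace(RelDistributions.SINGLETON);
    return planner.changeTraits(rel, single);
  }

  // Demand rows partitioned across nodes by a hash of column 0.
  static RelNode partitionByHash(RelOptPlanner planner, RelNode rel) {
    RelTraitSet hashed = rel.getTraitSet().replace(RelDistributions.hash(List.of(0)));
    return planner.changeTraits(rel, hashed);
  }
}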
Now we can go back to the first query that Apache Ignite initially could not execute: find all employees receiving exactly the average salary. With Apache Calcite it can execute such queries without any rewriting, and Calcite will produce a plan something like this:
SingletonExchange                                  // Send the final result to the client
  HashJoin (condition = [salary = #0])             // Join emps with the average
    Scan (table = [emps])                          // Scan emps again on all nodes
    BroadcastExchange                              // Send the computed average to all nodes
      Project (#0 = [#0 / #1])                     // Average = sum / count
        Agg (#0 = [SUM(#0)], #1 = [SUM(#1)])       // Compute global aggregates
          SingletonExchange                        // Gather local aggregates on one node
            Agg (#0 = [SUM(salary)], #1 = [COUNT(1)])  // Compute local aggregates
              Scan (table = [emps])                // Scan emps on all nodes
Apache Ignite did it!
In this plan, the global aggregation and the final result are computed on a single node, while the scans, the local aggregates, and the join run distributed in the cluster. It turns out we executed the query in a distributed fashion, gathered the data, computed something, scattered it, computed again, and gathered it again. The execution is quite complicated, but with Apache Calcite it is much easier to achieve than with the same H2.
Conclusion
The main beauty of Calcite is that it was created precisely for customization and for use in different projects. You can add your own relational expressions, extending the standard Filter or Join; add rules for rewriting the relational tree; or override the costs of your own or built-in operators. For example, if HashJoin is slow on your system because it is heavier than MergeJoin, you can reduce MergeJoin's cost, and it will be chosen as the winner in the plan more often.