One key element of a distributed database is how data is distributed among the different datastores.
LeanXcale provides 3 ways of partitioning data:
Key range partitioning. This is the most efficient way of partitioning if you know how to balance your data.
Hashing. Hashing is a very simple partitioning scheme that is very good at balancing data across datastores. The system calculates a hash value from the key and distributes the data according to the hash value modulo the number of partitions. The disadvantage of hashing is that scans have to be executed on all datastores, because the data can be stored in any of them. Thus, scans require more resources than with key range partitioning.
Bidimensional partitioning. This is an automatic partitioning scheme that is normally set up on top of one of the previous schemes. You define a parameter that evolves over time and a criterion, and the system uses them to partition your data automatically to make the best use of resources.
The first question is: why should I distribute data at all? The answer is quite simple. If all the data is in a single datastore, all the work will be done by a single component and you won't take any advantage of parallelism. The more components you distribute your data across, the better.
However, if the distribution is not even, one component will be used more than the others and it will become your bottleneck.
Therefore, we want to distribute data as evenly as possible in terms of data access (which ultimately depends on your workload).
Key range partitioning is about partitioning and distributing your data based on ranges of the primary key.
Why is this the most efficient way to distribute data? Ultimately it depends on the kind of queries, but in general, if a query has a condition on the primary key, it will only be sent to the partitions that hold the relevant data, while the remaining partitions can handle other queries. In a system running many concurrent queries, if the data is well balanced, this makes the most efficient parallel use of the resources.
To partition by key range you can use the CREATE TABLE … DISTRIBUTE UNIFORM clause or the RECREATE TABLE clause. However, for the finest control, you create each partition explicitly with the ALTER TABLE ADD PARTITION clause.
Let's say we have a table DISTRICT with a two-field primary key, and we know its data is not evenly distributed: 25% of the keys fall in the d_w_id range [0, 20), 25% in [20, 24), 25% in [24, 42), and the remaining 25% in [42, …).
So we first create the table:
CREATE TABLE district (
  d_id integer NOT NULL,
  d_w_id integer NOT NULL,
  d_name char(10),
  d_street_1 char(20),
  d_street_2 char(20),
  d_city char(20),
  d_state char(2),
  d_zip char(9),
  d_tax decimal(4,4),
  d_ytd decimal(12,2),
  d_next_o_id integer,
  CONSTRAINT pk_district PRIMARY KEY (d_w_id, d_id)
);
Then we add the partitions (since d_id plays no role in this distribution, I use the value 0 for it in every split point):
ALTER TABLE DISTRICT ADD PARTITION(d_w_id, d_id) FOR VALUES(20,0) MOVE;
ALTER TABLE DISTRICT ADD PARTITION(d_w_id, d_id) FOR VALUES(24,0) MOVE;
ALTER TABLE DISTRICT ADD PARTITION(d_w_id, d_id) FOR VALUES(42,0) MOVE;
Note that to get 4 partitions we only need to define 3 split points.
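To make the routing concrete, here is a minimal Python sketch (not LeanXcale code) that models the four DISTRICT partitions defined by the three split points above. It assumes, for illustration, that a key equal to a split point belongs to the partition starting at that point; the function names are hypothetical.

```python
from bisect import bisect_right

# Split points from the ALTER TABLE ... ADD PARTITION statements, as
# (d_w_id, d_id) tuples. Python compares tuples field by field, which
# matches the ordering of the composite primary key (d_w_id, d_id).
SPLIT_POINTS = [(20, 0), (24, 0), (42, 0)]


def partition_for(key):
    """Index (0-3) of the partition holding `key`.

    Assumption: partition i covers [SPLIT_POINTS[i-1], SPLIT_POINTS[i]),
    so a key equal to a split point opens the next partition.
    """
    return bisect_right(SPLIT_POINTS, key)


def partitions_for_range(low, high):
    """Partitions a scan over the inclusive key range [low, high] visits."""
    return list(range(partition_for(low), partition_for(high) + 1))


if __name__ == "__main__":
    print(partition_for((5, 3)))    # 0: d_w_id in [0, 20)
    print(partition_for((20, 0)))   # 1: first key of [20, 24)
    print(partition_for((50, 7)))   # 3: d_w_id in [42, ...)
    # 21 <= d_w_id <= 23 only touches partition 1; the other three
    # partitions stay free for other queries.
    print(partitions_for_range((21, 0), (23, 10**9)))
```

Because composite keys compare field by field, a condition on d_w_id alone is enough to prune partitions, which is why d_id can be set to 0 in the split points.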
Very basically, hashing is a transformation from an input space to a number. If the hash function maps the input space onto numbers very uniformly, the hash values will be well balanced even when the inputs are not, so you can distribute your data according to the hash value instead of the original input values.
In our case, the input space can be a set of fields of any type, and LeanXcale can then use the resulting hash code to partition the data.
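A minimal sketch of the idea in Python follows. The choice of SHA-256, the field separator, and the fixed number of partitions are assumptions for illustration; LeanXcale's actual hash function is not described here. It shows that consecutive, clustered key values still spread evenly across partitions.

```python
import hashlib
from collections import Counter

NUM_PARTITIONS = 4  # hypothetical number of partitions


def hash_partition(*fields):
    """Partition number for the given hash-field values.

    The values are joined into one byte string and hashed with SHA-256
    (deterministic, unlike Python's per-process randomized hash() for str);
    the hash value modulo NUM_PARTITIONS selects the partition.
    """
    data = "\x1f".join(str(f) for f in fields).encode("utf-8")
    hash_value = int.from_bytes(hashlib.sha256(data).digest()[:8], "big")
    return hash_value % NUM_PARTITIONS


if __name__ == "__main__":
    # 10,000 consecutive keys: a range partitioning with a badly chosen
    # split point could put them all in one partition, but their hash
    # values spread them roughly evenly.
    counts = Counter(hash_partition(key) for key in range(10_000))
    print(sorted(counts.items()))
```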
A key decision is which fields to use for hash partitioning. I will go through the pros and cons with an example.
Let's start from the following table definition:
CREATE TABLE ip_packets (
  IP_ORIGIN VARCHAR,
  IP_DESTINATION VARCHAR,
  PORT_ORIGIN INT,
  PORT_DESTINATION INT,
  INFOTS TIMESTAMP,
  PACKETID INT,
  BYTES_TRANSMITTED BIGINT,
  PRIMARY KEY (IP_ORIGIN, IP_DESTINATION, PORT_ORIGIN, PORT_DESTINATION, INFOTS)
);
At first, it's not easy to know whether IP_ORIGIN will be evenly distributed. It probably won't be, because you will be dealing with one or several subnetworks, and that is very difficult to know in advance. So maybe we need to go for hash partitioning. Now, which fields do we include to calculate the hash value?
As a first rule of thumb, the fewer fields in the hash, the more efficient the queries will be. So let's start with IP_ORIGIN, that is, HASHID = HashFunction(IP_ORIGIN). Any query that specifies IP_ORIGIN can compute its HASHID exactly and go to a single partition to retrieve the data. On the other hand, if IP_ORIGIN is not specified, scans are sent in parallel to all datastores to retrieve the data each one holds. If a datastore holds no matching data, the resources spent scanning it produce no result.
Since IP_ORIGIN is the first field in the primary key, it should be one of the most commonly specified fields.
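The routing decision described above can be sketched as follows. It is a simplified model that only considers equality conditions and assumes a hypothetical fixed number of partitions; the helper names are illustrative and not part of any LeanXcale API.

```python
import hashlib

NUM_PARTITIONS = 4             # hypothetical number of partitions
HASH_FIELDS = ("IP_ORIGIN",)   # HASHID = HashFunction(IP_ORIGIN)


def hash_partition(values):
    """Deterministic hash of the hash-field values, modulo NUM_PARTITIONS."""
    data = "\x1f".join(str(v) for v in values).encode("utf-8")
    return int.from_bytes(hashlib.sha256(data).digest()[:8], "big") % NUM_PARTITIONS


def target_partitions(equality_conditions):
    """Partitions a query must visit, given the columns it fixes with '='.

    If every hash field is fixed, the HASHID can be computed and a single
    partition is visited; otherwise the scan goes to all partitions.
    """
    if all(field in equality_conditions for field in HASH_FIELDS):
        values = [equality_conditions[field] for field in HASH_FIELDS]
        return [hash_partition(values)]
    return list(range(NUM_PARTITIONS))


if __name__ == "__main__":
    print(target_partitions({"IP_ORIGIN": "10.0.0.1"}))        # one partition
    print(target_partitions({"IP_DESTINATION": "10.0.0.9"}))   # [0, 1, 2, 3]
```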
However, I may have one server generating 50% of the network packets (because it's the company's central NAS, for example). Then, a hash that only considers IP_ORIGIN won't be balanced, because the partition holding the NAS's IP will store much more data than the others.
Then, we can try to balance the data by hashing both fields, (IP_ORIGIN, IP_DESTINATION). Hashing all four fields (IP_ORIGIN, IP_DESTINATION, PORT_ORIGIN, PORT_DESTINATION) would certainly balance the data better, but a remaining imbalance of, say, 10% in data size is likely to matter less than the fact that most queries will specify IP_ORIGIN and IP_DESTINATION but not all four fields. So we should definitely stick to just IP_ORIGIN and IP_DESTINATION.
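The effect of the NAS on each choice of hash fields can be simulated with synthetic traffic. In this hypothetical dataset, one NAS address takes part in every packet and is the origin of half of them; the sketch measures the share of packets stored in the fullest of four partitions for each choice of hash fields.

```python
import hashlib
from collections import Counter

NUM_PARTITIONS = 4  # hypothetical number of partitions


def hash_partition(values):
    """Deterministic hash of a tuple of strings, modulo NUM_PARTITIONS."""
    data = "\x1f".join(values).encode("utf-8")
    return int.from_bytes(hashlib.sha256(data).digest()[:8], "big") % NUM_PARTITIONS


def synthetic_packets():
    """Hypothetical (IP_ORIGIN, IP_DESTINATION) pairs: the NAS sends half
    of all packets."""
    nas = "10.0.0.100"
    packets = []
    for d in range(200):  # NAS -> 200 clients, 50 packets each
        packets += [(nas, f"10.0.1.{d}")] * 50
    for s in range(200):  # 200 clients -> NAS, 50 packets each
        packets += [(f"10.0.2.{s}", nas)] * 50
    return packets


def max_partition_share(packets, key):
    """Fraction of packets stored in the fullest partition."""
    counts = Counter(hash_partition(key(p)) for p in packets)
    return max(counts.values()) / len(packets)


if __name__ == "__main__":
    packets = synthetic_packets()
    by_origin = max_partition_share(packets, lambda p: (p[0],))
    by_pair = max_partition_share(packets, lambda p: (p[0], p[1]))
    print(f"hash(IP_ORIGIN): fullest partition holds {by_origin:.0%}")
    print(f"hash(IP_ORIGIN, IP_DESTINATION): fullest partition holds {by_pair:.0%}")
```

Hashing on IP_ORIGIN alone necessarily puts at least half of the packets in one partition, whereas hashing on the pair spreads the 400 distinct (origin, destination) combinations roughly evenly.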
So let's create the table hashing on those two fields:
CREATE TABLE ip_packets (
  IP_ORIGIN VARCHAR,
  IP_DESTINATION VARCHAR,
  PORT_ORIGIN INT,
  PORT_DESTINATION INT,
  INFOTS TIMESTAMP,
  PACKETID INT,
  BYTES_TRANSMITTED BIGINT,
  PRIMARY KEY (IP_ORIGIN, IP_DESTINATION, PORT_ORIGIN, PORT_DESTINATION, INFOTS),
  HASH(IP_ORIGIN, IP_DESTINATION) TO DISTRIBUTE
);
So, in summary, the main advantage of hash partitioning is that it is a simple mechanism to take advantage of all the resources of a distributed deployment while requiring almost no prior knowledge of your data.
If you know your data distribution, or you have a representative sample of the data, I would go for key range partitioning. However, if this is not clear, hash partitioning is the way to balance your data.
There is a second situation in which hash partitioning can be the right choice. We have mentioned that if a query does not specify the fields in the HASH, its scans will be sent to all components, because there is no way to know which partitions hold the data for the query. But for some kinds of queries this is not a drawback but an advantage. For queries that traverse a small range, you want to save resources so other operations can run in parallel. However, if your query is not very selective (it traverses a large amount of data), then the higher the degree of parallelism, the better. In this situation, HASH partitioning is a good choice even if you know your data distribution.
LeanXcale supports multidimensional partitioning, not just bidimensional partitioning, but let's first focus on two dimensions and on the automation involved in bidimensional partitioning.
When data is inserted into a B-Tree, the tree keeps growing, and as soon as it no longer fits in memory, I/O starts to dominate the process and the insert rate of the table drops considerably. LSM trees suffer much less from this problem, but B-Trees are much more efficient for scans.
LeanXcale uses a hybrid LSM + B-Tree structure, but the size of the final B-Tree still plays an important role in the performance of the ingestion.
Bidimensional partitioning defines a mechanism that splits the tree automatically so the most relevant data always fits in memory. This way, ingestion is kept at its maximum rate.
The typical use case is a source whose events carry a timestamp that naturally increases over time; IoT devices, sensors, GPS information, and time series are common sources that show this behavior.
Some events may reach the database with some delay, but data typically arrives with increasing timestamps. So if you split your tree according to some rule on the timestamp, the tree regions receiving inserts always fit in memory, sustaining a very fast ingestion rate.
This also has additional benefits. In those use cases, queries are usually constrained to a certain time window, so they only visit a limited number of regions. Moreover, the older a region is, the less often it is usually visited.
Bidimensional partitioning, then, has the following advantages:
It sustains fast data ingestion.
It allows efficient primary key queries. Primary key queries go directly through the first dimension to get the information quite fast.
It allows efficient queries based on time ranges. Time range queries go through the second dimension, limiting the number of regions visited.
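The two dimensions can be pictured with a small Python model in which each region is identified by a key partition (first dimension) and a time slice (second dimension). The one-day slice width and the 2021-01-01 origin are arbitrary assumptions, and this is not how LeanXcale names or stores regions; it only illustrates why inserts hit the newest slice and why a time-range query visits few regions.

```python
from datetime import datetime, timedelta

SLICE = timedelta(days=1)      # assumed width of each time slice
EPOCH = datetime(2021, 1, 1)   # assumed origin for numbering slices


def time_slice(ts):
    """Second dimension: index of the time slice containing `ts`."""
    return (ts - EPOCH) // SLICE


def region_for_insert(key_partition, ts):
    """Region (first dimension, second dimension) that receives a new row.
    With increasing timestamps, inserts keep hitting the newest slice."""
    return (key_partition, time_slice(ts))


def regions_for_time_range(start, end, key_partitions):
    """Regions a query on start <= ts <= end visits when it has no primary
    key condition: every key partition, but only the overlapping slices."""
    slices = range(time_slice(start), time_slice(end) + 1)
    return [(p, s) for p in key_partitions for s in slices]


if __name__ == "__main__":
    print(region_for_insert(2, datetime(2021, 3, 1, 8, 0)))   # (2, 59)
    # A window of about one day touches 2 slices x 4 key partitions,
    # no matter how many months of older slices exist.
    window = regions_for_time_range(datetime(2021, 3, 1, 8), datetime(2021, 3, 2, 9), range(4))
    print(len(window))  # 8
```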
Bidimensional partitioning can be applied to any field that increases monotonically; it doesn't have to be a timestamp.
To use bidimensional partitioning, when you create the table you need to define the field on which the database should partition automatically. For this, you use the keyword AUTOSPLIT. You can also define a data retention policy.
AUTOSPLIT AUTO lets the LeanXcale database apply its internal policy, which is based on the size of the region and the available memory.
Below you can see one example that creates a table that will be automatically split every day and whose partitions will be retained for 6 months:
CREATE TABLE bidiTable (
  id BIGINT,
  ts TIMESTAMP AUTOSPLIT '1d' AUTOREMOVE AFTER '180d',
  name VARCHAR,
  PRIMARY KEY (id, ts)
);
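As an illustration of what '1d' and '180d' mean, the following sketch computes the daily region a row falls into and which daily regions have fallen out of a 180-day retention window. The exact rule LeanXcale applies when removing regions is not specified here; the sketch assumes a region is dropped only once all of its rows are older than the retention period.

```python
from datetime import datetime, timedelta

SPLIT = timedelta(days=1)         # mirrors AUTOSPLIT '1d'
RETENTION = timedelta(days=180)   # mirrors AUTOREMOVE AFTER '180d'


def region_start(ts):
    """Start (midnight) of the daily region that a timestamp falls into."""
    return datetime(ts.year, ts.month, ts.day)


def expired_regions(region_starts, now):
    """Daily regions whose whole time span is older than RETENTION.

    Assumption: a region [start, start + SPLIT) is removed only when even
    its newest possible row is older than the retention period.
    """
    cutoff = now - RETENTION
    return [start for start in region_starts if start + SPLIT <= cutoff]


if __name__ == "__main__":
    now = datetime(2021, 7, 1, 12, 0)
    starts = [datetime(2021, 1, 1) + timedelta(days=i) for i in range(182)]
    print(expired_regions(starts, now))  # only the 2021-01-01 region
```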
You can find all the options for bidimensional partitioning in the SQL syntax document.
In real-time analytics, a major requirement is to ingest data at high rates while computing large aggregate queries over the real-time data.
For instance, a common use case in big data is ingesting data at high rates and computing KPIs or other metrics over the ingested data. Examples of this use case are performance monitoring, IoT, eAdvertisement, smart grids, industry 4.0, etc. This kind of workload is troublesome for SQL databases because they are not efficient at ingesting data. Also, the aggregate analytical queries are very expensive because they require traversing large amounts of data very frequently.
NoSQL key-value data stores ingest data more efficiently and can handle that part of the workload. However, they are not good at analytical queries, if they support them at all. As a result, it is very common to find complex architectures that combine different data management technologies to solve this use case.
One key differentiating feature of LeanXcale is online aggregates. LeanXcale has developed a new technology based on a brand new semantic multiversion concurrency control (patent pending) that makes it possible to compute aggregates incrementally and in real time, using aggregate tables. As data is ingested, the relevant aggregate tables are updated, so aggregates are always pre-computed.
This way, the formerly large, expensive analytical aggregate queries become almost costless queries that read one or a few rows of an aggregate table. Data ingestion becomes slightly more expensive, but the cost of computing the aggregates disappears. Combined with bidimensional partitioning as described above, this makes for a really powerful solution.
How does LeanXcale solve the concurrency problem so that these values are updated efficiently? With a new technology based on a patent-pending semantic multiversion concurrency control. In this new concurrency control, writes are not blind: they carry the operation to be performed, e.g., Sum(10) and Sum(20), instead of the final value as a standard update would. Since additions are commutative, they do not conflict, as long as the system keeps track of them as additions until the corresponding version of the row is persisted.
Attaining data consistency, and in particular the consistency expected from multiversion concurrency control, called snapshot isolation, requires handling multiversioning very carefully. This is exactly what the technology developed by LeanXcale does. This way, aggregates are conflictless.
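The following Python sketch contrasts writes that carry operations with blind writes of final values. It is a toy model of the idea (the operation encoding and function names are hypothetical), not LeanXcale's concurrency control: applying Sum or Max operations in any order yields the same value, while two final values computed from the same snapshot would lose an increment unless the system detects the conflict and aborts one of the transactions.

```python
from itertools import permutations

# Toy model: a "semantic" write carries an operation, e.g. ("sum", 10),
# instead of the final value of the column.


def apply_op(value, op):
    """Apply one carried operation to a column value."""
    kind, operand = op
    if kind == "sum":
        return value + operand
    if kind == "max":
        return max(value, operand)
    if kind == "min":
        return min(value, operand)
    raise ValueError(f"unsupported operation: {kind}")


def fold(initial, ops):
    """Value of the column after applying the operations in the given order."""
    value = initial
    for op in ops:
        value = apply_op(value, op)
    return value


def blind_writes(initial, increments):
    """Final values computed by concurrent transactions from the same
    snapshot and written back as plain values: if no conflict were
    detected, the last write would win and the other increments would be
    lost. Snapshot isolation avoids this by aborting all but one writer."""
    written = [initial + inc for inc in increments]
    return written[-1]


if __name__ == "__main__":
    sums = [("sum", 10), ("sum", 20), ("sum", 5)]
    # Every ordering of the carried additions gives the same result.
    print({fold(0, order) for order in permutations(sums)})  # {35}
    print(blind_writes(0, [10, 20]))  # 20: the +10 is lost
```

Each aggregate column uses a single kind of operation (a SUM column only receives additions, a MAX column only maxima), which is what keeps the operations on that column commutative.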
To use online aggregations, you just need to create an online aggregation table related to a raw table (having a "parent" raw table is not strictly needed, but it is the most common use case). Below you can see an example.
CREATE TABLE trips (
  trip_id integer NOT NULL,
  passengers integer,
  start_ts timestamp,
  duration integer,
  CONSTRAINT pk_TRIPS PRIMARY KEY (trip_id)
);
CREATE ONLINE AGGREGATE agg_trips AS
  sum(CAST(duration AS BIGINT)) duration,
  max(start_ts) max_start,
  min(start_ts) min_start,
  sum(CAST(passengers AS BIGINT)) passengers,
  count(*) count_all
FROM trips
WHERE year(start_ts) = 2021
GROUP BY
  year(TIMESTAMPADD(second, duration, start_ts)) end_year,
  month(TIMESTAMPADD(second, duration, start_ts)) end_month,
  dayofmonth(TIMESTAMPADD(second, duration, start_ts)) end_day,
  hour(TIMESTAMPADD(second, duration, start_ts)) end_hour;
As you can see, it is very similar to a GROUP BY query: just aggregate over one parent table and name the GROUP BY expressions that are not column identifiers.
The previous statement creates a relation between the parent table TRIPS and the child table AGG_TRIPS, so the Query Engine can replace compatible aggregation queries on TRIPS with the equivalent queries on AGG_TRIPS. It also creates the derived table AGG_TRIPS if it does not exist. Derived tables are populated automatically as rows are inserted into the parent table.
However, in some use cases you may prefer not to populate the child table automatically. In these scenarios, you can use the keyword MANUAL to create the aggregate table:
CREATE MANUAL ONLINE AGGREGATE agg_trips AS
  sum(CAST(duration AS BIGINT)) duration,
  max(start_ts) max_start,
  min(start_ts) min_start,
  sum(CAST(passengers AS BIGINT)) passengers,
  count(*) count_all
FROM TRIPS
WHERE year(start_ts) = 2021
GROUP BY
  year(TIMESTAMPADD(second, duration, start_ts)) end_year,
  month(TIMESTAMPADD(second, duration, start_ts)) end_month,
  dayofmonth(TIMESTAMPADD(second, duration, start_ts)) end_day,
  hour(TIMESTAMPADD(second, duration, start_ts)) end_hour;
You can also create the aggregate table first and then create the relation with the parent table. Just make sure the column names and their types match. The following code is equivalent to the previous example:
CREATE DELTA TABLE agg_trips(
  end_year BIGINT,
  end_month BIGINT,
  end_day BIGINT,
  end_hour BIGINT,
  duration SUM BIGINT,
  max_start MAX TIMESTAMP,
  min_start MIN TIMESTAMP,
  passengers SUM BIGINT,
  count_all SUM BIGINT,
  PRIMARY KEY(end_year, end_month, end_day, end_hour)
);
CREATE ONLINE AGGREGATE agg_trips AS
  sum(CAST(duration AS BIGINT)) duration,
  max(start_ts) max_start,
  min(start_ts) min_start,
  sum(CAST(passengers AS BIGINT)) passengers,
  count(*) count_all
FROM TRIPS
WHERE year(start_ts) = 2021
GROUP BY
  year(TIMESTAMPADD(second, duration, start_ts)) end_year,
  month(TIMESTAMPADD(second, duration, start_ts)) end_month,
  dayofmonth(TIMESTAMPADD(second, duration, start_ts)) end_day,
  hour(TIMESTAMPADD(second, duration, start_ts)) end_hour;
To feed data into the tables, you would run:
UPSERT INTO trips(trip_id, passengers, start_ts, duration)
  VALUES (0, 3, TIMESTAMP'2021-02-11 08:23:23', 520);
-- The following UPSERT is not needed for derived tables
UPSERT INTO agg_trips(end_year, end_month, end_day, end_hour, passengers, max_start, min_start, duration, count_all)
  VALUES (2021, 2, 11, 8, 3, TIMESTAMP'2021-02-11 08:23:23', TIMESTAMP'2021-02-11 08:23:23', 520, 1);
In the case above, the aggregate table uses (end_year, end_month, end_day, end_hour) as its key. It keeps the maximum and minimum of start_ts in the columns max_start and min_start, adds 1 to the column count_all for every trip, and sums passengers and duration. If the aggregate has a filter condition and you are not using a derived table, remember to apply that filter yourself before upserting into the aggregate table.
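To make the column semantics explicit, here is an in-memory Python model of how one TRIPS row contributes to AGG_TRIPS, including the year(start_ts) = 2021 filter. The dictionary standing in for the table and the helper names (including upsert_delta) are hypothetical; this is not the KiVi API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class AggRow:
    duration: int
    max_start: datetime
    min_start: datetime
    passengers: int
    count_all: int

    def merge(self, other):
        # SUM columns add, MAX/MIN columns keep the extreme value, and
        # count_all is a SUM that receives 1 per trip.
        return AggRow(
            self.duration + other.duration,
            max(self.max_start, other.max_start),
            min(self.min_start, other.min_start),
            self.passengers + other.passengers,
            self.count_all + other.count_all,
        )


def delta_for_trip(passengers, start_ts, duration):
    """Key and delta row one TRIPS insert contributes to AGG_TRIPS, or None
    if the trip fails the WHERE year(start_ts) = 2021 filter."""
    if start_ts.year != 2021:
        return None
    end = start_ts + timedelta(seconds=duration)  # TIMESTAMPADD(second, ...)
    key = (end.year, end.month, end.day, end.hour)
    return key, AggRow(duration, start_ts, start_ts, passengers, 1)


def upsert_delta(table, passengers, start_ts, duration):
    """Apply a trip to a dict standing in for AGG_TRIPS (hypothetical helper)."""
    result = delta_for_trip(passengers, start_ts, duration)
    if result is None:
        return
    key, delta = result
    table[key] = table[key].merge(delta) if key in table else delta


if __name__ == "__main__":
    agg = {}
    upsert_delta(agg, 3, datetime(2021, 2, 11, 8, 23, 23), 520)  # ends 08:32:03
    upsert_delta(agg, 2, datetime(2021, 2, 11, 8, 10, 0), 600)   # ends 08:20:00
    upsert_delta(agg, 4, datetime(2020, 12, 31, 23, 0, 0), 60)   # filtered out
    print(agg)
```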
Doing the UPSERT operations above with the direct KiVi API is really simple and even more efficient, so I would recommend using the KiVi API directly from your preferred programming language.
Note: in some languages (C#, Python) you have to use the upsertDelta() method instead of upsert().
OK, but how is this different from doing the following in a standard system?
UPSERT INTO trips(trip_id, passengers, start_ts, duration)
  VALUES (0, 3, TIMESTAMP'2021-02-11 08:23:23', 520);
UPDATE agg_trips SET
  max_start = GREATEST(max_start, TIMESTAMP'2021-02-11 08:23:23'),
  min_start = LEAST(min_start, TIMESTAMP'2021-02-11 08:23:23'),
  count_all = count_all + 1,
  duration = duration + 520,
  passengers = passengers + 3
WHERE end_year = 2021 AND end_month = 2 AND end_day = 11 AND end_hour = 8;
The real innovation is the concurrency control. In the case above, there could be a lot of concurrent updates to the same aggregate row (for instance, the one for end_year = 2021, end_month = 2, end_day = 11, end_hour = 8), and in a standard system this causes a lot of contention. LeanXcale provides the semantic concurrency control described above; by handling these operations in that special way, there is no contention. As a result, the impact on ingestion is very low and it's worth having all those values pre-computed. You will notice it the moment you try it.
The second advantage is that the Query Engine optimizer knows about this structure, so when someone runs a query like:
SELECT sum(CAST(duration AS BIGINT)), max(start_ts), min(start_ts),
       sum(CAST(passengers AS BIGINT)), count(*)
FROM trips
WHERE year(start_ts) = 2021
GROUP BY year(TIMESTAMPADD(second, duration, start_ts)),
         month(TIMESTAMPADD(second, duration, start_ts)),
         dayofmonth(TIMESTAMPADD(second, duration, start_ts)),
         hour(TIMESTAMPADD(second, duration, start_ts));
The optimizer will automatically transform it to:
SELECT duration, max_start, min_start, passengers, count_all FROM agg_trips;
This second query runs in very little time compared to the first one, which needs to traverse many more rows.
The optimizer will even transform other queries that are compatible with the online aggregate, for instance:
SELECT sum(CAST(duration AS BIGINT)), sum(CAST(passengers AS BIGINT)), count(*)
FROM trips
GROUP BY year(TIMESTAMPADD(second, duration, start_ts)),
         month(TIMESTAMPADD(second, duration, start_ts));
-- Will be transformed to
SELECT SUM(duration), SUM(passengers), SUM(count_all)
FROM agg_trips
GROUP BY end_year, end_month
UNION
SELECT sum(CAST(duration AS BIGINT)), sum(CAST(passengers AS BIGINT)), count(*)
FROM trips
WHERE year(start_ts) <> 2021
GROUP BY year(TIMESTAMPADD(second, duration, start_ts)),
         month(TIMESTAMPADD(second, duration, start_ts));
OK, but I can calculate these aggregates in memory, so what's the advantage? Think about it for a moment: How are you handling persistence (what happens if your service stops)? How are you handling concurrency? How many aggregates are you handling?
The advantage is that you don't have to calculate an arbitrary number of aggregates in memory. Computing a few aggregates in memory is simple, but in the database you can hold a complex aggregate structure. Besides, the LeanXcale database handles concurrency and persistence for you.
Online aggregates are a really powerful mechanism because they make pre-computed data immediately available to serve your application. Note that even a complex KPI may be composed of pre-computed aggregates. That's the case of the standard deviation statistic, for example.
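For instance, if an aggregate table also kept the sum of squares of a column (a hypothetical extra column, e.g. sum(CAST(duration AS BIGINT) * CAST(duration AS BIGINT))), the population standard deviation would follow from three pre-computed values, since variance = sum(x*x)/n - (sum(x)/n)^2. A minimal sketch:

```python
import math


def stddev_from_aggregates(count, total, total_sq):
    """Population standard deviation from three pre-computed aggregates:
    count(*), sum(x) and sum(x * x)."""
    mean = total / count
    variance = total_sq / count - mean * mean
    # Clamp tiny negative values caused by floating-point rounding.
    return math.sqrt(max(variance, 0.0))


if __name__ == "__main__":
    # Values 2, 4, 4, 4, 5, 5, 7, 9: count 8, sum 40, sum of squares 232.
    print(stddev_from_aggregates(8, 40, 232))  # 2.0
```

All three inputs are SUM-style aggregates, so they can be maintained with the same conflictless additions. This textbook formula can lose precision when the variance is tiny relative to the mean; it is used here only to show how the KPI is composed.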
Common applications are:
Having immediate statistical aggregates to provide real-time results to your application.
Working with multiresolution data. You may have a system whose source devices send information every second, but most statistics can be calculated much more easily at minute resolution. Aggregates can be used to store data aggregated at minute and at 15-minute resolution while keeping the raw per-second data. You can also use different retention policies for each resolution.
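The multiresolution idea can be illustrated with a short Python sketch that rolls per-second samples up into 1-minute and 15-minute buckets, keeping the sum, count, minimum, and maximum of each bucket. In the database, online aggregates would maintain these buckets as data is ingested; here they are computed in one batch over hypothetical sample data.

```python
import math
from collections import defaultdict


def new_bucket():
    return {"sum": 0.0, "count": 0, "min": math.inf, "max": -math.inf}


def rollup(samples, resolution_s):
    """Aggregate raw (epoch_second, value) samples into buckets of
    `resolution_s` seconds, keyed by the first second of each bucket."""
    buckets = defaultdict(new_bucket)
    for ts, value in samples:
        bucket = buckets[ts - ts % resolution_s]
        bucket["sum"] += value
        bucket["count"] += 1
        bucket["min"] = min(bucket["min"], value)
        bucket["max"] = max(bucket["max"], value)
    return dict(buckets)


if __name__ == "__main__":
    # One hour of hypothetical per-second readings.
    samples = [(t, float(t % 7)) for t in range(3600)]
    per_minute = rollup(samples, 60)
    per_quarter = rollup(samples, 15 * 60)
    print(len(per_minute), len(per_quarter))  # 60 4
```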
Taken to its maximum, together with multidimensional partitioning, online aggregates can be seen as pre-computed OLAP cubes: very powerful data that is available with very low latency.