Guide to Read Data Efficiently in LeanXcale
1. Introduction
When retrieving data from a database, several considerations must be addressed to ensure efficient data access. This document will adopt an iterative approach, highlighting a specific aspect of data querying in each iteration.
The queries will be conducted within the context of a historical payments table. Historical tables typically grow significantly due to their nature, necessitating a careful approach to querying. Inefficient queries can result in excessive execution times or high memory consumption, potentially leading to failures. As we identify new considerations, we will iteratively evolve the schema of this table to optimize performance and efficiency.
In what follows, we assume an installation of LeanXcale with 4 or more physical cores (i.e., 8 vCPUs) and 4 kvds (storage engine instances).
The initial version of the payments table schema is defined as follows:
CREATE TABLE PAYMENTS1 (
pmt_id bigint,
pmt_ts timestamp,
src_acct varchar,
dst_acct varchar,
amount double,
PRIMARY KEY (pmt_id)
);
This table encapsulates the following information:
- pmt_id: A unique identifier for each payment transaction.
- pmt_ts: The timestamp associated with the payment.
- src_acct: The source account from which the payment is initiated.
- dst_acct: The destination account to which the payment is directed.
- amount: The monetary value of the payment.
- PRIMARY KEY: The unique identifier for the table, designated as pmt_id.
1.1. Missing Primary Key
To retrieve payments from a specific source account for a designated month, the following query will be executed:
SELECT src_acct, dst_acct, amount
FROM payments
WHERE src_acct = ?
AND extract(year from pmt_ts) = ?
AND extract(month from pmt_ts) = ?
The first observation is that the query is performing slowly. To understand the underlying reasons for this inefficiency, it is essential to examine the query execution plan, which reveals the operations the database is performing.
To obtain the query plan, the following command should be executed:
EXPLAIN PLAN for <query text>
The query plan will provide insights into the execution strategy, including any table scans, join methods, or index usage, allowing us to identify potential performance bottlenecks.
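For example, to obtain the plan shown below for the query above (assuming the SQL client accepts parameter markers inside EXPLAIN, as the plans reproduced in this guide suggest), one would run:
EXPLAIN PLAN FOR
SELECT src_acct, dst_acct, amount
FROM payments
WHERE src_acct = ?
AND extract(year from pmt_ts) = ?
AND extract(month from pmt_ts) = ?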
EnumerableCalc(expr#0..3=[{inputs}], SRC_ACCT=[$t1], DST_ACCT=[$t2], AMOUNT=[$t3])
  KiviPKTableScanRel(table=[[db, APP, PAYMENTS1, filter:AND(=($2, ?0), =(EXTRACT(FLAG(YEAR), $1), ?1), =(EXTRACT(FLAG(MONTH), $1), ?2)), project:[1, 2, 3, 4]]])
The query plan is represented as a tree structure (or sometimes a list). Upon examining the query plan, we observe the following. At the lowest level, we encounter a Scan operator, specifically KiviPKTableScanRel. This indicates that a full table scan is being performed.
We know that it is indeed a full scan because the scan has no limits, i.e., no min/max or mini/maxi values are specified. The lack of limits means the query evaluates all records in the table, which results in the observed slowness of the query execution.
How can we solve the problem? We need to provide an index that gives efficient access to the rows of a particular source account and month. One alternative is to use the primary key to obtain fast access.
To incorporate the source account as part of the primary key, the table should be created as follows:
CREATE TABLE PAYMENTS2 (
pmt_id bigint, pmt_ts timestamp,
src_acct varchar, dst_acct varchar, amount double,
PRIMARY KEY(src_acct, pmt_id));
Upon reviewing the query plan for the query, we find the following details:
EnumerableCalc(expr#0..3=[{inputs}], SRC_ACCT=[$t1], DST_ACCT=[$t2], AMOUNT=[$t3])
  KiviPKTableScanRel(table=[[db, APP, PAYMENTS2, mini:[?0], maxi:[?0], filter:AND(=(EXTRACT(FLAG(YEAR), $1), ?1), =(EXTRACT(FLAG(MONTH), $1), ?2)), project:[1, 2, 3, 4]]])
In this execution plan, we observe that the scan operator now includes limits (mini:[?0], maxi:[?0]), indicating that it is performing a range query rather than a full table scan.
Upon closer examination of the limits, we note that the first parameter (?0) is utilized as both the minimum and maximum limit. The additional 'i' in mini and maxi signifies that these limits are inclusive.
1.2. No Partitioning
After introducing the primary key, we still observe that the query execution time remains suboptimal. This can be attributed to the lack of proper partitioning and distribution of the table across the storage engines (kvds).
With four physical cores and four kvds, it is essential to utilize all available CPU resources for query execution. By default, a newly created table is not partitioned or distributed, meaning that all data is handled by a single storage engine (kvds). Since each kvds is assigned to one physical core, this configuration effectively limits the query to the processing power of a single core, whereas we aim to leverage all four cores for improved performance.
To enhance performance, it is standard practice to partition and distribute data according to the primary key for regular tables. To achieve this, we need to add the PARTITION BY clause to the CREATE TABLE statement, specifying the dimension by which the table will be partitioned, in this case, the primary key.
Furthermore, to distribute the table across four kvds, we must define three split points (4 - 1 = 3), which will create four ranges of primary keys. The split points should be chosen to ensure that the resulting partitions are as evenly distributed as possible. For instance, assuming that the primary key is uniformly distributed, we can select the split points based on the prefixes: '25', '50', and '75'. This approach assumes that all primary keys have the same number of digits (e.g., 10), leading to an even distribution of rows across the partitions.
The resulting CREATE TABLE statement with partitioning is as follows:
CREATE TABLE PAYMENTS3
(pmt_id bigint, pmt_ts timestamp, src_acct varchar,
dst_acct varchar, amount double,
PRIMARY KEY(src_acct, pmt_id))
PARTITION BY key(src_acct) at ('25'), ('50'), ('75');
1.3. Missing Secondary Index
Now let us assume we also need to query by destination account. We would perform the query as follows:
SELECT src_acct, dst_acct, amount
FROM payments
WHERE dst_acct = ?
AND extract(year from pmt_ts) = ? AND extract(month from pmt_ts) = ?
Again we realize the query is slow. To diagnose the issue, we should analyze the query plan:
EnumerableCalc(expr#0..3=[{inputs}], SRC_ACCT=[$t1], DST_ACCT=[$t2], AMOUNT=[$t3])
  KiviPKTableScanRel(table=[[db, APP, PAYMENTS3, filter:AND(=($3, ?0), =(EXTRACT(FLAG(YEAR), $1), ?1), =(EXTRACT(FLAG(MONTH), $1), ?2)), project:[1, 2, 3, 4]]])
In this plan, we observe that the scan operator KiviPKTableScanRel is scanning the table using the primary key (PK). However, there are no specified limits (min/max), indicating that the operation is a full table scan. This is inefficient, especially for large datasets.
Furthermore, we notice that the WHERE clause has been translated into a filter condition: AND(=($3, ?0), ...). This comparison checks column $3 (the destination account) against the first parameter (?0), and it is applied to every row in the table. As a result, this approach leads to unnecessary evaluations across the entire dataset, contributing to the slowness.
To address this performance issue, we need to implement a faster access method. Given that we already have a meaningful primary key, the most effective solution is to introduce a secondary index on the destination account. This will facilitate more efficient queries, allowing the query to directly target relevant records without scanning the entire table.
By adding a secondary index on the dst_acct column, we can significantly improve query performance for conditions involving that column:
CREATE INDEX PAYMENTS4_IDX
ON payments4
(dst_acct);
After creating the index, let’s examine the updated query plan:
EnumerableCalc(expr#0..3=[{inputs}], SRC_ACCT=[$t1], DST_ACCT=[$t2], AMOUNT=[$t3])
  KiviIndexTableScanRel(table=[[db, APP, PAYMENTS4, db-APP-PAYMENTS4_IDX, mini:[?0], maxi:[?0], filter:AND(=(EXTRACT(FLAG(YEAR), $1), ?1), =(EXTRACT(FLAG(MONTH), $1), ?2)), project:[1, 2, 3, 4]]])
In this plan, we observe two key improvements. First, the scan operator has changed to KiviIndexTableScanRel, indicating that the query is now utilizing the secondary index rather than performing a primary key scan. This is a significant improvement, since it can traverse just the rows with the destination account of interest. Second, the presence of limits on the scan operator (mini:[?0], maxi:[?0]) indicates that this is now an index range query. The query efficiently uses the destination account value passed as a parameter (?0) to limit the range of scanned records. These enhancements result in a more efficient data retrieval process, allowing the database to quickly access only the relevant rows based on the destination account and the specified filtering criteria. Overall, the addition of the secondary index has substantially improved the query’s performance.
1.4. Improving Aggregation with Online Aggregates
Let’s examine a typical aggregation query for payments that groups data by destination account:
SELECT dst_acct, SUM(amount) AS amount
FROM payments
WHERE extract(year from pmt_ts) = ? AND extract(month from pmt_ts) = ?
GROUP BY dst_acct
However, this query executes slowly. To diagnose the performance issue, we should analyze the query plan:
KiviAggregateTableScanRel(table=[[db, APP, PAYMENTS4, filter: AND(=(EXTRACT(FLAG(YEAR), $1), ?0), =(EXTRACT(FLAG(MONTH), $1), ?1)), project:[1, 3, 4], aggregate_group:{1}, aggregates:[SUM($2)]]])
In this plan, we observe that, first, the scan is a full scan, indicated by the absence of limits (min/max), meaning that the entire table is being evaluated. This is inefficient for large datasets. Second, the aggregation is computed as the scan traverses each row, further contributing to the slow performance. To speed up the query, we can use LeanXcale Online Aggregates. These are materialized views of aggregate queries that are efficiently updated in real time (without creating any contention due to concurrency control, as would happen in traditional databases), providing significant performance benefits.
The online aggregate can be created as follows:
CREATE ONLINE AGGREGATE PAYMENTS5_OA AS SUM(amount) AS amount FROM payments5
GROUP BY EXTRACT(YEAR FROM pmt_ts) AS y,
EXTRACT(MONTH FROM pmt_ts) AS m,
dst_acct;
After creating the online aggregate, let’s examine the updated query plan:
EnumerableAggregate(group=[{2}], AMOUNT=[SUM($3)])
  KiviDerivedTableRangeScanRel(table=[[db, APP, PAYMENTS5_OA]], aggregateQuery=[SUM($AMOUNT) group by $Y, $M, $DST_ACCT], min=[[#?0, #?1]], includeMin=[true], max=[[#?0, #?1]], includeMax=[true])
In this plan, we notice two significant improvements. First, the scan operator has changed to KiviDerivedTableRangeScanRel, indicating that the query is now utilizing a derived table instead of performing a primary key scan. This is more efficient for aggregation queries since it only reads the aggregated results instead of the detailed data of the parent table. Second, the presence of limits on the scan operator (min and max) indicates that this is now a derived table range query, effectively using the year and month parameters (?0 and ?1) to restrict the scanned records.
It’s important to note that when we create an online aggregate, a derived table is established that is associated with the parent table containing the online aggregate. This derived table has a primary key comprising the columns of the GROUP BY clause—in this case, year, month, and destination account. This structure allows for efficient querying and retrieval of aggregated data, resulting in significantly improved performance.
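As an illustration only (the derived table is created and maintained automatically by LeanXcale, so this DDL is never executed by the user, and the column names and types shown here are assumptions), the derived table backing PAYMENTS5_OA can be thought of as roughly equivalent to:
-- hypothetical sketch of the derived table, not actual DDL
CREATE TABLE PAYMENTS5_OA (
y bigint, -- EXTRACT(YEAR FROM pmt_ts)
m bigint, -- EXTRACT(MONTH FROM pmt_ts)
dst_acct varchar,
amount double, -- maintained SUM(amount) for the group
PRIMARY KEY (y, m, dst_acct));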
1.5. Reusing Online Aggregates for Similar Aggregation Queries
Now, let’s examine another similar query, which retrieves monthly totals for an entire year instead of for a single month:
SELECT extract(month from pmt_ts), dst_acct, SUM(amount) AS amount
FROM payments
WHERE extract(year from pmt_ts) = ?
GROUP BY extract(month from pmt_ts), dst_acct
In this query, we are retrieving totals for every month of a specific year, rather than for a single month. By analyzing the query plan, we find:
EnumerableAggregate(group=[{1, 2}], AMOUNT=[SUM($3)])
  KiviDerivedTableRangeScanRel(table=[[db, APP, PAYMENTS5_OA]], aggregateQuery=[SUM($AMOUNT) group by $Y, $M, $DST_ACCT], min=[[#?0]], includeMin=[true], max=[[#?0]], includeMax=[true])
From this plan, we observe that the query effectively leverages the previously created online aggregate. This is possible because the year is the leading column of the derived table's key, so the scan can be restricted to the requested year (both min and max are [#?0]), and because the stored monthly aggregates already match the requested grouping. The scan operator is again a KiviDerivedTableRangeScanRel, indicating that the query reads the monthly aggregates stored in the online aggregate instead of recomputing the totals from scratch across the entire detail table. Since monthly granularity is finer than yearly, the same aggregate could also be rolled up further, for example to yearly totals per destination account. By reusing the existing monthly aggregates, the query minimizes the amount of data processed, resulting in faster execution times and improved performance overall.
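For instance, a yearly-totals variant along the following lines (a sketch, not part of the original example set) would likewise be expected to be answered from the same monthly aggregates, rolled up further by the upper aggregation operator:
SELECT dst_acct, SUM(amount) AS amount
FROM payments
WHERE extract(year from pmt_ts) = ?
GROUP BY dst_acct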
1.6. The Importance of Generating Stats
Let’s consider a top-k aggregation query designed to retrieve the top 10 destination accounts by total amount transferred in a specific month:
SELECT payments.src_acct, payments.dst_acct, payments.amount, payments.pmt_ts
FROM (SELECT dst_acct, SUM(amount) AS amount
      FROM payments
      WHERE extract(year from pmt_ts) = ? AND extract(month from pmt_ts) = ?
      GROUP BY dst_acct ORDER BY 2 DESC LIMIT 10) topk
JOIN payments AS payments ON topk.dst_acct = payments.dst_acct
WHERE extract(year from pmt_ts) = ? AND extract(month from pmt_ts) = ?
This query aims to fetch the top 10 destination accounts along with their corresponding payments for a specific month. The query plan reveals that it indeed utilizes the online aggregate, as indicated by the derived table scan. However, despite this optimization, the query executes slowly.
By analyzing the query plan:
EnumerableCalc(expr#0..5=[{inputs}], SRC_ACCT=[$t1], DST_ACCT=[$t2], AMOUNT=[$t3], PMT_TS=[$t0])
  EnumerableHashJoin(condition=[=($2, $4)], joinType=[inner])
    KiviPKTableScanRel(table=[[db, APP, PAYMENTS4, filter:AND(=(EXTRACT(FLAG(YEAR), $1), ?2), =(EXTRACT(FLAG(MONTH), $1), ?3)), project:[1, 2, 3, 4]]])
    EnumerableLimit(fetch=[10])
      EnumerableSort(sort0=[$1], dir0=[DESC])
        EnumerableAggregate(group=[{2}], AMOUNT=[SUM($3)])
          KiviDerivedTableRangeScanRel(table=[[db, APP, PAYMENTS5_OA]], aggregateQuery=[SUM($AMOUNT) group by $Y, $M, $DST_ACCT], min=[[#?0, #?1]], includeMin=[true], max=[[#?0, #?1]], includeMax=[true])
The plan indicates a hash join between the online aggregate (the right side of the join) and the parent table (the left side). The left side of the join is executing a full scan, meaning every row in the parent table is being evaluated against the hash table created from the aggregated results. This results in inefficiency, especially given that only a limited number of rows are ultimately selected.
The optimizer’s choice of this suboptimal plan stems from a lack of fresh statistics about the underlying data. The optimizer relies on these statistics to make informed decisions about the most efficient execution strategy. In this case, because statistics had not been gathered, the optimizer could not accurately assess the data distribution, leading it to select a less efficient plan. To rectify this issue, we need to gather updated statistics:
ANALYZE STATISTICS FOR ALL TABLES
After running the statistics, let’s review the new query plan:
EnumerableCalc(expr#0..5=[{inputs}], SRC_ACCT=[$t3], DST_ACCT=[$t4], AMOUNT=[$t5], PMT_TS=[$t2])
  EnumerableCorrelate(correlation=[$cor1002], joinType=[inner], requiredColumns=[{0}])
    EnumerableLimit(fetch=[10])
      EnumerableSort(sort0=[$1], dir0=[DESC])
        EnumerableAggregate(group=[{2}], AMOUNT=[SUM($3)])
          KiviDerivedTableRangeScanRel(table=[[db, APP, PAYMENTS5_OA]], aggregateQuery=[SUM($AMOUNT) group by $Y, $M, $DST_ACCT], min=[[#?0, #?1]], includeMin=[true], max=[[#?0, #?1]], includeMax=[true])
    KiviIndexTableScanRel(table=[[db, APP, PAYMENTS5, db-APP-PAYMENTS5_IDX, mini:[$cor1002.DST_ACCT], maxi:[$cor1002.DST_ACCT], filter:AND(=(EXTRACT(FLAG(YEAR), $1), ?2), =(EXTRACT(FLAG(MONTH), $1), ?3)), project:[1, 2, 3, 4]]])
The plan now employs a correlate join, which is more efficient because it correlates the top-k results computed from the derived table directly with an index scan on the parent table. For each row produced by the top-k aggregation over the online aggregate, the correlate drives an index range scan that uses the destination account of that row as both the minimum and maximum limit (mini:[$cor1002.DST_ACCT], maxi:[$cor1002.DST_ACCT]), so only the relevant rows are retrieved. This new plan significantly enhances query performance by minimizing unnecessary evaluations and leveraging the available online aggregate effectively.
1.7. Online Aggregates for Different Temporal Columns
Dates can be stored using various SQL data types, including timestamp, date, or even integers. Below are examples demonstrating how to create online aggregates for each case.
For a timestamp column as in the following schema:
CREATE TABLE PAYMENTS1 (pmt_id bigint, pmt_ts timestamp, src_acct varchar, dst_acct varchar, amount double,
PRIMARY KEY(pmt_id, pmt_ts))
PARTITION BY DIMENSION pmt_ts EVERY INTERVAL '1' day
PARTITION BY HASH(pmt_id)
DISTRIBUTE BY HASH;
To efficiently aggregate payments by day and by month, you can define the following online aggregate:
CREATE ONLINE AGGREGATE PAYMENTS1_OA AS SUM(amount), COUNT(*) FROM PAYMENTS1
GROUP BY CTUMBLE(pmt_ts, INTERVAL '1' day, TIMESTAMP '1970-01-01 00:00:00') AS pmt_day
PARTITION BY DIMENSION pmt_day EVERY INTERVAL '1' month;
It should be noted that, since online aggregates can grow large, they can also be partitioned like regular tables; in this case there is one partition per month.
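As a usage sketch (assuming the optimizer matches queries whose GROUP BY uses the same CTUMBLE expression as the aggregate definition), a daily-totals query served by this aggregate could look like:
SELECT CTUMBLE(pmt_ts, INTERVAL '1' day, TIMESTAMP '1970-01-01 00:00:00') AS pmt_day,
SUM(amount), COUNT(*)
FROM PAYMENTS1
GROUP BY CTUMBLE(pmt_ts, INTERVAL '1' day, TIMESTAMP '1970-01-01 00:00:00');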
If the table uses a date column instead, the schema will look like this:
CREATE TABLE PAYMENTS2 (pmt_id bigint, pmt_ts date, src_acct varchar, dst_acct varchar, amount double,
PRIMARY KEY(pmt_id, pmt_ts))
PARTITION BY DIMENSION pmt_ts EVERY INTERVAL '1' month
PARTITION BY HASH(pmt_id)
DISTRIBUTE BY HASH;
The corresponding online aggregate for monthly data aggregation would be:
CREATE ONLINE AGGREGATE PAYMENTS2_OA AS SUM(amount), COUNT(*)
FROM PAYMENTS2
GROUP BY EXTRACT(MONTH FROM pmt_ts) AS pmt_month;
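The kind of monthly-totals query this aggregate is meant to serve would be, as a sketch reusing only expressions that already appear in this guide:
SELECT EXTRACT(MONTH FROM pmt_ts) AS pmt_month, SUM(amount), COUNT(*)
FROM PAYMENTS2
GROUP BY EXTRACT(MONTH FROM pmt_ts);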
For a table where the date is encoded as an integer (e.g., in the format YYYYMMDD, so that subtracting mod(pmt_ts, 100) truncates the value to the month), the schema is structured as follows:
CREATE TABLE PAYMENTS3 (pmt_id bigint, pmt_ts bigint, src_acct varchar, dst_acct varchar, amount double,
PRIMARY KEY(pmt_id, pmt_ts))
PARTITION BY DIMENSION pmt_ts EVERY 100
PARTITION BY HASH(pmt_id)
DISTRIBUTE BY HASH;
The online aggregate for this setup can be created using:
CREATE ONLINE AGGREGATE PAYMENTS3_OA AS SUM(amount), COUNT(*) FROM PAYMENTS3
GROUP BY pmt_ts - mod(pmt_ts, 100) AS pmt_month;
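Analogously, a sketch of a monthly-totals query over the integer-encoded dates, using the same truncation expression as the aggregate definition:
SELECT pmt_ts - mod(pmt_ts, 100) AS pmt_month, SUM(amount), COUNT(*)
FROM PAYMENTS3
GROUP BY pmt_ts - mod(pmt_ts, 100);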