Primary Key Distribution
To distribute by primary key, we need to split the primary keys into as many fragments as we have storage servers (kdvs) in our deployment. Suppose we have 5 storage servers. Then we need to split the table into 5 fragments, which means finding the 4 split points between them. Imagine that the primary keys are the numbers 0 through 49. The fragments we want will be:
- [0,10)
- [10,20)
- [20,30)
- [30,40)
- [40,50)
So we need to specify the four split points, which are 10, 20, 30, and 40. The fragments are automatically distributed across all storage servers in a round-robin fashion.
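The arithmetic of this toy example can be sketched in a few lines of Java. This is only an illustration under the assumption that the keys are contiguous integers; the class and method names (`EvenSplitPoints`, `splitPoints`) are hypothetical and not part of the tutorial code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split points for a contiguous integer key range.
final class EvenSplitPoints {

    // Returns the (fragments - 1) boundaries that cut [minKey, maxKeyExclusive)
    // into ranges of equal width.
    static List<Integer> splitPoints(int minKey, int maxKeyExclusive, int fragments) {
        if (fragments < 1) {
            throw new IllegalArgumentException("fragments must be at least 1");
        }
        List<Integer> points = new ArrayList<>();
        long width = (long) maxKeyExclusive - minKey;
        for (int i = 1; i < fragments; i++) {
            points.add((int) (minKey + width * i / fragments));
        }
        return points;
    }

    public static void main(String[] args) {
        // Keys 0 through 49 split into 5 fragments.
        System.out.println(splitPoints(0, 50, 5)); // prints [10, 20, 30, 40]
    }
}
```

Real keys are rarely contiguous or evenly spread, which is why the split points for real data are estimated from the data itself.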
How can we do this splitting for real data? It is important to split the data into fragments of similar size. It is equally important that we do not need to read all the data to estimate the key ranges that yield fragments of even size: sampling a small percentage suffices, and here we use 1% of the data. Sampling consists of reading a small random fraction of the data and finding the split points for that fraction. Statistically, these split points will be close to the ones obtained from the full data set.
The next issue is how to read CSV rows at random. We can seek to a position in the CSV file chosen as a random number between 0 and the length of the file minus 1. However, that position might not correspond to the beginning of a line. As we did when splitting the file among several threads, we just need to move to the next end of line. If we reach the end of the file, we either move back to the previous end of line or draw a new random number.
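A minimal sketch of this random-seek idea, using only `java.io.RandomAccessFile` from the standard library. The class `RandomLineSampler`, its method, and the `maxAttempts` parameter are hypothetical names introduced for illustration; the tutorial's own `LoansReader` is not shown here and may work differently. The sketch takes the "draw a new random number" option when the seek lands in the last line.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Random;

// Hypothetical helper: reads one line starting from a random byte offset.
final class RandomLineSampler {

    // Seeks to a random offset, discards the rest of the line the offset
    // landed in, and returns the next complete line. Returns null if no
    // line could be found within maxAttempts tries.
    static String readRandomLine(RandomAccessFile file, Random random, int maxAttempts)
            throws IOException {
        long length = file.length();
        if (length == 0) {
            return null;
        }
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            long offset = (long) (random.nextDouble() * length); // 0 .. length - 1
            file.seek(offset);
            file.readLine();                // skip to the next end of line
            String line = file.readLine();  // first complete line after the offset
            if (line != null) {
                return line;
            }
            // We landed in the last line and hit end of file: try another offset.
        }
        return null;
    }
}
```

Two limitations are worth keeping in mind: the first line of the file can never be returned, and lines that follow long lines are slightly more likely to be picked. `RandomAccessFile.readLine` also maps each byte to one character, so this sketch assumes the key columns are plain ASCII.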
For each line read at random we keep the columns of the primary key, in our case client_id and loan_id. We store all the values read in an AVL tree, which keeps the primary keys sorted. After reading the target 1%, we traverse the AVL tree in order and pick the keys at the split points. For example, to split into 8 fragments we need 7 split points, and if the total number of keys stored in the tree is k, the split points are the keys found at positions k/8, 2k/8, 3k/8, …, 7k/8.
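The extraction step can be sketched as follows. This is not the tutorial's `AVLTree.getSplitKeys` implementation, which is not shown; as an assumption, a `java.util.TreeSet` (a red-black tree) stands in for the AVL tree, and plain integers stand in for the (client_id, loan_id) keys. The class name `SampleSplitKeys` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper: picks evenly spaced keys from a sorted sample.
final class SampleSplitKeys {

    // Returns the keys found at positions k/fragments, 2k/fragments, ...,
    // (fragments - 1)k/fragments of the in-order traversal, where k is the
    // number of sampled keys. Repeated positions (tiny samples) are skipped.
    static <K> List<K> splitKeys(SortedSet<K> sortedKeys, int fragments) {
        List<K> ordered = new ArrayList<>(sortedKeys); // in-order traversal
        int k = ordered.size();
        List<K> splits = new ArrayList<>();
        int lastPosition = -1;
        for (int i = 1; i < fragments; i++) {
            int position = (int) ((long) i * k / fragments);
            if (position < k && position != lastPosition) {
                splits.add(ordered.get(position));
                lastPosition = position;
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        SortedSet<Integer> sample = new TreeSet<>();
        for (int key = 0; key < 80; key++) {
            sample.add(key);
        }
        // 80 sampled keys, 8 fragments: positions 10, 20, ..., 70.
        System.out.println(splitKeys(sample, 8)); // prints [10, 20, 30, 40, 50, 60, 70]
    }
}
```

The sketch assumes the sample holds many more keys than there are fragments; with a very small sample it returns fewer than fragments - 1 split points.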
If we know that the rows are randomly distributed with respect to the key, then we can just read the first 1% of the file. We assume that is the case, so we reuse the code we used to split the data across threads. Basically, we define one section of the file out of 100 sections, read it and store the key values in the AVL tree, and then extract the split points as described above. The resulting code is:
public String getSplitPoints(LoansReader loansReader, int regions) {
    // Read the first 1% of the file as a sample to estimate the split points
    log.info("Looking for split points");
    AVLTree<LoanAvlKey> avlTree = new AVLTree<>();
    // Section 0 out of 100 sections covers the first 1% of the file
    try (LoansReader.Section section = loansReader.createSection(0, 100))
    {
        loansReader.setDuration(0);
        Loan loan;
        while ((loan = loansReader.read(section)) != null) {
            // Build an AVL tree with the sampled keys so they stay sorted
            avlTree.insert(new LoanAvlKey(loan.getClientId(), loan.getId()));
        }
    }
    // avlTree.printSort();
    // Get the split points from the balanced tree
    List<LoanAvlKey> splitKeys = avlTree.getSplitKeys(regions);
    StringJoiner splits = new StringJoiner(", ");
    for (LoanAvlKey key : splitKeys) {
        // Each split point is rendered as (client_id, loan_id)
        splits.add("(" + key.getClientId() + ", " + key.getLoanId() + ")");
    }
    return splits.toString();
}
Once we have the split points of the data to be loaded, we can create the table
with the right partitions by using the split points from the sampling.
In LeanXcale one can define several criteria to partition the data, and choose
one of those partitioning criteria as the distribution criterion.
We already had one partitioning criterion: we auto-partition by the column ts.
We need to add partitioning by a prefix of the primary key, the genuine primary key,
that is client_id and loan_id, providing the list of split points previously computed
in the clause PARTITION BY KEY AT.
Finally, we add the distribute clause, DISTRIBUTE BY KEY, which states that the data
is distributed according to the primary key partitioning.
So our final createTable method is:
public void createTable() throws SQLException {
    String splitPoints;
    try (LoansReader loansReader = new LoansReader())
    {
        splitPoints = utils.getSplitPoints(loansReader, REGIONS);
    }
    log.info("Split Points:{}", splitPoints);
    String SQL_CREATE_TABLE_PKRANGE_AUTOSPLIT =
        "CREATE TABLE " + TABLE_NAME + " (" +
        COLS +
        "PRIMARY KEY (client_id,loan_id,ts)" + //bidimensional field required as part of PK
        ")" +
        " PARTITION BY KEY (client_id,loan_id) AT " + splitPoints +
        " PARTITION BY DIMENSION ts EVERY 30000000 KEEP 300000000" +
        " DISTRIBUTE BY KEY";
    utils.createTable(SQL_CREATE_TABLE_PKRANGE_AUTOSPLIT);
}