Multithreading
The idea is to have a set of threads ingesting in parallel in order to use all the available resources in the server. Each thread will take care of ingesting a fraction of the rows. So how many threads do we need to fully utilize the database server’s resources?

We need at least as many threads as the database server has cores. However, a thread that is between requests (for instance, while it reads and parses the next rows or waits for the network) leaves its server core without work. To ensure that the database server always has work to do, it is recommended to use twice as many threads as the server has cores. This way, all the cores of the database server are kept busy.
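A minimal sketch of this sizing rule. The client cannot learn the database server's core count from Runtime.availableProcessors(), which reports the cores of the machine running the loader, so here it is read from a hypothetical system property, db.serverCores, with an arbitrary default of 8.

```java
public class ThreadCount {

    /**
     * Client threads to use for a database server with the given number of cores.
     * Two threads per core: while one thread is between requests, the other can
     * keep the core busy.
     */
    static int threadsFor(int serverCores) {
        if (serverCores <= 0) {
            throw new IllegalArgumentException("serverCores must be positive");
        }
        return 2 * serverCores;
    }

    public static void main(String[] args) {
        // Hypothetical property name; Integer.getInteger falls back to 8 if it is unset.
        int serverCores = Integer.getInteger("db.serverCores", 8);
        System.out.println("N_THREADS = " + threadsFor(serverCores));
    }
}
```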
One important question is how to share the work among the threads. The idea is to split the CSV file into as many fragments as there are threads, say n, which means finding n-1 split points. Since we want fragments of roughly the same size, the i-th split point is placed at byte offset i·filesize/n, so that each fragment is about filesize/n bytes long. A split point computed this way will usually fall in the middle of a line, so it is moved forward to the next end of line; in that way, each fragment contains only full lines. Note that the CSV file also has a header line, which has to be skipped.
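The splitting procedure can be sketched as follows. This is an illustration, not the post's LoansReader code: the class and method names are hypothetical, and it assumes '\n' line endings and a single header line. It returns n+1 byte offsets, where fragment i covers [offsets[i], offsets[i+1]).

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class SplitPoints {

    /** Splits a CSV file into n fragments of roughly filesize/n bytes each. */
    static long[] computeFragments(Path file, int n) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long size = raf.length();
            long[] offsets = new long[n + 1];
            offsets[0] = nextLineStart(raf, 0);          // skip the header line
            for (int i = 1; i < n; i++) {
                long approx = i * size / n;               // ideal i-th split point
                // Move forward to the next end of line so no row is cut in two,
                // and never go back before the previous split point.
                offsets[i] = Math.max(offsets[i - 1], nextLineStart(raf, approx));
            }
            offsets[n] = size;
            return offsets;
        }
    }

    /** Returns the offset right after the first '\n' at or after pos, or EOF. */
    static long nextLineStart(RandomAccessFile raf, long pos) throws IOException {
        raf.seek(pos);
        int b;
        while ((b = raf.read()) != -1) {
            if (b == '\n') {
                return raf.getFilePointer();
            }
        }
        return raf.length();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("loans", ".csv");
        StringBuilder sb = new StringBuilder("id,amount\n");
        for (int i = 0; i < 10; i++) {
            sb.append(i).append(',').append(1000 + i).append('\n');
        }
        Files.writeString(tmp, sb.toString());
        long[] offsets = computeFragments(tmp, 3);
        byte[] all = Files.readAllBytes(tmp);
        for (int i = 0; i < 3; i++) {
            int from = (int) offsets[i];
            int len = (int) (offsets[i + 1] - offsets[i]);
            System.out.print("fragment " + i + ":\n"
                    + new String(all, from, len, StandardCharsets.UTF_8));
        }
        Files.delete(tmp);
    }
}
```

Each split point costs only a short scan to the next newline, so the whole split is cheap even for very large files.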
Now the ingestData method takes care of creating all the threads and passing each of them the fragment it has to ingest.
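public void ingestData() {
    ExecutorService taskExecutor = Executors.newFixedThreadPool(N_THREADS);
    // One reader shared by all tasks; each task reads only its own fragment.
    try (LoansReader csv = new LoansReader()) {
        List<Callable<Object>> tasks = new ArrayList<>(N_THREADS);
        for (int i = 0; i < N_THREADS; i++) {
            // Task i ingests fragment i of the CSV file.
            tasks.add(Executors.callable(new doIngestData(csv, i)));
        }
        // invokeAll blocks until every task has completed.
        taskExecutor.invokeAll(tasks);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt status
        log.trace("Threaded dataIngest interrupted", e);
    } finally {
        taskExecutor.shutdown();
    }
}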
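One caveat with invokeAll: it waits for all tasks, but an exception thrown inside a task is captured in that task's Future and never rethrown, so a failing thread can go unnoticed. A minimal sketch of checking the futures, using a hypothetical helper named runAll with plain Runnables standing in for doIngestData:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RunAll {

    /**
     * Runs every Runnable on a fixed-size pool, waits for all of them, and
     * rethrows the first failure (wrapped in ExecutionException), if any.
     */
    static void runAll(List<Runnable> work, int nThreads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            List<Callable<Object>> tasks = new ArrayList<>(work.size());
            for (Runnable r : work) {
                tasks.add(Executors.callable(r));
            }
            // invokeAll returns only after every task has completed, normally or not.
            for (Future<Object> f : pool.invokeAll(tasks)) {
                f.get(); // throws ExecutionException if this task threw
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Runnable> work = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            final int fragment = i;
            work.add(() -> System.out.println("ingesting fragment " + fragment));
        }
        runAll(work, 4);
    }
}
```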
The thread class is named doIngestData, and its constructor just stores its parameters, the reader object (csv) and the number of the fragment to load (indx), in fields of the object:
private class doIngestData implements Runnable {
    private final int indx;        // number of the fragment this task ingests
    private final LoansReader csv; // reader shared by all tasks

    doIngestData(LoansReader csv, int indx) {
        this.csv = csv;
        this.indx = indx;
    }
The main loop that ingests the data now lives in the run method of the thread class. The only change is that the read method takes the section to read from as a parameter (csv.read(section)), so each thread reads only the lines of its assigned fragment (the rest of the loop is omitted since it remains the same):
public void run() {
    try (LoansReader.Section section = csv.createSection(indx, N_THREADS);
         Connection connection = utils.connect();
         PreparedStatement prepStmt = connection.prepareStatement(utils.SQL_INSERT_PREPARED)) {
        //...
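LoansReader, its Section type and read(section) are not shown in this post. As a purely hypothetical illustration of how a reader can stay within its fragment, the sketch below reads the lines that start inside a byte range [start, end), such as one produced by the split described earlier. It assumes the range begins at a line start, '\n' line endings, and valid UTF-8 (so that re-encoding a line gives back its byte length).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Consumer;

public class SectionReader {

    /** Passes to sink every line that starts in [start, end). */
    static void readSection(Path file, long start, long end, Consumer<String> sink)
            throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            channel.position(start); // the stream below reads from the channel's position
            BufferedReader in = new BufferedReader(new InputStreamReader(
                    Channels.newInputStream(channel), StandardCharsets.UTF_8));
            long pos = start; // byte offset of the next line to read
            String line;
            while (pos < end && (line = in.readLine()) != null) {
                sink.accept(line);
                pos += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("section", ".csv");
        // The header occupies bytes 0-9; each data row is 7 bytes long.
        Files.writeString(tmp, "id,amount\n0,1000\n1,1001\n2,1002\n3,1003\n");
        readSection(tmp, 10, 24, line -> System.out.println("row: " + line));
        Files.delete(tmp);
    }
}
```

Because each thread opens its own channel, threads never share a file position, which is what lets them read their fragments in parallel.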
Problem:
Through a deeper analysis of insertions with a large number of rows (millions), we observe that the insertion rate decreases over time as the table grows. Why does this happen? To answer this, we need to understand how a database works. Data is stored in the leaves of a B+ tree, and the database maintains a cache of blocks. When the tree is small, all its blocks fit in the cache and accesses proceed at memory speed. As the tree grows, however, the cache can no longer hold all the blocks. Eventually the tree becomes so large that scattered accesses to it require evicting a block from the cache before the needed block can be read; and since the evicted block is dirty (rows were previously inserted into it), it must first be written to disk. As a result, each insertion that misses the cache costs at least two I/Os: one to write back the evicted dirty block and one to read the block where the new row goes.
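A back-of-the-envelope estimate makes the slowdown concrete. Assume (a simplification, not a measurement) that inserts hit the B leaf blocks of the tree uniformly at random, that the cache holds C < B of them, that inner nodes stay cached, and that every cached leaf is dirty because leaves only enter the cache to receive a row. Then an insert finds its leaf in the cache with probability C/B, and every miss costs one write-back plus one read:

```latex
\mathbb{E}[\text{I/Os per insert}] \approx 2\left(1 - \frac{C}{B}\right),
\qquad \text{e.g. } B = 10\,C \;\Rightarrow\; 2\,(1 - 0.1) = 1.8
```

Since B keeps growing with the table while C stays fixed, the cost per insert approaches two I/Os, which is why the insertion rate keeps falling.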
Solution:
LeanXcale can automatically split historical tables into fragments with time locality, so that newly inserted rows, being the most recent ones, go to the newest fragment. When that fragment becomes too big, a new one is created. Thus no fragment grows too large, and inserts are processed at memory speed. The old fragment gets cold and is evicted from the cache, and the new fragment can use the whole cache for itself.
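The effect of time-based fragmentation can be illustrated with a small cache simulation. This is a toy model, not LeanXcale's implementation, under stated assumptions: only leaf blocks are modeled (inner nodes are assumed to stay cached), each leaf holds a hypothetical ROWS_PER_BLOCK rows, a new leaf is counted as one read on first access, each insert goes to a uniformly random leaf of its target tree, and in the fragmented case all inserts go to the newest fragment, whose size is set by the hypothetical fragmentRows parameter.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

public class FragmentCacheSim {

    static final int ROWS_PER_BLOCK = 100; // hypothetical rows per leaf block

    /** LRU cache of leaf blocks that counts disk I/Os. */
    static final class BlockCache {
        private final int capacity;
        // accessOrder = true: iteration starts at the least recently used block.
        private final LinkedHashMap<Long, Boolean> dirty = new LinkedHashMap<>(16, 0.75f, true);
        long ios = 0;

        BlockCache(int capacity) { this.capacity = capacity; }

        /** Inserts one row into a block: read it on a miss, then mark it dirty. */
        void insertInto(long block) {
            if (dirty.get(block) == null) {             // cache miss
                if (dirty.size() == capacity) {         // evict the LRU block
                    Map.Entry<Long, Boolean> lru = dirty.entrySet().iterator().next();
                    if (lru.getValue()) {
                        ios++;                          // write back the dirty victim
                    }
                    dirty.remove(lru.getKey());
                }
                ios++;                                  // read the needed block
            }
            dirty.put(block, Boolean.TRUE);
        }
    }

    /**
     * Inserts `rows` rows and returns the average I/Os per row.
     * fragmentRows <= 0: a single table; each row goes to a random leaf of the whole table.
     * fragmentRows > 0: each row goes to a random leaf of the newest fragment,
     * and a new fragment is started every fragmentRows rows.
     */
    static double ioPerRow(long rows, int cacheBlocks, long fragmentRows, long seed) {
        BlockCache cache = new BlockCache(cacheBlocks);
        Random rnd = new Random(seed);
        for (long r = 0; r < rows; r++) {
            long firstBlock = 0;
            long rowsInTarget = r;                      // rows already in the target tree
            if (fragmentRows > 0) {
                long fragment = r / fragmentRows;
                firstBlock = fragment * (fragmentRows / ROWS_PER_BLOCK + 1); // disjoint ids
                rowsInTarget = r % fragmentRows;
            }
            long nBlocks = rowsInTarget / ROWS_PER_BLOCK + 1; // leaves grow with the data
            cache.insertInto(firstBlock + rnd.nextInt((int) nBlocks));
        }
        return (double) cache.ios / rows;
    }

    public static void main(String[] args) {
        long rows = 1_000_000;
        int cacheBlocks = 1_000; // the single table grows to about 10x the cache
        System.out.printf("single table:   %.3f I/Os per row%n", ioPerRow(rows, cacheBlocks, 0, 42));
        System.out.printf("time fragments: %.3f I/Os per row%n", ioPerRow(rows, cacheBlocks, 50_000, 42));
    }
}
```

With these parameters, the single table should average well above one I/O per row as it outgrows the cache, while each fragment (500 leaves) fits in the cache, so the fragmented variant should pay roughly one read and one eventual write-back per leaf, a small fraction of an I/O per row.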