Spark Connector
LeanXcale provides a Spark connector that lets you load and save standard Spark Datasets through the KiVi Direct API.
The connector is built on the Spark Datasource framework, so using it only requires importing the connector and calling the standard load and save methods.
1. Installation
To use the Spark connector for LeanXcale, simply include the dependency in your project configuration:
<dependency>
<groupId>com.leanxcale</groupId>
<artifactId>spark-lx-connector</artifactId>
<version>1.5.4</version>
</dependency>
Also add the LeanXcale public Maven repository to your Maven configuration:
<repository>
<id>maven-releases</id>
<url>https://nexus.leanxcale.com/repository/maven-releases</url>
</repository>
If you prefer to include the library in your project manually, you can download the Spark Connector and the Java KiVi Direct API driver from the Drivers page.
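If you add the jars manually, a quick sanity check is to confirm that the connector class is visible to your application's class loader. The following is a minimal sketch using only the JDK; the class name comes from the import used in the examples on this page, and the isOnClasspath helper is hypothetical. A false result can also mean the class was found but could not be linked, for instance because Spark itself is missing from the classpath.

```java
// Minimal sketch (hypothetical helper, not part of the connector):
// checks whether a class can be loaded from the current classpath.
public class ConnectorClasspathCheck {

    // Returns true if the named class can be loaded by this class loader.
    // initialize=false avoids running the checked class's static initializers.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ConnectorClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but not linkable (e.g. a dependency such as Spark is missing)
            return false;
        }
    }

    public static void main(String[] args) {
        String connectorClass = "com.leanxcale.spark.LeanxcaleDataSource";
        System.out.println(connectorClass + " on classpath: " + isOnClasspath(connectorClass));
    }
}
```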
2. Quick Start
2.1. Reading from LeanXcale
To load a dataset from a LeanXcale table:
- Create a DataFrameReader by calling the spark.read() method.
- Add configuration:
  - Configure LeanXcale as the source format.
  - Define the LeanXcale datasource specific options:
    - URL
    - Database name
    - Table name
- Load the dataset by calling the load() method.
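import com.leanxcale.spark.LeanxcaleDataSource;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
String sparkApp = "sparkApp";
String url = "kivi:lxis://lxserver:9876"; // KiVi endpoint of the LeanXcale server
String tableName = "table";
String databaseName = "database";
// Get the existing Spark session or create a new one
SparkSession spark = SparkSession.builder().appName(sparkApp).getOrCreate();
// Read the LeanXcale table into a Dataset
Dataset<Row> ratings = spark.read()
    .format(LeanxcaleDataSource.SHORT_NAME)
    .option(LeanxcaleDataSource.URL_PARAM, url)
    .option(LeanxcaleDataSource.TABLE_PARAM, tableName)
    .option(LeanxcaleDataSource.DATABASE_PARAM, databaseName)
    .load();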
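The URL option in the examples has the form kivi:lxis://host:port. Assuming that shape holds for other deployments (only one example is shown here, so treat it as an assumption rather than a documented grammar), a hypothetical helper like the following can catch a malformed URL before Spark tries to connect. It uses only java.net.URI and is not part of the connector.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper (not part of the connector): splits a URL shaped like
// "kivi:lxis://lxserver:9876" into scheme, host and port.
// The expected shape is inferred from the example above and is an assumption.
public class KiviUrl {
    final String scheme; // e.g. "lxis"
    final String host;   // e.g. "lxserver"
    final int port;      // e.g. 9876

    private KiviUrl(String scheme, String host, int port) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
    }

    static KiviUrl parse(String url) {
        final String prefix = "kivi:";
        if (!url.startsWith(prefix)) {
            throw new IllegalArgumentException("URL must start with 'kivi:': " + url);
        }
        try {
            // The remainder ("lxis://lxserver:9876") is a hierarchical URI,
            // so java.net.URI can extract the host and port from it.
            URI inner = new URI(url.substring(prefix.length()));
            if (inner.getHost() == null || inner.getPort() == -1) {
                throw new IllegalArgumentException("Expected <scheme>://host:port after 'kivi:': " + url);
            }
            return new KiviUrl(inner.getScheme(), inner.getHost(), inner.getPort());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed URL: " + url, e);
        }
    }

    public static void main(String[] args) {
        KiviUrl u = KiviUrl.parse("kivi:lxis://lxserver:9876");
        System.out.println(u.scheme + " " + u.host + " " + u.port); // prints "lxis lxserver 9876"
    }
}
```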
2.2. Writing to LeanXcale
To save a dataset to a LeanXcale table:
- Create a DataFrameWriter by calling the write() method on the Dataset.
- Add configuration:
  - Configure LeanXcale as the write format.
  - Configure the write mode.
  - Configure the destination table name (passed to saveAsTable()).
  - Define the LeanXcale datasource specific options:
    - URL
    - Database name
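import com.leanxcale.spark.LeanxcaleDataSource;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
String sparkApp = "sparkApp";
String url = "kivi:lxis://lxserver:9876";
String tableName = "table";
String databaseName = "database";
SparkSession spark = SparkSession.builder().appName(sparkApp).getOrCreate();
// Dataset that comes from previous Spark operations ("source_view" is a hypothetical placeholder)
Dataset<Row> result = spark.table("source_view");
// Write the Dataset to the LeanXcale table; Overwrite replaces existing data
result.write()
    .format(LeanxcaleDataSource.SHORT_NAME)
    .mode(SaveMode.Overwrite)
    .option(LeanxcaleDataSource.URL_PARAM, url)
    .option(LeanxcaleDataSource.DATABASE_PARAM, databaseName)
    .saveAsTable(tableName);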
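The write mode decides what happens when the destination table already holds data. The sketch below models Spark's generic SaveMode contract (Append, Overwrite, ErrorIfExists, Ignore) with an in-memory map standing in for the table catalog. The SaveModeModel class is hypothetical, and how the LeanXcale connector realizes each mode on a KiVi table is not described here. If no mode is set, Spark's DataFrameWriter defaults to ErrorIfExists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of Spark's SaveMode semantics.
// It illustrates the generic contract only, not the connector's implementation.
public class SaveModeModel {
    // Constant names mirror org.apache.spark.sql.SaveMode
    enum Mode { Append, Overwrite, ErrorIfExists, Ignore }

    // table name -> rows (rows reduced to strings for illustration)
    final Map<String, List<String>> tables = new HashMap<>();

    void save(String table, List<String> rows, Mode mode) {
        boolean exists = tables.containsKey(table);
        switch (mode) {
            case Append:
                // Add rows to existing data, creating the table if needed
                tables.computeIfAbsent(table, t -> new ArrayList<>()).addAll(rows);
                break;
            case Overwrite:
                // Replace any existing data
                tables.put(table, new ArrayList<>(rows));
                break;
            case ErrorIfExists:
                // Fail if data already exists
                if (exists) {
                    throw new IllegalStateException("Table already exists: " + table);
                }
                tables.put(table, new ArrayList<>(rows));
                break;
            case Ignore:
                // Silently skip the write if data already exists
                if (!exists) {
                    tables.put(table, new ArrayList<>(rows));
                }
                break;
        }
    }

    public static void main(String[] args) {
        SaveModeModel m = new SaveModeModel();
        m.save("table", List.of("a", "b"), Mode.Append);
        m.save("table", List.of("c"), Mode.Overwrite);
        m.save("table", List.of("d"), Mode.Ignore);
        System.out.println(m.tables.get("table")); // prints "[c]"
    }
}
```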
2.3. Filtering and manipulating data
The LeanXcale connector is fully integrated with the Spark Dataset API, so you can use standard Spark operations on the datasets it loads.
2.3.2. Finding and Filtering
To retrieve only the rows that match a set of conditions, define filters on the dataset; these filters are pushed down to the KiVi datastore:
import static org.apache.spark.sql.functions.col;
result.filter(col("age").gt(21)).show();
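Column.gt is a strict comparison, so the filter above drops rows whose age is exactly 21 (geq would keep them). Rows with a null age are dropped too, because Spark's filter keeps only rows for which the predicate evaluates to true. The plain-Java model below reproduces that selection on hypothetical in-memory rows; it does not use Spark or the connector, and it assumes that pushing the filter down to KiVi returns the same rows Spark would select on its own.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model of the rows kept by result.filter(col("age").gt(21)).
// Person and the sample rows are hypothetical; Spark is not used here.
public class AgeFilterModel {
    static final class Person {
        final String name;
        final Integer age; // nullable, like a nullable integer column

        Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    // gt is strict (age > threshold). In Spark SQL, null > 21 evaluates to null,
    // and filter() keeps only rows whose predicate is true, so null ages are dropped.
    static List<Person> olderThan(List<Person> rows, int threshold) {
        return rows.stream()
                .filter(p -> p.age != null && p.age > threshold)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> rows = List.of(
                new Person("ana", 30),
                new Person("bo", 21),    // exactly 21: excluded by gt, kept by geq
                new Person("cy", null)); // null age: excluded
        for (Person p : olderThan(rows, 21)) {
            System.out.println(p.name); // prints "ana"
        }
    }
}
```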