Spark Connector

LeanXcale provides a Spark connector that enables you to load and save standard Spark Datasets through the KiVi direct API.

The connector is built on the Spark Datasource framework, so once you import it, you can read and write LeanXcale tables through Spark's standard load and save methods.

1. Installation

To use the Spark connector for LeanXcale, simply include the dependency in your project configuration:

<dependency>
  <groupId>com.leanxcale</groupId>
  <artifactId>spark-lx-connector</artifactId>
  <version>1.6.6</version>
</dependency>

Be sure to include the LeanXcale public Maven repository in your Maven configuration.

    <repository>
      <id>maven-releases</id>
      <url>https://nexus.leanxcale.com/repository/maven-releases</url>
    </repository>

If you prefer to include the library in your project manually, then download the Spark Connector and the Java KiVi Direct API driver from the Drivers page.

2. Quick Start

2.1. Read from LeanXcale

To load a dataset from a LeanXcale table:

  1. Create a DataFrameReader with the spark.read() method.

  2. Add the following configuration:

    • Configure LeanXcale as the source format.

    • Define the LeanXcale-specific datasource options:

      • URL

      • Database name

      • Table name

  3. Call the load() method to load the dataset.

import com.leanxcale.spark.LeanxcaleDataSource;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

String sparkApp = "sparkApp";
String url = "kivi:lxis://lxserver:9876";
String tableName = "table";
String databaseName = "database";

SparkSession spark = SparkSession.builder().appName(sparkApp).getOrCreate();

Dataset<Row> ratings = spark.read()
        .format(LeanxcaleDataSource.SHORT_NAME)
        .option(LeanxcaleDataSource.URL_PARAM, url)
        .option(LeanxcaleDataSource.TABLE_PARAM, tableName)
        .option(LeanxcaleDataSource.DATABASE_PARAM, databaseName)
        .load();
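
Once loaded, the dataset behaves like any other Spark Dataset. As a quick sanity check (plain Spark API, nothing connector-specific), you might print its schema and the first few rows:

// Standard Spark calls on the dataset loaded above.
ratings.printSchema();
ratings.show(5);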

2.2. Write to LeanXcale

To save a dataset as a LeanXcale table:

  1. Create a DataFrameWriter by calling the write() method on the Dataset.

  2. Add the following configuration:

    • Configure LeanXcale as the write format.

    • Configure the write mode.

    • Configure the destination table name.

    • Define the LeanXcale-specific datasource options:

      • URL

      • Database name

import com.leanxcale.spark.LeanxcaleDataSource;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

String url = "kivi:lxis://lxserver:9876";
String tableName = "table";
String databaseName = "database";

Dataset<Row> result = // Dataset that comes from previous Spark operations.

result.write()
    .format(LeanxcaleDataSource.SHORT_NAME)
    .mode(SaveMode.Overwrite)
    .option(LeanxcaleDataSource.URL_PARAM, url)
    .option(LeanxcaleDataSource.DATABASE_PARAM, databaseName)
    .saveAsTable(tableName);
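
The example above uses SaveMode.Overwrite, which replaces the table contents. Spark also defines Append, ErrorIfExists and Ignore; as a sketch, assuming the connector version at hand supports it, appending to an existing table only changes the mode:

// Same connection variables as above; only the save mode differs.
result.write()
    .format(LeanxcaleDataSource.SHORT_NAME)
    .mode(SaveMode.Append)
    .option(LeanxcaleDataSource.URL_PARAM, url)
    .option(LeanxcaleDataSource.DATABASE_PARAM, databaseName)
    .saveAsTable(tableName);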

2.3. Configuring security

If your LeanXcale instance is configured with security, you will have to generate an application certificate to allow your Spark application to connect to LeanXcale.

Once this is done, configure the connector to enable security and provide the path to the certificate file.

String sparkApp = "sparkApp";
String url = "kivi:lxis://lxserver:9876";
String tableName = "table";
String databaseName = "database";
String certificatePath = "/path/to/my/cert/file.kcf";

Dataset<Row> ratings = spark.read()
    .format(LeanxcaleDataSource.SHORT_NAME)
    .option(LeanxcaleDataSource.URL_PARAM, url)
    .option(LeanxcaleDataSource.TABLE_PARAM, tableName)
    .option(LeanxcaleDataSource.DATABASE_PARAM, databaseName)
    .option(LeanxcaleDataSource.ENABLE_SECURITY, true)
    .option(LeanxcaleDataSource.CETH_PATH, certificatePath)
    .load();

2.4. Filter and manipulate data

The LeanXcale connector is fully integrated with the Spark Dataset framework, so you can rely on standard Spark behavior when operating on a dataset.

2.4.1. Print data

To inspect a dataset, start by printing its schema:

result.printSchema();
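
Printing the rows themselves uses Spark's standard show() method:

result.show();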

2.4.2. Find and Filter

To retrieve only the rows that match a set of conditions, define filters on the dataset; the connector pushes these filters down to the KiVi datastore:

import static org.apache.spark.sql.functions.col;

result.filter(col("age").gt(21)).show();
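
Predicates can be combined with the standard Column methods such as and() and or(); the column names below are illustrative only:

result.filter(col("age").gt(21).and(col("name").startsWith("A"))).show();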

2.4.3. Projections

A projection selects the set of fields to be retrieved from the table, and can also apply expressions to those fields:

result.select("name").show();

result.select(col("name"), col("age").plus(1)).show();
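
A computed column can also be given a readable name with as(); the alias below is just an illustration:

result.select(col("name"), col("age").plus(1).as("age_next_year")).show();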

2.4.4. Aggregations

Most common simple aggregations can be performed through the Spark API: group the dataset by one or more key columns (if any) and then apply aggregation expressions to each group:

result.groupBy("age").count().show();
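
Beyond count(), other aggregation expressions can be applied to each group through agg(). A minimal sketch, assuming a hypothetical salary column:

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.max;

result.groupBy("age").agg(avg("salary"), max("salary")).show();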

3. Other Examples

We feature additional detailed examples and real-world scenarios on our blog, including how to connect your Scala application to LeanXcale using JDBC and the direct KiVi API, and how to leverage the power of Spark and the LeanXcale connector to build a simple recommendation engine.