Spark Connector

LeanXcale provides a Spark connector that allows you to load and save standard Spark Datasets through the KiVi direct API.

The connector is built on the Spark DataSource framework, so to use it you only need to import the connector and call it through the standard load and save methods.

1. Installation

To use the Spark connector for LeanXcale, simply include the dependency in your project configuration:

<dependency>
  <groupId>com.leanxcale</groupId>
  <artifactId>spark-lx-connector</artifactId>
  <version>1.5.4</version>
</dependency>

To resolve the dependency, also include the LeanXcale public Maven repository in your Maven configuration:

<repository>
  <id>maven-releases</id>
  <url>https://nexus.leanxcale.com/repository/maven-releases</url>
</repository>

If you prefer to include the library in your project manually, you can download the Spark Connector and the Java KiVi Direct API driver from the Drivers page.

2. Quick Start

2.1. Reading from LeanXcale

To load a dataset from a LeanXcale table:

  1. Create a DataFrameReader with the spark.read() method

  2. Add configuration:

    • Configure LeanXcale as the source format.

    • Define the LeanXcale datasource specific options:

      • URL

      • Database name

      • Table name

  3. Load the dataset by calling the load() method

import com.leanxcale.spark.LeanxcaleDataSource;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

String sparkApp = "sparkApp";
String url = "kivi:lxis://lxserver:9876";
String tableName = "table";
String databaseName = "database";

SparkSession spark = SparkSession.builder().appName(sparkApp).getOrCreate();

Dataset<Row> ratings = spark.read()
        .format(LeanxcaleDataSource.SHORT_NAME)
        .option(LeanxcaleDataSource.URL_PARAM, url)
        .option(LeanxcaleDataSource.TABLE_PARAM, tableName)
        .option(LeanxcaleDataSource.DATABASE_PARAM, databaseName)
        .load();
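Once loaded, the dataset behaves like any other Spark DataFrame, so it can, for instance, be registered as a temporary view and queried with plain Spark SQL. A minimal sketch, reusing the ratings dataset loaded above; the view name ratings_view and the rating column are illustrative assumptions that depend on your table schema:

```java
// Register the loaded dataset as a temporary view bound to the
// same SparkSession, so it can be queried with Spark SQL.
ratings.createOrReplaceTempView("ratings_view");

// Standard Spark SQL over the view. The column name "rating" is
// a hypothetical example, not part of the connector's API.
Dataset<Row> highRatings = spark.sql(
        "SELECT * FROM ratings_view WHERE rating > 4");
highRatings.show();
```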

2.2. Writing to LeanXcale

To save a dataset to a LeanXcale table:

  1. Create a DataFrameWriter by calling the write() method on the Dataset

  2. Add configuration:

    • Configure LeanXcale as write format

    • Configure the write mode

    • Configure the destination table name

    • Define the LeanXcale datasource specific options:

      • URL

      • Database name

import com.leanxcale.spark.LeanxcaleDataSource;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

String sparkApp = "sparkApp";
String url = "kivi:lxis://lxserver:9876";
String tableName = "table";
String databaseName = "database";

Dataset<Row> result = // Dataset that comes from previous Spark operations.

result.write()
    .format(LeanxcaleDataSource.SHORT_NAME)
    .mode(SaveMode.Overwrite)
    .option(LeanxcaleDataSource.URL_PARAM, url)
    .option(LeanxcaleDataSource.DATABASE_PARAM, databaseName)
    .saveAsTable(tableName);
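The write mode controls what happens when the destination table already exists. The values come from Spark's standard SaveMode enum; which modes the connector supports may depend on the connector version, so treat this as a sketch. For example, to append rows to an existing table instead of replacing its contents (reusing the variables from the example above):

```java
// SaveMode.Append adds the new rows while keeping the existing
// contents, whereas SaveMode.Overwrite replaces them.
result.write()
    .format(LeanxcaleDataSource.SHORT_NAME)
    .mode(SaveMode.Append)
    .option(LeanxcaleDataSource.URL_PARAM, url)
    .option(LeanxcaleDataSource.DATABASE_PARAM, databaseName)
    .saveAsTable(tableName);
```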

2.3. Filtering and manipulating data

The LeanXcale connector is fully integrated with the Spark Dataset framework, so you can rely on standard Spark behavior to operate on the dataset.

2.3.1. Print schema

To inspect the structure of a dataset:

result.printSchema();

2.3.2. Finding and Filtering

To retrieve only the rows matching a set of conditions, define a set of filters on the dataset. The connector pushes these filters down to the KiVi datastore (the col function comes from the static import org.apache.spark.sql.functions.col):

result.filter(col("age").gt(21)).show();

2.3.3. Projections

A projection is the set of fields to be retrieved from the table.

The projection can also perform operations on the fields:

result.select("name").show();

result.select(col("name"), col("age").plus(1)).show();

2.3.4. Aggregations

Most common simple aggregations can be done through the Spark API.

Group the dataset by one or more key columns (if any), then apply the aggregation expressions:

result.groupBy("age").count().show();
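Several aggregates can also be computed in a single pass by combining groupBy with the agg method and the helpers in org.apache.spark.sql.functions. A sketch, assuming hypothetical name and salary columns in the table schema:

```java
import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.max;

// Group by "age" and compute several aggregates at once.
// The "name" and "salary" columns are illustrative assumptions.
result.groupBy("age")
      .agg(count("name"), avg("salary"), max("salary"))
      .show();
```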

3. Other Examples

We have more detailed examples and real-world scenarios available on our blog.

To learn how to use the power of Spark and the LeanXcale connector to build a simple recommendation engine, see: