Installation

To implement the general architecture, the Kafka suite must first be downloaded. Confluent Platform is an open-source distribution of Kafka, provided by the company founded by the original creators of Kafka, and offers a set of tools and utilities for the complete management of the server. Specifically, this package includes Zookeeper, Kafka Server, Schema Registry, and Kafka Connect, which covers all the components needed to run the architecture.

The Kafka suite can be downloaded from here.
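
As a minimal sketch, assuming the downloaded archive is named confluent-5.4.1.tar.gz (the exact file name depends on the package you download), it can be extracted with:

tar -xzf confluent-5.4.1.tar.gz
cd confluent-5.4.1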

After untarring the file, the following directory structure should be in place:

Folder     Description
/bin/      Driver scripts for starting and stopping services
/etc/      Configuration files
/lib/      System services
/logs/     Log files
/share/    JARs and licenses
/src/      Source files that require a platform-dependent build

Configuration

Please note the Kafka configuration described in this documentation is the default and simplest one, with just one broker.

Zookeeper

Navigate to the confluent-5.4.1/etc/kafka directory and open the zookeeper.properties file. Leave the default configuration in place, and make sure the file contains:

clientPort=2181

This is the port on which Zookeeper listens for client connections.
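
Once configured, Zookeeper can be started with the driver script shipped in the bin directory (a minimal sketch, run from the confluent-5.4.1 directory):

./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties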

Kafka server

Navigate to the confluent-5.4.1/etc/kafka directory and open the server.properties file. Leave the default configuration, and make sure the file contains:

broker.id=0
listeners=PLAINTEXT://:9092

These lines define the broker's identifier (i.e., the server; this example includes only one, but more servers could act together in a cluster) as well as the port on which Kafka listens (9092).

To run a client from a machine other than the Kafka server, a reachable server IP must be configured (localhost/0.0.0.0 is not valid for remote connections):

listeners=PLAINTEXT://ip_kafka_server:9092
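
After Zookeeper is up, the Kafka server can be started the same way (a minimal sketch, run from the confluent-5.4.1 directory):

./bin/kafka-server-start ./etc/kafka/server.properties
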
Schema registry

Navigate to the confluent-5.4.1/etc/schema-registry directory and open the schema-registry.properties file. Leave the default configuration, and make sure the file contains:

listeners=http://0.0.0.0:8081

This line defines the IP address and port on which the Schema Registry listens; the default port is 8081.
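
The Schema Registry can then be started with its driver script (a minimal sketch, run from the confluent-5.4.1 directory):

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties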

Kafka connect

Navigate to the confluent-5.4.1/etc/kafka directory and open the connect-standalone.properties file. The following lines must exist:

#Kafka server ip and port
bootstrap.servers=0.0.0.0:9092
#Consumer group id
group.id=mytopic
#The maximum delay between invocations of poll()
max.poll.interval.ms=300000
#The maximum number of records returned in a single call to poll()
consumer.max.poll.records=10000
#Interval in milliseconds at which to try committing offsets for tasks
offset.flush.interval.ms=50000
#Port for the Connect REST API to listen on
rest.port=8083
#Kafka Connect plugin directory
plugin.path=/usr/local/share/kafka/plugins

Kafka connector for LeanXcale

Download the Kafka connector for LeanXcale from the Drivers page.

The connector is contained in a tar.gz, so it must be unpacked into the directory indicated as plugin.path in the Kafka connect configuration.
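
For example, assuming the plugin.path configured above and a hypothetical archive name kafka-lx-connector.tar.gz (use the actual file name downloaded from the Drivers page):

mkdir -p /usr/local/share/kafka/plugins
tar -xzf kafka-lx-connector.tar.gz -C /usr/local/share/kafka/plugins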

Once extracted, the connector has to be configured. If you are running a sink connector, navigate to the confluent-5.4.1/etc/kafka directory and create a new file named "connect-lx-sink.properties" with the following example configuration:

#Name of the connector
name=local-lx-sink
#Java class executed when the connector is started
connector.class=com.leanxcale.connector.kafka.LXSinkConnector
#Number of threads that read from the topic
tasks.max=1
#Name of the topic to read from
topics=mytopic

#LeanXcale URL
connection.url=lx://ip_bd_leanxcale:9876
#LeanXcale user
connection.user=APP
#LeanXcale password
connection.password=APP
#LeanXcale database
connection.database=mytestdatabase
#If true, the connector creates the target table when it is not found
auto.create=true
#Whether deletion is enabled
delete.enabled=true
#Insertion mode. May be insert, upsert or update
insert.mode=insert
#Maximum number of records to be sent in a single commit to LeanXcale
batch.size=500
#Active connection checkout time
connection.check.timeout=20

#Key and value converters according to the architecture, with the schema registry url
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

#LeanXcale connection mode value that defines the connection through the NoSQL interface
sink.connection.mode=kivi
#Whether the load on LeanXcale is executed with ACID transactions or not.
#Setting it to false potentially increases insertion performance and is indicated
#for big initial loads without consistency requirements over operational environments
sink.transactional=false
#Number of milliseconds to wait before closing the physical connection to LeanXcale
sink.defer.timeout=10000

#Name of the destination table, where ${topic} is replaced by the topic name
table.name.format=${topic}
#Primary key mode of the target table. record_key means that the key field of the
#record stored in Kafka is used as the primary key in the target table
pk.mode=record_key
#Fields of the primary key among those that arrive in the key field. In our case,
#only one exists: the "id" field
pk.fields=id
#List of the fields in the record that become columns in the target table. If all fields
#are included, then all become columns; those that should not be created in the table can be removed
fields.whitelist=field1,field2,field3
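
With the worker and sink connector files in place, Kafka Connect can be started in standalone mode (a minimal sketch, run from the confluent-5.4.1 directory):

./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka/connect-lx-sink.properties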

If you are running a source connector, navigate to the confluent-5.4.1/etc/kafka directory and create a new file named "connect-lx-source.properties" with the following example configuration:

#Source connector name
name=local-lx-source
#Java class executed when the connector is started
connector.class=com.leanxcale.connector.kafka.LXSourceConnector
#Number of threads started by kafka
tasks.max=1
#Key and value converters according to the architecture and with the schema registry url
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

#LeanXcale connection URL
connection.properties=lx://localhost:9876/test@APP

#Source table to read data from and send to Kafka
source.table=LX_SOURCE_TABLE
#Max size of the intermediate queue where read tuples are temporarily stored
source.queue.size=10000
#Sets where tuples are read from. Available modes are: offset (tuples from last run), now (only new tuples), begin (all tuples)
source.from.mode=offset
#Name of the topic where the tuples are sent. The pattern ${table} will be replaced by the name of the source table
source.topic.name=${table}
#If not empty, only fields from this list will be sent to Kafka
source.fields.whitelist=
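
As with the sink connector, the source connector can be launched in standalone mode (a minimal sketch, run from the confluent-5.4.1 directory):

./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka/connect-lx-source.properties
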
Security configuration

If your LeanXcale instance is configured with security, you will have to generate an application certificate in order to allow your Kafka connector to connect to LeanXcale. Learn how to do this.

Once this is done, configure the connector to enable security and provide the path to the certificate file using these properties in the configuration file:

#Security properties
connection.security.enable=true
connection.security.ceth.path=/path/to/file.kcf

You can find more examples in subsequent sections of this documentation, and also check the official Kafka documentation at https://docs.confluent.io/current/.