Kafka Examples
In this section you can find configuration examples for insert, upsert, update and delete. They build on the configuration supplied in the "Configuration" section. Each of the following examples illustrates some configuration tips, explained after the example.
Insert
To configure the connector to execute inserts, just set insert.mode to insert.
name=local-lx-sink
connector.class=com.leanxcale.connector.kafka.LXSinkConnector
tasks.max=1
topics=mytopic
connection.url=lx://ip_bd_leanxcale:9876
connection.user=APP
connection.password=APP
connection.database=mytestdatabase
auto.create=false
insert.mode=insert
batch.size=1000
connection.check.timeout=20
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
sink.connection.mode=kivi
sink.transactional=true
sink.defer.timeout=10000
table.name.format=mytable
pk.mode=kafka
fields.whitelist=field1,field2,field3
The configuration tips for this example are the following:
- auto.create: With a value of false, the connector expects the target table to already exist. If it does not, the connector raises an error.
- insert.mode: With insert, every record is inserted as a new row. The operation fails if a row with the same primary key (PK) already exists.
- batch.size: Increased to 1000 records per batch.
- key.converter: Converter used for the record key stored in Kafka, which is Avro in this example.
- key.converter.schema.registry.url: URL where the schema registry that stores the key schemas is listening.
- value.converter: Converter used for the record value stored in Kafka, which is also Avro in this case.
- value.converter.schema.registry.url: URL where the schema registry that stores the value schemas is listening.
- sink.connection.mode: LeanXcale connection mode. The kivi value connects through the NoSQL interface and is the only mode available right now.
- sink.transactional:
  - If false, ACID capabilities are not guaranteed. This potentially increases insertion performance and is indicated for big initial loads without consistency requirements over operational environments.
  - If true, ACID capabilities are ensured. This is indicated for event-driven architectures in which operational systems must become aware of data changes immediately and ACID compliance is required. It decreases insertion performance but ensures coherence across transactions.
  - For more information about LeanXcale ACID transactions, please refer to Concepts / ACID transactions.
- table.name.format: Name of the target table. It may or may not match the topic name; in this example the target table is "mytable".
- pk.mode: Indicates the primary key mode of the target table. The kafka value means that the Kafka topic coordinates are used as the primary key in the target table, so it makes no sense to define pk.fields with this option. These coordinates are three:
  - __connect_topic: name of the topic
  - __connect_partition: partition number within the topic
  - __connect_offset: offset of the record within the partition
The rest of the configuration values can be checked in the default configuration described in the configuration section.
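As a reference, the following sketch shows how a record matching this example could be produced to mytopic with the Java Kafka client and the Confluent Avro serializer. The broker address, schema, field types and class name are assumptions made for illustration; with pk.mode=kafka no record key is needed, since the connector derives the primary key from the topic coordinates.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InsertProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "http://localhost:8081");  // same registry as the connector

    // Hypothetical value schema: field names match fields.whitelist, types are assumptions.
    Schema valueSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyValue\",\"fields\":["
            + "{\"name\":\"field1\",\"type\":\"int\"},"
            + "{\"name\":\"field2\",\"type\":\"string\"},"
            + "{\"name\":\"field3\",\"type\":\"string\"}]}");
    GenericRecord value = new GenericData.Record(valueSchema);
    value.put("field1", 1);
    value.put("field2", "some");
    value.put("field3", "data");

    try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
      // No key is set: with pk.mode=kafka the connector uses topic/partition/offset as the PK.
      producer.send(new ProducerRecord<>("mytopic", value));
    }
  }
}

Because the Kafka coordinates always differ from record to record, this setup never produces a duplicate primary key; with insert.mode=insert a duplicate would otherwise make the task fail.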
Upsert
To configure the connector to execute upserts, just set insert.mode to upsert.
name=local-lx-sink
connector.class=com.leanxcale.connector.kafka.LXSinkConnector
tasks.max=1
topics=mytopic
connection.url=lx://ip_bd_leanxcale:9876
connection.user=APP
connection.password=APP
connection.database=mytestdatabase
auto.create=true
insert.mode=upsert
batch.size=1000
connection.check.timeout=20
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
sink.connection.mode=kivi
sink.transactional=false
sink.defer.timeout=10000
table.name.format=${topic}
pk.mode=record_value
pk.fields=id
fields.whitelist=field1,field2,field3
The configuration tips for this example are the following:
- insert.mode: With upsert, records are inserted and, if a row with the same primary key already exists, it is overwritten.
- pk.mode: Indicates the primary key mode of the target table. With record_value, the primary key is taken from fields of the record value stored in the topic.
- pk.fields: If not defined, all the record value fields are taken as the primary key. If defined, only the listed fields are used. In this example, the id field is the primary key (see the sketch below).
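For illustration, here is a minimal producer sketch whose value record carries the id field used as primary key. The broker address, schema and field types are assumptions; only the property names come from the example above.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UpsertProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "http://localhost:8081");

    // Hypothetical value schema: "id" is the field named in pk.fields,
    // the rest follow fields.whitelist. Types are assumptions.
    Schema valueSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyValue\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"},"
            + "{\"name\":\"field1\",\"type\":\"int\"},"
            + "{\"name\":\"field2\",\"type\":\"string\"},"
            + "{\"name\":\"field3\",\"type\":\"string\"}]}");
    GenericRecord value = new GenericData.Record(valueSchema);
    value.put("id", 42L);  // primary key taken from the record value (pk.fields=id)
    value.put("field1", 1);
    value.put("field2", "some");
    value.put("field3", "data");

    try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
      // Sending a second record with the same id overwrites the row instead of failing.
      producer.send(new ProducerRecord<>("mytopic", value));
    }
  }
}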
Update
To configure the connector to execute updates, just set insert.mode to update.
name=local-lx-sink
connector.class=com.leanxcale.connector.kafka.LXSinkConnector
tasks.max=1
topics=mytopic
connection.url=lx://ip_bd_leanxcale:9876
connection.user=APP
connection.password=APP
connection.database=mytestdatabase
auto.create=true
insert.mode=update
batch.size=1000
connection.check.timeout=20
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
sink.connection.mode=kivi
sink.transactional=false
sink.defer.timeout=10000
table.name.format=${topic}
pk.mode=record_key
fields.whitelist=field1,field2,field3
The configuration tips for this example are the following:
- insert.mode: With update, the record only overwrites a row that already exists with the same primary key. If the row does not exist, the operation fails.
- pk.mode: With record_key, the primary key in the target table is taken from the record key stored in Kafka. Since pk.fields is not defined in this configuration, all the fields contained in the record key are used as primary key fields (see the sketch below).
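A minimal producer sketch matching this configuration might look as follows. The key and value schemas, field types and broker address are assumptions; pk.mode=record_key means the primary key is read from the key record.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UpdateProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "http://localhost:8081");

    // Hypothetical key schema: with pk.mode=record_key and no pk.fields,
    // every field of the key record becomes part of the primary key.
    Schema keySchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyKey\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"}]}");
    GenericRecord key = new GenericData.Record(keySchema);
    key.put("id", 42L);

    // Hypothetical value schema with the whitelisted fields; types are assumptions.
    Schema valueSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyValue\",\"fields\":["
            + "{\"name\":\"field1\",\"type\":\"int\"},"
            + "{\"name\":\"field2\",\"type\":\"string\"},"
            + "{\"name\":\"field3\",\"type\":\"string\"}]}");
    GenericRecord value = new GenericData.Record(valueSchema);
    value.put("field1", 1);
    value.put("field2", "updated");
    value.put("field3", "data");

    try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
      // The update only succeeds if a row with this key already exists in the target table.
      producer.send(new ProducerRecord<>("mytopic", key, value));
    }
  }
}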
Delete
To configure the connector to delete rows based on the records received, set delete.enabled to true.
name=local-lx-sink
connector.class=com.leanxcale.connector.kafka.LXSinkConnector
tasks.max=1
topics=mytopic
connection.url=lx://ip_bd_leanxcale:9876
connection.user=APP
connection.password=APP
connection.database=mytestdatabase
auto.create=true
insert.mode=insert
delete.enabled=true
batch.size=1000
connection.check.timeout=20
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
sink.connection.mode=kivi
sink.transactional=false
sink.defer.timeout=10000
table.name.format=${topic}
pk.mode=record_key
fields.whitelist=field1,field2,field3
The configuration tips for this example are the following:
- delete.enabled: Whether deletion is enabled for this connector. If enabled, it is mandatory to set pk.mode to record_key. The connector deletes the row from the target table only when the record key contains a stored primary key and the record value is null (a tombstone; see the sketch below).
- pk.mode: record_key is mandatory when deletion is enabled.
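As a sketch of how a deletion could be triggered, the following producer sends a record whose key holds the primary key and whose value is null (a tombstone). The key schema, field types and broker address are assumptions made for illustration.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DeleteTombstoneSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "http://localhost:8081");

    // Hypothetical key schema holding the primary key of the row to delete.
    Schema keySchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyKey\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"}]}");
    GenericRecord key = new GenericData.Record(keySchema);
    key.put("id", 42L);

    try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
      // A record with a key and a null value (a tombstone) triggers the delete
      // when delete.enabled=true and pk.mode=record_key.
      producer.send(new ProducerRecord<>("mytopic", key, null));
    }
  }
}

Records that carry a non-null value are still inserted as usual, since insert.mode=insert remains in effect in this example.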
Source Examples
Configuration example
Here you can find an example of a source connector configuration (file connect-lx-source.properties):
name=local-lx-source
connector.class=com.leanxcale.connector.kafka.LXSourceConnector
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.properties=lx://localhost:9876/test@APP
source.table=LX_SOURCE_TABLE
source.queue.size=10000
source.from.mode=offset
source.topic.name=${table}
source.fields.whitelist=field1,field2
In this example, the connector connects to the table named LX_SOURCE_TABLE and copies just the fields "field1" and "field2" of the tuples inserted or modified after the last run (the offset stored in Kafka) to a topic with the same name as the table.
The possible values of the property source.from.mode are:
- offset: tuples saved or updated after the offset returned by Kafka, that is, after the timestamp of the last tuple committed to Kafka.
- now: only tuples inserted or updated from the connector's starting timestamp are returned.
- begin: all tuples, both old and new ones, are returned.
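To check what the source connector publishes, a plain Kafka consumer can read the topic created from the table. The sketch below is illustrative only: the broker address, consumer group and the idea of printing the two whitelisted fields are assumptions, and the Confluent Avro deserializer is assumed to be on the classpath.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SourceTopicConsumerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.put("group.id", "lx-source-example");        // hypothetical consumer group
    props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("auto.offset.reset", "earliest");

    try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props)) {
      // source.topic.name=${table} resolves to a topic named after the source table.
      consumer.subscribe(Collections.singletonList("LX_SOURCE_TABLE"));
      while (true) {
        ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<Object, Object> record : records) {
          GenericRecord value = (GenericRecord) record.value();
          System.out.println(value.get("field1") + " / " + value.get("field2"));
        }
      }
    }
  }
}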