Execution

Once everything above is installed and configured, the processes must be started in the following order.

ZooKeeper

The first step is to start ZooKeeper, the coordination service. Without it, none of the other components will start, because they all rely on it to find one another. Navigate to the confluent-5.4.1/bin directory and run:

nohup ./zookeeper-server-start ../etc/kafka/zookeeper.properties > nohup_zk.out &
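
Because nohup detaches the process, confirm startup by previewing its log file (the name matches the redirection above), for instance:

tail -n 20 nohup_zk.out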

The last few lines will include something like:

[2020-03-31 09:07:55,343] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2020-03-31 09:07:55,345] INFO Configuring NIO connection handler with 10s sessionless connection timeout, 2 selector thread(s), 24 worker threads, and 64 kB direct buffers. (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2020-03-31 09:07:55,347] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2020-03-31 09:07:55,357] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2020-03-31 09:07:55,359] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2020-03-31 09:07:55,361] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2020-03-31 09:07:55,372] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)

Kafka server

Once ZooKeeper is running, start the Kafka server from the confluent-5.4.1/bin directory with:

nohup ./kafka-server-start ../etc/kafka/server.properties > nohup_kafka_server.out &
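
A quick liveness check is to list the broker's topics (assuming the default listener on localhost:9092 from server.properties):

./kafka-topics --list --bootstrap-server localhost:9092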

Tailing nohup_kafka_server.out shows something like the following:

[2020-03-31 09:11:44,434] INFO Kafka version: 5.4.1-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2020-03-31 09:11:44,435] INFO Kafka commitId: 1c8f62230319e789 (org.apache.kafka.common.utils.AppInfoParser)
[2020-03-31 09:11:44,435] INFO Kafka startTimeMs: 1585638704434 (org.apache.kafka.common.utils.AppInfoParser)
[2020-03-31 09:11:44,455] INFO [Producer clientId=producer-1] Cluster ID: _KCRiquMRVm85wsVsE3ATQ (org.apache.kafka.clients.Metadata)
[2020-03-31 09:11:44,497] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2020-03-31 09:11:44,499] INFO Successfully submitted metrics to Kafka topic __confluent.support.metrics (io.confluent.support.metrics.submitters.KafkaSubmitter)
[2020-03-31 09:11:45,815] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter)
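
At this point you can also create the topic used in the rest of this guide (optional if automatic topic creation is enabled on the broker; localhost:9092 again assumes the default listener):

./kafka-topics --create --bootstrap-server localhost:9092 --topic mytopic --partitions 1 --replication-factor 1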

Schema registry

After the Kafka server starts, start the Schema Registry so that the Avro converters can register the schemas of the records they send. From confluent-5.4.1/bin, launch:

nohup ./schema-registry-start ../etc/schema-registry/schema-registry.properties > nohup_schema_registry_server.out &

Check the status by tailing nohup_schema_registry_server.out:

[2020-03-31 09:14:39,120] INFO HV000001: Hibernate Validator 6.0.11.Final (org.hibernate.validator.internal.util.Version:21)
[2020-03-31 09:14:39,283] INFO JVM Runtime does not support Modules (org.eclipse.jetty.util.TypeUtil:201)
[2020-03-31 09:14:39,283] INFO Started o.e.j.s.ServletContextHandler@66b7550d{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:824)
[2020-03-31 09:14:39,294] INFO Started o.e.j.s.ServletContextHandler@2ccca26f{/ws,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:824)
[2020-03-31 09:14:39,305] INFO Started NetworkTrafficServerConnector@3e57cd70{HTTP/1.1,[http/1.1]}{0.0.0.0:8081} (org.eclipse.jetty.server.AbstractConnector:293)
[2020-03-31 09:14:39,305] INFO Started @2420ms (org.eclipse.jetty.server.Server:399)
[2020-03-31 09:14:39,305] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:44)
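
The registry's REST API should now answer on the port shown in the log; a quick sanity check that returns the list of registered subjects:

curl http://localhost:8081/subjects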

Kafka Sink Connector for LeanXcale

Finally, start the Kafka Sink connector for LeanXcale with the following command:

nohup ./connect-standalone ../etc/kafka/connect-standalone.properties ../etc/kafka/connect-lx-sink.properties > nohup_connect_lx.out &
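
The standalone worker also exposes a REST interface (port 8083 unless overridden in connect-standalone.properties) that reports connector state; the name local-jdbc-sink matches the consumer group shown in the log below:

curl http://localhost:8083/connectors/local-jdbc-sink/status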

Looking at the log:

[2020-03-31 09:19:49,235] INFO [Consumer clientId=connector-consumer-local-jdbc-sink-0, groupId=connect-local-jdbc-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:533)
[2020-03-31 09:19:49,285] INFO [Consumer clientId=connector-consumer-local-jdbc-sink-0, groupId=connect-local-jdbc-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:533)
[2020-03-31 09:19:49,305] INFO [Consumer clientId=connector-consumer-local-jdbc-sink-0, groupId=connect-local-jdbc-sink] Finished assignment for group at generation 1: {connector-consumer-local-jdbc-sink-0-ae0d5fda-a1f2-4a10-95ca-23bd3246481c=Assignment(partitions=[mytopic-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:585)
[2020-03-31 09:19:49,321] INFO [Consumer clientId=connector-consumer-local-jdbc-sink-0, groupId=connect-local-jdbc-sink] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:484)
[2020-03-31 09:19:49,324] INFO [Consumer clientId=connector-consumer-local-jdbc-sink-0, groupId=connect-local-jdbc-sink] Adding newly assigned partitions: mytopic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:267)
[2020-03-31 09:19:49,337] INFO [Consumer clientId=connector-consumer-local-jdbc-sink-0, groupId=connect-local-jdbc-sink] Found no committed offset for partition mytopic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1241)
[2020-03-31 09:19:49,343] INFO [Consumer clientId=connector-consumer-local-jdbc-sink-0, groupId=connect-local-jdbc-sink] Resetting offset for partition mytopic-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState:381)

If there is already data on the topic, the connector will immediately start consuming records and storing them in LeanXcale according to the supplied configuration. The log file will then contain lines like these:

[2020-03-31 09:33:43,342] INFO Flushing records in writer for table com.leanxcale.connector.kafka.utils.metadata.TableId@a25226fa with name mytopic (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:54)
[2020-03-31 09:33:43,342] INFO Connecting to database for the first time... (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:33)
[2020-03-31 09:33:43,342] INFO Attempting to open connection (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:60)
Conn: loading libkv from libkv.so
Java HotSpot(TM) 64-Bit Server VM warning: You have loaded library /tmp/libkv41983156673418059.so which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
[2020-03-31 09:33:43,408] INFO Registering builder com.leanxcale.kivi.database.constraint.GeohashConstraintBuilder@355d0da4 for prefix GH (com.leanxcale.kivi.database.impl.TableMetadataSerializer:34)
[2020-03-31 09:33:43,409] INFO Registering builder com.leanxcale.kivi.database.constraint.DeltaConstraintBuilder@4d19b0e7 for prefix DEL (com.leanxcale.kivi.database.impl.TableMetadataSerializer:34)
[2020-03-31 09:33:43,410] INFO Registering builder com.leanxcale.kivi.database.constraint.UUIDConstraintBuilder@6033bc5c for prefix UUID (com.leanxcale.kivi.database.impl.TableMetadataSerializer:34)
[2020-03-31 09:33:43,411] INFO Registering builder com.leanxcale.kivi.database.constraint.ReflectStorableConstraint@3ff680bd for prefix ABS (com.leanxcale.kivi.database.impl.TableMetadataSerializer:34)
09:33:43 1585640023972632000 kv.Conn[26954]: *** no.auth = 1
09:33:43 1585640023972664000 kv.Conn[26954]: *** no.crypt = 2
09:33:43 1585640023972669000 kv.Conn[26954]: *** no.chkfull = 1
09:33:43 1585640023972673000 kv.Conn[26954]: *** no.flushctlout = 1
09:33:43 1585640023972677000 kv.Conn[26954]: *** no.npjit = 1
09:33:43 1585640023972681000 kv.Conn[26954]: *** no.tplfrag = 1
09:33:43 1585640023972684000 kv.Conn[26954]: *** no.dictsort = 1
[2020-03-31 09:33:44,346] INFO New session created: 1 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:46)
[2020-03-31 09:33:44,346] INFO Table com.leanxcale.connector.kafka.utils.metadata.TableId@a25226fa is not registered for the connector. Checking db... (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:60)
[2020-03-31 09:33:44,347] INFO Table mytopic not found. Creating it (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:62)
[2020-03-31 09:33:44,453] INFO Commited session: 1 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:78)
[2020-03-31 09:33:44,453] INFO Registering table mytopic in connector (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:71)
[2020-03-31 09:33:44,453] INFO Writing 1 records (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:84)
[2020-03-31 09:33:44,817] INFO Commited session: 1 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:78)
[2020-03-31 09:33:44,821] INFO Flushing records in writer for table com.leanxcale.connector.kafka.utils.metadata.TableId@a25226fa with name mytopic (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:54)
[2020-03-31 09:33:44,821] INFO Connecting to database... (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:36)
[2020-03-31 09:33:44,821] INFO Attempting to open connection (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:60)
[2020-03-31 09:33:44,821] INFO New session created: 2 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:46)
[2020-03-31 09:33:44,821] INFO Table mytopic is already registered for the connector (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:75)
[2020-03-31 09:33:44,821] INFO Writing 4 records (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:84)
[2020-03-31 09:33:45,004] INFO Commited session: 2 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:78)
[2020-03-31 09:33:45,080] INFO Flushing records in writer for table com.leanxcale.connector.kafka.utils.metadata.TableId@a25226fa with name mytopic (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:54)
[2020-03-31 09:33:45,080] INFO Connecting to database... (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:36)
[2020-03-31 09:33:45,080] INFO Attempting to open connection (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:60)
[2020-03-31 09:33:45,080] INFO New session created: 3 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:46)
[2020-03-31 09:33:45,080] INFO Table mytopic is already registered for the connector (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:75)
[2020-03-31 09:33:45,080] INFO Writing 500 records (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:84)
[2020-03-31 09:33:45,563] INFO Commited session: 3 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:78)
[2020-03-31 09:33:45,603] INFO Flushing records in writer for table com.leanxcale.connector.kafka.utils.metadata.TableId@a25226fa with name mytopic (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:54)
[2020-03-31 09:33:45,604] INFO Connecting to database... (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:36)
[2020-03-31 09:33:45,604] INFO Attempting to open connection (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:60)
[2020-03-31 09:33:45,604] INFO New session created: 4 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:46)
[2020-03-31 09:33:45,604] INFO Table mytopic is already registered for the connector (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:75)
[2020-03-31 09:33:45,604] INFO Writing 500 records (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:84)
[2020-03-31 09:33:45,910] INFO Commited session: 4 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:78)
[2020-03-31 09:33:45,924] INFO Flushing records in writer for table com.leanxcale.connector.kafka.utils.metadata.TableId@a25226fa with name mytopic (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:54)
[2020-03-31 09:33:45,924] INFO Connecting to database... (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:36)
[2020-03-31 09:33:45,924] INFO Attempting to open connection (com.leanxcale.connector.kafka.connection.impl.AbstractLXConnectionProvider:60)
[2020-03-31 09:33:45,924] INFO New session created: 5 (com.leanxcale.connector.kafka.connection.impl.LXKiviConnectionProvider:46)
[2020-03-31 09:33:45,924] INFO Table mytopic is already registered for the connector (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:75)
[2020-03-31 09:33:45,924] INFO Writing 500 records (com.leanxcale.connector.kafka.sink.impl.LXWriterImpl:84)

These lines show:

  • The first connection to the DB.

  • How the connector does not find an existing table in the DB, so it creates one based on the schema defined in the producer code.

  • How the first record is inserted.

  • How, for the subsequent four records, the connector finds the table it previously created.

  • How those four records are written.

  • How it then collects batches of 500 records, the default number fetched from the Kafka server on each poll call, and inserts them into LeanXcale (see the sketch after this list for overriding this default).
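
The 500-record batch is the Kafka consumer's default max.poll.records. The standalone worker accepts consumer overrides through the consumer. prefix, so the batch size can be tuned there; a minimal sketch, assuming the same worker configuration file used above:

# in ../etc/kafka/connect-standalone.properties
consumer.max.poll.records=1000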

Kafka Source Connector for LeanXcale

To run the Kafka Source connector for LeanXcale, execute the following command, this time passing the source connector's properties file (named connect-lx-source.properties here):

nohup ./connect-standalone ../etc/kafka/connect-standalone.properties ../etc/kafka/connect-lx-source.properties > nohup_connect_lx_source.out &

You can check in the log that the connector has started properly:

[2021-08-27 13:24:02,648] INFO Starting LX Source Task with config{connector.class=com.leanxcale.connector.kafka.LXSourceConnector, source.table=TABLE_TEST, tasks.max=1, topics=mytopic, source.from.mode=offset, source.poll.frequency=100, connection.properties=lx://localhost:9876/test@APP, value.converter.schema.registry.url=http://localhost:8081, fields.whitelist=trip_duration_secs,start_time,stop_time,start_station_id,start_station_name, start_station_latitude,start_station_longitude,end_station_id,end_station_name,end_station_latitude, end_station_longitude,bike_id,user_type,user_birth_year,user_gender, task.class=com.leanxcale.connector.kafka.source.LXSourceTask, name=local-lx-source, value.converter=io.confluent.connect.avro.AvroConverter, key.converter=io.confluent.connect.avro.AvroConverter, key.converter.schema.registry.url=http://localhost:8081, source.queue.size=11110, source.topic.name=${table}} (com.leanxcale.connector.kafka.source.LXSourceTask:47)
[2021-08-27 13:24:02,648] INFO Created connector local-lx-source (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2021-08-27 13:24:02,649] INFO LXConnectorConfig values:
...
[2021-08-27 13:24:03,070] INFO Connecting to table:TABLE_TEST (com.leanxcale.connector.kafka.source.LXReader:85)
[2021-08-27 13:24:03,080] INFO LX Source Task properly started (com.leanxcale.connector.kafka.source.LXSourceTask:64)
[2021-08-27 13:24:03,081] INFO WorkerSourceTask{id=local-lx-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:209)
[2021-08-27 13:24:03,186] INFO Built schema [Schema{STRUCT}] with fields [[Field{name=FIELD1, index=0, schema=Schema{INT32}}, Field{name=FIELD2, index=1, schema=Schema{STRING}}, Field{name=FIELD3, index=2, schema=Schema{FLOAT64}}]] for table:[TABLE_TEST] (com.leanxcale.connector.kafka.source.LXSourceTask:166)
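
As with the sink, the worker's REST interface can confirm the task state (again assuming the default REST port 8083; the connector name local-lx-source comes from the configuration shown above):

curl http://localhost:8083/connectors/local-lx-source/status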

The connector will automatically connect to the LX table and start streaming data to the Kafka topic according to the given configuration.
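
The worker logs at INFO by default. One way to raise the verbosity of the connector classes, sketched here against the stock connect-log4j.properties shipped with the platform, is to add a logger entry and restart the worker:

# in ../etc/kafka/connect-log4j.properties
# use TRACE instead of DEBUG for per-record detail
log4j.logger.com.leanxcale=DEBUG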

If you set the log level to DEBUG, you can check how many records are sent to Kafka in each poll request:

[2021-08-27 13:54:27,721] INFO Built schema [Schema{STRUCT}] with fields [[Field{name=FIELD1, index=0, schema=Schema{INT32}}, Field{name=FIELD2, index=1, schema=Schema{STRING}}, Field{name=FIELD3, index=2, schema=Schema{FLOAT64}}]] for table:[TABLE_TEST] (com.leanxcale.connector.kafka.source.LXSourceTask:167)
[2021-08-27 13:54:27,732] DEBUG Returning [660] records (com.leanxcale.connector.kafka.source.LXSourceTask:75)
[2021-08-27 13:54:28,863] DEBUG Returning [0] records (com.leanxcale.connector.kafka.source.LXSourceTask:75)
[2021-08-27 13:54:29,869] DEBUG Returning [0] records (com.leanxcale.connector.kafka.source.LXSourceTask:75)
...
[2021-08-27 13:54:43,935] DEBUG Returning [30] records (com.leanxcale.connector.kafka.source.LXSourceTask:75)
[2021-08-27 13:54:44,947] DEBUG Returning [50] records (com.leanxcale.connector.kafka.source.LXSourceTask:75)
[2021-08-27 13:54:45,959] DEBUG Returning [0] records (com.leanxcale.connector.kafka.source.LXSourceTask:75)

If you set the log level to TRACE, you can also see the contents of the records sent to Kafka.

You can also start a Kafka consumer to check the records arriving on the topic:

./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic TABLE_TEST --property schema.registry.url=http://localhost:8081

{"FIELD1":{"int":1000},"FIELD2":{"string":"field2 text:1000"},"FIELD3":{"double":1000.1}}
{"FIELD1":{"int":1001},"FIELD2":{"string":"field2 text:1001"},"FIELD3":{"double":1001.1}}
{"FIELD1":{"int":1002},"FIELD2":{"string":"field2 text:1002"},"FIELD3":{"double":1002.1}}
{"FIELD1":{"int":1003},"FIELD2":{"string":"field2 text:1003"},"FIELD3":{"double":1003.1}}
{"FIELD1":{"int":1004},"FIELD2":{"string":"field2 text:1004"},"FIELD3":{"double":1004.1}}