In this article we will see how to generate a tombstone record in Kafka and observe its effect in ksqlDB, MongoDB and Postgres. Kafka uses tombstone records to delete records that are no longer needed. A tombstone is a message with a valid key and a null value; on a compacted topic it marks earlier records with the same key for removal.
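To make the idea concrete, here is a minimal Python sketch of what compaction does with a tombstone. It is only a toy model, not Kafka's implementation: the `compact` function and the sample `log` are hypothetical names made up for illustration. A real broker compacts in the background, skips the active segment, and keeps tombstones around for `delete.retention.ms` before removing them.

```python
from typing import Dict, List, Optional, Tuple

# Each record is a (key, value) pair; a value of None stands for a tombstone.
Record = Tuple[str, Optional[str]]


def compact(log: List[Record]) -> List[Record]:
    """Keep the latest record per key, then drop keys whose latest record is a tombstone."""
    latest: Dict[str, Optional[str]] = {}
    for key, value in log:
        # Re-insert the key so the surviving record sits at its latest position.
        latest.pop(key, None)
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]


log = [
    ("wcook", '{"name": "Amber Gaines"}'),
    ("leesteven", '{"name": "Donald Smith"}'),
    ("leesteven", None),  # tombstone: valid key, null value
]
print(compact(log))
```

After compaction only the `wcook` record is left, because the last record for `leesteven` is a tombstone.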
Run the Kafka producer and generate some test records
docker run -it --rm -e bootstrap_servers=192.168.1.9:39092 -e topic_name=kafka.ksql.tombstone.demo -e no_of_records=10 entechlog/faker-kafka-profile-datagen
Start a ksqlDB CLI session and examine the records in the topic by running the commands below. Make sure to set the offset to earliest first.
set 'auto.offset.reset'='earliest';
ksql> print 'kafka.ksql.tombstone.demo';
Format:STRING
6/27/20 3:22:03 PM EDT , wcook , {"username": "wcook", "name": "Amber Gaines", "sex": "F", "address": "5876 Wendy Vista\x5CnGarciaborough, AZ 63886", "mail": "nancyallen@gmail.com", "birthdate": "1973-02-01"}
6/27/20 3:22:05 PM EDT , christopherwhite , {"username": "christopherwhite", "name": "James Bentley", "sex": "M", "address": "22491 Julie Junctions Apt. 075\x5CnNew Christine, NE 46532", "mail": "kevin33@gmail.com", "birthdate": "1985-03-27"}
6/27/20 3:22:05 PM EDT , leesteven , {"username": "leesteven", "name": "Donald Smith", "sex": "M", "address": "9882 Blanchard Street\x5CnNorth Matthewhaven, NV 87603", "mail": "lauracrawford@yahoo.com", "birthdate": "1929-07-16"}
6/27/20 3:22:05 PM EDT , pgrant , {"username": "pgrant", "name": "Joshua Gill", "sex": "M", "address": "128 Allen Pike\x5CnGarrisonport, VT 75201", "mail": "amanda38@yahoo.com", "birthdate": "1975-12-09"}
6/27/20 3:22:05 PM EDT , michelleblackwell , {"username": "michelleblackwell", "name": "Stacy Barrett", "sex": "F", "address": "162 Stewart Fall Apt. 636\x5CnSharimouth, DE 57409", "mail": "lindseyhurst@gmail.com", "birthdate": "2000-09-10"}
6/27/20 3:22:05 PM EDT , yblackburn , {"username": "yblackburn", "name": "Stacy Ayala", "sex": "F", "address": "PSC 5638, Box 6004\x5CnAPO AE 68274", "mail": "petersshawn@yahoo.com", "birthdate": "1935-04-14"}
6/27/20 3:22:05 PM EDT , jamesdominguez , {"username": "jamesdominguez", "name": "Charles Barton", "sex": "M", "address": "617 Anna Lights Suite 189\x5CnLucaston, ND 69540", "mail": "mcgeedenise@hotmail.com", "birthdate": "1992-07-27"}
6/27/20 3:22:05 PM EDT , wphillips , {"username": "wphillips", "name": "Daniel Jones", "sex": "M", "address": "580 Thompson Ports Suite 407\x5CnPort Katelyn, RI 03888", "mail": "milesmatthew@hotmail.com", "birthdate": "1918-07-16"}
6/27/20 3:22:05 PM EDT , hailey78 , {"username": "hailey78", "name": "Neil Malone", "sex": "M", "address": "00078 Jill Roads\x5CnNew Brianburgh, AZ 59199", "mail": "jakesantiago@gmail.com", "birthdate": "1980-07-25"}
6/27/20 3:22:05 PM EDT , sarah60 , {"username": "sarah60", "name": "Ryan Gonzalez", "sex": "M", "address": "11126 David Loop\x5CnLake Kimberlytown, MI 18851", "mail": "rowlandrobert@yahoo.com", "birthdate": "1922-08-05"}
Let's also make sure the topic is compacted by running the command below.
kafka-topics --zookeeper entechlog-vm-01:2181 --alter --topic kafka.ksql.tombstone.demo --config cleanup.policy=compact
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic kafka.ksql.tombstone.demo.
Review the topic's compaction policy by running the describe command below.
kafka-topics --describe --zookeeper entechlog-vm-01:2181 --topic "kafka.ksql.tombstone.demo"
Topic: kafka.ksql.tombstone.demo PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact
Topic: kafka.ksql.tombstone.demo Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Create the raw ksqlDB table. This table is backed by the topic we created in the previous step.
CREATE TABLE TBL_TOMBSTONE_DEMO_0010 (
ROWKEY VARCHAR KEY
,NAME VARCHAR
,SEX VARCHAR
,ADDRESS VARCHAR
,MAIL VARCHAR
,BIRTHDATE VARCHAR
)
WITH (
KAFKA_TOPIC = 'kafka.ksql.tombstone.demo'
,VALUE_FORMAT = 'JSON'
,REPLICAS = 1
,PARTITIONS = 1
);
Create a chained ksqlDB table on top of the raw table. It applies simple transformations to a few fields.
CREATE TABLE TBL_TOMBSTONE_DEMO_0020 AS
SELECT ROWKEY AS USERNAME
,UCASE(NAME) AS NAME
,CASE
WHEN SEX = 'M'
THEN 'MALE'
WHEN SEX = 'F'
THEN 'FEMALE'
ELSE 'OTHERS'
END AS SEX
,LCASE(MAIL) AS MAIL
,BIRTHDATE
FROM TBL_TOMBSTONE_DEMO_0010 EMIT CHANGES;
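For readers less familiar with KSQL, the SELECT list above can be read as a per-record function. The Python sketch below mirrors it under the assumption that the topic value is the JSON document printed earlier; `transform` and `SEX_LABELS` are hypothetical names used only for illustration, and this is not how ksqlDB evaluates the query internally.

```python
from typing import Dict

# CASE WHEN SEX = 'M' THEN 'MALE' WHEN SEX = 'F' THEN 'FEMALE' ELSE 'OTHERS' END
SEX_LABELS = {"M": "MALE", "F": "FEMALE"}


def transform(key: str, value: Dict[str, str]) -> Dict[str, str]:
    """Mirror the SELECT list of TBL_TOMBSTONE_DEMO_0020 for one record."""
    return {
        "USERNAME": key,                                # ROWKEY AS USERNAME
        "NAME": value["name"].upper(),                  # UCASE(NAME)
        "SEX": SEX_LABELS.get(value["sex"], "OTHERS"),  # CASE ... END AS SEX
        "MAIL": value["mail"].lower(),                  # LCASE(MAIL)
        "BIRTHDATE": value["birthdate"],
    }


row = transform(
    "wcook",
    {
        "name": "Amber Gaines",
        "sex": "F",
        "mail": "nancyallen@gmail.com",
        "birthdate": "1973-02-01",
    },
)
print(row)
```

The sample input is the `wcook` record from the topic, so the result can be compared with the row for `wcook` in the table output.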
Examine the records in the ksqlDB table
ksql> SELECT * FROM TBL_TOMBSTONE_DEMO_0020 EMIT CHANGES;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME |ROWKEY |USERNAME |NAME |SEX |MAIL |BIRTHDATE |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1593285723143 |wcook |wcook |AMBER GAINES |FEMALE |nancyallen@gmail.|1973-02-01 |
| | | | | |com | |
|1593285725048 |christopherwhite |christopherwhite |JAMES BENTLEY |MALE |kevin33@gmail.com|1985-03-27 |
|1593285725056 |leesteven |leesteven |DONALD SMITH |MALE |lauracrawford@yah|1929-07-16 |
| | | | | |oo.com | |
|1593285725061 |pgrant |pgrant |JOSHUA GILL |MALE |amanda38@yahoo.co|1975-12-09 |
| | | | | |m | |
|1593285725066 |michelleblackwell|michelleblackwell|STACY BARRETT |FEMALE |lindseyhurst@gmai|2000-09-10 |
| | | | | |l.com | |
|1593285725069 |yblackburn |yblackburn |STACY AYALA |FEMALE |petersshawn@yahoo|1935-04-14 |
| | | | | |.com | |
|1593285725075 |jamesdominguez |jamesdominguez |CHARLES BARTON |MALE |mcgeedenise@hotma|1992-07-27 |
| | | | | |il.com | |
|1593285725080 |wphillips |wphillips |DANIEL JONES |MALE |milesmatthew@hotm|1918-07-16 |
| | | | | |ail.com | |
|1593285725091 |hailey78 |hailey78 |NEIL MALONE |MALE |jakesantiago@gmai|1980-07-25 |
| | | | | |l.com | |
|1593285725095 |sarah60 |sarah60 |RYAN GONZALEZ |MALE |rowlandrobert@yah|1922-08-05 |
| | | | | |oo.com | |
Create a second chained ksqlDB table on top of the raw table. It applies the same transformations as the previous table, but also sets the value format to AVRO, which makes it easier for the Postgres JDBC sink connector to consume.
CREATE TABLE TBL_TOMBSTONE_DEMO_0030
WITH (VALUE_FORMAT = 'AVRO') AS
SELECT ROWKEY AS USERNAME
,UCASE(NAME) AS NAME
,CASE
WHEN SEX = 'M'
THEN 'MALE'
WHEN SEX = 'F'
THEN 'FEMALE'
ELSE 'OTHERS'
END AS SEX
,LCASE(MAIL) AS MAIL
,BIRTHDATE
FROM TBL_TOMBSTONE_DEMO_0010 EMIT CHANGES;
Create the MongoDB sink connector by running the command below. It's important to note that we are setting "delete.on.null.values": "true", which tells the connector to handle tombstone records by deleting the matching document.
curl --location --request PUT 'http://entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_MONGODB/config' \
--header 'Content-Type: application/json' \
--data-raw '{
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms.id_to_object.field": "_id",
"name": "TOMBSTONE_DEMO_SINK_MONGODB",
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "id_to_object",
"errors.retry.timeout": "3",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "TBL_TOMBSTONE_DEMO_0020",
"errors.deadletterqueue.topic.name": "TBL_TOMBSTONE_DEMO_0020_MONGO_DLQ",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true",
"transforms.id_to_object.type": "org.apache.kafka.connect.transforms.HoistField$Key",
"connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
"database": "demo",
"collection": "TOMBSTONE_DEMO",
"delete.on.null.values": "true",
"key.projection.type": "none",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy"
}'
Validate the status of the connector by running the command below
curl --location --request GET 'entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_MONGODB/status'
{
"name": "TOMBSTONE_DEMO_SINK_MONGODB",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
}
],
"type": "sink"
}
Examine the records in MongoDB
Create the Postgres sink connector by running the command below. Note that we are setting "delete.enabled": true, which tells the connector to handle tombstone records. The demo schema in Postgres was created manually before creating the connector.
curl --location --request PUT 'http://entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_POSTGRES/config' \
--header 'Content-Type: application/json' \
--data-raw '{
"value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
"name": "TOMBSTONE_DEMO_SINK_POSTGRES",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable":"false",
"topics": "TBL_TOMBSTONE_DEMO_0030",
"connection.url": "jdbc:postgresql://entechlog-vm-01:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"insert.mode": "upsert",
"delete.enabled": true,
"table.name.format": "demo.TOMBSTONE_DEMO",
"pk.mode": "record_key",
"pk.fields": "ROWKEY",
"auto.create": "true"
}'
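The combination of "insert.mode": "upsert", "delete.enabled": true and "pk.mode": "record_key" boils down to two rules: a record with a value inserts or overwrites the row for its key, and a record with a null value deletes that row. The Python sketch below illustrates those rules with an in-memory SQLite database standing in for Postgres. It is not the connector's code; the table layout and the `apply_record` helper are assumptions made for illustration.

```python
import sqlite3
from typing import Dict, Optional

# In-memory SQLite stands in for the Postgres target (assumption for illustration).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tombstone_demo (rowkey TEXT PRIMARY KEY, name TEXT)")


def apply_record(key: str, value: Optional[Dict[str, str]]) -> None:
    """Apply one Kafka record to the table the way an upsert/delete sink would."""
    if value is None:
        # Tombstone: delete the row identified by the record key.
        conn.execute("DELETE FROM tombstone_demo WHERE rowkey = ?", (key,))
    else:
        # Non-null value: insert the row, or overwrite the row with the same key.
        conn.execute(
            "INSERT OR REPLACE INTO tombstone_demo (rowkey, name) VALUES (?, ?)",
            (key, value["NAME"]),
        )


apply_record("wcook", {"NAME": "AMBER GAINES"})
apply_record("leesteven", {"NAME": "DONALD SMITH"})
apply_record("leesteven", None)  # tombstone for leesteven

rows = conn.execute(
    "SELECT rowkey, name FROM tombstone_demo ORDER BY rowkey"
).fetchall()
print(rows)
```

Only the `wcook` row remains at the end, because the record key is the primary key and the tombstone removed the row for `leesteven`.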
Validate the status of the connector by running the command below
curl --location --request GET 'entechlog-vm-01:8083/connectors/TOMBSTONE_DEMO_SINK_POSTGRES/status'
{
"name": "TOMBSTONE_DEMO_SINK_POSTGRES",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
}
],
"type": "sink"
}
Examine the records in Postgres
At this point we have a ksqlDB table, a MongoDB collection and a Postgres table with the required data. Now let's create a tombstone record (a record with a key and a null value) and see the impact on the target objects.
Create a tombstone record by running the command below. Here leesteven is the key of an existing record in the topic.
echo "leesteven:" | kafkacat -P -Z -b entechlog-vm-01:9092 -t kafka.ksql.tombstone.demo -K:
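In the command above, -K: makes kafkacat split each input line into key and value at the colon, and -Z makes it send an empty value as NULL instead of as an empty string. The Python sketch below models that parsing step only; `parse_line` and its parameters are hypothetical names for illustration and are not part of kafkacat.

```python
from typing import Optional, Tuple


def parse_line(
    line: str, delimiter: str = ":", null_if_empty: bool = True
) -> Tuple[str, Optional[str]]:
    """Split one input line into (key, value) at the first delimiter."""
    key, _, value = line.rstrip("\n").partition(delimiter)
    if null_if_empty and value == "":
        # Models -Z: an empty value is sent as NULL, which makes it a tombstone.
        return key, None
    return key, value


print(parse_line("leesteven:\n"))                      # with -Z
print(parse_line("leesteven:", null_if_empty=False))   # without -Z
```

The second call shows why -Z matters: without it the value is an empty string rather than null, and an empty string is not a tombstone.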
Examine the records in ksqlDB. As you can see, the record for leesteven has now been deleted from the ksqlDB table.
ksql> SELECT * FROM TBL_TOMBSTONE_DEMO_0020 EMIT CHANGES;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME |ROWKEY |USERNAME |NAME |SEX |MAIL |BIRTHDATE |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1593285723143 |wcook |wcook |AMBER GAINES |FEMALE |nancyallen@gmail.|1973-02-01 |
| | | | | |com | |
|1593285725048 |christopherwhite |christopherwhite |JAMES BENTLEY |MALE |kevin33@gmail.com|1985-03-27 |
|1593285725061 |pgrant |pgrant |JOSHUA GILL |MALE |amanda38@yahoo.co|1975-12-09 |
| | | | | |m | |
|1593285725066 |michelleblackwell|michelleblackwell|STACY BARRETT |FEMALE |lindseyhurst@gmai|2000-09-10 |
| | | | | |l.com | |
|1593285725069 |yblackburn |yblackburn |STACY AYALA |FEMALE |petersshawn@yahoo|1935-04-14 |
| | | | | |.com | |
|1593285725075 |jamesdominguez |jamesdominguez |CHARLES BARTON |MALE |mcgeedenise@hotma|1992-07-27 |
| | | | | |il.com | |
|1593285725080 |wphillips |wphillips |DANIEL JONES |MALE |milesmatthew@hotm|1918-07-16 |
| | | | | |ail.com | |
|1593285725091 |hailey78 |hailey78 |NEIL MALONE |MALE |jakesantiago@gmai|1980-07-25 |
| | | | | |l.com | |
|1593285725095 |sarah60 |sarah60 |RYAN GONZALEZ |MALE |rowlandrobert@yah|1922-08-05 |
| | | | | |oo.com | |
Examine the records in MongoDB. As you can see, the record for leesteven has now been deleted from the MongoDB collection.
Examine the records in Postgres. As you can see, the record for leesteven has now been deleted from the Postgres table.
Here we saw that a tombstone record created correctly in the parent topic is also reflected in Kafka Streams, ksqlDB and target databases like MongoDB and Postgres.