Confluent Kafka - Challenges & Solutions
In this article we will discuss some of the common Confluent Kafka challenges and solutions to fix them. Issue 01 Issue …
In this article we will go over a few scenarios to explore ARRAY, MAP and LIST in ksqlDB.
Install the Confluent Platform for your operating system using the instructions here.
Creating an array that lists the orders made by a user. The input used to build this array is individual order events.
Create a schema for ksql-datagen. See here for the schema I created to generate test data for this demo.
cd into the directory that contains the schema (in this case ordersForArrayDemo.avro) and issue the command below. It generates test data for our topic; in this example, random order data for a few customers.
ksql-datagen schema=ordersForArrayDemo.avro format=avro topic='dev.etl.datagen.orders.src.0010' key=orderID maxInterval=5000 iterations=10
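To make the datagen parameters concrete, here is a minimal Python sketch of the kind of stream the command produces. It assumes `iterations` is the number of records, `maxInterval` is the maximum gap in milliseconds between records, and `key=orderID` copies the orderID field into the message key. The value pools are taken from the sample output shown later; the real ones are defined in the ordersForArrayDemo.avro schema, which is not reproduced here. `generate_orders` is a hypothetical helper, not part of ksql-datagen.

```python
import random
from dataclasses import dataclass, asdict


@dataclass
class Order:
    # Field names match the records printed from the topic.
    orderID: str
    CustomerID: str
    StorePickUpFlag: str
    ItemName: str
    OrderDate: str


# Value pools copied from the sample data; the real schema may use others.
CUSTOMERS = ["6454214343400548", "8484214343400844", "5759544343400808"]
ITEMS = [
    "Apple iPad",
    "Apple TV 4K Streaming Console",
    "Nintendo Switch",
    "Arlo Pro",
    "Tile Mate Item Finder 4-Pack Combo",
    "Jelly Comb folding Bluetooth keyboard",
]
DATES = ["09/01/2018", "06/24/2019", "01/28/2020"]


def generate_orders(iterations=10, max_interval_ms=5000, start_id=800000, seed=None):
    """Hypothetical stand-in for ksql-datagen: yields (offset_ms, key, value).

    offset_ms is a simulated timestamp relative to the first record; nothing sleeps.
    """
    rng = random.Random(seed)
    offset_ms = 0
    for i in range(iterations):
        order = Order(
            orderID=str(start_id - i),  # the sample output counts down from 800000
            CustomerID=rng.choice(CUSTOMERS),
            StorePickUpFlag=rng.choice(["Yes", "No"]),
            ItemName=rng.choice(ITEMS),
            OrderDate=rng.choice(DATES),
        )
        yield offset_ms, order.orderID, asdict(order)  # key=orderID
        offset_ms += rng.randint(0, max_interval_ms)  # gap of at most maxInterval


for offset_ms, key, value in generate_orders(iterations=3, seed=42):
    print(offset_ms, key, value)
```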
If you need to purge the data in your test topic, temporarily set the topic's retention time to one second, wait for the purge to take effect (about one minute), and then restore the previous retention.ms value:
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name "dev.etl.datagen.orders.src.0010" --add-config retention.ms=1000
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name "dev.etl.datagen.orders.src.0010" --add-config retention.ms=604800000
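retention.ms is expressed in milliseconds, so 1000 is one second and 604800000 is seven days (7 * 24 * 60 * 60 * 1000), which matches Kafka's default retention of 168 hours. The small Python helpers below are hypothetical and only for illustration: they compute these values and assemble the same kafka-configs arguments used in the two commands above.

```python
import shlex

MS_PER_SECOND = 1000
MS_PER_DAY = 24 * 60 * 60 * MS_PER_SECOND


def retention_ms(days=0, seconds=0):
    """Convert a retention period into the millisecond value retention.ms expects."""
    return days * MS_PER_DAY + seconds * MS_PER_SECOND


def set_retention_cmd(topic, ms, zookeeper="localhost:2181"):
    """Build the kafka-configs argument list used in the purge steps above."""
    return [
        "kafka-configs", "--zookeeper", zookeeper,
        "--alter", "--entity-type", "topics",
        "--entity-name", topic,
        "--add-config", f"retention.ms={ms}",
    ]


topic = "dev.etl.datagen.orders.src.0010"
print(shlex.join(set_retention_cmd(topic, retention_ms(seconds=1))))  # purge
print(shlex.join(set_retention_cmd(topic, retention_ms(days=7))))     # restore
```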
Now start a KSQL session and issue this command so that queries start reading data from the beginning of the topic:
SET 'auto.offset.reset'='earliest';
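auto.offset.reset only decides where a query starts when it has no valid committed offset. ksqlDB defaults to latest, so without the SET above a new query would only see records produced after it starts. The Python function below is a simplified model of that rule, not Kafka client code; it also shows that, after a purge, earliest starts from the new log start offset rather than from 0.

```python
def starting_offset(policy, log_start, log_end, committed=None):
    """Simplified model of where a consumer begins reading one partition.

    log_start: first offset still retained (it moves forward after a purge)
    log_end:   offset the next produced record will get
    committed: offset committed by the consumer group, if any
    """
    # A committed offset wins as long as it is still inside the retained log.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise the reset policy decides.
    if policy == "earliest":
        return log_start
    if policy == "latest":
        return log_end
    # Kafka also accepts "none", which raises an error instead of resetting.
    raise ValueError(f"no valid committed offset and reset policy is {policy!r}")


print(starting_offset("earliest", log_start=0, log_end=10))  # reads all 10 records
print(starting_offset("latest", log_start=0, log_end=10))    # waits for new records
```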
Check the messages in the topic using:
ksql> PRINT 'dev.etl.datagen.orders.src.0010' FROM BEGINNING LIMIT 10;
Format:AVRO
3/22/20 10:40:58 PM EDT, 800000, {"orderID": "800000", "CustomerID": "6454214343400548", "StorePickUpFlag": "No", "ItemName": "Apple iPad", "OrderDate": "09/01/2018"}
3/22/20 10:41:00 PM EDT, 799999, {"orderID": "799999", "CustomerID": "8484214343400844", "StorePickUpFlag": "No", "ItemName": "Apple TV 4K Streaming Console", "OrderDate": "09/01/2018"}
3/22/20 10:41:00 PM EDT, 799998, {"orderID": "799998", "CustomerID": "8484214343400844", "StorePickUpFlag": "Yes", "ItemName": "Nintendo Switch", "OrderDate": "01/28/2020"}
3/22/20 10:41:02 PM EDT, 799997, {"orderID": "799997", "CustomerID": "5759544343400808", "StorePickUpFlag": "Yes", "ItemName": "Arlo Pro", "OrderDate": "06/24/2019"}
3/22/20 10:41:05 PM EDT, 799996, {"orderID": "799996", "CustomerID": "8484214343400844", "StorePickUpFlag": "No", "ItemName": "Tile Mate Item Finder 4-Pack Combo", "OrderDate": "01/28/2020"}
3/22/20 10:41:05 PM EDT, 799995, {"orderID": "799995", "CustomerID": "8484214343400844", "StorePickUpFlag": "Yes", "ItemName": "Apple iPad", "OrderDate": "01/28/2020"}
3/22/20 10:41:08 PM EDT, 799994, {"orderID": "799994", "CustomerID": "6454214343400548", "StorePickUpFlag": "Yes", "ItemName": "Nintendo Switch", "OrderDate": "09/01/2018"}
3/22/20 10:41:09 PM EDT, 799993, {"orderID": "799993", "CustomerID": "6454214343400548", "StorePickUpFlag": "No", "ItemName": "Jelly Comb folding Bluetooth keyboard", "OrderDate": "06/24/2019"}
3/22/20 10:41:10 PM EDT, 799992, {"orderID": "799992", "CustomerID": "5759544343400808", "StorePickUpFlag": "No", "ItemName": "Apple TV 4K Streaming Console", "OrderDate": "01/28/2020"}
3/22/20 10:41:12 PM EDT, 799991, {"orderID": "799991", "CustomerID": "8484214343400844", "StorePickUpFlag": "No", "ItemName": "Jelly Comb folding Bluetooth keyboard", "OrderDate": "06/24/2019"}
Now let's create a STREAM on this topic:
CREATE STREAM STM_ARRAY_DEMO_0010 WITH (KAFKA_TOPIC='dev.etl.datagen.orders.src.0010', VALUE_FORMAT='Avro');
Validate the data in the stream using the push query below:
ksql> SELECT * FROM STM_ARRAY_DEMO_0010 EMIT CHANGES LIMIT 3;
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|ROWTIME |ROWKEY |ORDERID |CUSTOMERID |STOREPICKUPFLAG |ITEMNAME |ORDERDATE |
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|1584931258886 |800000 |800000 |6454214343400548 |No |Apple iPad |09/01/2018 |
|1584931260502 |799999 |799999 |8484214343400844 |No |Apple TV 4K Streaming |09/01/2018 |
| | | | | |Console | |
|1584931260691 |799998 |799998 |8484214343400844 |Yes |Nintendo Switch |01/28/2020 |
Limit Reached
Query terminated
ksql>
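ROWTIME is the record timestamp in milliseconds since the Unix epoch; 1584931258886 is the same instant that PRINT displayed as 3/22/20 10:40:58 PM EDT. Inside ksqlDB, TIMESTAMPTOSTRING converts it (the known-issue query at the end of this article uses it). The Python sketch below does the same conversion for checking values outside ksqlDB; the fixed UTC-4 offset for EDT is an assumption that ignores daylight-saving changes.

```python
from datetime import datetime, timedelta, timezone

# Fixed offset for US Eastern Daylight Time; a real zone database would also
# handle the switch back to standard time.
EDT = timezone(timedelta(hours=-4), "EDT")


def rowtime_to_string(rowtime_ms, tz=timezone.utc):
    """Format an epoch-millisecond ROWTIME like 'yyyy-MM-dd HH:mm:ss Z'."""
    dt = datetime.fromtimestamp(rowtime_ms / 1000, tz=tz)
    return dt.strftime("%Y-%m-%d %H:%M:%S %z")


print(rowtime_to_string(1584931258886, EDT))  # 2020-03-22 22:40:58 -0400
print(rowtime_to_string(1584931258886))       # 2020-03-23 02:40:58 +0000
```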
Create a STREAM that structures each order as a MAP and repartitions the data by the required key. Since we need the orders listed per customer, we partition by customer ID:
CREATE STREAM STM_ARRAY_DEMO_0020 AS
SELECT CUSTOMERID, AS_MAP(AS_ARRAY('orderID','StorePickUpFlag','ItemName','OrderDate'), AS_ARRAY(ORDERID,STOREPICKUPFLAG,ITEMNAME,ORDERDATE)) as orderData
FROM STM_ARRAY_DEMO_0010
PARTITION BY CUSTOMERID;
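Conceptually, AS_MAP pairs the n-th key with the n-th value, and PARTITION BY CUSTOMERID makes the customer ID the new message key so that all of a customer's orders land in the same partition. The Python sketch below models one row's transformation; `as_map`, `to_stm_0020` and `partition_for` are hypothetical names, and `partition_for` uses CRC32 only as a stand-in, since Kafka's default partitioner hashes the serialized key with murmur2.

```python
import zlib

MAP_KEYS = ["orderID", "StorePickUpFlag", "ItemName", "OrderDate"]


def as_map(keys, values):
    """Pair keys[i] with values[i], like AS_MAP(AS_ARRAY(...), AS_ARRAY(...))."""
    # This sketch rejects mismatched lengths; ksqlDB's own behaviour is not modelled.
    if len(keys) != len(values):
        raise ValueError("keys and values must have the same length")
    return dict(zip(keys, values))


def to_stm_0020(row):
    """Turn one STM_ARRAY_DEMO_0010 row into (new_key, value) for STM_ARRAY_DEMO_0020."""
    order_data = as_map(
        MAP_KEYS,
        [row["ORDERID"], row["STOREPICKUPFLAG"], row["ITEMNAME"], row["ORDERDATE"]],
    )
    # PARTITION BY CUSTOMERID: the customer ID becomes the message key.
    return row["CUSTOMERID"], {"CUSTOMERID": row["CUSTOMERID"], "ORDERDATA": order_data}


def partition_for(key, num_partitions):
    """Stand-in partitioner: same key -> same partition (Kafka itself uses murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


row = {
    "ORDERID": "800000", "CUSTOMERID": "6454214343400548",
    "STOREPICKUPFLAG": "No", "ITEMNAME": "Apple iPad", "ORDERDATE": "09/01/2018",
}
key, value = to_stm_0020(row)
print(key, partition_for(key, 4), value["ORDERDATA"])
```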
Validate the data in this new stream
ksql> SELECT * FROM STM_ARRAY_DEMO_0020 EMIT CHANGES LIMIT 2;
+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+
|ROWTIME |ROWKEY |CUSTOMERID |ORDERDATA |
+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+
|1584931258886 |6454214343400548 |6454214343400548 |{orderID=800000, ItemName=Apple iPad, St|
| | | |orePickUpFlag=No, OrderDate=09/01/2018} |
|1584931260502 |8484214343400844 |8484214343400844 |{orderID=799999, ItemName=Apple TV 4K St|
| | | |reaming Console, StorePickUpFlag=No, Ord|
| | | |erDate=09/01/2018} |
Limit Reached
Query terminated
ksql>
Create a TABLE that groups the orders by customer and collects them into the required LIST:
CREATE TABLE TBL_ARRAY_DEMO_0010 AS
SELECT CUSTOMERID, COLLECT_LIST(CAST(ORDERDATA as VARCHAR)) FROM STM_ARRAY_DEMO_0020
GROUP BY CUSTOMERID
EMIT CHANGES;
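The table keeps one running list per customer and emits an updated row each time a new order arrives for that customer. The Python sketch below replays the ten sample orders through that logic; it is a model of the behaviour, not ksqlDB code, and it ignores record caching, which can cause ksqlDB to emit fewer intermediate updates. The hypothetical `map_to_string` mimics the `{key=value, ...}` text that CAST(ORDERDATA AS VARCHAR) produces in the output; it follows insertion order, whereas the ksqlDB output shows a different key order.

```python
from collections import defaultdict


def map_to_string(m):
    """Render a map as '{k1=v1, k2=v2}', the style seen in the ksqlDB output."""
    # Python keeps insertion order; the ksqlDB output shows a different key order.
    return "{" + ", ".join(f"{k}={v}" for k, v in m.items()) + "}"


def collect_list_by_customer(rows):
    """Replay (customer_id, order_map) rows like GROUP BY CUSTOMERID + COLLECT_LIST.

    Returns the final list per customer and the changelog of table updates.
    """
    state = defaultdict(list)
    changelog = []
    for customer_id, order_data in rows:
        state[customer_id].append(map_to_string(order_data))
        # Each new order emits the customer's whole updated list.
        changelog.append((customer_id, list(state[customer_id])))
    return dict(state), changelog


# (orderID, CustomerID, StorePickUpFlag, ItemName, OrderDate) from the PRINT output
SAMPLE = [
    ("800000", "6454214343400548", "No", "Apple iPad", "09/01/2018"),
    ("799999", "8484214343400844", "No", "Apple TV 4K Streaming Console", "09/01/2018"),
    ("799998", "8484214343400844", "Yes", "Nintendo Switch", "01/28/2020"),
    ("799997", "5759544343400808", "Yes", "Arlo Pro", "06/24/2019"),
    ("799996", "8484214343400844", "No", "Tile Mate Item Finder 4-Pack Combo", "01/28/2020"),
    ("799995", "8484214343400844", "Yes", "Apple iPad", "01/28/2020"),
    ("799994", "6454214343400548", "Yes", "Nintendo Switch", "09/01/2018"),
    ("799993", "6454214343400548", "No", "Jelly Comb folding Bluetooth keyboard", "06/24/2019"),
    ("799992", "5759544343400808", "No", "Apple TV 4K Streaming Console", "01/28/2020"),
    ("799991", "8484214343400844", "No", "Jelly Comb folding Bluetooth keyboard", "06/24/2019"),
]

rows = [
    (cust, {"orderID": oid, "StorePickUpFlag": flag, "ItemName": item, "OrderDate": date})
    for oid, cust, flag, item, date in SAMPLE
]
final, changelog = collect_list_by_customer(rows)
for customer_id, orders in final.items():
    print(customer_id, len(orders))
```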
Validate the data in the final table. Because the COLLECT_LIST expression has no alias, ksqlDB names the column KSQL_COL_1:
ksql> SELECT CUSTOMERID, KSQL_COL_1 FROM TBL_ARRAY_DEMO_0010 EMIT CHANGES;
+---------------------------+----------------------------------------------------------------------------------+
|CUSTOMERID |KSQL_COL_1 |
+---------------------------+----------------------------------------------------------------------------------+
|6454214343400548 |[{orderID=800000, ItemName=Apple iPad, StorePickUpFlag=No, OrderDate=09/01/2018}, |
| |{orderID=799994, ItemName=Nintendo Switch, StorePickUpFlag=Yes, OrderDate=09/01/20|
| |18}, {orderID=799993, ItemName=Jelly Comb folding Bluetooth keyboard, StorePickUpF|
| |lag=No, OrderDate=06/24/2019}] |
|5759544343400808 |[{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}, {|
| |orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderD|
| |ate=01/28/2020}] |
|8484214343400844 |[{orderID=799999, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, Orde|
| |rDate=09/01/2018}, {orderID=799998, ItemName=Nintendo Switch, StorePickUpFlag=Yes,|
| | OrderDate=01/28/2020}, {orderID=799996, ItemName=Tile Mate Item Finder 4-Pack Com|
| |bo, StorePickUpFlag=No, OrderDate=01/28/2020}, {orderID=799995, ItemName=Apple iPa|
| |d, StorePickUpFlag=Yes, OrderDate=01/28/2020}, {orderID=799991, ItemName=Jelly Com|
| |b folding Bluetooth keyboard, StorePickUpFlag=No, OrderDate=06/24/2019}] |
As you can see, the data now arrives in the required format: all orders made by each user are listed in a single array.
Some Known Issues
| Issue | Cannot compare ARRAY values |
| ksql version | 5.4.1 |
| Confluent version | 5.4.1 |
| Operating system | Ubuntu 18.04 |
To reproduce the issue, create a second table with the same columns as TBL_ARRAY_DEMO_0010:
CREATE TABLE TBL_ARRAY_DEMO_0030 (CUSTOMERID VARCHAR, KSQL_COL_1 ARRAY<VARCHAR>)
WITH (KAFKA_TOPIC = 'TBL_ARRAY_DEMO_0030', VALUE_FORMAT = 'AVRO', KEY = 'CUSTOMERID', PARTITIONS = 1, REPLICAS = 1);
Insert the same test row into both tables:
INSERT INTO TBL_ARRAY_DEMO_0010 (CUSTOMERID, KSQL_COL_1)
VALUES ('6454214343400541', AS_ARRAY('{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}', '{orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderDate=01/28/2020}'));
INSERT INTO TBL_ARRAY_DEMO_0030 (CUSTOMERID, KSQL_COL_1)
VALUES ('6454214343400541', AS_ARRAY('{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}', '{orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderDate=01/28/2020}'));
Then join the two tables on their key and keep only the rows where the arrays differ:
SELECT TIMESTAMPTOSTRING(CURR.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), CURR.KSQL_COL_1, PREV.KSQL_COL_1
FROM TBL_ARRAY_DEMO_0030 CURR
INNER JOIN TBL_ARRAY_DEMO_0010 PREV ON CURR.ROWKEY = PREV.ROWKEY
WHERE TRIM(UCASE(CURR.CUSTOMERID)) = '6454214343400541'
AND CURR.KSQL_COL_1 <> PREV.KSQL_COL_1 EMIT CHANGES;
As you can see, the query fails with the error "Caused by: Cannot compare ARRAY values".
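Until comparison is supported, a workaround has to turn the check into something ksqlDB can compare. Two options, neither verified here on 5.4.1: compare a string rendering of each array (for example with CAST(... AS VARCHAR), by analogy with the MAP cast used in TBL_ARRAY_DEMO_0010), or compare element by element in a custom UDF. The Python sketch below models both checks and shows a pitfall of the string approach: a naive "[a, b]" rendering cannot tell ['a, b'] from ['a', 'b'], whereas an unambiguous encoding such as JSON can. The function names, and the assumed shape of the VARCHAR rendering, are hypothetical.

```python
import json


def arrays_differ_elementwise(a, b):
    """Element-by-element check, the logic a custom comparison UDF would need."""
    if len(a) != len(b):
        return True
    return any(x != y for x, y in zip(a, b))


def naive_render(arr):
    """'[x, y]'-style rendering; an assumption about what a VARCHAR cast looks like."""
    return "[" + ", ".join(arr) + "]"


def arrays_differ_by_string(a, b, render=naive_render):
    """Compare two arrays through a string rendering of each."""
    return render(a) != render(b)


curr = ["{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}"]
prev = curr + ["{orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderDate=01/28/2020}"]
print(arrays_differ_elementwise(curr, prev), arrays_differ_by_string(curr, prev))

# Pitfall: these two arrays are different but render to the same naive string.
x, y = ["a, b"], ["a", "b"]
print(arrays_differ_by_string(x, y))                     # False (wrong)
print(arrays_differ_by_string(x, y, render=json.dumps))  # True
print(arrays_differ_elementwise(x, y))                   # True
```

Whichever form is used, the string rendering must be unambiguous for the comparison to be trustworthy.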