
Exploring ARRAY, MAP and LIST in ksqlDB


In this article we will go over a few scenarios to explore ARRAY, MAP and LIST in ksqlDB.

Prerequisites

Install Confluent Platform for your operating system using the instructions here.

Scenario

Create an array that lists the orders made by each customer. The input used to build this array is a stream of individual order events.

  • Create a schema for ksql-datagen. See here for the schema I created to generate test data for this demo.

  • cd into the directory that contains the schema (in this case ordersForArrayDemo.avro) and run the command below. It generates test data for our topic; in this example, random order data for a few customers:

    ksql-datagen schema=ordersForArrayDemo.avro format=avro topic='dev.etl.datagen.orders.src.0010' key=orderID maxInterval=5000 iterations=10
    
  • If you need to purge the data in your test topic, temporarily set the topic's retention.ms to one second, wait for the purge to take effect (about one minute), and then restore the previous retention.ms value (604800000 ms, i.e. seven days, in this example):

    # Temporarily lower retention to 1 second so the existing data is purged
    kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name "dev.etl.datagen.orders.src.0010" --add-config retention.ms=1000
    
    # After about a minute, restore the previous retention (7 days)
    kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name "dev.etl.datagen.orders.src.0010" --add-config retention.ms=604800000
    
  • Now start a KSQL session and issue this command so that queries read data from the beginning of the topic:

    SET 'auto.offset.reset'='earliest';
    
  • Check the messages in the topic:

    ksql> PRINT 'dev.etl.datagen.orders.src.0010' FROM BEGINNING LIMIT 10;
    Format:AVRO
    3/22/20 10:40:58 PM EDT, 800000, {"orderID": "800000", "CustomerID": "6454214343400548", "StorePickUpFlag": "No", "ItemName": "Apple iPad", "OrderDate": "09/01/2018"}
    3/22/20 10:41:00 PM EDT, 799999, {"orderID": "799999", "CustomerID": "8484214343400844", "StorePickUpFlag": "No", "ItemName": "Apple TV 4K Streaming Console", "OrderDate": "09/01/2018"}
    3/22/20 10:41:00 PM EDT, 799998, {"orderID": "799998", "CustomerID": "8484214343400844", "StorePickUpFlag": "Yes", "ItemName": "Nintendo Switch", "OrderDate": "01/28/2020"}
    3/22/20 10:41:02 PM EDT, 799997, {"orderID": "799997", "CustomerID": "5759544343400808", "StorePickUpFlag": "Yes", "ItemName": "Arlo Pro", "OrderDate": "06/24/2019"}
    3/22/20 10:41:05 PM EDT, 799996, {"orderID": "799996", "CustomerID": "8484214343400844", "StorePickUpFlag": "No", "ItemName": "Tile Mate Item Finder 4-Pack Combo", "OrderDate": "01/28/2020"}
    3/22/20 10:41:05 PM EDT, 799995, {"orderID": "799995", "CustomerID": "8484214343400844", "StorePickUpFlag": "Yes", "ItemName": "Apple iPad", "OrderDate": "01/28/2020"}
    3/22/20 10:41:08 PM EDT, 799994, {"orderID": "799994", "CustomerID": "6454214343400548", "StorePickUpFlag": "Yes", "ItemName": "Nintendo Switch", "OrderDate": "09/01/2018"}
    3/22/20 10:41:09 PM EDT, 799993, {"orderID": "799993", "CustomerID": "6454214343400548", "StorePickUpFlag": "No", "ItemName": "Jelly Comb folding Bluetooth keyboard", "OrderDate": "06/24/2019"}
    3/22/20 10:41:10 PM EDT, 799992, {"orderID": "799992", "CustomerID": "5759544343400808", "StorePickUpFlag": "No", "ItemName": "Apple TV 4K Streaming Console", "OrderDate": "01/28/2020"}
    3/22/20 10:41:12 PM EDT, 799991, {"orderID": "799991", "CustomerID": "8484214343400844", "StorePickUpFlag": "No", "ItemName": "Jelly Comb folding Bluetooth keyboard", "OrderDate": "06/24/2019"}
    
  • Now let's create a STREAM on this topic:

    CREATE STREAM STM_ARRAY_DEMO_0010 WITH (KAFKA_TOPIC='dev.etl.datagen.orders.src.0010', VALUE_FORMAT='Avro');
    
  • Validate the data in the STREAM using the following push query:

    ksql> SELECT * FROM STM_ARRAY_DEMO_0010 EMIT CHANGES LIMIT 3;
    +----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
    |ROWTIME               |ROWKEY                |ORDERID               |CUSTOMERID            |STOREPICKUPFLAG       |ITEMNAME              |ORDERDATE             |
    +----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
    |1584931258886         |800000                |800000                |6454214343400548      |No                    |Apple iPad            |09/01/2018            |
    |1584931260502         |799999                |799999                |8484214343400844      |No                    |Apple TV 4K Streaming |09/01/2018            |
    |                      |                      |                      |                      |                      |Console               |                      |
    |1584931260691         |799998                |799998                |8484214343400844      |Yes                   |Nintendo Switch       |01/28/2020            |
    Limit Reached
    Query terminated
    ksql> 
    
  • Create a STREAM that structures each order as a MAP and repartitions the data by the required key. Since we need the orders listed per customer, we partition by CUSTOMERID:

    CREATE STREAM STM_ARRAY_DEMO_0020 AS
    SELECT CUSTOMERID, AS_MAP(AS_ARRAY('orderID','StorePickUpFlag','ItemName','OrderDate'), AS_ARRAY(ORDERID,STOREPICKUPFLAG,ITEMNAME,ORDERDATE)) as orderData 
    FROM STM_ARRAY_DEMO_0010
    PARTITION BY CUSTOMERID;
    
  • Validate the data in this new stream

    ksql> SELECT * FROM STM_ARRAY_DEMO_0020 EMIT CHANGES LIMIT 2;
    +----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+
    |ROWTIME                                 |ROWKEY                                  |CUSTOMERID                              |ORDERDATA                               |
    +----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+
    |1584931258886                           |6454214343400548                        |6454214343400548                        |{orderID=800000, ItemName=Apple iPad, St|
    |                                        |                                        |                                        |orePickUpFlag=No, OrderDate=09/01/2018} |
    |1584931260502                           |8484214343400844                        |8484214343400844                        |{orderID=799999, ItemName=Apple TV 4K St|
    |                                        |                                        |                                        |reaming Console, StorePickUpFlag=No, Ord|
    |                                        |                                        |                                        |erDate=09/01/2018}                      |
    Limit Reached
    Query terminated
    ksql>
    
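  • Create a TABLE that groups the mapped orders by customer and aggregates them into the required list. COLLECT_LIST returns an ARRAY; here each map is cast to VARCHAR before it is collected, and because the aggregate column has no alias, ksqlDB names it KSQL_COL_1: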

    CREATE TABLE TBL_ARRAY_DEMO_0010 AS
    SELECT CUSTOMERID, COLLECT_LIST(CAST(ORDERDATA as VARCHAR)) FROM STM_ARRAY_DEMO_0020
    GROUP BY CUSTOMERID
    EMIT CHANGES;
    
  • Validate the data in the final table:

    ksql> SELECT CUSTOMERID, KSQL_COL_1 FROM TBL_ARRAY_DEMO_0010 EMIT CHANGES;
    +---------------------------+----------------------------------------------------------------------------------+
    |CUSTOMERID                 |KSQL_COL_1                                                                        |
    +---------------------------+----------------------------------------------------------------------------------+
    |6454214343400548           |[{orderID=800000, ItemName=Apple iPad, StorePickUpFlag=No, OrderDate=09/01/2018}, |
    |                           |{orderID=799994, ItemName=Nintendo Switch, StorePickUpFlag=Yes, OrderDate=09/01/20|
    |                           |18}, {orderID=799993, ItemName=Jelly Comb folding Bluetooth keyboard, StorePickUpF|
    |                           |lag=No, OrderDate=06/24/2019}]                                                    |
    |5759544343400808           |[{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}, {|
    |                           |orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderD|
    |                           |ate=01/28/2020}]                                                                  |
    |8484214343400844           |[{orderID=799999, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, Orde|
    |                           |rDate=09/01/2018}, {orderID=799998, ItemName=Nintendo Switch, StorePickUpFlag=Yes,|
    |                           | OrderDate=01/28/2020}, {orderID=799996, ItemName=Tile Mate Item Finder 4-Pack Com|
    |                           |bo, StorePickUpFlag=No, OrderDate=01/28/2020}, {orderID=799995, ItemName=Apple iPa|
    |                           |d, StorePickUpFlag=Yes, OrderDate=01/28/2020}, {orderID=799991, ItemName=Jelly Com|
    |                           |b folding Bluetooth keyboard, StorePickUpFlag=No, OrderDate=06/24/2019}]          |
    
  • As you can see, the data now arrives in the required format: all orders made by each customer, listed as an array.
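To make the semantics of the last two steps concrete, here is a plain-Python model (not ksqlDB code) of what AS_MAP, the repartition/GROUP BY on CUSTOMERID and COLLECT_LIST do with the ten sample orders from the PRINT output. It is a sketch under stated assumptions: the names `EVENTS`, `as_map`, `to_varchar` and `collect_list_by_customer` are hypothetical, the model prints only the final list per customer (the ksqlDB table emits an updated row for every incoming event), and it renders map keys in AS_ARRAY order, whereas the ksqlDB output above shows them in a different order.

```python
from collections import defaultdict

# Sample order events copied from the PRINT output:
# (orderID, CustomerID, StorePickUpFlag, ItemName, OrderDate)
EVENTS = [
    ("800000", "6454214343400548", "No", "Apple iPad", "09/01/2018"),
    ("799999", "8484214343400844", "No", "Apple TV 4K Streaming Console", "09/01/2018"),
    ("799998", "8484214343400844", "Yes", "Nintendo Switch", "01/28/2020"),
    ("799997", "5759544343400808", "Yes", "Arlo Pro", "06/24/2019"),
    ("799996", "8484214343400844", "No", "Tile Mate Item Finder 4-Pack Combo", "01/28/2020"),
    ("799995", "8484214343400844", "Yes", "Apple iPad", "01/28/2020"),
    ("799994", "6454214343400548", "Yes", "Nintendo Switch", "09/01/2018"),
    ("799993", "6454214343400548", "No", "Jelly Comb folding Bluetooth keyboard", "06/24/2019"),
    ("799992", "5759544343400808", "No", "Apple TV 4K Streaming Console", "01/28/2020"),
    ("799991", "8484214343400844", "No", "Jelly Comb folding Bluetooth keyboard", "06/24/2019"),
]

# Keys passed to the first AS_ARRAY in STM_ARRAY_DEMO_0020.
MAP_KEYS = ["orderID", "StorePickUpFlag", "ItemName", "OrderDate"]


def as_map(keys, values):
    # Model of AS_MAP(AS_ARRAY(k1, ...), AS_ARRAY(v1, ...)):
    # pair each key with the value at the same position.
    return dict(zip(keys, values))


def to_varchar(order_map):
    # Imitates the "{key=value, ...}" text seen in the CLI output.
    # Keys appear in insertion order here; ksqlDB's ordering may differ.
    return "{" + ", ".join(f"{k}={v}" for k, v in order_map.items()) + "}"


def collect_list_by_customer(events):
    # Model of PARTITION BY / GROUP BY CUSTOMERID with
    # COLLECT_LIST(CAST(ORDERDATA AS VARCHAR)): one list per customer,
    # appended in the order the events arrive.
    lists = defaultdict(list)
    for order_id, customer_id, pickup, item, order_date in events:
        order_map = as_map(MAP_KEYS, [order_id, pickup, item, order_date])
        lists[customer_id].append(to_varchar(order_map))
    return dict(lists)


if __name__ == "__main__":
    for customer_id, orders in collect_list_by_customer(EVENTS).items():
        print(customer_id, orders)
```

The per-customer list lengths (3, 5 and 2 orders) match the final table output above, which is the point of the model: the stream step turns each event into one map, and the table step folds all maps sharing a key into a single array.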

Some Known Issues

Cannot compare ARRAY values

Environment:
  • ksql version: 5.4.1
  • Confluent Platform version: 5.4.1
  • Operating system: Ubuntu 18.04

Comparing two ARRAY fields fails with “Cannot compare ARRAY values”. Let's go over the steps to reproduce this issue.
  1. Create a new table with the same structure as the final table from the steps above:

    CREATE TABLE TBL_ARRAY_DEMO_0030 (CUSTOMERID VARCHAR(STRING), KSQL_COL_1 ARRAY < VARCHAR(STRING) >)
        WITH (KAFKA_TOPIC = 'TBL_ARRAY_DEMO_0030', VALUE_FORMAT = 'AVRO', KEY = 'CUSTOMERID', PARTITIONS = 1, REPLICAS = 1);

  2. Insert the same test record into both tables so there is a row to compare:

    INSERT INTO TBL_ARRAY_DEMO_0010 (CUSTOMERID, KSQL_COL_1)
    VALUES ('6454214343400541', AS_ARRAY('{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}', '{orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderDate=01/28/2020}'));

    INSERT INTO TBL_ARRAY_DEMO_0030 (CUSTOMERID, KSQL_COL_1)
    VALUES ('6454214343400541', AS_ARRAY('{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}', '{orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderDate=01/28/2020}'));

  3. Run the following query to compare the arrays in the two tables:

    SELECT TIMESTAMPTOSTRING(CURR.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), CURR.KSQL_COL_1, PREV.KSQL_COL_1
    FROM TBL_ARRAY_DEMO_0030 CURR
    INNER JOIN TBL_ARRAY_DEMO_0010 PREV ON CURR.ROWKEY = PREV.ROWKEY
    WHERE TRIM(UCASE(CURR.CUSTOMERID)) = '6454214343400541'
        AND CURR.KSQL_COL_1 <> PREV.KSQL_COL_1 EMIT CHANGES;

The query fails with the error “Caused by: Cannot compare ARRAY values”.
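What the failing query is trying to express can be modeled in plain Python: an inner join of the two tables on their key, keeping only the keys whose arrays differ. This is a hypothetical sketch of the intended semantics, not a ksqlDB workaround; `changed_lists` is an illustrative name, the CUSTOMERID filter from the WHERE clause is left out, and Python's list inequality (element by element, order-sensitive) is only an assumption about what `<>` on arrays was meant to mean.

```python
def changed_lists(curr, prev):
    # Model of: FROM curr INNER JOIN prev ON curr key = prev key
    #           WHERE curr list <> prev list
    # Only keys present in both tables survive the inner join.
    return {
        key: (curr_list, prev[key])
        for key, curr_list in curr.items()
        if key in prev and curr_list != prev[key]  # order-sensitive comparison
    }


if __name__ == "__main__":
    arlo = "{orderID=799997, ItemName=Arlo Pro, StorePickUpFlag=Yes, OrderDate=06/24/2019}"
    apple_tv = "{orderID=799992, ItemName=Apple TV 4K Streaming Console, StorePickUpFlag=No, OrderDate=01/28/2020}"
    prev = {"6454214343400541": [arlo, apple_tv]}
    curr = {"6454214343400541": [arlo, apple_tv]}
    print(changed_lists(curr, prev))  # prints {} because the lists are identical
    curr["6454214343400541"] = [apple_tv, arlo]
    print(changed_lists(curr, prev))  # same elements in a different order are reported
```

With the identical test records inserted above, the intended result is an empty set of rows; the model makes that expectation explicit even though ksqlDB 5.4.1 rejects the comparison before producing any output.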

