How to generate test data for Kafka using Faker
In this article we will see how to generate test data for Kafka using Faker. We could also generate test data using the Confluent ksql-datagen tool; if you want to use ksql-datagen, you can read more about it here.
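As a quick point of reference, a typical ksql-datagen run looks something like the below. The quickstart, format, topic and iterations parameters are standard options of the tool, but the values shown here are only placeholders, so adjust them to your environment.

ksql-datagen quickstart=users format=json topic=datagen.users iterations=100 bootstrap-server=localhost:9092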
Create a Python module with Faker to generate test data. Here we are going to use faker.simple_profile() to generate some random user profile information.
#!/usr/bin/env python
from faker import Faker
from confluent_kafka import Producer
import socket
import json
import sys, getopt, time, os
import argparse


def main():
    # Display the program start time
    print('-' * 40)
    print(os.path.basename(sys.argv[0]) + " started at ", time.ctime())
    print('-' * 40)
    print('Number of arguments :', len(sys.argv))
    print('Argument List:', str(sys.argv))

    # Parse the command line arguments
    parser = argparse.ArgumentParser(description="""
    This script generates sample data for specified kafka topic.
    """)
    parser.add_argument("--bootstrap_servers", help="Bootstrap servers", required=True)
    parser.add_argument("--topic_name", help="Topic name", required=True)
    parser.add_argument("--no_of_records", help="Number of records", type=int, required=True)
    args = parser.parse_args()

    # Make the parsed arguments available to faker_datagen()
    global bootstrap_servers
    bootstrap_servers = args.bootstrap_servers
    global topic_name
    topic_name = args.topic_name
    global no_of_records
    no_of_records = args.no_of_records

    print("Bootstrap servers : " + bootstrap_servers)
    print("Topic name : " + topic_name)
    print("Number of records : " + str(no_of_records))


class DatetimeEncoder(json.JSONEncoder):
    # JSON encoder which falls back to str() for non-serializable types such as dates
    def default(self, obj):
        try:
            return super(DatetimeEncoder, self).default(obj)
        except TypeError:
            return str(obj)


def faker_datagen():
    # Create a Kafka producer and publish the requested number of fake profiles
    conf = {'bootstrap.servers': bootstrap_servers,
            'client.id': socket.gethostname()}
    producer = Producer(conf)
    faker = Faker()

    count = 0
    while count < no_of_records:
        profile = faker.simple_profile()
        message = json.dumps(profile, cls=DatetimeEncoder)
        key = str(profile['username'])
        producer.produce(topic=topic_name, value=message, key=key)
        producer.flush()
        count += 1


if __name__ == "__main__":
    main()
    faker_datagen()
    sys.exit()
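If you want to sanity check the script outside Docker first (assuming the Faker and confluent-kafka packages are installed locally and a broker is reachable on localhost:9092), it can be run directly with the same arguments that the argparse block above defines:

python app/faker-kafka-profile-datagen.py --bootstrap_servers localhost:9092 --topic_name datagen.user.profile --no_of_records 10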
Create a “requirements.txt” file with the following contents
Faker
confluent-kafka
Create a “Dockerfile” with the following instructions. This will be used to build a Docker image with the Python code.
FROM python:3

# Copy the project into the image and install the Python dependencies
COPY . /usr/src/
WORKDIR /usr/src/
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

# Install a few networking utilities that are handy for debugging connectivity
RUN apt-get update && apt-get install -y \
    iputils-ping \
    iproute2 \
    curl \
    dos2unix \
    netcat \
    net-tools \
    && rm -rf /var/lib/apt/lists/*

COPY . .

# Default values, override them with -e at "docker run" time
ENV bootstrap_servers localhost:9092
ENV topic_name datagen.user.profile
ENV no_of_records 1

CMD ["sh", "-c", "python /usr/src/app/faker-kafka-profile-datagen.py --bootstrap_servers $bootstrap_servers --topic_name $topic_name --no_of_records $no_of_records"]
Place all files in a folder as shown below
│   Dockerfile
│   requirements.txt
│
└───app
        faker-kafka-profile-datagen.py
Build the Docker image by running the below build command. Tag/name the image as per your use case so that you can identify the image in the future.
docker build --tag entechlog/faker-kafka-profile-datagen .
docker images
Run the Docker image using the below command. Here you can change bootstrap_servers, topic_name and no_of_records based on your needs.
docker run -it --rm -e bootstrap_servers=192.168.1.9:39092 -e topic_name=datagen.user.profile -e no_of_records=1 entechlog/faker-kafka-profile-datagen
Before running the container, update the below config in the Kafka broker so that the producer running inside Docker can talk to a Kafka broker on a different server.
listeners=PLAINTEXT://:9092,DOCKER_LISTENER://:19092,DNS_LISTENER://:29092,IP_LISTENER://:39092
advertised.listeners=PLAINTEXT://localhost:9092,DOCKER_LISTENER://host.docker.internal:19092,DNS_LISTENER://entechlog-vm-01:29092,IP_LISTENER://192.168.1.9:39092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,DOCKER_LISTENER:PLAINTEXT,DNS_LISTENER:PLAINTEXT,IP_LISTENER:PLAINTEXT
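With the above listeners in place, the container can also reach the broker through the Docker host mapping instead of the host IP. For example, on Docker Desktop (where host.docker.internal resolves to the Docker host; on Linux you may need to add this host mapping yourself) the same image can be pointed at the DOCKER_LISTENER port:

docker run -it --rm -e bootstrap_servers=host.docker.internal:19092 -e topic_name=datagen.user.profile -e no_of_records=1 entechlog/faker-kafka-profile-datagen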
Validate the records in the Kafka topic. You can do this with a Kafka consumer, by running PRINT 'datagen.user.profile'; from a ksql session, or by simply browsing the messages in Control Center.
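For example, a quick check with the console consumer that ships with the Kafka/Confluent distribution (run from the broker host) could look like:

kafka-console-consumer --bootstrap-server localhost:9092 --topic datagen.user.profile --from-beginning --property print.key=true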
As shown above, we now have test data in the Kafka topic, and we can run this image any time we need to generate test data in a Kafka topic.
Use this topic and data to create a STREAM or TABLE in ksqlDB.
CREATE STREAM STM_DATAGENUSER_PROFILE (
USERNAME VARCHAR
,NAME VARCHAR
,SEX VARCHAR
,ADDRESS VARCHAR
,MAIL VARCHAR
,BIRTHDATE VARCHAR
)
WITH (
KAFKA_TOPIC = 'datagen.user.profile'
,VALUE_FORMAT = 'JSON'
,KEY='USERNAME'
);
Message
----------------
Stream created
----------------
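Optionally, you can confirm that the stream was registered with the expected columns before querying it:

ksql> DESCRIBE STM_DATAGENUSER_PROFILE;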
Query the data in the STREAM using the below query.
ksql> SELECT ROWKEY, BIRTHDATE FROM STM_DATAGENUSER_PROFILE EMIT CHANGES;
+------------------------------------------+--------------------------------+
|ROWKEY |BIRTHDATE |
+------------------------------------------+--------------------------------+
|rharris |1987-06-03 |
|amydavidson |1955-08-15 |
|williamzimmerman |1918-02-18 |
|ashleygutierrez |1915-10-16 |
|aharris |2004-08-01 |
|smithmelissa |1999-10-29 |
|mjones |2005-06-29 |
|matthew54 |1928-08-04 |
|raymondgarrison |1928-07-24 |
|zachary13 |1951-10-27 |
|alan32 |1974-08-06 |
|paynechristopher |1914-12-21 |
|jacksondunn |1924-08-03 |
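Since this is throwaway test data, the stream is also a convenient playground for downstream queries. As a simple illustration (the table name here is just an example, not part of the original setup), a persistent aggregation over the profiles could look like:

CREATE TABLE TBL_PROFILE_COUNT_BY_SEX AS
  SELECT SEX,
         COUNT(*) AS PROFILE_COUNT
  FROM STM_DATAGENUSER_PROFILE
  GROUP BY SEX
  EMIT CHANGES;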
The source code for this faker datagen Docker image can be found here.
Here are some useful Docker commands for working with this image.

| Task | Command |
|---|---|
| Stop container | docker stop faker-kafka-profile-datagen |
| Remove container | docker rm faker-kafka-profile-datagen |
| SSH into container | docker exec -it faker-kafka-profile-datagen /bin/bash |
| Find Docker gateway | /sbin/ip route |
| Netcat | docker run -it --rm --entrypoint "/bin/nc" faker-kafka-profile-datagen -vz 192.168.1.9 39092 |
| Remove containers | docker container rm $(docker container ls -aq) |
| Remove images | docker image prune -a |