Confluent Kafka - Challenges & Solutions
In this article we will discuss some of the common Confluent Kafka challenges and the solutions to fix them.
Issue 01
Update the server packages first:
sudo apt-get update && sudo apt-get upgrade
for Ubuntu, or
sudo yum update
for CentOS.
Before we set up the connectors, let's import some test data into MongoDB.
Download a sample dataset from "https://catalog.data.gov/dataset?res_format=CSV" to a directory on your MongoDB server
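If your MongoDB server is headless, you can fetch the file from the command line instead; "<csv-download-url>" below is only a placeholder for whichever CSV you picked from the catalog:
mkdir -p ~/sample_dataset
wget -P ~/sample_dataset "<csv-download-url>"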
cd into the directory and confirm the file is available
cd ~/sample_dataset;ls;
Use the mongoimport command to import the file
mongoimport --type csv -d entechlog -c Accidental_Drug_Related_Deaths --headerline --drop Accidental_Drug_Related_Deaths_2012-2018.csv
Validate the data in MongoDB using MongoDB Compass
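If you prefer the shell over Compass, a quick document count (assuming mongo shell 4.0 or later, and the database and collection names used above) also confirms the import:
mongo --quiet --eval 'db.getSiblingDB("entechlog").getCollection("Accidental_Drug_Related_Deaths").countDocuments({})'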
Install the MongoDB Connector for Apache Kafka. See "https://www.confluent.io/hub/mongodb/kafka-connect-mongodb" for the instructions
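On a typical Confluent Platform install, the Confluent Hub client can do this in one step; restart the Connect worker afterwards so it picks up the plugin:
confluent-hub install mongodb/kafka-connect-mongodb:latest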
Make sure you have a replica set before configuring the connector. See "https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/" for more details on how to enable a replica set
If you already have a replica set enabled, you can skip the next few steps and jump past the NotYetInitialized error message shown below
sudo systemctl stop mongod
sudo systemctl edit --full mongod
In the unit file, update the ExecStart line to enable the replica set:
ExecStart=/usr/bin/mongod --replSet rs0 --config /etc/mongod.conf
sudo systemctl start mongod;sudo systemctl status mongod;
Make sure to connect to a mongo shell and initiate the new replica set with rs.initiate(). Validate the status using rs.status()
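If you are scripting this, the same two calls can be issued non-interactively from the MongoDB host:
mongo --eval "rs.initiate()"
mongo --eval "rs.status()"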
If you miss the initiate step, the connector will fail with
com.mongodb.MongoCommandException: Command failed with error 94 (NotYetInitialized): 'Cannot use snapshots until replica set is finished initializing.' on server entechlog-vm-01:27017
Here is the config for a source connector with the StringConverter serde. This will not register a schema in Confluent Schema Registry
You can use curl, Postman, or the Control Center UI to create the connector
Curl
curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
"name": "ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
"copy.existing": true,
"database": "entechlog",
"collection": "Accidental_Drug_Related_Deaths",
"publish.full.document.only": true,
"topic.prefix": "dev.mongodb"
}'
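After submitting the config, a quick way to confirm the connector and its task are RUNNING is the Connect REST API status endpoint (same host and port as above):
curl -s http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0010_SRC_MONGODB/status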
Postman
Confluent Control Center
Data in topic
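The source connector writes to a topic named {topic.prefix}.{database}.{collection}, so you can peek at a few records with the console consumer (broker port 9092 is assumed here; adjust for your cluster):
kafka-console-consumer --bootstrap-server entechlog-vm-01:9092 --topic dev.mongodb.entechlog.Accidental_Drug_Related_Deaths --from-beginning --max-messages 5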
Since "StringConverter" did not register a schema, I tried creating connectors with "AvroConverter" and "JsonConverter" as both key and value converters. "AvroConverter" registered a schema with all fields typed as "string", which is not useful.
Connector Config - AVRO
curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0020_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
"name": "ACCIDENTAL_DRUG_RELATED_DEATHS_0020_SRC_MONGODB",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
"key.converter.schema.registry.url": "http://entechlog-vm-01:8081",
"schema.registry.url": "http://entechlog-vm-01:8081",
"connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
"copy.existing": true,
"database": "entechlog",
"collection": "Accidental_Drug_Related_Deaths",
"publish.full.document.only": true,
"topic.prefix": "dev.mongodb.avro"
}'
Schema Registry - AVRO
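With the default TopicNameStrategy, the subject name is the topic name plus "-value", so you can inspect what AvroConverter registered directly through the Schema Registry REST API:
curl -s http://entechlog-vm-01:8081/subjects
curl -s http://entechlog-vm-01:8081/subjects/dev.mongodb.avro.entechlog.Accidental_Drug_Related_Deaths-value/versions/latest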
Connector Config - JSON
curl -X PUT http://entechlog-vm-01:8083/connectors/ACCIDENTAL_DRUG_RELATED_DEATHS_0030_SRC_MONGODB/config -H "Content-Type: application/json" -d '{
"name": "ACCIDENTAL_DRUG_RELATED_DEATHS_0030_SRC_MONGODB",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.registry.url": "http://entechlog-vm-01:8081",
"key.converter.schema.registry.url": "http://entechlog-vm-01:8081",
"schema.registry.url": "http://entechlog-vm-01:8081",
"connection.uri": "mongodb://admin:admin@entechlog-vm-01:27017/admin?authSource=admin&readPreference=primary&ssl=false",
"copy.existing": true,
"database": "entechlog",
"collection": "Accidental_Drug_Related_Deaths",
"publish.full.document.only": true,
"topic.prefix": "dev.mongodb.json"
}'
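To confirm all three variants were created, list the connectors registered on the worker:
curl -s http://entechlog-vm-01:8083/connectors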