Connecting to Kafka pod:
Search for kafka pod and its namespace
kubectl get pods -A| grep kafka
Connect to kafka pods’ bash shell
kubectl exec -it kafka_pod_name -n kafka_namespace -- bash
kubectl exec -it kafka-v2-0 -n kafka-cluster -- bash
Get list of available kafka connectors:
Connect to kafka pod
Execute: kafka-consumer-groups -bootstrap-server kafka-v2.kafka-cluster:9092 -list
curl http://kafka-connect.kafka-cluster:8083/connectors (for Kafka-connect pod)
Get lags of kafka connectors:
Connect to kafka pod
Execute: kafka-consumer-groups --bootstrap-server kafka-v2.kafka-cluster:9092 --describe --group connect-cms-case-es-sink-tl (Connector name)
Delete kafka connectors:
Connect to kafka pod
Execute: curl -X DELETE http://kafka-connect.kafka-cluster:8083/connectors/cms-case-es-sink-tl (connector name)
Steps to create kafka connectors:
Connect to kafka pod
Execute the following cURL commands within the kafka pod to create the connectors for each kafka topic
*Confirm the kafka-connect and elasticsearch service pod name and namespace within the connector cURL (base URI and connection.url parameter of data)
Kafka cURLS:
Property- Tax (cms-case-es-sink9226)
curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
"name": "cms-case-es-sink-pt",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
"type.name": "nss",
"topics": "pt-national-dashboard",
"key.ignore": true,
"schemas.enable": false,
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "nss-es-failed",
"tasks.max": 1
}
}'
Trade License
curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
"name": "cms-case-es-sink-tl",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
"type.name": "nss",
"topics": "tl-national-dashboard",
"key.ignore": true,
"schemas.enable": false,
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "nss-es-failed",
"tasks.max": 1
}
}'mCollect
curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
"name": "cms-case-es-sink9825",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
"type.name": "nss",
"topics": "mcollect-national-dashboard",
"key.ignore": true,
"schemas.enable": false,
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "nss-es-failed",
"tasks.max": 1
}
}'PGR
curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
"name": "cms-case-es-sink9725",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
"type.name": "nss",
"topics": "pgr-national-dashboard",
"key.ignore": true,
"schemas.enable": false,
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "nss-es-failed",
"tasks.max": 1
}
}'OBPS
curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
"name": "cms-case-es-sink9625",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
"type.name": "nss",
"topics": "obps-national-dashboard",
"key.ignore": true,
"schemas.enable": false,
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "nss-es-failed",
"tasks.max": 1
}
}'Fire Noc
curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
"name": "cms-case-es-sink9525",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
"type.name": "nss",
"topics": "firenoc-national-dashboard",
"key.ignore": true,
"schemas.enable": false,
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "nss-es-failed",
"tasks.max": 1
}
}'Common
curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
"name": "cms-case-es-sink9425",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
"type.name": "nss",
"topics": "common-national-dashboard",
"key.ignore": true,
"schemas.enable": false,
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "nss-es-failed",
"tasks.max": 1
}
}'egov-dss-ingest-enriched
curl -X POST \
http://kafka-connect.kafka-cluster:8083/connectors/ \
-H 'Content-Type: application/json' \
-H 'Cookie: SESSIONID=f1349448-761e-4ebc-a8bb-f6799e756185' \
-H 'Postman-Token: adabf0e8-0599-4ac9-a591-920586ff4d50' \
-H 'cache-control: no-cache' \
-d '{
"name": "cms-case-es-sink9132",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200",
"type.name": "general",
"topics": "egov-dss-ingest-enriched",
"key.ignore": "false",
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "TopicNameRouter",
"transforms.TopicNameRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TopicNameRouter.regex": ".*",
"transforms.TopicNameRouter.replacement": "dss-collection_v2",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "dss-collection_v2-es-failed",
"tasks.max": 1
}
}'Birth
curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
"name": "cms-case-es-sink9144",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
"type.name": "nss",
"topics": "birth-cert-national-dashboard",
"key.ignore": true,
"schemas.enable": false,
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "nss-es-failed",
"tasks.max": 1
}
}'Death:
curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
"name": "cms-case-es-sink9143",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
"type.name": "nss",
"topics": "death-cert-national-dashboard",
"key.ignore": true,
"schemas.enable": false,
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "nss-es-failed",
"tasks.max": 1
}
}'List of connectors:
cms-case-es-sink-tl - tl national dashboard
cms-case-es-sink9525 - firenoc ndb
cms-case-es-sink9625 - obps ndb
cms-case-es-sink9725 - pgr ndb
cms-case-es-sink-dss - dss-collection_v2
cms-case-es-sink9825 - mcollect ndb
tlindex-v1-enriched-es-sink - tlindex-v1-enriched --
cms-case-es-sink9132 - egov-dss-ingest-enriched --
cms-case-es-sink9121 - paymentsindex-v1-enriched --
cms-case-es-sink9143 - death-cert-national-dashboard
cms-case-es-sink9144 - birth-cert-national-dashboard
cms-case-es-sink-pt - pt ndb
cms-case-es-sink9225 - ws ndb
cms-case-es-sink9425 - common ndb