Kafka Connectors for Digit

Connecting to the Kafka pod:

  1. Find the Kafka pod and its namespace:

    1. kubectl get pods -A | grep kafka

  2. Open a bash shell in the Kafka pod:

    1. Syntax: kubectl exec -it kafka_pod_name -n kafka_namespace -- bash

    2. Example: kubectl exec -it kafka-v2-0 -n kafka-cluster -- bash
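If the pod name is not known in advance, the lookup in step 1 can be scripted. The sketch below assumes the standard column layout of kubectl get pods -A (NAMESPACE, NAME, READY, STATUS, ...) and picks the first running pod whose name starts with a given prefix. The function names and the default prefix kafka-v2 (taken from the example above) are illustrative assumptions, not existing tooling.

```shell
# Hypothetical helper: reads `kubectl get pods -A` output on stdin and prints
# "namespace pod" for the first Running pod whose name starts with the prefix $1
# (default "kafka-v2", the pod name used in the example above).
find_kafka_pod() {
  awk -v p="${1:-kafka-v2}" 'NR > 1 && index($2, p) == 1 && $4 == "Running" { print $1, $2; exit }'
}

# Hypothetical wrapper: finds the pod and opens a bash shell in it.
exec_kafka_pod() {
  # Word splitting is intentional: "namespace pod" becomes $1 and $2.
  set -- $(kubectl get pods -A | find_kafka_pod "$1")
  [ $# -eq 2 ] || { echo "no running pod matched" >&2; return 1; }
  kubectl exec -it "$2" -n "$1" -- bash
}
```

For example, exec_kafka_pod with no argument looks for a pod named like kafka-v2-0, while exec_kafka_pod kafka-connect would target the Kafka Connect pod instead.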

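Get the list of available Kafka connectors: 

  1. Connect to the Kafka pod.

  2. List the consumer groups (each sink connector consumes as the group connect-<connector name>): kafka-consumer-groups --bootstrap-server kafka-v2.kafka-cluster:9092 --list

  3. Alternatively, list the connectors registered with Kafka Connect through its REST API: curl http://kafka-connect.kafka-cluster:8083/connectors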
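The two listing methods return different shapes. By default Kafka Connect runs each sink connector under the consumer group connect-<connector name>, so stripping that prefix from the group list approximates the connector names, while the REST API returns a JSON array of names. The helpers below are a hypothetical sketch that turns both into one name per line; any non-Connect group that happens to start with connect- would also be reported.

```shell
# Hypothetical helper: keeps only groups named "connect-<connector name>" from
# `kafka-consumer-groups --list` output and prints the connector part.
sink_connectors_from_groups() {
  sed -n 's/^connect-//p'
}

# Hypothetical helper: GET /connectors returns a JSON array such as ["a","b"];
# extract each quoted string and drop the quotes, one name per line.
connectors_from_rest() {
  grep -o '"[^"]*"' | tr -d '"'
}
```

For example: curl -s http://kafka-connect.kafka-cluster:8083/connectors | connectors_from_rest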

Get the lag of a Kafka connector: 

  1. Connect to the Kafka pod.

  2. Execute: kafka-consumer-groups --bootstrap-server kafka-v2.kafka-cluster:9092 --describe --group connect-cms-case-es-sink-tl (the group name is connect- followed by the connector name)
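The describe output reports lag per partition. To get a single number for a connector, the LAG column can be summed. This sketch locates the column from the header row rather than hard-coding its position, and skips partitions whose lag is shown as - (no committed offset yet). The function name is hypothetical.

```shell
# Hypothetical helper: sums the LAG column of `kafka-consumer-groups --describe`
# output read on stdin and prints the total (0 if nothing was found).
total_lag() {
  awk '
    # Until the header is seen, look for the field named LAG and remember its position.
    col == 0 { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    # Add numeric lag values only; "-" means the partition has no committed offset.
    $col ~ /^[0-9]+$/ { total += $col }
    END { print total + 0 }
  '
}
```

For example: kafka-consumer-groups --bootstrap-server kafka-v2.kafka-cluster:9092 --describe --group connect-cms-case-es-sink-tl | total_lag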

Delete a Kafka connector: 

  1. Connect to the Kafka pod.

  2. Execute: curl -X DELETE http://kafka-connect.kafka-cluster:8083/connectors/cms-case-es-sink-tl (the last path segment is the connector name)
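The Kafka Connect REST API answers a successful DELETE with HTTP 204 (No Content) and an unknown connector name with 404, so a script can check the status code instead of relying on an empty response. A minimal sketch; CONNECT_URL is an assumed override that defaults to the service used above, and the function names are illustrative.

```shell
# Hypothetical helper: maps the HTTP status of DELETE /connectors/<name> to a message.
# $1 = HTTP status code, $2 = connector name. Returns non-zero unless the delete succeeded.
delete_result() {
  case "$1" in
    204) echo "deleted $2" ;;
    404) echo "connector $2 not found" >&2; return 1 ;;
    *)   echo "unexpected HTTP status $1 while deleting $2" >&2; return 1 ;;
  esac
}

# Deletes connector $1; curl prints only the status code and discards the response body.
# CONNECT_URL is an assumed override for the kafka-connect service used in this document.
delete_connector() {
  code=$(curl -s -o /dev/null -w '%{http_code}' -X DELETE \
    "${CONNECT_URL:-http://kafka-connect.kafka-cluster:8083}/connectors/$1")
  delete_result "$code" "$1"
}
```

If curl cannot reach the service at all, it reports the status as 000, which the sketch treats as unexpected.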

Steps to create Kafka connectors: 

  1. Connect to the Kafka pod.

  2. Run the following cURL commands inside the Kafka pod to create a connector for each Kafka topic.

*Before running the commands, confirm that the Kafka Connect and Elasticsearch service names and namespaces in each cURL match your cluster: the base URI of the request and the connection.url parameter in the request body.
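The national-dashboard commands that follow differ only in the connector name and the topic, so the request body can be generated instead of copied. The hypothetical helper below reproduces the shared settings from those commands (with the read timeout spelled read.timeout.ms, the property name the Confluent Elasticsearch sink recognizes) and posts the result. ES_URL and CONNECT_URL are assumed overrides, and the egov-dss-ingest-enriched connector, which uses different settings, is not covered.

```shell
# Hypothetical helper: prints the request body shared by the national-dashboard
# (type.name "nss") connectors. $1 = connector name, $2 = Kafka topic.
# ES_URL is an assumed override; it defaults to the Elasticsearch service used in this document.
ndb_connector_json() {
  cat <<EOF
{
  "name": "$1",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "${ES_URL:-http://elasticsearch-data-v1.es-cluster:9200/}",
    "type.name": "nss",
    "topics": "$2",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}
EOF
}

# POSTs the generated body to the Kafka Connect REST API.
# CONNECT_URL is an assumed override for the kafka-connect service address.
create_ndb_connector() {
  ndb_connector_json "$1" "$2" |
    curl -s -X POST -H 'Content-Type: application/json' --data-binary @- \
      "${CONNECT_URL:-http://kafka-connect.kafka-cluster:8083}/connectors/"
}
```

For example, create_ndb_connector cms-case-es-sink-tl tl-national-dashboard would send the same request as the Trade License command.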

Kafka cURL commands:

Property Tax (cms-case-es-sink-pt)

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
  "name": "cms-case-es-sink-pt",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
    "type.name": "nss",
    "topics": "pt-national-dashboard",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}'
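After a create request succeeds, GET /connectors/<name>/status on the Kafka Connect REST API reports the state of the connector and of each task (for example RUNNING or FAILED). The sketch below extracts those states with grep and sed rather than a JSON parser, so it only looks at "state" fields and assumes no other field uses that key. The function names are hypothetical.

```shell
# Hypothetical helper for the JSON returned by GET /connectors/<name>/status:
# prints every "state" value in document order (the connector first, then each task).
connector_states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Succeeds only if at least one state was found and every state is RUNNING.
is_healthy() {
  states=$(connector_states)
  [ -n "$states" ] && ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}
```

For example: curl -s http://kafka-connect.kafka-cluster:8083/connectors/cms-case-es-sink-pt/status | is_healthy && echo "cms-case-es-sink-pt is running"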

Trade License 

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
  "name": "cms-case-es-sink-tl",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
    "type.name": "nss",
    "topics": "tl-national-dashboard",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}'

mCollect 

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
  "name": "cms-case-es-sink9825",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
    "type.name": "nss",
    "topics": "mcollect-national-dashboard",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}'

PGR

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
  "name": "cms-case-es-sink9725",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
    "type.name": "nss",
    "topics": "pgr-national-dashboard",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}'
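By default the Confluent Elasticsearch sink writes each topic to an Elasticsearch index of the same name, so one way to confirm that records are arriving is the index's _count endpoint. This sketch assumes that default mapping (it does not hold for connectors that rename topics with a transform) and that curl can reach the Elasticsearch service from the pod. ES_URL and the function names are hypothetical.

```shell
# Hypothetical helper: prints the "count" value from an Elasticsearch _count response on stdin.
es_count() {
  sed -n 's/.*"count" *: *\([0-9][0-9]*\).*/\1/p'
}

# Hypothetical wrapper: $1 = index name (the topic name under the sink's default mapping).
# ES_URL is an assumed override for the Elasticsearch service used in this document.
es_index_count() {
  curl -s "${ES_URL:-http://elasticsearch-data-v1.es-cluster:9200}/$1/_count" | es_count
}
```

For example, running es_index_count pgr-national-dashboard twice a few minutes apart should show the count growing while PGR events are being produced.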

OBPS

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
  "name": "cms-case-es-sink9625",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
    "type.name": "nss",
    "topics": "obps-national-dashboard",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}'

Fire NOC

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
  "name": "cms-case-es-sink9525",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
    "type.name": "nss",
    "topics": "firenoc-national-dashboard",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}'

Common

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
  "name": "cms-case-es-sink9425",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
    "type.name": "nss",
    "topics": "common-national-dashboard",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}'

egov-dss-ingest-enriched

curl -X POST \
http://kafka-connect.kafka-cluster:8083/connectors/ \
-H 'Content-Type: application/json' \
-d '{
  "name": "cms-case-es-sink9132",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200",
    "type.name": "general",
    "topics": "egov-dss-ingest-enriched",
    "key.ignore": "false",
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "TopicNameRouter",
    "transforms.TopicNameRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.TopicNameRouter.regex": ".*",
    "transforms.TopicNameRouter.replacement": "dss-collection_v2",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "dss-collection_v2-es-failed",
    "tasks.max": 1
  }
}'
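Unlike the national-dashboard connectors, this one uses the RegexRouter transform to rename the record topic before the sink chooses an index: when the regular expression matches the whole topic name, the topic is replaced with the replacement string, so with the regex .* every record from egov-dss-ingest-enriched is written to the dss-collection_v2 index. The sed-based function below only approximates this for trying out patterns. It uses POSIX extended regular expressions rather than Java's, does not translate Java-style $1 group references, and breaks if the pattern or replacement contains a slash or an ampersand. The function name is hypothetical.

```shell
# Hypothetical helper approximating RegexRouter with sed: if the regex matches the whole
# topic name, print the replacement; otherwise print the topic unchanged.
# $1 = topic, $2 = regex (POSIX ERE), $3 = replacement (no "/" or "&", no group references).
route_topic() {
  printf '%s\n' "$1" | sed -E "s/^($2)\$/$3/"
}
```

For example, route_topic egov-dss-ingest-enriched '.*' dss-collection_v2 prints dss-collection_v2, while a pattern that does not match the whole name leaves the topic as it is.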

Birth

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
  "name": "cms-case-es-sink9144",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
    "type.name": "nss",
    "topics": "birth-cert-national-dashboard",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}'

Death

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
  "name": "cms-case-es-sink9143",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
    "type.name": "nss",
    "topics": "death-cert-national-dashboard",
    "key.ignore": true,
    "schemas.enable": false,
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "nss-es-failed",
    "tasks.max": 1
  }
}'
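If a task of a newly created connector shows FAILED (for example because Elasticsearch was unreachable), it can be restarted individually with POST /connectors/<name>/tasks/<id>/restart instead of deleting and re-creating the connector. The sketch below pulls the failed task IDs out of the status response. It assumes each task object lists "id" immediately before "state", as in the examples in the Kafka Connect documentation; CONNECT_URL and the function names are hypothetical.

```shell
# Hypothetical helper: prints the IDs of FAILED tasks from a GET /connectors/<name>/status
# response on stdin. Assumes "id" is directly followed by "state" in each task object.
failed_task_ids() {
  grep -o '"id" *: *[0-9][0-9]* *, *"state" *: *"FAILED"' |
    sed 's/^"id" *: *\([0-9][0-9]*\).*/\1/'
}

# Hypothetical wrapper: restarts each failed task of connector $1.
# CONNECT_URL is an assumed override for the kafka-connect service used in this document.
restart_failed_tasks() {
  base="${CONNECT_URL:-http://kafka-connect.kafka-cluster:8083}/connectors/$1"
  for id in $(curl -s "$base/status" | failed_task_ids); do
    echo "restarting task $id of $1"
    curl -s -X POST "$base/tasks/$id/restart"
  done
}
```

For example: restart_failed_tasks cms-case-es-sink9143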

List of connectors: 

  • cms-case-es-sink-tl - tl national dashboard

  • cms-case-es-sink9525 - firenoc national dashboard

  • cms-case-es-sink9625 - obps national dashboard

  • cms-case-es-sink9725 - pgr national dashboard

  • cms-case-es-sink-dss - dss-collection_v2

  • cms-case-es-sink9825 - mcollect national dashboard

  • tlindex-v1-enriched-es-sink - tlindex-v1-enriched

  • cms-case-es-sink9132 - egov-dss-ingest-enriched

  • cms-case-es-sink9121 - paymentsindex-v1-enriched

  • cms-case-es-sink9143 - death-cert-national-dashboard

  • cms-case-es-sink9144 - birth-cert-national-dashboard

  • cms-case-es-sink-pt - pt national dashboard

  • cms-case-es-sink9225 - ws national dashboard

  • cms-case-es-sink9425 - common national dashboard
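The list above also names connectors whose create commands are not included on this page (for example cms-case-es-sink-dss and cms-case-es-sink9121). To check which expected connectors are missing from a cluster, the JSON array returned by GET /connectors can be compared against the list. The sketch below reads that array on stdin and prints every expected name that is not registered; the function name is hypothetical.

```shell
# Hypothetical helper: reads the JSON array from GET /connectors on stdin and prints
# each connector name given as an argument that does not appear in it.
missing_connectors() {
  # One registered name per line: extract the quoted strings and drop the quotes.
  registered=$(grep -o '"[^"]*"' | tr -d '"')
  for name in "$@"; do
    # -x: whole-line match, -F: literal string, so names are compared exactly.
    printf '%s\n' "$registered" | grep -qxF -- "$name" || printf '%s\n' "$name"
  done
}
```

For example: curl -s http://kafka-connect.kafka-cluster:8083/connectors | missing_connectors cms-case-es-sink-tl cms-case-es-sink9525 cms-case-es-sink9625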