Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current Restore this Version View Page History

Version 1 Current »

Connecting to Kafka pod: 

  1. Search for kafka pod and its namespace 

    1. kubectl get pods -A| grep kafka

  2. Connect to kafka pods’ bash shell 

    1. kubectl exec -it kafka_pod_name -n kafka_namespace -- bash

    2. kubectl exec -it kafka-v2-0 -n kafka-cluster -- bash

Get list of available kafka connectors: 

  1. Connect to kafka pod

  2. Execute: kafka-consumer-groups -bootstrap-server kafka-v2.kafka-cluster:9092 -list

Get lags of kafka connectors: 

  1. Connect to kafka pod

  2. Execute: kafka-consumer-groups --bootstrap-server kafka-v2.kafka-cluster:9092 --describe --group connect-cms-case-es-sink-tl (Connector name)

Delete kafka connectors: 

  1. Connect to kafka pod

  2. Execute: curl -X GET http://kafka-connect.kafka-cluster:8083/connectors/cms-case-es-sink-tl (connector name)

Steps to create kafka connectors: 

  1. Connect to kafka pod

  2. Execute the following cURL commands within the kafka pod to create the connectors for each kafka topic 

*Confirm the kafka-connect and elasticsearch service pod name and namespace within the connector cURL (base URI and connection.url parameter of data)

Kafka cURLS: 

Property- Tax (cms-case-es-sink9226)

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cms-case-es-sink-pt",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
        "type.name": "nss",
        "topics": "pt-national-dashboard",
        "key.ignore": true,
        "schemas.enable": false,
        "schema.ignore": true,
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "batch.size": 10,
        "max.buffered.records": 500,
        "flush.timeout.ms": 600000,
        "retry.backoff.ms": 5000,
        "read.timout.ms": 10000,
        "linger.ms": 100,
        "max.in.flight.requests": 2,
        "errors.log.enable": true,
        "errors.deadletterqueue.topic.name": "nss-es-failed",
        "tasks.max": 1
    }
}'

Trade License 

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cms-case-es-sink-tl",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
        "type.name": "nss",
        "topics": "tl-national-dashboard",
        "key.ignore": true,
        "schemas.enable": false,
        "schema.ignore": true,
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "batch.size": 10,
        "max.buffered.records": 500,
        "flush.timeout.ms": 600000,
        "retry.backoff.ms": 5000,
        "read.timout.ms": 10000,
        "linger.ms": 100,
        "max.in.flight.requests": 2,
        "errors.log.enable": true,
        "errors.deadletterqueue.topic.name": "nss-es-failed",
        "tasks.max": 1
    }
}'

mCollect 

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cms-case-es-sink9825",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
        "type.name": "nss",
        "topics": "mcollect-national-dashboard",
        "key.ignore": true,
        "schemas.enable": false,
        "schema.ignore": true,
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "batch.size": 10,
        "max.buffered.records": 500,
        "flush.timeout.ms": 600000,
        "retry.backoff.ms": 5000,
        "read.timout.ms": 10000,
        "linger.ms": 100,
        "max.in.flight.requests": 2,
        "errors.log.enable": true,
        "errors.deadletterqueue.topic.name": "nss-es-failed",
        "tasks.max": 1
    }
}'

PGR

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cms-case-es-sink9725",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
        "type.name": "nss",
        "topics": "pgr-national-dashboard",
        "key.ignore": true,
        "schemas.enable": false,
        "schema.ignore": true,
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "batch.size": 10,
        "max.buffered.records": 500,
        "flush.timeout.ms": 600000,
        "retry.backoff.ms": 5000,
        "read.timout.ms": 10000,
        "linger.ms": 100,
        "max.in.flight.requests": 2,
        "errors.log.enable": true,
        "errors.deadletterqueue.topic.name": "nss-es-failed",
        "tasks.max": 1
    }
}'

OBPS

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cms-case-es-sink9625",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
        "type.name": "nss",
        "topics": "obps-national-dashboard",
        "key.ignore": true,
        "schemas.enable": false,
        "schema.ignore": true,
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "batch.size": 10,
        "max.buffered.records": 500,
        "flush.timeout.ms": 600000,
        "retry.backoff.ms": 5000,
        "read.timout.ms": 10000,
        "linger.ms": 100,
        "max.in.flight.requests": 2,
        "errors.log.enable": true,
        "errors.deadletterqueue.topic.name": "nss-es-failed",
        "tasks.max": 1
    }
}'

 Fire Noc

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cms-case-es-sink9525",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
        "type.name": "nss",
        "topics": "firenoc-national-dashboard",
        "key.ignore": true,
        "schemas.enable": false,
        "schema.ignore": true,
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "batch.size": 10,
        "max.buffered.records": 500,
        "flush.timeout.ms": 600000,
        "retry.backoff.ms": 5000,
        "read.timout.ms": 10000,
        "linger.ms": 100,
        "max.in.flight.requests": 2,
        "errors.log.enable": true,
        "errors.deadletterqueue.topic.name": "nss-es-failed",
        "tasks.max": 1
    }
}'

Common

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cms-case-es-sink9425",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
        "type.name": "nss",
        "topics": "common-national-dashboard",
        "key.ignore": true,
        "schemas.enable": false,
        "schema.ignore": true,
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "batch.size": 10,
        "max.buffered.records": 500,
        "flush.timeout.ms": 600000,
        "retry.backoff.ms": 5000,
        "read.timout.ms": 10000,
        "linger.ms": 100,
        "max.in.flight.requests": 2,
        "errors.log.enable": true,
        "errors.deadletterqueue.topic.name": "nss-es-failed",
        "tasks.max": 1
    }
}'

egov-dss-ingest-enriched

curl -X POST \
http://kafka-connect.kafka-cluster:8083/connectors/ \
-H 'Content-Type: application/json' \
-H 'Cookie: SESSIONID=f1349448-761e-4ebc-a8bb-f6799e756185' \
-H 'Postman-Token: adabf0e8-0599-4ac9-a591-920586ff4d50' \
-H 'cache-control: no-cache' \
-d '{
"name": "cms-case-es-sink9132",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://elasticsearch-data-v1.es-cluster:9200",
"type.name": "general",
"topics": "egov-dss-ingest-enriched",
"key.ignore": "false",
"schema.ignore": true,
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "TopicNameRouter",
"transforms.TopicNameRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TopicNameRouter.regex": ".*",
"transforms.TopicNameRouter.replacement": "dss-collection_v2",
"batch.size": 10,
"max.buffered.records": 500,
"flush.timeout.ms": 600000,
"retry.backoff.ms": 5000,
"read.timout.ms": 10000,
"linger.ms": 100,
"max.in.flight.requests": 2,
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "dss-collection_v2-es-failed",
"tasks.max": 1
}
}'

Birth

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cms-case-es-sink9144",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
        "type.name": "nss",
        "topics": "birth-cert-national-dashboard",
        "key.ignore": true,
        "schemas.enable": false,
        "schema.ignore": true,
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "batch.size": 10,
        "max.buffered.records": 500,
        "flush.timeout.ms": 600000,
        "retry.backoff.ms": 5000,
        "read.timout.ms": 10000,
        "linger.ms": 100,
        "max.in.flight.requests": 2,
        "errors.log.enable": true,
        "errors.deadletterqueue.topic.name": "nss-es-failed",
        "tasks.max": 1
    }
}'

Death:

curl --location --request POST 'http://kafka-connect.kafka-cluster:8083/connectors/' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cms-case-es-sink9143",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://elasticsearch-data-v1.es-cluster:9200/",
        "type.name": "nss",
        "topics": "death-cert-national-dashboard",
        "key.ignore": true,
        "schemas.enable": false,
        "schema.ignore": true,
        "value.converter.schemas.enable": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "batch.size": 10,
        "max.buffered.records": 500,
        "flush.timeout.ms": 600000,
        "retry.backoff.ms": 5000,
        "read.timout.ms": 10000,
        "linger.ms": 100,
        "max.in.flight.requests": 2,
        "errors.log.enable": true,
        "errors.deadletterqueue.topic.name": "nss-es-failed",
        "tasks.max": 1
    }
}'

List of connectors: 

  • cms-case-es-sink-tl - tl national dashboard 

  • cms-case-es-sink9525 - firenoc ndb

  • cms-case-es-sink9625 - obps ndb

  • cms-case-es-sink9725 - pgr ndb

  • cms-case-es-sink-dss - dss-collection_v2 

  • cms-case-es-sink9825 - mcollect ndb

  • tlindex-v1-enriched-es-sink - tlindex-v1-enriched --

  • cms-case-es-sink9132 - egov-dss-ingest-enriched --

  • cms-case-es-sink9121 - paymentsindex-v1-enriched --

  • cms-case-es-sink9143 - death-cert-national-dashboard

  • cms-case-es-sink9144 - birth-cert-national-dashboard

  • cms-case-es-sink-pt - pt ndb

  • cms-case-es-sink9225 - ws ndb

  • cms-case-es-sink9425 - common ndb 

0 Comments

You are not logged in. Any changes you make will be marked as anonymous. You may want to Log In if you already have an account.