mGramseva - Services Re-Indexing
Overview:
Re-indexing pushes all existing data into the respective index. It has two steps. First, create the Kafka connector from the playground pod. Then make the legacy index call on the indexer service, which internally calls the plain search API of the respective service to fetch the data and send it to the respective index.
Prerequisites:
kubectl access to the target environment
Postman scripts
Plain search APIs in the respective services
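The legacy index calls later in this page target http://localhost:8055, which implies a port-forward to the egov-indexer pod. The following is a minimal sketch of that setup, not a confirmed procedure. The namespace mgramseva is inferred from the service hostnames used in this page, the remote port 8080 is an assumption, and the helper name port_forward_cmd and the pod name in the usage comment are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: build the kubectl port-forward command for the egov-indexer pod.
# NAMESPACE and REMOTE_PORT are assumptions; verify them in your environment.
NAMESPACE="mgramseva"
LOCAL_PORT="8055"    # local port used by the _legacyindex calls in this page
REMOTE_PORT="8080"   # assumed container port of egov-indexer

# Print the port-forward command for the given pod name.
port_forward_cmd() {
  local pod="$1"
  echo "kubectl port-forward ${pod} ${LOCAL_PORT}:${REMOTE_PORT} -n ${NAMESPACE}"
}

# Usage (the pod name below is hypothetical; list real pods with
# `kubectl get pods -n mgramseva`):
#   port_forward_cmd egov-indexer-0
```

Printing the command first, instead of running it directly, makes it easy to confirm the pod and ports before opening the tunnel.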
mGramseva has three main indexes to re-index:
water-services
echallan-services
dss-collection_v2
Steps to follow for re-indexing:
Delete the Kafka connector if it already exists, using the command below.
curl -X DELETE http://kafka-connect.mgramseva:8083/connectors/water-services-enriched-es-sink
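The delete step only applies when the connector already exists, so it can help to check first. A minimal sketch, assuming the standard Kafka Connect REST endpoint GET /connectors/{name}, which returns 200 for an existing connector; the helper names connector_url and delete_connector_if_exists are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: delete a Kafka connector only when it exists.
CONNECT_URL="http://kafka-connect.mgramseva:8083"

# Build the REST URL for one connector.
connector_url() {
  echo "${CONNECT_URL}/connectors/$1"
}

# Look up the connector; delete it only on HTTP 200.
delete_connector_if_exists() {
  local name="$1" code
  code="$(curl -s -o /dev/null -w '%{http_code}' "$(connector_url "$name")")"
  if [ "$code" = "200" ]; then
    curl -s -X DELETE "$(connector_url "$name")"
    echo "deleted ${name}"
  else
    echo "connector ${name} not found (HTTP ${code}), nothing to delete"
  fi
}

# Usage, from the playground pod:
#   delete_connector_if_exists water-services-enriched-es-sink
```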
ws-services re-indexing
Kafka connector curl, to be run from the playground pod
curl --location --request POST 'http://localhost:8083/connectors/' \
--header 'Cache-Control: no-cache' \
--header 'Content-Type: application/json' \
--header 'Postman-Token: 419e68ba-ffb9-4da9-86e1-7ad5a4c8d0b9' \
--data '{
  "name": "water-services-temp-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "general",
    "tasks.max": "1",
    "max.retries": "15",
    "key.ignore": "false",
    "retry.backoff.ms": "5000",
    "max.buffered.records": "25",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "read.timeout.ms": "100000",
    "topics": "water-services-temp-enriched",
    "batch.size": "25",
    "max.in.flight.requests": "2",
    "schema.ignore": "true",
    "behavior.on.malformed.documents": "warn",
    "flush.timeout.ms": "3600000",
    "errors.deadletterqueue.topic.name": "water-services-enriched-failed",
    "errors.tolerance": "all",
    "value.converter.schemas.enable": "false",
    "name": "water-services-temp-es-sink",
    "connection.url": "http://elasticsearch-data-v1.mgramseva:9200",
    "linger.ms": "1000",
    "transforms": "TopicNameRouter",
    "transforms.TopicNameRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.TopicNameRouter.regex": "water-services-temp-enriched*",
    "transforms.TopicNameRouter.replacement": "water-services"
  }
}'
Invoke the legacy index call for ws-services
curl --location --request POST 'http://localhost:8055/egov-indexer/index-operations/_legacyindex' \
--header 'Content-Type: application/json' \
--data-raw '{
  "RequestInfo": {
    "apiId": "string",
    "ver": "string",
    "ts": null,
    "action": "string",
    "did": "string",
    "key": "string",
    "msgId": "string",
    "authToken": "ca3256e3-5318-47b1-8a68-ffcf2228fe35",
    "correlationId": "e721639b-c095-40b3-86e2-acecb2cb6efb",
    "userInfo": {
      "id": 23299,
      "uuid": "e721639b-c095-40b3-86e2-acecb2cb6efb",
      "userName": "9337682030",
      "name": "Abhilash Seth",
      "type": "CITIZEN",
      "mobileNumber": "9337682030",
      "emailId": "abhilash.seth@gmail.com",
      "roles": [
        {
          "id": 281,
          "name": "Citizen"
        }
      ]
    }
  },
  "apiDetails": {
    "uri": "http://ws-services.mgramseva:8080/ws-services/wc/_plainsearch",
    "tenantIdForOpenSearch": "pb",
    "paginationDetails": {
      "offsetKey": "offset",
      "sizeKey": "limit",
      "maxPageSize": 25,
      "limit": 25
    },
    "responseJsonPath": "$.WaterConnection"
  },
  "legacyIndexTopic": "ws-connection-legacyIndex",
  "tenantId": "pb"
}'
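The connector should only be deleted once all data has been re-indexed, so some way of watching progress is useful. One option, sketched here as an assumption rather than the documented procedure, is to poll the Elasticsearch _count API for the target index until the number stops growing. The helper names count_url, extract_count, and index_count are hypothetical, and the index name in the usage comment is taken from the RegexRouter replacement value in the connector config.

```shell
#!/usr/bin/env bash
# Sketch: read the document count of an index via the Elasticsearch _count API.
ES_URL="http://elasticsearch-data-v1.mgramseva:9200"

# Build the _count URL for one index.
count_url() {
  echo "${ES_URL}/$1/_count"
}

# Pull the numeric "count" field out of a _count JSON response.
extract_count() {
  printf '%s\n' "$1" | sed -n 's/.*"count"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}

# Fetch and print the current count for one index.
index_count() {
  extract_count "$(curl -s "$(count_url "$1")")"
}

# Usage, from a pod that can reach Elasticsearch:
#   index_count water-services
```

Running index_count a few times, some seconds apart, shows whether documents are still arriving.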
Delete the Kafka connector after all the data has been re-indexed, using the command below.
curl -X DELETE http://kafka-connect.mgramseva:8083/connectors/water-services-enriched-es-sink
Alias water-services-enriched as water-services
Kibana Dev Tools console
POST /_aliases
{
"actions": [
{
"add": {
"index": "water-services-enriched",
"alias": "water-services"
}
}
]
}
cURL
curl -X POST http://elasticsearch-data-v1.mgramseva:9200/_aliases \
--header 'Content-Type: application/json' \
-d '{
"actions": [
{
"add": {
"index": "water-services-enriched",
"alias": "water-services"
}
}
]
}'
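The same alias request recurs for each index in this page, and it can be worth confirming that the alias exists afterwards. A minimal sketch, assuming the Elasticsearch GET /_alias/{name} endpoint for the check; the helper names alias_payload, add_alias, and show_alias are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: add an index alias and then read it back.
ES_URL="http://elasticsearch-data-v1.mgramseva:9200"

# Build the _aliases request body for one index/alias pair.
alias_payload() {
  printf '{"actions":[{"add":{"index":"%s","alias":"%s"}}]}' "$1" "$2"
}

# POST the alias action to Elasticsearch.
add_alias() {
  curl -s -X POST "${ES_URL}/_aliases" \
    --header 'Content-Type: application/json' \
    -d "$(alias_payload "$1" "$2")"
}

# Show which indexes the alias currently points to.
show_alias() {
  curl -s "${ES_URL}/_alias/$1"
}

# Usage:
#   add_alias water-services-enriched water-services
#   show_alias water-services
```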
eChallan re-indexing
Delete the Kafka connector if it already exists, using the command below.
curl -X DELETE http://kafka-connect.mgramseva:8083/connectors/echallan-services-enriched-es-sink
Kafka connector call, to be run from the playground pod
curl --location --request POST 'http://kafka-connect.mgramseva:8083/connectors/' \
--header 'Cache-Control: no-cache' \
--header 'Content-Type: application/json' \
--header 'Postman-Token: 419e68ba-ffb9-4da9-86e1-7ad5a4c8d0b9' \
--data-raw '{
  "name": "echallan-services-enriched-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "general",
    "tasks.max": "1",
    "max.retries": "15",
    "key.ignore": "true",
    "retry.backoff.ms": "5000",
    "max.buffered.records": "25",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "read.timeout.ms": "100000",
    "topics": "echallan-services-enriched",
    "batch.size": "25",
    "max.in.flight.requests": "2",
    "schema.ignore": "true",
    "behavior.on.malformed.documents": "warn",
    "flush.timeout.ms": "3600000",
    "errors.deadletterqueue.topic.name": "echallan-services-enriched-failed",
    "errors.tolerance": "all",
    "value.converter.schemas.enable": "false",
    "name": "echallan-services-enriched-es-sink",
    "connection.url": "http://elasticsearch-data-v1.mgramseva:9200/",
    "linger.ms": "1000",
    "transforms": "TopicNameRouter",
    "transforms.TopicNameRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.TopicNameRouter.regex": "echallan-services-enriched*",
    "transforms.TopicNameRouter.replacement": "echallan-services-enriched"
  }
}'
Legacy index call, made from Postman after port-forwarding the egov-indexer pod
curl --location --request POST 'http://localhost:8055/egov-indexer/index-operations/_legacyindex' \
--header 'Content-Type: application/json' \
--data-raw '{
  "RequestInfo": {
    "apiId": "string",
    "ver": "string",
    "ts": null,
    "action": "string",
    "did": "string",
    "key": "string",
    "msgId": "string",
    "authToken": "ca3256e3-5318-47b1-8a68-ffcf2228fe35",
    "correlationId": "e721639b-c095-40b3-86e2-acecb2cb6efb",
    "userInfo": {
      "id": 23299,
      "uuid": "e721639b-c095-40b3-86e2-acecb2cb6efb",
      "userName": "9337682030",
      "name": "Abhilash Seth",
      "type": "CITIZEN",
      "mobileNumber": "9337682030",
      "emailId": "abhilash.seth@gmail.com",
      "roles": [
        {
          "id": 281,
          "name": "Citizen"
        }
      ]
    }
  },
  "apiDetails": {
    "uri": "http://echallan-services.mgramseva:8080/echallan-services/eChallan/v1/_plainsearch",
    "tenantIdForOpenSearch": "pb",
    "paginationDetails": {
      "offsetKey": "offset",
      "sizeKey": "limit",
      "maxPageSize": 25,
      "limit": 25
    },
    "responseJsonPath": "$.challans"
  },
  "legacyIndexTopic": "echallan-legacyIndex",
  "tenantId": "pb"
}'
Delete the Kafka connector after all the data has been re-indexed, using the command below.
curl -X DELETE http://kafka-connect.mgramseva:8083/connectors/echallan-services-enriched-es-sink
Alias echallan-services-enriched as echallan-services
Kibana Dev Tools console
POST /_aliases
{
"actions": [
{
"add": {
"index": "echallan-services-enriched",
"alias": "echallan-services"
}
}
]
}
cURL
curl -X POST http://elasticsearch-data-v1.mgramseva:9200/_aliases \
--header 'Content-Type: application/json' \
-d '{
"actions": [
{
"add": {
"index": "echallan-services-enriched",
"alias": "echallan-services"
}
}
]
}'
DSS collection v2 re-indexing
Delete the Kafka connector if it already exists, using the command below.
curl -X DELETE http://kafka-connect.mgramseva:8083/connectors/cms-case-es-sink9121
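Kafka connector calls, to be run from the playground pod. Two connectors are created: cms-case-es-sink9121, which routes to the paymentsindex-v1 index, and cms-case-es-sink9132, which routes to the dss-collection_v2 index.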
curl --location --request POST 'http://kafka-connect.mgramseva:8083/connectors/' \
--header 'Cache-Control: no-cache' \
--header 'Content-Type: application/json' \
--header 'Postman-Token: 419e68ba-ffb9-4da9-86e1-7ad5a4c8d0b9' \
--data-raw '{
  "name": "cms-case-es-sink9121",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.mgramseva:9200",
    "type.name": "payments",
    "topics": "paymentsindex-v1-enriched",
    "key.ignore": "false",
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "TopicNameRouter",
    "transforms.TopicNameRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.TopicNameRouter.regex": ".*",
    "transforms.TopicNameRouter.replacement": "paymentsindex-v1",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "paymentsindex-v1-es-failed",
    "tasks.max": 1
  }
}'
curl --location --request POST 'http://kafka-connect.mgramseva:8083/connectors/' \
--header 'Cache-Control: no-cache' \
--header 'Content-Type: application/json' \
--header 'Postman-Token: 419e68ba-ffb9-4da9-86e1-7ad5a4c8d0b9' \
--data-raw '{
  "name": "cms-case-es-sink9132",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.mgramseva:9200",
    "type.name": "general",
    "topics": "egov-dss-ingest-enriched",
    "key.ignore": "false",
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "TopicNameRouter",
    "transforms.TopicNameRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.TopicNameRouter.regex": ".*",
    "transforms.TopicNameRouter.replacement": "dss-collection_v2",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timeout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "dss-collection_v2-es-failed",
    "tasks.max": 1
  }
}'
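With two connectors created in this step, it can help to confirm that both are running before starting the legacy index call. A minimal sketch, assuming the standard Kafka Connect endpoint GET /connectors/{name}/status and its compact JSON response; the helper names status_url, connector_state, and check_connector are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: report the connector-level state of a Kafka connector.
CONNECT_URL="http://kafka-connect.mgramseva:8083"

# Build the status URL for one connector.
status_url() {
  echo "${CONNECT_URL}/connectors/$1/status"
}

# Extract the first "state" value (the connector-level state) from a
# status response. Assumes compact JSON with no spaces around the colon.
connector_state() {
  printf '%s' "$1" | grep -o '"state":"[A-Z_]*"' | head -n 1 | cut -d'"' -f4
}

# Fetch the status and print "<name>: <state>".
check_connector() {
  local name="$1"
  echo "${name}: $(connector_state "$(curl -s "$(status_url "$name")")")"
}

# Usage, from the playground pod:
#   for c in cms-case-es-sink9121 cms-case-es-sink9132; do check_connector "$c"; done
```

This reports only the connector-level state; the task entries in the same response carry their own state and may need a separate look if data does not arrive.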
Payment re-indexing: legacy index call, to be run from Postman
curl --location --request POST 'http://localhost:8055/egov-indexer/index-operations/_legacyindex' \
--header 'Content-Type: application/json' \
--data-raw '{
  "RequestInfo": {
    "apiId": "string",
    "ver": "string",
    "ts": null,
    "action": "string",
    "did": "string",
    "key": "string",
    "msgId": "string",
    "authToken": "b843ef27-1ac6-49b8-ab71-cd0c22f4e50e",
    "correlationId": "e721639b-c095-40b3-86e2-acecb2cb6efb",
    "userInfo": {
      "id": 23299,
      "uuid": "e721639b-c095-40b3-86e2-acecb2cb6efb",
      "userName": "9337682030",
      "name": "Abhilash Seth",
      "type": "EMPLOYEE",
      "mobileNumber": "9337682030",
      "emailId": "abhilash.seth@gmail.com",
      "roles": [
        {
          "id": 281,
          "name": "Employee"
        }
      ]
    }
  },
  "apiDetails": {
    "uri": "http://collection-services.mgramseva:8080/collection-services/payments/_plainsearch",
    "tenantIdForOpenSearch": "pb",
    "paginationDetails": {
      "offsetKey": "offset",
      "sizeKey": "limit",
      "maxPageSize": 50,
      "startingOffset": 0
    },
    "responseJsonPath": "$.Payments"
  },
  "legacyIndexTopic": "egov-payment-legacy-index",
  "tenantId": "pb"
}'