...
Connect (port-forward) to the Kafka Connect server.
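For example, with kubectl the port-forward could look like the sketch below. The service name and namespace are assumptions; substitute the names used in your cluster.

```bash
# Minimal sketch: forward local port 8083 to the Kafka Connect service.
# "kafka-connect" (service) and "kafka-cluster" (namespace) are assumed names; adjust for your cluster.
kubectl port-forward svc/kafka-connect 8083:8083 -n kafka-cluster
```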
We can create a new connector with a POST API call to localhost:8083/connectors.
The request body for that API call is the JSON given below:
Code Block { "name": "fiscal-event-mongodb-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri": "${mongo-db-authenticated-uri}", "database": "${mongo-db-name}", "collection": "fiscal_event", "topics": "language json file fiscal-event-mongodb-sink
", "key.ignore": "true", "schema.
ignore": true, "value.converter.schemas.enable": false, "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "mongo.errors.tolerance": "all", "mongo.errors.log.enable": true, "errors.log.enable": true, "errors.deadletterqueue.context.headers.enable": true, "errors.deadletterqueue.topic.name": "fiscal-event-mongodb-dead-letter", "batch.size": 500, "max.buffered.records": 1000, "flush.timeout.ms": 600000, "retry.backoff.ms": 5000, "read.timout.ms": 10000, "linger.ms": 1000, "max.in.flight.requests": 2, "tasks.max": 1 } }
...
Within that file, replace every ${...} placeholder with the actual value for the environment. Get ${mongo-db-authenticated-uri} from the environment's configured secrets.
(Optional) Verify the topic names and change them if needed.
...
The connector is ready. You can verify it with a GET API call to localhost:8083/connectors.
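As a rough illustration, the connector can be created and verified with curl as sketched below, assuming the request body above has been saved locally as fiscal-event-mongodb-sink.json (a hypothetical file name):

```bash
# Create the connector; the request body is the JSON above, saved as fiscal-event-mongodb-sink.json (hypothetical file name).
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @fiscal-event-mongodb-sink.json

# List the registered connectors to confirm fiscal-event-mongodb-sink appears.
curl http://localhost:8083/connectors
```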
Druid Sink
We will use the Druid console to start ingesting data from a Kafka topic into the Druid data store. Follow the steps below to start the Druid supervisor:
Open the Druid console
Go to the Load Data section
Select Other
Click on Submit Supervisor
Copy and paste the JSON from the druid-ingestion-config.json file (shown below) into the available text box.
{ "type": "kafka", "spec": { "ioConfig": { "type": "kafka", "consumerProperties": { "bootstrap.servers": "kafka-v2.ifix:9092" }, "topic": "fiscal-event-druid-sink", "inputFormat": { "type": "json" }, "useEarliestOffset": true }, "tuningConfig": { "type": "kafka" }, "dataSchema": { "dataSource": "fiscal-event", "timestampSpec": { "column": "eventTime", "format": "millis" }, "transformSpec": { "transforms": [ { "type": "expression", "name": "bill", "expression": "if(\"eventType\" == 'BILL', \"amount\", 0)" }, { "type": "expression", "name": "receipt", "expression": "if(\"eventType\" == 'RECEIPT', \"amount\", 0)" }, { "type": "expression", "name": "payment", "expression": "if(\"eventType\" == 'PAYMENT', \"amount\", 0)" }, { "type": "expression", "name": "demand", "expression": "if(\"eventType\" == 'DEMAND', \"amount\", 0)" } ] }, "dimensionsSpec": { "dimensions": [ "id", { "type": "double", "name": "amount" }, "version", "tenantId", "eventId", { "type": "long", "name": "ingestionTime" }, "eventType", "referenceId", "parentEventId", "parentReferenceId", { "type": "long", "name": "fromBillingPeriod" }, { "type": "long", "name": "toBillingPeriod" }, "coa.id", "coa.coaCode", "coa.majorHead", "coa.majorHeadName", "coa.subMajorHead", "coa.subMajorHeadName", "coa.minorHead", "coa.minorHeadName", "coa.subHead", "coa.subHeadName", "coa.groupHead", "coa.groupHeadName", "coa.objectHead", "coa.objectHeadName", "department.id", "department.code", "department.name", "departmentEntity.id", "departmentEntity.code", "departmentEntity.name", "departmentEntity.hierarchyLevel", "departmentEntity.ancestry[0].id", "departmentEntity.ancestry[0].code", "departmentEntity.ancestry[0].name", "departmentEntity.ancestry[0].hierarchyLevel", "departmentEntity.ancestry[1].id", "departmentEntity.ancestry[1].code", "departmentEntity.ancestry[1].name", "departmentEntity.ancestry[1].hierarchyLevel", "departmentEntity.ancestry[2].id", "departmentEntity.ancestry[2].code", "departmentEntity.ancestry[2].name", "departmentEntity.ancestry[2].hierarchyLevel", "departmentEntity.ancestry[3].id", "departmentEntity.ancestry[3].code", "departmentEntity.ancestry[3].name", "departmentEntity.ancestry[3].hierarchyLevel", "departmentEntity.ancestry[4].id", "departmentEntity.ancestry[4].code", "departmentEntity.ancestry[4].name", "departmentEntity.ancestry[4].hierarchyLevel", "departmentEntity.ancestry[5].id", "departmentEntity.ancestry[5].code", "departmentEntity.ancestry[5].name", "departmentEntity.ancestry[5].hierarchyLevel", "departmentEntity.ancestry[6].id", "departmentEntity.ancestry[6].code", "departmentEntity.ancestry[6].name", "departmentEntity.ancestry[6].hierarchyLevel", "departmentEntity.ancestry[7].id", "departmentEntity.ancestry[7].code", "departmentEntity.ancestry[7].name", "departmentEntity.ancestry[7].hierarchyLevel", "departmentEntity.ancestry[8].id", "departmentEntity.ancestry[8].code", "departmentEntity.ancestry[8].name", "departmentEntity.ancestry[8].hierarchyLevel", "departmentEntity.ancestry[9].id", "departmentEntity.ancestry[9].code", "departmentEntity.ancestry[9].name", "departmentEntity.ancestry[9].hierarchyLevel", "departmentEntity.ancestry[10].id", "departmentEntity.ancestry[10].code", "departmentEntity.ancestry[10].name", "departmentEntity.ancestry[10].hierarchyLevel", "expenditure.id", "expenditure.code", "expenditure.name", "expenditure.type", "government.id", "government.name", "project.id", "project.code", "project.name", { "name": "bill", "type": "double" }, { "name": "receipt", "type": "double" }, { "name": 
"demand", "type": "double" }, { "name": "payment", "type": "double" } ] }, "granularitySpec": { "queryGranularity": "none", "rollup": false, "segmentGranularity": "hour" } } } }Code Block the druid-ingestion-config.json file in the available text box.
You should verify the Kafka topic name and the Kafka bootstrap server address before submitting the config.
Now submit the config; data ingestion into the fiscal-event data source should start.
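If you prefer an API call over the console, the same spec can also be submitted to Druid's supervisor endpoint. This is only a sketch: the router address (druid-router.ifix:8888) is an assumption, so point it at the Druid router or Overlord address of your environment.

```bash
# Submit the supervisor spec via the Druid API (alternative to the console).
# druid-router.ifix:8888 is an assumed address; adjust it for your environment.
curl -X POST http://druid-router.ifix:8888/druid/indexer/v1/supervisor \
  -H "Content-Type: application/json" \
  -d @druid-ingestion-config.json
```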
Interaction Diagram
Environment
...
Note: Kafka topics need to be configured with respect to the environment.
| Key | Value | Description | Remarks |
|---|---|---|---|
|  |  | Fiscal event post processor will consume data from this topic. | The Kafka topic should be the same as the one configured in the Fiscal Event service. |
|  |  | Dereferenced fiscal event data will be pushed to this topic. | NA |
|  |  | NA | NA |
|  |  | Dereferenced fiscal event data will be pushed to this topic and consumed by Kafka Connect to persist it in the Mongo data store. | The topic in the Mongo Kafka Connect config should be the same. |
|  |  | Flattened fiscal event data will be pushed to this topic. | While ingesting fiscal events into Druid, make sure the same topic is used. |
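As a quick sanity check, you can list the topics on the broker to confirm the environment-specific topics exist. This is a sketch only: the broker pod name and namespace are assumptions, so adjust them for your cluster.

```bash
# List fiscal-event topics on the broker to confirm they are configured for this environment.
# "kafka-v2-0" (pod) and "ifix" (namespace) are assumed names; adjust for your cluster.
kubectl exec -n ifix kafka-v2-0 -- \
  kafka-topics.sh --bootstrap-server localhost:9092 --list | grep fiscal-event
```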
Configurations and Setup
Update all the DB, Kafka producer & consumer, and URI configurations in the dev.yaml, qa.yaml, and prod.yaml files.
...