Kafka Connector
Configuring Elasticsearch Connector
Before running the Kafka Connect Elasticsearch connector we need to configure it. We'll create an elasticsearch-connect.properties file with the following content:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=logs
topic.index.map=logs:logs_index
connection.url=http://localhost:9200
type.name=log
key.ignore=true
schema.ignore=true
This file will be provided as one of the configuration files and defines the behavior of the connector. We chose the io.confluent.connect.elasticsearch.ElasticsearchSinkConnector sink, which is responsible for sending data to Elasticsearch, and we set its name to elasticsearch-sink. The name should be unique for a given connector. We requested a single task for this connector (the tasks.max property), though Kafka may create fewer tasks if it can't achieve the requested level of parallelism. We read data from the logs topic (keep in mind that we can specify multiple topics as the source of the data using the topics property), and data from the logs topic is written to an index called logs_index (using the topic.index.map property). We use a local Elasticsearch instance (specified with connection.url), and documents are indexed with the log type (the type.name property). Finally, we told Kafka Connect to ignore keys and schemas by setting key.ignore=true and schema.ignore=true, because we assume that we will use index templates in Elasticsearch to control the data structure and analysis, which is good practice in general.
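Since we're ignoring schemas, the mapping has to come from somewhere, and an index template is the usual answer. Here is a minimal sketch of such a template, assuming an Elasticsearch 6.x cluster (on 5.x the index_patterns field is called template) and hypothetical message and severity fields; adjust the mapping to whatever your log documents actually contain:
$ curl -XPUT 'http://localhost:9200/_template/logs_template' -H 'Content-Type: application/json' -d '{
  "index_patterns": ["logs_index"],
  "settings": { "number_of_shards": 1 },
  "mappings": {
    "log": {
      "properties": {
        "message": { "type": "text" },
        "severity": { "type": "keyword" }
      }
    }
  }
}'
Note that the mapping type is log, matching the type.name property we set in the connector configuration.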
Running Kafka Connect Elasticsearch in Standalone Mode
To run the connector in standalone mode we use the connect-standalone.sh script, which ships with Kafka and can be found in the bin directory. It requires two configuration files: the one we already created, and a second one, which we will call connect-standalone.properties, with the following contents:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
It defines the list of Kafka brokers, the key and value converters, whether schemas should be used, and so on.
Once the configuration is in place, we run the connector by executing the following command:
$ bin/connect-standalone.sh config/connect-standalone.properties config/elasticsearch-connect.properties
This will start the connector as a separate JVM process on the same server on which Kafka is running, and any data you put into the defined topic in Kafka will be sent to Elasticsearch. However, we have a single point of failure here: a single instance of the connector. We can run the Kafka Connect Elasticsearch connector in distributed mode to leverage the distributed nature of Kafka itself, so let's do that next.
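Before moving on, it's worth checking that data actually flows end to end. A minimal smoke test, assuming a local broker on the default port and the hypothetical message/severity fields from the template sketch above:
$ echo '{"message": "something happened", "severity": "INFO"}' | \
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic logs
$ curl 'http://localhost:9200/logs_index/_search?pretty'
After the offset flush interval passes, the search should return the message as a document in logs_index.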
Running Kafka Connect Elasticsearch in Distributed Mode
Running Kafka Connect Elasticsearch in standalone mode is fine, but it lacks the main benefits of using Kafka Connect: leveraging the distributed nature of Kafka, fault tolerance, and high availability. The differences between standalone and distributed mode lie in where Kafka Connect stores the configuration, how it assigns work, and where it stores offsets and task statuses.
Let’s start with the configuration. We will store it in the connect-distributed.properties file:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=10000
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
We already covered some of the properties shown above. The new properties are:
- group.id – the identifier of the Kafka Connect cluster group. It should be unique and must not collide with the group.id of any consumer group reading from the same Kafka cluster.
- offset.storage.topic – the name of the topic Kafka Connect will use to store offsets. This topic should have many partitions, be replicated, and be configured for log compaction.
- config.storage.topic – the name of the topic Kafka Connect will use to store connector configuration. This topic should have a single partition and be highly replicated.
- status.storage.topic – the name of the topic Kafka Connect will use to store work status. It should have multiple partitions and replicas, and be compacted. You can create all three topics up front, as shown below.
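Depending on the Kafka version, Kafka Connect may create these topics automatically, but creating them yourself lets you control partitions, replication, and compaction. A sketch, assuming an older Kafka release where kafka-topics.sh still takes --zookeeper, and a single-broker test setup (hence replication factor 1; in production you would use 3 or more, and the partition counts here are just illustrative):
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets \
    --partitions 25 --replication-factor 1 --config cleanup.policy=compact
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs \
    --partitions 1 --replication-factor 1 --config cleanup.policy=compact
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status \
    --partitions 5 --replication-factor 1 --config cleanup.policy=compact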
Once we have this configuration in place, we can start the Kafka Connect worker in distributed mode:
$ bin/connect-distributed.sh config/connect-distributed.properties
You may have noticed one difference compared to running Kafka Connect in standalone mode: we didn't provide the configuration for the connector itself. This is not a mistake! When using Kafka Connect in distributed mode we need to create connectors using the REST API; we'll get into the details a bit later.
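As a quick preview, though, registering the connector we configured earlier boils down to a single HTTP request. A sketch, assuming the worker's REST API is listening on its default port 8083:
$ curl -XPOST 'http://localhost:8083/connectors' -H 'Content-Type: application/json' -d '{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "logs",
    "topic.index.map": "logs:logs_index",
    "connection.url": "http://localhost:9200",
    "type.name": "log",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}'
The config section mirrors the elasticsearch-connect.properties file we created for standalone mode; only the connector name moves to the top level of the JSON.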
So what is the difference between standalone and distributed Kafka Connect mode? Both run in their own JVM process as Kafka Connect clients, and as such both need access to the Kafka libraries, which is why running them on the Kafka brokers makes sense. The major difference is in how the job is executed. In standalone mode the work is performed in a single process, while in distributed mode it is shared by all available Kafka Connect instances running alongside the Kafka broker instances. Another difference is where the client stores its configuration: in distributed mode it is stored inside Kafka itself, in the topics defined by the offset.storage.topic, config.storage.topic and status.storage.topic properties, while in standalone mode offsets are stored in a local file specified by the offset.storage.file.filename property.

So, if you have Kafka Connect Elasticsearch running in distributed mode, you can leverage multiple instances of it: either create multiple tasks (using the tasks.max property) or rely on the failover that comes for free when running Kafka Connect in distributed mode with multiple instances started. For example, if you set tasks.max=1 and have two instances of Kafka Connect started, then when one fails, the other will get the task to execute. This, of course, helps avoid Kafka consumer lag.
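You can watch this failover happen through the same REST API. The status endpoint shows which worker each task is currently assigned to; again assuming the default port 8083:
$ curl 'http://localhost:8083/connectors/elasticsearch-sink/status'
The response lists the connector state and, for each task, the worker_id of the instance currently running it; after stopping one worker you should see the task move to the surviving instance.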