Writing a new Consumer
Kafka producer publishes messages on a given topic. Kafka Consumer is a program, which consumes the published messages from the producer. The consumer consumes the message by subscribing to the topic. A single consumer can subscribe to multiple topics. Whenever the topic receives a new message it can process that message by calling defined functions. The following snippet is a sample of code which defines a class called TradeLicenseConsumer
which contains function called listen()
which is subscribed to save-tl-tradelicense
topic and calls a function to generate notification whenever any new message is received by the consumer.
@Slf4j
@Component
public class TradeLicenseConsumer {
private TLNotificationService notificationService;
@Autowired
public TradeLicenseConsumer(TLNotificationService notificationService) {
this.notificationService = notificationService;
}
@KafkaListener(topics = {"save-tl-tradelicense")
public void listen(final HashMap<String, Object> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
notificationService.sendNotification(record);
}
}
@KafkaListener
annotation is used to create a consumer. Whenever any function has this annotation on it, it will act as kafka consumer and the message will go through the flow defined inside of this function. The topic name should be picked up from application.properties
file. This can be done as showed below:
@KafkaListener(topics = {"${persister.update.tradelicense.topic}")
where persister.update.tradelicense.topic
is the key for the topic name in application.properties
Whenever any new message is published on this topic the message will be consumed by the listen() function and will call the function sendNotification() with the message as the argument. The deserialization is controlled by the following two properties in application.properties:
spring.kafka.consumer.value-deserializer
spring.kafka.consumer.key-deserializer
The first property sets the deserializer for value while the second one sets it for key. Depending on the deserializer we have set we can expect the argument in that format in our consumer function. For example if we set the value deserializer to HashMapDeserializer and key deserializer to string like below:
Then we can write our consumer function expecting HashMap as argument like below: