Notification Consumer

This section of the guide implements a notification consumer.

Once an application is created or moves to the next stage of its workflow, the corresponding event is pushed onto a Kafka topic. A consumer listening on that topic can then trigger an SMS, email or in-app notification to the concerned user(s).
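The event flow described above can be sketched without a running Kafka broker. The following is only an illustration: InMemoryTopics, the topic name "create-topic" and the sample mobile number are hypothetical stand-ins, not part of the service code, and a real deployment would rely on Kafka and Spring's @KafkaListener instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for Kafka topics, used only to illustrate the flow.
public class InMemoryTopics {
    private final Map<String, List<Consumer<Map<String, Object>>>> listeners = new HashMap<>();

    // Register a listener on a topic (similar in spirit to a Kafka listener method).
    public void subscribe(String topic, Consumer<Map<String, Object>> listener) {
        listeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
    }

    // Deliver a record to every listener registered on the topic.
    public void publish(String topic, Map<String, Object> record) {
        List<Consumer<Map<String, Object>>> registered =
                listeners.getOrDefault(topic, Collections.emptyList());
        for (Consumer<Map<String, Object>> listener : registered) {
            listener.accept(record);
        }
    }

    public static void main(String[] args) {
        InMemoryTopics topics = new InMemoryTopics();
        List<String> outbox = new ArrayList<>();

        // The "notification consumer": reacts to create events by queueing an SMS.
        topics.subscribe("create-topic",
                record -> outbox.add("SMS to " + record.get("mobileNumber")));

        // The "application created" event being pushed onto the topic.
        Map<String, Object> event = new HashMap<>();
        event.put("mobileNumber", "9999999999");
        topics.publish("create-topic", event);

        System.out.println(outbox); // prints [SMS to 9999999999]
    }
}
```

The point of the sketch is the decoupling: the code that creates an application only publishes an event, and the notification logic lives entirely in the listener.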

For our guide, we will implement a notification consumer that listens to the topic on which voter registration applications are created, builds a customized message and sends it to the notification service (SMS/email) for delivery to the concerned users.

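import java.util.HashMap;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

// Project-specific imports (VoterRegistrationRequest, NotificationService) are omitted here.

@Component
@Slf4j
public class NotificationConsumer {

    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private NotificationService notificationService;

    // Invoked for every record published on the application-create topic.
    @KafkaListener(topics = {"${egov.vt.registration.create.topic}"})
    public void listen(final HashMap<String, Object> record,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            // Convert the generic map payload into the typed request object.
            VoterRegistrationRequest request = mapper.convertValue(record, VoterRegistrationRequest.class);
            // log.info(request.toString());
            notificationService.prepareEventAndSend(request);
        } catch (final Exception e) {
            log.error("Error while listening to value: " + record + " on topic: " + topic + ": ", e);
        }
    }
}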

Create a POJO named SMSRequest under the models folder and add the following content to it:

@Getter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class SMSRequest {

    private String mobileNumber;

    private String message;
}

Next, to prepare the customized message and push the notification, create a class named NotificationService under the service folder. Annotate it with @Service and add the following content to it:

import java.util.ArrayList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

// Project-specific imports (Producer, VTRConfiguration, SMSRequest and the request models) are omitted here.

@Slf4j
@Service
public class NotificationService {

    @Autowired
    private Producer producer;

    @Autowired
    private VTRConfiguration config;

    @Autowired
    private RestTemplate restTemplate;

    private static final String smsTemplate = "Dear {NAME}, your voter registration application has been successfully created on the system with application number - {APPNUMBER}.";

    public void prepareEventAndSend(VoterRegistrationRequest request) {
        // Build one SMS request per application in the incoming request.
        List<SMSRequest> smsRequestList = new ArrayList<>();
        request.getVoterRegistrationApplications().forEach(application -> {
            SMSRequest smsRequest = SMSRequest.builder()
                    .mobileNumber(application.getApplicant().getMobileNumber())
                    .message(getCustomMessage(smsTemplate, application))
                    .build();
            smsRequestList.add(smsRequest);
        });

        // Push each SMS request onto the SMS notification topic.
        for (SMSRequest smsRequest : smsRequestList) {
            producer.push(config.getSmsNotificationTopic(), smsRequest);
            log.info("Messages: " + smsRequest.getMessage());
        }
    }

    // Fill the template placeholders with values from the application.
    private String getCustomMessage(String template, VoterRegistrationApplication application) {
        template = template.replace("{APPNUMBER}", application.getApplicationNumber());
        template = template.replace("{NAME}", application.getApplicant().getName());
        return template;
    }
}
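To see what getCustomMessage produces, the placeholder substitution can be tried in isolation. The sketch below is not part of the service: MessageTemplateDemo, the applicant name "Asha" and the application number "VTR-0001" are hypothetical. It also adds a null guard as one possible hardening, since String.replace throws a NullPointerException when the replacement value is null; whether that guard is wanted in the real service is a design choice.

```java
// Hypothetical standalone demo of the placeholder substitution used for the SMS text.
public class MessageTemplateDemo {

    public static final String SMS_TEMPLATE =
            "Dear {NAME}, your voter registration application has been successfully "
            + "created on the system with application number - {APPNUMBER}.";

    // Replace both placeholders; null values are treated as empty strings (an assumption).
    public static String render(String template, String name, String applicationNumber) {
        String safeName = (name == null) ? "" : name;
        String safeNumber = (applicationNumber == null) ? "" : applicationNumber;
        return template
                .replace("{APPNUMBER}", safeNumber)
                .replace("{NAME}", safeName);
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        System.out.println(render(SMS_TEMPLATE, "Asha", "VTR-0001"));
    }
}
```

With the sample values, the rendered text reads "Dear Asha, your voter registration application has been successfully created on the system with application number - VTR-0001."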