Notification Consumer
Whenever an application is created or moves to the next step in its workflow, the corresponding event is pushed onto a Kafka topic. A consumer listening on that topic can then trigger an SMS, email, or in-app notification to the concerned users.
In this guide, we will implement a notification consumer that listens on the topic to which newly created voter registration applications are published, builds a customized message, and passes it to the notification service (SMS/email) for delivery to the concerned users.
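import java.util.HashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;

// Imports for the project's own classes (VoterRegistrationRequest, NotificationService) depend on your package layout.
@Component
@Slf4j
public class NotificationConsumer {

    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private NotificationService notificationService;

    // Invoked for every record published on the voter registration create topic.
    @KafkaListener(topics = {"${egov.vt.registration.create.topic}"})
    public void listen(final HashMap<String, Object> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            // Convert the generic payload into the typed request before handing it off.
            VoterRegistrationRequest request = mapper.convertValue(record, VoterRegistrationRequest.class);
            notificationService.prepareEventAndSend(request);
        } catch (final Exception e) {
            // Log the failure instead of rethrowing it from the listener.
            log.error("Error while listening to value: {} on topic: {}", record, topic, e);
        }
    }
}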
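The control flow of the listener above (convert the raw record, hand it to the service, and log any failure instead of rethrowing it) can be illustrated without Kafka or Spring. The following is only a minimal sketch under stated assumptions: ListenerFlowSketch, its applicationNumber field lookup, and the in-memory error list are hypothetical stand-ins for NotificationConsumer, mapper.convertValue, and log.error, and are not part of the guide's codebase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, framework-free stand-in for the listener; no Kafka or Spring involved.
final class ListenerFlowSketch {

    private final Consumer<String> downstream;              // stands in for NotificationService
    private final List<String> errors = new ArrayList<>();  // stands in for log.error

    ListenerFlowSketch(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    // Mirrors listen(record, topic): convert, hand off, and never let an exception escape.
    void listen(Map<String, Object> record, String topic) {
        try {
            // Stand-in for mapper.convertValue: the cast fails when the value has an unexpected type.
            String applicationNumber = (String) record.get("applicationNumber");
            if (applicationNumber == null) {
                throw new IllegalArgumentException("applicationNumber missing");
            }
            downstream.accept(applicationNumber);
        } catch (Exception e) {
            // The failure is recorded and the method returns normally.
            errors.add("Error on topic " + topic + ": " + e.getMessage());
        }
    }

    List<String> errors() {
        return errors;
    }
}
```

In this sketch a malformed record only adds an entry to the error list, while well-formed records still reach the downstream handler. The try/catch in the real listener has the same effect of keeping a conversion or processing failure from escaping the listener method.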
Create a POJO named SMSRequest under the models folder and add the following content to it:
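import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

// Payload pushed to the SMS notification topic.
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class SMSRequest {
    private String mobileNumber;
    private String message;
}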
Next, to prepare the customized message and push the notification, create a class named NotificationService under the service folder. Annotate it with @Service and add the following content to it:
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import lombok.extern.slf4j.Slf4j;

// Imports for the project's own classes (Producer, VTRConfiguration, and the models) depend on your package layout.
@Slf4j
@Service
public class NotificationService {

    @Autowired
    private Producer producer;

    @Autowired
    private VTRConfiguration config;

    @Autowired
    private RestTemplate restTemplate;

    private static final String smsTemplate = "Dear {NAME}, your voter registration application has been successfully created on the system with application number - {APPNUMBER}.";

    // Builds one SMS per application in the request and pushes each to the SMS notification topic.
    public void prepareEventAndSend(VoterRegistrationRequest request) {
        List<SMSRequest> smsRequestList = new ArrayList<>();
        request.getVoterRegistrationApplications().forEach(application -> {
            SMSRequest smsRequest = SMSRequest.builder()
                    .mobileNumber(application.getApplicant().getMobileNumber())
                    .message(getCustomMessage(smsTemplate, application))
                    .build();
            smsRequestList.add(smsRequest);
        });
        for (SMSRequest smsRequest : smsRequestList) {
            producer.push(config.getSmsNotificationTopic(), smsRequest);
            log.info("Messages: {}", smsRequest.getMessage());
        }
    }

    // Fills the {APPNUMBER} and {NAME} placeholders with values from the application.
    private String getCustomMessage(String template, VoterRegistrationApplication application) {
        template = template.replace("{APPNUMBER}", application.getApplicationNumber());
        template = template.replace("{NAME}", application.getApplicant().getName());
        return template;
    }
}
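The placeholder substitution done by getCustomMessage can be tried in isolation. The sketch below is illustrative only: MessageTemplateSketch, its render method, and the sample values are hypothetical and not part of the guide's codebase. It also treats a null value as an empty string, which is an assumption added here because String.replace throws a NullPointerException when the replacement is null (for example, when an applicant has no name set).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for experimenting with the SMS template outside the service.
final class MessageTemplateSketch {

    static final String SMS_TEMPLATE = "Dear {NAME}, your voter registration application has been "
            + "successfully created on the system with application number - {APPNUMBER}.";

    // Replaces each {KEY} placeholder with its value; null values become an empty string
    // (an assumption of this sketch) so that String.replace never receives null.
    static String render(String template, Map<String, String> values) {
        String message = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            String replacement = entry.getValue() == null ? "" : entry.getValue();
            message = message.replace("{" + entry.getKey() + "}", replacement);
        }
        return message;
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("NAME", "Asha");          // sample data, not from the guide
        values.put("APPNUMBER", "VTR-0001"); // sample data, not from the guide
        System.out.println(render(SMS_TEMPLATE, values));
    }
}
```

Whether a missing value should produce an empty string, a default such as "Applicant", or skip the notification entirely is a design choice for the service; the guide's getCustomMessage assumes both values are always present.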