Indexer-Service

Objective:

The objectives of the egov-indexer service are listed below.

  1. To provide a one-stop framework for indexing data to Elasticsearch.
  2. To provide for indexing live data, reindexing from one index to another, and indexing legacy data from the datastore.


Requirements:

  1. Prior Knowledge of Java/J2EE.
  2. Prior Knowledge of SpringBoot.
  3. Prior Knowledge of Elasticsearch.
  4. Prior Knowledge of REST APIs and related concepts like path parameters, headers, JSON etc.
  5. Prior Knowledge of Kafka and related concepts like Producer, Consumer, Topic etc.


Setup, Definitions & Functionality:

Setup:

  1. Step 1: Write a configuration file as per your requirement. The structure of the config file is explained later in this document.
  2. Step 2: Check in the config file to a remote location, preferably GitHub. Currently we check the files into this folder - https://github.com/egovernments/egov-services/tree/master/core/egov-indexer/src/main/resources for dev and QA, and this folder - https://github.com/egovernments/punjab-rainmaker-customization/tree/master/configs/egov-indexer for UAT.
  3. Step 3: Provide the absolute path of the checked-in file to DevOps, to add it to the file-read path of egov-indexer. The file will be added to egov-indexer's environment manifest file so that it is read at application start-up.
  4. Step 4: Run the egov-indexer app. Since it is a consumer, it starts listening to the configured topics and indexes the data.
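For step 3, the DevOps change is typically a one-line addition to the environment manifest. The fragment below is only an illustrative sketch; the key name and the file path are assumptions and will differ per deployment.

```yaml
# Illustrative environment-manifest fragment (key name is an assumption;
# use whatever key your DevOps setup reads the config-file paths from).
egov-indexer:
  egov-indexer-yaml-repo-path: "https://raw.githubusercontent.com/egovernments/egov-services/master/core/egov-indexer/src/main/resources/xyz-indexer.yml"
```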


Definitions:

  1. Config file - A YAML (xyz.yml) file that contains the configuration for index requirements.
  2. Consumer - A Kafka client that picks data from specific topics and processes it.
  3. Topic - A virtual channel to which a producer pushes data and from which a consumer reads it.


Functionality:

  1. Performs three major tasks, namely: LiveIndex, Reindex and LegacyIndex.
  2. LiveIndex: Indexes live transaction data on the platform. This keeps the Elasticsearch data in sync with the DB.
  3. Reindex: Indexes data from one index to another. Elasticsearch already provides this feature; the indexer does the same but with data transformation.
  4. LegacyIndex: Indexes legacy data from the DB tables to Elasticsearch.
  5. Provides the flexibility to index the entire object, a part of the object, or an entirely different custom object, all using one input JSON from modules.
  6. Provides features for customizing the index JSON through field mapping, field masking, data enrichment through external APIs, and data denormalization using MDMS.
  7. One-stop shop for all Elasticsearch index requirements, with easy-to-write and easy-to-maintain configuration files.
  8. Designed as a consumer to save API overhead. The consumer configs are written from scratch to have complete control over the consumer behaviour.
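To make the field-masking feature mentioned above concrete, here is a small illustrative sketch. It is not the indexer's actual implementation; the "keep the last two characters" rule is an assumption chosen for the demo.

```python
# Illustrative sketch of field masking (fieldsToBeMasked): values at the
# configured paths are replaced before the document is indexed. The masking
# rule below is an assumption, not the indexer's actual scheme.

def mask(value: str, visible: int = 2) -> str:
    """Replace all but the last `visible` characters with '*'."""
    if len(value) <= visible:
        return "*" * len(value)
    return "*" * (len(value) - visible) + value[-visible:]

doc = {"citizen": {"mobileNumber": "9337682030"}}
doc["citizen"]["mobileNumber"] = mask(doc["citizen"]["mobileNumber"])
print(doc["citizen"]["mobileNumber"])  # ********30
```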




Feature List V1:

  1. LiveIndex feature to index live transaction data on the platform.
  2. Reindex feature to index data from one index to the other.
  3. LegacyIndex feature for indexing legacy data from the tables to ES.
  4. Custom Mapping, Enrichment, Denormalization and Mask Fields.

Feature List V1.1 (to be implemented in the future):

  1. Multi-consumer design for faster operation.
  2. KafkaConnect implementation instead of REST calls to Elasticsearch.


Impact:

  1. Used by PGR for all its index requirements.
  2. Used by PT for all its index requirements.
  3. Used by TL for all its index requirements.
  4. Used by Collections for all its index requirements.
  5. Used by Billing Service for all its index requirements.


Impacted By:

  1. Changes in the version of the Elasticsearch APIs.
  2. Deprecation/enhancement in the consumer configurations of Kafka.
  3. Upgrade of Elasticsearch.


How to Use:

  1. Configuration:

As mentioned above, the indexer uses one config file per module to store all the configurations pertaining to that module. The indexer reads multiple such files at start-up to support indexing for all the configured modules. The file contains the following keys:

  1. serviceName: Name of the module to which this configuration belongs.
  2. summary: Summary of the module.
  3. version: Version of the configuration.
  4. mappings: List of definitions within the module. Every definition corresponds to one index requirement: every object received on the Kafka queue can be used to create multiple indexes, each of which needs its own configuration, and all such configurations belonging to one topic form one entry in the mappings list. Each definition uses the following keys:

       i) topic: Topic on which the data is to be received to activate this particular configuration.

      ii) configKey: Key to identify the type of job this config is for. Values: INDEX (LiveIndex), REINDEX (Reindex), LEGACYINDEX (LegacyIndex).

     iii) indexes: Key holding one or more index configurations for the data received on the particular topic. Multiple indexes based on different requirements can be created from the same object. Each index configuration uses the following keys:

          1. name: Index name on Elasticsearch. (The index will be created with this name if it doesn't already exist.)
          2. type: Document type within that index to which the index JSON has to go. (Elasticsearch uses the structure index/type/docId to locate any document within index/type with id = docId.)
          3. id: Takes comma-separated JSONPaths. Each JSONPath is applied on the record received on the queue; the values so obtained are appended together and used as the id for the record.
          4. isBulk: Boolean key to identify whether the JSON received on the queue is from a bulk API; in simple words, whether the JSON contains a list at the top level.
          5. jsonPath: Key to be used when indexing only a part of the input JSON, or when indexing a custom JSON whose values are to be fetched from this part of the input.
          6. timeStampField: JSONPath of the field in the input from which the timestamp of the input can be obtained.
          7. fieldsToBeMasked: A list of JSONPaths of the fields of the input to be masked in the index.
          8. customJsonMapping: Key to be used while building an entirely different object from the input JSON on the queue. Has the following keys:

               i) indexMapping: A skeleton/mapping of the JSON that is to be indexed. Note that this JSON must always contain a key called "Data" at the top level, and the custom mapping begins within this key. This is only a convention, to smoothen dashboarding on Kibana when data from multiple indexes has to be fetched for a single dashboard.

              ii) fieldMapping: Contains a list of configurations. Each configuration maps a field of the input JSON to a field of the index JSON mentioned in the key 'indexMapping'. Has the following keys:
                  - inJsonPath: JSONPath of the field from the input.
                  - outJsonPath: JSONPath of the field of the index JSON.

             iii) externalUriMapping: Contains a list of configurations. Each configuration identifies fields of the input JSON that are to be enriched using APIs of external services; the configuration for those APIs is also part of this key. Uses the following keys:
                  - path: URI of the API to be used. (It should be a POST /_search API.)
                  - queryParam: Configuration of the query params to be used for the API call. It is a comma-separated list of key-value pairs, where the key is the parameter name as per the API contract and the value is the JSONPath of the field to be equated against that parameter.
                  - apiRequest: Request body of the API. (Since we only use _search APIs, it should contain only RequestInfo.)
                  - uriResponseMapping: Contains a list of configurations, each with two keys:
                    - inJsonPath: JSONPath to identify the field from the response.
                    - outJsonPath: JSONPath to map the response field to a field of the index JSON mentioned in the key 'indexMapping'.

              iv) mdmsMapping: Contains a list of configurations. Each configuration identifies fields of the input JSON that are to be denormalized using APIs of the MDMS service; the configuration for those MDMS APIs is also part of this key. Uses the following keys:
                  - path: URI of the API to be used. (It should be a POST /_search API.)
                  - moduleName: Module name from MDMS.
                  - masterName: Master name from MDMS.
                  - tenantId: Tenant id to be used.
                  - filter: Filter to be applied on the data to be fetched.
                  - filterMapping: Maps fields of the input JSON to variables in the filter:
                    - variable: Variable in the filter.
                    - valueJsonpath: JSONPath of the input field to be mapped to the variable.
                  - uriResponseMapping: As explained above under externalUriMapping.
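Taken together, the keys above form a config file shaped roughly like the sketch below. All service, topic, index and JSONPath values are illustrative, not a real module's configuration; see the reference file linked below for a complete, authoritative example.

```yaml
# Illustrative sketch only - names, topics and paths are made up.
serviceName: xyz-service
summary: Index configuration for the xyz module
version: 1.0
mappings:
- topic: save-xyz-request            # activates this definition
  configKey: INDEX                   # INDEX | REINDEX | LEGACYINDEX
  indexes:
  - name: xyzindex                   # created on Elasticsearch if absent
    type: general
    id: $.id,$.tenantId              # comma-separated JSONPaths, appended
    isBulk: true                     # input JSON has a list at the top level
    jsonPath: $.XyzObjects           # part of the input to index
    timeStampField: $.auditDetails.createdTime
    fieldsToBeMasked:
    - $.citizen.mobileNumber
    customJsonMapping:
      indexMapping: {"Data": {"id": "", "ward": ""}}
      fieldMapping:
      - inJsonPath: $.id
        outJsonPath: $.Data.id
      mdmsMapping:
      - path: http://egov-mdms-service:8080/egov-mdms-service/v1/_search
        moduleName: egov-location
        masterName: TenantBoundary
        tenantId: pb
        filter: "[?(@.code == $code)]"
        filterMapping:
        - variable: $code
          valueJsonpath: $.wardCode
        uriResponseMapping:
        - inJsonPath: $.MdmsRes.tenant.boundary
          outJsonPath: $.Data.ward
```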

Reference - https://raw.githubusercontent.com/egovernments/punjab-rainmaker-customization/master/configs/egov-indexer/rainmaker-pgr-indexer.yml
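To illustrate how the id key combines its comma-separated JSONPaths, here is a small Python sketch. It is not the indexer's code; a minimal dot-path resolver stands in for a full JSONPath engine.

```python
# Sketch of id construction: each comma-separated JSONPath is evaluated
# against the incoming record and the results are concatenated to form the
# Elasticsearch document id. Not the indexer's actual implementation.

def resolve(path: str, record: dict):
    """Resolve a simple '$.a.b' style path against a nested dict."""
    value = record
    for part in path.lstrip("$.").split("."):
        value = value[part]
    return value

def build_id(id_config: str, record: dict) -> str:
    paths = [p.strip() for p in id_config.split(",")]
    return "".join(str(resolve(p, record)) for p in paths)

record = {"serviceRequestId": "SR-101", "tenantId": "pb"}
print(build_id("$.serviceRequestId,$.tenantId", record))  # SR-101pb
```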


  2. API Calls:

INDEX:

No API; since it is a consumer, it picks data from the Kafka topic as and when the data is posted on the queue by the module.


REINDEX:

URI:  http://egov-indexer.egov:8080/egov-indexer/index-operations/_reindex

Body: The body consists of the following:

  1. RequestInfo: Header details as used on the egov platform.
  2. index: Name of the index to pick the data from.
  3. type: Type within the index to pick the data from.
  4. reindexTopic: Topic name from which the indexer will pick the reindex configuration.
  5. tenantId: Tenant id.
  6. batchSize: Batch size of the records to be fetched.

Instance:

{
  "RequestInfo": {
    "apiId": "emp",
    "ver": "1.0",
    "ts": 1234,
    "action": "create",
    "did": "1",
    "key": "abcdkey",
    "msgId": "20170310130900",
    "authToken": "57e2c455-934b-45f6-b85d-413fe0950870",
    "correlationId": "fdc1523d-9d9c-4b89-b1c0-6a58345ab26d"
  },
  "index": "index",
  "type": "type",
  "reindexTopic": "telemetry-reindex",
  "tenantId": "pb",
  "batchSize": "2000"
}


LEGACYINDEX:

URI:  http://egov-indexer.egov:8080/egov-indexer/index-operations/_legacyindex

Body: The body consists of the following:

  1. RequestInfo: Header details as used on the egov platform.
  2. apiDetails: Details of the open search API to be used for fetching data from the DB. Uses the following keys:

       i) uri: URI of the open search API.

      ii) tenantIdForOpenSearch: Tenant id to be used for the open search.

     iii) paginationDetails: Details of the pagination clause to be sent in the API request:
          - offsetKey: Key for the offset.
          - sizeKey: Key for the size.
          - maxPageSize: Page size to be used for every search call. Default - 200.

      iv) responseJsonPath: JSONPath on the response to fetch the data from.

  3. legacyIndexTopic: Topic name from which the indexer will pick the legacyindex configuration.
  4. tenantId: Tenant id.

Instance: This is an example from PT's legacyindex.

{
  "RequestInfo": {
    "apiId": "string",
    "ver": "string",
    "ts": null,
    "action": "string",
    "did": "string",
    "key": "string",
    "msgId": "string",
    "authToken": "b843ef27-1ac6-49b8-ab71-cd0c22f4e50e",
    "correlationId": "e721639b-c095-40b3-86e2-acecb2cb6efb",
    "userInfo": {
      "id": 23299,
      "uuid": "e721639b-c095-40b3-86e2-acecb2cb6efb",
      "userName": "9337682030",
      "name": "Abhilash Seth",
      "type": "CITIZEN",
      "mobileNumber": "9337682030",
      "emailId": "abhilash.seth@gmail.com",
      "roles": [{"id": 281, "name": "Citizen"}]
    }
  },
  "apiDetails": {
    "uri": "http://pt-services-v2:8080/pt-services-v2/property/_plainsearch",
    "tenantIdForOpenSearch": "pb",
    "paginationDetails": {
      "offsetKey": "offset",
      "sizeKey": "limit",
      "maxPageSize": 50
    },
    "responseJsonPath": "$.Properties"
  },
  "legacyIndexTopic": "pt-property-legacyindex",
  "tenantId": "pb.amritsar"
}
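The paginationDetails keys drive a fetch loop over the open search API. The Python sketch below shows the idea; fetch_page is a hypothetical stand-in for the real plain-search HTTP call, and the loop logic is an illustration rather than the indexer's actual implementation.

```python
# Sketch of legacy-index pagination: call the plain-search API repeatedly,
# advancing the offset by the page size, until a page comes back smaller
# than maxPageSize. fetch_page is a hypothetical stand-in for the HTTP call.

def legacy_index(fetch_page, offset_key="offset", size_key="limit", max_page_size=50):
    offset, batches = 0, []
    while True:
        page = fetch_page({offset_key: offset, size_key: max_page_size})
        if not page:
            break
        batches.append(page)          # each batch would be pushed to the topic
        if len(page) < max_page_size:
            break                     # last (short) page reached
        offset += max_page_size
    return batches

# Simulated datastore of 120 records to demonstrate the loop:
data = list(range(120))
pages = legacy_index(lambda q: data[q["offset"]: q["offset"] + q["limit"]])
print(len(pages))  # 3 pages: 50 + 50 + 20 records
```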


Interaction Diagram: