Integrate Data Pipeline with Odin

What is a Data Pipeline?

A data pipeline connects a data source to a target. The source generates data, and the pipeline syncs that data to the target in real time. A data pipeline comprises three major components:

 ________        _____________        ______
| Source | ---> | Transformer | ---> | Sink |
 --------        -------------        ------

  1. Source - The service responsible for publishing data to Kafka topics. The source uses Kafka Connect (source connectors) to publish data to topics from databases or other data sources.

  2. Transformer (optional) - A KSQL transformer responsible for modifying the data and applying transformations, e.g., renaming a column or changing the data format (see the sketch after this list). A transformer is only required when the data needs modification.

  3. Sink - The service that consumes data from the topic. The sink also uses Kafka Connect (sink connectors) to read data from topics and write it into the database.
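
As an illustration of the transformer step, here is a minimal KSQL sketch that renames a column and changes a data format; the stream and column names are hypothetical, not taken from any real pipeline:

-- Hypothetical stream/column names, for illustration only.
CREATE STREAM user_events_transformed AS
  SELECT id,
         user_name AS username,             -- modify a column name
         CAST(amount AS DOUBLE) AS amount   -- change data format
  FROM user_events;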

Pipeline Integration in an Application

The following are the steps/changes required in an application to integrate the data pipeline into the service.

STEP 1:

Go to your project directory and run mvn com.dream11:pipeline-maven-plugin:new to create a pipeline directory structure with the following parameters (see the sample invocation after the parameter list):

processorType (required, Enum) -> kafkaconnect/ksql
description (required, String) -> used to create the config file name
pipelineName (required, String) -> name of the pipeline
pipelineType (required, String) -> source/transformer/sink
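
For example, a hypothetical invocation (assuming the plugin accepts these parameters as -D system properties, the usual Maven convention; the exact property names may differ):

# Hypothetical example; property names assumed from the parameter list above.
mvn com.dream11:pipeline-maven-plugin:new \
  -DprocessorType=kafkaconnect \
  -Ddescription=user-sync \
  -DpipelineName=user-sync \
  -DpipelineType=source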

The above will create empty pipeline configuration files based on your selected pipeline type:

source/<connector_name>/connector_name-%s.conf -> for source connectors 
sink/<connector_name>/connector_name-%s.conf -> for sink connectors 
transformer/transformer_name.sql -> for ksql transformations
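
Putting the generated files together, the resulting layout looks roughly like this (a sketch assuming a source pipeline named user-sync and that the connector directories live under resources/datapipeline/<pipeline_name>; confirm the exact layout against the repo README):

resources/
  datapipeline/
    user-sync/                    # <pipeline_name>
      connection-%s.conf          # connection config, added in step 2
      source/
        user-sync/                # <connector_name>
          user-sync-%s.conf       # source connector config, filled in step 3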

STEP 2:

Add a connection-%s.conf file in resources/datapipeline/<pipeline_name>.

Update the connection configuration using the following keys (a filled-in example follows):

ksqlHost = "<ksql_host>"            // only needed if you use KSQL for transformation
kafkaConnectHost = "<connect_host>"
bootstrapServers = "<kafka_servers>"
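
For instance, a hypothetical connection-stag.conf, assuming %s in the file name stands for the environment (the hostnames below are placeholders using the standard Kafka/Connect/KSQL ports, not real endpoints):

// Hypothetical staging values; replace with your actual hosts.
ksqlHost = "gpa-ksql-stag.dream11-stag.local:8088"
kafkaConnectHost = "gpa-kc-stag.dream11-stag.local:8083"
bootstrapServers = "gpa-kafka-stag.dream11-stag.local:9092"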

STEP 3:

Provide the pipeline config in the source/sink configuration file that was created in step 1. The following is a sample source connector config, which uses the JDBC source connector to read from a MySQL DB and publish to the topic:

"connector" : "RDBMS_JDBC",
"topicKeyColumns" : ["id"],
"sourceTables" : ["table_name"],
"database" : {
    "name" : "db_name",
    "hostname" : ${CONFIG_SERVICE_MYSQL_FTA_MASTER_HOST},
    "port" : 3306,
    "username" : ${VAULT_COMMON_STAG_DB_USER},
    "password" : ${VAULT_COMMON_STAG_DB_PASSWORD}
}
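
The sink side has a similar shape. The following is a minimal sketch only; the key names (e.g. sinkTopics) are assumptions for illustration and are not confirmed against the plugin, so refer to the repo README for the supported options:

// Hypothetical sink connector config; key names are illustrative.
"connector" : "RDBMS_JDBC",
"sinkTopics" : ["topic_name"],
"database" : {
    "name" : "target_db_name",
    "hostname" : ${CONFIG_SERVICE_MYSQL_TARGET_HOST},
    "port" : 3306,
    "username" : ${VAULT_COMMON_STAG_DB_USER},
    "password" : ${VAULT_COMMON_STAG_DB_PASSWORD}
}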

STEP 4:

Add a create-pipeline command in .odin/<service_name>/pre-deploy.sh. This command creates the defined pipelines in Kafka Connect. The following is sample pre-deploy.sh content to create the pipeline.

Note: The env variable SERVICE_NAME is used to build the name of the connector, e.g.:

  • debezium-source-connector-<FILE_NAME>-<SERVICE_NAME>-<VPC_SUFFIX>-<TEAM_SUFFIX>

  • jdbc-source-connector-<FILE_NAME>-<SERVICE_NAME>-<VPC_SUFFIX>-<TEAM_SUFFIX>
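
For illustration, with FILE_NAME=user-sync, SERVICE_NAME=sample-service, VPC_SUFFIX=stag and TEAM_SUFFIX=devx (all hypothetical values), the JDBC connector would be named jdbc-source-connector-user-sync-sample-service-stag-devx.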

# Sample pre-deploy.sh; ENV is assumed to be provided by the deploy environment.
SERVICE_NAME="sample-service"
RESOURCES_PATH="./resources/"
runPipelineCommand="mvn pipeline:create -Dapp.environment=${ENV} -Dresources.path=${RESOURCES_PATH};"
# Pipeline creation is skipped on prod/uat environments.
if [[ "$ENV" == prod* ]] || [[ "$ENV" == uat* ]]; then
  pipelineCmd="echo 'Not using pipelines in production'"
else
  pipelineCmd="$runPipelineCommand"
fi
(eval "${pipelineCmd}")

NOTE: Please refer to https://github.com/dream11/data-pipeline#readme for detailed pipeline configuration options.

STEP 5:

Now create a service definition with the following components:

  • application: the repo in which you have integrated the pipeline

  • Kafka: the Kafka cluster to which you publish the database records. (Ownership of Kafka can vary on a case-by-case basis; get in touch with us if you don't have clarity on this.)

  • Kafka Connect

The following is a sample service definition:

{
  "name": "fantasy-tour-admin",
  "version": "1.0.0-SNAPSHOT",
  "team": "devx",
  "components": [{
    "type": "application",
    "name": "gameplay-admin",
    "version": "1.0.0-SNAPSHOT",
    "config": {
      "build_type": "java",
      "build_version": "11"
    }
  }, {
    "type": "rds",
    "name": "gpa-aurora",
    "version": "5.7-1.0.0",
    "config": {
      "binlog": {
        "enable": "true"
      }
    }
  }, {
    "type": "kafka-zookeeper",
    "name": "gpa-kafka",
    "version": "2.8.1-1.0.0",
  }, {
    "type": "kafka-connect",
    "name": "gpa-kc",
    "version": "4.1.0-1.0.0",
    "config": {
      "bootstrap_server": "gpa-kafka${TEAM_SUFFIX}.dream11${VPC_SUFFIX}.local",
    },
    "depends_on": [
      "gpa-kafka"
    ]
  }]
}

STEP 6:

Use the service definition created in the above step to deploy your service in an Odin env; the pipeline will be created as part of the pre-deploy step.

More details on how to onboard your service and deploy it in an env can be found in the ONBOARD YOUR SERVICE section.
