Streaming ETL pipeline from MongoDB to Snowflake with Apache Kafka®

Pre-reqs

Cloud environments

Create a cluster in MongoDB Atlas and one in Confluent Cloud, making sure both are with the same cloud provider and in the same region (for example, us-east-1 on AWS).

You need credentials and endpoint details for the following services. If a local .env file exists, see it for the values.

  • Confluent Cloud

    • Broker endpoint: CCLOUD_BROKER_ENDPOINT

    • API Key: CCLOUD_API_KEY

    • API Secret: CCLOUD_API_SECRET

  • MongoDB Atlas

    • Endpoint: MONGODB_ENDPOINT

    • Username: dbUser

    • Password: MONGODB_PW

  • Snowflake

    • Endpoint: SNOWFLAKE_ENDPOINT

    • User: kafka

    • Private key: SNOWFLAKE_PRIVATE_KEY

Accessing MongoDB Atlas

$ brew install mongodb/brew/mongodb-community-shell
$ mongo "mongodb+srv://MONGODB_ENDPOINT/myFirstDatabase" --username dbUser
MongoDB shell version v4.2.0
Enter password:
connecting to: mongodb://cluster0-shard-00-00.5r0gb.mongodb.net:27017,cluster0-shard-00-01.5r0gb.mongodb.net:27017,cluster0-shard-00-02.5r0gb.mongodb.net:27017/myFirstDatabase?authSource=admin&compressors=disabled&gssapiServiceName=mongodb&replicaSet=atlas-8rmbva-shard-0&ssl=true
2021-04-07T13:54:53.345+0100 I  NETWORK  [js] Starting new replica set monitor for atlas-8rmbva-shard-0/cluster0-shard-00-00.5r0gb.mongodb.net:27017,cluster0-shard-00-01.5r0gb.mongodb.net:27017,cluster0-shard-00-02.5r0gb.mongodb.net:27017
2021-04-07T13:54:53.345+0100 I  CONNPOOL [ReplicaSetMonitor-TaskExecutor] Connecting to cluster0-shard-00-01.5r0gb.mongodb.net:27017
2021-04-07T13:54:53.345+0100 I  CONNPOOL [ReplicaSetMonitor-TaskExecutor] Connecting to cluster0-shard-00-02.5r0gb.mongodb.net:27017
2021-04-07T13:54:53.345+0100 I  CONNPOOL [ReplicaSetMonitor-TaskExecutor] Connecting to cluster0-shard-00-00.5r0gb.mongodb.net:27017
2021-04-07T13:54:56.010+0100 W  NETWORK  [ReplicaSetMonitor-TaskExecutor] DNS resolution while connecting to cluster0-shard-00-02.5r0gb.mongodb.net:27017 took 2665ms
2021-04-07T13:54:56.010+0100 W  NETWORK  [ReplicaSetMonitor-TaskExecutor] DNS resolution while connecting to cluster0-shard-00-00.5r0gb.mongodb.net:27017 took 2665ms
2021-04-07T13:54:56.415+0100 I  NETWORK  [ReplicaSetMonitor-TaskExecutor] Confirmed replica set for atlas-8rmbva-shard-0 is atlas-8rmbva-shard-0/cluster0-shard-00-00.5r0gb.mongodb.net:27017,cluster0-shard-00-01.5r0gb.mongodb.net:27017,cluster0-shard-00-02.5r0gb.mongodb.net:27017
Implicit session: session { "id" : UUID("133cb9e7-c7ec-49b0-99e7-b50b2cf60c5a") }
MongoDB server version: 4.4.4
WARNING: shell and server versions do not match
Welcome to the MongoDB shell.
For interactive help, type "help".
For more comprehensive documentation, see
        http://docs.mongodb.org/
Questions? Try the support group
        http://groups.google.com/group/mongodb-user
MongoDB Enterprise atlas-8rmbva-shard-0:PRIMARY>

Setting up a key pair for Snowflake

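Per the connector documentation, create a local key pair.

Create the necessary user and privileges on Snowflake, adding the public key generated above to the user. The private key stays on your side and is what the connector is configured with (SNOWFLAKE_PRIVATE_KEY).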
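
The two steps above could look roughly like the following sketch, assuming OpenSSL is installed. The file names (rsa_key.p8, rsa_key.pub) and the helper function names are illustrative and not taken from this repository, and the key is left unencrypted purely for demo convenience. Treat the connector documentation as the authority on the exact procedure.

```shell
#!/usr/bin/env bash
# Sketch only: file names and function names are hypothetical.

# Generate an unencrypted 2048-bit RSA private key in PKCS#8 format,
# then derive the matching public key from it.
generate_keypair() {
  local private_key="${1:-rsa_key.p8}" public_key="${2:-rsa_key.pub}"
  openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -nocrypt -out "$private_key"
  openssl rsa -in "$private_key" -pubout -out "$public_key"
}

# Print the Snowflake statement that attaches a public key to a user.
# The PEM header/footer lines and the line breaks are stripped first.
alter_user_sql() {
  local public_key="$1" user="${2:-kafka}"
  local key_body
  key_body=$(grep -v -- '-----' "$public_key" | tr -d '\n')
  printf "ALTER USER %s SET RSA_PUBLIC_KEY='%s';\n" "$user" "$key_body"
}

# Usage:
#   generate_keypair rsa_key.p8 rsa_key.pub
#   alter_user_sql rsa_key.pub kafka    # paste the output into a Snowflake worksheet
```

The statement is printed rather than executed so that it can be reviewed before it is run in Snowflake.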

Setting up the local demo components

Pre-reqs

For the purpose of the demo we run a local Kafka Connect worker to push dummy data to MongoDB. For this you need to have Docker and Docker Compose installed.

Create a .env file that includes the MongoDB and Confluent Cloud credentials in it - see .env.example for a template.
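
Before starting the worker it can help to confirm that the .env file is complete. The sketch below assumes the file holds plain KEY=value lines and that the placeholder names from the pre-reqs list are used as the variable names; check .env.example for the names this repository actually expects. The function name check_env_file is made up for illustration.

```shell
#!/usr/bin/env bash
# Sketch only: assumes .env holds simple KEY=value lines and that the
# placeholder names from the pre-reqs list are the variable names.

REQUIRED_VARS="CCLOUD_BROKER_ENDPOINT CCLOUD_API_KEY CCLOUD_API_SECRET MONGODB_ENDPOINT MONGODB_PW"

# Print the name of every required variable that is missing or empty
# in the given env file; return non-zero if any are missing.
check_env_file() {
  local env_file="${1:-.env}" missing=0 name
  for name in $REQUIRED_VARS; do
    if ! grep -Eq "^${name}=.+" "$env_file" 2>/dev/null; then
      echo "missing: $name"
      missing=1
    fi
  done
  return "$missing"
}

# Usage:
#   check_env_file .env && docker-compose up -d
```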

Create topics on Confluent Cloud

# Set the environment first
$ ccloud environment list
      Id      |                   Name
+-------------+-------------------------------------------+
  * env-05wkq | default
    env-q5w66 | ccloud-stack-199188-ccloud-stack-function
    env-1dpyz | ABC_Retail_01
$ ccloud environment use env-1dpyz
Now using "env-1dpyz" as the default (active) environment.

# Within the environment, set the cluster
$ ccloud kafka cluster list
      Id      |   Name    |   Type   | Provider |   Region   | Availability | Status
+-------------+-----------+----------+----------+------------+--------------+--------+
    lkc-r1jq9 | cluster_0 | STANDARD | aws      | eu-north-1 | single-zone  | UP

$ ccloud kafka cluster use lkc-r1jq9

# Create the required topics
$ ccloud kafka topic create abc-clicks
$ ccloud kafka topic create abc-transactions
$ ccloud kafka topic create abc-inventory

Create the local datagen connector

Run the Kafka Connect worker:

docker-compose up -d

Wait for it to start, and then check that the necessary connectors are installed correctly:

docker exec -it kafka-connect bash -c '
  echo -e "\n\n  Waiting for Kafka Connect to be available\n"
  while : ; do
    curl_status=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:8083/connectors)
    echo -e "$(date) Kafka Connect HTTP state: $curl_status (waiting for 200)"
    if [ "$curl_status" -eq 200 ] ; then break ; fi
    sleep 5
  done'
curl -s localhost:8083/connector-plugins|jq '.[].class'

This should return:

"io.confluent.kafka.connect.datagen.DatagenConnector"
[…]

Create the source DataGen connectors

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/datagen-abc-clicks/config \
    -d '{
      "connector.class"               : "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic"                   : "abc-clicks1",
      "schema.filename"               : "/data/datagen/abc_clicks.avsc",
      "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
      "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "max.interval"                  : 10000,
      "iterations"                    : 10000000,
      "tasks.max"                     : "1",
      "transforms"                    : "insertTSNow",
      "transforms.insertTSNow.type"   : "com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value",
      "transforms.insertTSNow.fields" : "click_ts"
  }'

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/datagen-abc-transactions/config \
    -d '{
      "connector.class"               : "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic"                   : "abc-transactions1",
      "schema.filename"               : "/data/datagen/abc_txn.avsc",
      "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
      "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "max.interval"                  : 10000,
      "iterations"                    : 10000000,
      "tasks.max"                     : "1",
      "transforms"                    : "insertTSNow",
      "transforms.insertTSNow.type"   : "com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value",
      "transforms.insertTSNow.fields" : "txn_ts"
      }'
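
The two requests above differ only in the connector name, target topic, schema file and timestamp field. As a sketch, the shared configuration could be generated by a small helper; datagen_config and create_datagen are hypothetical names, and every setting is copied from the requests above.

```shell
#!/usr/bin/env bash
# Sketch only: datagen_config and create_datagen are hypothetical helpers.

# Print the connector config JSON for one datagen source.
# Arguments: target topic, Avro schema file, timestamp field to populate.
datagen_config() {
  local topic="$1" schema_file="$2" ts_field="$3"
  # The heredoc is unquoted so the variables expand; \$ keeps a literal $.
  cat <<EOF
{
  "connector.class"               : "io.confluent.kafka.connect.datagen.DatagenConnector",
  "kafka.topic"                   : "${topic}",
  "schema.filename"               : "${schema_file}",
  "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
  "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "max.interval"                  : 10000,
  "iterations"                    : 10000000,
  "tasks.max"                     : "1",
  "transforms"                    : "insertTSNow",
  "transforms.insertTSNow.type"   : "com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField\$Value",
  "transforms.insertTSNow.fields" : "${ts_field}"
}
EOF
}

# Create or update a connector through the Kafka Connect REST API.
# Arguments: connector name, then the arguments of datagen_config.
create_datagen() {
  local name="$1"; shift
  datagen_config "$@" | curl -i -X PUT -H "Content-Type:application/json" \
    "http://localhost:8083/connectors/${name}/config" -d @-
}

# Usage:
#   create_datagen datagen-abc-clicks       abc-clicks1       /data/datagen/abc_clicks.avsc click_ts
#   create_datagen datagen-abc-transactions abc-transactions1 /data/datagen/abc_txn.avsc    txn_ts
```

Because PUT on the config endpoint both creates and updates a connector, the helper can be re-run safely after changing a setting.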

Check they’re both running

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
         jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
         column -s : -t| sed 's/\"//g'| sort
source  |  datagen-abc-clicks        |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector
source  |  datagen-abc-transactions  |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector
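
If a task shows FAILED instead of RUNNING, it can usually be restarted through the Kafka Connect REST API. The following is a minimal sketch, assuming curl and jq are available on the host; the function names and the CONNECT_URL variable are made up for illustration.

```shell
#!/usr/bin/env bash
# Sketch only: helper names are hypothetical; requires curl and jq.

CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Read a connector status document on stdin and print the id of every
# task whose state is FAILED, one per line.
failed_task_ids() {
  jq -r '.tasks[] | select(.state == "FAILED") | .id'
}

# Restart each failed task of the named connector.
restart_failed_tasks() {
  local name="$1" id
  curl -s "${CONNECT_URL}/connectors/${name}/status" | failed_task_ids |
    while read -r id; do
      echo "restarting ${name} task ${id}"
      curl -s -X POST "${CONNECT_URL}/connectors/${name}/tasks/${id}/restart"
    done
}

# Usage:
#   restart_failed_tasks datagen-abc-clicks
```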

Setting up the Cloud demo components

Create the MongoDB Sink connector on Confluent Cloud

This streams the dummy transaction data from Confluent Cloud to MongoDB, so that the MongoDB source connector can stream it back in during the demo.

Create the Sink connector using the Confluent Cloud GUI or run ccloud connector create --config data/ccloud/mongodb_sink.json

Load inventory reference data into MongoDB

Install CLI

brew install mongodb-database-tools

Import data

mongoimport --uri mongodb+srv://dbUser:MONGODB_PW@MONGODB_ENDPOINT/abc \
            --collection inventory \
            --drop --jsonArray \
            --file data/mongodb/products.json
2021-04-07T16:00:29.422+0100    connected to: mongodb+srv://[**REDACTED**]@MONGODB_ENDPOINT
2021-04-07T16:00:29.458+0100    dropping: test.inventory
2021-04-07T16:00:29.561+0100    3 document(s) imported successfully. 0 document(s) failed to import.

Demo

Stream data from MongoDB into Confluent

Create the Source connector using the Confluent Cloud GUI or run ccloud connector create --config data/ccloud/mongodb_source.json

Check that data is arriving in topics atlas0.abc.inventory and atlas0.abc.transactions.

Set up ksqlDB

First, create a ksqlDB application on your cluster in Confluent Cloud. Then declare streams on the source data:

  • Clicks

    CREATE STREAM clicks(
        ip VARCHAR,
        userid INT,
        prod_id INT,
        bytes BIGINT,
        referrer VARCHAR,
        agent VARCHAR,
        click_ts BIGINT
        )
    WITH (
        KAFKA_TOPIC='abc-clicks1',
        VALUE_FORMAT='JSON',
        TIMESTAMP='click_ts'
    );
  • Transactions (sourced from MongoDB)

    CREATE STREAM transactions (
      fullDocument STRUCT<
        cust_id INT,
        prod_id INT,
        txn_ts BIGINT>)
      WITH (
        KAFKA_TOPIC='atlas0.abc.transactions1',
        VALUE_FORMAT='JSON'
      );
  • Inventory (sourced from MongoDB)

    CREATE STREAM inventory00 (
      fullDocument STRUCT<
        product_id INT,
        name VARCHAR,
        "list" INT,
        discount INT,
        available INT,
        capacity INT,
        txn_hour INT>)
      WITH (
        KAFKA_TOPIC='atlas0.abc.inventory',
        VALUE_FORMAT='JSON'
      );
    
    SET 'auto.offset.reset' = 'earliest';
    
    CREATE TABLE INVENTORY AS
      SELECT
        FULLDOCUMENT->PRODUCT_ID AS PRODUCT_ID,
        LATEST_BY_OFFSET(FULLDOCUMENT->NAME) AS NAME,
        LATEST_BY_OFFSET(FULLDOCUMENT->"list") AS LIST_PRICE,
        LATEST_BY_OFFSET(FULLDOCUMENT->DISCOUNT) AS DISCOUNT,
        LATEST_BY_OFFSET(FULLDOCUMENT->AVAILABLE) AS AVAILABLE,
        LATEST_BY_OFFSET(FULLDOCUMENT->CAPACITY) AS CAPACITY,
        LATEST_BY_OFFSET(FULLDOCUMENT->TXN_HOUR) AS TXN_HOUR
      FROM INVENTORY00
      GROUP BY FULLDOCUMENT->PRODUCT_ID;

Stream processing with ksqlDB

Some of this SQL is deliberately simplistic: it is written for demo purposes and is 'good enough' to illustrate what can be done.

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE PRODUCT_TXN_PER_HOUR WITH (FORMAT='AVRO') AS
SELECT T.FULLDOCUMENT->PROD_ID,
       COUNT(*) AS TXN_PER_HOUR,
       MAX(I.TXN_HOUR) AS EXPECTED_TXN_PER_HOUR,
       (CAST(MAX(I.AVAILABLE) AS DOUBLE) / CAST(MAX(I.CAPACITY) AS DOUBLE)) * 100 AS STOCK_LEVEL,
       I.NAME AS PRODUCT_NAME
FROM  TRANSACTIONS T
      LEFT JOIN INVENTORY I
      ON T.FULLDOCUMENT->PROD_ID = I.PRODUCT_ID
WINDOW HOPPING (SIZE 1 HOUR, ADVANCE BY 5 MINUTES)
GROUP BY T.FULLDOCUMENT->PROD_ID,
         I.NAME;

-- KSQL does not support persistent queries on windowed tables :-(
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOWSTART_TS,
       TIMESTAMPTOSTRING(WINDOWEND,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOWEND_TS,
       PROD_ID,
       PRODUCT_NAME,
       TXN_PER_HOUR,
       EXPECTED_TXN_PER_HOUR,
       STOCK_LEVEL
FROM  PRODUCT_TXN_PER_HOUR
WHERE windowstart > UNIX_TIMESTAMP()-(1000 * 60 * 80)
  AND WINDOWEND < UNIX_TIMESTAMP()
EMIT CHANGES;

-- Work around this (kinda) by declaring a stream on the topic (we lose the window start/end data though, and can't expose it earlier either: https://github.com/confluentinc/ksql/issues/7369)
CREATE STREAM PRODUCT_TXN_PER_HOUR_STREAM WITH (KAFKA_TOPIC='pksqlc-7y33pPRODUCT_TXN_PER_HOUR', FORMAT='AVRO');

-- Apply predicate on the stream to match the business conditions specified
--  -> High inventory level (>80% of capacity)
--  -> Low transactions (< expected transactions/hour)
CREATE STREAM ABC_PROMOTIONS_01 AS
SELECT  ROWKEY,
        TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS TS,
        AS_VALUE(ROWKEY -> PROD_ID) AS PROD_ID ,
        ROWKEY -> PRODUCT_NAME AS PRODUCT_NAME,
        STOCK_LEVEL ,
        TXN_PER_HOUR ,
        EXPECTED_TXN_PER_HOUR
   FROM PRODUCT_TXN_PER_HOUR_STREAM
WHERE TXN_PER_HOUR < EXPECTED_TXN_PER_HOUR
  AND  STOCK_LEVEL > 80
  ;
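
To see why PRODUCT_TXN_PER_HOUR produces several rows per product, it helps to work through the hopping window arithmetic. With SIZE 1 HOUR and ADVANCE BY 5 MINUTES, every transaction falls into 60 / 5 = 12 overlapping windows. The sketch below lists the window start times for one event timestamp, assuming windows are aligned to the Unix epoch as in Kafka Streams; window_starts is a hypothetical helper and not part of ksqlDB.

```shell
#!/usr/bin/env bash
# Sketch only: window_starts is a hypothetical helper, not part of ksqlDB.

SIZE_MS=$((60 * 60 * 1000))     # SIZE 1 HOUR
ADVANCE_MS=$((5 * 60 * 1000))   # ADVANCE BY 5 MINUTES

# Print the start (epoch milliseconds) of every hopping window that
# contains the given event timestamp. Windows are assumed to be aligned
# to the epoch and to cover [start, start + SIZE_MS).
window_starts() {
  local ts="$1" start
  # Earliest window that still contains ts, rounded down to the advance.
  start=$(( (ts - SIZE_MS + ADVANCE_MS) / ADVANCE_MS * ADVANCE_MS ))
  if [ "$start" -lt 0 ]; then start=0; fi
  while [ "$start" -le "$ts" ]; do
    echo "$start"
    start=$((start + ADVANCE_MS))
  done
}

# Usage:
#   window_starts 1617800093345 | wc -l    # number of windows the event lands in
```

Each of those windows emits its own aggregate, which is why the stream declared over the table's topic carries one row per product per window, and why the promotions predicate can match the same product more than once.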

Stream the identified promotions to Snowflake

Create the sink connector using the Confluent Cloud GUI, or run ccloud connector create --config data/ccloud/snowflake_sink.json

CCloud CLI

$ ccloud connector list
     ID     |                  Name                  | Status  |  Type  | Trace
+-----------+----------------------------------------+---------+--------+-------+
  lcc-g72w3 | MongoDbAtlasSinkConnector_transactions | RUNNING | sink   |
  lcc-r1g09 | MongoDbAtlasSourceConnector            | RUNNING | source |
  lcc-779yp | SnowflakeSinkConnector_0               | RUNNING | sink   |

$ ccloud connector describe lcc-779yp
Connector Details
+--------+--------------------------+
| ID     | lcc-779yp                |
| Name   | SnowflakeSinkConnector_0 |
| Status | RUNNING                  |
| Type   | sink                     |
| Trace  |                          |
+--------+--------------------------+


Task Level Details
  TaskId |  State
+--------+---------+
       0 | RUNNING


Configuration Details
             Config             |                                                                          Value
+-------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
  name                          | SnowflakeSinkConnector_0
  snowflake.database.name       | DEMO_DB
  tasks.max                     |                                                                                                                                                        1
  internal.kafka.endpoint       | PLAINTEXT://kafka-0.kafka.******.svc.cluster.local:9071,kafka-1.kafka.******.svc.cluster.local:9071,kafka-2.kafka.******.svc.cluster.local:9071
  input.data.format             | AVRO
  kafka.api.secret              | ****************
  valid.kafka.api.key           | true
  connector.class               | SnowflakeSink
  kafka.dedicated               | false
  kafka.endpoint                | SASL_SSL://CCLOUD_BROKER_ENDPOINT
  kafka.region                  | eu-central-1
  schema.registry.url           | https://************.aws.confluent.cloud
  snowflake.private.key         | ****************
  topics                        | pksqlc-7y33pABC_PROMOTIONS_01
  kafka.api.key                 | ****************
  cloud.provider                | aws
  snowflake.metadata.createtime | true
  snowflake.schema.name         | public
  snowflake.url.name            | SNOWFLAKE_ENDPOINT
  snowflake.user.name           | kafka
  cloud.environment             | prod