Debezium CDC Configuration


Debezium Registration

Register the Debezium MySQL connector with Kafka Connect by POSTing its configuration to the Connect REST API, either from a file:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ -d @debeziumConfig.json

or with the configuration inline:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ -d '{
  "name": "streaming_ETL_pipeline_MySQL-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "172.17.0.1",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "Debezium@123#",
    "database.server.name": "mysql",
    "database.server.id": "223344",
    "database.include.list": "streaming_etl_db",
    "database.allowPublicKeyRetrieval": true,
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "mysql-streaming_etl_db-person",
    "include.schema.changes": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.streaming_etl_db",
    "time.precision.mode": "connect",
    "topic.prefix": "dbserver",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}'
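If the connector has to be re-registered (for example after changing the configuration), the existing registration can be removed first through the standard Kafka Connect REST API; a minimal sketch, using the connector name from the config above:

curl -i -X DELETE 127.0.0.1:8083/connectors/streaming_ETL_pipeline_MySQL-connector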

(screenshot: Debezium connector registration response)

After registration, list the connectors together with their configuration and status:

http://localhost:8083/connectors?expand=info&expand=status
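The same listing can be fetched from the command line as well; for example, with curl against the endpoint above (it returns the JSON shown below):

curl -s "http://localhost:8083/connectors?expand=info&expand=status"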


{
  "streaming_ETL_pipeline_MySQL-connector": {
    "info": {
      "name": "streaming_ETL_pipeline_MySQL-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.allowPublicKeyRetrieval": "true",
        "database.user": "debezium",
        "database.server.id": "223344",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "mysql-streaming_etl_db-person",
        "time.precision.mode": "connect",
        "transforms": "unwrap",
        "database.server.name": "mysql",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "database.port": "3306",
        "include.schema.changes": "false",
        "key.converter.schemas.enable": "false",
        "topic.prefix": "dbserver",
        "schema.history.internal.kafka.topic": "schema-changes.streaming_etl_db",
        "database.hostname": "172.17.0.1",
        "database.password": "Debezium@123#",
        "value.converter.schemas.enable": "false",
        "name": "streaming_ETL_pipeline_MySQL-connector",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "database.include.list": "streaming_etl_db"
      },
      "tasks": [
        {
          "connector": "streaming_ETL_pipeline_MySQL-connector",
          "task": 0
        }
      ],
      "type": "source"
    },
    "status": {
      "name": "streaming_ETL_pipeline_MySQL-connector",
      "connector": {
        "state": "RUNNING",
        "worker_id": "172.23.0.6:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "172.23.0.6:8083"
        }
      ],
      "type": "source"
    }
  }
}

Check the status of the connector on its own at:

http://localhost:8083/connectors/streaming_ETL_pipeline_MySQL-connector/status


{
  "name": "streaming_ETL_pipeline_MySQL-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.23.0.6:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.23.0.6:8083"
    }
  ],
  "type": "source"
}
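For a quick scripted health check, the connector state can be pulled out of that response; a sketch, assuming jq is installed on the host:

curl -s http://localhost:8083/connectors/streaming_ETL_pipeline_MySQL-connector/status | jq -r '.connector.state'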

Debezium UI

http://localhost:8080

(screenshot: Debezium UI)

Kafka UI

http://localhost:9099

(screenshot: Kafka UI after Debezium registration)

(screenshot: person topic in Kafka UI)


Client Connecting from the Same Docker Network

docker-compose exec kafka bash -c "kafka-topics --list --bootstrap-server localhost:9092"

(screenshot: topic list from inside the Docker network)
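Change events can also be consumed from inside the same network; a minimal sketch, assuming the Kafka image used above also ships kafka-console-consumer (topic name as used later in this document):

docker-compose exec kafka bash -c "kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver.streaming_etl_db.geo --from-beginning --max-messages 5"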

Client Connecting from the Same Host

Update the path to the Kafka CLI scripts (kafka-topics.sh, kafka-console-consumer.sh, kafka-console-producer.sh) to match their location on your machine.

/home/anantha/kafka/bin/kafka-topics.sh --bootstrap-server=localhost:29092 --list

(screenshot: topic list from the host)

Display Kafka Topic Messages from the Same Host

/home/anantha/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic dbserver.streaming_etl_db.geo --from-beginning

(screenshot: topic messages)
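Because the ExtractNewRecordState transform is applied and converter schemas are disabled, each message value is the flattened row as plain JSON. An illustrative example only, with hypothetical columns for the geo table:

{"id":1,"lat":12.9716,"lng":77.5946}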

Create a Topic from the Same Host

/home/anantha/kafka/bin/kafka-topics.sh \
--bootstrap-server 0.0.0.0:29092 \
--topic readings \
--create --partitions 6 \
--replication-factor 1

(screenshot: topic creation output)
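The new topic can be verified the same way as the CDC topics; for example:

/home/anantha/kafka/bin/kafka-topics.sh --bootstrap-server localhost:29092 --describe --topic readings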

Describe Topic from the Same Host

/home/anantha/kafka/bin/kafka-topics.sh --bootstrap-server=localhost:29092 --describe --topic dbserver.streaming_etl_db.geo

(screenshot: topic description output)

Post Message to Topic from the Same Host

/home/anantha/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:29092 --topic readings

(screenshot: console producer session)
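Messages can also be piped into the producer non-interactively and read back with the console consumer; a sketch with a made-up JSON payload:

# publish one hypothetical reading
echo '{"device":"sensor-1","value":23.5}' | /home/anantha/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:29092 --topic readings

# read it back
/home/anantha/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic readings --from-beginning --max-messages 1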