Show
First, start the stack with the following docker-compose file:
# Single-node Confluent Platform 5.4.0 stack: ZooKeeper, one Kafka broker,
# Schema Registry, and one Kafka Connect worker.
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.4.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      # Exposes 9092 for external connections to the broker
      # Use kafka:29092 for connections internal on the docker network
      # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: PLAINTEXT (kafka:29092) and PLAINTEXT_HOST
      # (localhost:9092); both map to the PLAINTEXT security protocol.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      # Replication factors are 1 throughout because there is only one broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.0
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.4.0
    container_name: kafka-connect-01
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      # Single-quoted so YAML performs no escape processing on the pattern.
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m (%c:%L)%n'
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-01"
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      # Keys and values are (de)serialized as Avro via the schema-registry
      # service defined in this file.
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      # /db-leach is the container path of the bind mount declared under
      # `volumes` below.
      CONNECT_PLUGIN_PATH: '/usr/share/java,/db-leach'
    volumes:
      # - db-leach:/db-leach/
      # Host directory (Windows path) mounted into the container at /db-leach/.
      - 'C:/Users/qingzhou/kafka_confluent/jdbc/lib:/db-leach/'
    command:
      - /bin/bash
      - -c
      # The script below starts `sleep infinity` in the background and then
      # runs the image's launcher script in the foreground.
      - |
        # JDBC Drivers
        # # debezium mssql connector
        # #export CLASSPATH=/db-leach/debezium-connector-sqlserver/debezium-connector-sqlserver-1.3.0.Beta1.jar ./etc/kafka/connect-distributed.properties
        # #plugin.path=/usr/local/share/kafka/plugins
        # #cp -r /db-leach/debezium-connector-sqlserver /usr/local/share/kafka/plugins
        # Now launch Kafka Connect
        sleep infinity &
        /etc/confluent/docker/run
After all the containers are running, create the connector through the Kafka Connect REST API:
# Create (or update) the "debezium_Transaction" connector by PUTting its
# configuration to the Kafka Connect REST API on localhost:8083.
# The URL is quoted so the shell passes it to curl verbatim.
# The JSON payload is single-quoted, so "$Value" in the Cast transform class
# name is not expanded by the shell.
# NOTE(review): the "table.include.list" value ends with a trailing space
# after "cdc.dbo_Transaction_ct" -- kept as-is; confirm whether it is intended.
curl -X PUT 'http://localhost:8083/connectors/debezium_Transaction/config' \
  -H "Content-Type: application/json" \
  -d '
{
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "database.hostname": "DATA.NET",
  "database.port": "1433",
  "database.user": "user",
  "database.password": "",
  "database.dbname": "Odyssey",
  "database.server.name": "Tranx",
  "table.include.list": "dbo.Transaction,cdc.dbo_Transaction_ct ",
  "database.history.kafka.bootstrap.servers": "kafka:29092",
  "database.history.kafka.topic": "schema-changes.Tranx",
  "snapshot.isolation.mode": "snapshot",
  "include.schema.changes": "true",
  "numeric.mapping": "best_fit",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "transforms": "Cast",
  "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.Cast.spec": "LineAmount:float64,WithholdingTax:float64",
  "poll.interval.ms": 3600
}
'