Bug
Resolution: Unresolved
Major
2.0.1.Final
Bug report
What Debezium connector do you use and what version?
Debezium 2.0.1.Final with the Cassandra 4 connector
What is the connector configuration?
connector.name=Debezium-xx
commit.log.relocation.dir=xx
commit.log.real.time.processing.enabled=true
http.port=9000
cassandra.driver.config.file=xx
cassandra.ssl.enabled=false
cassandra.ssl.config.path=xx
cassandra.config=xx
cassandra.hosts=xx
cassandra.port=xx
cassandra.username=xx
cassandra.password=xx
kafka.producer.bootstrap.servers=xx
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
kafka.producer.ssl.ca.location=/etc/debezium/cluster-ca-certificate.pem
kafka.producer.ssl.protocol=TLS
kafka.producer.ssl.endpoint.identification.algorithm=
kafka.producer.security.protocol=SASL_SSL
kafka.producer.sasl.login=null
kafka.producer.sasl.mechanism=SCRAM-SHA-256
kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="xx" password="xx";
kafka.producer.ssl.truststore.location=/etc/debezium/truststore.jks
kafka.producer.ssl.truststore.password=xx
topic.prefix=xx
snapshot.mode=NEVER
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=xx
key.converter.basic.auth.user.info=xx
key.converter.basic.auth.credentials.source=USER_INFO
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=xx
value.converter.basic.auth.user.info=xx
value.converter.basic.auth.credentials.source=USER_INFO
offset.backing.store.dir=/tmp/debezium/offsets/
offset.flush.interval.ms=500
max.offset.flush.size=10000
What is the captured database version and mode of deployment?
Cassandra 4.0.14 cluster with 3 nodes, RF=3, deployed on AWS
Kafka 3.7.1 (RF=3, number of partitions=3) with KRaft controller and Karapace Schema Registry 3.15.0, deployed on AWS
What behavior do you expect?
When I insert one record into Cassandra, the consumer should receive 3 messages for that insertion, one generated by each of the 3 nodes.
What behavior do you see?
When I insert one record into Cassandra, the consumer receives up to 5 messages for that insertion: 2 of the nodes send the same change event to Kafka twice.
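The difference between the expected and observed counts can be checked on the consumer side by grouping the received change events. The following is a minimal sketch, not taken from the connector: the event shape (`node`, `key`, `ts_micros`) is hypothetical, and real consumed records would have to be mapped to it first.

```python
from collections import Counter


def count_events(events):
    """Count how often each (node, row key, write timestamp) triple was received."""
    return Counter((e["node"], e["key"], e["ts_micros"]) for e in events)


def find_duplicates(events):
    """Return only the triples that were received more than once."""
    return {triple: n for triple, n in count_events(events).items() if n > 1}


# Hypothetical events for a single insert: with RF=3 one event per node is
# expected, but here two of the nodes emitted the same event twice.
events = [
    {"node": "node-1", "key": "row-1", "ts_micros": 1},
    {"node": "node-1", "key": "row-1", "ts_micros": 1},
    {"node": "node-2", "key": "row-1", "ts_micros": 1},
    {"node": "node-2", "key": "row-1", "ts_micros": 1},
    {"node": "node-3", "key": "row-1", "ts_micros": 1},
]

duplicates = find_duplicates(events)
print(duplicates)
```

With the sample above, 5 events arrive instead of 3, and the two duplicated triples are the ones reported for `node-1` and `node-2`.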
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
Not verified: because of the Avro conversion bug, we have not upgraded Debezium yet.
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
Please focus on the records related to weather.test. Based on the result I read from Kafka, the Cassandra insertion timestamp is 04:56:27, and there was no other activity on Cassandra afterwards.
Nov 28 04:56:34 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG i.d.c.c.AbstractCassandra4CommitLogParser Finished reading commit log segments LogicalCommitLog{commitLogPosition=CommitLogPosition(segmentId=1732759045759, position=0), synced=4925963, completed=false, log=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log, index=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759_cdc.idx, commitLogSegmentId=1732759045759} on position CommitLogPosition(segmentId=1732759045759, position=4919299)
Nov 28 04:56:34 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.Cassandra4CommitLogRealTimeParser Sleep for idx file to be complete
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.RecordAccumulator [Producer clientId=producer-1] Assigned producerId 1 and producerEpoch 0 to batch with base sequence 1119 being sent to partition zishinew.instaclustr.debezium_sla_latency-2
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1785) and timeout 30000 to node 3: {acks=-1,timeout=30000,partitionSizes=[zishinew.instaclustr.debezium_sla_latency-2=306]}
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received PRODUCE response from node 3 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1785): ProduceResponseData(responses=[TopicProduceResponse(name='zishinew.instaclustr.debezium_sla_latency', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=3840, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.TransactionManager [Producer clientId=producer-1] ProducerId: 1; Set last ack'd sequence number for topic-partition zishinew.instaclustr.debezium_sla_latency-2 to 1119
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.RecordAccumulator [Producer clientId=producer-1] Assigned producerId 1 and producerEpoch 0 to batch with base sequence 652 being sent to partition zishinew.instaclustr.debezium_sla_latency-0
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.RecordAccumulator [Producer clientId=producer-1] Assigned producerId 1 and producerEpoch 0 to batch with base sequence 1120 being sent to partition zishinew.instaclustr.debezium_sla_latency-2
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1786) and timeout 30000 to node 2: {acks=-1,timeout=30000,partitionSizes=[zishinew.instaclustr.debezium_sla_latency-0=306]}
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1787) and timeout 30000 to node 3: {acks=-1,timeout=30000,partitionSizes=[zishinew.instaclustr.debezium_sla_latency-2=306]}
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Initialize connection to node 10.0.172.98:9093 (id: 1 rack: us-east-1c) for sending metadata request
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG org.apache.kafka.clients.ClientUtils Resolved host 10.0.172.98 as 10.0.172.98
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Initiating connection to node 10.0.172.98:9093 (id: 1 rack: us-east-1c) using address /10.0.172.98
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.a.SaslClientAuthenticator [Producer clientId=producer-1] Set SASL client state to SEND_APIVERSIONS_REQUEST
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.a.SaslClientAuthenticator [Producer clientId=producer-1] Creating SaslClient: client=null;service=kafka;serviceHostname=ip-10-0-172-98.ec2.internal;mechs=[SCRAM-SHA-256]
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.s.i.ScramSaslClient Setting SASL/SCRAM_SHA_256 client state to SEND_CLIENT_FIRST_MESSAGE
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.kafka.common.network.Selector [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Completed connection to node 1. Fetching API versions.
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='zishinew.weather.test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.0.0.143:9093 (id: 3 rack: us-east-1a)
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-1, correlationId=1788) and timeout 30000 to node 3: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='zishinew.weather.test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received PRODUCE response from node 2 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1786): ProduceResponseData(responses=[TopicProduceResponse(name='zishinew.instaclustr.debezium_sla_latency', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=2196, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.TransactionManager [Producer clientId=producer-1] ProducerId: 1; Set last ack'd sequence number for topic-partition zishinew.instaclustr.debezium_sla_latency-0 to 652
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received PRODUCE response from node 3 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1787): ProduceResponseData(responses=[TopicProduceResponse(name='zishinew.instaclustr.debezium_sla_latency', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=3841, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.TransactionManager [Producer clientId=producer-1] ProducerId: 1; Set last ack'd sequence number for topic-partition zishinew.instaclustr.debezium_sla_latency-2 to 1120
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received METADATA response from node 3 for request with header RequestHeader(apiKey=METADATA, apiVersion=12, clientId=producer-1, correlationId=1788): MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=1, host='10.0.172.98', port=9093, rack='us-east-1c'), MetadataResponseBroker(nodeId=2, host='10.0.97.79', port=9093, rack='us-east-1b'), MetadataResponseBroker(nodeId=3, host='10.0.0.143', port=9093, rack='us-east-1a')], clusterId='i5Jk0PhOTEKLn2WKfBQRaw', controllerId=2, topics=[MetadataResponseTopic(errorCode=0, name='zishinew.weather.test', topicId=4eucYkUXRcWy_0dJU8y80A, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=1, leaderId=1, leaderEpoch=0, replicaNodes=[1, 3, 2], isrNodes=[1, 2, 3], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=2, leaderId=3, leaderEpoch=2, replicaNodes=[3, 2, 1], isrNodes=[2, 1, 3], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=2, leaderEpoch=0, replicaNodes=[2, 1, 3], isrNodes=[2, 1, 3], offlineReplicas=[])], topicAuthorizedOperations=-2147483648)], clusterAuthorizedOperations=-2147483648)
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO org.apache.kafka.clients.Metadata [Producer clientId=producer-1] Resetting the last seen epoch of partition zishinew.weather.test-1 to 0 since the associated topicId changed from null to 4eucYkUXRcWy_0dJU8y80A
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO org.apache.kafka.clients.Metadata [Producer clientId=producer-1] Resetting the last seen epoch of partition zishinew.weather.test-2 to 2 since the associated topicId changed from null to 4eucYkUXRcWy_0dJU8y80A
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO org.apache.kafka.clients.Metadata [Producer clientId=producer-1] Resetting the last seen epoch of partition zishinew.weather.test-0 to 0 since the associated topicId changed from null to 4eucYkUXRcWy_0dJU8y80A
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG org.apache.kafka.clients.Metadata [Producer clientId=producer-1] Updated cluster metadata updateVersion 36 to MetadataCache{clusterId='i5Jk0PhOTEKLn2WKfBQRaw', nodes={1=10.0.172.98:9093 (id: 1 rack: us-east-1c), 2=10.0.97.79:9093 (id: 2 rack: us-east-1b), 3=10.0.0.143:9093 (id: 3 rack: us-east-1a)}, partitions=[PartitionMetadata(error=NONE, partition=zishinew.instaclustr.debezium_sla_latency-2, leader=Optional[3], leaderEpoch=Optional[2], replicas=3,2,1, isr=2,1,3, offlineReplicas=), PartitionMetadata(error=NONE, partition=zishinew.weather.test-1, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,3,2, isr=1,2,3, offlineReplicas=), PartitionMetadata(error=NONE, partition=zishinew.weather.test-2, leader=Optional[3], leaderEpoch=Optional[2], replicas=3,2,1, isr=2,1,3, offlineReplicas=), PartitionMetadata(error=NONE, partition=zishinew.weather.test-0, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,1,3, isr=2,1,3, offlineReplicas=), PartitionMetadata(error=NONE, partition=zishinew.instaclustr.debezium_sla_latency-1, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,3,2, isr=1,2,3, offlineReplicas=), PartitionMetadata(error=NONE, partition=zishinew.instaclustr.debezium_sla_latency-0, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,1,3, isr=2,1,3, offlineReplicas=)], controller=10.0.97.79:9093 (id: 2 rack: us-east-1b)}
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.RecordAccumulator [Producer clientId=producer-1] Assigned producerId 1 and producerEpoch 0 to batch with base sequence 11 being sent to partition zishinew.weather.test-0
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1789) and timeout 30000 to node 2: {acks=-1,timeout=30000,partitionSizes=[zishinew.weather.test-0=211]}
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received PRODUCE response from node 2 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1789): ProduceResponseData(responses=[TopicProduceResponse(name='zishinew.weather.test', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=36, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Nov 28 04:56:35 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.TransactionManager [Producer clientId=producer-1] ProducerId: 1; Set last ack'd sequence number for topic-partition zishinew.weather.test-0 to 11
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.network.SslTransportLayer [SslTransportLayer channelId=1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=/10.0.172.98:9093], selector=sun.nio.ch.EPollSelectorImpl@683c8a57, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost '10.0.172.98' peerPort 9093 peerPrincipal 'CN=Node 941ceece-b430-44fd-b5e8-bb83f1c1069c, OU=Cluster 72b2d997-781b-449f-9cdc-9dd0f03bcb79, O=Instaclustr Pty. Ltd., C=AU' cipherSuite 'TLS_AES_256_GCM_SHA384'
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.a.SaslClientAuthenticator [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.a.SaslClientAuthenticator [Producer clientId=producer-1] Set SASL client state to SEND_HANDSHAKE_REQUEST
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.a.SaslClientAuthenticator [Producer clientId=producer-1] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.a.SaslClientAuthenticator [Producer clientId=producer-1] Set SASL client state to INITIAL
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.s.i.ScramSaslClient Setting SASL/SCRAM_SHA_256 client state to RECEIVE_SERVER_FIRST_MESSAGE
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.a.SaslClientAuthenticator [Producer clientId=producer-1] Set SASL client state to INTERMEDIATE
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.s.i.ScramSaslClient Setting SASL/SCRAM_SHA_256 client state to RECEIVE_SERVER_FINAL_MESSAGE
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.s.i.ScramSaslClient Setting SASL/SCRAM_SHA_256 client state to COMPLETE
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.a.SaslClientAuthenticator [Producer clientId=producer-1] Set SASL client state to COMPLETE
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.s.a.SaslClientAuthenticator [Producer clientId=producer-1] Finished authentication with no session expiration and no session re-authentication
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.kafka.common.network.Selector [Producer clientId=producer-1] Successfully authenticated with ip-10-0-172-98.ec2.internal/10.0.172.98
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Initiating API versions fetch from node 1.
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=1790) and timeout 30000 to node 1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.3.1')
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received API_VERSIONS response from node 1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=1790): ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=10), ApiVersion(apiKey=1, minVersion=0, maxVersion=16), ApiVersion(apiKey=2, minVersion=0, maxVersion=8), ApiVersion(apiKey=3, minVersion=0, maxVersion=12), ApiVersion(apiKey=8, minVersion=0, maxVersion=9), ApiVersion(apiKey=9, minVersion=0, maxVersion=9), ApiVersion(apiKey=10, minVersion=0, maxVersion=4), ApiVersion(apiKey=11, minVersion=0, maxVersion=9), ApiVersion(apiKey=12, minVersion=0, maxVersion=4), ApiVersion(apiKey=13, minVersion=0, maxVersion=5), ApiVersion(apiKey=14, minVersion=0, maxVersion=5), ApiVersion(apiKey=15, minVersion=0, maxVersion=5), ApiVersion(apiKey=16, minVersion=0, maxVersion=4), ApiVersion(apiKey=17, minVersion=0, maxVersion=1), ApiVersion(apiKey=18, minVersion=0, maxVersion=3), ApiVersion(apiKey=19, minVersion=0, maxVersion=7), ApiVersion(apiKey=20, minVersion=0, maxVersion=6), ApiVersion(apiKey=21, minVersion=0, maxVersion=2), ApiVersion(apiKey=22, minVersion=0, maxVersion=4), ApiVersion(apiKey=23, minVersion=0, maxVersion=4), ApiVersion(apiKey=24, minVersion=0, maxVersion=4), ApiVersion(apiKey=25, minVersion=0, maxVersion=3), ApiVersion(apiKey=26, minVersion=0, maxVersion=3), ApiVersion(apiKey=27, minVersion=0, maxVersion=1), ApiVersion(apiKey=28, minVersion=0, maxVersion=3), ApiVersion(apiKey=29, minVersion=0, maxVersion=3), ApiVersion(apiKey=30, minVersion=0, maxVersion=3), ApiVersion(apiKey=31, minVersion=0, maxVersion=3), ApiVersion(apiKey=32, minVersion=0, maxVersion=4), ApiVersion(apiKey=33, minVersion=0, maxVersion=2), ApiVersion(apiKey=34, minVersion=0, maxVersion=2), ApiVersion(apiKey=35, minVersion=0, maxVersion=4), ApiVersion(apiKey=36, minVersion=0, maxVersion=2), ApiVersion(apiKey=37, minVersion=0, maxVersion=3), ApiVersion(apiKey=38, minVersion=0, maxVersion=3), ApiVersion(apiKey=39, minVersion=0, maxVersion=2), ApiVersion(apiKey=40, minVersion=0, maxVersion=2), ApiVersion(apiKey=41, minVersion=0, maxVersion=3), ApiVersion(apiKey=42, minVersion=0, maxVersion=2), ApiVersion(apiKey=43, minVersion=0, maxVersion=2), ApiVersion(apiKey=44, minVersion=0, maxVersion=1), ApiVersion(apiKey=45, minVersion=0, maxVersion=0), ApiVersion(apiKey=46, minVersion=0, maxVersion=0), ApiVersion(apiKey=47, minVersion=0, maxVersion=0), ApiVersion(apiKey=48, minVersion=0, maxVersion=1), ApiVersion(apiKey=49, minVersion=0, maxVersion=1), ApiVersion(apiKey=50, minVersion=0, maxVersion=0), ApiVersion(apiKey=51, minVersion=0, maxVersion=0), ApiVersion(apiKey=55, minVersion=0, maxVersion=1), ApiVersion(apiKey=57, minVersion=0, maxVersion=1), ApiVersion(apiKey=60, minVersion=0, maxVersion=1), ApiVersion(apiKey=61, minVersion=0, maxVersion=0), ApiVersion(apiKey=64, minVersion=0, maxVersion=0), ApiVersion(apiKey=65, minVersion=0, maxVersion=0), ApiVersion(apiKey=66, minVersion=0, maxVersion=0), ApiVersion(apiKey=68, minVersion=0, maxVersion=0), ApiVersion(apiKey=74, minVersion=0, maxVersion=0)], throttleTimeMs=0, supportedFeatures=[SupportedFeatureKey(name='metadata.version', minVersion=1, maxVersion=19)], finalizedFeaturesEpoch=28463, finalizedFeatures=[FinalizedFeatureKey(name='metadata.version', maxVersionLevel=19, minVersionLevel=19)])
Nov 28 04:56:36 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Node 1 has finalized features epoch: 28463, finalized features: [FinalizedFeatureKey(name='metadata.version', maxVersionLevel=19, minVersionLevel=19)], supported features: [SupportedFeatureKey(name='metadata.version', minVersion=1, maxVersion=19)], API versions: (Produce(0): 0 to 10 [usable: 9], Fetch(1): 0 to 16 [usable: 13], ListOffsets(2): 0 to 8 [usable: 7], Metadata(3): 0 to 12 [usable: 12], LeaderAndIsr(4): UNSUPPORTED, StopReplica(5): UNSUPPORTED, UpdateMetadata(6): UNSUPPORTED, ControlledShutdown(7): UNSUPPORTED, OffsetCommit(8): 0 to 9 [usable: 8], OffsetFetch(9): 0 to 9 [usable: 8], FindCoordinator(10): 0 to 4 [usable: 4], JoinGroup(11): 0 to 9 [usable: 9], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 5 [usable: 5], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 4], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 7], DeleteTopics(20): 0 to 6 [usable: 6], DeleteRecords(21): 0 to 2 [usable: 2], InitProducerId(22): 0 to 4 [usable: 4], OffsetForLeaderEpoch(23): 0 to 4 [usable: 4], AddPartitionsToTxn(24): 0 to 4 [usable: 3], AddOffsetsToTxn(25): 0 to 3 [usable: 3], EndTxn(26): 0 to 3 [usable: 3], WriteTxnMarkers(27): 0 to 1 [usable: 1], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 3 [usable: 3], CreateAcls(30): 0 to 3 [usable: 3], DeleteAcls(31): 0 to 3 [usable: 3], DescribeConfigs(32): 0 to 4 [usable: 4], AlterConfigs(33): 0 to 2 [usable: 2], AlterReplicaLogDirs(34): 0 to 2 [usable: 2], DescribeLogDirs(35): 0 to 4 [usable: 4], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 3], CreateDelegationToken(38): 0 to 3 [usable: 3], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 3 [usable: 3], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0], DescribeClientQuotas(48): 0 to 1 [usable: 1], AlterClientQuotas(49): 0 to 1 [usable: 1], DescribeUserScramCredentials(50): 0 [usable: 0], AlterUserScramCredentials(51): 0 [usable: 0], DescribeQuorum(55): 0 to 1 [usable: 1], AlterPartition(56): UNSUPPORTED, UpdateFeatures(57): 0 to 1 [usable: 1], DescribeCluster(60): 0 to 1 [usable: 0], DescribeProducers(61): 0 [usable: 0], UnregisterBroker(64): 0 [usable: 0], DescribeTransactions(65): 0 [usable: 0], ListTransactions(66): 0 [usable: 0], AllocateProducerIds(67): UNSUPPORTED, UNKNOWN(68): 0, UNKNOWN(74): 0).
Nov 28 04:56:42 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG i.d.c.cassandra.SnapshotProcessor Skipping snapshot [mode: NEVER]
Nov 28 04:56:43 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.AbstractDirectoryWatcher No commitLogFile is detected in /var/lib/cassandra/data/cdc_raw.
Nov 28 04:56:43 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.AbstractDirectoryWatcher Polling commitLog files from /var/lib/cassandra/data/cdc_raw ...
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.Cassandra4CommitLogRealTimeParser Polling for completeness of idx file for: LogicalCommitLog{commitLogPosition=CommitLogPosition(segmentId=1732759045759, position=0), synced=4926029, completed=false, log=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log, index=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759_cdc.idx, commitLogSegmentId=1732759045759}
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.Cassandra4CommitLogRealTimeParser Resume to read the partial file: LogicalCommitLog{commitLogPosition=CommitLogPosition(segmentId=1732759045759, position=0), synced=4926029, completed=false, log=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log, index=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759_cdc.idx, commitLogSegmentId=1732759045759}
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG i.d.c.c.AbstractCassandra4CommitLogParser Starting to read commit log segments LogicalCommitLog{commitLogPosition=CommitLogPosition(segmentId=1732759045759, position=0), synced=4926029, completed=false, log=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log, index=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759_cdc.idx, commitLogSegmentId=1732759045759} on position CommitLogPosition(segmentId=1732759045759, position=4925963)
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.c.db.commitlog.CommitLogReader Reading /var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log (CL version 7, messaging version 12, compression null)
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO o.a.c.db.commitlog.CommitLogReader Finished reading /var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG i.d.c.c.AbstractCassandra4CommitLogParser Finished reading commit log segments LogicalCommitLog{commitLogPosition=CommitLogPosition(segmentId=1732759045759, position=0), synced=4926029, completed=false, log=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log, index=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759_cdc.idx, commitLogSegmentId=1732759045759} on position CommitLogPosition(segmentId=1732759045759, position=4925963)
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.Cassandra4CommitLogRealTimeParser Sleep for idx file to be complete
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.RecordAccumulator [Producer clientId=producer-1] Assigned producerId 1 and producerEpoch 0 to batch with base sequence 12 being sent to partition zishinew.weather.test-0
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1791) and timeout 30000 to node 2: {acks=-1,timeout=30000,partitionSizes=[zishinew.weather.test-0=211]}
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received PRODUCE response from node 2 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1791): ProduceResponseData(responses=[TopicProduceResponse(name='zishinew.weather.test', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=38, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.TransactionManager [Producer clientId=producer-1] ProducerId: 1; Set last ack'd sequence number for topic-partition zishinew.weather.test-0 to 12
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.RecordAccumulator [Producer clientId=producer-1] Assigned producerId 1 and producerEpoch 0 to batch with base sequence 1121 being sent to partition zishinew.instaclustr.debezium_sla_latency-2
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1792) and timeout 30000 to node 3: {acks=-1,timeout=30000,partitionSizes=[zishinew.instaclustr.debezium_sla_latency-2=306]}
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.RecordAccumulator [Producer clientId=producer-1] Assigned producerId 1 and producerEpoch 0 to batch with base sequence 653 being sent to partition zishinew.instaclustr.debezium_sla_latency-0
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.RecordAccumulator [Producer clientId=producer-1] Assigned producerId 1 and producerEpoch 0 to batch with base sequence 1122 being sent to partition zishinew.instaclustr.debezium_sla_latency-2
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1793) and timeout 30000 to node 2: {acks=-1,timeout=30000,partitionSizes=[zishinew.instaclustr.debezium_sla_latency-0=306]}
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1794) and timeout 30000 to node 3: {acks=-1,timeout=30000,partitionSizes=[zishinew.instaclustr.debezium_sla_latency-2=306]}
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received PRODUCE response from node 3 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1792): ProduceResponseData(responses=[TopicProduceResponse(name='zishinew.instaclustr.debezium_sla_latency', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=3843, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.TransactionManager [Producer clientId=producer-1] ProducerId: 1; Set last ack'd sequence number for topic-partition zishinew.instaclustr.debezium_sla_latency-2 to 1121
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received PRODUCE response from node 2 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1793): ProduceResponseData(responses=[TopicProduceResponse(name='zishinew.instaclustr.debezium_sla_latency', partitionResponses=[PartitionProduceResponse(index=0, errorCode=0, baseOffset=2198, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.TransactionManager [Producer clientId=producer-1] ProducerId: 1; Set last ack'd sequence number for topic-partition zishinew.instaclustr.debezium_sla_latency-0 to 653
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Received PRODUCE response from node 3 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=1794): ProduceResponseData(responses=[TopicProduceResponse(name='zishinew.instaclustr.debezium_sla_latency', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=3844, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Nov 28 04:56:44 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG o.a.k.c.p.i.TransactionManager [Producer clientId=producer-1] ProducerId: 1; Set last ack'd sequence number for topic-partition zishinew.instaclustr.debezium_sla_latency-2 to 1122
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.HeartbeatHandler Connection was inactive for 30 seconds, sending heartbeat
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.InFlightHandler [s0|id: 0x6a5108cb, L:/10.1.107.210:60032 - R:/44.218.9.111:9042] Got last response on in-flight stream id 0, completing and releasing
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.HeartbeatHandler [id: 0x6a5108cb, L:/10.1.107.210:60032 - R:/44.218.9.111:9042] Heartbeat query succeeded
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.HeartbeatHandler Connection was inactive for 30 seconds, sending heartbeat
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.InFlightHandler [s0|id: 0xe7739e9b, L:/10.1.107.210:45778 - R:/98.85.163.132:9042] Got last response on in-flight stream id 0, completing and releasing
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.HeartbeatHandler [id: 0xe7739e9b, L:/10.1.107.210:45778 - R:/98.85.163.132:9042] Heartbeat query succeeded
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.HeartbeatHandler Connection was inactive for 30 seconds, sending heartbeat
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.InFlightHandler [s0|control|id: 0xa8432be0, L:/10.1.107.210:49990 - R:/54.159.58.141:9042] Got last response on in-flight stream id 0, completing and releasing
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.HeartbeatHandler [id: 0xa8432be0, L:/10.1.107.210:49990 - R:/54.159.58.141:9042] Heartbeat query succeeded
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.HeartbeatHandler Connection was inactive for 30 seconds, sending heartbeat
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.InFlightHandler [s0|id: 0xe99c9227, L:/10.1.107.210:50002 - R:/54.159.58.141:9042] Got last response on in-flight stream id 0, completing and releasing
Nov 28 04:56:45 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG c.d.o.d.i.c.channel.HeartbeatHandler [id: 0xe99c9227, L:/10.1.107.210:50002 - R:/54.159.58.141:9042] Heartbeat query succeeded
Nov 28 04:56:52 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG i.d.c.cassandra.SnapshotProcessor Skipping snapshot [mode: NEVER]
Nov 28 04:56:53 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.AbstractDirectoryWatcher No commitLogFile is detected in /var/lib/cassandra/data/cdc_raw.
Nov 28 04:56:53 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.AbstractDirectoryWatcher Polling commitLog files from /var/lib/cassandra/data/cdc_raw ...
Nov 28 04:56:54 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.Cassandra4CommitLogRealTimeParser Polling for completeness of idx file for: LogicalCommitLog{commitLogPosition=CommitLogPosition(segmentId=1732759045759, position=0), synced=4932693, completed=false, log=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log, index=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759_cdc.idx, commitLogSegmentId=1732759045759} Nov 28 04:56:54 ip-10-1-107-210 debezium-connector-cassandra[8088]: INFO i.d.c.c.Cassandra4CommitLogRealTimeParser Resume to read the partial file: LogicalCommitLog{commitLogPosition=CommitLogPosition(segmentId=1732759045759, position=0), synced=4932693, completed=false, log=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log, index=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759_cdc.idx, commitLogSegmentId=1732759045759} Nov 28 04:56:54 ip-10-1-107-210 debezium-connector-cassandra[8088]: DEBUG i.d.c.c.AbstractCassandra4CommitLogParser Starting to read commit log segments LogicalCommitLog{commitLogPosition=CommitLogPosition(segmentId=1732759045759, position=0), synced=4932693, completed=false, log=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759.log, index=/var/lib/cassandra/data/cdc_raw/CommitLog-7-1732759045759_cdc.idx, commitLogSegmentId=1732759045759} on position CommitLogPosition(segmentId=1732759045759, position=4926029)
How to reproduce the issue using our tutorial deployment?
Deploy Cassandra and Debezium-connector-cassandra on 3 nodes with RF=3, create a keyspace and a table in Cassandra, and insert a record into that table.
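A minimal sketch of the CQL for those steps, generated from Python so the names are easy to change. The keyspace `instaclustr` and table `debezium_sla_latency` are inferred from the topic name in the logs; the columns, the `SimpleStrategy` replication class, and the helper `reproduction_cql` are assumptions for illustration, not the real schema. The table needs `cdc = true` (and the nodes need CDC enabled) for commit logs to appear under cdc_raw.

```python
def reproduction_cql(keyspace="instaclustr", table="debezium_sla_latency",
                     replication_factor=3):
    """Return the CQL statements for the reproduction steps, in order.

    The column layout (id, created_at) and SimpleStrategy are hypothetical;
    substitute the real schema and replication settings of the cluster.
    """
    create_keyspace = (
        f"CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH replication = "
        f"{{'class': 'SimpleStrategy', 'replication_factor': {replication_factor}}};"
    )
    # cdc = true makes Cassandra keep commit log segments for this table.
    create_table = (
        f"CREATE TABLE IF NOT EXISTS {keyspace}.{table} "
        f"(id uuid PRIMARY KEY, created_at timestamp) WITH cdc = true;"
    )
    insert_row = (
        f"INSERT INTO {keyspace}.{table} (id, created_at) "
        f"VALUES (uuid(), toTimestamp(now()));"
    )
    return [create_keyspace, create_table, insert_row]


if __name__ == "__main__":
    # Print the statements so they can be pasted into cqlsh.
    for statement in reproduction_cql():
        print(statement)
```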
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
For one record in the commit log on one node, there is just one change event sent to Kafka.
Which use case/requirement will be addressed by the proposed feature?
I'm not sure about the root cause. Since no log entry indicates a problem sending records to Kafka, I assume there is a bug in how Debezium processes the commit log. I'll read the code too.
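To double-check that nothing fails on the Kafka side, a small script can pair each PRODUCE request in the DEBUG log with its response and flag any non-zero errorCode. This is only a sketch: the regular expressions are based on the log lines quoted in this report, and the function name `summarize_produce_traffic` is made up for illustration.

```python
import re
from collections import Counter

# Patterns follow the Kafka client DEBUG lines quoted in this report.
SENT = re.compile(
    r"Sending PRODUCE request with header "
    r"RequestHeader\([^)]*correlationId=(\d+)\)"
)
RECEIVED = re.compile(
    r"Received PRODUCE response from node \d+ for request with header "
    r"RequestHeader\([^)]*correlationId=(\d+)\)"
)
PARTITION = re.compile(
    r"PartitionProduceResponse\(index=(\d+), errorCode=(-?\d+), baseOffset=(-?\d+)"
)


def summarize_produce_traffic(log_text):
    """Match PRODUCE requests to responses and collect per-partition results."""
    sent = set(SENT.findall(log_text))
    responses = list(RECEIVED.finditer(log_text))
    acked = set()
    failures = []
    acks_per_partition = Counter()
    for i, match in enumerate(responses):
        correlation_id = match.group(1)
        acked.add(correlation_id)
        # Partition results for this response sit between it and the next one.
        end = responses[i + 1].start() if i + 1 < len(responses) else len(log_text)
        for index, error_code, _ in PARTITION.findall(log_text, match.end(), end):
            if error_code == "0":
                acks_per_partition[int(index)] += 1
            else:
                failures.append((correlation_id, int(index), int(error_code)))
    return {
        "unanswered": sorted(sent - acked, key=int),  # requests with no response
        "failures": failures,                         # non-zero error codes
        "acks_per_partition": dict(acks_per_partition),
    }
```

For the three PRODUCE requests quoted above (correlation IDs 1792 to 1794), every response carries errorCode=0, which is why the producer side looks healthy to me.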
Implementation ideas (optional)