Enhancement
Resolution: Done
Major
3.1.3.Final
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
Debezium Server 3.1 Final
What is the connector configuration?
apiVersion: debezium.io/v1alpha1
kind: DebeziumServer
metadata:
 name: debezium
spec:
 image: quay.io/debezium/server:3.1
 runtime:
  api:
   enabled: true
 sink:
  type: eventhubs
 source:
  class: io.debezium.connector.postgresql.PostgresConnector
  offset:
   file:
    fileName: offset.dat
  schemaHistory:
   file:
    fileName: schema-history.dat
  config:
   database.port: 5432
   database.dbname: yourdbname
   plugin.name: pgoutput
   transforms: SingleTopicReroute, PartitionRouting
   transforms.SingleTopicReroute.type: io.debezium.transforms.ByLogicalTableRouter
   transforms.SingleTopicReroute.topic.regex: .*
   transforms.SingleTopicReroute.topic.replacement: debezium_postgres
   transforms.PartitionRouting.type: io.debezium.transforms.partitions.PartitionRouting
   transforms.PartitionRouting.partition.payload.fields: change.partition_id
   transforms.PartitionRouting.partition.topic.num: 6
   transforms.PartitionRouting.partition.hash.function: murmur2
What is the captured database version and mode of deployment?
Azure Postgres Flex Server 17, deployed with Azure ARM Templates through Terraform
What behavior do you expect?
When Debezium sends a message to Azure Event Hubs, it should include the message key.
What behavior do you see?
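Debezium fails with `java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key` and the engine stops.
If I configure `debezium.sink.eventhubs.partitionkey=id`, all of my messages end up in a single partition, and every message has an empty message ID.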
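A minimal sketch of why a fixed partition key would put every event in one partition, assuming `debezium.sink.eventhubs.partitionkey` is applied as a literal string (here `id`) rather than as a lookup of the record's `id` field. The CRC32 hash is a stand-in chosen for illustration; Event Hubs uses its own internal hash. `pick_partition` and the record IDs are hypothetical.

```python
import zlib

PARTITIONS = 6  # mirrors partition.topic.num in the configuration above


def pick_partition(partition_key: str, partitions: int = PARTITIONS) -> int:
    """Map a partition key to a partition index.

    CRC32 is only a stand-in hash for this sketch; the real service
    uses its own hash function.
    """
    return zlib.crc32(partition_key.encode("utf-8")) % partitions


# Hypothetical primary-key values of captured rows.
record_ids = [str(i) for i in range(1000)]

# Same literal key for every event: only one partition is ever chosen.
fixed_key_partitions = {pick_partition("id") for _ in record_ids}

# Key derived from each record: events spread over several partitions.
per_record_partitions = {pick_partition(rid) for rid in record_ids}

print(len(fixed_key_partitions))   # 1
print(len(per_record_partitions))
```

Under that assumption, the single-partition behaviour follows from the constant key alone and says nothing about whether the record key reaches the sink.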
Do you see the same behaviour using the latest released Debezium version?
Yes
Do you have the connector logs, ideally from start till finish?
Yes
{"timestamp":"2025-06-30T10:42:25.043631116Z","sequence":962,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.postgresql.connection.WalPositionLocator","level":"INFO","message":"Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{D/290034B8}'","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.043760424Z","sequence":963,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.postgresql.connection.PostgresReplicationConnection","level":"INFO","message":"Initializing PgOutput logical decoder publication","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.071626635Z","sequence":968,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.postgresql.connection.PostgresConnection","level":"INFO","message":"Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{D/290034B8}, 
catalogXmin=244384]","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.073006425Z","sequence":969,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-18-thread-1","threadId":59,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.089655507Z","sequence":970,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.util.Threads","level":"INFO","message":"Requested thread factory for component PostgresConnector, id = authz named = keep-alive","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.09016174Z","sequence":971,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.util.Threads","level":"INFO","message":"Creating thread 
debezium-postgresconnector-authz-keep-alive","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.090990694Z","sequence":972,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.postgresql.PostgresStreamingChangeEventSource","level":"INFO","message":"Processing messages","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.686066179Z","sequence":973,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorSender","level":"WARN","message":"{\"az.sdk.message\":\"Delivery rejected.\",\"connectionId\":\"MF_68f90a_1751280142770\",\"entityPath\":\"um-postgres-cdc-events\",\"linkName\":\"um-postgres-cdc-events\",\"deliveryTag\":\"63de0182dace4315af465de2f53adc9f\",\"rejected\":\"Rejected{error=Error{condition=amqp:not-allowed, description='Compacted event hub does not allow null message key. 
TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25', info=null}}\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.688446834Z","sequence":974,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.RetryUtil","level":"ERROR","message":"partitionId[null]: Sending messages timed out.\nCompacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.688902863Z","sequence":975,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"ERROR","message":"Engine has failed with ","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1,"exception":{"refId":1,"exceptionType":"java.util.concurrent.ExecutionException","message":"io.debezium.DebeziumException: java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. 
TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class":"java.util.concurrent.FutureTask","method":"report","line":122},{"class":"java.util.concurrent.FutureTask","method":"get","line":191},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine","method":"runTasksPolling","line":496},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine","method":"run","line":220},{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":182},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class":"java.lang.Thread","method":"run","line":1583}],"causedBy":{"exception":{"refId":2,"exceptionType":"io.debezium.DebeziumException","message":"java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. 
TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class":"io.debezium.server.eventhubs.BatchManager","method":"emitBatchToEventHub","line":133},{"class":"io.debezium.server.eventhubs.BatchManager","method":"lambda$closeAndEmitBatches$2","line":93},{"class":"java.util.HashMap","method":"forEach","line":1429},{"class":"io.debezium.server.eventhubs.BatchManager","method":"closeAndEmitBatches","line":90},{"class":"io.debezium.server.eventhubs.EventHubsChangeConsumer","method":"handleBatch","line":198},{"class":"io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor","method":"processRecords","line":56},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1203},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1183},{"class":"io.debezium.embedded.async.RetryingCallable","method":"call","line":47},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":572},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class":"java.lang.Thread","method":"run","line":1583}],"causedBy":{"exception":{"refId":3,"exceptionType":"java.lang.UnsupportedOperationException","message":"Compacted event hub does not allow null message 
key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class":"com.azure.core.amqp.implementation.ExceptionUtil","method":"toException","line":78},{"class":"com.azure.core.amqp.implementation.ReactorSender","method":"processDeliveredMessage","line":650},{"class":"reactor.core.publisher.LambdaSubscriber","method":"onNext","line":160},{"class":"reactor.core.publisher.EmitterProcessor","method":"drain","line":537},{"class":"reactor.core.publisher.EmitterProcessor","method":"tryEmitNext","line":343},{"class":"reactor.core.publisher.SinkManySerialized","method":"tryEmitNext","line":100},{"class":"reactor.core.publisher.InternalManySink","method":"emitNext","line":27},{"class":"com.azure.core.amqp.implementation.handler.SendLinkHandler","method":"onDelivery","line":205},{"class":"org.apache.qpid.proton.engine.BaseHandler","method":"handle","line":185},{"class":"org.apache.qpid.proton.engine.impl.EventImpl","method":"dispatch","line":108},{"class":"org.apache.qpid.proton.reactor.impl.ReactorImpl","method":"dispatch","line":324},{"class":"org.apache.qpid.proton.reactor.impl.ReactorImpl","method":"process","line":291},{"class":"com.azure.core.amqp.implementation.ReactorExecutor","method":"run","line":91},{"class":"reactor.core.scheduler.SchedulerTask","method":"call","line":68},{"class":"reactor.core.scheduler.SchedulerTask","method":"call","line":28},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},
{"class":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask","method":"run","line":304},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class":"java.lang.Thread","method":"run","line":1583}],"suppressed":[{"refId":4,"exceptionType":"java.lang.Exception","message":"#block terminated with an error","frames":[{"class":"reactor.core.publisher.BlockingSingleSubscriber","method":"blockingGet","line":100},{"class":"reactor.core.publisher.Mono","method":"block","line":1742},{"class":"com.azure.messaging.eventhubs.EventHubProducerClient","method":"send","line":385},{"class":"io.debezium.server.eventhubs.EventDataBatchProxy","method":"emit","line":48},{"class":"io.debezium.server.eventhubs.BatchManager","method":"emitBatchToEventHub","line":129},{"class":"io.debezium.server.eventhubs.BatchManager","method":"lambda$closeAndEmitBatches$2","line":93},{"class":"java.util.HashMap","method":"forEach","line":1429},{"class":"io.debezium.server.eventhubs.BatchManager","method":"closeAndEmitBatches","line":90},{"class":"io.debezium.server.eventhubs.EventHubsChangeConsumer","method":"handleBatch","line":198},{"class":"io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor","method":"processRecords","line":56},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1203},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1183},
{"class":"io.debezium.embedded.async.RetryingCallable","method":"call","line":47},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":572},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class":"java.lang.Thread","method":"run","line":1583}]}]}}}}}} {"timestamp":"2025-06-30T10:42:25.689979133Z","sequence":976,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"INFO","message":"Engine state has changed from 'POLLING_TASKS' to 'STOPPING'","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:25.690632476Z","sequence":977,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"Stopping down connector","threadName":"pool-13-thread-1","threadId":51,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.104892406Z","sequence":978,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully 
closed","threadName":"pool-19-thread-1","threadId":61,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.106112985Z","sequence":979,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-20-thread-1","threadId":62,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.106258395Z","sequence":980,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Finished streaming","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.106447607Z","sequence":981,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Connected metrics set to 
'false'","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.107179354Z","sequence":982,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.signal.SignalProcessor","level":"INFO","message":"SignalProcessor stopped","threadName":"pool-13-thread-1","threadId":51,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.107487874Z","sequence":983,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.service.DefaultServiceRegistry","level":"INFO","message":"Debezium ServiceRegistry stopped.","threadName":"pool-13-thread-1","threadId":51,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.108512541Z","sequence":984,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-21-thread-1","threadId":63,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.108775058Z","sequence":985,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"INFO","message":"Stopped task #1 out of 1 tasks (it took 418 ms to stop the 
task).","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.109501105Z","sequence":986,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.110102644Z","sequence":987,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"INFO","message":"Engine is stopped.","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.110241953Z","sequence":988,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"INFO","message":"Engine state has changed from 'STOPPING' to 'STOPPED'","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} {"timestamp":"2025-06-30T10:42:26.110487769Z","sequence":989,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"ERROR","message":"Connector completed: success = 'false', message = 'io.debezium.DebeziumException: java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. 
TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25', error = 'io.debezium.DebeziumException: java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25'","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1,"exception":{"refId":1,"exceptionType":"io.debezium.DebeziumException","message":"java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class":"io.debezium.server.eventhubs.BatchManager","method":"emitBatchToEventHub","line":133},{"class":"io.debezium.server.eventhubs.BatchManager","method":"lambda$closeAndEmitBatches$2","line":93},{"class":"java.util.HashMap","method":"forEach","line":1429},{"class":"io.debezium.server.eventhubs.BatchManager","method":"closeAndEmitBatches","line":90},{"class":"io.debezium.server.eventhubs.EventHubsChangeConsumer","method":"handleBatch","line":198},{"class":"io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor","method":"processRecords","line":56},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1203},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1183},
{"class":"io.debezium.embedded.async.RetryingCallable","method":"call","line":47},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":572},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class":"java.lang.Thread","method":"run","line":1583}],"causedBy":{"exception":{"refId":2,"exceptionType":"java.lang.UnsupportedOperationException","message":"Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class":"com.azure.core.amqp.implementation.ExceptionUtil","method":"toException","line":78},{"class":"com.azure.core.amqp.implementation.ReactorSender","method":"processDeliveredMessage","line":650},{"class":"reactor.core.publisher.LambdaSubscriber","method":"onNext","line":160},{"class":"reactor.core.publisher.EmitterProcessor","method":"drain","line":537},{"class":"reactor.core.publisher.EmitterProcessor","method":"tryEmitNext","line":343},{"class":"reactor.core.publisher.SinkManySerialized","method":"tryEmitNext","line":100},{"class":"reactor.core.publisher.InternalManySink","method":"emitNext","line":27},{"class":"com.azure.core.amqp.implementation.handler.SendLinkHandler","method":"onDelivery","line":205},
{"class":"org.apache.qpid.proton.engine.BaseHandler","method":"handle","line":185},{"class":"org.apache.qpid.proton.engine.impl.EventImpl","method":"dispatch","line":108},{"class":"org.apache.qpid.proton.reactor.impl.ReactorImpl","method":"dispatch","line":324},{"class":"org.apache.qpid.proton.reactor.impl.ReactorImpl","method":"process","line":291},{"class":"com.azure.core.amqp.implementation.ReactorExecutor","method":"run","line":91},{"class":"reactor.core.scheduler.SchedulerTask","method":"call","line":68},{"class":"reactor.core.scheduler.SchedulerTask","method":"call","line":28},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask","method":"run","line":304},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class":"java.lang.Thread","method":"run","line":1583}],"suppressed":[{"refId":3,"exceptionType":"java.lang.Exception","message":"#block terminated with an error","frames":[{"class":"reactor.core.publisher.BlockingSingleSubscriber","method":"blockingGet","line":100},{"class":"reactor.core.publisher.Mono","method":"block","line":1742},{"class":"com.azure.messaging.eventhubs.EventHubProducerClient","method":"send","line":385},{"class":"io.debezium.server.eventhubs.EventDataBatchProxy","method":"emit","line":48},{"class":"io.debezium.server.eventhubs.BatchManager","method":"emitBatchToEventHub","line":129},
{"class":"io.debezium.server.eventhubs.BatchManager","method":"lambda$closeAndEmitBatches$2","line":93},{"class":"java.util.HashMap","method":"forEach","line":1429},{"class":"io.debezium.server.eventhubs.BatchManager","method":"closeAndEmitBatches","line":90},{"class":"io.debezium.server.eventhubs.EventHubsChangeConsumer","method":"handleBatch","line":198},{"class":"io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor","method":"processRecords","line":56},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1203},{"class":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1183},{"class":"io.debezium.embedded.async.RetryingCallable","method":"call","line":47},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":572},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class":"java.lang.Thread","method":"run","line":1583}]}]}}}} {"timestamp":"2025-06-30T10:42:26.116400254Z","sequence":990,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.DebeziumServer","level":"INFO","message":"Received request to stop the engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} 
{"timestamp":"2025-06-30T10:42:26.116664371Z","sequence":991,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.DebeziumServer","level":"INFO","message":"Cannot shut down engine now: ","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.117079098Z","sequence":992,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor","level":"INFO","message":"{\"az.sdk.message\":\"Upstream connection publisher was completed. Terminating processor.\",\"entityPath\":\"um-postgres-cdc-events\"}","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.11742092Z","sequence":993,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorConnection","level":"INFO","message":"{\"az.sdk.message\":\"Disposing of ReactorConnection.\",\"connectionId\":\"MF_68f90a_1751280142770\",\"isTransient\":false,\"isInitiatedByClient\":true,\"shutdownMessage\":\"Disposed by client.\"}","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.117694938Z","sequence":994,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor","level":"INFO","message":"{\"az.sdk.message\":\"Channel is disposed.\",\"entityPath\":\"um-postgres-cdc-events\"}","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.119557059Z","sequence":995,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorSession","level":"INFO","message":"{\"az.sdk.message\":\"session disposal is completed\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"mgmt-session\",\"sessionId\":\"H_2a02fb_1751280143399\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.120741436Z","sequence":996,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.handler.SessionHandler","level":"INFO","message":"{\"az.sdk.message\":\"onSessionRemoteClose\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"mgmt-session\",\"sessionId\":\"H_2a02fb_1751280143399\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.121176764Z","sequence":997,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorSession","level":"INFO","message":"{\"az.sdk.message\":\"session disposal is completed\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"um-postgres-cdc-events\",\"sessionId\":\"H_cacf01_1751280144889\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.121512086Z","sequence":998,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorSession","level":"INFO","message":"{\"az.sdk.message\":\"session disposal is completed\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"cbs-session\",\"sessionId\":\"H_a30051_1751280144908\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.123295602Z","sequence":999,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.handler.SendLinkHandler","level":"INFO","message":"{\"az.sdk.message\":\"onLinkRemoteClose\",\"connectionId\":\"MF_68f90a_1751280142770\",\"linkName\":\"um-postgres-cdc-events\",\"entityPath\":\"um-postgres-cdc-events\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.123590221Z","sequence":1000,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.eventhubs.EventHubsChangeConsumer","level":"INFO","message":"Closed Event Hubs producer client","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.124599787Z","sequence":1001,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.handler.SessionHandler","level":"INFO","message":"{\"az.sdk.message\":\"onSessionRemoteClose\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"um-postgres-cdc-events\",\"sessionId\":\"H_cacf01_1751280144889\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.124719695Z","sequence":1002,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.handler.SessionHandler","level":"INFO","message":"{\"az.sdk.message\":\"onSessionRemoteClose\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"cbs-session\",\"sessionId\":\"H_a30051_1751280144908\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
{"timestamp":"2025-06-30T10:42:26.135152273Z","sequence":1003,"loggerClassName":"org.jboss.logging.Logger","loggerName":"io.quarkus","level":"INFO","message":"debezium-server-dist stopped in 0.024s","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
How to reproduce the issue using our tutorial deployment?
Use an Azure Event Hubs sink with Debezium Server 3.1 Final, following the docs at https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_azure_event_hubs
Make sure the cleanup policy of the target Event Hub topic is set to "Compaction"
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
Allow the use of compacted topics with Azure Event Hubs sinks.
This is vital for keeping topic size manageable and for making replay of topic messages easier.
Implementation ideas (optional)
In the particular case of table truncation messages, the default behavior of not having a message key needs to be addressed as well (https://debezium.io/documentation/reference/3.1/connectors/postgresql.html#postgresql-truncate-events).
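One possible shape for this, given only as a rough sketch: derive the key that is sent to Event Hubs from each change event's key, and fall back to a fixed value when the event has no key, as is the case for truncate events. `PartitionKeyResolver` and `resolve` are hypothetical names that do not exist in Debezium or the Azure SDK, and using the destination name as the fallback is an assumption made for illustration, not a recommendation from the docs.

```java
import java.util.Objects;

/**
 * Hypothetical helper (not part of Debezium or the Azure SDK).
 * Picks the string that a sink could pass to Event Hubs as the message key.
 */
final class PartitionKeyResolver {

    private PartitionKeyResolver() {
    }

    /**
     * @param recordKey   serialized change event key; may be null or empty,
     *                    for example for truncate events
     * @param destination destination (topic) name of the change event
     * @return the record key if present, otherwise the destination name
     *         (assumed fallback so that keyless events still carry a key)
     */
    static String resolve(String recordKey, String destination) {
        Objects.requireNonNull(destination, "destination must not be null");
        if (recordKey == null || recordKey.isEmpty()) {
            // Assumption: keyless events share one stable key per destination.
            return destination;
        }
        return recordKey;
    }
}
```

With such a helper, a keyed event such as `{"id":42}` would keep its own key, while a truncate event routed to `debezium_postgres` would be sent with the key `debezium_postgres`. Whether a shared fallback key is acceptable on a compacted topic is a design question left open here, since later keyless events would replace earlier ones.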