Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-9195

Support dynamic message key in Event Hub Sink

XMLWordPrintable

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      Debezium Server 3.1Final

      What is the connector configuration?

      apiVersion: debezium.io/v1alpha1
      kind: DebeziumServer
      metadata:
        name: debezium
      spec:
        image: quay.io/debezium/server:3.1
        runtime:
          api:
            enabled: true
        sink:
          type: eventhubs
        source:
          class: io.debezium.connector.postgresql.PostgresConnector
          offset:
            file:
              fileName: offset.dat
          schemaHistory:
            file:
              fileName: schema-history.dat
          config:
            database.port: 5432
            database.dbname: yourdbname
            plugin.name: pgoutput
            transforms: SingleTopicReroute, PartitionRouting
            transforms.SingleTopicReroute.type: io.debezium.transforms.ByLogicalTableRouter
            transforms.SingleTopicReroute.topic.regex: .*
            transforms.SingleTopicReroute.topic.replacement: debezium_postgres
            transforms.PartitionRouting.type: io.debezium.transforms.partitions.PartitionRouting
            transforms.PartitionRouting.partition.payload.fields: change.partition_id
            transforms.PartitionRouting.partition.topic.num: 6
            transforms.PartitionRouting.partition.hash.function: murmur2

      What is the captured database version and mode of deployment?

      Azure Postgres Flex Server 17, deployed with Azure ARM Templates through Terraform

      What behavior do you expect?

      When Debezium sends a message to Azure Event Hub, it should include the message key

      What behavior do you see?

      Debezium errors out and crashes

       

      If I configure `debezium.sink.eventhubs.partitionkey=id` then all of my messages end up in a single partition, with empty message IDs for all messages

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      Yes

      {"timestamp":"2025-06-30T10:42:25.043631116Z","sequence":962,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.postgresql.connection.WalPositionLocator","level":"INFO","message":"Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{D/290034B8}'","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.043760424Z","sequence":963,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.postgresql.connection.PostgresReplicationConnection","level":"INFO","message":"Initializing PgOutput logical decoder publication","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.071626635Z","sequence":968,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.postgresql.connection.PostgresConnection","level":"INFO","message":"Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{D/290034B8}, catalogXmin=244384]","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.073006425Z","sequence":969,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-18-thread-1","threadId":59,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.089655507Z","sequence":970,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.util.Threads","level":"INFO","message":"Requested thread factory for component PostgresConnector, id = authz named = keep-alive","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.09016174Z","sequence":971,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.util.Threads","level":"INFO","message":"Creating thread debezium-postgresconnector-authz-keep-alive","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.090990694Z","sequence":972,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.postgresql.PostgresStreamingChangeEventSource","level":"INFO","message":"Processing messages","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.686066179Z","sequence":973,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorSender","level":"WARN","message":"{\"az.sdk.message\":\"Delivery rejected.\",\"connectionId\":\"MF_68f90a_1751280142770\",\"entityPath\":\"um-postgres-cdc-events\",\"linkName\":\"um-postgres-cdc-events\",\"deliveryTag\":\"63de0182dace4315af465de2f53adc9f\",\"rejected\":\"Rejected{error=Error{condition=amqp:not-allowed, description='Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25', info=null}}\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.688446834Z","sequence":974,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.RetryUtil","level":"ERROR","message":"partitionId[null]: Sending messages timed out.\nCompacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.688902863Z","sequence":975,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"ERROR","message":"Engine has failed with ","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1,"exception":{"refId":1,"exceptionType":"java.util.concurrent.ExecutionException","message":"io.debezium.DebeziumException: java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"report","line":122},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"get","line":191},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine","method":"runTasksPolling","line":496},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine","method":"run","line":220},{"class<span class="code-quote">":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":182},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class<span class="code-quote">":"java.lang.Thread","method":"run","line":1583}],"causedBy":{"exception":{"refId":2,"exceptionType":"io.debezium.DebeziumException","message":"java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"emitBatchToEventHub","line":133},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"lambda$closeAndEmitBatches$2","line":93},{"class<span class="code-quote">":"java.util.HashMap","method":"forEach","line":1429},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"closeAndEmitBatches","line":90},{"class<span class="code-quote">":"io.debezium.server.eventhubs.EventHubsChangeConsumer","method":"handleBatch","line":198},{"class<span class="code-quote">":"io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor","method":"processRecords","line":56},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1203},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1183},{"class<span class="code-quote">":"io.debezium.embedded.async.RetryingCallable","method":"call","line":47},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":572},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class<span class="code-quote">":"java.lang.Thread","method":"run","line":1583}],"causedBy":{"exception":{"refId":3,"exceptionType":"java.lang.UnsupportedOperationException","message":"Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class<span class="code-quote">":"com.azure.core.amqp.implementation.ExceptionUtil","method":"toException","line":78},{"class<span class="code-quote">":"com.azure.core.amqp.implementation.ReactorSender","method":"processDeliveredMessage","line":650},{"class<span class="code-quote">":"reactor.core.publisher.LambdaSubscriber","method":"onNext","line":160},{"class<span class="code-quote">":"reactor.core.publisher.EmitterProcessor","method":"drain","line":537},{"class<span class="code-quote">":"reactor.core.publisher.EmitterProcessor","method":"tryEmitNext","line":343},{"class<span class="code-quote">":"reactor.core.publisher.SinkManySerialized","method":"tryEmitNext","line":100},{"class<span class="code-quote">":"reactor.core.publisher.InternalManySink","method":"emitNext","line":27},{"class<span class="code-quote">":"com.azure.core.amqp.implementation.handler.SendLinkHandler","method":"onDelivery","line":205},{"class<span class="code-quote">":"org.apache.qpid.proton.engine.BaseHandler","method":"handle","line":185},{"class<span class="code-quote">":"org.apache.qpid.proton.engine.impl.EventImpl","method":"dispatch","line":108},{"class<span class="code-quote">":"org.apache.qpid.proton.reactor.impl.ReactorImpl","method":"dispatch","line":324},{"class<span class="code-quote">":"org.apache.qpid.proton.reactor.impl.ReactorImpl","method":"process","line":291},{"class<span class="code-quote">":"com.azure.core.amqp.implementation.ReactorExecutor","method":"run","line":91},{"class<span class="code-quote">":"reactor.core.scheduler.SchedulerTask","method":"call","line":68},{"class<span class="code-quote">":"reactor.core.scheduler.SchedulerTask","method":"call","line":28},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask","method":"run","line":304},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class<span class="code-quote">":"java.lang.Thread","method":"run","line":1583}],"suppressed":[{"refId":4,"exceptionType":"java.lang.Exception","message":"#block terminated with an error","frames":[{"class<span class="code-quote">":"reactor.core.publisher.BlockingSingleSubscriber","method":"blockingGet","line":100},{"class<span class="code-quote">":"reactor.core.publisher.Mono","method":"block","line":1742},{"class<span class="code-quote">":"com.azure.messaging.eventhubs.EventHubProducerClient","method":"send","line":385},{"class<span class="code-quote">":"io.debezium.server.eventhubs.EventDataBatchProxy","method":"emit","line":48},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"emitBatchToEventHub","line":129},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"lambda$closeAndEmitBatches$2","line":93},{"class<span class="code-quote">":"java.util.HashMap","method":"forEach","line":1429},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"closeAndEmitBatches","line":90},{"class<span class="code-quote">":"io.debezium.server.eventhubs.EventHubsChangeConsumer","method":"handleBatch","line":198},{"class<span class="code-quote">":"io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor","method":"processRecords","line":56},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1203},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1183},{"class<span class="code-quote">":"io.debezium.embedded.async.RetryingCallable","method":"call","line":47},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":572},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class<span class="code-quote">":"java.lang.Thread","method":"run","line":1583}]}]}}}}}}
      {"timestamp":"2025-06-30T10:42:25.689979133Z","sequence":976,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"INFO","message":"Engine state has changed from 'POLLING_TASKS' to 'STOPPING'","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:25.690632476Z","sequence":977,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"Stopping down connector","threadName":"pool-13-thread-1","threadId":51,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.104892406Z","sequence":978,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-19-thread-1","threadId":61,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.106112985Z","sequence":979,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-20-thread-1","threadId":62,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.106258395Z","sequence":980,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Finished streaming","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.106447607Z","sequence":981,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"INFO","message":"Connected metrics set to 'false'","threadName":"debezium-postgresconnector-authz-change-event-source-coordinator","threadId":55,"mdc":{"dbz.taskId":"0","dbz.databaseName":"gum","dbz.connectorName":"authz","dbz.connectorType":"Postgres","dbz.connectorContext":"streaming"},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.107179354Z","sequence":982,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.signal.SignalProcessor","level":"INFO","message":"SignalProcessor stopped","threadName":"pool-13-thread-1","threadId":51,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.107487874Z","sequence":983,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.service.DefaultServiceRegistry","level":"INFO","message":"Debezium ServiceRegistry stopped.","threadName":"pool-13-thread-1","threadId":51,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.108512541Z","sequence":984,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-21-thread-1","threadId":63,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.108775058Z","sequence":985,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"INFO","message":"Stopped task #1 out of 1 tasks (it took 418 ms to stop the task).","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.109501105Z","sequence":986,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.110102644Z","sequence":987,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"INFO","message":"Engine is stopped.","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.110241953Z","sequence":988,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.async.AsyncEmbeddedEngine","level":"INFO","message":"Engine state has changed from 'STOPPING' to 'STOPPED'","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.110487769Z","sequence":989,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"ERROR","message":"Connector completed: success = 'false', message = 'io.debezium.DebeziumException: java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25', error = 'io.debezium.DebeziumException: java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25'","threadName":"pool-7-thread-1","threadId":42,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1,"exception":{"refId":1,"exceptionType":"io.debezium.DebeziumException","message":"java.lang.UnsupportedOperationException: Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"emitBatchToEventHub","line":133},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"lambda$closeAndEmitBatches$2","line":93},{"class<span class="code-quote">":"java.util.HashMap","method":"forEach","line":1429},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"closeAndEmitBatches","line":90},{"class<span class="code-quote">":"io.debezium.server.eventhubs.EventHubsChangeConsumer","method":"handleBatch","line":198},{"class<span class="code-quote">":"io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor","method":"processRecords","line":56},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1203},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1183},{"class<span class="code-quote">":"io.debezium.embedded.async.RetryingCallable","method":"call","line":47},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":572},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class<span class="code-quote">":"java.lang.Thread","method":"run","line":1583}],"causedBy":{"exception":{"refId":2,"exceptionType":"java.lang.UnsupportedOperationException","message":"Compacted event hub does not allow null message key. TrackingId:3b906d850000100e0000002c68626a11_G0_B2, SystemTracker:st-gum-stage-westus2-01:eventhub:um-postgres-cdc-events~21844, Timestamp:2025-06-30T10:42:25","frames":[{"class<span class="code-quote">":"com.azure.core.amqp.implementation.ExceptionUtil","method":"toException","line":78},{"class<span class="code-quote">":"com.azure.core.amqp.implementation.ReactorSender","method":"processDeliveredMessage","line":650},{"class<span class="code-quote">":"reactor.core.publisher.LambdaSubscriber","method":"onNext","line":160},{"class<span class="code-quote">":"reactor.core.publisher.EmitterProcessor","method":"drain","line":537},{"class<span class="code-quote">":"reactor.core.publisher.EmitterProcessor","method":"tryEmitNext","line":343},{"class<span class="code-quote">":"reactor.core.publisher.SinkManySerialized","method":"tryEmitNext","line":100},{"class<span class="code-quote">":"reactor.core.publisher.InternalManySink","method":"emitNext","line":27},{"class<span class="code-quote">":"com.azure.core.amqp.implementation.handler.SendLinkHandler","method":"onDelivery","line":205},{"class<span class="code-quote">":"org.apache.qpid.proton.engine.BaseHandler","method":"handle","line":185},{"class<span class="code-quote">":"org.apache.qpid.proton.engine.impl.EventImpl","method":"dispatch","line":108},{"class<span class="code-quote">":"org.apache.qpid.proton.reactor.impl.ReactorImpl","method":"dispatch","line":324},{"class<span class="code-quote">":"org.apache.qpid.proton.reactor.impl.ReactorImpl","method":"process","line":291},{"class<span class="code-quote">":"com.azure.core.amqp.implementation.ReactorExecutor","method":"run","line":91},{"class<span class="code-quote">":"reactor.core.scheduler.SchedulerTask","method":"call","line":68},{"class<span class="code-quote">":"reactor.core.scheduler.SchedulerTask","method":"call","line":28},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask","method":"run","line":304},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class<span class="code-quote">":"java.lang.Thread","method":"run","line":1583}],"suppressed":[{"refId":3,"exceptionType":"java.lang.Exception","message":"#block terminated with an error","frames":[{"class<span class="code-quote">":"reactor.core.publisher.BlockingSingleSubscriber","method":"blockingGet","line":100},{"class<span class="code-quote">":"reactor.core.publisher.Mono","method":"block","line":1742},{"class<span class="code-quote">":"com.azure.messaging.eventhubs.EventHubProducerClient","method":"send","line":385},{"class<span class="code-quote">":"io.debezium.server.eventhubs.EventDataBatchProxy","method":"emit","line":48},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"emitBatchToEventHub","line":129},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"lambda$closeAndEmitBatches$2","line":93},{"class<span class="code-quote">":"java.util.HashMap","method":"forEach","line":1429},{"class<span class="code-quote">":"io.debezium.server.eventhubs.BatchManager","method":"closeAndEmitBatches","line":90},{"class<span class="code-quote">":"io.debezium.server.eventhubs.EventHubsChangeConsumer","method":"handleBatch","line":198},{"class<span class="code-quote">":"io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor","method":"processRecords","line":56},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1203},{"class<span class="code-quote">":"io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords","method":"doCall","line":1183},{"class<span class="code-quote">":"io.debezium.embedded.async.RetryingCallable","method":"call","line":47},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":572},{"class<span class="code-quote">":"java.util.concurrent.FutureTask","method":"run","line":317},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class<span class="code-quote">":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class<span class="code-quote">":"java.lang.Thread","method":"run","line":1583}]}]}}}}
      {"timestamp":"2025-06-30T10:42:26.116400254Z","sequence":990,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.DebeziumServer","level":"INFO","message":"Received request to stop the engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.116664371Z","sequence":991,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.DebeziumServer","level":"INFO","message":"Cannot shut down engine now: ","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.117079098Z","sequence":992,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor","level":"INFO","message":"{\"az.sdk.message\":\"Upstream connection publisher was completed. Terminating processor.\",\"entityPath\":\"um-postgres-cdc-events\"}","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.11742092Z","sequence":993,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorConnection","level":"INFO","message":"{\"az.sdk.message\":\"Disposing of ReactorConnection.\",\"connectionId\":\"MF_68f90a_1751280142770\",\"isTransient\":false,\"isInitiatedByClient\":true,\"shutdownMessage\":\"Disposed by client.\"}","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.117694938Z","sequence":994,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor","level":"INFO","message":"{\"az.sdk.message\":\"Channel is disposed.\",\"entityPath\":\"um-postgres-cdc-events\"}","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.119557059Z","sequence":995,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorSession","level":"INFO","message":"{\"az.sdk.message\":\"session disposal is completed\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"mgmt-session\",\"sessionId\":\"H_2a02fb_1751280143399\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.120741436Z","sequence":996,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.handler.SessionHandler","level":"INFO","message":"{\"az.sdk.message\":\"onSessionRemoteClose\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"mgmt-session\",\"sessionId\":\"H_2a02fb_1751280143399\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.121176764Z","sequence":997,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorSession","level":"INFO","message":"{\"az.sdk.message\":\"session disposal is completed\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"um-postgres-cdc-events\",\"sessionId\":\"H_cacf01_1751280144889\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.121512086Z","sequence":998,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.ReactorSession","level":"INFO","message":"{\"az.sdk.message\":\"session disposal is completed\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"cbs-session\",\"sessionId\":\"H_a30051_1751280144908\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.123295602Z","sequence":999,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.handler.SendLinkHandler","level":"INFO","message":"{\"az.sdk.message\":\"onLinkRemoteClose\",\"connectionId\":\"MF_68f90a_1751280142770\",\"linkName\":\"um-postgres-cdc-events\",\"entityPath\":\"um-postgres-cdc-events\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.123590221Z","sequence":1000,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.eventhubs.EventHubsChangeConsumer","level":"INFO","message":"Closed Event Hubs producer client","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.124599787Z","sequence":1001,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.handler.SessionHandler","level":"INFO","message":"{\"az.sdk.message\":\"onSessionRemoteClose\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"um-postgres-cdc-events\",\"sessionId\":\"H_cacf01_1751280144889\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.124719695Z","sequence":1002,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"com.azure.core.amqp.implementation.handler.SessionHandler","level":"INFO","message":"{\"az.sdk.message\":\"onSessionRemoteClose\",\"connectionId\":\"MF_68f90a_1751280142770\",\"sessionName\":\"cbs-session\",\"sessionId\":\"H_a30051_1751280144908\"}","threadName":"reactor-executor-1","threadId":36,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1}
      {"timestamp":"2025-06-30T10:42:26.135152273Z","sequence":1003,"loggerClassName":"org.jboss.logging.Logger","loggerName":"io.quarkus","level":"INFO","message":"debezium-server-dist stopped in 0.024s","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"authz-debezium-6d997d4f5c-ddghj","processName":"/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-2.el8.x86_64/bin/java","processId":1} 

      How to reproduce the issue using our tutorial deployment?

      Use an Azure Event Hub sink with Debezium Server 3.1 Final (followed docs at https://debezium.io/documentation/reference/stable/operations/debezium-server.html#_azure_event_hubs)

      Make sure the Event Hub topic's cleanup policy is set to "Compaction"

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      Allows the use of compacted topics when using Azure Event Hub sinks.

      This is vital to keep topic size manageable and allow easier replay of topic messages.

      Implementation ideas (optional)

      <Your answer>

      in the particular case of table truncation messages, the default behavior of not having a message key needs to be addressed as well (https://debezium.io/documentation/reference/3.1/connectors/postgresql.html#postgresql-truncate-events)

              Unassigned Unassigned
              liamdev1 Liam Wu (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated:
                Resolved: