-
Bug
-
Resolution: Unresolved
-
Major
-
None
-
None
I identified a bug in Vitess connector that causes the connector to crash with "Invalid value: null used for required field: "id", schema type: STRING" error message.
[2022-02-03 03:03:56,883] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask) ... 5 more at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:700) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:715) at io.grpc.internal.DelayedClientCall$DelayedListener.onMessage(DelayedClientCall.java:447) at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:465) at io.debezium.connector.vitess.connection.VitessReplicationConnection$1.onNext(VitessReplicationConnection.java:77) at io.debezium.connector.vitess.connection.VitessReplicationConnection$1.onNext(VitessReplicationConnection.java:99) at io.debezium.connector.vitess.connection.VStreamOutputMessageDecoder.processMessage(VStreamOutputMessageDecoder.java:58) at io.debezium.connector.vitess.connection.VStreamOutputMessageDecoder.handleBeginMessage(VStreamOutputMessageDecoder.java:114) at io.debezium.connector.vitess.VitessStreamingChangeEventSource.lambda$newReplicationMessageProcessor$0(VitessStreamingChangeEventSource.java:99) at io.debezium.pipeline.EventDispatcher.dispatchTransactionStartedEvent(EventDispatcher.java:282) at io.debezium.pipeline.txmetadata.TransactionMonitor.transactionStartedEvent(TransactionMonitor.java:148) at io.debezium.pipeline.txmetadata.TransactionMonitor.beginTransaction(TransactionMonitor.java:166) at org.apache.kafka.connect.data.Struct.put(Struct.java:203) at org.apache.kafka.connect.data.Struct.put(Struct.java:216) at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220) Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "id", schema type: STRING ... 3 more at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:781) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:802) at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:69) at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:616) at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460) at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427) at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463) at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:478) at io.grpc.Status.asRuntimeException(Status.java:533) Caused by: io.grpc.StatusRuntimeException: CANCELLED: Failed to read message. at java.base/java.lang.Thread.run(Thread.java:829) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108) at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:139) at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172) at io.debezium.connector.vitess.VitessStreamingChangeEventSource.execute(VitessStreamingChangeEventSource.java:29) at io.debezium.connector.vitess.VitessStreamingChangeEventSource.execute(VitessStreamingChangeEventSource.java:78)
The cause is that the VEvents batch does not contain VGTID events, but the connector still dispatches transaction events for BEGIN and COMMIT events
$ grpcurl -d '{ "vgtid": { "shard_gtids": [ { "keyspace": "test_keyspace", "shard": "80-", "gtid": "MySQL56/f84895a8-4eed-42ba-b500-469fd71cc08b:1-2499683:3415911-3415942:4415911-4415931" } ] } }' -plaintext xxxxxx:1234 vtgateservice.Vitess.VStream { "events": [ { "type": "BEGIN", "timestamp": "1637704151", "currentTime": "1643857685432455570" }, { "type": "COMMIT", "timestamp": "1637704151", "currentTime": "1643857685432479353" } ] } { "events": [ { "type": "BEGIN", "timestamp": "1643838860", "currentTime": "1643857685432505735" }, { "type": "COMMIT", "timestamp": "1643838860", "currentTime": "1643857685432535479" } ] }