Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-2311

MongoDB New Record State Extraction SMT fails on certain records

    Details

    • Steps to Reproduce:
      Hide
      1. Create a collection in MongoDB
      2. Configure a Debezium MongoDB connector to read that collection
      3. Configure the New Record State Extraction SMT as a transform
      4. Insert the following "good" and "bad" documents:

       

      // Good Record{	"_id": {		"$oid": "5ef49c01f986bb0001ff4e9d"	},	"internalId": "593421143319531520",	"externalId": "EI8E2",	"restaurantId": "5e7f20186ebdf6000104efcb",	"partnerId": "5e7dd8d31c9d440000fbb0c0",	"menuCardId": "5e7f540b6ebdf6000104efd9",	"cityId": "5e5df8137d5a5a00019d05f6",	"foodAggragetorId": "5e80732e61de3d16f2257522",	"price": 65,	"comment": "",	"customerDetails": {		"customerName": "John Doe",		"mobileNumber": "4151111111",		"emailAddress": "jd@example.com"	},	"deliveryDetails": {		"deliveryAddress": "Sea side, The Gardens",		"latitude": "25.152209",		"longitude": "55.253903",		"deliveryNote": "",		"deliveryOrder": true,		"price": {			"unitPrice": 6,			"discountAmount": 0,			"taxAmount": 0,			"totalPrice": 6,			"currencyCode": "USD"		}	},	"menuDetails": [{		"internalId": "5e80a1096ebdf6000104efe9",		"qty": 1,		"price": {			"unitPrice": 59,			"discountAmount": 0,			"taxAmount": 0,			"totalPrice": 59,			"currencyCode": "USD"		},		"extraDetails": [{				"recipeId": "5e80a0c56ebdf6000104efe8",				"qty": 1,				"modifierId": "d7110e51-dae0-49ff-b066-b2c85854401a",				"price": {					"unitPrice": 0,					"discountAmount": 0,					"taxAmount": 0,					"totalPrice": 0,					"currencyCode": "USD"				}			},			{				"recipeId": "5e80a0c56ebdf6000104efe8",				"qty": 1,				"modifierId": "391b71a3-ed47-4fd6-adc1-b6a9d45d7c2a",				"price": {					"unitPrice": 0,					"discountAmount": 0,					"taxAmount": 0,					"totalPrice": 0,					"currencyCode": "USD"				}			},			{				"recipeId": "5e80a0c56ebdf6000104efe8",				"qty": 1,				"modifierId": "c2bb2049-fe19-4a2f-8f99-06b4fdb1debf",				"price": {					"unitPrice": 0,					"discountAmount": 0,					"taxAmount": 0,					"totalPrice": 0,					"currencyCode": "USD"				}			},			{				"recipeId": "5e80a0c56ebdf6000104efe8",				"qty": 1,				"modifierId": "c0902fdd-d9cb-47b2-acf7-68dc59f51c8c",				"price": {					"unitPrice": 0,					"discountAmount": 0,					"taxAmount": 0,					"totalPrice": 0,					"currencyCode": "USD"				}			}		]	}],	
"paymentDetails": {		"paymentCurrency": "USD",		"paymentMerchantRef": "",		"paymentStatus": "UNPAID",		"paymentMethod": "CASH",		"customerIP": "192.168.0.0",		"priceDetails": {			"unitPrice": 59,			"discountAmount": 0,			"taxAmount": 2.81,			"totalPrice": 65,			"currencyCode": "USD"		}	},	"source": {		"sourceId": "5e80732e61de3d16f2257522",		"uniqueOrderId": "028a094a-9bf5-45a7-9f04-b7870ebe559e",		"integration": "ONLINE_ORDER"	},	"status": "OrderCreated",	"createdAt": {		"$date": 1593089025807	},	"modifiedAt": {		"$date": 1593089025807	},	"delete": false,	"active": true,	"_class": "com.example.models.OrderDetails"}
      // Bad Record{	"_id": {		"$oid": "5ef270faf986bb0001ff4e63"	},	"internalId": "592825186773127168",	"externalId": "B1SS8",	"restaurantId": "5e537245fb96c700018c4615",	"partnerId": "5e536a7f218e20267f0ead29",	"menuCardId": "5e6760cf6ebdf6000104eef9",	"kitchenId": "5e537ad0fb96c700018c461a",	"foodAggragetorId": "5ed0b3fde3b8f0fb36ff7c80",	"price": 10,	"comment": "",	"customerDetails": {		"customerName": " ",		"mobileNumber": "415"	},	"deliveryDetails": {		"deliveryAddress": ", ",		"latitude": "0.0",		"longitude": "0.0",		"deliveryNote": "",		"deliveryOrder": false,		"price": {			"unitPrice": 0,			"discountAmount": 0,			"taxAmount": 0,			"totalPrice": 0,			"currencyCode": "USD"		}	},	"menuDetails": [{			"internalId": "5e54e914062bb000018cbc95",			"qty": 1,			"price": {				"unitPrice": 12,				"discountAmount": 0,				"taxAmount": 0,				"totalPrice": 12,				"currencyCode": "USD"			},			"extraDetails": []		},		{			"internalId": "5e54e914062bb000018cbc95",			"qty": 1,			"price": {				"unitPrice": 12,				"discountAmount": 0,				"taxAmount": 0,				"totalPrice": 12,				"currencyCode": "USD"			},			"extraDetails": [{					"recipeId": "5e54bc4dced25f0001a10905",					"qty": 1,					"modifierId": "371c7c8a-ee59-4c37-b18b-638e6a0aa734",					"price": {						"unitPrice": 6,						"discountAmount": 0,						"taxAmount": 0,						"totalPrice": 6,						"currencyCode": "USD"					}				},				{					"recipeId": "5e54bc4dced25f0001a10905",					"qty": 1,					"modifierId": "0a71489f-ae4c-46fe-adca-fd5cd8caa67e",					"price": {						"unitPrice": 5,						"discountAmount": 0,						"taxAmount": 0,						"totalPrice": 5,						"currencyCode": "USD"					}				}			]		}	],	"paymentDetails": {		"paymentCurrency": "USD",		"paymentStatus": "PREPAID",		"paymentMethod": "CARD",		"customerIP": "192.168.0.0",		"priceDetails": {			"unitPrice": 10,			"discountAmount": 0,			"taxAmount": 0.0,			"totalPrice": 10,			"currencyCode": "USD"		}	},	"source": {		"sourceId": "5ed0b3fde3b8f0fb36ff7c80",		"uniqueOrderId": 
"3b662889-8ce6-451a-a7c6-cf4bcb254a62",		"integration": "SystemA"	},	"status": "OrderCreated",	"createdAt": {		"$date": 1592946938694	},	"modifiedAt": {		"$date": 1592946938694	},	"delete": false,	"active": true,	"_class": "com.example.models.OrderDetails"}
      
      Show
      1. Create a collection in MongoDB. 2. Configure a Debezium MongoDB connector to read that collection. 3. Configure the New Record State Extraction SMT as a transform. 4. Insert the following "good" and "bad" documents:
      // Good Record
      {"_id": {"$oid": "5ef49c01f986bb0001ff4e9d"}, "internalId": "593421143319531520", "externalId": "EI8E2", "restaurantId": "5e7f20186ebdf6000104efcb", "partnerId": "5e7dd8d31c9d440000fbb0c0", "menuCardId": "5e7f540b6ebdf6000104efd9", "cityId": "5e5df8137d5a5a00019d05f6", "foodAggragetorId": "5e80732e61de3d16f2257522", "price": 65, "comment": "", "customerDetails": {"customerName": "John Doe", "mobileNumber": "4151111111", "emailAddress": "jd@example.com"}, "deliveryDetails": {"deliveryAddress": "Sea side, The Gardens", "latitude": "25.152209", "longitude": "55.253903", "deliveryNote": "", "deliveryOrder": true, "price": {"unitPrice": 6, "discountAmount": 0, "taxAmount": 0, "totalPrice": 6, "currencyCode": "USD"}}, "menuDetails": [{"internalId": "5e80a1096ebdf6000104efe9", "qty": 1, "price": {"unitPrice": 59, "discountAmount": 0, "taxAmount": 0, "totalPrice": 59, "currencyCode": "USD"}, "extraDetails": [{"recipeId": "5e80a0c56ebdf6000104efe8", "qty": 1, "modifierId": "d7110e51-dae0-49ff-b066-b2c85854401a", "price": {"unitPrice": 0, "discountAmount": 0, "taxAmount": 0, "totalPrice": 0, "currencyCode": "USD"}}, {"recipeId": "5e80a0c56ebdf6000104efe8", "qty": 1, "modifierId": "391b71a3-ed47-4fd6-adc1-b6a9d45d7c2a", "price": {"unitPrice": 0, "discountAmount": 0, "taxAmount": 0, "totalPrice": 0, "currencyCode": "USD"}}, {"recipeId": "5e80a0c56ebdf6000104efe8", "qty": 1, "modifierId": "c2bb2049-fe19-4a2f-8f99-06b4fdb1debf", "price": {"unitPrice": 0, "discountAmount": 0, "taxAmount": 0, "totalPrice": 0, "currencyCode": "USD"}}, {"recipeId": "5e80a0c56ebdf6000104efe8", "qty": 1, "modifierId": "c0902fdd-d9cb-47b2-acf7-68dc59f51c8c", "price": {"unitPrice": 0, "discountAmount": 0, "taxAmount": 0, "totalPrice": 0, "currencyCode": "USD"}}]}], "paymentDetails": {"paymentCurrency": "USD", "paymentMerchantRef": "", "paymentStatus": "UNPAID", "paymentMethod": "CASH", "customerIP": "192.168.0.0", "priceDetails": {"unitPrice": 59, "discountAmount": 0, "taxAmount": 2.81, "totalPrice": 65, "currencyCode": "USD"}}, "source": {"sourceId": "5e80732e61de3d16f2257522", "uniqueOrderId": "028a094a-9bf5-45a7-9f04-b7870ebe559e", "integration": "ONLINE_ORDER"}, "status": "OrderCreated", "createdAt": {"$date": 1593089025807}, "modifiedAt": {"$date": 1593089025807}, "delete": false, "active": true, "_class": "com.example.models.OrderDetails"}
      // Bad Record
      {"_id": {"$oid": "5ef270faf986bb0001ff4e63"}, "internalId": "592825186773127168", "externalId": "B1SS8", "restaurantId": "5e537245fb96c700018c4615", "partnerId": "5e536a7f218e20267f0ead29", "menuCardId": "5e6760cf6ebdf6000104eef9", "kitchenId": "5e537ad0fb96c700018c461a", "foodAggragetorId": "5ed0b3fde3b8f0fb36ff7c80", "price": 10, "comment": "", "customerDetails": {"customerName": " ", "mobileNumber": "415"}, "deliveryDetails": {"deliveryAddress": ", ", "latitude": "0.0", "longitude": "0.0", "deliveryNote": "", "deliveryOrder": false, "price": {"unitPrice": 0, "discountAmount": 0, "taxAmount": 0, "totalPrice": 0, "currencyCode": "USD"}}, "menuDetails": [{"internalId": "5e54e914062bb000018cbc95", "qty": 1, "price": {"unitPrice": 12, "discountAmount": 0, "taxAmount": 0, "totalPrice": 12, "currencyCode": "USD"}, "extraDetails": []}, {"internalId": "5e54e914062bb000018cbc95", "qty": 1, "price": {"unitPrice": 12, "discountAmount": 0, "taxAmount": 0, "totalPrice": 12, "currencyCode": "USD"}, "extraDetails": [{"recipeId": "5e54bc4dced25f0001a10905", "qty": 1, "modifierId": "371c7c8a-ee59-4c37-b18b-638e6a0aa734", "price": {"unitPrice": 6, "discountAmount": 0, "taxAmount": 0, "totalPrice": 6, "currencyCode": "USD"}}, {"recipeId": "5e54bc4dced25f0001a10905", "qty": 1, "modifierId": "0a71489f-ae4c-46fe-adca-fd5cd8caa67e", "price": {"unitPrice": 5, "discountAmount": 0, "taxAmount": 0, "totalPrice": 5, "currencyCode": "USD"}}]}], "paymentDetails": {"paymentCurrency": "USD", "paymentStatus": "PREPAID", "paymentMethod": "CARD", "customerIP": "192.168.0.0", "priceDetails": {"unitPrice": 10, "discountAmount": 0, "taxAmount": 0.0, "totalPrice": 10, "currencyCode": "USD"}}, "source": {"sourceId": "5ed0b3fde3b8f0fb36ff7c80", "uniqueOrderId": "3b662889-8ce6-451a-a7c6-cf4bcb254a62", "integration": "SystemA"}, "status": "OrderCreated", "createdAt": {"$date": 1592946938694}, "modifiedAt": {"$date": 1592946938694}, "delete": false, "active": true, "_class": "com.example.models.OrderDetails"}

      Description

      While trying to "unwrap" records generated by the Debezium MongoDB connector using the New Record State Extraction SMT, we are receiving the following exception on a subset of the records:

       

      Executing stage 'TRANSFORMATION' with class 'io.debezium.connector.mongodb.transforms.ExtractNewDocumentState'. (org.apache.kafka.connect.runtime.errors.LogReporter:62)
      org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{STRING}
          at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:210)
          at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1654)
          at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:163)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:212)
          at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1654)
          at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:163)
          at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:51)
          at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.newRecord(ExtractNewDocumentState.java:320)
          at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.apply(ExtractNewDocumentState.java:260)
          at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
          at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:834)
      

      I've inspected a number of "good" and "bad" records, and the only difference I can find is in the `extraDetails` field (an array of nested documents): in the "good" records it is always populated, whereas every "bad" record contains an empty `extraDetails` array (as in the first `menuDetails` entry of the bad record above, whose second entry is populated).

       

        Gliffy Diagrams

          Attachments

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                ahamidi83 Ali Hamidi
              • Votes:
                0 Vote for this issue
                Watchers:
                1 Start watching this issue

                Dates

                • Created:
                  Updated: