Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-1373

How to have field type support for date in kafka

    XMLWordPrintable

Details

    • Enhancement
    • Status: Closed (View Workflow)
    • Major
    • Resolution: Done
    • None
    • None
    • mongodb-connector
    • None

    Description

      using debezium-mongodb-connector i managed to push my collections to kafka, the only problem i'm facing is that the field `date` in one of my collections with this format 2019-05-14T23:25:34.703+00:00, is not being pushed to the topic with the same format but rather i get something like this 1560708085175.

      this is my debezium connector command `connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka/connect-mongodb-source.properties
      `
      this is my mongodb collection example.

          {"_id":"5cdb4e6ed767ba70593e2aa8","sender":"5cdb43db4505956efc70ba03","receiver":"5cdb43db4505956efc70ba03","receiverWalletId":"5cdb43db4505956efc70ba04","status":"succes","type":"topup","amount":200000,"totalFee":0,"createdAt":"2019-05-14T23:25:34.703Z","updatedAt":"2019-05-14T23:25:35.132Z","__v":0,"details":"none."}
      

      and this is my kafka topic example.

       
      {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"sender"},{"type":"string","optional":true,"field":"receiver"},{"type":"string","optional":true,"field":"receiverWalletId"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"field":"type"},{"type":"int32","optional":true,"field":"amount"},{"type":"int32","optional":true,"field":"totalFee"},{"type":"int64","optional":true,"field":"createdAt"},{"type":"int64","optional":true,"field":"updatedAt"},{"type":"int32","optional":true,"field":"__v"},{"type":"string","optional":true,"field":"from"},{"type":"string","optional":true,"field":"orderId"},{"type":"string","optional":true,"field":"id"}],"optional":false,"name":"mongo_conn.digi.transactions"},"payload":{"sender":"5cef970ca2e9c273c655483","receiver":"5cef970ca2e9c27355c483","receiverWalletId":"5cef970ca2e9c27556c484","status":"pending","type":"topup","amount":6000,"totalFee":0,"createdAt":1560708024322,"updatedAt":1560708024753,"__v":0,"from":"smt","orderId":"d7a97581-9d18-79cd-8b09-16e400a43714","id":"5d0683b8be4af834abe3cf58"}}
      

      and this is my connect-mongodb-source.properties

          name=mongodb-source-connector
          connector.class=io.debezium.connector.mongodb.MongoDbConnector
          mongodb.hosts=repracli/**.**.**.***27017
          mongodb.name=mongo_conn
          initial.sync.max.threads=1
          tasks.max=1
          transforms=unwrap
          transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongo$
          transforms.unwrap.operation.header=true
      

      now my connect-mongodb-source.properties contains the following, but still nothing happend

      name=mongodb-source-connector
      connector.class=io.debezium.connector.mongodb.MongoDbConnector
      mongodb.hosts=repracli/52.47.51.218:27017
      mongodb.name=mongo_conn
      initial.sync.max.threads=1
      tasks.max=1
      transforms=unwrap
      transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
      transforms.unwrap.operation.header=true
      "transforms": "TimestampConverter",
      "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.TimestampConverter.field":"createdAt",
      "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss ZZZ",
      "transforms.TimestampConverter.target.type": "long"
      

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              benmoussa Ben Moussa (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: