Debezium / DBZ-6491

Allow schema to be specified in the Debezium Sink Connector configuration


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version: 2.3.0.Final
    • Affects Versions: 2.2.0.Final, 2.2.1.Final, 2.3.0.Alpha1, 2.3.0.Beta1, 2.3.0.CR1
    • Component: jdbc-connector
    • Labels: None

      Bug report

      What Debezium connector do you use and what version?

      debezium-connector-jdbc/2.3.0.Alpha1

      What is the connector configuration?

      {
          "name": "oracle-jdbc-sink-connector",
          "config": {
              "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
              "tasks.max": "1",   
              "topics.regex": "dbserver1.public.(.*)",
              "connection.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=1543)(HOST=masked))(CONNECT_DATA=(SERVICE_NAME=masked))(SECURITY=(ssl_server_cert_dn=\"CN=masked\")))",
              "connection.username": "my_user",
              "connection.password": "password",
              "security.protocol":"SSL",
              "ssl.enabled.protocols": "TLSv1.2,TLSv1.1",
              "ssl.truststore.type": "JKS",
              "auto.create": "false",
              "auto.evolve":false,        
              "table.name.format" : "SIS.${topic}", 
              "delete.enabled": "true",
              "primary.key.mode": "record_key",
              "insert.mode": "upsert", 
              "database.time_zone": "UTC",
              "quote.sql.identifiers": "never",
              "transforms":"route,topicCase",
              "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.route.regex": "([^.])\\.([^.])\\.([^.]+)",
              "transforms.route.replacement": "$3",
              "transforms.topicCase.type": "com.myorg.kafka.connect.transform.ToUpperCase"
          }
      }
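For reference, the effect of the transform chain plus `table.name.format` can be traced with a short simulation. This is a sketch, not Kafka Connect code; it assumes the intended RegexRouter pattern uses `+` quantifiers, i.e. `([^.]+)\.([^.]+)\.([^.]+)` (the config as rendered shows `([^.])`, which would not fully match the topic name, so the `+` appears to have been lost in transcription):

```python
import re

# Assumed RegexRouter pattern (see lead-in note about the missing "+").
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")
TABLE_NAME_FORMAT = "SIS.${topic}"

def resolve_table_name(topic: str) -> str:
    # 1. RegexRouter: when the whole topic matches, replace it with group 3.
    m = ROUTE_REGEX.fullmatch(topic)
    routed = m.group(3) if m else topic
    # 2. ToUpperCase SMT: upper-case the routed topic.
    upper = routed.upper()
    # 3. table.name.format: substitute ${topic} literally.
    return TABLE_NAME_FORMAT.replace("${topic}", upper)

print(resolve_table_name("dbserver1.public.protection_category_cd"))
# -> SIS.PROTECTION_CATEGORY_CD
```

The resulting string `SIS.PROTECTION_CATEGORY_CD` is then treated by the sink as a single table identifier rather than a schema-qualified name, which is what this issue is about.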

      What is the captured database version and mode of deployment?

      I am capturing changes from a PostgreSQL (postgis/postgis:15-3.3) database running in Docker using the debezium-postgres-connector. I am trying to sink this data into an on-premise Oracle 12c database using the debezium-jdbc-sink connector.

      What behaviour do you expect?

      I am connecting to the Oracle database with the schema specified as SIS. I expect Debezium to sink my records into tables located in that schema.

      What behaviour do you see?

      The Debezium sink connector tries to create the table with the schema prefix included in the table name. It is unable to do so because I have disabled auto creation, and I get the error message shown in the logs.

      Do you see the same behaviour using the latest released Debezium version?

      I am already using the latest Debezium sink connector Alpha release, debezium-connector-jdbc/2.3.0.Alpha1. Chris Cranford advised me in the community forum to create an issue for this.

      Do you have the connector logs, ideally from start till finish?

      2023-05-19 21:31:34,099 INFO   ||  Creating task oracle-jdbc-sink-connector-0   [org.apache.kafka.connect.runtime.Worker]
      2023-05-19 21:31:34,100 INFO   ||  ConnectorConfig values:
              config.action.reload = restart
              connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
              errors.log.enable = false
              errors.log.include.messages = false
              errors.retry.delay.max.ms = 60000
              errors.retry.timeout = 0
              errors.tolerance = none
              header.converter = null
              key.converter = null
              name = oracle-jdbc-sink-connector
              predicates = []
              tasks.max = 1
              transforms = [route, ConvertCreatedTimestamp, ConvertUpdatedTimestamp, topicCase]
              value.converter = null
         [org.apache.kafka.connect.runtime.ConnectorConfig]
      2023-05-19 21:31:34,101 INFO   ||  EnrichedConnectorConfig values:
              config.action.reload = restart
              connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
              errors.log.enable = false
              errors.log.include.messages = false
              errors.retry.delay.max.ms = 60000
              errors.retry.timeout = 0
              errors.tolerance = none
              header.converter = null
              key.converter = null
              name = oracle-jdbc-sink-connector
              predicates = []
              tasks.max = 1
              transforms = [route, ConvertCreatedTimestamp, ConvertUpdatedTimestamp, topicCase]
              transforms.ConvertCreatedTimestamp.field = datetime_created
              transforms.ConvertCreatedTimestamp.format =
              transforms.ConvertCreatedTimestamp.negate = false
              transforms.ConvertCreatedTimestamp.predicate =
              transforms.ConvertCreatedTimestamp.target.type = Timestamp
              transforms.ConvertCreatedTimestamp.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
              transforms.ConvertCreatedTimestamp.unix.precision = milliseconds
              transforms.ConvertUpdatedTimestamp.field = datetime_updated
              transforms.ConvertUpdatedTimestamp.format =
              transforms.ConvertUpdatedTimestamp.negate = false
              transforms.ConvertUpdatedTimestamp.predicate =
              transforms.ConvertUpdatedTimestamp.target.type = Timestamp
              transforms.ConvertUpdatedTimestamp.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
              transforms.ConvertUpdatedTimestamp.unix.precision = milliseconds
              transforms.route.negate = false
              transforms.route.predicate =
              transforms.route.regex = ([^.])\.([^.])\.([^.]+)
              transforms.route.replacement = $3
              transforms.route.type = class org.apache.kafka.connect.transforms.RegexRouter
              transforms.topicCase.negate = false
              transforms.topicCase.predicate =
              transforms.topicCase.type = class ca.bc.gov.epd.kafka.connect.transform.ToUpperCase
              value.converter = null
         [org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
      2023-05-19 21:31:34,104 INFO   ||  TaskConfig values:
              task.class = class io.debezium.connector.jdbc.JdbcSinkConnectorTask
         [org.apache.kafka.connect.runtime.TaskConfig]
      2023-05-19 21:31:34,106 INFO   ||  Instantiated task oracle-jdbc-sink-connector-0 with version 2.3.0.Alpha1 of type io.debezium.connector.jdbc.JdbcSinkConnectorTask   [org.apache.kafka.connect.runtime.Worker]
      2023-05-19 21:31:34,106 INFO   ||  JsonConverterConfig values:
              converter.type = key
              decimal.format = BASE64
              schemas.cache.size = 1000
              schemas.enable = true
         [org.apache.kafka.connect.json.JsonConverterConfig]
      2023-05-19 21:31:34,106 INFO   ||  Set up the key converter class org.apache.kafka.connect.json.JsonConverter for task oracle-jdbc-sink-connector-0 using the worker config   [org.apache.kafka.connect.runtime.Worker]
      2023-05-19 21:31:34,106 INFO   ||  JsonConverterConfig values:
              converter.type = value
              decimal.format = BASE64
              schemas.cache.size = 1000
              schemas.enable = true
         [org.apache.kafka.connect.json.JsonConverterConfig]
      2023-05-19 21:31:34,106 INFO   ||  Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task oracle-jdbc-sink-connector-0 using the worker config   [org.apache.kafka.connect.runtime.Worker]
      2023-05-19 21:31:34,106 INFO   ||  Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task oracle-jdbc-sink-connector-0 using the worker config   [org.apache.kafka.connect.runtime.Worker]
      2023-05-19 21:31:34,108 INFO   ||  Initializing: org.apache.kafka.connect.runtime.TransformationChain{org.apache.kafka.connect.transforms.RegexRouter, org.apache.kafka.connect.transforms.TimestampConverter$Value, org.apache.kafka.connect.transforms.TimestampConverter$Value, ca.bc.gov.epd.kafka.connect.transform.ToUpperCase}   [org.apache.kafka.connect.runtime.Worker]
      2023-05-19 21:31:34,108 INFO   ||  SinkConnectorConfig values:
              config.action.reload = restart
              connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
              errors.deadletterqueue.context.headers.enable = false
              errors.deadletterqueue.topic.name =
              errors.deadletterqueue.topic.replication.factor = 3
              errors.log.enable = false
              errors.log.include.messages = false
              errors.retry.delay.max.ms = 60000
              errors.retry.timeout = 0
              errors.tolerance = none
              header.converter = null
              key.converter = null
              name = oracle-jdbc-sink-connector
              predicates = []
              tasks.max = 1
              topics = []
              topics.regex = dbserver1.public.(.*)
              transforms = [route, ConvertCreatedTimestamp, ConvertUpdatedTimestamp, topicCase]
              value.converter = null
         [org.apache.kafka.connect.runtime.SinkConnectorConfig]
      2023-05-19 21:31:34,109 INFO   ||  EnrichedConnectorConfig values:
              config.action.reload = restart
              connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
              errors.deadletterqueue.context.headers.enable = false
              errors.deadletterqueue.topic.name =
              errors.deadletterqueue.topic.replication.factor = 3
              errors.log.enable = false
              errors.log.include.messages = false
              errors.retry.delay.max.ms = 60000
              errors.retry.timeout = 0
              errors.tolerance = none
              header.converter = null
              key.converter = null
              name = oracle-jdbc-sink-connector
              predicates = []
              tasks.max = 1
              topics = []
              topics.regex = dbserver1.public.(.*)
              transforms = [route, ConvertCreatedTimestamp, ConvertUpdatedTimestamp, topicCase]
              transforms.ConvertCreatedTimestamp.field = datetime_created
              transforms.ConvertCreatedTimestamp.format =
              transforms.ConvertCreatedTimestamp.negate = false
              transforms.ConvertCreatedTimestamp.predicate =
              transforms.ConvertCreatedTimestamp.target.type = Timestamp
              transforms.ConvertCreatedTimestamp.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
              transforms.ConvertCreatedTimestamp.unix.precision = milliseconds
              transforms.ConvertUpdatedTimestamp.field = datetime_updated
              transforms.ConvertUpdatedTimestamp.format =
              transforms.ConvertUpdatedTimestamp.negate = false
              transforms.ConvertUpdatedTimestamp.predicate =
              transforms.ConvertUpdatedTimestamp.target.type = Timestamp
              transforms.ConvertUpdatedTimestamp.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
              transforms.ConvertUpdatedTimestamp.unix.precision = milliseconds
              transforms.route.negate = false
              transforms.route.predicate =
              transforms.route.regex = ([^.])\.([^.])\.([^.]+)
              transforms.route.replacement = $3
              transforms.route.type = class org.apache.kafka.connect.transforms.RegexRouter
              transforms.topicCase.negate = false
              transforms.topicCase.predicate =
              transforms.topicCase.type = class ca.bc.gov.epd.kafka.connect.transform.ToUpperCase
              value.converter = null
         [org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
      2023-05-19 21:31:34,109 INFO   ||  ConsumerConfig values:
              allow.auto.create.topics = true
              auto.commit.interval.ms = 5000
              auto.offset.reset = earliest
              bootstrap.servers = [kafka:9092]
              check.crcs = true
              client.dns.lookup = use_all_dns_ips
              client.id = connector-consumer-oracle-jdbc-sink-connector-0
              client.rack =
              connections.max.idle.ms = 540000
              default.api.timeout.ms = 60000
              enable.auto.commit = false
              exclude.internal.topics = true
              fetch.max.bytes = 52428800
              fetch.max.wait.ms = 500
              fetch.min.bytes = 1
              group.id = connect-oracle-jdbc-sink-connector
              group.instance.id = null
              heartbeat.interval.ms = 3000
              interceptor.classes = []
              internal.leave.group.on.close = true
              internal.throw.on.fetch.stable.offset.unsupported = false
              isolation.level = read_uncommitted
              key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
              max.partition.fetch.bytes = 1048576
              max.poll.interval.ms = 300000
              max.poll.records = 500
              metadata.max.age.ms = 300000
              metric.reporters = []
              metrics.num.samples = 2
              metrics.recording.level = INFO
              metrics.sample.window.ms = 30000
              partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
              receive.buffer.bytes = 65536
              reconnect.backoff.max.ms = 1000
              reconnect.backoff.ms = 50
              request.timeout.ms = 30000
              retry.backoff.ms = 100
              sasl.client.callback.handler.class = null
              sasl.jaas.config = null
              sasl.kerberos.kinit.cmd = /usr/bin/kinit
              sasl.kerberos.min.time.before.relogin = 60000
              sasl.kerberos.service.name = null
              sasl.kerberos.ticket.renew.jitter = 0.05
              sasl.kerberos.ticket.renew.window.factor = 0.8
              sasl.login.callback.handler.class = null
              sasl.login.class = null
              sasl.login.connect.timeout.ms = null
              sasl.login.read.timeout.ms = null
              sasl.login.refresh.buffer.seconds = 300
              sasl.login.refresh.min.period.seconds = 60
              sasl.login.refresh.window.factor = 0.8
              sasl.login.refresh.window.jitter = 0.05
              sasl.login.retry.backoff.max.ms = 10000
              sasl.login.retry.backoff.ms = 100
              sasl.mechanism = GSSAPI
              sasl.oauthbearer.clock.skew.seconds = 30
              sasl.oauthbearer.expected.audience = null
              sasl.oauthbearer.expected.issuer = null
              sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
              sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
              sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
              sasl.oauthbearer.jwks.endpoint.url = null
              sasl.oauthbearer.scope.claim.name = scope
              sasl.oauthbearer.sub.claim.name = sub
              sasl.oauthbearer.token.endpoint.url = null
              security.protocol = PLAINTEXT
              security.providers = null
              send.buffer.bytes = 131072
              session.timeout.ms = 45000
              socket.connection.setup.timeout.max.ms = 30000
              socket.connection.setup.timeout.ms = 10000
              ssl.cipher.suites = null
              ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
              ssl.endpoint.identification.algorithm = https
              ssl.engine.factory.class = null
              ssl.key.password = null
              ssl.keymanager.algorithm = SunX509
              ssl.keystore.certificate.chain = null
              ssl.keystore.key = null
              ssl.keystore.location = null
              ssl.keystore.password = null
              ssl.keystore.type = JKS
              ssl.protocol = TLSv1.3
              ssl.provider = null
              ssl.secure.random.implementation = null
              ssl.trustmanager.algorithm = PKIX
              ssl.truststore.certificates = null
              ssl.truststore.location = null
              ssl.truststore.password = null
              ssl.truststore.type = JKS
              value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
         [org.apache.kafka.clients.consumer.ConsumerConfig]
      2023-05-19 21:31:34,113 WARN   ||  The configuration 'metrics.context.connect.kafka.cluster.id' was supplied but isn't a known config.   [org.apache.kafka.clients.consumer.ConsumerConfig]
      2023-05-19 21:31:34,113 WARN   ||  The configuration 'metrics.context.connect.group.id' was supplied but isn't a known config.   [org.apache.kafka.clients.consumer.ConsumerConfig]
      2023-05-19 21:31:34,113 INFO   ||  Kafka version: 3.2.0   [org.apache.kafka.common.utils.AppInfoParser]
      2023-05-19 21:31:34,113 INFO   ||  Kafka commitId: 38103ffaa962ef50   [org.apache.kafka.common.utils.AppInfoParser]
      2023-05-19 21:31:34,113 INFO   ||  Kafka startTimeMs: 1684531894113   [org.apache.kafka.common.utils.AppInfoParser]
      2023-05-19 21:31:34,118 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
      2023-05-19 21:31:34,119 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Subscribed to pattern: 'dbserver1.public.(.*)'   [org.apache.kafka.clients.consumer.KafkaConsumer]
      2023-05-19 21:31:34,125 INFO   ||  Starting JdbcSinkConnectorConfig with configuration:   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     connector.class = io.debezium.connector.jdbc.JdbcSinkConnector   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     connection.password = ********   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms.ConvertCreatedTimestamp.type = org.apache.kafka.connect.transforms.TimestampConverter$Value   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     tasks.max = 1   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms.ConvertUpdatedTimestamp.target.type = Timestamp   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms = route,ConvertCreatedTimestamp,ConvertUpdatedTimestamp,topicCase   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms.route.type = org.apache.kafka.connect.transforms.RegexRouter   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms.route.regex = ([^.])\.([^.])\.([^.]+)   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     auto.evolve = false   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms.ConvertCreatedTimestamp.field = datetime_created   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms.route.replacement = $3   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms.ConvertCreatedTimestamp.target.type = Timestamp   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     insert.mode = upsert   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     table.name.format = SIS.${topic}   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms.ConvertUpdatedTimestamp.field = datetime_updated   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     primary.key.mode = record_key   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     database.time_zone = UTC   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     ssl.enabled.protocols = TLSv1.2,TLSv1.1   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     connection.username = JAITHOMA   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     topics.regex = dbserver1.public.(.*)   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     ssl.truststore.type = JKS   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     security.protocol = SSL   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     transforms.topicCase.type = ca.bc.gov.epd.kafka.connect.transform.ToUpperCase   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     delete.enabled = true   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     task.class = io.debezium.connector.jdbc.JdbcSinkConnectorTask   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     name = oracle-jdbc-sink-connector   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     auto.create = false   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,126 INFO   ||     connection.url = jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=1543)(HOST=nrcdb01.bcgov))(CONNECT_DATA=(SERVICE_NAME=SD57387.NRS.BCGOV))(SECURITY=(ssl_server_cert_dn="CN=nrcdb01.bcgov")))   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,127 INFO   ||     transforms.ConvertUpdatedTimestamp.type = org.apache.kafka.connect.transforms.TimestampConverter$Value   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,127 INFO   ||     quote.sql.identifiers = never   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:34,215 INFO   ||  HHH000412: Hibernate ORM core version 6.1.7.Final   [org.hibernate.Version]
      2023-05-19 21:31:34,574 INFO   ||  HHH000130: Instantiating explicit connection provider: org.hibernate.c3p0.internal.C3P0ConnectionProvider   [org.hibernate.engine.jdbc.connections.internal.ConnectionProviderInitiator]
      2023-05-19 21:31:34,577 INFO   ||  HHH010002: C3P0 using driver: null at URL: jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=1543)(HOST=nrcdb01.bcgov))(CONNECT_DATA=(SERVICE_NAME=SD57387.NRS.BCGOV))(SECURITY=(ssl_server_cert_dn="CN=nrcdb01.bcgov")))   [org.hibernate.orm.connections.pooling.c3p0]
      2023-05-19 21:31:34,578 INFO   ||  HHH10001001: Connection properties: {password=****, user=JAITHOMA}   [org.hibernate.orm.connections.pooling.c3p0]
      2023-05-19 21:31:34,578 INFO   ||  HHH10001003: Autocommit mode: false   [org.hibernate.orm.connections.pooling.c3p0]
      2023-05-19 21:31:34,578 WARN   ||  HHH10001006: No JDBC Driver class was specified by property hibernate.connection.driver_class   [org.hibernate.orm.connections.pooling.c3p0]
      2023-05-19 21:31:34,599 INFO   ||  MLog clients using slf4j logging.   [com.mchange.v2.log.MLog]
      2023-05-19 21:31:34,670 INFO   ||  Initializing c3p0-0.9.5.5 [built 11-December-2019 22:18:33 -0800; debug? true; trace: 10]   [com.mchange.v2.c3p0.C3P0Registry]
      2023-05-19 21:31:34,748 INFO   ||  HHH10001007: JDBC isolation level: <unknown>   [org.hibernate.orm.connections.pooling.c3p0]
      2023-05-19 21:31:34,777 INFO   ||  Initializing c3p0 pool... com.mchange.v2.c3p0.PoolBackedDataSource@7640205 [ connectionPoolDataSource -> com.mchange.v2.c3p0.WrapperConnectionPoolDataSource@51ec2abd [ acquireIncrement -> 32, acquireRetryAttempts -> 30, acquireRetryDelay -> 1000, autoCommitOnClose -> false, automaticTestTable -> null, breakAfterAcquireFailure -> false, checkoutTimeout -> 0, connectionCustomizerClassName -> null, connectionTesterClassName -> com.mchange.v2.c3p0.impl.DefaultConnectionTester, contextClassLoaderSource -> caller, debugUnreturnedConnectionStackTraces -> false, factoryClassLocation -> null, forceIgnoreUnresolvedTransactions -> false, forceSynchronousCheckins -> false, identityToken -> 1hge77paweyn6968qae2j|2b7c2b70, idleConnectionTestPeriod -> 0, initialPoolSize -> 5, maxAdministrativeTaskTime -> 0, maxConnectionAge -> 0, maxIdleTime -> 0, maxIdleTimeExcessConnections -> 0, maxPoolSize -> 32, maxStatements -> 0, maxStatementsPerConnection -> 0, minPoolSize -> 5, nestedDataSource -> com.mchange.v2.c3p0.DriverManagerDataSource@cde3288e [ description -> null, driverClass -> null, factoryClassLocation -> null, forceUseNamedDriverClass -> false, identityToken -> 1hge77paweyn6968qae2j|58574d21, jdbcUrl -> jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=1543)(HOST=nrcdb01.bcgov))(CONNECT_DATA=(SERVICE_NAME=SD57387.NRS.BCGOV))(SECURITY=(ssl_server_cert_dn="CN=nrcdb01.bcgov"))), properties -> {password=*****, user=*****} ], preferredTestQuery -> null, privilegeSpawnedThreads -> false, propertyCycle -> 0, statementCacheNumDeferredCloseThreads -> 0, testConnectionOnCheckin -> false, testConnectionOnCheckout -> false, unreturnedConnectionTimeout -> 0, usesTraditionalReflectiveProxies -> false; userOverrides: {} ], dataSourceName -> null, extensions -> {}, factoryClassLocation -> null, identityToken -> 1hge77paweyn6968qae2j|28448ad2, numHelperThreads -> 3 ]   [com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource]
      2023-05-19 21:31:36,762 INFO   ||  HHH000400: Using dialect: org.hibernate.dialect.OracleDialect   [SQL dialect]
      2023-05-19 21:31:37,784 INFO   ||  HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]   [org.hibernate.engine.transaction.jta.platform.internal.JtaPlatformInitiator]
      2023-05-19 21:31:37,814 INFO   ||  Using dialect io.debezium.connector.jdbc.dialect.oracle.OracleDatabaseDialect   [io.debezium.connector.jdbc.dialect.DatabaseDialectResolver]
      2023-05-19 21:31:37,858 INFO   ||  Database version 12.2.0   [io.debezium.connector.jdbc.JdbcChangeEventSink]
      2023-05-19 21:31:37,858 INFO   ||  WorkerSinkTask{id=oracle-jdbc-sink-connector-0} Sink task finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSinkTask]
      2023-05-19 21:31:37,860 INFO   ||  WorkerSinkTask{id=oracle-jdbc-sink-connector-0} Executing sink task   [org.apache.kafka.connect.runtime.WorkerSinkTask]
      2023-05-19 21:31:37,871 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Resetting the last seen epoch of partition dbserver1.public.protection_category_cd-0 to 0 since the associated topicId changed from null to b_6VIkdmSnaNWIvk-fRW8Q   [org.apache.kafka.clients.Metadata]
      2023-05-19 21:31:37,872 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Resetting the last seen epoch of partition dbserver1.public.activity_cd-0 to 0 since the associated topicId changed from null to hwR6GwWgQKG_M76y-39q5Q   [org.apache.kafka.clients.Metadata]
      2023-05-19 21:31:37,872 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Cluster ID: lsrt-P0GQi2iQO02mwQIew   [org.apache.kafka.clients.Metadata]
      2023-05-19 21:31:37,872 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Discovered group coordinator 192.168.32.4:9092 (id: 2147483646 rack: null)   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,873 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] (Re-)joining group   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,885 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Request joining group due to: need to re-join with the given member-id: connector-consumer-oracle-jdbc-sink-connector-0-584cda44-36cf-4bb3-8365-33b6634d2f6e   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,886 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] (Re-)joining group   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,901 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Successfully joined group with generation Generation{generationId=1, memberId='connector-consumer-oracle-jdbc-sink-connector-0-584cda44-36cf-4bb3-8365-33b6634d2f6e', protocol='range'}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,903 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Finished assignment for group at generation 1: {connector-consumer-oracle-jdbc-sink-connector-0-584cda44-36cf-4bb3-8365-33b6634d2f6e=Assignment(partitions=[dbserver1.public.protection_category_cd-0, dbserver1.public.activity_cd-0])}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,913 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Successfully synced group in generation Generation{generationId=1, memberId='connector-consumer-oracle-jdbc-sink-connector-0-584cda44-36cf-4bb3-8365-33b6634d2f6e', protocol='range'}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,914 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Notifying assignor about the new Assignment(partitions=[dbserver1.public.protection_category_cd-0, dbserver1.public.activity_cd-0])   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,914 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Adding newly assigned partitions: dbserver1.public.activity_cd-0, dbserver1.public.protection_category_cd-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,929 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Found no committed offset for partition dbserver1.public.protection_category_cd-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,929 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Found no committed offset for partition dbserver1.public.activity_cd-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
      2023-05-19 21:31:37,932 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Resetting offset for partition dbserver1.public.protection_category_cd-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.32.4:9092 (id: 1 rack: null)], epoch=0}}.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
      2023-05-19 21:31:37,933 INFO   ||  [Consumer clientId=connector-consumer-oracle-jdbc-sink-connector-0, groupId=connect-oracle-jdbc-sink-connector] Resetting offset for partition dbserver1.public.activity_cd-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.32.4:9092 (id: 1 rack: null)], epoch=0}}.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
      2023-05-19 21:31:38,071 WARN   ||  Table 'SIS.PROTECTION_CATEGORY_CD' cannot be created because schema evolution is disabled.   [io.debezium.connector.jdbc.JdbcChangeEventSink]
      2023-05-19 21:31:38,072 WARN   ||  Table creation failed for 'SIS.PROTECTION_CATEGORY_CD', attempting to alter the table   [io.debezium.connector.jdbc.JdbcChangeEventSink]
      java.sql.SQLException: Cannot create table SIS.PROTECTION_CATEGORY_CD because schema evolution is disabled
              at io.debezium.connector.jdbc.JdbcChangeEventSink.createTable(JdbcChangeEventSink.java:139)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:99)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:67)
              at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:87)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
      2023-05-19 21:31:38,117 ERROR  ||  Table 'SIS.PROTECTION_CATEGORY_CD' does not exist and cannot be altered.   [io.debezium.connector.jdbc.JdbcChangeEventSink]
      2023-05-19 21:31:38,118 ERROR  ||  Failed to alter the table 'SIS.PROTECTION_CATEGORY_CD'.   [io.debezium.connector.jdbc.JdbcChangeEventSink]
      java.sql.SQLException: Could not find table: SIS.PROTECTION_CATEGORY_CD
              at io.debezium.connector.jdbc.JdbcChangeEventSink.alterTableIfNeeded(JdbcChangeEventSink.java:162)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:105)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:67)
              at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:87)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
      2023-05-19 21:31:38,119 ERROR  ||  Failed to process record: Failed to process a sink record   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-05-19 21:31:38,122 WARN   ||  WorkerSinkTask{id=oracle-jdbc-sink-connector-0} Ignoring invalid task provided offset PROTECTION_CATEGORY_CD-0/OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''} – partition not assigned, assignment=[dbserver1.public.protection_category_cd-0, dbserver1.public.activity_cd-0]   [org.apache.kafka.connect.runtime.WorkerSinkTask]
      2023-05-19 21:31:38,122 WARN   ||  WorkerSinkTask{id=oracle-jdbc-sink-connector-0} Ignoring invalid task provided offset ACTIVITY_CD-0/OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''} – partition not assigned, assignment=[dbserver1.public.protection_category_cd-0, dbserver1.public.activity_cd-0]   [org.apache.kafka.connect.runtime.WorkerSinkTask]
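The errors above suggest the sink treats the full value produced by `table.name.format` ("SIS.PROTECTION_CATEGORY_CD") as a single opaque table identifier rather than a schema-qualified name, so table lookup against the SIS schema fails. A minimal sketch of the kind of identifier parsing the requested feature implies — the `TableId` class here is hypothetical and illustrative, not the connector's actual code:

```java
// Hypothetical illustration: split a schema-qualified identifier such as
// "SIS.PROTECTION_CATEGORY_CD" (produced by table.name.format = "SIS.${topic}")
// into schema and table parts so each can be resolved separately.
public final class TableId {

    private final String schema; // null when no schema prefix is present
    private final String table;

    private TableId(String schema, String table) {
        this.schema = schema;
        this.table = table;
    }

    public static TableId parse(String qualifiedName) {
        int dot = qualifiedName.indexOf('.');
        if (dot < 0) {
            // No prefix: fall back to the connection's default schema.
            return new TableId(null, qualifiedName);
        }
        return new TableId(qualifiedName.substring(0, dot),
                           qualifiedName.substring(dot + 1));
    }

    public String schema() {
        return schema;
    }

    public String table() {
        return table;
    }
}
```

With such a split, existence checks and DDL could be issued against `SIS` explicitly instead of the connection's default schema.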

      How to reproduce the issue using our tutorial deployment?

      <Your answer>

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      1. Capture changes from a Postgres database using the Debezium Postgres connector (io.debezium.connector.postgresql.PostgresConnector).
      2. Register the Oracle JDBC sink connector using the configuration above.
      3. Insert a record into a Postgres table.
      4. Observe the connector log.
      I can upload the Docker files if needed.

      Implementation ideas (optional)

      None.

            Assignee: Chris Cranford (ccranfor@redhat.com)
            Reporter: Jaise Thomas (jaise.thomas@aot-technologies.com) (Inactive)
            Votes: 0
            Watchers: 5
