Uploaded image for project: 'Red Hat Fuse'
  1. Red Hat Fuse
  2. ENTESB-15817

CKC File connector: unable to append new-line

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Won't Do
    • Icon: Major Major
    • 2021-M2
    • Camel-K-M4
    • Camel Kafka Connector
    • None
    • False
    • False
    • % %
    • Undefined

      I want all messages from a kafka topic into a single file, so I specify according to the docs :

      connector.class=org.apache.camel.kafkaconnector.file.CamelFileSinkConnector
      camel.sink.endpoint.fileExist=append
      tasks.max=1
      topics=mytopic
      camel.sink.endpoint.appendChars=\\n (double backslash here as adviced in the docs)
      name=CamelFileSinkConnector
      value.converter=org.apache.kafka.connect.storage.StringConverter
      camel.sink.endpoint.fileName=output.txt
      key.converter=org.apache.kafka.connect.storage.StringConverter
      camel.sink.path.directoryName=/tmp/sink
      
      [2021-02-18 14:55:18,876] ERROR WorkerSinkTask{id=CamelFileSinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Exchange delivery has failed! (org.apache.kafka.connect.runtime.WorkerSinkTask:586)
      org.apache.kafka.connect.errors.ConnectException: Exchange delivery has failed!
      	at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:186)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: file:///tmp/sink?appendChars=
      &fileExist=append&fileName=output.txt due to: Illegal character in query at index 29: file:///tmp/sink?appendChars=
      &fileExist=append&fileName=output.txt
      	at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:912)
      	at org.apache.camel.impl.engine.AbstractCamelContext.getEndpoint(AbstractCamelContext.java:798)
      	at org.apache.camel.support.CamelContextHelper.getMandatoryEndpoint(CamelContextHelper.java:73)
      	at org.apache.camel.support.ExchangeHelper.resolveEndpoint(ExchangeHelper.java:112)
      	at org.apache.camel.support.ExchangeHelper.resolveEndpoint(ExchangeHelper.java:91)
      	at org.apache.camel.processor.SendDynamicProcessor.resolveEndpoint(SendDynamicProcessor.java:294)
      	at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:155)
      	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
      	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:312)
      	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:84)
      	at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:218)
      	at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:112)
      	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
      	at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:109)
      	at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:189)
      	at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
      	at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
      	at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:181)
      	... 11 more
      Caused by: java.net.URISyntaxException: Illegal character in query at index 29: file:///tmp/sink?appendChars=
      &fileExist=append&fileName=output.txt
      	at java.base/java.net.URI$Parser.fail(URI.java:2915)
      	at java.base/java.net.URI$Parser.checkChars(URI.java:3086)
      	at java.base/java.net.URI$Parser.parseHierarchical(URI.java:3174)
      	at java.base/java.net.URI$Parser.parse(URI.java:3116)
      	at java.base/java.net.URI.<init>(URI.java:600)
      	at org.apache.camel.support.DefaultComponent.createEndpoint(DefaultComponent.java:96)
      	at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:878)
      	... 31 more
      [2021-02-18 14:55:18,877] ERROR WorkerSinkTask{id=CamelFileSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
      org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Exchange delivery has failed!
      	at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:186)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
      	... 10 more
      Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: file:///tmp/sink?appendChars=
      &fileExist=append&fileName=output.txt due to: Illegal character in query at index 29: file:///tmp/sink?appendChars=
      &fileExist=append&fileName=output.txt
      	at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:912)
      	at org.apache.camel.impl.engine.AbstractCamelContext.getEndpoint(AbstractCamelContext.java:798)
      	at org.apache.camel.support.CamelContextHelper.getMandatoryEndpoint(CamelContextHelper.java:73)
      	at org.apache.camel.support.ExchangeHelper.resolveEndpoint(ExchangeHelper.java:112)
      	at org.apache.camel.support.ExchangeHelper.resolveEndpoint(ExchangeHelper.java:91)
      	at org.apache.camel.processor.SendDynamicProcessor.resolveEndpoint(SendDynamicProcessor.java:294)
      	at org.apache.camel.processor.SendDynamicProcessor.process(SendDynamicProcessor.java:155)
      	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
      	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
      	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:312)
      	at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:84)
      	at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:218)
      	at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:112)
      	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
      	at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:109)
      	at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:189)
      	at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
      	at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
      	at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:181)
      	... 11 more
      Caused by: java.net.URISyntaxException: Illegal character in query at index 29: file:///tmp/sink?appendChars=
      &fileExist=append&fileName=output.txt
      	at java.base/java.net.URI$Parser.fail(URI.java:2915)
      	at java.base/java.net.URI$Parser.checkChars(URI.java:3086)
      	at java.base/java.net.URI$Parser.parseHierarchical(URI.java:3174)
      	at java.base/java.net.URI$Parser.parse(URI.java:3116)
      	at java.base/java.net.URI.<init>(URI.java:600)
      	at org.apache.camel.support.DefaultComponent.createEndpoint(DefaultComponent.java:96)
      	at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:878)
      	... 31 more
      

      From the error message you can see that the "\n" is creating a new line there

              acosenti Andrea Cosentino
              avano@redhat.com Andrej Vano
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated:
                Resolved: