Red Hat Fuse · ENTESB-15895

camel-hdfs FileNotFoundException when reading files


      I have an HDFS system running inside Docker containers, with multiple files produced in one folder:

      [root@6e13d8ac88bb hadoop]# ./bin/hadoop fs -fs hdfs://namenode:8020 -ls /ckc-test-1614847916206
      Found 10 items
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file0
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file1
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file2
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file3
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file4
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file5
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file6
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file7
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file8
      -rw-r--r--   3 avano supergroup         18 2021-03-04 08:52 /ckc-test-1614847916206/file9
      

      When I use the following HDFS source connector configuration:

      connector.class=org.apache.camel.kafkaconnector.hdfs.CamelHdfsSourceConnector
      camel.source.path.path=/ckc-test-1614847916206
      tasks.max=1
      topics=mytopic
      camel.source.path.port=8020
      name=CamelHDFSSourceConnector
      camel.source.path.hostName=localhost
      value.converter=org.apache.kafka.connect.storage.StringConverter
      key.converter=org.apache.kafka.connect.storage.StringConverter
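      For context, connectors like the one above are typically registered through the Kafka Connect REST API. The sketch below builds the equivalent JSON payload from the properties in this report; the REST endpoint `localhost:8083` is an assumption (the default Connect port), not something stated in the report.

```shell
# Build the JSON payload for the Kafka Connect REST API from the
# properties above (names and values copied from this report).
cat > /tmp/camel-hdfs-source.json <<'EOF'
{
  "name": "CamelHDFSSourceConnector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.hdfs.CamelHdfsSourceConnector",
    "tasks.max": "1",
    "topics": "mytopic",
    "camel.source.path.hostName": "localhost",
    "camel.source.path.port": "8020",
    "camel.source.path.path": "/ckc-test-1614847916206",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
EOF
# Register it (assumes Connect's REST API listens on localhost:8083):
# curl -X POST -H "Content-Type: application/json" \
#      --data @/tmp/camel-hdfs-source.json http://localhost:8083/connectors
cat /tmp/camel-hdfs-source.json
```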
      

      The files are picked up, but each one fails with:

      2021-03-04 10:20:37,314 [t-1614849621904] WARN org.apache.camel.component.hdfs.HdfsConsumer - Consumer Consumer[hdfs://localhost:8020//ckc-test-1614849621904] failed polling endpoint: hdfs://localhost:8020//ckc-test-1614849621904. Will try again at next poll. Caused by: [org.apache.camel.RuntimeCamelException - java.io.FileNotFoundException: /tmp/file0.opened2942709342032564877.hdfs/file0.opened (Not a directory)]
      org.apache.camel.RuntimeCamelException: java.io.FileNotFoundException: /tmp/file0.opened2942709342032564877.hdfs/file0.opened (Not a directory)
          at org.apache.camel.component.hdfs.HdfsNormalFileHandler.createInputStream(HdfsNormalFileHandler.java:95) ~[camel-hdfs-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          at org.apache.camel.component.hdfs.HdfsNormalFileHandler.createInputStream(HdfsNormalFileHandler.java:36) ~[camel-hdfs-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]                       
          at org.apache.camel.component.hdfs.HdfsFileType.createInputStream(HdfsFileType.java:46) ~[camel-hdfs-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          at org.apache.camel.component.hdfs.HdfsInputStream.createInputStream(HdfsInputStream.java:71) ~[camel-hdfs-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          at org.apache.camel.component.hdfs.HdfsConsumer.asHdfsFile(HdfsConsumer.java:241) ~[camel-hdfs-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
          at java.util.stream.SliceOps$1$1.accept(SliceOps.java:199) ~[?:?]
          at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[?:?]
          at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[?:?]
          at java.util.Spliterators$ArraySpliterator.tryAdvance(Spliterators.java:958) ~[?:?]
          at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127) ~[?:?]
          at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502) ~[?:?]
          at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) ~[?:?]
          at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
          at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
          at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
          at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
          at org.apache.camel.component.hdfs.HdfsConsumer.processFileStatuses(HdfsConsumer.java:148) ~[camel-hdfs-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          at org.apache.camel.component.hdfs.HdfsConsumer.doPoll(HdfsConsumer.java:136) ~[camel-hdfs-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          at org.apache.camel.component.hdfs.HdfsConsumer.poll(HdfsConsumer.java:110) ~[camel-hdfs-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190) [camel-support-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107) [camel-support-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
          at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
          at java.lang.Thread.run(Thread.java:834) [?:?]
      Caused by: java.io.FileNotFoundException: /tmp/file0.opened2942709342032564877.hdfs/file0.opened (Not a directory)
          at java.io.FileInputStream.open0(Native Method) ~[?:?]
          at java.io.FileInputStream.open(FileInputStream.java:219) ~[?:?]
          at java.io.FileInputStream.<init>(FileInputStream.java:157) ~[?:?]
          at org.apache.camel.component.hdfs.HdfsNormalFileHandler.createInputStream(HdfsNormalFileHandler.java:91) ~[camel-hdfs-3.7.0.fuse-800004-redhat-00001.jar:3.7.0.fuse-800004-redhat-00001]
          ... 27 more
      

      /tmp/file0.opened2942709342032564877.hdfs/file0.opened doesn't exist, but /tmp/file0.opened2942709342032564877.hdfs does exist, and it is a regular file that contains the content of the HDFS file.
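      The "(Not a directory)" message is the OS-level error you get on POSIX systems when a path component that should be a directory is actually a regular file. A minimal stdlib-only sketch of that failure mode (this is an illustration of the error, not the actual Camel code):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

public class NotADirectoryDemo {
    // Create a temp *file* (analogous to /tmp/file0.opened...hdfs) and
    // then try to open a path beneath it, as if it were a directory.
    static String tryOpenBeneathFile() throws IOException {
        File temp = File.createTempFile("file0.opened", ".hdfs");
        temp.deleteOnExit();
        File wrong = new File(temp, "file0.opened"); // <temp>/file0.opened
        try (FileInputStream in = new FileInputStream(wrong)) {
            return "opened";
        } catch (FileNotFoundException e) {
            // On Linux this message ends with "(Not a directory)",
            // matching the exception in the stack trace above.
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(tryOpenBeneathFile());
    }
}
```

      This matches the observed state: the consumer downloaded the HDFS content into a temp file, then tried to open a child path of that file.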

      I tried this in standalone Camel as well:
      3.5.0 - same exception
      3.0.0 - works OK
      2.25.3 (hdfs2 component) - works OK

      from("hdfs:localhost:8020/ckc-test-1614847916206").log("${body}");
      

              Assignee: Unassigned
              Reporter: Andrej Vano (avano@redhat.com)
