Uploaded image for project: 'AMQ Broker'
  1. AMQ Broker
  2. ENTMQBR-4813

AsynchronousCloseException with large messages and multiple C++ subscribers.

    XMLWordPrintable

Details

    • False
    • False
    • Undefined
    • Hide
      • Download AMQ-standalone_test_program.zip.
      • topic-receive.cpp is used as receiver to the topic.
      • Run AMQ 7.8 instance. Publisher and Subscribers are also running in same host as broker.
      • Publisher is using core protocol while C++ subscribers are amqp.
      • Compile topic-receive.cpp
      $ g++ topic-receive.cpp -lqpid-proton-cpp -o topic-receive.out 
      • From two different terminal run two subscribers
      $ ./topic-receive.out 
      RECEIVE: Opened receiver for source address 'exampleTopic' 
      • We should get  two subscribers to address exampleTopic.
      $ ./artemis queue stat|grep exampleTopic
      |0e92a161-75a1-4056-82ea-1fd36d13c31c|exampleTopic             |1              |0             |0              |0                |0              |0               |MULTICAST    |
      |d204890a-ef13-4acc-8544-102d94c04911|exampleTopic             |1              |0             |0              |0                |0              |0               |MULTICAST    |
      
      •  Run Publisher.java from AMQ-standalone_test_program with arguments "admin localhost". I used eclipse IDE to run this Java class.
      • Once Publisher Class is running, from Eclipse Console, excute
      send L 100 200000 
      • Check AMQ logs
      2021-03-22 21:33:38,012 WARN  [org.apache.activemq.artemis.core.server] AMQ222151: removing consumer which did not handle a message, consumer=ServerConsumerImpl [id=0, filter=null, binding=LocalQueueBinding [address=exampleTopic, queue=QueueImpl[name=d204890a-ef13-4acc-8544-102d94c04911, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=ca8ca5d3-88c2-11eb-84f3-b0fc366fae1f], temp=true]@10b84dad, filter=null, name=d204890a-ef13-4acc-8544-102d94c04911, clusterName=d204890a-ef13-4acc-8544-102d94c04911ca8ca5d3-88c2-11eb-84f3-b0fc366fae1f]], message=Reference[2147485570]:NON-RELIABLE:LargeServerMessage[messageID=2147485570,durable=false,userID=29b4ce8f-8b28-11eb-9946-b0fc366fae1f,priority=4, timestamp=Mon Mar 22 21:33:37 IST 2021,expiration=Mon Mar 22 22:33:37 IST 2021, durable=false, address=exampleTopic, properties=TypedProperties[__AMQ_CID=admin,_AMQ_ROUTING_TYPE=0,MessageLoc=mid,_AMQ_LARGE_SIZE=200038]]@1790183926: java.lang.RuntimeException: java.nio.channels.AsynchronousCloseException2021-03-22 21:33:38,012 WARN  [org.apache.activemq.artemis.core.server] AMQ222151: removing consumer which did not handle a message, consumer=ServerConsumerImpl [id=0, filter=null, binding=LocalQueueBinding [address=exampleTopic, queue=QueueImpl[name=d204890a-ef13-4acc-8544-102d94c04911, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=ca8ca5d3-88c2-11eb-84f3-b0fc366fae1f], temp=true]@10b84dad, filter=null, name=d204890a-ef13-4acc-8544-102d94c04911, clusterName=d204890a-ef13-4acc-8544-102d94c04911ca8ca5d3-88c2-11eb-84f3-b0fc366fae1f]], message=Reference[2147485570]:NON-RELIABLE:LargeServerMessage[messageID=2147485570,durable=false,userID=29b4ce8f-8b28-11eb-9946-b0fc366fae1f,priority=4, timestamp=Mon Mar 22 21:33:37 IST 2021,expiration=Mon Mar 22 22:33:37 IST 2021, durable=false, address=exampleTopic, properties=TypedProperties[__AMQ_CID=admin,_AMQ_ROUTING_TYPE=0,MessageLoc=mid,_AMQ_LARGE_SIZE=200038]]@1790183926: java.lang.RuntimeException: java.nio.channels.AsynchronousCloseException at org.apache.activemq.artemis.core.persistence.impl.journal.LargeBody.getBodyBufferSize(LargeBody.java:294) [artemis-server-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl.getBodyBufferSize(LargeServerMessageImpl.java:223) [artemis-server-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage.getBodyLength(ServerJMSBytesMessage.java:57) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter.getBinaryFromMessageBody(CoreAmqpConverter.java:525) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter.convertBody(CoreAmqpConverter.java:408) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter.fromCore(CoreAmqpConverter.java:143) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter.checkAMQP(CoreAmqpConverter.java:117) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext.executeDelivery(ProtonServerSenderContext.java:820) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl.run(MessageReferenceImpl.java:132) [artemis-server-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [artemis-commons-2.13.0.redhat-00006.jar:2.13.0.redhat-00006]Caused by: java.nio.channels.AsynchronousCloseException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205) [rt.jar:1.8.0_272] at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:315) [rt.jar:1.8.0_272] at org.apache.activemq.artemis.core.io.nio.NIOSequentialFile.size(NIOSequentialFile.java:335) [artemis-journal-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.core.persistence.impl.journal.LargeBody.getBodyBufferSize(LargeBody.java:286) [artemis-server-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] ... 14 more 

       

      • In queue stat output we find that one subscriber is closed abruptly.
      [chandrashekhar@localhost bin]$ ./artemis queue stat|grep exampleTopic
      |0e92a161-75a1-4056-82ea-1fd36d13c31c|exampleTopic             |1              |0             |100            |0                |100            |0               |MULTICAST    |
      |d204890a-ef13-4acc-8544-102d94c04911|exampleTopic             |0              |96            |100            |1                |5              |0               |MULTICAST    |
      [chandrashekhar@localhost bin]$ 
       

       

      Show
      Download AMQ-standalone_test_program.zip. topic-receive.cpp is used as receiver to the topic. Run AMQ 7.8 instance. Publisher and Subscribers are also running in same host as broker. Publisher is using core protocol while C++ subscribers are amqp. Compile topic-receive.cpp $ g++ topic-receive.cpp -lqpid-proton-cpp -o topic-receive.out From two different terminal run two subscribers $ ./topic-receive.out RECEIVE: Opened receiver for source address 'exampleTopic' We should get  two subscribers to address exampleTopic. $ ./artemis queue stat|grep exampleTopic |0e92a161-75a1-4056-82ea-1fd36d13c31c|exampleTopic |1 |0 |0 |0 |0 |0 |MULTICAST | |d204890a-ef13-4acc-8544-102d94c04911|exampleTopic |1 |0 |0 |0 |0 |0 |MULTICAST |  Run Publisher.java from AMQ-standalone_test_program with arguments "admin localhost". I used eclipse IDE to run this Java class. Once Publisher Class is running, from Eclipse Console, excute send L 100 200000 Check AMQ logs 2021-03-22 21:33:38,012 WARN  [org.apache.activemq.artemis.core.server] AMQ222151: removing consumer which did not handle a message, consumer=ServerConsumerImpl [id=0, filter= null , binding=LocalQueueBinding [address=exampleTopic, queue=QueueImpl[name=d204890a-ef13-4acc-8544-102d94c04911, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=ca8ca5d3-88c2-11eb-84f3-b0fc366fae1f], temp= true ]@10b84dad, filter= null , name=d204890a-ef13-4acc-8544-102d94c04911, clusterName=d204890a-ef13-4acc-8544-102d94c04911ca8ca5d3-88c2-11eb-84f3-b0fc366fae1f]], message=Reference[2147485570]:NON-RELIABLE:LargeServerMessage[messageID=2147485570,durable= false ,userID=29b4ce8f-8b28-11eb-9946-b0fc366fae1f,priority=4, timestamp=Mon Mar 22 21:33:37 IST 2021,expiration=Mon Mar 22 22:33:37 IST 2021, durable= false , address=exampleTopic, properties=TypedProperties[__AMQ_CID=admin,_AMQ_ROUTING_TYPE=0,MessageLoc=mid,_AMQ_LARGE_SIZE=200038]]@1790183926: java.lang.RuntimeException: java.nio.channels.AsynchronousCloseException2021-03-22 21:33:38,012 WARN  [org.apache.activemq.artemis.core.server] AMQ222151: removing consumer which did not handle a message, consumer=ServerConsumerImpl [id=0, filter= null , binding=LocalQueueBinding [address=exampleTopic, queue=QueueImpl[name=d204890a-ef13-4acc-8544-102d94c04911, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=ca8ca5d3-88c2-11eb-84f3-b0fc366fae1f], temp= true ]@10b84dad, filter= null , name=d204890a-ef13-4acc-8544-102d94c04911, clusterName=d204890a-ef13-4acc-8544-102d94c04911ca8ca5d3-88c2-11eb-84f3-b0fc366fae1f]], message=Reference[2147485570]:NON-RELIABLE:LargeServerMessage[messageID=2147485570,durable= false ,userID=29b4ce8f-8b28-11eb-9946-b0fc366fae1f,priority=4, timestamp=Mon Mar 22 21:33:37 IST 2021,expiration=Mon Mar 22 22:33:37 IST 2021, durable= false , address=exampleTopic, properties=TypedProperties[__AMQ_CID=admin,_AMQ_ROUTING_TYPE=0,MessageLoc=mid,_AMQ_LARGE_SIZE=200038]]@1790183926: java.lang.RuntimeException: java.nio.channels.AsynchronousCloseException at org.apache.activemq.artemis.core.persistence.impl.journal.LargeBody.getBodyBufferSize(LargeBody.java:294) [artemis-server-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl.getBodyBufferSize(LargeServerMessageImpl.java:223) [artemis-server-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage.getBodyLength(ServerJMSBytesMessage.java:57) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter.getBinaryFromMessageBody(CoreAmqpConverter.java:525) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter.convertBody(CoreAmqpConverter.java:408) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter.fromCore(CoreAmqpConverter.java:143) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter.checkAMQP(CoreAmqpConverter.java:117) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext.executeDelivery(ProtonServerSenderContext.java:820) [artemis-amqp-protocol-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl.run(MessageReferenceImpl.java:132) [artemis-server-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-all-4.1.48.Final-redhat-00001.jar:4.1.48.Final-redhat-00001] at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [artemis-commons-2.13.0.redhat-00006.jar:2.13.0.redhat-00006]Caused by: java.nio.channels.AsynchronousCloseException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205) [rt.jar:1.8.0_272] at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:315) [rt.jar:1.8.0_272] at org.apache.activemq.artemis.core.io.nio.NIOSequentialFile.size(NIOSequentialFile.java:335) [artemis-journal-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] at org.apache.activemq.artemis.core.persistence.impl.journal.LargeBody.getBodyBufferSize(LargeBody.java:286) [artemis-server-2.13.0.redhat-00006.jar:2.13.0.redhat-00006] ... 14 more   In queue stat output we find that one subscriber is closed abruptly. [chandrashekhar@localhost bin]$ ./artemis queue stat|grep exampleTopic |0e92a161-75a1-4056-82ea-1fd36d13c31c|exampleTopic |1 |0 |100 |0 |100 |0 |MULTICAST | |d204890a-ef13-4acc-8544-102d94c04911|exampleTopic |0 |96 |100 |1 |5 |0 |MULTICAST | [chandrashekhar@localhost bin]$  

    Attachments

      Issue Links

        Activity

          People

            csuconic@redhat.com Clebert Suconic
            rhn-support-cpandey Chandra Shekhar Pandey (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: