Uploaded image for project: 'JBoss Enterprise Application Platform 4 and 5'
  1. JBoss Enterprise Application Platform 4 and 5
  2. JBPAPP-5707

Cope with moving messages that have clashing duplicateIDs

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • EAP 5.1.0 Post Release
    • EAP 5.1.0 Post Release
    • HornetQ
    • None
    • Hide

      Define these in hornetq-jms.xml:

      <!-- Queue "A"; the entry name "A" is the name the reproducer code looks up via InitialContext. -->
      <queue name="A">
      <entry name="A" />
      </queue>

      <!-- Queue "B"; the entry name "B" is the name the reproducer code looks up via InitialContext. -->
      <queue name="B">
      <entry name="B" />
      </queue>

      Execute this code:

      public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
      props.put("java.naming.provider.url", "127.0.0.1:1099");
      props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");

      InitialContext initialcontext = new InitialContext(props);
      QueueConnectionFactory jbosscf = (QueueConnectionFactory) initialcontext.lookup("ConnectionFactory");
      Queue a = (Queue) initialcontext.lookup("A");
      Queue b = (Queue) initialcontext.lookup("B");

      QueueConnection connection = jbosscf.createQueueConnection();
      connection.start();
      QueueSession session;

      // Send the same message to both "A" and "B"
      session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      String text = "coucou at " + new Date();
      TextMessage message = session.createTextMessage(text);
      message.setStringProperty("_HQ_DUPL_ID", text);
      MessageProducer producer;
      producer = session.createProducer(a);
      producer.send(message);
      producer.close();
      producer = session.createProducer(b);
      producer.send(message);
      producer.close();
      QueueReceiver receiver;
      receiver = session.createReceiver(a);
      message = (TextMessage) receiver.receive(1000);
      // Assert.assertEquals(text, message.getText());
      receiver.close();
      session.close();
      connection.close();

      String moveMode = "JMS";
      // String moveMode = "MBEAN";

      if (moveMode.equals("JMS"))

      { connection = jbosscf.createQueueConnection(); connection.start(); session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(a); receiver = session.createReceiver(b); message = (TextMessage) receiver.receive(1000); // Assert.assertEquals(text, message.getText()); producer.send(message); session.commit(); session.close(); connection.close(); }

      else if (moveMode.equals("MBEAN")) {
      JMXServiceURL jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:5004/jmxrmi");
      Hashtable<String, Object> h = new Hashtable<String, Object>();
      String[] creds = new String[]

      { "admin", "admin" }

      ;
      h.put(JMXConnector.CREDENTIALS, creds);
      JMXConnector connector = JMXConnectorFactory.connect(jmxURL, h);
      MBeanServerConnection serverConnection = connector.getMBeanServerConnection();
      ObjectName on = new ObjectName("org.hornetq:module=JMS,name=\"B\",type=Queue");
      Object[] myargs = new Object[]

      { "", "A" }

      ;
      String[] sig = new String[]

      { "java.lang.String", "java.lang.String" }

      ;
      serverConnection.invoke(on, "moveMessages", myargs, sig);
      }

      // log.info("done");
      }

      Show
      Define these in hornetq-jms.xml: <queue name="A"> <entry name="A" /> </queue> <queue name="B"> <entry name="B" /> </queue> Execute this code: public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); props.put("java.naming.provider.url", "127.0.0.1:1099"); props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); InitialContext initialcontext = new InitialContext(props); QueueConnectionFactory jbosscf = (QueueConnectionFactory) initialcontext.lookup("ConnectionFactory"); Queue a = (Queue) initialcontext.lookup("A"); Queue b = (Queue) initialcontext.lookup("B"); QueueConnection connection = jbosscf.createQueueConnection(); connection.start(); QueueSession session; // Send the same message to both "A" and "B" session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); String text = "coucou at " + new Date(); TextMessage message = session.createTextMessage(text); message.setStringProperty("_HQ_DUPL_ID", text); MessageProducer producer; producer = session.createProducer(a); producer.send(message); producer.close(); producer = session.createProducer(b); producer.send(message); producer.close(); QueueReceiver receiver; receiver = session.createReceiver(a); message = (TextMessage) receiver.receive(1000); // Assert.assertEquals(text, message.getText()); receiver.close(); session.close(); connection.close(); String moveMode = "JMS"; // String moveMode = "MBEAN"; if (moveMode.equals("JMS")) { connection = jbosscf.createQueueConnection(); connection.start(); session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(a); receiver = session.createReceiver(b); message = (TextMessage) receiver.receive(1000); // Assert.assertEquals(text, message.getText()); producer.send(message); session.commit(); session.close(); connection.close(); } else if (moveMode.equals("MBEAN")) 
{ JMXServiceURL jmxURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:5004/jmxrmi"); Hashtable<String, Object> h = new Hashtable<String, Object>(); String[] creds = new String[] { "admin", "admin" } ; h.put(JMXConnector.CREDENTIALS, creds); JMXConnector connector = JMXConnectorFactory.connect(jmxURL, h); MBeanServerConnection serverConnection = connector.getMBeanServerConnection(); ObjectName on = new ObjectName("org.hornetq:module=JMS,name=\"B\",type=Queue"); Object[] myargs = new Object[] { "", "A" } ; String[] sig = new String[] { "java.lang.String", "java.lang.String" } ; serverConnection.invoke(on, "moveMessages", myargs, sig); } // log.info("done"); }

      If you perform the steps to reproduce, you will see that the DeliveringCount on queue "B" stays at "1" until the server is restarted.

            csuconic@redhat.com Clebert Suconic
            rhn-support-jbertram Justin Bertram
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

              Created:
              Updated:
              Resolved: