Uploaded image for project: 'AMQ Broker'
  1. AMQ Broker
  2. ENTMQBR-6596

Not getting messages on MQTT subscriptions with $

XMLWordPrintable

      After switching from Artemis broker 2.20 to 2.21 we experienced an issue with message delivery.
      It looks like MQTT devices don’t receive the messages matching their subscriptions.

      The test code (***) run with an Artemis broker 2.19 or 2.20 as target (wildcard addresses modified as (**)) produces the correct output:

      waiting for messages
      Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
      Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
      Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
      ===
      Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
      Client: test-client-2 - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
      Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
      ===
      Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
      Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
      Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
      …

      With broker 2.21 or 2.22 (configuration changes described in (*) ) as target the output is:

      waiting for messages
      ===
      Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
      ===
      Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
      ===
      Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
      …

      So the broker doesn’t send any messages to the clients. Maybe we missed some configuration that is required from version 2.21 onward?

      *:
      The 2.21 and 2.22-SNAPSHOT default broker.xml configuration file has changed in this way:

      • set the broker name (message-broker)
      • removed the duplicate connector bound to port 1883 (with the default configuration the broker crashed)
      • allow only MQTT protocol for connector bound to 1883 port
      • removed broadcast connector and configuration
      • added custom wildcard configuration (**)

      **:

       <wildcard-addresses>
          <routing-enabled>true</routing-enabled>
          <delimiter>/</delimiter>
          <any-words>#</any-words>
          <single-word>+</single-word>
      </wildcard-addresses>

      ***:

      /*******************************************************************************
      * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
      *
      * This program and the accompanying materials are made
      * available under the terms of the Eclipse Public License 2.0
      * which is available at https://www.eclipse.org/legal/epl-2.0/
      *
      * SPDX-License-Identifier: EPL-2.0
      *
      * Contributors:
      *     Eurotech - initial API and implementation
      *******************************************************************************/
      package org.eclipse.kapua.qa.common;
      
      import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
      import org.eclipse.paho.client.mqttv3.MqttCallback;
      import org.eclipse.paho.client.mqttv3.MqttClient;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.eclipse.paho.client.mqttv3.MqttException;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      /**
       * Stand-alone reproducer that connects three MQTT clients (one admin, two devices) to the
       * broker at {@link #SERVER_URI}, subscribes them, and publishes a fixed sequence of test
       * messages.
       *
       * <p>Each device client subscribes to {@code $EDC/kapua-sys/<clientId>/#}; the admin client
       * subscribes to {@code #}. All received messages and completed deliveries are printed to
       * standard output by the {@code TestMqttClientCallback} registered on each client, and a
       * {@code ===} separator line is printed after every publish.
       */
      public class TestMqttClient {
      
          protected static final Logger logger = LoggerFactory.getLogger(TestMqttClient.class);
      
          private static final String SERVER_URI = "tcp://localhost:1883";
          private static final String CLIENT_ID_ADMIN = "test-client-admin";
          private static final String CLIENT_ID_1 = "test-client-1";
          private static final String CLIENT_ID_2 = "test-client-2";
          private static final String USERNAME = "kapua-broker";
          private static final String PASSWORD = "kapua-password";
          private static final String USERNAME_ADMIN = "kapua-sys";
          private static final String PASSWORD_ADMIN = "kapua-password";
      
          /** Common prefix of every topic used by this reproducer. */
          private static final String TOPIC_PREFIX = "$EDC/kapua-sys/";
          /** Text payload carried by every published message. */
          private static final String PAYLOAD = "test";
      
          private TestMqttClient() {
          }
      
          /**
           * Runs the reproducer: connect, subscribe, publish the test sequence, then shut down.
           *
           * @param argv ignored
           * @throws MqttException if creating a client, connecting, subscribing or publishing fails
           */
          public static void main(String[] argv) throws MqttException {
              MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence());
              MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence());
              MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence());
              clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN));
              client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
              client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
      
              try {
                  clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN));
                  client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
                  client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
                  System.out.println("waiting for messages");
                  client1.subscribe(TOPIC_PREFIX + CLIENT_ID_1 + "/#");
                  client2.subscribe(TOPIC_PREFIX + CLIENT_ID_2 + "/#");
                  clientAdmin.subscribe("#");
      
                  publish(client1, CLIENT_ID_1, "/MQTT/BIRTH");
                  publish(client2, CLIENT_ID_2, "/MQTT/BIRTH");
                  publish(clientAdmin, CLIENT_ID_1, "/MQTT/APPS");
                  // Target the second device here (the original repeated CLIENT_ID_1), so that both
                  // device subscriptions are exercised, mirroring the DC publishes below.
                  publish(clientAdmin, CLIENT_ID_2, "/MQTT/APPS");
      
                  publish(client1, CLIENT_ID_1, "/MQTT/DC");
                  publish(client2, CLIENT_ID_2, "/MQTT/DC");
                  publish(clientAdmin, CLIENT_ID_1, "/MQTT/DC");
                  publish(clientAdmin, CLIENT_ID_2, "/MQTT/DC");
              } finally {
                  // Always release the clients, even when a connect/subscribe/publish call failed.
                  shutdown(clientAdmin);
                  shutdown(client1);
                  shutdown(client2);
              }
          }
      
          /**
           * Publishes the test payload to {@code $EDC/kapua-sys/<deviceClientId><topicSuffix>} and
           * prints the {@code ===} separator line.
           *
           * @param publisher      client used to send the message
           * @param deviceClientId client id placed in the topic
           * @param topicSuffix    trailing part of the topic, starting with {@code /}
           * @throws MqttException if the publish fails
           */
          private static void publish(MqttClient publisher, String deviceClientId, String topicSuffix) throws MqttException {
              // Explicit charset so the payload bytes do not depend on the platform default.
              byte[] payload = PAYLOAD.getBytes(java.nio.charset.StandardCharsets.UTF_8);
              publisher.publish(TOPIC_PREFIX + deviceClientId + topicSuffix, new MqttMessage(payload));
              System.out.println("===");
          }
      
          /**
           * Disconnects the client if it is connected and then closes it. Failures are logged
           * rather than thrown so that a cleanup error cannot mask an earlier exception and the
           * remaining clients are still shut down.
           *
           * @param client client to shut down
           */
          private static void shutdown(MqttClient client) {
              try {
                  if (client.isConnected()) {
                      client.disconnect();
                  }
                  client.close();
              } catch (MqttException e) {
                  logger.warn("Error while shutting down client {}", client.getClientId(), e);
              }
          }
      
          /**
           * Builds clean-session connect options carrying the given credentials.
           *
           * @param username user name to authenticate with
           * @param password password to authenticate with
           * @return the populated connect options
           */
          private static MqttConnectOptions getMqttConnectOptions(String username, String password) {
              MqttConnectOptions options = new MqttConnectOptions();
              options.setCleanSession(true);
              options.setUserName(username);
              options.setPassword(password.toCharArray());
              return options;
          }
      }
      
      /**
       * {@link MqttCallback} that prints every callback event to standard output, prefixed with
       * the id of the client it was created for.
       */
      class TestMqttClientCallback implements MqttCallback {
      
          /** Client id used as a prefix in every printed line. */
          private final String clientId;
      
          /**
           * @param clientId id of the client this callback is registered on; used only for output
           */
          TestMqttClientCallback(String clientId) {
              this.clientId = clientId;
          }
      
          /**
           * Prints the topic and the payload (decoded as UTF-8 text) of a received message.
           *
           * @param topic   topic the message arrived on
           * @param message the received message
           */
          @Override
          public void messageArrived(String topic, MqttMessage message) throws Exception {
              // Decode with an explicit charset instead of the platform default.
              String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
              System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + payload);
          }
      
          /**
           * Prints the first topic associated with the completed delivery.
           *
           * @param token delivery token of the completed publish
           */
          @Override
          public void deliveryComplete(IMqttDeliveryToken token) {
              System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]);
          }
      
          /**
           * Prints the reason the connection was lost, followed by its stack trace.
           *
           * @param cause the reason reported for the lost connection
           */
          @Override
          public void connectionLost(Throwable cause) {
              System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage());
              cause.printStackTrace();
          }
      }

              rhn-support-jbertram Justin Bertram
              rh-messaging-ci Messaging CI
              Oleg Sushchenko Oleg Sushchenko
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated:
                Resolved: