-
Bug
-
Resolution: Done
-
Major
-
None
-
None
-
False
-
None
-
False
Switching from Artemis broker 2.20 to 2.21 we experienced an issue about message delivering.
It looks like MQTT devices don’t receive the messages matching their subscriptions.
The test code (***) run with an Artemis broker 2.19 or 2.20 as target (wildcard addresses modified as (**)) produces the correct output:
waiting for messages Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test === Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH Client: test-client-2 - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test === Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test …
With broker 2.21 or 2.22 (configuration changes described in (*) ) as target the output is:
waiting for messages === Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH === Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH === Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS …
So the broker doesn’t send any message to the clients. May be we missed to configure something needed by 2.21 versions onward?
*:
The 2.21 and 2.22-SNAPSHOT default broker.xml configuration file has changed in this way:
- set the broker name (message-broker)
- removed double connector bound to 1883 (the broker with the default configuration crashed)
- allow only MQTT protocol for connector bound to 1883 port
- removed broadcast connector and configuration
- added custom wildcard configuration (**)
**:
<wildcard-addresses> <routing-enabled>true</routing-enabled> <delimiter>/</delimiter> <any-words>#</any-words> <single-word>+</single-word> </wildcard-addresses>
***:
/******************************************************************************* * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others * * This program and the accompanying materials are made * available under the terms of the Eclipse Public License 2.0 * which is available at https://www.eclipse.org/legal/epl-2.0/ * * SPDX-License-Identifier: EPL-2.0 * * Contributors: * Eurotech - initial API and implementation *******************************************************************************/ package org.eclipse.kapua.qa.common; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestMqttClient { protected static Logger logger = LoggerFactory.getLogger(TestMqttClient.class); private static final String SERVER_URI = "tcp://localhost:1883"; private static final String CLIENT_ID_ADMIN = "test-client-admin"; private static final String CLIENT_ID_1 = "test-client-1"; private static final String CLIENT_ID_2 = "test-client-2"; private static final String USERNAME = "kapua-broker"; private static final String PASSWORD = "kapua-password"; private static final String USERNAME_ADMIN = "kapua-sys"; private static final String PASSWORD_ADMIN = "kapua-password"; private TestMqttClient() { } public static void main(String argv[]) throws MqttException { MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence()); MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence()); MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence()); clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN)); client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1)); client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2)); clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN)); client1.connect(getMqttConnectOptions(USERNAME, PASSWORD)); client2.connect(getMqttConnectOptions(USERNAME, PASSWORD)); System.out.println("waiting for messages"); client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#"); client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#"); clientAdmin.subscribe("#"); client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", new MqttMessage("test".getBytes())); System.out.println("==="); client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes())); System.out.println("==="); client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes())); System.out.println("==="); client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.disconnect(); client1.disconnect(); client2.disconnect(); } private static MqttConnectOptions getMqttConnectOptions(String username, String password) { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(username); options.setPassword(password.toCharArray()); return options; } } class TestMqttClientCallback implements MqttCallback { private String clientId; TestMqttClientCallback(String clientId) { this.clientId = clientId; } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]); } @Override public void connectionLost(Throwable cause) { System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage()); cause.printStackTrace(); } }