C++ mosquitto_01
Date: 2023-02-04 | Last modified: 2024-11-02
Target | Description |
---|---|
paho-mqtt3a | asynchronous, no encryption |
paho-mqtt3as | asynchronous with SSL/TLS support |
paho-mqtt3c | synchronous, no encryption |
paho-mqtt3cs | synchronous with SSL/TLS support |
paho-mqtt3a-static | asynchronous, no encryption, static linkage |
paho-mqtt3as-static | asynchronous with SSL/TLS support, static linkage |
paho-mqtt3c-static | synchronous, no encryption, static linkage |
paho-mqtt3cs-static | synchronous with SSL/TLS support, static linkage |
void publishMessages() {
int rc;
struct mosquitto *mosq;
this_thread::sleep_for(chrono::seconds(1));
// called on subscribeTopic function
// mosquitto_lib_init();
mosq = mosquitto_new("publisher-test", true, NULL);
rc = mosquitto_connect(mosq, "localhost", 1883, 60);
if (rc != 0) {
fmt::print("PUB: Client could not connect to broker! Error Code: {}\n", rc);
mosquitto_destroy(mosq);
return;
}
fmt::print("PUB: We are now connected to the broker!\n");
for (int i = 0; i < 5; ++i) {
auto msg = fmt::format("Hello from C++ #{}", i);
fmt::print("PUB: mosquitto_publish #{}\n", i);
mosquitto_publish(mosq, NULL, test_topic, msg.size(), msg.data(), 0, false);
this_thread::sleep_for(chrono::seconds(1));
}
this_thread::sleep_for(chrono::seconds(2));
fmt::print("PUB: mosquitto_disconnect\n");
mosquitto_disconnect(mosq);
fmt::print("PUB: mosquitto_destroy\n");
mosquitto_destroy(mosq);
// mosquitto_lib_cleanup();
}
void SubOnConnect(struct mosquitto *mosq, void *obj, int rc) {
fmt::print("SUB: OnConnect: ID: {} RC: {}\n", *(int *)obj, rc);
if (rc) {
fmt::print("SUB: OnConnect: Error with result code:", rc);
exit(-1);
}
mosquitto_subscribe(mosq, NULL, test_topic, 0);
}
void SubOnMessage(struct mosquitto *mosq, void *obj,
const struct mosquitto_message *msg) {
fmt::print("SUB: OnMessage: New message with topic {}: {}\n", msg->topic,
(char *)msg->payload);
}
/// Runs the subscriber side of the demo: initialises libmosquitto, connects
/// to the broker at localhost:1883, receives messages on a background
/// network thread for six seconds, then tears everything down.
///
/// The subscription itself is made in SubOnConnect; received messages are
/// printed by SubOnMessage. Progress and errors go to stdout.
void subscribeTopic() {
  // Handed to the callbacks as user data. It must outlive the network loop,
  // which it does because the loop is stopped before this function returns.
  int id = 12;

  mosquitto_lib_init();

  struct mosquitto *mosq = mosquitto_new("subscribe-test", true, &id);
  if (mosq == nullptr) {
    // mosquitto_new() returns NULL on failure; do not pass that on.
    fmt::print("SUB: Could not create client instance!\n");
    return;
  }
  mosquitto_connect_callback_set(mosq, SubOnConnect);
  mosquitto_message_callback_set(mosq, SubOnMessage);

  const int rc = mosquitto_connect(mosq, "localhost", 1883, 10);
  if (rc) {
    fmt::print("Could not connect to Broker with return code {}\n", rc);
    // Release the client handle instead of leaking it.
    // NOTE(review): mosquitto_lib_cleanup() is deliberately left out of the
    // error paths because publishMessages() depends on the
    // mosquitto_lib_init() call made above — confirm.
    mosquitto_destroy(mosq);
    return;
  }

  fmt::print("SUB: mosquitto_loop_start\n");
  mosquitto_loop_start(mosq);

  fmt::print("SUB: sleep for 6s\n");
  this_thread::sleep_for(chrono::seconds(6));

  // force=true lets loop_stop return even though the client has not been
  // disconnected yet.
  fmt::print("SUB: mosquitto_loop_stop\n");
  mosquitto_loop_stop(mosq, true);

  fmt::print("SUB: mosquitto_disconnect\n");
  mosquitto_disconnect(mosq);

  fmt::print("SUB: mosquitto_destroy\n");
  mosquitto_destroy(mosq);

  fmt::print("SUB: mosquitto_lib_cleanup\n");
  mosquitto_lib_cleanup();
}
// Start the subscriber first, then the publisher, each on its own thread.
thread tSub{subscribeTopic};
thread tPub{publishMessages};

// Wait for both workers to finish before continuing.
tSub.join();
tPub.join();
Table of contents
Possible output
Could not connect to Broker with return code 14
PUB: Client could not connect to broker! Error Code: 14