Back to TILs

C++ mosquitto_01

Date: 2023-02-04 | Last modified: 2024-11-02
| Target | Description |
| --- | --- |
| paho-mqtt3a | asynchronous, no encryption |
| paho-mqtt3as | asynchronous with SSL/TLS support |
| paho-mqtt3c | synchronous, no encryption |
| paho-mqtt3cs | synchronous with SSL/TLS support |
| paho-mqtt3a-static | asynchronous, no encryption, static linkage |
| paho-mqtt3as-static | asynchronous with SSL/TLS support, static linkage |
| paho-mqtt3c-static | synchronous, no encryption, static linkage |
| paho-mqtt3cs-static | synchronous with SSL/TLS support, static linkage |
void publishMessages() {
  int rc;
  struct mosquitto *mosq;

  this_thread::sleep_for(chrono::seconds(1));

  // called on subscribeTopic function
  // mosquitto_lib_init();

  mosq = mosquitto_new("publisher-test", true, NULL);

  rc = mosquitto_connect(mosq, "localhost", 1883, 60);
  if (rc != 0) {
    fmt::print("PUB: Client could not connect to broker! Error Code: {}\n", rc);
    mosquitto_destroy(mosq);
    return;
  }
  fmt::print("PUB: We are now connected to the broker!\n");

  for (int i = 0; i < 5; ++i) {
    auto msg = fmt::format("Hello from C++ #{}", i);
    fmt::print("PUB: mosquitto_publish #{}\n", i);
    mosquitto_publish(mosq, NULL, test_topic, msg.size(), msg.data(), 0, false);
    this_thread::sleep_for(chrono::seconds(1));
  }

  this_thread::sleep_for(chrono::seconds(2));
  fmt::print("PUB: mosquitto_disconnect\n");
  mosquitto_disconnect(mosq);
  fmt::print("PUB: mosquitto_destroy\n");
  mosquitto_destroy(mosq);
  // mosquitto_lib_cleanup();
}
void SubOnConnect(struct mosquitto *mosq, void *obj, int rc) {
  fmt::print("SUB: OnConnect: ID: {} RC: {}\n", *(int *)obj, rc);
  if (rc) {
    fmt::print("SUB: OnConnect: Error with result code:", rc);
    exit(-1);
  }
  mosquitto_subscribe(mosq, NULL, test_topic, 0);
}

void SubOnMessage(struct mosquitto *mosq, void *obj,
               const struct mosquitto_message *msg) {
  fmt::print("SUB: OnMessage: New message with topic {}: {}\n", msg->topic,
             (char *)msg->payload);
}
/// Initialises libmosquitto, connects to the broker on localhost:1883 as
/// "subscribe-test", runs the network loop for six seconds so SubOnConnect /
/// SubOnMessage can fire, then tears everything down again.
///
/// Every exit path releases what it acquired (client instance and library
/// state); the original early return on connect failure leaked both.
///
/// NOTE(review): mosquitto_lib_cleanup() runs here while other threads may
/// still be using the library — confirm that ordering is acceptable.
void subscribeTopic() {
  int id = 12;  // handed to the callbacks as user data via mosquitto_new()

  mosquitto_lib_init();

  struct mosquitto *mosq = mosquitto_new("subscribe-test", true, &id);
  if (mosq == nullptr) {
    fmt::print("SUB: Could not create mosquitto client instance!\n");
    mosquitto_lib_cleanup();
    return;
  }
  mosquitto_connect_callback_set(mosq, SubOnConnect);
  mosquitto_message_callback_set(mosq, SubOnMessage);

  const int rc = mosquitto_connect(mosq, "localhost", 1883, 10);
  if (rc) {
    fmt::print("Could not connect to Broker with return code {}\n", rc);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return;
  }

  fmt::print("SUB: mosquitto_loop_start\n");
  const int loop_rc = mosquitto_loop_start(mosq);
  if (loop_rc != 0) {
    // No network thread means no callbacks; waiting would be pointless.
    fmt::print("SUB: mosquitto_loop_start failed with code {}\n", loop_rc);
    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return;
  }

  fmt::print("SUB: sleep for 6s\n");
  this_thread::sleep_for(chrono::seconds(6));

  // force == true: the loop thread is stopped before the disconnect below.
  fmt::print("SUB: mosquitto_loop_stop\n");
  mosquitto_loop_stop(mosq, true);

  fmt::print("SUB: mosquitto_disconnect\n");
  mosquitto_disconnect(mosq);

  fmt::print("SUB: mosquitto_destroy\n");
  mosquitto_destroy(mosq);

  fmt::print("SUB: mosquitto_lib_cleanup\n");
  mosquitto_lib_cleanup();
}

  // Launch the subscriber before the publisher; each runs on its own thread.
  thread tSub{subscribeTopic};
  thread tPub{publishMessages};

  // Wait for both workers to finish before continuing.
  tSub.join();
  tPub.join();

Table of contents

Possible output

Could not connect to Broker with return code 14
PUB: Client could not connect to broker! Error Code: 14

References