Back to TILs

C++ zeromq pubsub multithread inproc

Date: 2020-10-14Last modified: 2023-12-22

Table of contents

#include <future>
#include <iostream>
#include <string>

#include <cppzmq/zmq.hpp>
#include <cppzmq/zmq_addon.hpp>

void PublisherThread( zmq::context_t *ctx )
{
  //  Prepare publisher
  zmq::socket_t publisher( *ctx, zmq::socket_type::pub );
  publisher.bind( "inproc://#1" );

  // Give the subscribers a chance to connect, so they don't lose any messages
  std::this_thread::sleep_for( std::chrono::milliseconds( 20 ) );

  while( true ) {
    //  Write three messages, each with an envelope and content
    publisher.send( zmq::str_buffer( "A" ), zmq::send_flags::sndmore );
    publisher.send( zmq::str_buffer( "Message in A envelope" ) );
    publisher.send( zmq::str_buffer( "B" ), zmq::send_flags::sndmore );
    publisher.send( zmq::str_buffer( "Message in B envelope" ) );
    publisher.send( zmq::str_buffer( "C" ), zmq::send_flags::sndmore );
    publisher.send( zmq::str_buffer( "Message in C envelope" ) );
    std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
  }
}

void SubscriberThread1( zmq::context_t *ctx )
{
  //  Prepare subscriber
  zmq::socket_t subscriber( *ctx, zmq::socket_type::sub );
  subscriber.connect( "inproc://#1" );

  //  Thread2 opens "A" and "B" envelopes
  subscriber.set( zmq::sockopt::subscribe, "A" );
  subscriber.set( zmq::sockopt::subscribe, "B" );

  while( true ) {
    // Receive all parts of the message
    std::vector<zmq::message_t> recv_msgs;
    zmq::recv_result_t          result = zmq::recv_multipart( subscriber, std::back_inserter( recv_msgs ) );
    assert( result && "recv failed" );

    std::cout << "Thread2: [" << recv_msgs[0].to_string_view() << "] " << recv_msgs[1].to_string_view() << std::endl;
  }
}

void SubscriberThread2( zmq::context_t *ctx )
{
  //  Prepare our context and subscriber
  zmq::socket_t subscriber( *ctx, zmq::socket_type::sub );
  subscriber.connect( "inproc://#1" );

  //  Thread3 opens ALL envelopes
  subscriber.set( zmq::sockopt::subscribe, "" );

  while( true ) {
    // Receive all parts of the message
    std::vector<zmq::message_t> recv_msgs;
    zmq::recv_result_t          result = zmq::recv_multipart( subscriber, std::back_inserter( recv_msgs ) );
    assert( result && "recv failed" );

    std::cout << "Thread3: [" << recv_msgs[0].to_string_view() << "] " << recv_msgs[1].to_string_view() << std::endl;
  }
}

int main()
{
  /*
   * No I/O threads are involved in passing messages using the inproc transport.
   * Therefore, if you are using a ØMQ context for in-process messaging only you
   * can initialise the context with zero I/O threads.
   *
   * Source: http://api.zeromq.org/4-3:zmq-inproc
   */
  zmq::context_t ctx( 0 );

  auto thread1 = std::async( std::launch::async, PublisherThread, &ctx );

  // Give the publisher a chance to bind, since inproc requires it
  std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );

  auto thread2 = std::async( std::launch::async, SubscriberThread1, &ctx );
  auto thread3 = std::async( std::launch::async, SubscriberThread2, &ctx );
  thread1.wait();
  thread2.wait();
  thread3.wait();

  /*
   * Output:
   *   An infinite loop of a mix of:
   *     Thread2: [A] Message in A envelope
   *     Thread2: [B] Message in B envelope
   *     Thread3: [A] Message in A envelope
   *     Thread3: [B] Message in B envelope
   *     Thread3: [C] Message in C envelope
   */
}

Possible output

loop infinito

References