Back to TILs

C++ zeromq_push_pull_multithread_inproc

Date: 2020-10-26Last modified: 2023-12-22

Table of contents

#include <spdlog/spdlog.h>

#include <future>
#include <iostream>
#include <string>

#include <cppzmq/zmq.hpp>
#include <cppzmq/zmq_addon.hpp>

void PusherThread( zmq::context_t *ctx )
{
  //  Prepare pusher
  zmq::socket_t pusher( *ctx, zmq::socket_type::push );
  pusher.connect( "inproc://#1" );

  // Give the pullers a chance to connect, so they don't lose any messages
  std::this_thread::sleep_for( std::chrono::milliseconds( 20 ) );

  while( true ) {
    //  Write three messages, each with an envelope and content
    pusher.send( zmq::str_buffer( "A" ), zmq::send_flags::sndmore );
    pusher.send( zmq::str_buffer( "Message in A envelope" ) );

    pusher.send( zmq::str_buffer( "B" ), zmq::send_flags::sndmore );
    pusher.send( zmq::str_buffer( "Message in B envelope" ) );

    pusher.send( zmq::str_buffer( "C" ), zmq::send_flags::sndmore );
    pusher.send( zmq::str_buffer( "Message in C envelope" ) );
    std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
  }
}

void PullerThread( zmq::context_t *ctx )
{
  //  Prepare puller
  zmq::socket_t puller( *ctx, zmq::socket_type::pull );
  // puller.bind( "inproc://#1" );
  puller.bind( "tcp://127.0.0.1:15555" );

  while( true ) {
    // Receive all parts of the message
    std::vector<zmq::message_t> recv_msgs;
    zmq::recv_result_t          result = zmq::recv_multipart( puller, std::back_inserter( recv_msgs ) );
    assert( result && "recv failed" );

    // filtra somente o A
    if( recv_msgs[0].to_string_view() == "A" ) {
      std::cout << "Puller: [" << recv_msgs[0].to_string_view() << "] " << recv_msgs[1].to_string_view() << std::endl;
    }
  }
}

int main()
{
  zmq::context_t ctx( 0 );

  auto thread2 = std::async( std::launch::async, PullerThread, &ctx );

  std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
  auto thread1 = std::async( std::launch::async, PusherThread, &ctx );

  thread1.wait();
  thread2.wait();
}

Possible output

loop infinito

References