C++ zeromq_push_pull_multithread_inproc
Date: 2020-10-26 · Last modified: 2024-11-03
Table of contents
#include <spdlog/spdlog.h>
#include <future>
#include <iostream>
#include <string>
#include <zmq.hpp>
#include <zmq_addon.hpp>
/// Producer side of the PUSH/PULL demo.
///
/// Creates a PUSH socket in the given context, connects it to the
/// "inproc://#1" endpoint and then, in an endless loop, sends three two-part
/// messages (an envelope frame "A", "B" or "C" followed by a content frame),
/// pausing 100 ms after each round.
///
/// @param ctx  ZeroMQ context the socket is created in. It is dereferenced
///             without a null check, so it must point to a live context.
///
/// This function never returns: the send loop has no exit condition.
void PusherThread( zmq::context_t *ctx )
{
    zmq::socket_t push_socket( *ctx, zmq::socket_type::push );
    push_socket.connect( "inproc://#1" );

    // Short pause before the first send, intended to give the receiving side
    // time to get ready so that the earliest messages are not lost.
    const std::chrono::milliseconds startup_delay( 20 );
    std::this_thread::sleep_for( startup_delay );

    // Sends one multipart message: the envelope frame is flagged with
    // sndmore, the content frame that follows terminates the message.
    const auto send_enveloped = [&push_socket]( zmq::const_buffer envelope, zmq::const_buffer content ) {
        push_socket.send( envelope, zmq::send_flags::sndmore );
        push_socket.send( content );
    };

    const std::chrono::milliseconds round_interval( 100 );
    for( ;; ) {
        // One round = three enveloped messages, always in the order A, B, C.
        send_enveloped( zmq::str_buffer( "A" ), zmq::str_buffer( "Message in A envelope" ) );
        send_enveloped( zmq::str_buffer( "B" ), zmq::str_buffer( "Message in B envelope" ) );
        send_enveloped( zmq::str_buffer( "C" ), zmq::str_buffer( "Message in C envelope" ) );
        std::this_thread::sleep_for( round_interval );
    }
}
/// Consumer side of the PUSH/PULL demo.
///
/// Creates a PULL socket in the given context, binds it to "inproc://#1" and
/// then, in an endless loop, receives complete multipart messages and prints
/// those whose first frame (the envelope) equals "A".
///
/// @param ctx  ZeroMQ context the socket is created in. It is dereferenced
///             without a null check, so it must point to a live context.
///
/// This function never returns normally: the receive loop has no exit
/// condition. Errors reported by cppzmq as exceptions propagate to the caller.
void PullerThread( zmq::context_t *ctx )
{
    // Prepare puller
    zmq::socket_t puller( *ctx, zmq::socket_type::pull );

    // Bind the same inproc endpoint that PusherThread connects to. Binding a
    // tcp:// endpoint here left the two sockets on different transports, so
    // no message could ever arrive; in addition, main creates the context
    // with zero I/O threads, which libzmq documents as sufficient only for
    // the inproc transport.
    puller.bind( "inproc://#1" );

    while( true ) {
        // Receive all parts of the message
        std::vector<zmq::message_t> recv_msgs;
        const zmq::recv_result_t result = zmq::recv_multipart( puller, std::back_inserter( recv_msgs ) );
        assert( result && "recv failed" );

        // An envelope frame and a content frame are both required below.
        // This check also keeps the indexing safe when assert is compiled
        // out (NDEBUG) or when a peer sends a single-frame message.
        if( !result || recv_msgs.size() < 2 ) {
            continue;
        }

        // Only messages in the "A" envelope are printed; others are dropped.
        if( recv_msgs[0].to_string_view() == "A" ) {
            std::cout << "Puller: [" << recv_msgs[0].to_string_view() << "] " << recv_msgs[1].to_string_view() << std::endl;
        }
    }
}
/// Entry point: runs PullerThread and PusherThread concurrently on one shared
/// ZeroMQ context and then blocks until both tasks finish.
int main()
{
    // The context is created with zero I/O threads. Per the libzmq
    // documentation this is only sufficient for the inproc transport; any
    // other transport needs at least one I/O thread.
    constexpr int io_thread_count = 0;
    zmq::context_t ctx( io_thread_count );

    // Start the consumer first and give it half a second before the producer
    // is launched.
    const std::chrono::milliseconds puller_head_start( 500 );
    auto puller_task = std::async( std::launch::async, PullerThread, &ctx );
    std::this_thread::sleep_for( puller_head_start );
    auto pusher_task = std::async( std::launch::async, PusherThread, &ctx );

    // wait() only blocks; it does not rethrow an exception stored in the
    // future, so a failure inside either task is not reported here.
    pusher_task.wait();
    puller_task.wait();
}
Possible output
Infinite loop (the program runs until it is interrupted).