Exploring Queues in C++
A beginner book report on learning SPSC, Lock-Free and Mutex Queues with TCP/IP
This document is a public write‑up of my hands‑on journey to brush up on core C++ fundamentals, design patterns, and modern project workflows. My motivation is twofold:
Deepen my C++ chops by re‑visiting language essentials: memory management, object lifetimes, and the STL.
Apply cloud‑inspired asynchronous patterns—especially queues and event buses—in a more constrained, embedded‑style context.
To that end, I’ve built a small framework around three queue implementations (mutex‑based, single‑producer/single‑consumer, and lock‑free) with TCP/IP client demos. For each queue type, you’ll find:
Implementation overview & design rationale
Micro‑benchmark: processing 100 000 messages
Template for project structure: headers, sources, cmake layout
Finally, I wrap up with a CI/CD primer using GitHub Actions, gcov/lcov coverage reporting, and a glance at static checks.
Project Structure
The project structure is the following:
pub_sub/
├── CMakeLists.txt
├── .gitignore
├── README.md
├── .github
│ └── workflows
│ └── ci.yml
├── core/
│ ├── spsc_queue_adapter.hpp
│ ├── spsc_ring_buffer.hpp
│ ├── broker.hpp
│ ├── lf_queue.hpp
│ ├── message_queue.hpp
│ ├── metrics.hpp
│ ├── mutex_queue.hpp
│ ├── queue_factory.hpp
│ └── thread_pool.hpp
├── src/
│ ├── bench_pub.cpp
│ ├── bench_sub.cpp
│ ├── broker.cpp
│ ├── pub_client.cpp
│ ├── queue_factory.cpp
│ └── sub_client.cpp
├── tests/
│ ├── CMakeLists.txt
│ ├── test_lf_queue.cpp
│ ├── test_message_queue.cpp
│ └── test_spsc.cpp
Dependency Management using CMake
We'll use CMake to manage dependencies and to define the project's build parameters. Some of the compile options below only became necessary once I started setting up CI/CD, because the GitHub workflows run on ubuntu-latest whereas my local environment is Windows. Most companies are unlikely to use Windows for embedded development, but it is what my machine is currently set up with.
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
if (CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
  add_compile_options(-Wall -Wextra -Wpedantic -Werror)
elseif (MSVC)
  add_compile_options(/W4 /WX)
endif()
if (CMAKE_CXX_COMPILER_ID MATCHES "Clang" OR CMAKE_CXX_COMPILER_ID MATCHES "AppleClang")
  add_compile_options(-Wno-unused-private-field)
endif()
These settings mostly silence errors caused by differences between the compilers used on Ubuntu and on Windows. The queues are developed against Asio, so the CMakeLists.txt file also brings that dependency into the project:
include(FetchContent)
FetchContent_Declare(
asio_src
GIT_REPOSITORY https://github.com/chriskohlhoff/asio.git
GIT_TAG asio-1-34-2
)
FetchContent_MakeAvailable(asio_src)
# Threads support (for Asio)
find_package(Threads REQUIRED)
# Define Asio interface target
add_library(asio INTERFACE)
target_include_directories(asio INTERFACE
${asio_src_SOURCE_DIR}/asio/include
)
target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
if (WIN32)
target_link_libraries(asio INTERFACE Threads::Threads Ws2_32 Mswsock)
else()
target_link_libraries(asio INTERFACE Threads::Threads)
endif()
include(FetchContent)
loads the FetchContent module so we can use it to retrieve dependencies
FetchContent_Declare(...)
declares the external source that we need (in this case Asio)
the key arguments (the name, GIT_REPOSITORY, and GIT_TAG) tell FetchContent where to find the dependency and which revision to pin
FetchContent_MakeAvailable(asio_src)
this call actually clones the Asio repository and makes its sources available to the build
by default the sources are downloaded into
${CMAKE_BINARY_DIR}/_deps/asio_src-src/
a fetched tree is only added as a sub-directory when it has a top-level CMakeLists.txt, which is why the asio interface target is defined by hand below
find_package(Threads REQUIRED)
locates the platform's threading library and exposes it as the imported target Threads::Threads
Asio uses threading primitives, so anything that uses Asio needs to link against it
add_library(asio INTERFACE)
this declares a new interface target called asio
an interface library has no compiled sources of its own; it only propagates usage requirements to anything that links against it
CMake will not compile any .cpp file for it and won't produce a library file on disk
it does not exist as a build artifact in the generated makefiles
you use commands like
target_include_directories(<iface> INTERFACE …), target_compile_definitions(<iface> INTERFACE …), and target_link_libraries(<iface> INTERFACE …)
to tell CMake "if you link this <iface> into your executable or library, apply these include paths, compiler flags, macros, and link libraries to that consumer"
those settings live in the target's INTERFACE_* properties and are automatically propagated (even transitively) to anything that calls target_link_libraries(... PRIVATE|PUBLIC|INTERFACE <iface>)
target_include_directories(asio INTERFACE …)
tells CMake to add
${…}/asio/include to the compiler's include path of any target that links against asio
target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
instructs Asio to build in standalone mode, without Boost dependencies
only uses the C++ Standard Library
makes the dependency footprint smaller
compile times are shorter because there are no Boost libraries to build
however, if you want to do things like stackful coroutines, or need to support pre-C++11 compilers, you might not define this
if you want more extensive error handling, you also need to pull in more of Boost
target_link_libraries(asio INTERFACE …)
CMake will generate the appropriate
-lWs2_32, -lMswsock, -pthread, etc., on the linker command line of any target that does target_link_libraries(my_app PRIVATE asio)
the platform split is there because the GitHub Actions workflow runs in a Linux environment while local development happens on Windows
A Nod to Git
Adding a .gitignore file keeps the repo from being polluted with .vscode and build files.
.github/workflows
The backbone of the CI/CD flow in GitHub Actions is ci.yml. For this small project the steps are quite simple, but they can become complex for large projects (potentially with several services or modules inside).
First, we define boilerplate such as name, on:, and permissions:
name: C/C++ CI
on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main, develop ]
permissions:
  contents: read
  security-events: write
The on: key is quite important because it defines the conditions under which the workflow runs. Each team is going to have its own norms for quality gates. Some projects require fairly complicated branching strategies and others do not. This project does not, so the workflow simply runs on pushes and pull requests that target main and develop. You can also schedule workflow runs, which lets you run the test suite automatically (nightly or hourly, say) and catch low-hanging fruit.
The permissions: block sets the scopes granted to the workflow's GITHUB_TOKEN: here, read access to the repository contents and write access to security events (used when uploading code-scanning results).
Typically, I like to have a formatting step first to get basic style formalized and meeting some kind of basic standard. There are books written on style and I am not an expert on it, but each project or team will have their own norms. They should spend some time deciding what styles they want to hold themselves to and be accountable to the styling standards. Enforce it in CI/CD and there can be no ambiguity.
jobs:
  format:
    name: "❯ Format"
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Install clang-format
        run: |
          sudo apt-get update
          sudo apt-get install -y clang-format build-essential cmake libboost-all-dev lcov
      - name: Check formatting
        run: |
          # Reformat in place, then fail if that changed any tracked file
          clang-format -i $(git ls-files '*.cpp' '*.hpp')
          git diff --exit-code -- $(git ls-files '*.cpp' '*.hpp') \
            || (echo "Code is not clang-formatted. Run 'clang-format -i'." && exit 1)
Top-level:
jobs:
jobs is the top-level mapping key
the indentation is significant and needs to be consistent through the entire document
jobs are independent units of work
each job runs on its own runner, even when a dependency is set
each top-level job in the workflow is handed off to a fresh GitHub Actions runner: by default a brand new VM with Linux, Windows, or macOS, or a container if jobs.<job_id>.container is set
each runner is ephemeral and starts from a clean slate for each job
by default, the jobs run in parallel
if you want to make job B dependent on job A, use the needs key
in the example below, test still gets a separate runner, but it only starts after build succeeds
isolation is preferred because there are no leftover artifacts after each run
CI/CD runs much faster when parallelism is leveraged
failures in one job will not contaminate the other runs
sometimes you might want similar scripts to run in different environments
jobs:
  build:
    runs-on: ubuntu-latest
  test:
    runs-on: ubuntu-latest
    needs: build
Job identifier:
format:
format is the key; it can be named anything but must be unique under the jobs key
Display name:
name: "❯ Format"
just the string shown in the UI
it can be omitted, in which case the job id (here format) is shown instead
Runner spec:
runs-on: ubuntu-latest
this picks a GitHub-hosted Ubuntu VM
could also choose windows-latest, macos-latest, or a self-hosted label if there were security concerns or a runtime environment that is highly specific
Steps sequence:
steps:
steps introduces a sequence of items prefixed with -
under each - there is a mapping of step properties
steps run in order within their job
by default each run step uses a fresh shell, but files and checkouts persist across steps
uses: actions/checkout@v4
with: fetch-depth: 0
pulls in version 4 of the checkout action so the later steps can see the repo files
the with key passes a parameter called fetch-depth; a value of 0 indicates that we want the full history
run: blocks
- name: Install clang-format
  run: |
    sudo apt-get update
    sudo apt-get install -y clang-format build-essential cmake libboost-all-dev lcov
| in YAML preserves the line breaks
everything indented beneath the | is fed as a single shell script to bash -e
sudo apt-get update refreshes the package lists in the VM
sudo apt-get install -y … gets the formatting tools and dependencies
Conditional exit in shell
- name: Check formatting
  run: |
    clang-format -i $(git ls-files '*.cpp' '*.hpp')
    git diff --exit-code -- $(git ls-files '*.cpp' '*.hpp') \
      || (echo "Code is not clang-formatted. Run 'clang-format -i'." && exit 1)
|| (… && exit 1): if git diff finds changes after the reformat, the echo runs and then we explicitly fail the step (and thus the job)
the suggested command sits in single quotes because backticks inside a double-quoted shell string would be executed as a command
This job ensures every .cpp/.hpp file is clang-formatted before any build or test runs.
By splitting it into its own job, you can fail fast on style without touching your heavy build steps.
Job definition
build_windows:
  name: "❯ Build & Test (Windows)"
  runs-on: windows-latest
  needs: format
  env:
    BOOST_ROOT: /mingw64
Job ID vs name:
the key build_windows is how other jobs refer to it
name is just the human-readable name
runs-on: windows-latest
spins up a fresh VM
this job runs the tests on a Windows VM, the platform where the code was developed and is to be deployed
needs: format
makes this job wait for the format job we defined earlier
env: BOOST_ROOT: /mingw64
sets an environment variable for all steps and points CMake at the MSYS2-installed Boost
Checkout Code
- uses: actions/checkout@v4
  with:
    fetch-depth: 0
grabs the code from the repo
Install MSYS2 + toolchain
- name: Setup MSYS2 toolchain
  uses: msys2/setup-msys2@v2
  with:
    update: true
    install: >
      base-devel
      mingw-w64-x86_64-toolchain
      mingw-w64-x86_64-boost
      mingw-w64-x86_64-cmake
      mingw-w64-x86_64-make
we use MSYS2 so we can have a POSIX-like shell and MinGW-w64 compilers on Windows
update: true runs pacman -Syu to get the latest packages
install: pulls in:
base-devel
the MinGW-w64 toolchain
the Boost libraries
CMake and Make
this gives us all the tools we need to build and run the tests in the VM
Configure Build
- name: Configure (MSYS2 Bash)
  shell: msys2 {0}
  run: |
    mkdir -p build && cd build
    cmake .. -G "Unix Makefiles" -DCMAKE_BUILD_TYPE=Debug \
      -DBOOST_ROOT=$BOOST_ROOT
shell: msys2 {0} tells Actions to run the command in MSYS2's bash rather than PowerShell or Command Prompt
a build/ directory is made
CMake is invoked with the Unix Makefiles generator
we pass in debug mode and where to find Boost via -DBOOST_ROOT
Build
- name: Build (MSYS2 Bash)
  shell: msys2 {0}
  run: cmake --build build --parallel
cmake --build build drives make and is a cross-platform way to invoke the build
--parallel lets the build run several jobs at once to speed things up
Run Tests
- name: Run tests
  shell: msys2 {0}
  run: cd build && ctest --output-on-failure
ctest --output-on-failure runs any tests declared in CMake (via add_test) and prints logs for failures
we build and test on Windows but measure coverage on Linux, because the lcov and gcov tooling is much harder to run in non-POSIX environments like Windows. It's convoluted, and there are probably ways around the limitation, but this works fine for a small project
Test Coverage Calculation in Linux
this job follows the same process as the MSYS2 Windows job defined above
there are a few syntax changes for installing dependencies, configuring coverage flags, and capturing coverage, so this section skips ahead to the more important pieces
Configure with coverage flags
- name: Configure with coverage flags
  run: |
    mkdir build && cd build
    cmake .. \
      -DCMAKE_BUILD_TYPE=Debug \
      -DCMAKE_C_FLAGS="--coverage" \
      -DCMAKE_CXX_FLAGS="--coverage"
Debug is selected so the code is built with symbols and without optimizations, which lets the coverage tooling map results back to source lines
--coverage flags:
tell GCC/Clang to compile and link with -fprofile-arcs -ftest-coverage
this generates .gcno files at compile time and .gcda files at runtime, which lcov and gcov consume
Capture coverage
- name: Capture coverage
  run: |
    cd build
    /usr/bin/geninfo . \
      --rc geninfo_unexecuted_blocks=1 \
      --ignore-errors mismatch \
      --output-filename coverage.info
    lcov --remove coverage.info '/usr/*' --output-file coverage.info
    genhtml coverage.info --output-directory coverage-html
geninfo . scans the entire build directory for .gcda/.gcno files and produces a coverage.info file
--ignore-errors mismatch skips entries where source and coverage data disagree
there are some downsides to ignoring these errors:
Version or format mismatch between gcov and the compiler. If you compile with GCC 10+ (producing .gcno/.gcda) but invoke an older gcov (which expects .bb files), you'll get "mismatched end line" or "no .bb files" errors. Ignoring these hides the fact that you're using the wrong tool, so no coverage data will actually be collected.
Branch coverage. Enabling it (--coverage implies -fprofile-arcs -ftest-coverage) can expose bugs deep in how gcov maps branch IDs. Loop constructs or unreachable branches sometimes generate inconsistent metadata, and lcov will abort with "mismatch" errors. Hiding them means you'll get incomplete or misleading branch-coverage numbers.
Missing data files. If a test crashes or you forget to compile every translation unit with coverage flags, some .gcno files have no .gcda counterpart. geninfo sees data for lines that were never executed, flags a mismatch, and stops. Ignoring it just discards those files, and you won't notice that entire modules went untested. This is likely a serious drawback for some projects.
Source paths. geninfo/lcov record source paths at compile time. If you move, rename, or build from a different working directory (e.g. CMake out-of-source vs in-source), the paths won't line up and you'll get mismatches. Suppressing the error hides the fact that your coverage tool isn't even seeing your real source tree.
Parallel runs. With --parallel, geninfo spins up child processes. If a child dies (out-of-memory, segmentation fault, etc.), you get a "child process returned non-zero" error. Ignoring it may let you finish the run, but you'll miss coverage for the parts that child was responsible for.
Drift over time. Once you blanket-ignore "mismatch" errors, new, legitimate mismatches (e.g. from a refactor that broke your tests) will also be hidden. Your coverage reports drift further from reality, and you only discover the problem when someone manually inspects the HTML output.
lcov --remove strips out the system headers (e.g. /usr/*) so the report only covers the project's code
genhtml converts the coverage.info file into a browsable HTML report in coverage-html/
Upload Coverage Artifact
- name: Upload coverage artifact
  uses: actions/upload-artifact@v4
  with:
    name: coverage-report
    path: build/coverage-html
bundles the HTML report so it can be downloaded from the GitHub UI under 'Artifacts'
the yml can be extended to fail the workflow below a coverage threshold, or to upload the results to Codecov or Coveralls (both can end up costing something, however)
core/broker.hpp
This header file defines the broker.
Header Guards & Includes
#pragma once
#include <boost/asio.hpp>
#include <message_queue.hpp>
#include <unordered_map>
#include <string>
#include <memory>
#include <vector>
#include <cstdint>
#include "core/mutex_queue.hpp"
#include "core/lf_queue.hpp"
#include "core/spsc_queue_adapter.hpp"
#include "core/queue_factory.hpp"
#pragma once
not part of the C++ standard, but supported by all major compilers
ensures that this file's contents are included only once per translation unit, which avoids duplicate definitions
simpler than, but effectively equivalent to,
#ifndef … #define … #endif
class Broker
class Broker {
public:
Broker(boost::asio::io_context& io_context,
unsigned short pub_port,
unsigned short sub_port,
QueueKind kind);
void start();
private:
void do_accept_publisher();
void do_accept_subscriber();
bool use_lf_;
boost::asio::io_context& io_context_;
boost::asio::ip::tcp::acceptor pub_acceptor_;
boost::asio::ip::tcp::acceptor sub_acceptor_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
QueueKind queue_kind_;
std::unordered_map<std::string,
std::shared_ptr<MessageQueue<std::string>>> channels_;
…
};
class Broker { … };
defines a user-defined type with members and public/private sections
Encapsulates everything needed to accept publishers and subscribers and route messages
Keeps the broker logic isolated
Constructor signature
boost::asio::io_context& io_context
takes a reference to the I/O event loop so that multiple components share the same reactor
unsigned short pub_port and sub_port
the TCP port numbers for publishers and subscribers
QueueKind kind
an enum selecting which concrete queue implementation to use (mutex, lock-free, etc.)
void start();
member function declaration
kicks off the two accept loops (publishers + subscribers)
Private helpers:
do_accept_*()
each calls async_accept(…) on the corresponding acceptor, and the completion handler calls the same helper again, so the broker is always ready for the next connection
keeps the acceptance logic separate and reusable
bool use_lf_;
flag to pick lock-free versus another queue type
caches the enum check in a boolean for per-session use
boost::asio::io_context& io_context_;
all async I/O in this broker happens in this context
ensures all sessions use the same event loop
boost::asio::ip::tcp::acceptor pub_acceptor_;
concrete acceptor bound to pub_port
listens for publisher connections
separate acceptors isolate publisher traffic from subscriber traffic
boost::asio::strand<…> strand_;
provides thread safety without explicitly locking around the channels_ map: even if multiple threads call io_context.run(), handlers dispatched through this strand never run concurrently, so they cannot interleave and corrupt the shared state
std::unordered_map<std::string, std::shared_ptr<MessageQueue<std::string>>> channels_;
hash map from channel name to queue pointer
dynamically holds each named message queue for publishers and subscribers
shared_ptr lets sessions share ownership safely
Nested PublisherSession
class PublisherSession : public std::enable_shared_from_this<PublisherSession> {
public:
    PublisherSession(boost::asio::ip::tcp::socket socket,
                     boost::asio::strand<…> strand,
                     std::unordered_map<…>& channels,
                     QueueKind kind)
        : socket_(std::move(socket)), strand_(strand), channels_(channels), kind_(kind) {}
    void start() { read_header(); }
private:
    void read_header();
    void read_body(uint16_t channel_len, uint32_t payload_len);
    boost::asio::ip::tcp::socket socket_;
    boost::asio::strand<…> strand_;
    std::unordered_map<…>& channels_;
    std::vector<char> buffer_;
    static constexpr size_t header_size = sizeof(uint16_t) + sizeof(uint32_t);
    QueueKind kind_;
};
std::enable_shared_from_this<…>
prevents dangling references if the socket closes mid-callback
in the async callbacks we can safely capture shared_from_this() to keep the session alive until all handlers finish
socket_(std::move(socket))
invokes the move constructor of tcp::socket, transferring ownership of the socket into this session object without copying it
hands the connected socket from the acceptor to the session
static constexpr size_t header_size = …;
compile-time constant
the fixed header is exactly 6 bytes on the wire: a 2-byte channel length plus a 4-byte payload length
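To make the 6-byte header concrete, here is a minimal sketch of how such a header could be packed and unpacked. The struct FrameHeader and the functions encode_header and decode_header are hypothetical names for illustration. The field order (channel length first) and the big-endian byte order are assumptions, since the write-up does not state how the broker lays the header out on the wire.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Hypothetical helper type; the names are illustrative, not taken from the project.
struct FrameHeader {
    std::uint16_t channel_len;  // length of the channel name in bytes
    std::uint32_t payload_len;  // length of the message body in bytes
};

constexpr std::size_t header_size = sizeof(std::uint16_t) + sizeof(std::uint32_t);  // 6 bytes

// Assumption: big-endian (network byte order), channel length first.
inline std::array<std::uint8_t, header_size> encode_header(const FrameHeader& h) {
    return {{
        static_cast<std::uint8_t>(h.channel_len >> 8),
        static_cast<std::uint8_t>(h.channel_len & 0xFF),
        static_cast<std::uint8_t>(h.payload_len >> 24),
        static_cast<std::uint8_t>((h.payload_len >> 16) & 0xFF),
        static_cast<std::uint8_t>((h.payload_len >> 8) & 0xFF),
        static_cast<std::uint8_t>(h.payload_len & 0xFF),
    }};
}

// Inverse of encode_header: rebuild both lengths from the six header bytes.
inline FrameHeader decode_header(const std::array<std::uint8_t, header_size>& b) {
    FrameHeader h{};
    h.channel_len = static_cast<std::uint16_t>((b[0] << 8) | b[1]);
    h.payload_len = (static_cast<std::uint32_t>(b[2]) << 24) |
                    (static_cast<std::uint32_t>(b[3]) << 16) |
                    (static_cast<std::uint32_t>(b[4]) << 8) |
                    static_cast<std::uint32_t>(b[5]);
    return h;
}
```

Shifting bytes explicitly, rather than copying the struct with memcpy, keeps the wire format independent of the host's endianness and of any struct padding.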
Nested SubscriberSession
class SubscriberSession : public std::enable_shared_from_this<SubscriberSession> {
public:
    SubscriberSession(boost::asio::ip::tcp::socket socket,
                      boost::asio::strand<…> strand,
                      std::unordered_map<…>& channels,
                      QueueKind kind)
        : socket_(std::move(socket)), strand_(strand), channels_(channels), kind_(kind) {}
    ~SubscriberSession();
    void start() { read_subscription(); }
private:
    void read_subscription();
    void deliver_next();
    void launch_worker();
    void async_send(std::string msg);
    boost::asio::ip::tcp::socket socket_;
    boost::asio::strand<…> strand_;
    std::unordered_map<…>& channels_;
    QueueKind kind_;
    std::shared_ptr<MessageQueue<std::string>> queue_;
    std::vector<char> buffer_;
    std::atomic<bool> stopped_{false};
};
Destructor
~SubscriberSession()
can add custom clean-up logic if needed
stops background workers and cancels timers
ensures there are no dangling threads reading from the queue after the socket closes
std::atomic<bool>
lock-free boolean flag
signals the worker loop to stop safely from another thread
async_send(std::string msg)
takes the message by value, so the session owns its own copy of the buffer
queues the data on the strand and does async_write
decouples publishing events from I/O scheduling
Overall Design Rationale
a single io_context + multiple threads: in theory, we can scale out to N threads by calling io_context.run() on each, and the strands prevent race conditions on the shared data
two ports / two acceptors keep publisher logic separate from subscriber logic and keep the protocol framing simple on both sides
QueueKind + queue_factory allow swapping mutex-based vs. lock-free vs. SPSC queue implementations without changing broker code
nested sessions encapsulate per-connection state in self-contained objects that clean up automatically when no handlers remain
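The "clean up automatically when no handlers remain" behaviour comes from the shared_from_this() idiom, and it can be reproduced without any networking. The sketch below swaps Asio's io_context for a plain vector of stored callbacks (HandlerQueue, a hypothetical stand-in). Session and run_session_demo are likewise illustrative names, not the project's classes.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for an event loop: handlers are stored now and run later.
// (Hypothetical; the real project uses Asio's io_context.)
using HandlerQueue = std::vector<std::function<void()>>;

class Session : public std::enable_shared_from_this<Session> {
public:
    explicit Session(std::string name) : name_(std::move(name)) {}

    // Queue a handler that captures a shared_ptr to this session, so the
    // session cannot be destroyed before the handler has run.
    void start(HandlerQueue& loop, std::string& log) {
        auto self = shared_from_this();
        loop.push_back([self, &log] { log += self->name_ + " handled;"; });
    }

private:
    std::string name_;
};

// Runs the scenario and returns what was logged along the way.
inline std::string run_session_demo() {
    HandlerQueue loop;
    std::string log;
    std::weak_ptr<Session> watcher;
    {
        auto session = std::make_shared<Session>("pub-1");
        watcher = session;
        session->start(loop, log);
    }  // the creator's shared_ptr is gone, but the queued handler still owns one
    if (watcher.expired()) log += "destroyed too early;";
    for (auto& handler : loop) handler();
    loop.clear();  // dropping the handlers releases the last owner
    if (watcher.expired()) log += "cleaned up;";
    return log;
}
```

The session outlives the scope that created it because the pending handler holds a reference, and it is destroyed as soon as the last handler is dropped.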
core/lf_queue.hpp
Includes & Header Guard
The header guard and includes follow the same pattern as the previous file. The key difference is that we include <boost/lockfree/queue.hpp> and <message_queue.hpp>.
Template & Class Declaration
template<typename T, std::size_t CapacityPow2 = 1024>
class LfQueue : public MessageQueue<T> { … };
template<typename T, std::size_t CapacityPow2 = 1024>
a class template with a type parameter T and a non-type parameter CapacityPow2 that defaults to 1024
allows any message type T, with a queue capacity that is intended to be a power of two (a common ring-buffer convention; as far as I can tell, Boost.Lockfree itself only needs a node count)
useful for compile-time capacity tuning and avoids the need for dynamic resizing
class LfQueue : public MessageQueue<T>
inherits from the abstract MessageQueue<T> interface, with virtual methods to override
allows for polymorphic swapping via the QueueKind in the broker
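As a small illustration of why a capacity parameter is often constrained to a power of two, the sketch below checks the constraint at compile time and uses it to replace a modulo with a bit mask. RingSlots is a hypothetical helper written for this example. It is not the project's LfQueue and makes no claim about how Boost.Lockfree works internally.

```cpp
#include <array>
#include <cstddef>

// Hypothetical fixed-capacity slot array, for illustration only.
template <typename T, std::size_t CapacityPow2 = 1024>
class RingSlots {
    static_assert(CapacityPow2 != 0 && (CapacityPow2 & (CapacityPow2 - 1)) == 0,
                  "CapacityPow2 must be a power of two");

public:
    static constexpr std::size_t capacity() { return CapacityPow2; }

    // With a power-of-two capacity, wrap-around is a bit mask instead of a modulo.
    static constexpr std::size_t wrap(std::size_t index) {
        return index & (CapacityPow2 - 1);
    }

    // Any index maps onto one of the CapacityPow2 slots.
    T& slot(std::size_t index) { return slots_[wrap(index)]; }

private:
    std::array<T, CapacityPow2> slots_{};
};
```

Instantiating RingSlots<int, 1000> would fail to compile with the static_assert message, which is the kind of compile-time tuning guard a non-type template parameter makes possible.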
Constructor
LfQueue()
: q_(CapacityPow2)
, open_(true)
{}
Member initializer list
initializes q_ and open_ before the constructor body runs
q_(CapacityPow2) constructs the lock-free queue with room for that many nodes
open_(true) marks the queue as accepting pushes from the publisher
after construction the queue is immediately usable by both publishers and subscribers
void push(const T& item) override
void push(const T& item) override {
T* p = new T(item);
while (!q_.push(p)) {
std::this_thread::yield();
}
}
override
ensures this method matches a virtual declaration in the base class, MessageQueue<T>::push
we get a compiler error if the signature doesn't match
Heap allocation
new T(item) dynamically copy-constructs a T on the heap
the lock-free queue holds only pointers, so each message must outlive the scope of the call
this avoids pointing at short-lived stack storage; the consumer takes over ownership and deletes the object on pop
Spin-until-success loop
while (!q_.push(p)): retry until push returns true
if the queue is momentarily full, this thread yields to let other threads make progress
std::this_thread::yield()
a simple backoff strategy for lock-free enqueue that avoids sleeping too long
std::optional<T> wait_and_pop() override
std::optional<T> wait_and_pop() override {
T* p = nullptr;
while (true) {
if (q_.pop(p)) {
T value = std::move(*p);
delete p;
return value;
}
if (!open_) {
return std::nullopt;
}
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
}
std::optional<T>
a return type that may or may not contain a T
an empty optional signals the end of the stream when the queue is closed
q_.pop(p)
attempts to dequeue a pointer to T
returns true if an element was available
move and cleanup
T value = std::move(*p) transfers the payload out of the heap object
delete p frees the heap memory
close check & sleep backoff
if (!open_) return std::nullopt; exits when the queue is empty and there will be no more pushes
sleep_for(50 µs) reduces CPU spin while the queue is empty; there is probably a better way to do this, which I'll look into for round two
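One possible direction for that "better way" is an escalating backoff: yield for the first few failed attempts, then sleep with a delay that doubles up to a cap. The sketch below is only an illustration of that idea. The Backoff class and its constants (16 yields, 1 to 64 µs) are hypothetical and not tuned against the benchmark.

```cpp
#include <algorithm>
#include <chrono>
#include <thread>

// Hypothetical helper: escalate from yielding to sleeping with a capped, doubling delay.
class Backoff {
public:
    // Call after each failed attempt. Returns the requested sleep (0 while still yielding).
    std::chrono::microseconds pause() {
        if (attempts_ < kYieldAttempts) {
            ++attempts_;
            std::this_thread::yield();
            return std::chrono::microseconds{0};
        }
        const auto slept = delay_;
        std::this_thread::sleep_for(slept);
        delay_ = std::min(delay_ * 2, kMaxDelay);  // double the delay, up to the cap
        return slept;
    }

    // Call after a successful attempt so the next stall starts cheap again.
    void reset() {
        attempts_ = 0;
        delay_ = kMinDelay;
    }

private:
    static constexpr int kYieldAttempts = 16;
    static constexpr std::chrono::microseconds kMinDelay{1};
    static constexpr std::chrono::microseconds kMaxDelay{64};
    int attempts_ = 0;
    std::chrono::microseconds delay_ = kMinDelay;
};
```

In a polling loop, pause() would replace the fixed sleep_for(50 µs) and reset() would be called after every successful pop. Blocking alternatives, such as a condition variable or C++20's std::atomic wait/notify, avoid polling altogether at the cost of extra coordination on the push side.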
void close() override
void close() override {
open_.store(false, std::memory_order_relaxed);
}
std::atomic<bool>::store(…)
sets the atomic flag with relaxed ordering, i.e. without imposing any ordering on the surrounding memory operations
polling consumers see the flag on a later iteration, break out, and
return std::nullopt
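The stop-flag pattern behind close() can be seen in isolation in the sketch below, where one thread polls an atomic flag and another clears it. The function run_until_closed is a hypothetical demo, not project code, and it assumes the flag is the only thing being communicated, which is what makes relaxed ordering sufficient here.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>

// Hypothetical demo: a worker counts iterations until another thread clears the flag.
inline std::uint64_t run_until_closed() {
    std::atomic<bool> open{true};
    std::atomic<std::uint64_t> iterations{0};

    std::thread worker([&] {
        // Relaxed is enough because no other data is published through the flag.
        while (open.load(std::memory_order_relaxed)) {
            iterations.fetch_add(1, std::memory_order_relaxed);
            std::this_thread::yield();
        }
    });

    // Wait until the worker has demonstrably run at least once.
    while (iterations.load(std::memory_order_relaxed) == 0) {
        std::this_thread::yield();
    }
    open.store(false, std::memory_order_relaxed);  // same call shape as close()
    worker.join();  // join() synchronizes, so the final count is visible here
    return iterations.load();
}
```

If the flag were also meant to publish other writes (for example, a final message placed in a plain variable), the store would need release ordering and the load acquire ordering.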
Private Members
private:
boost::lockfree::queue<T*> q_;
std::atomic<bool> open_;
};
boost::lockfree::queue<T*> q_;
offers lock-free push/pop for any number of producer and consumer threads
std::atomic<bool> open_;
atomic flag for shutdown
visible to all threads without locks
since boost::lockfree::queue only accepts element types with a trivial assignment operator and destructor (which rules out std::string), we store raw pointers, allocate per-item storage, and clean up on pop
this is suitable for multiple producers and consumers without mutex contention
fulfills the common MessageQueue<T> interface so the broker can swap implementations at runtime
backoff strategies
yield() on enqueue failures to avoid busy waiting too aggressively
sleep_for(50 µs) on empty pops to reduce CPU usage while still polling frequently
this could probably be done more intelligently in a future revision
graceful shutdowns
close() flips open_ so consumers return std::nullopt and can tear down cleanly
core/message_queue.hpp
Header Guard & Includes
#pragma once
#include <optional>
#pragma once: same story as the other files
#include <optional>
allows wait_and_pop to return ‘nothing’ state when the queue is closed
clearly signals end of stream without resorting to exceptions or sentinel values
Template & Class Declaration
template<typename T>
class MessageQueue {
// ...
};
template<typename T>
declares a class template parameterized on type T
lets us create MessageQueue<int>, MessageQueue<std::string>, or a queue of any other payload type
compile-time type safety
class MessageQueue
serves as an abstract interface for all queue implementations
decouples broker logic from concrete queue details via polymorphism
Pure-virtual Interface Methods
virtual void push(const T& item) = 0;
virtual std::optional<T> wait_and_pop() = 0;
virtual void close() = 0;
virtual
calls to these methods on a base pointer/reference invoke the derived implementation at runtime
enables the broker to hold pointers to MessageQueue<T> without knowing which subclass it's using
= 0 (pure specifier)
indicates a pure-virtual function, making the class abstract
we have to subclass and implement these methods
forces each adapter to provide its own push, wait_and_pop, and close
std::optional<T> wait_and_pop()
return type that may contain a T or be empty
lets consumers detect that there are no more items once close() has been signaled
a clean way to signal the end of the stream without more flags or exceptions
Virtual Destructor
virtual ~MessageQueue() = default;
virtual ~MessageQueue() = default;
lets deletion through a base-class pointer run the derived destructor, so any resources held by the concrete queue implementation are released
used to prevent resource leaks when a std::shared_ptr<MessageQueue<T>> goes out of scope
= default
instructs the compiler to generate the default destructor implementation
the broker only uses the abstract MessageQueue<T> methods, so we can introduce new queue implementations down the road (for testing, say) without having to touch the broker code
a little bit of polymorphism allows us to choose the concurrency strategy at startup with the QueueKind
std::optional<T> cleanly separates "no data yet" from "never again"
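To show what programming against this interface looks like from the consumer's side, here is a minimal sketch. The MessageQueue declaration mirrors the one described above. SimpleQueue and drain are hypothetical additions for illustration, and SimpleQueue is deliberately single-threaded: it never blocks, so it reports an empty queue as end of stream, which a real implementation would only do after close().

```cpp
#include <deque>
#include <optional>
#include <string>
#include <utility>
#include <vector>

template <typename T>
class MessageQueue {
public:
    virtual ~MessageQueue() = default;
    virtual void push(const T& item) = 0;
    virtual std::optional<T> wait_and_pop() = 0;
    virtual void close() = 0;
};

// Hypothetical single-threaded implementation, for illustration only.
// Simplifications: pushes after close() are dropped, and an empty queue
// returns std::nullopt immediately instead of blocking.
template <typename T>
class SimpleQueue : public MessageQueue<T> {
public:
    void push(const T& item) override {
        if (open_) items_.push_back(item);
    }
    std::optional<T> wait_and_pop() override {
        if (items_.empty()) return std::nullopt;
        T value = std::move(items_.front());
        items_.pop_front();
        return value;
    }
    void close() override { open_ = false; }

private:
    std::deque<T> items_;
    bool open_ = true;
};

// Consumer code sees only the interface: loop until the optional comes back empty.
inline std::vector<std::string> drain(MessageQueue<std::string>& q) {
    std::vector<std::string> out;
    while (auto msg = q.wait_and_pop()) out.push_back(std::move(*msg));
    return out;
}
```

Because drain takes a MessageQueue<std::string>&, the same loop works unchanged with any implementation of the interface.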
core/metrics.hpp
struct Metrics
struct Metrics {
std::atomic<uint64_t> published{0};
std::atomic<uint64_t> delivered{0};
};
struct Metrics
just holds simple counters for published and delivered messages
plain data aggregate makes it easy to add and remove performance metrics
std::atomic<uint64_t> published{0};
unsigned 64-bit counter initialized to 0 (a 16-bit counter would wrap at 65 535, short of the 100 000 messages in the benchmark)
multiple publishers can safely increment it without locks
std::atomic<uint64_t> delivered{0};
same idea, but it counts the number of delivered messages
inline uint64_t now_ns()
inline uint64_t now_ns() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
}
inline
allows this function to be defined in a header that several translation units include without violating the one-definition rule; for a small, frequently called utility like now_ns(), the compiler is also free to inline the call, which keeps timestamping cheap
std::chrono::steady_clock
a monotonic clock, suited to measuring the elapsed time between events
time_since_epoch() + duration_cast<nanoseconds>()
returns a 64-bit integer count of nanoseconds since the clock's epoch, suitable for timestamps and latency calculations
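A short sketch of how the counters and now_ns() could be combined in a micro-benchmark follows. It assumes 64-bit counters, chosen so that a 100 000-message run cannot overflow. count_published is a hypothetical helper, and the thread counts are arbitrary.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>
#include <vector>

// Assumption: 64-bit counters, wide enough for the 100 000-message benchmark.
struct Metrics {
    std::atomic<std::uint64_t> published{0};
    std::atomic<std::uint64_t> delivered{0};
};

inline std::uint64_t now_ns() {
    return static_cast<std::uint64_t>(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::steady_clock::now().time_since_epoch())
            .count());
}

// Hypothetical benchmark helper: `threads` publishers each count `per_thread`
// messages. Returns the elapsed wall time in nanoseconds.
inline std::uint64_t count_published(Metrics& m, int threads, std::uint64_t per_thread) {
    const std::uint64_t start = now_ns();
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&m, per_thread] {
            for (std::uint64_t i = 0; i < per_thread; ++i) {
                // A counter needs atomicity but no ordering, so relaxed is enough.
                m.published.fetch_add(1, std::memory_order_relaxed);
            }
        });
    }
    for (auto& th : pool) th.join();
    return now_ns() - start;
}
```

Dividing the returned duration by the final value of published gives a rough per-message cost, though for increments this cheap the thread start-up time dominates.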
core/mutex_queue.hpp
Header Guard & Includes
#pragma once
#include <message_queue.hpp>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <message_queue.hpp>
brings in the abstract MessageQueue<T> interface so we can implement and override push, wait_and_pop, and close
#include <queue>
provides the standard std::queue container adapter
FIFO: items come out in the order they went in
simple sequential container
#include <mutex>
synchronizes access to queue_ and open_
ensures thread safety without bespoke implementations
#include <condition_variable>
lets consumer threads sleep until new data arrives or the queue is closed
avoids busy waiting, reducing CPU usage
Template & Class Declaration
template<typename T>
class MutexQueue : public MessageQueue<T> {
// …
};
template<typename T>
class template parameterized on payload type T
can instantiate MutexQueue<int>, MutexQueue<std::string>
class MutexQueue : public MessageQueue<T>
enables runtime polymorphism: broker code stays agnostic to the queue type
implements push, wait_and_pop, and close
Private Members
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;
bool open_ = true;
std::queue<T> queue_;
FIFO container for T
stores items in insertion order
std::mutex mutex_;
mutual exclusion lock
only one thread may modify queue_ or open_ at a time
prevents race conditions
std::condition_variable cond_;
thread coordination primitive
wakes waiting consumers when queue_ changes or open_ flips to false
eliminates CPU spin-loops
bool open_ = true;
simple flag that defaults to open
tracks whether producers may push
signals end of stream to consumers
void push(const T& item) override
void push(const T& item) override {
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(item);
}
cond_.notify_one();
}
override
verifies this matches a virtual in the base class
compiler error if the signature drifts
std::lock_guard<std::mutex>
RAII wrapper that locks in its constructor and unlocks in its destructor
safely scopes the lock around the push
the inner braces minimize how long the lock is held, and the lock is released even if queue_.push throws
cond_.notify_one()
wakes a single thread waiting on cond_
lets exactly one consumer re-check the queue
efficient wake-up strategy for one-item arrival
std::optional<T> wait_and_pop() override
std::optional<T> wait_and_pop() override {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this]{ return !open_ || !queue_.empty(); });
if (!queue_.empty()) {
T value = queue_.front();
queue_.pop();
return value;
}
return std::nullopt;
}
std::unique_lock<std::mutex>
lock that can be unlocked and re-locked by hand
required by condition_variable::wait
more flexible than lock_guard
cond_.wait(lock, predicate)
blocks until the predicate is true, releasing the lock while asleep and re-acquiring it before returning
sleeps consumer threads until there’s data or the queue is closed
the predicate is re-checked after every wake-up, so spurious wakeups are handled correctly and there is no busy-waiting
return logic
if queue_ is non-empty, dequeue the front item and return it
else (queue empty and open_ == false): return std::nullopt
clean end-of-stream signaling without exceptions; in the real world this likely will not fly, and explicit error handling for clients would need to be added
std::optional<T> try_pop()
std::optional<T> try_pop() {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty())
return std::nullopt;
T value = queue_.front();
queue_.pop();
return value;
}
attempts to get an item without waiting
returns immediately with std::nullopt if empty
useful for polling-style consumers
void close() override
void close() override {
{
std::lock_guard<std::mutex> lock(mutex_);
open_ = false;
}
cond_.notify_all();
}
flip open_ = false under lock
prevents race conditions with concurrent pushes/pops
ensures all waiting threads see the update
cond_.notify_all()
wakes every thread waiting on cond_
lets all blocked consumers exit wait_and_pop() with std::nullopt once the queue has drained
uses condition_variable to avoid CPU spin loops, ideal for moderate throughput
implements the same MessageQueue<T> interface so the broker can switch to this when lock-free semantics are not needed
we get clean teardown with close()
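To see the pieces above working together, here is a minimal sketch that pushes a few integers through the queue from one thread and drains them on another. The MessageQueue interface is my guess at what core/message_queue.hpp looks like (the walkthrough only tells us it has push, wait_and_pop, and close), and sum_through_queue is a made-up demo function, not part of the project.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

// Assumed shape of the interface in core/message_queue.hpp.
template <typename T>
class MessageQueue {
public:
    virtual ~MessageQueue() = default;
    virtual void push(const T& item) = 0;
    virtual std::optional<T> wait_and_pop() = 0;
    virtual void close() = 0;
};

// Same logic as the MutexQueue walked through above.
template <typename T>
class MutexQueue : public MessageQueue<T> {
public:
    void push(const T& item) override {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(item);
        }
        cond_.notify_one();
    }
    std::optional<T> wait_and_pop() override {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !open_ || !queue_.empty(); });
        if (!queue_.empty()) {
            T value = std::move(queue_.front());
            queue_.pop();
            return value;
        }
        return std::nullopt;  // closed and drained
    }
    void close() override {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            open_ = false;
        }
        cond_.notify_all();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool open_ = true;
};

// Hypothetical demo: the producer pushes 1..n and closes; the consumer sums until nullopt.
long long sum_through_queue(int n) {
    MutexQueue<int> q;
    long long total = 0;
    std::thread consumer([&] {
        while (auto item = q.wait_and_pop()) total += *item;
    });
    for (int i = 1; i <= n; ++i) q.push(i);
    q.close();
    consumer.join();  // join makes the consumer's writes to total visible here
    return total;
}
```

Because wait_and_pop() keeps returning items after close() until the queue is empty, the consumer sees every pushed value before it gets std::nullopt.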
core/queue_factory.hpp
Header Guard & Includes
#pragma once
#include <memory>
#include <string>
#include <message_queue.hpp>
#include <mutex_queue.hpp>
#include <lf_queue.hpp>
#include <spsc_queue_adapter.hpp>
queue implementation headers <mutex_queue.hpp>, <lf_queue.hpp>, <spsc_queue_adapter.hpp>
bring in the concrete classes MutexQueue, LfQueue, and SpscAdapter
the factory function can construct any of these at runtime based on the QueueKind
decouples the broker from queue-type details: only the factory knows all the implementations
enum class QueueKind
enum class QueueKind { Mutex, LockFree, Spsc };
enum class vs enum
improves type safety and avoids name collisions down the road
constexpr std::size_t DEFAULT_Q_CAP = 1 << 12;
constexpr std::size_t DEFAULT_Q_CAP = 1 << 12;
constexpr
centralizes the default queue capacity (1 << 12 = 4096 slots) so all implementations share the same sizing unless overridden
Factory Function Declaration
std::shared_ptr<MessageQueue<std::string>> make_queue(QueueKind kind);
return type std::shared_ptr<MessageQueue<std::string>>
caller does not need to know which concrete class was created
enables polymorphic behavior and shared ownership between the broker and its sessions
parameter QueueKind kind
factory inspects the kind and constructs the corresponding queue implementation
utilize the factory pattern for choosing queue types
keeps the broker code simple, without switch statements scattered around
single point of configuration
decouples interface (MessageQueue<T>) from implementations, allowing seamless swapping between MutexQueue, LfQueue, and SpscAdapter
shared_ptr ensures that as long as any session holds the queue, it stays alive
enum class QueueKind provides clarity and safety when selecting queue behavior
DEFAULT_Q_CAP standardizes sizing across queue implementations
core/spsc_queue_adapter.hpp
Header Guard & Includes
#pragma once
#include <message_queue.hpp>
#include <optional>
#include <thread>
#include <chrono>
#include "spsc_ring_buffer.hpp"
local header defining a single-producer/single-consumer ring buffer template
efficient queue for exactly one producer and one consumer, without the overhead of MPMC synchronization
Template & Class Declaration
template<typename T, std::size_t CapacityPow2>
class SpscAdapter : public MessageQueue<T> { … };
template<typename T, std::size_t CapacityPow2>
the ring buffer has a size that is fixed at compile time and must be a power of 2 for efficient indexing
zero-overhead abstraction; capacity is tuned through the template parameter
class SpscAdapter : public MessageQueue<T>
inherits the MessageQueue<T> interface, requiring the override of its virtual methods
allows polymorphic use alongside other queue types in the broker
consistent API across the queue implementations
void push(const T& item) override
void push(const T& item) override {
while (!rb_.push(item)) {
std::this_thread::yield();
}
}
spin loop with yield
while (!rb_.push(item)) retries the enqueue until it succeeds, without blocking on a lock
std::this_thread::yield() gives up the time slice to avoid starving other threads
acts as the backoff strategy while the buffer is full
ok for low-latency SPSC scenarios where blocking is undesirable
std::optional<T> wait_and_pop() override
std::optional<T> wait_and_pop() override {
for (;;) {
auto v = rb_.pop();
if (v) return v;
if (!open_) return std::nullopt;
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
}
return type std::optional<T>
carries a T when one is available, or is empty to signal closure
probably should make this more explicit
rb_.pop()
attempts to dequeue from the ring buffer, returns std::optional<T>
returns immediately if no data is present
loop with sleep backoff
check open_; if closed, return nullopt to indicate end of stream
sleep_for(50µs) prevents a tight busy-wait, trading slight latency for reduced CPU use
tries to (awkwardly) balance responsiveness with efficiency
can make this much better
void close() override
void close() override {
open_ = false;
}
open_ = false signals to the consumer loop in wait_and_pop() that no more items will arrive
we wanted something simple: open_ is a std::atomic<bool>, so the store here and the load in wait_and_pop() do not race, whichever thread calls close()
Private Members
private:
SpscRingBuffer<T, CapacityPow2> rb_;
std::atomic<bool> open_{true};
SpscRingBuffer<T, CapacityPow2> rb_;
high-performance queue for single-producer/single-consumer patterns
std::atomic<bool> open_{true}
ensures thread-safe visibility of closure to the consumer
guards shutdown logic in wait_and_pop()
this eliminates the need for locks or heavy atomic operations beyond a simple flag
perfect for simple one-to-one publishers and subscribers that interact via a queue
using a power-of-two buffer size lets the ring buffer index wrap via bitmasking
yield() in push avoids hogging the CPU when the buffer is temporarily full
sleep_for(50µs) in wait_and_pop() reduces CPU usage on an empty buffer while still polling frequently
not a particularly desirable method; there is some work to do here to eliminate the poller, and stylistically it is not my favorite
conforms to MessageQueue<T>, so the broker can switch between mutex-based, MPMC lock-free, or SPSC at runtime via the factory
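One subtle gap in wait_and_pop() as written: if the producer pushes an item and calls close() in the window between the failed rb_.pop() and the open_ check, the consumer returns std::nullopt and that last item is never delivered. A possible fix, sketched below, is to try the buffer one more time after observing the closed flag. FakeBuffer is a single-threaded stand-in I made up so the sketch compiles on its own, and wait_and_pop_drained is a hypothetical name; the real adapter would do the same thing against rb_ and open_.

```cpp
#include <atomic>
#include <chrono>
#include <deque>
#include <optional>
#include <thread>

// Stand-in for SpscRingBuffer<int, N>, for illustration only (not thread-safe).
struct FakeBuffer {
    std::deque<int> items;
    std::optional<int> pop() {
        if (items.empty()) return std::nullopt;
        int v = items.front();
        items.pop_front();
        return v;
    }
};

// Variant of wait_and_pop() that re-checks the buffer once after seeing
// open == false, so an item pushed just before close() is still delivered.
template <typename Buffer>
std::optional<int> wait_and_pop_drained(Buffer& rb, const std::atomic<bool>& open) {
    for (;;) {
        if (auto v = rb.pop()) return v;
        if (!open.load(std::memory_order_acquire)) {
            return rb.pop();  // final drain attempt; nullopt means truly finished
        }
        std::this_thread::sleep_for(std::chrono::microseconds(50));
    }
}
```

This works because the producer's push happens before its close(), so once the consumer sees the flag flipped, any earlier push is visible to the final pop().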
core/spsc_ring_buffer.hpp
Header Guard & Includes
#pragma once
#include <atomic>
#include <cstddef>
#include <type_traits>
#include <optional>
#include <thread>
#include <chrono>
same stuff we have seen, nothing special here
Template Declaration & Compile-Time Checks
template<typename T, std::size_t Capacity>
class SpscRingBuffer {
static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
"Capacity must be a non-zero power of two");
static constexpr std::size_t MASK = Capacity - 1;
…
};
template<typename T, std::size_t Capacity>
choose SpscRingBuffer<int, 1024>; the compiler generates code specialized to that capacity
zero-overhead, compile-time sizing; no dynamic allocations or resizes
static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0, …);
ensures at compile time that Capacity is a power of two; the Capacity > 0 term is needed because 0 & (0 - 1) is also 0
fails to compile if you pass a non-power of two, preventing runtime errors in the bitmasking logic
static constexpr std::size_t MASK = Capacity - 1;
by requiring Capacity to be a power of two, the index wrap becomes a cheap bitwise AND instead of a general modulo, while keeping the wrap-around behavior correct
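A tiny sketch to convince myself that masking and modulo really agree for a power-of-two capacity. The wrap helper is hypothetical (the ring buffer just inlines counter & MASK); the static_asserts are checked by the compiler, so there is nothing to run.

```cpp
#include <cstddef>

// Hypothetical helper: wrap a monotonically increasing counter into [0, Capacity).
template <std::size_t Capacity>
constexpr std::size_t wrap(std::size_t counter) {
    static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
                  "Capacity must be a non-zero power of two");
    return counter & (Capacity - 1);
}

// Compile-time checks that masking and modulo agree (C++17).
static_assert(wrap<8>(0) == 0);
static_assert(wrap<8>(7) == 7);
static_assert(wrap<8>(8) == 0);        // wraps back to the first slot
static_assert(wrap<8>(13) == 13 % 8);  // both give 5
```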
Constructor
SpscRingBuffer(): head_(0), tail_(0) {}
initializes atomics head_ and tail_ to zero before the body runs
both indices start at slot 0, representing an empty buffer
ensures a well-defined initial state without additional code in the constructor body
push overloads
bool push(const T& v) { return emplace(v); }
bool push(T&& v) { return emplace(std::move(v)); }
lvalue vs rvalue overloads
one takes const T&, the other T&&
efficiently handles both copyable and movable types
forward the value category to the common emplace implementation
pop
std::optional<T> pop() {
// head_ is only written by the consumer (this thread), so relaxed is enough
const auto head = head_.load(std::memory_order_relaxed);
// acquire pairs with the producer's release store to tail_, making the slot's contents visible
if (head == tail_.load(std::memory_order_acquire)) return std::nullopt;
T value = std::move(buffer_[head & MASK]);
// release tells the producer the slot has been read and may be reused
head_.store(head + 1, std::memory_order_release);
return value;
}
atomic loads/stores
tail_.load(std::memory_order_acquire) pairs with the producer’s release store in emplace, so the element written to buffer_ is visible before it is read
head_.store(…, std::memory_order_release) publishes the freed slot back to the producer
ensures lock-free thread safety with minimal fencing
check empty
if (head == tail) detects an empty buffer by comparing indices
bitmask indexing
buffer_[head & MASK] uses bitwise AND
move & advance
std::move transfers ownership out of the slot
head_.store(head + 1) advances the consumer index
Status queries: empty, full, size
bool empty() const {
return head_.load(std::memory_order_acquire)
== tail_.load(std::memory_order_acquire);
}
bool full() const {
return (tail_.load(std::memory_order_acquire)
- head_.load(std::memory_order_acquire)) == Capacity;
}
std::size_t size() const {
return tail_.load(std::memory_order_acquire)
- head_.load(std::memory_order_acquire);
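}
empty check
compares the two indices to see whether no items exist
full check
a difference equal to Capacity means no free slots remain
size
simple subtraction gives current occupancy
rationale
all operations are O(1), and lock-free on platforms where std::atomic<std::size_t> is lock-free; the results are snapshots that may already be stale while the other thread is pushing or popping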
Private emplace Implementation
template<typename U>
bool emplace(U&& v) {
const auto tail = tail_.load(std::memory_order_relaxed);
const auto next = tail + 1;
if ((next - head_.load(std::memory_order_acquire)) > Capacity)
return false;
buffer_[tail & MASK] = std::forward<U>(v);
tail_.store(next, std::memory_order_release);
return true;
}
template forwarding
U&& is a forwarding reference: it can accept either an lvalue (something you already have, like a named variable) or an rvalue (a temporary value, like MyType() or std::move(x))
std::forward<U>(v) says to take whatever you were given (copy it if it’s a normal variable, move it if it’s a temporary) so we don’t do extra copies
tail and head are just two counters
head is where we read from next
tail is where we write to next
next - head is how many slots would be occupied after this push
if that number exceeds Capacity, the push would overwrite unread data, so emplace returns false
wrap: tail & MASK turns the ever-increasing tail counter into an array index between 0 and Capacity - 1
once it has placed the item in that slot, it updates the tail counter so the consumer knows there’s now one more thing in the buffer
when the counter reaches the end of the array, the masked index jumps back to the front
Aligned Members
alignas(64) std::atomic<std::size_t> head_;
alignas(64) std::atomic<std::size_t> tail_;
alignas(64) T buffer_[Capacity];
alignas(64) aligns each member on a 64-byte boundary, the cache-line size on most current x86-64 CPUs
prevents false sharing by keeping head_ and tail_ on separate cache lines
maximizes throughput by avoiding cache-coherency ping-pong in concurrent access
lock-free SPSC is tailored for one producer and one consumer, minimizing overhead with atomic indices and no locks
power-of-two buffer enables bitmask wrapping instead of modulo, which is faster
acquire and release semantics guarantee safe handoff of data between threads
cache alignment reduces false sharing, critical for high-frequency enqueue/dequeue operations
all logic lives in the template, with no separate .cpp, making it easy to include and inline
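Putting the walkthrough together, here is a condensed sketch of the ring buffer plus a small two-thread demo. It is not the project's exact header: I collapsed the push overloads into one by-value push, and spsc_sum is a made-up demo function. The memory orderings follow the pairing described above (each side reads its own index relaxed and the other side's index with acquire).

```cpp
#include <atomic>
#include <cstddef>
#include <optional>
#include <thread>
#include <utility>

// Condensed sketch of the SPSC ring buffer described above.
template <typename T, std::size_t Capacity>
class SpscRingBuffer {
    static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
                  "Capacity must be a non-zero power of two");
    static constexpr std::size_t MASK = Capacity - 1;

public:
    // Producer side: returns false when the buffer is full.
    bool push(T v) {
        const auto tail = tail_.load(std::memory_order_relaxed);
        if (tail - head_.load(std::memory_order_acquire) == Capacity) return false;
        buffer_[tail & MASK] = std::move(v);
        tail_.store(tail + 1, std::memory_order_release);  // publish the slot
        return true;
    }

    // Consumer side: returns nullopt when the buffer is empty.
    std::optional<T> pop() {
        const auto head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return std::nullopt;
        T value = std::move(buffer_[head & MASK]);
        head_.store(head + 1, std::memory_order_release);  // free the slot
        return value;
    }

private:
    alignas(64) std::atomic<std::size_t> head_{0};
    alignas(64) std::atomic<std::size_t> tail_{0};
    alignas(64) T buffer_[Capacity]{};
};

// Hypothetical demo: push 1..n from the calling thread, sum them on another.
long long spsc_sum(int n) {
    SpscRingBuffer<int, 16> rb;
    long long total = 0;
    std::thread consumer([&] {
        for (int received = 0; received < n;) {
            if (auto v = rb.pop()) {
                total += *v;
                ++received;
            } else {
                std::this_thread::yield();  // buffer empty, let the producer run
            }
        }
    });
    for (int i = 1; i <= n; ++i)
        while (!rb.push(i)) std::this_thread::yield();  // buffer full, back off
    consumer.join();
    return total;
}
```

With only 16 slots and many more items than that, both the full and the empty paths get exercised, which is a decent smoke test for the wrap-around logic.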
core/thread_pool.hpp
Header Guard & Includes
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <vector>
#include <functional>
#include <atomic>
These are all pretty standard headers, so we will move on to how we use them
class ThreadPool
class ThreadPool {
public:
explicit ThreadPool(size_t n = std::thread::hardware_concurrency())
: stop_(false) { … }
~ThreadPool() { … }
void post(std::function<void()> fn) { … }
private:
void worker() { … }
// data members…
};
explicit
prevents unintended implicit conversions to ThreadPool
by default, a single-argument constructor lets the compiler do hidden conversions; with explicit, we turn that off
explicit stops the compiler from sneakily turning a lone size_t into a ThreadPool behind our backs; we have to write the constructor call ourselves
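A quick illustration of what explicit buys us. ImplicitPool and ExplicitPool are made-up stand-ins that only store the thread count, so the difference is visible without starting any threads.

```cpp
#include <cstddef>

// Hypothetical types for illustration only.
struct ImplicitPool {
    ImplicitPool(std::size_t n) : threads(n) {}
    std::size_t threads;
};
struct ExplicitPool {
    explicit ExplicitPool(std::size_t n) : threads(n) {}
    std::size_t threads;
};

std::size_t size_of(const ImplicitPool& p) { return p.threads; }
std::size_t size_of_explicit(const ExplicitPool& p) { return p.threads; }

// size_of(4) compiles: the compiler silently builds a temporary ImplicitPool from 4.
std::size_t demo_implicit() { return size_of(4); }

// size_of_explicit(4) would NOT compile; the caller has to spell out the construction.
std::size_t demo_explicit() { return size_of_explicit(ExplicitPool(4)); }
```

For a real thread pool the implicit version would be worse than surprising: passing a number to a function expecting a pool would quietly spin up and tear down a set of threads.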
Constructor
ThreadPool(size_t n = std::thread::hardware_concurrency())
: stop_(false) {
workers_.reserve(n);
for (size_t i = 0; i < n; ++i) {
workers_.emplace_back([this] { worker(); });
}
}
size_t n = std::thread::hardware_concurrency()
default parameter set to the number of hardware threads the implementation reports
if we do not pass n, the pool size matches the machine’s reported concurrency
hardware_concurrency() is only a hint and may return 0 when the value cannot be determined, in which case this pool would start no workers
member initializer list
: stop_(false) initializes stop_ before the constructor body runs
ensures that the shutdown flag starts as false
workers_.reserve(n)
preallocates space in the std::vector to avoid reallocations
workers_.emplace_back([this] { worker(); });
emplace_back constructs a std::thread in place at the end of the vector
[this] { worker(); } is a lambda that captures the this pointer and calls worker()
launches n threads, each running the worker() loop
practically, this likely would need to be tuned per machine and requirements
Destructor
~ThreadPool() {
{
std::lock_guard<std::mutex> lk(m_);
stop_ = true;
}
cv_.notify_all();
for (auto &t : workers_) t.join();
}
RAII for shutdown
the destructor (~ThreadPool) runs automatically when the object goes out of scope
ensures the clean shutdown of all threads
std::lock_guard<std::mutex> lk(m_);
locks m_ for the scope and unlocks at the end automatically (RAII)
sets stop_ = true under the same lock the workers hold while checking their wait predicate, so no worker can miss the shutdown signal
cv_.notify_all()
wakes all the threads waiting on the condition variable
lets each worker() thread notice stop_ and exit
t.join()
waits for thread t to finish execution
blocks until each worker thread has terminated; destroying a std::thread that is still joinable would call std::terminate
void post(std::function<void()> fn)
void post(std::function<void()> fn) {
{
std::lock_guard<std::mutex> lk(m_);
tasks_.push(std::move(fn));
}
cv_.notify_one();
}
std::function<void()>
a type-erased callable that takes no arguments and returns nothing
we can pass lambdas, function pointers, or std::bind results
tasks_.push(std::move(fn))
moves fn into the queue to avoid copying
stores the work item for later execution by a worker thread
cv_.notify_one()
wakes one waiting thread
lets one worker pick up the new task immediately
void worker()
void worker() {
for (;;) {
std::function<void()> fn;
{
std::unique_lock<std::mutex> lk(m_);
cv_.wait(lk, [this]{ return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
fn = std::move(tasks_.front());
tasks_.pop();
}
fn();
}
}
infinite loop
for (;;) runs forever until return is called inside
keeps each thread alive to process tasks as they arrive
std::unique_lock<std::mutex>
lock type that works with condition_variable::wait
manages the lock around waiting and waking
cv_.wait(lock, predicate)
blocks until predicate is true, atomically releasing the lock while waiting
sleeps the thread when there are no tasks and stop_ is false, wakes when a task arrives or shutdown is signaled
shutdown check
if (stop_ && tasks_.empty()) return;
exits the function (and thread) if stopping and no tasks are left, so tasks already queued are still drained before shutdown
cleanly ends each worker when the pool is being destroyed
fn = std::move(tasks_.front()); tasks_.pop(); fn();
take the next task out of the queue
remove it from the queue
call the function outside the lock, so other workers can dequeue while it runs
executes the user-posted work in a thread-safe manner
Private Data Members
std::queue<std::function<void()>> tasks_;
std::mutex m_;
std::condition_variable cv_;
std::vector<std::thread> workers_;
bool stop_;
tasks_ holds pending work items
m_ protects tasks_ and stop_ from data races
cv_ coordinates threads sleeping/waking for new tasks or shutdown
workers_ stores the actual std::thread objects
stop_ is a simple flag telling threads to exit
using a thread pool pattern reuses a fixed set of threads to run many tasks, avoiding the cost of creating/destroying a thread per task
the condition variable has workers sleep when no tasks are available, waking only on new work or shutdown
RAII shutdown has the destructor orchestrate a clean stop and join(), preventing dangling threads
using std::function<void()> lets callers post any callable with no parameters
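Here is the pool in one piece with a small usage sketch. The class body is condensed from the snippets above (I dropped the default argument and used an in-class initializer for stop_); run_counter_tasks is a hypothetical demo, not something in the repo.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Condensed copy of the pool described above.
class ThreadPool {
public:
    explicit ThreadPool(std::size_t n) {
        workers_.reserve(n);
        for (std::size_t i = 0; i < n; ++i)
            workers_.emplace_back([this] { worker(); });
    }
    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lk(m_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_) t.join();  // workers drain queued tasks first
    }
    void post(std::function<void()> fn) {
        {
            std::lock_guard<std::mutex> lk(m_);
            tasks_.push(std::move(fn));
        }
        cv_.notify_one();
    }

private:
    void worker() {
        for (;;) {
            std::function<void()> fn;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;
                fn = std::move(tasks_.front());
                tasks_.pop();
            }
            fn();  // run outside the lock
        }
    }
    std::queue<std::function<void()>> tasks_;
    std::mutex m_;
    std::condition_variable cv_;
    std::vector<std::thread> workers_;
    bool stop_ = false;
};

// Hypothetical demo: post `tasks` increments and return the final count.
int run_counter_tasks(int tasks, std::size_t threads) {
    std::atomic<int> counter{0};
    {
        ThreadPool pool(threads);
        for (int i = 0; i < tasks; ++i)
            pool.post([&counter] { ++counter; });
    }  // the destructor joins the workers after the queue is drained
    return counter.load();
}
```

The inner scope matters: the count is only guaranteed complete after the pool's destructor has joined every worker.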
src/broker.cpp
Includes & Global Thread Pool
#include "core/broker.hpp"
#include "core/mutex_queue.hpp"
#include <iostream>
#include <cstdlib>
#if defined(_WIN32)
#include <winsock2.h>
#else
#include <arpa/inet.h>
#endif
#include <boost/asio/bind_executor.hpp>
#include <spsc_queue_adapter.hpp>
#include "core/thread_pool.hpp"
#include <lf_queue.hpp>
#include <core/queue_factory.hpp>
ThreadPool g_pool(std::thread::hardware_concurrency());
the preprocessor #include directives bring in headers
the #if defined(_WIN32) block picks the right platform header for the byte-order conversion functions (ntohs, ntohl, and friends)
ThreadPool g_pool(...) is a global variable initialized before main()
probably would have been better to inject a ThreadPool& into Broker or SubscriberSession so globals are avoided
makes g_pool accessible in launch_worker() without threading it through every constructor
all the types become available
g_pool is a pool sized to the CPU count
Broker Constructor & start()
Broker::Broker(io_context& io, ushort pub, ushort sub, QueueKind kind)
: io_context_(io),
pub_acceptor_(io, {ip::tcp::v4(), pub}),
sub_acceptor_(io, {ip::tcp::v4(), sub}),
strand_(io.get_executor()),
queue_kind_(kind)
{
std::cout << "[broker] ctor: pub_port=" << pub
<< " sub_port=" << sub << "\n";
start();
}
void Broker::start() {
do_accept_publisher();
do_accept_subscriber();
}
there is a member initializer list to construct the references and acceptors before the body
strand_(io.get_executor()) creates a strand, a serial execution context in Asio
binds two TCP acceptors to the given ports on IPv4
immediately kicks off two async loops: one waiting for publishers and one for subscribers
ensures all callbacks that use strand_ run one at a time, protecting channels_ without explicit locks
we could also use a separate std::mutex around channels_, or run a message-passing actor per channel; a strand serializes handlers without ever blocking a thread, which a contended mutex would do
Accepting Publishers
void Broker::do_accept_publisher() {
std::cout << "[broker] waiting for publisher...\n";
pub_acceptor_.async_accept(
strand_,
[this](error_code ec, ip::tcp::socket sock) {
if (!ec) {
std::cout << "[broker] publisher connected\n";
auto s = std::make_shared<PublisherSession>(
std::move(sock), strand_, channels_, queue_kind_);
s->start();
} else {
std::cout << "[broker] accept error: " << ec.message() << "\n";
}
do_accept_publisher();
});
}
async_accept(executor, handler) uses a lambda as the callback
std::move(sock) moves the connected socket into the session object
waits for one new TCP connection on the publisher port
on success, creates a PublisherSession to handle framing, then immediately re-arms itself to accept the next publisher
this implements a common Asio pattern instead of using a while loop
the code could accept into a fixed-size pool of sockets or use synchronous accept() in a dedicated thread, which is simpler but blocks threads, and that’s probably not sustainable in a large system
PublisherSession: Reading Header & Body
void PublisherSession::read_header() {
buffer_.resize(header_size);
auto self = shared_from_this();
async_read(socket_,
buffer(buffer_),
transfer_exactly(buffer_.size()),
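bind_executor(strand_, [this,self](const error_code& ec, std::size_t /*bytes*/) {
if (!ec) {
// memcpy (from <cstring>) instead of reinterpret_cast: avoids unaligned, type-punned reads
uint16_t ch_len; uint32_t pl_len;
std::memcpy(&ch_len, buffer_.data(), sizeof ch_len);
std::memcpy(&pl_len, buffer_.data()+2, sizeof pl_len);
ch_len = ntohs(ch_len); pl_len = ntohl(pl_len);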
read_body(ch_len, pl_len);
}
}));
}
void PublisherSession::read_body(uint16_t ch_len, uint32_t pl_len) {
buffer_.resize(ch_len + pl_len);
auto self = shared_from_this();
async_read(socket_,
buffer(buffer_),
transfer_exactly(buffer_.size()),
bind_executor(strand_, [this,self,ch_len](const error_code& ec, std::size_t /*bytes*/) {
if (!ec) {
std::string channel(buffer_.data(), ch_len);
std::string payload(buffer_.data()+ch_len, buffer_.size()-ch_len);
if (!channels_[channel])
channels_[channel] = make_queue(kind_);
channels_[channel]->push(payload);
read_header();
}
}));
}
async_read + transfer_exactly does a framed read of N bytes
shared_from_this() keeps the session alive until all callbacks finish
first reads exactly 6 bytes (2-byte channel length + 4-byte payload length)
parses those network-order integers (ntohs, ntohl) to host order
then reads the rest (channel name + payload), pushes into the right queue, and loops
we do a two-stage read because it’s the simplest framed protocol: first header, then body
some alternatives might be to use a fixed-header struct and async_read_some, or a delimiter-based protocol (e.g. newline-terminated) with async_read_until
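To make the 6-byte header concrete, here is a standalone sketch of the parsing step with no Asio involved. FrameHeader, kHeaderSize, and parse_header are names I made up for illustration. Instead of ntohs/ntohl it assembles the big-endian fields byte by byte, which gives the same result on any host and sidesteps alignment concerns.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical helper types for illustration; not part of the project.
struct FrameHeader {
    std::uint16_t channel_len;
    std::uint32_t payload_len;
};

constexpr std::size_t kHeaderSize = 6;  // 2-byte channel length + 4-byte payload length

// Decode the big-endian (network order) fields one byte at a time.
std::optional<FrameHeader> parse_header(const std::vector<unsigned char>& buf) {
    if (buf.size() < kHeaderSize) return std::nullopt;  // not enough bytes yet
    FrameHeader h{};
    h.channel_len = static_cast<std::uint16_t>((buf[0] << 8) | buf[1]);
    h.payload_len = (static_cast<std::uint32_t>(buf[2]) << 24) |
                    (static_cast<std::uint32_t>(buf[3]) << 16) |
                    (static_cast<std::uint32_t>(buf[4]) << 8) |
                    static_cast<std::uint32_t>(buf[5]);
    return h;
}
```

For example, the bytes 00 04 00 00 01 00 decode to a 4-byte channel name followed by a 256-byte payload, which is exactly what read_body would then be asked to read.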
Accepting Subscribers & Cleanup
void Broker::do_accept_subscriber() {
std::cout << "[broker] waiting for subscriber...\n";
sub_acceptor_.async_accept(strand_, [this](error_code ec, ip::tcp::socket sock) {
if (!ec) {
auto s = std::make_shared<SubscriberSession>(
std::move(sock), strand_, channels_, queue_kind_);
s->start();
}
do_accept_subscriber();
});
}
Broker::SubscriberSession::~SubscriberSession() {
stopped_ = true;
if (queue_) queue_->close();
}
we use the same async_accept pattern
the destructor applies RAII, cleaning up by simply closing the queue (a user-defined destructor means this is not the rule of zero)
new subscriber connections spin off their own SubscriberSession
when a SubscriberSession finally goes out of scope, it signals shutdown to its worker by closing its queue and setting stopped_
destructor cleanup ensures background workers stop
the code could use std::shared_ptr custom deleters or explicit stop() methods, but RAII is simpler and exception-safe
Reading Subscription & Launching the Worker
void SubscriberSession::read_subscription() {
buffer_.resize(2);
auto self = shared_from_this();
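async_read(socket_, buffer(buffer_), [this,self](const error_code& ec, std::size_t /*bytes*/) {
if (ec) return;
uint16_t name_len;
std::memcpy(&name_len, buffer_.data(), sizeof name_len); // from <cstring>; avoids a type-punned read
name_len = ntohs(name_len);
buffer_.resize(name_len);
async_read(socket_, buffer(buffer_),
bind_executor(strand_, [this,self,name_len](const error_code& ec2, std::size_t /*bytes*/) {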
if (ec2) return;
std::string channel(buffer_.data(), name_len);
queue_ = channels_[channel];
if (!queue_) {
queue_ = std::make_shared<MutexQueue<std::string>>();
channels_[channel] = queue_;
}
launch_worker();
}));
});
}
void SubscriberSession::launch_worker() {
extern ThreadPool g_pool;
auto self = shared_from_this();
g_pool.post([self]{
while (!self->stopped_) {
auto opt = self->queue_->wait_and_pop();
if (!opt) break;
auto msg = std::move(*opt);
boost::asio::post(self->strand_, [self,msg=std::move(msg)](){
self->async_send(msg);
});
}
});
}
framed read of a 2-byte length + name, then picks/creates the right queue
note that the fallback here constructs a MutexQueue directly rather than calling make_queue, so a channel first created by a subscriber ignores the configured QueueKind
g_pool.post(…) submits a lambda to the global thread pool for blocking work (the wait_and_pop() call)
subscribers send a channel name; we look up (or make) the channel’s queue
instead of blocking the Asio thread, a task is posted to the thread pool that loops on wait_and_pop() and then re-posts back into the Asio strand to actually write to the socket
we use a thread pool here because wait_and_pop() would otherwise block the Asio reactor thread
each subscriber occupies one pool thread for as long as it is connected, so the pool size caps the number of concurrently served subscribers
a better implementation would probably use Asio’s coroutine support (awaiting an asynchronous pop) or an async_queue abstraction; definitely more complex but fully non-blocking
Writing Back to Subscribers
void SubscriberSession::async_send(std::string msg) {
if (stopped_) return;
struct WriteState { array<char,4> header; string payload; };
auto state = make_shared<WriteState>();
uint32_t netlen = htonl(static_cast<uint32_t>(msg.size()));
memcpy(state->header.data(), &netlen,4);
state->payload = std::move(msg);
array<const_buffer,2> bufs{
buffer(state->header),
buffer(state->payload)
};
auto self = shared_from_this();
async_write(socket_, bufs,
bind_executor(strand_, [this,self,state](const error_code& ec, std::size_t /*bytes*/) {
if (ec) {
stopped_ = true;
if (queue_) queue_->close();
} else {
cout << "[broker] sent to subscriber\n";
}
}));
}
a two-part write: first a 4-byte length header, then the payload
shared_ptr<WriteState> ensures both buffers outlive the asynchronous operation
packages the length + payload into Asio buffers and does an async_write
on error, tears down the subscription
we heap-allocate header + payload state because Asio needs the buffers to stay valid until the write completes
nothing here stops a second async_send from starting while an earlier async_write is still in flight; Asio requires that writes on one socket do not overlap, so under load a per-session outbound queue would be needed
an alternative might be to use stack buffers with a custom arena or asio::const_buffer pointing into a ring buffer, but that risks lifetime issues unless extremely careful
main()
int main(int argc, char* argv[]) {
if (argc!=3) { cerr<<"Usage: broker <pub> <sub>\n"; return 1; }
ushort pub = atoi(argv[1]), sub = atoi(argv[2]);
auto pick_kind = [](const char* v){ /*…*/ };
QueueKind kind = pick_kind(getenv("BROKER_QUEUE"));
io_context io;
Broker broker(io,pub,sub,kind);
io.run();
return 0;
}
parses argc/argv[], uses a small lambda to pick the QueueKind
io_context.run() starts the Asio event loop (pumping all async handlers)
checks the argument count, picks the queue implementation via the BROKER_QUEUE environment variable, constructs the Broker, and then blocks in the reactor until the process exits
we keep io_context.run() in main because it keeps all async I/O on the main thread
an alternative might be to have boost::asio::thread_pool or multiple io_context.run() threads for higher throughput, or integrate with another application main loop
the good:
clear separation of the publisher vs. subscriber logic
uses strands to serialize access to shared data
offloading the blocking queue waits to a thread pool
possible enhancements:
replace manual thread pool + blocking waits with an async queue and coroutines (co_await) for fully non-blocking flow
inject the thread pool (or an Executor interface) instead of using a global
add back-pressure: reject or slow down a publisher when a subscriber queue lags too far behind
src/queue_factory.cpp
#include <queue_factory.hpp>
brings in the declaration of make_queue, the QueueKind enum, DEFAULT_Q_CAP, and all the queue classes (MutexQueue, LfQueue, SpscAdapter)
ensures this .cpp knows about every concrete queue type and the interface it returns (MessageQueue<std::string>)
Function Signature
std::shared_ptr<MessageQueue<std::string>> make_queue(QueueKind kind)
std::shared_ptr<MessageQueue<std::string>> is a smart pointer to the abstract interface
we use a shared_ptr because multiple sessions (publishers, subscribers) can hold that same pointer safely
QueueKind kind is a scoped enum (enum class) that selects at runtime which queue to create
switch on kind
switch (kind) {
case QueueKind::LockFree:
return std::make_shared<LfQueue<std::string, DEFAULT_Q_CAP>>();
case QueueKind::Spsc:
return std::make_shared<SpscAdapter<std::string, DEFAULT_Q_CAP>>();
default:
return std::make_shared<MutexQueue<std::string>>();
}
switch
standard C++ control flow for enums; the default handles Mutex (and any future values not explicitly listed)
std::make_shared<…>()
single allocation for both control block and object, more efficient than a separate shared_ptr(new …)
constructs the chosen queue; the two fixed-size queues use the default capacity (1 << 12 = 4096)
template parameters
LfQueue<std::string, DEFAULT_Q_CAP> and SpscAdapter<std::string, DEFAULT_Q_CAP> fix the element type and capacity at compile time
if you set BROKER_QUEUE=lf, you get the MPMC lock-free queue
if spsc, you get the single-producer/single-consumer adapter
otherwise, we fall back to the mutex-based queue
factory pattern
centralizes the instantiation logic; broker code just says
make_queue(kind)and never worries about the concrete class
runtime flexibility
we can switch the queue implementation via an environment variable or command-line switch, without recompiling
smart pointers
shared_ptrautomates lifetime; when the last session referencing a channel’s queue goes away, the queue is destroyed
potential alternatives
std::unique_ptr
if ownership were truly exclusive (only the broker held the queue and handed out raw references), you could return unique_ptr and document the lifetime
harder to share ownership between sessions safely
template based factory
we could make the broker itself a template on the queue type
would lose runtime selection though
registration map
use a std::map<QueueKind, std::function<Ptr()>> that is populated at startup
adding a new queue type would then not require editing a switch, just registering a lambda
policy injection
pass a function or policy object into the broker at construction time that knows how to create queues
it would be nice because of further decoupling and easier testing
avoiding heap allocations
for ultra-low latency, store one instance of each queue type in the broker and hand out references, or use a std::variant
std::visit or manual dispatch would need to be done instead of virtual calls, which can be faster but it’s more code
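Of the alternatives above, the registration map is the easiest to sketch. Everything below is hypothetical: Queue, MutexLike, and LockFreeLike are tiny stand-ins so the example compiles on its own (the real project would register MutexQueue, LfQueue, and SpscAdapter), and QueueRegistry is a name I invented.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Minimal stand-ins for the real queue classes.
struct Queue {
    virtual ~Queue() = default;
    virtual std::string name() const = 0;
};
struct MutexLike : Queue {
    std::string name() const override { return "mutex"; }
};
struct LockFreeLike : Queue {
    std::string name() const override { return "lock-free"; }
};

enum class QueueKind { Mutex, LockFree, Spsc };

using QueuePtr = std::shared_ptr<Queue>;
using QueueMaker = std::function<QueuePtr()>;

// Hypothetical registry: maps each kind to a callable that builds it.
class QueueRegistry {
public:
    void add(QueueKind kind, QueueMaker maker) { makers_[kind] = std::move(maker); }

    // Falls back to the Mutex entry when a kind was never registered,
    // mirroring the default: branch of the switch.
    QueuePtr make(QueueKind kind) const {
        auto it = makers_.find(kind);
        if (it == makers_.end()) it = makers_.find(QueueKind::Mutex);
        if (it == makers_.end()) return nullptr;
        return it->second();
    }

private:
    std::map<QueueKind, QueueMaker> makers_;
};

// Populated once at startup.
QueueRegistry make_default_registry() {
    QueueRegistry r;
    r.add(QueueKind::Mutex, [] { return std::make_shared<MutexLike>(); });
    r.add(QueueKind::LockFree, [] { return std::make_shared<LockFreeLike>(); });
    return r;
}
```

Adding a new queue type is then a single add(...) call at startup, and tests can register a fake queue without touching the factory.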
src/pub_client.cpp
Includes
#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <cstdlib>
#include <cstdint>
#if defined(_WIN32)
#include <winsock2.h>
#else
#include <arpa/inet.h>
#endif
<boost/asio.hpp>
brings in all of Boost.Asio’s networking I/O
get io_context, ip::tcp::resolver, socket, connect, and write
platform socket headers (winsock2.h / arpa/inet.h)
for htons/htonl and ntohs on Windows vs. UNIX
standard headers
<vector> for dynamic buffers
<cstdlib>/<cstdint> for std::atoi, fixed-width ints
main() signature & argument parsing
int main(int argc, char* argv[]) {
if (argc != 5) {
std::cerr << "Usage pub_client <host> <port> <channel> <message>\n";
return 1;
}
const char* host = argv[1];
const char* port = argv[2];
std::string channel = argv[3];
std::string message = argv[4];
…
}
int argc, char* argv[]
standard entry point receiving command-line args
return 1;
signals failure to the shell
an alternative might be to use a flag-parsing library (e.g. boost::program_options) for more robust CLI handling
Asio Setup & Connecting
boost::asio::io_context io;
boost::asio::ip::tcp::resolver resolver(io);
auto endpoints = resolver.resolve(boost::asio::ip::tcp::v4(), host, port);
boost::asio::ip::tcp::socket socket(io);
boost::system::error_code ec;
boost::asio::connect(socket, endpoints, ec);
if (ec) { … }
std::cout << "[pub] connected\n";
io_context
core I/O event pump, required even though these are synchronous calls
resolver.resolve(…)
DNS lookup returning a list of endpoints
translates host + port into TCP endpoints
connect(socket, endpoints, ec)
synchronously tries each endpoint until one succeeds or all have failed
blocks until the connection is made
error handling via error_code
this overload reports failure through ec instead of throwing
resolver.resolve(…) as called here takes no error_code argument, so it can still throw; that is what the surrounding try/catch is for
an alternative would be to use async_resolve + async_connect for non-blocking behavior
Building the send buffer
uint16_t ch_len = htons(static_cast<uint16_t>(channel.size()));
uint32_t msg_len = htonl(static_cast<uint32_t>(message.size()));
std::vector<char> buf;
buf.reserve(2 + 4 + channel.size() + message.size());
// Append channel length
buf.insert(buf.end(),
reinterpret_cast<char*>(&ch_len),
reinterpret_cast<char*>(&ch_len) + sizeof(ch_len));
// Append message length
buf.insert(buf.end(),
reinterpret_cast<char*>(&msg_len),
reinterpret_cast<char*>(&msg_len) + sizeof(msg_len));
// Append channel name bytes
buf.insert(buf.end(), channel.begin(), channel.end());
// Append message bytes
buf.insert(buf.end(), message.begin(), message.end());
network-byte-order encoding
htons/htonl convert host-order integers to big-endian for the wire format
std::vector<char> as buffer
reserve avoids reallocations
insert copies raw bytes and string data in sequence
the static_casts silently truncate: a channel name longer than 65,535 bytes or a message of 4 GiB or more would produce a corrupt header
an alternative might be to use boost::asio::streambuf or scatter/gather buffers (a buffer sequence) to avoid the manual byte copying
Writing to the socket
std::size_t n = boost::asio::write(socket, boost::asio::buffer(buf), ec);
if (ec) {
std::cerr << "[pub] write failed: " << ec.message() << "\n";
return 1;
}
std::cout << "[SENT] bytes=" << n
<< " channel=" << channel
<< " msg=\"" << message << "\"\n";
write(socket, buffer, ec): synchronous, blocking write of the entire buffer
sends all bytes or sets ec on error
returns the number of bytes actually transferred
an alternative is to use async_write(…) for non-blocking I/O with a callback, or to handle partial writes manually with socket.send(…) in a loop
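The manual partial-write loop mentioned above might look like the sketch below. write_all is a hypothetical helper, and the transport is abstracted behind a send_some callable (an assumption made so the example compiles without Boost); with Asio that callable would typically wrap a call such as socket.write_some.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical helper: keep calling send_some(ptr, len) until every byte is sent.
// send_some must return how many bytes it accepted, which may be fewer than len.
template <typename SendSome>
std::size_t write_all(const char* data, std::size_t len, SendSome send_some) {
    std::size_t sent = 0;
    while (sent < len) {
        std::size_t n = send_some(data + sent, len - sent);
        if (n == 0) {
            throw std::runtime_error("connection closed before all bytes were sent");
        }
        sent += n;  // advance past the bytes the transport accepted
    }
    return sent;
}
```

This is essentially what the synchronous write(…) call already does internally, so the loop is only needed when working with the lower-level send calls directly.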
Exception safety
try {
// all of the above
}
catch (std::exception& e) {
std::cerr << "Error: " << e.what() << "\n";
return 1;
}
try/catch: catches any exceptions (e.g. from Asio operations called without the error_code overloads)
this prevents an unhandled-exception crash and prints an error message instead
this client does one connect, one write, and then exits. It’s pretty simple and easy to debug for a first attempt
lengths are included up front so the broker can parse messages in two stages (header + body)
there are clear opportunities to go fully async by switching to async_* calls without changing the buffer format
Some Potential Alternatives
fully async client
use async_connect + async_write + a callback to know when the send completes; a key benefit is that multiple publishes can be in flight in one io_context.run() loop
reusing the connection
keep the socket open and send multiple messages in one run, rather than one publish per invocation
loop over messages or read from stdin
error retries
on write failure, attempt reconnect or back off rather than exiting immediately
structured framing helper
wrap the header + payload logic in a small helper class or function to avoid repetition in pub/sub client
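The "error retries" idea above can be sketched as a small back-off loop. retry_with_backoff is a hypothetical helper, and the attempt callable stands in for "reconnect and resend"; nothing here is taken from the project's code.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: call attempt() until it returns true or max_attempts is reached.
// The delay doubles after every failed attempt (exponential back-off).
bool retry_with_backoff(const std::function<bool()>& attempt,
                        int max_attempts,
                        std::chrono::milliseconds initial_delay) {
    std::chrono::milliseconds delay = initial_delay;
    for (int i = 0; i < max_attempts; ++i) {
        if (attempt()) {
            return true;
        }
        if (i + 1 < max_attempts) {  // no point sleeping after the final failure
            std::this_thread::sleep_for(delay);
            delay *= 2;
        }
    }
    return false;
}
```

In the publisher, the callable would wrap the connect and write steps and return false whenever ec is set, so the client backs off instead of exiting on the first failure.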
src/sub_client.cpp
Includes & namespace alias
#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <string>
#include <cstdlib>
#include <cstdint>
#if defined(_WIN32)
#include <winsock2.h>
#else
#include <arpa/inet.h>
#endif
using boost::asio::ip::tcp;
Boost.Asio provides io_context, tcp::resolver, tcp::socket, and sync I/O (connect, read, write)
Platform headers: winsock2.h / arpa/inet.h for htons / ntohl
std headers: <iostream> for console I/O, <vector> for dynamic buffers, <cstdlib> / <cstdint> for atoi and fixed-width integers
using boost::asio::ip::tcp; shortens type names, so tcp::socket can be written instead of boost::asio::ip::tcp::socket
main() signature & argument parsing
int main(int argc, char* argv[]) {
if (argc != 4){
std::cerr << "Usage: sub_client <host> <port> <channel>\n";
return 1;
}
const char* host = argv[1];
const char* port = argv[2];
std::string channel = argv[3];
…
}
argc / argv: the standard C++ entry point receives the command-line parameters
exit code 1 signals incorrect usage to the shell
Resolve & Connect Synchronously
boost::asio::io_context io_context;
tcp::resolver resolver(io_context);
auto endpoints = resolver.resolve(host, port);
tcp::socket socket(io_context);
boost::asio::connect(socket, endpoints);
io_context is the core I/O engine
resolver.resolve is the DNS lookup for host:port; it returns one or more endpoints
connect tries each endpoint in turn until one succeeds or all fail (in which case this overload throws boost::system::system_error)
an alternative (as mentioned before) is to use async_resolve + async_connect for a non-blocking client
Sending the Subscription Request
uint16_t ch_len = htons(static_cast<uint16_t>(channel.size()));
uint16_t net_len = ch_len; // already in network byte order
boost::asio::write(socket, boost::asio::buffer(&net_len, sizeof(net_len)));
boost::asio::write(socket, boost::asio::buffer(channel));
std::cout << "Subscribed to channel '" << channel << "'....\n";
framing: first a 2-byte length, then the channel name
htons converts the host-order uint16_t to big-endian for the wire
the synchronous write blocks until exactly that many bytes are sent
an alternative might be to combine both parts into one buffer or use async_write, and to wrap the framing in a helper function
Read-print loop
try {
for (;;) {
uint32_t payload_len_net;
boost::asio::read(socket, boost::asio::buffer(&payload_len_net, sizeof(payload_len_net)));
uint32_t payload_len = ntohl(payload_len_net);
std::vector<char> buf(payload_len);
boost::asio::read(socket, boost::asio::buffer(buf));
std::string message(buf.begin(), buf.end());
std::cout << "[RECV] " << message << std::endl;
}
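}
our loop continues until the server closes the connection or an error occurs
a 4-byte big-endian length header is read, then converted using ntohl
the payload (exactly payload_len bytes) is read into a vector<char>
a std::string is constructed from it and output to the console
a better solution might be to check for a zero-length payload and cap the maximum accepted length before allocating; boost::asio::read already loops until the buffer is full, so a manual loop around partial reads is only needed with the lower-level read_some; use async_read for non-blocking I/O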
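The length checks suggested above can be sketched as a decoder that works on bytes already received. try_decode_message is a hypothetical helper and kMaxPayload is an assumed limit (the protocol described here does not define one); the function takes one [4-byte big-endian length][payload] message off the front of a buffer and tolerates incomplete data.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Assumed limit for this sketch; the original protocol does not specify one.
constexpr std::uint32_t kMaxPayload = 1u << 20;  // 1 MiB

// Hypothetical helper: returns true and fills `out` if a complete message was
// available at the front of `pending`; returns false if more bytes are needed.
bool try_decode_message(std::vector<char>& pending, std::string& out) {
    if (pending.size() < 4) return false;  // header not complete yet

    std::uint32_t len = 0;
    for (int i = 0; i < 4; ++i) {
        len = (len << 8) | static_cast<unsigned char>(pending[i]);
    }
    if (len > kMaxPayload) {
        throw std::length_error("payload length exceeds limit");
    }
    const std::size_t total = 4 + static_cast<std::size_t>(len);
    if (pending.size() < total) return false;  // body not complete yet

    out.assign(pending.begin() + 4, pending.begin() + total);
    pending.erase(pending.begin(), pending.begin() + total);
    return true;
}
```

A zero-length payload decodes to an empty string rather than being treated as an error, and an oversized length is rejected before any buffer of that size is allocated.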
Error handling
catch (const boost::system::system_error& e) {
if (e.code() == boost::asio::error::eof) {
std::cout << "*** Server closed connection, exiting\n";
} else {
std::cerr << "*** Error: " << e.what() << "\n";
}
}
boost::system::system_error is thrown by Asio on I/O errors when an error_code is not passed
eof is the special case: the server closed the connection cleanly
for any other error we print the exception message
an alternative might be to use the error_code overloads of read / write to avoid exceptions altogether
this is a simple synchronous model that is easy to reason about: one blocking connect, then one blocking read loop
it matches the broker’s protocol (length header + payload)
socket and io_context are cleaned up automatically when they go out of scope
potential enhancements & alternatives
non-blocking client:
use async_read and async_write on the same io_context, allowing simultaneous publishers and subscribers
reconnect logic
on error/eof attempt to re-resolve and reconnect rather than exit
multiple channels
allow subscribing to several channels in one process using multiple sockets or a multiplexed protocol
coroutines
with C++20 coroutines and Asio’s co_await support, we can write async code that looks synchronous
framing helper
create a small Frame struct with encode() / decode() methods to avoid repeating length-plus-payload code
src/bench_pub.cpp
Includes and setup
#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <string>
#include <cstdint>
#include <chrono>
#if defined(_WIN32)
#include <winsock2.h>
#else
#include <arpa/inet.h>
#endif
<boost/asio.hpp>: all of Boost.Asio’s I/O machinery
<chrono>: time utilities (steady_clock, duration_cast)
platform headers for htonl / htons
together these bring in networking, container, timing, and byte-order conversion functions
you need <chrono> here for measuring elapsed time
for more precise or custom timers, one might use OS-specific high-resolution APIs or <boost/chrono>
main() signature & argument parsing
int main(int argc, char* argv[]) {
if (argc != 6) {
std::cerr << "Usage: bench_pub <host> <port> <channel> <msg_size> <count>\n";
return 1;
}
const char* host = argv[1];
const char* port = argv[2];
std::string channel = argv[3];
size_t msg_size = std::stoul(argv[4]);
size_t count = std::stoul(argv[5]);
…
}
standard C++ entry point with argc / argv
an alternative might be to use a CLI library (e.g. cxxopts or boost::program_options) for richer parsing and error messages
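Short of pulling in a CLI library, the numeric arguments can at least be validated. std::stoul throws on non-numeric input and quietly accepts trailing junk such as "12abc" (parsing it as 12), so a stricter check helps. parse_size below is a hypothetical helper, sketched under the assumption that only plain decimal digits should be accepted.

```cpp
#include <cstddef>
#include <limits>
#include <stdexcept>
#include <string>

// Hypothetical helper: parse a non-negative decimal integer into `out`.
// Rejects empty strings, signs, trailing characters, and out-of-range values.
bool parse_size(const std::string& text, std::size_t& out) {
    if (text.empty() ||
        text.find_first_not_of("0123456789") != std::string::npos) {
        return false;
    }
    try {
        unsigned long long value = std::stoull(text);
        if (value > std::numeric_limits<std::size_t>::max()) {
            return false;
        }
        out = static_cast<std::size_t>(value);
        return true;
    } catch (const std::out_of_range&) {
        return false;  // more digits than unsigned long long can hold
    }
}
```

In main(), a false return for msg_size or count would print the usage line and return 1, the same way the argc check does.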
Payload & header construction
std::string payload(msg_size, 'X');
uint16_t ch_len = htons(static_cast<uint16_t>(channel.size()));
uint32_t msg_len_net = htonl(static_cast<uint32_t>(payload.size()));
std::vector<char> header;
header.reserve(2 + 4 + channel.size());
header.insert(header.end(), (char*)&ch_len, (char*)&ch_len + 2);
header.insert(header.end(), (char*)&msg_len_net, (char*)&msg_len_net + 4);
header.insert(header.end(), channel.begin(), channel.end());
std::string(msg_size, 'X') fills the string with 'X' characters
htons / htonl: host-to-network endian conversion
builds a reusable fixed-header buffer containing the channel length, message length, and channel name
reserve avoids reallocations, and building the header once avoids reconstructing it each iteration
an alternative might be to use a single flat buffer (header and payload concatenated into one std::vector<char>) or Asio’s scatter/gather (a sequence of const_buffer) to avoid extra copies and send each message with one write call
Connecting synchronously
boost::asio::io_context io;
auto endpoints = boost::asio::ip::tcp::resolver(io)
.resolve(boost::asio::ip::tcp::v4(), host, port);
boost::asio::ip::tcp::socket sock(io);
boost::asio::connect(sock, endpoints);
the same connection scheme as the subscriber flow above
Timing the Loop
auto start = std::chrono::steady_clock::now();
for (size_t i = 0; i < count; ++i) {
boost::asio::write(sock, boost::asio::buffer(header));
boost::asio::write(sock, boost::asio::buffer(payload));
}
auto end = std::chrono::steady_clock::now();
steady_clock::now(): monotonic clock unaffected by system time changes
write(…): synchronous blocking write
records the start time, sends count messages back-to-back, then records the end time
ensures the timing only encompasses the network writes; since a blocking write returns once the bytes are handed to the OS, this measures send-side throughput rather than delivery to subscribers
an alternative would be to measure the time when using fully asynchronous operations
Calculating throughput
auto dur_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
double mps = static_cast<double>(count) / (dur_ms / 1000.0);
std::cout << "sent=" << count
<< " msgs in " << dur_ms << " ms => "
<< mps << " msg/s\n";
duration_cast<milliseconds> converts a steady_clock duration into an integer millisecond count
an alternative might be to use std::chrono::microseconds, or record statistics (mean, p50, p99 latencies) for more insight
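The microsecond and percentile ideas above can be sketched with the standard library alone. messages_per_second and percentile are hypothetical helper names, and the percentile uses the simple nearest-rank definition, which is one of several possible choices.

```cpp
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <vector>

// Messages per second from a steady_clock interval, at microsecond resolution.
// Returns 0.0 for an empty interval instead of dividing by zero.
double messages_per_second(std::size_t count,
                           std::chrono::steady_clock::duration elapsed) {
    auto us = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
    if (us <= 0) return 0.0;
    return static_cast<double>(count) * 1e6 / static_cast<double>(us);
}

// Nearest-rank percentile (p in (0, 100]) of per-message latencies.
// Takes the samples by value so the caller's vector is left unsorted.
double percentile(std::vector<double> samples, double p) {
    if (samples.empty()) return 0.0;
    std::sort(samples.begin(), samples.end());
    std::size_t rank = static_cast<std::size_t>(
        std::ceil(p * static_cast<double>(samples.size()) / 100.0));
    if (rank < 1) rank = 1;
    if (rank > samples.size()) rank = samples.size();
    return samples[rank - 1];
}
```

Collecting one latency sample per message (for example the time spent in each write) and reporting percentile(samples, 50) and percentile(samples, 99) would show tail behavior that a single msg/s figure hides.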
potential enhancements
parallel publishing
Spawn multiple threads or coroutines, each running its own loop, to saturate the broker under multi‐threaded load
asynchronous pattern
Use async_write + a counter of pending writes, then run the io_context until all of them complete
back‐pressure handling
Check for write errors (and partial writes, if switching to lower-level send calls) and implement retry/back‐off logic
detailed metrics
Measure per‐message round-trip time by having the subscriber echo back, then timing write → read
src/bench_sub.cpp
Includes & setup
#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <cstdint>
#include <chrono>
#if defined(_WIN32)
#include <winsock2.h>
#else
#include <arpa/inet.h>
#endif
<boost/asio.hpp>: brings in synchronous I/O operations (io_context, tcp::resolver, tcp::socket, connect, read, write)
<chrono>: for high-resolution timing (steady_clock, duration_cast)
Platform headers (winsock2.h / arpa/inet.h): provide htons / ntohl byte-order conversions
main() signature & argument parsing
largely the same pattern as sub_client above, with one extra argument for the target message count
Resolve, connect, and subscribe
same as the sub_client code described above
Receive & count loop
uint64_t received = 0;
auto start = std::chrono::steady_clock::now();
while (received < target) {
uint32_t net_len;
boost::asio::read(sock, boost::asio::buffer(&net_len, sizeof(net_len)));
uint32_t len = ntohl(net_len);
std::vector<char> buf(len);
boost::asio::read(sock, boost::asio::buffer(buf));
++received;
}
auto end = std::chrono::steady_clock::now();
the loop continues until received == target
reads a 4-byte network-order payload length, converts it with ntohl
allocates a buffer of that length and reads it in full
an alternative might be to preallocate a single buffer large enough for the maximum message to avoid repeated allocations
Throughput calculation
largely the same calculation as in bench_pub: it divides the total messages by the total seconds to get messages per second
Results
--------------------spsc-------------------------------
$ ./build/bench_sub.exe 127.0.0.1 6000 mytopic 100000
recv=100000 msgs in 110240 ms => 907.112 msg/s
tyler@tyler MINGW64 /d/pub_sub
$ ./build/bench_pub.exe 127.0.0.1 5000 mytopic 128 100000
sent=100000 msgs in 40983 ms => 2440.04 msg/s
----------------------lf--------------------------------
tyler@tyler MINGW64 /d/pub_sub
$ ./build/bench_sub.exe 127.0.0.1 6000 mytopic 100000
recv=100000 msgs in 45967 ms => 2175.47 msg/s
tyler@tyler MINGW64 /d/pub_sub
$ ./build/bench_pub.exe 127.0.0.1 5000 mytopic 128 100000
sent=100000 msgs in 39750 ms => 2515.72 msg/s
-----------------------mutex----------------------------
tyler@tyler MINGW64 /d/pub_sub
$ ./build/bench_sub.exe 127.0.0.1 6000 mytopic 100000
recv=100000 msgs in 47052 ms => 2125.31 msg/s
tyler@tyler MINGW64 /d/pub_sub
$ ./build/bench_pub.exe 127.0.0.1 5000 mytopic 128 100000
sent=100000 msgs in 38919 ms => 2569.44 msg/s
+-----------+-----------+-----------+
| Queue     | Pub msg/s | Sub msg/s |
+-----------+-----------+-----------+
| SPSC      |     2 440 |       907 |
| Lock-free |     2 515 |     2 175 |
| Mutex     |     2 569 |     2 125 |
+-----------+-----------+-----------+
SPSC is slow
the SpscAdapter has a coarse 50 microsecond sleep in between messages; this could probably be improved significantly
the subscriber averaged about 1.1 ms per message (110 240 ms / 100 000), far more than 50 µs; sleep_for only guarantees a minimum delay, so each sleep likely lasts closer to a millisecond on this machine
Lock-free & mutex appear to be comparable
both the LfQueue and MutexQueue avoid the sleep
MutexQueue blocks on a condition_variable until new data arrives
LfQueue spins (with yield()) until an item is available
the delta appears to be due to overhead with moving data and context switching
Is lock-free “best”?
LfQueue edges out MutexQueue by about 2% on subscribe (~2 175 vs. 2 125 msg/s), while on publish the mutex run was slightly faster (2 569 vs. 2 515 msg/s); with a single run each, these differences may be within noise
MutexQueue is simpler and less error-prone: no memory-order subtleties or false-sharing concerns
unless the project requires every last microsecond, the mutex version’s performance is “good enough” and far easier to maintain
We could make SPSC faster
two main approaches:
Tight spin (pause) + yield instead of a fixed sleep_for(50µs)
That polls briefly, then yields to let the producer run, with no long sleeps
Atomic wait/notify (C++20)
That wakes the consumer as soon as data arrives, without waiting out a fixed sleep interval
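The first approach can be sketched as a small polling helper. spin_then_yield is a hypothetical name and try_pop stands in for whatever non-blocking pop the SPSC queue exposes (its real API is not shown here). The CPU pause instruction is left out because it is platform-specific.

```cpp
#include <cstddef>
#include <thread>

// Hypothetical helper: poll try_pop() in a tight loop for the first spin_limit
// attempts, then call yield() between attempts so the producer can run.
// Gives up and returns false after max_attempts polls.
template <typename TryPop>
bool spin_then_yield(TryPop try_pop, std::size_t spin_limit, std::size_t max_attempts) {
    for (std::size_t attempt = 0; attempt < max_attempts; ++attempt) {
        if (try_pop()) {
            return true;  // an item was available
        }
        if (attempt >= spin_limit) {
            std::this_thread::yield();  // past the spin budget: be polite to other threads
        }
    }
    return false;
}
```

Compared with a fixed sleep, the consumer picks up a new item on its next poll rather than after the sleep expires, at the cost of burning CPU while the queue is empty.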
alternatives & trade-offs
Condition variable (like MutexQueue):
CPU-efficient, but wake-ups can be slightly slower than a tight spin
Lock-free MPMC:
LfQueue uses heap allocation per item, which hurts pure latency. A pre-allocated node pool (or intrusive nodes) can eliminate that
Hybrid approach might be:
Spin for a few cycles, then CV wait or atomic_wait: the classic “spin-then-sleep” backoff balances latency vs. CPU
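A minimal sketch of the spin-then-sleep idea, assuming a single consumer waiting on a single "data ready" flag. SpinThenSleepSignal is a hypothetical class, not part of the project; a real queue would pair something like it with its own storage.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

// Hypothetical one-flag signal: the consumer spins briefly, then blocks on a
// condition variable if nothing arrived during the spin phase.
class SpinThenSleepSignal {
public:
    // Producer side: mark data as ready and wake a sleeping consumer.
    void notify() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ready_.store(true, std::memory_order_release);
        }
        cv_.notify_one();
    }

    // Consumer side: returns true if the flag was seen while spinning,
    // false if the call had to fall back to the condition variable.
    bool wait(int spin_limit) {
        for (int i = 0; i < spin_limit; ++i) {
            if (ready_.exchange(false, std::memory_order_acquire)) {
                return true;
            }
        }
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return ready_.load(std::memory_order_acquire); });
        ready_.store(false, std::memory_order_relaxed);
        return false;
    }

private:
    std::atomic<bool> ready_{false};
    std::mutex mutex_;
    std::condition_variable cv_;
};
```

Setting the flag while holding the mutex is what prevents a lost wake-up: the consumer either sees the flag when it checks the predicate or is already waiting when notify_one runs. The spin limit is the tuning knob between latency and CPU use.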


