NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
request_queue.h
Go to the documentation of this file.
1
9#pragma once
10
11#include <concurrentqueue.h>
12#include <thread>
13#include <vector>
14#include <atomic>
15#include <functional>
16#include <future>
17#include <iostream>
18#include <mutex>
19#include <condition_variable>
20
21namespace neograph::util {
22
36public:
/// Runtime statistics for monitoring queue health.
///
/// Field order matters: stats() fills this aggregate positionally.
struct Stats {
    size_t pending;         ///< Tasks waiting in the queue.
    size_t active;          ///< Tasks currently being executed.
    size_t completed;       ///< Total tasks finished.
    size_t rejected;        ///< Tasks rejected due to backpressure (queue full).
    size_t num_workers;     ///< Number of worker threads.
    size_t max_queue_size;  ///< Maximum queue capacity.
};
46
52 RequestQueue(size_t num_workers = 128, size_t max_queue_size = 10000)
53 : max_queue_size_(max_queue_size)
54 {
55 for (size_t i = 0; i < num_workers; ++i) {
56 workers_.emplace_back([this] { worker_loop(); });
57 }
58 // No banner on stdout — library code shouldn't pollute the
59 // host process's stdout (breaks scripts that parse it). If
60 // observability is desired, route through an injected logger
61 // instead of std::cout.
62 }
63
66 running_ = false;
67 cv_.notify_all();
68 for (auto& w : workers_) {
69 if (w.joinable()) w.join();
70 }
71 }
72
// Non-copyable: copy construction and copy assignment are both deleted.
// The worker threads are started with a lambda that captures `this`, so
// they are tied to this particular object.
73 RequestQueue(const RequestQueue&) = delete;
74 RequestQueue& operator=(const RequestQueue&) = delete;
75
88 template<typename F>
89 std::pair<bool, std::future<void>> submit(F&& task) {
90 if (pending_.load(std::memory_order_relaxed) >= max_queue_size_) {
91 rejected_.fetch_add(1, std::memory_order_relaxed);
92 return {false, std::future<void>()};
93 }
94
95 auto promise = std::make_shared<std::promise<void>>();
96 auto future = promise->get_future();
97
98 pending_.fetch_add(1, std::memory_order_relaxed);
99 queue_.enqueue(
100 [t = std::forward<F>(task), p = std::move(promise)]() mutable {
101 try {
102 t();
103 p->set_value();
104 } catch (...) {
105 p->set_exception(std::current_exception());
106 }
107 });
108 cv_.notify_one();
109
110 return {true, std::move(future)};
111 }
112
117 Stats stats() const {
118 return {
119 pending_.load(std::memory_order_relaxed),
120 active_.load(std::memory_order_relaxed),
121 completed_.load(std::memory_order_relaxed),
122 rejected_.load(std::memory_order_relaxed),
123 workers_.size(),
124 max_queue_size_
125 };
126 }
127
128private:
129 void worker_loop() {
130 std::function<void()> task;
131 while (running_.load(std::memory_order_relaxed)) {
132 if (queue_.try_dequeue(task)) {
133 pending_.fetch_sub(1, std::memory_order_relaxed);
134 active_.fetch_add(1, std::memory_order_relaxed);
135 task();
136 active_.fetch_sub(1, std::memory_order_relaxed);
137 completed_.fetch_add(1, std::memory_order_relaxed);
138 } else {
139 // No busy-spin — sleep until notified by submit()
140 std::unique_lock<std::mutex> lock(cv_mutex_);
141 cv_.wait_for(lock, std::chrono::milliseconds(50), [this] {
142 return !running_.load(std::memory_order_relaxed)
143 || queue_.size_approx() > 0;
144 });
145 }
146 }
147 }
148
149 moodycamel::ConcurrentQueue<std::function<void()>> queue_;
150 std::vector<std::thread> workers_;
151 std::mutex cv_mutex_;
152 std::condition_variable cv_;
153 std::atomic<bool> running_{true};
154 std::atomic<size_t> pending_{0};
155 std::atomic<size_t> active_{0};
156 std::atomic<size_t> completed_{0};
157 std::atomic<size_t> rejected_{0};
158 size_t max_queue_size_;
159};
160
161} // namespace neograph::util
Lock-free task queue with a fixed worker thread pool and backpressure.
RequestQueue(size_t num_workers=128, size_t max_queue_size=10000)
Construct a request queue with a worker thread pool.
Stats stats() const
Get current queue statistics.
std::pair< bool, std::future< void > > submit(F &&task)
Submit a task to the queue.
~RequestQueue()
Destructor: stops all workers and waits for them to finish.
Runtime statistics for monitoring queue health.
size_t rejected
Tasks rejected due to backpressure (queue full).
size_t completed
Total tasks finished.
size_t pending
Tasks waiting in the queue.
size_t active
Tasks currently being executed.
size_t max_queue_size
Maximum queue capacity.
size_t num_workers
Number of worker threads.