// Constructor (excerpt: the leading numbers are line numbers carried over from
// the original listing, and some lines of the body are not shown here).
//
// num_workers    - number of worker threads to start (default 128).
// max_queue_size - stored in max_queue_size_; submit() compares pending_
//                  against it before accepting a task (default 10000).
//
// NOTE(review): 128 default threads is large relative to typical core counts —
// confirm this is intentional (e.g. tasks are expected to block on I/O).
52 RequestQueue(
size_t num_workers = 128,
size_t max_queue_size = 10000)
53 : max_queue_size_(max_queue_size)
// Start num_workers threads; each one runs worker_loop() on this object.
55 for (
size_t i = 0; i < num_workers; ++i) {
// The lambda captures `this`, so every worker must be joined before the
// object is destroyed.
56 workers_.emplace_back([
this] { worker_loop(); });
// Wait for every worker thread to finish. joinable() is checked first so a
// thread that was already joined (or never started) is skipped.
// NOTE(review): the enclosing function header is not visible in this excerpt —
// presumably the destructor or a shutdown routine; the code that clears
// running_ and wakes waiting workers (if any) is among the elided lines.
68 for (
auto& w : workers_) {
69 if (w.joinable()) w.join();
// Submit a callable for execution (the template header declaring F, the
// try/catch around the task call and the enqueue call are not shown in this
// excerpt).
//
// Returns {true, future} when the task was accepted; the future is tied to a
// promise owned by the queued wrapper. Returns {false, std::future<void>()}
// when the queue is considered full — that future is default-constructed, so
// callers must check the bool (or future.valid()) before waiting on it.
89 std::pair<bool, std::future<void>>
submit(F&& task) {
// Admission control: reject once the number of not-yet-dequeued tasks has
// reached the configured limit, and count the rejection.
// NOTE(review): this load and the fetch_add on pending_ further down are two
// separate atomic operations, so concurrent submitters can all pass the check
// and push pending_ above max_queue_size_. The limit is therefore approximate
// unless elided code compensates — confirm that is acceptable.
90 if (pending_.load(std::memory_order_relaxed) >= max_queue_size_) {
91 rejected_.fetch_add(1, std::memory_order_relaxed);
92 return {
false, std::future<void>()};
// The promise lives in a shared_ptr, presumably because the wrapper is stored
// in a std::function (see queue_), which needs a copyable callable, while
// std::promise itself is move-only.
95 auto promise = std::make_shared<std::promise<void>>();
96 auto future = promise->get_future();
// Count the task as pending before it is handed to the queue.
98 pending_.fetch_add(1, std::memory_order_relaxed);
// Wrapper that owns the forwarded task and the promise.
100 [t = std::forward<F>(task), p = std::move(promise)]()
mutable {
// An exception escaping the task is stored in the promise, so it is rethrown
// from future.get() on the caller's side (the surrounding try/catch is elided).
105 p->set_exception(std::current_exception());
110 return {
true, std::move(future)};
// Current values of the four counters, each read with a relaxed atomic load.
// The loads are independent, so the values are not a mutually consistent
// snapshot (e.g. a task may move from pending to active between two reads).
// NOTE(review): the enclosing function and the type these values initialise
// are not visible in this excerpt — presumably a statistics accessor.
119 pending_.load(std::memory_order_relaxed),
120 active_.load(std::memory_order_relaxed),
121 completed_.load(std::memory_order_relaxed),
122 rejected_.load(std::memory_order_relaxed),
// Body of the per-thread loop (function header not visible in this excerpt —
// presumably worker_loop(), which the constructor's threads run).
// `task` is declared once and reused as the dequeue target on every iteration.
130 std::function<void()> task;
// Keep working until running_ is cleared.
// NOTE(review): as far as the visible lines show, the loop exits as soon as
// running_ is false, without draining tasks still in queue_; verify whether
// elided shutdown code handles tasks left behind.
131 while (running_.load(std::memory_order_relaxed)) {
132 if (queue_.try_dequeue(task)) {
// The task moves from "pending" to "active" for the duration of its execution.
133 pending_.fetch_sub(1, std::memory_order_relaxed);
134 active_.fetch_add(1, std::memory_order_relaxed);
// (original line 135 — presumably the task invocation — is elided here)
136 active_.fetch_sub(1, std::memory_order_relaxed);
137 completed_.fetch_add(1, std::memory_order_relaxed);
// Nothing was dequeued (the branch structure on lines 138-139 is elided):
// block on cv_ until shutdown is requested or the queue looks non-empty.
// The 50 ms timeout makes the wait return even if no notification arrives.
140 std::unique_lock<std::mutex> lock(cv_mutex_);
141 cv_.wait_for(lock, std::chrono::milliseconds(50), [
this] {
142 return !running_.load(std::memory_order_relaxed)
143 || queue_.size_approx() > 0;
// --- Data members ---
// Queue of type-erased tasks waiting to be picked up by a worker.
149 moodycamel::ConcurrentQueue<std::function<void()>> queue_;
// Worker threads started by the constructor.
150 std::vector<std::thread> workers_;
// Mutex / condition variable pair that idle workers use for their timed wait.
151 std::mutex cv_mutex_;
152 std::condition_variable cv_;
// Loop condition for the workers; starts out true.
153 std::atomic<bool> running_{
true};
// Tasks accepted by submit() that no worker has dequeued yet.
154 std::atomic<size_t> pending_{0};
// Tasks dequeued by a worker and not yet counted as completed.
155 std::atomic<size_t> active_{0};
// Tasks a worker has finished processing.
156 std::atomic<size_t> completed_{0};
// Submissions refused because pending_ had reached max_queue_size_.
157 std::atomic<size_t> rejected_{0};
// Admission limit compared against pending_ in submit(); set by the constructor.
158 size_t max_queue_size_;