NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
run_sync.h
Go to the documentation of this file.
1
25#pragma once
26
28
29#include <asio/awaitable.hpp>
30#include <asio/bind_cancellation_slot.hpp>
31#include <asio/co_spawn.hpp>
32#include <asio/detached.hpp>
33#include <asio/io_context.hpp>
34#include <asio/thread_pool.hpp>
35
36#include <cstddef>
37#include <exception>
38#include <optional>
39#include <utility>
40
41namespace neograph::async {
42
58template <typename T>
59T run_sync(asio::awaitable<T> aw,
60 neograph::graph::CancelToken* cancel = nullptr) {
61 // v0.3.2: short-circuit if the parent token is already cancelled.
62 // Without this, the retry loop in NodeExecutor would re-call
63 // Provider::complete after a first cancel, fresh run_sync would
64 // bind its slot AFTER add_cancel_hook fired its post-emit (the
65 // emit-before-bind race), the cancel signal would be lost, and
66 // the new HTTP request would run to completion — burning billable
67 // tokens on every retry attempt. Throwing CancelledException
68 // eagerly closes that loop: the executor's retry loop catches it
69 // (alongside NodeInterrupt) and skips retries, so a second
70 // cancelled call never hits the wire.
71 if (cancel && cancel->is_cancelled()) {
72 throw neograph::graph::CancelledException("run_sync entry");
73 }
74
75 asio::io_context io;
76 std::optional<T> result;
77 std::exception_ptr err;
78
79 auto body = [&]() -> asio::awaitable<void> {
80 try {
81 result.emplace(co_await std::move(aw));
82 } catch (...) {
83 err = std::current_exception();
84 }
85 co_return;
86 };
87
88 if (cancel) {
89 // v0.4 PR 3: fork a child token for this nested run_sync.
90 // The child has its own ``cancellation_signal``, bound to
91 // this io_context's executor; ``parent.cancel()`` cascades
92 // to every live child, so concurrent nested run_syncs
93 // (multi-Send fan-out workers each calling
94 // ``provider.complete()``) all get their HTTP sockets torn
95 // down. The pre-v0.3.1 design bound the parent's single
96 // signal to io.get_executor() — last writer won, so only
97 // one of N concurrent workers received cancel and the rest
98 // streamed to completion. v0.3.1's ``add_cancel_hook`` list
99 // was a workaround on top of that single-signal model;
100 // ``fork()`` replaces it with a structural primitive.
101 //
102 // Lifetime: ``child`` is a ``shared_ptr`` so a late-firing
103 // cascade (parent racing with body completion) doesn't UAF.
104 // The parent stores a ``weak_ptr`` to the child, so once
105 // ``child`` goes out of scope at the end of this function
106 // the parent's children_ list naturally drops it.
107 //
108 // Eager-cancel race: if the parent was already cancelled
109 // before ``fork()``, the child's polling flag is pre-set;
110 // ``bind_executor`` then fires the child's signal at the
111 // first co_await checkpoint of body(). No "emit-vs-bind"
112 // window like the pre-v0.3.2 hook design, so the eager
113 // ``is_cancelled()`` short-circuit at function entry stays
114 // strictly as a tiny optimization (skip io_context
115 // construction altogether).
116 auto child = cancel->fork();
117 child->bind_executor(io.get_executor());
118 asio::co_spawn(io, body(),
119 asio::bind_cancellation_slot(child->slot(),
120 asio::detached));
121 io.run();
122 // ``child`` goes out of scope at end of block → parent's
123 // weak_ptr expires → next parent.cancel()/fork() prunes it.
124
125 // v0.3.2: if the inner co_spawn completed because of a cancel
126 // (asio::system_error operation_aborted from a torn-down HTTP
127 // socket), surface it as the typed CancelledException so the
128 // executor's retry loop can short-circuit instead of treating
129 // it as a transient runtime_error.
130 if (err && cancel->is_cancelled()) {
131 throw neograph::graph::CancelledException("run_sync inner abort");
132 }
133 } else {
134 asio::co_spawn(io, body(), asio::detached);
135 io.run();
136 }
137
138 if (err) std::rethrow_exception(err);
139 return std::move(*result);
140}
141
143inline void run_sync(asio::awaitable<void> aw,
144 neograph::graph::CancelToken* cancel = nullptr) {
145 // v0.3.2: same eager short-circuit as the templated peer above.
146 // See that overload's comment for the retry/cost-leak rationale.
147 if (cancel && cancel->is_cancelled()) {
148 throw neograph::graph::CancelledException("run_sync entry");
149 }
150
151 asio::io_context io;
152 std::exception_ptr err;
153
154 auto body = [&]() -> asio::awaitable<void> {
155 try {
156 co_await std::move(aw);
157 } catch (...) {
158 err = std::current_exception();
159 }
160 };
161
162 if (cancel) {
163 // v0.4 PR 3: fork a child token. See the templated peer
164 // above for the full rationale; this is the bit-for-bit
165 // void specialization.
166 auto child = cancel->fork();
167 child->bind_executor(io.get_executor());
168 asio::co_spawn(io, body(),
169 asio::bind_cancellation_slot(child->slot(),
170 asio::detached));
171 io.run();
172 if (err && cancel->is_cancelled()) {
173 throw neograph::graph::CancelledException("run_sync inner abort");
174 }
175 if (err) std::rethrow_exception(err);
176 return;
177 } else {
178 asio::co_spawn(io, body(), asio::detached);
179 }
180
181 io.run();
182
183 if (err) std::rethrow_exception(err);
184}
185
193template <typename T>
194T run_sync_pool(asio::awaitable<T> aw, std::size_t n_threads) {
195 asio::thread_pool pool(n_threads > 0 ? n_threads : 1);
196 std::optional<T> result;
197 std::exception_ptr err;
198
199 asio::co_spawn(
200 pool.get_executor(),
201 [&]() -> asio::awaitable<void> {
202 try {
203 result.emplace(co_await std::move(aw));
204 } catch (...) {
205 err = std::current_exception();
206 }
207 co_return;
208 },
209 asio::detached);
210
211 pool.join();
212
213 if (err) std::rethrow_exception(err);
214 return std::move(*result);
215}
216
218inline void run_sync_pool(asio::awaitable<void> aw, std::size_t n_threads) {
219 asio::thread_pool pool(n_threads > 0 ? n_threads : 1);
220 std::exception_ptr err;
221
222 asio::co_spawn(
223 pool.get_executor(),
224 [&]() -> asio::awaitable<void> {
225 try {
226 co_await std::move(aw);
227 } catch (...) {
228 err = std::current_exception();
229 }
230 },
231 asio::detached);
232
233 pool.join();
234
235 if (err) std::rethrow_exception(err);
236}
237
238} // namespace neograph::async
Cooperative cancellation primitive for graph runs.
Cooperative cancel primitive shared between caller and engine.
Definition cancel.h:77
Thrown by the engine when a run is cancelled mid-flight via CancelToken::cancel().
Definition cancel.h:61
T run_sync(asio::awaitable< T > aw, neograph::graph::CancelToken *cancel=nullptr)
Run aw to completion on a fresh single-threaded io_context and return its result.
Definition run_sync.h:59
T run_sync_pool(asio::awaitable< T > aw, std::size_t n_threads)
Run aw to completion on a fresh N-worker asio::thread_pool and return its result.
Definition run_sync.h:194