NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
cancel.h
Go to the documentation of this file.
1
31#pragma once
32
33#include <neograph/api.h>
34
35#include <asio/any_io_executor.hpp>
36#include <asio/cancellation_signal.hpp>
37#include <asio/post.hpp>
38
39#include <algorithm>
40#include <atomic>
41#include <cstdint>
42#include <functional>
43#include <memory>
44#include <mutex>
45#include <stdexcept>
46#include <utility>
47#include <vector>
48
49namespace neograph::graph {
50
61class NEOGRAPH_API CancelledException : public std::runtime_error {
62public:
64 : std::runtime_error("neograph: run cancelled") {}
65 explicit CancelledException(const std::string& detail)
66 : std::runtime_error("neograph: run cancelled — " + detail) {}
67};
68
77class NEOGRAPH_API CancelToken {
78public:
79 CancelToken() = default;
80
81 CancelToken(const CancelToken&) = delete;
82 CancelToken& operator=(const CancelToken&) = delete;
83
97 void cancel() noexcept {
98 if (cancelled_.exchange(true, std::memory_order_acq_rel)) {
99 return; // already cancelled
100 }
101
102 // Snapshot the executor under the mutex; bind_executor and
103 // cancel may race across threads.
104 asio::any_io_executor ex_snapshot;
105 {
106 std::lock_guard<std::mutex> lk(mu_);
107 ex_snapshot = ex_;
108 }
109 if (ex_snapshot) {
110 // Post the emit onto the strand that owns the signal —
111 // emit() is documented as not thread-safe across executors.
112 // The engine binds the same executor it co_spawns on.
113 asio::post(ex_snapshot, [this]() {
114 sig_.emit(asio::cancellation_type::all);
115 });
116 }
117
118 // v1.0 (9c): the legacy ``add_cancel_hook`` / hooks_ iteration
119 // is gone — ``fork()`` is the canonical primitive for nested
120 // cancel scopes, and every internal ``run_sync`` cascades via
121 // a forked child.
122
123 // Cascade to live children produced by ``fork()``.
124 // Snapshot live shared_ptrs under the lock, then call
125 // ``cancel()`` on each outside the lock — the recursive call
126 // would otherwise re-enter ``children_mu_`` if a grandchild
127 // exists, and lock_guard isn't reentrant.
128 std::vector<std::shared_ptr<CancelToken>> live_children;
129 {
130 std::lock_guard<std::mutex> lk(children_mu_);
131 live_children.reserve(children_.size());
132 for (auto& w : children_) {
133 if (auto sp = w.lock()) {
134 live_children.push_back(std::move(sp));
135 }
136 }
137 // Don't bother pruning expired entries here — fork() does
138 // it on the next call, and after this cancel() the parent
139 // is "done" anyway.
140 }
141 for (auto& child : live_children) {
142 child->cancel();
143 }
144 }
145
147 [[nodiscard]] bool is_cancelled() const noexcept {
148 return cancelled_.load(std::memory_order_acquire);
149 }
150
160 void bind_executor(asio::any_io_executor ex) {
161 bool fire_immediately = false;
162 {
163 std::lock_guard<std::mutex> lk(mu_);
164 ex_ = std::move(ex);
165 fire_immediately = cancelled_.load(std::memory_order_acquire);
166 }
167 if (fire_immediately) {
168 std::lock_guard<std::mutex> lk(mu_);
169 if (ex_) {
170 asio::post(ex_, [this]() {
171 sig_.emit(asio::cancellation_type::all);
172 });
173 }
174 }
175 }
176
186 asio::cancellation_slot slot() noexcept {
187 return sig_.slot();
188 }
189
194 void throw_if_cancelled(const std::string& detail = {}) const {
195 if (is_cancelled()) {
196 throw CancelledException(detail);
197 }
198 }
199
233 [[nodiscard]] std::shared_ptr<CancelToken> fork() {
234 // Construct child outside any lock — its ctor takes no lock.
235 // shared_ptr ctor allocates the control block; cheap relative
236 // to a real cancel scope (one io_context spin-up downstream).
237 auto child = std::shared_ptr<CancelToken>(new CancelToken());
238
239 {
240 std::lock_guard<std::mutex> lk(children_mu_);
241 // Opportunistic prune: every fork() is a natural place to
242 // drop expired weak_ptrs from previous children that have
243 // already been released by their owners. Bounds the
244 // children_ vector at the live-fan-out width even on
245 // long-lived parents (e.g. an engine with 1000 LLM calls
246 // per run, each forking once).
247 children_.erase(
248 std::remove_if(
249 children_.begin(), children_.end(),
250 [](const std::weak_ptr<CancelToken>& w) {
251 return w.expired();
252 }),
253 children_.end());
254 children_.push_back(child);
255 }
256
257 // Eager propagation: parent already cancelled → child sees
258 // the polling flag immediately. ``bind_executor`` on the
259 // child will then fire its signal at the next co_await.
260 if (cancelled_.load(std::memory_order_acquire)) {
261 child->cancel();
262 }
263 return child;
264 }
265
266private:
267 std::atomic<bool> cancelled_{false};
268 mutable std::mutex mu_; // guards ex_ vs cancel() race
269 asio::any_io_executor ex_; // bound by engine before HTTP I/O
270 asio::cancellation_signal sig_; // for asio operation cancel
271
272 // Hierarchical cascade list. Each entry is a child token
273 // produced by ``fork()``. ``cancel()`` walks live children and
274 // cascades. Stored as ``weak_ptr`` so a child that goes out of
275 // scope (its run_sync returned, run completed) is automatically
276 // pruned on the next ``cancel()`` / ``fork()`` traversal.
277 mutable std::mutex children_mu_;
278 std::vector<std::weak_ptr<CancelToken>> children_;
279};
280
281// v1.0 (9d): the `current_cancel_token()` thread_local +
282// `CurrentCancelTokenScope` RAII smuggling channel is gone.
283// `RunContext::cancel_token` (on `NodeInput::ctx`) is the only cancel
284// channel. Engine threads it through every dispatch.
285
286} // namespace neograph::graph
NEOGRAPH_API export/import macro for shared-library builds.
Cooperative cancel primitive shared between caller and engine.
Definition cancel.h:77
void throw_if_cancelled(const std::string &detail={}) const
Throws CancelledException if cancelled.
Definition cancel.h:194
void bind_executor(asio::any_io_executor ex)
Bind the executor that will handle cancellation_signal emits.
Definition cancel.h:160
asio::cancellation_slot slot() noexcept
Cancellation slot for binding to a coroutine via asio::bind_cancellation_slot at co_spawn.
Definition cancel.h:186
void cancel() noexcept
Request cancellation.
Definition cancel.h:97
bool is_cancelled() const noexcept
Polling read of the cancel flag. Cheap, lock-free.
Definition cancel.h:147
std::shared_ptr< CancelToken > fork()
Create a child token that cascades from this one.
Definition cancel.h:233
Thrown by the engine when a run is cancelled mid-flight via CancelToken::cancel().
Definition cancel.h:61