NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Stateless-per-call node invocation dispatcher.
#include <executor.h>
Public Member Functions
asio::awaitable< NodeResult > execute_node_with_retry_async (const std::string &node_name, GraphState &state, const GraphStreamCallback &cb, StreamMode stream_mode, const RunContext &ctx)
    Inner retry loop with exponential backoff + NodeInterrupt short-circuit.
NodeExecutor (const std::map< std::string, std::unique_ptr< GraphNode > > &nodes, const std::vector< ChannelDef > &channel_defs, RetryPolicyLookup retry_policy_for, asio::thread_pool *fan_out_pool=nullptr, NodeCache *node_cache=nullptr)
asio::awaitable< NodeResult > run_one_async (const std::string &node_name, int step, GraphState &state, const std::unordered_map< std::string, NodeResult > &replay, CheckpointCoordinator &coord, const std::string &parent_cp_id, const BarrierState &barrier_state, std::vector< std::string > &trace, const GraphStreamCallback &cb, StreamMode stream_mode, const RunContext &ctx)
    Execute a single node in the current super-step.
asio::awaitable< std::vector< NodeResult > > run_parallel_async (const std::vector< std::string > &ready, int step, GraphState &state, const std::unordered_map< std::string, NodeResult > &replay, CheckpointCoordinator &coord, const std::string &parent_cp_id, const BarrierState &barrier_state, std::vector< std::string > &trace, const GraphStreamCallback &cb, StreamMode stream_mode, const RunContext &ctx)
    Execute all ready nodes concurrently via asio::experimental::make_parallel_group + wait_for_all.
asio::awaitable< std::vector< StepRouting > > run_sends_async (const std::vector< Send > &sends, int step, GraphState &state, const std::unordered_map< std::string, NodeResult > &replay, CheckpointCoordinator &coord, const std::string &parent_cp_id, std::vector< std::string > &trace, const GraphStreamCallback &cb, StreamMode stream_mode, const RunContext &ctx)
    Execute a list of Send requests accumulated this step.
NodeExecutor holds only static configuration: the node map, the channel-def layout (needed to re-init isolated Send states), and a retry-policy lookup, plus the optional non-owning fan_out_pool and node_cache pointers passed to the constructor. Every dynamic concern (GraphState, CheckpointCoordinator, replay map, step index, streaming callback) flows in through method parameters, so tests can drive individual invocations without reconstructing the surrounding super-step.
Definition at line 81 of file executor.h.
neograph::graph::NodeExecutor::NodeExecutor (
    const std::map< std::string, std::unique_ptr< GraphNode > > & nodes,
    const std::vector< ChannelDef > & channel_defs,
    RetryPolicyLookup retry_policy_for,
    asio::thread_pool * fan_out_pool = nullptr,
    NodeCache * node_cache = nullptr
)
nodes: Map of node_name → GraphNode owned by the engine. Must outlive this executor (the engine owns both).
channel_defs: Channel layout used to initialize isolated states for multi-Send execution.
retry_policy_for: Lookup called for each retry run. The executor has no opinion about fallback behavior; the callback is free to return a default policy.
fan_out_pool: Optional non-owning pool used for parallel fan-out (run_parallel_async and multi-Send run_sends_async). If nullptr, branches run on the current coroutine's executor, which is fine for single-thread async callers but serializes CPU-bound fan-out.
asio::awaitable< NodeResult > neograph::graph::NodeExecutor::execute_node_with_retry_async (
    const std::string & node_name,
    GraphState & state,
    const GraphStreamCallback & cb,
    StreamMode stream_mode,
    const RunContext & ctx
)
Inner retry loop with exponential backoff + NodeInterrupt short-circuit.
Drives node->execute_full_async or node->execute_full_stream_async via co_await, and uses asio::steady_timer::async_wait for backoff so the executor is not blocked during retry waits. NodeInterrupt and exception semantics are preserved bit-for-bit. The loop is GCC-13-safe: the catch block only captures the exception (via std::optional), and the co_await happens outside the catch block.
Public so regression tests can drive it directly without reconstructing a full super-step.
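The retry shape described above can be illustrated with a synchronous sketch. Everything in it is a stand-in and an assumption: the RetryPolicy fields, the NodeInterrupt definition, run_with_retry, and the wait callback (which plays the role of the steady_timer wait) are not NeoGraph's API. It also stores the failure in a std::exception_ptr rather than the std::optional mentioned above. The point is only the control flow: the handler records the exception, and the backoff wait happens after the handler has ended, which is the only place a coroutine may co_await.

```cpp
#include <algorithm>
#include <chrono>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

using Millis = std::chrono::milliseconds;
using Result = std::string;  // hypothetical stand-in for NodeResult

// Hypothetical policy fields; the real RetryPolicy may differ.
struct RetryPolicy {
    int max_retries = 3;        // retries allowed after the first attempt
    Millis initial_delay{100};
    double multiplier = 2.0;
    Millis max_delay{5000};
};

// Hypothetical stand-in for the library's NodeInterrupt.
struct NodeInterrupt : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Calls `attempt` until it succeeds or the retries are used up.
// `wait` stands in for an asynchronous timer wait.
Result run_with_retry(const RetryPolicy& policy,
                      const std::function<Result()>& attempt,
                      const std::function<void(Millis)>& wait) {
    Millis delay = policy.initial_delay;
    for (int tries = 0;; ++tries) {
        std::exception_ptr failure;  // set inside the handler, used outside
        try {
            return attempt();
        } catch (const NodeInterrupt&) {
            throw;  // short-circuit: interrupts are never retried
        } catch (...) {
            failure = std::current_exception();
        }
        // Outside the handler: a coroutine version would co_await here.
        if (tries >= policy.max_retries) std::rethrow_exception(failure);
        wait(delay);
        auto next = Millis(static_cast<Millis::rep>(delay.count() * policy.multiplier));
        delay = std::min(next, policy.max_delay);
    }
}
```

With the defaults shown, a node that fails twice and then succeeds is attempted three times, with waits of 100 ms and 200 ms between attempts.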
asio::awaitable< NodeResult > neograph::graph::NodeExecutor::run_one_async (
    const std::string & node_name,
    int step,
    GraphState & state,
    const std::unordered_map< std::string, NodeResult > & replay,
    CheckpointCoordinator & coord,
    const std::string & parent_cp_id,
    const BarrierState & barrier_state,
    std::vector< std::string > & trace,
    const GraphStreamCallback & cb,
    StreamMode stream_mode,
    const RunContext & ctx
)
Execute a single node in the current super-step.
On success: applies writes + command.updates to state, appends node_name to trace, and returns the NodeResult. A pending write is recorded via coord BEFORE state.apply_writes, so a crash between record and apply still leaves a replayable log.
On NodeInterrupt: saves a phase=NodeInterrupt checkpoint with barrier_state via coord, then rethrows.
On any other exception: propagates after the retry policy is exhausted (execute_node_with_retry_async handles the retry loop internally).
Node dispatch and checkpoint I/O flow through co_await so other coroutines on the same executor keep moving.
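The record-before-apply ordering can be sketched in isolation. The types below (Writes, State, PendingLog) and the functions commit and replay are hypothetical stand-ins, not the CheckpointCoordinator or GraphState interfaces. The crash_before_apply flag only simulates a process dying between the two steps.

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

using Writes = std::vector<std::pair<std::string, std::string>>;  // (channel, value)
using State  = std::map<std::string, std::string>;

// Hypothetical stand-in for the coordinator's pending-write log.
struct PendingLog {
    struct Entry { std::string node; Writes writes; };
    std::vector<Entry> entries;
    void record(const std::string& node, const Writes& w) { entries.push_back({node, w}); }
};

void apply_writes(State& state, const Writes& writes) {
    for (const auto& [channel, value] : writes) state[channel] = value;
}

// Record first, then apply. When crash_before_apply is true the function
// returns right after logging, simulating a crash between the two steps.
void commit(const std::string& node, const Writes& writes,
            PendingLog& log, State& state, bool crash_before_apply = false) {
    log.record(node, writes);
    if (crash_before_apply) return;
    apply_writes(state, writes);
}

// On resume, re-applying the logged writes on top of the last checkpoint
// rebuilds the update that never reached the in-memory state.
State replay(const State& checkpoint, const PendingLog& log) {
    State s = checkpoint;
    for (const auto& e : log.entries) apply_writes(s, e.writes);
    return s;
}
```

Had the order been reversed (apply, then record), the same crash would leave neither the state change nor a log entry, and the node would have to run again on resume.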
asio::awaitable< std::vector< NodeResult > > neograph::graph::NodeExecutor::run_parallel_async (
    const std::vector< std::string > & ready,
    int step,
    GraphState & state,
    const std::unordered_map< std::string, NodeResult > & replay,
    CheckpointCoordinator & coord,
    const std::string & parent_cp_id,
    const BarrierState & barrier_state,
    std::vector< std::string > & trace,
    const GraphStreamCallback & cb,
    StreamMode stream_mode,
    const RunContext & ctx
)
Execute all ready nodes concurrently via asio::experimental::make_parallel_group + wait_for_all.
After every branch has finished: if any threw, the first exception is rethrown; otherwise writes + command.updates from each result are applied to the shared state in ready order, trace is appended in ready order, and results are returned in ready order so the caller can pair them with scheduler routing decisions.
If the first thrown exception is a NodeInterrupt, the offending node's name is captured and a phase=NodeInterrupt checkpoint is saved with next_nodes={interrupted_node} before rethrow — matching run_one_async's behavior so resume re-enters on just the interrupting node (replay skips the siblings that already completed via pending_writes).
asio::awaitable< std::vector< StepRouting > > neograph::graph::NodeExecutor::run_sends_async (
    const std::vector< Send > & sends,
    int step,
    GraphState & state,
    const std::unordered_map< std::string, NodeResult > & replay,
    CheckpointCoordinator & coord,
    const std::string & parent_cp_id,
    std::vector< std::string > & trace,
    const GraphStreamCallback & cb,
    StreamMode stream_mode,
    const RunContext & ctx
)
Execute a list of Send requests accumulated this step.
Single-send path: the target runs on the shared state with retry, and the Send.input is applied to that state.
Multi-send path: each target runs on an isolated state copy (fresh init + restore + apply_input) via asio::experimental::make_parallel_group on deferred workers, without retry; the writes are fanned back into the shared state after wait_for_all.
Returns one StepRouting per Send-spawned task, in send order. The caller merges these with the original ready-set's routings before invoking the Scheduler so that per-task Command.goto and the Send target's default outgoing edges flow into the next super-step's routing decision (LangGraph parity).