NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
executor.h
Go to the documentation of this file.
1
40#pragma once
41
42#include <neograph/api.h>
44#include <neograph/graph/node.h>
49
50#include <asio/thread_pool.hpp>
51
52#include <atomic>
53#include <functional>
54#include <map>
55#include <memory>
56#include <string>
57#include <unordered_map>
58#include <vector>
59
60namespace neograph::graph {
61
62class GraphState;
63
69struct RunContext;
70
/// Stateless-per-call node invocation dispatcher.
///
/// Holds non-owning references to the graph's node table and channel
/// definitions (see the reference members at the bottom of the class), so
/// both containers must outlive the executor. The fan-out thread pool and the
/// node cache are optional, non-owning pointers that default to nullptr.
///
/// NOTE(review): this listing was recovered from the generated documentation
/// page. The constructor's opening line (listing line 99) and the
/// `CheckpointCoordinator& coord` parameter lines (listing lines 126, 174,
/// 204) were missing from the extracted text and have been restored from the
/// member signatures published on the same page.
81class NEOGRAPH_API NodeExecutor {
82public:
    /// Callable that returns the RetryPolicy (retry with exponential backoff)
    /// for a given string key — presumably the node name; confirm at call sites.
83 using RetryPolicyLookup = std::function<RetryPolicy(const std::string&)>;
84
    /// Construct an executor over an existing node table.
    ///
    /// @param nodes            Node table keyed by string; stored by reference.
    /// @param channel_defs     Channel definitions; stored by reference.
    /// @param retry_policy_for Lookup used to obtain a node's RetryPolicy.
    /// @param fan_out_pool     Optional thread pool (nullptr = none supplied).
    /// @param node_cache       Optional per-node result cache (opt-in;
    ///                         nullptr = no cache supplied).
99 NodeExecutor(
100 const std::map<std::string, std::unique_ptr<GraphNode>>& nodes,
101 const std::vector<ChannelDef>& channel_defs,
102 RetryPolicyLookup retry_policy_for,
103 asio::thread_pool* fan_out_pool = nullptr,
104 NodeCache* node_cache = nullptr);
105
    /// Execute a single node in the current super-step.
    ///
    /// @param node_name     Name of the node to run.
    /// @param step          Super-step index — TODO confirm numbering base.
    /// @param state         Thread-safe container for all graph state channels.
    /// @param replay        Previously recorded NodeResults keyed by string —
    ///                      presumably consulted instead of re-running; confirm.
    /// @param coord         Per-run coordinator for checkpoint lifecycle operations.
    /// @param parent_cp_id  Identifier of the parent checkpoint.
    /// @param barrier_state Per-barrier signal bookkeeping.
    /// @param trace         Caller-owned trace list, passed by mutable reference.
    /// @param cb            Callback for receiving graph execution events.
    /// @param stream_mode   Bitfield flags selecting which events to stream.
    /// @param ctx           Per-run dispatch metadata threaded through the
    ///                      engine and executor.
    /// @return Awaitable yielding the node's NodeResult.
121 asio::awaitable<NodeResult> run_one_async(
122 const std::string& node_name,
123 int step,
124 GraphState& state,
125 const std::unordered_map<std::string, NodeResult>& replay,
126 CheckpointCoordinator& coord,
127 const std::string& parent_cp_id,
128 const BarrierState& barrier_state,
129 std::vector<std::string>& trace,
130 const GraphStreamCallback& cb,
131 StreamMode stream_mode,
132 const RunContext& ctx);
133
    /// Inner retry loop with exponential backoff + NodeInterrupt short-circuit.
    ///
    /// @param node_name   Name of the node to run.
    /// @param state       Thread-safe container for all graph state channels.
    /// @param cb          Callback for receiving graph execution events.
    /// @param stream_mode Bitfield flags selecting which events to stream.
    /// @param ctx         Per-run dispatch metadata.
    /// @return Awaitable yielding the node's NodeResult.
144 asio::awaitable<NodeResult> execute_node_with_retry_async(
145 const std::string& node_name,
146 GraphState& state,
147 const GraphStreamCallback& cb,
148 StreamMode stream_mode,
149 const RunContext& ctx);
150
    /// Execute all ready nodes concurrently via
    /// asio::experimental::make_parallel_group + wait_for_all.
    ///
    /// @param ready Names of the nodes that are ready to run this step.
    /// Remaining parameters have the same meaning as in run_one_async().
    /// @return Awaitable yielding one NodeResult per executed node —
    ///         NOTE(review): ordering relative to `ready` is not visible here.
169 asio::awaitable<std::vector<NodeResult>> run_parallel_async(
170 const std::vector<std::string>& ready,
171 int step,
172 GraphState& state,
173 const std::unordered_map<std::string, NodeResult>& replay,
174 CheckpointCoordinator& coord,
175 const std::string& parent_cp_id,
176 const BarrierState& barrier_state,
177 std::vector<std::string>& trace,
178 const GraphStreamCallback& cb,
179 StreamMode stream_mode,
180 const RunContext& ctx);
181
    /// Execute a list of Send requests accumulated this step.
    ///
    /// @param sends Send requests to dispatch.
    /// Remaining parameters have the same meaning as in run_one_async();
    /// note that this overload takes no BarrierState.
    /// @return Awaitable yielding a vector of StepRouting values.
199 asio::awaitable<std::vector<StepRouting>> run_sends_async(
200 const std::vector<Send>& sends,
201 int step,
202 GraphState& state,
203 const std::unordered_map<std::string, NodeResult>& replay,
204 CheckpointCoordinator& coord,
205 const std::string& parent_cp_id,
206 std::vector<std::string>& trace,
207 const GraphStreamCallback& cb,
208 StreamMode stream_mode,
209 const RunContext& ctx);
210
211private:
    /// Prepare `state` for use — presumably from channel_defs_; definition
    /// is not visible in this header, confirm in the implementation file.
214 void init_state(GraphState& state) const;
215
    /// Apply the JSON `input` to `state` (details live in the implementation).
218 void apply_input(GraphState& state, const json& input) const;
219
    /// Possibly emit a warning about serial fan-out for a fan-out of `width`.
    /// NOTE(review): presumably one-shot via warned_serial_fanout_ — confirm.
229 void maybe_warn_serial_fanout(std::size_t width) const;
230
    // Non-owning references: the referenced containers must outlive *this.
231 const std::map<std::string, std::unique_ptr<GraphNode>>& nodes_;
232 const std::vector<ChannelDef>& channel_defs_;
233 RetryPolicyLookup retry_policy_for_;
    // Optional collaborators; nullptr when not supplied to the constructor.
234 asio::thread_pool* fan_out_pool_ = nullptr;
235 NodeCache* node_cache_ = nullptr;
    // mutable + atomic so const member functions can flip it safely.
236 mutable std::atomic<bool> warned_serial_fanout_{false};
237};
238
239} // namespace neograph::graph
NEOGRAPH_API export/import macro for shared-library builds.
Per-run coordinator for checkpoint lifecycle operations.
Definition coordinator.h:75
Thread-safe container for all graph state channels.
Definition state.h:26
Per-node result cache.
Definition node_cache.h:38
Stateless-per-call node invocation dispatcher.
Definition executor.h:81
NodeExecutor(const std::map< std::string, std::unique_ptr< GraphNode > > &nodes, const std::vector< ChannelDef > &channel_defs, RetryPolicyLookup retry_policy_for, asio::thread_pool *fan_out_pool=nullptr, NodeCache *node_cache=nullptr)
asio::awaitable< NodeResult > run_one_async(const std::string &node_name, int step, GraphState &state, const std::unordered_map< std::string, NodeResult > &replay, CheckpointCoordinator &coord, const std::string &parent_cp_id, const BarrierState &barrier_state, std::vector< std::string > &trace, const GraphStreamCallback &cb, StreamMode stream_mode, const RunContext &ctx)
Execute a single node in the current super-step.
asio::awaitable< std::vector< NodeResult > > run_parallel_async(const std::vector< std::string > &ready, int step, GraphState &state, const std::unordered_map< std::string, NodeResult > &replay, CheckpointCoordinator &coord, const std::string &parent_cp_id, const BarrierState &barrier_state, std::vector< std::string > &trace, const GraphStreamCallback &cb, StreamMode stream_mode, const RunContext &ctx)
Execute all ready nodes concurrently via asio::experimental::make_parallel_group + wait_for_all.
asio::awaitable< std::vector< StepRouting > > run_sends_async(const std::vector< Send > &sends, int step, GraphState &state, const std::unordered_map< std::string, NodeResult > &replay, CheckpointCoordinator &coord, const std::string &parent_cp_id, std::vector< std::string > &trace, const GraphStreamCallback &cb, StreamMode stream_mode, const RunContext &ctx)
Execute a list of Send requests accumulated this step.
asio::awaitable< NodeResult > execute_node_with_retry_async(const std::string &node_name, GraphState &state, const GraphStreamCallback &cb, StreamMode stream_mode, const RunContext &ctx)
Inner retry loop with exponential backoff + NodeInterrupt short-circuit.
Pure JSON-definition → CompiledGraph parser, extracted from GraphEngine.
CheckpointCoordinator — owns the super-step checkpoint lifecycle.
Graph engine type definitions: channels, edges, nodes, events, and control flow.
std::function< void(const GraphEvent &)> GraphStreamCallback
Callback for receiving graph execution events.
Definition types.h:287
StreamMode
Bitfield flags for selecting which events to stream during execution.
Definition types.h:194
Graph node implementations: base class and built-in node types.
Per-node result cache (opt-in).
Pure routing / next-step-planning logic extracted from GraphEngine.
std::map< std::string, std::set< std::string > > BarrierState
Per-barrier signal bookkeeping.
Definition scheduler.h:70
Retry policy for node execution with exponential backoff.
Definition types.h:172
Per-run dispatch metadata threaded through the engine and executor.
Definition engine.h:118