24#include <asio/awaitable.hpp>
25#include <asio/thread_pool.hpp>
35namespace neograph::graph {
126 std::optional<std::chrono::steady_clock::time_point>
deadline;
233 template <
typename T>
248 if (
output.contains(
"channels") &&
output[
"channels"].contains(name)) {
249 const auto wrapped =
output[
"channels"][name];
250 if (wrapped.contains(
"value"))
return wrapped[
"value"];
253 throw json::out_of_range(
254 "RunResult::channel: no such channel '" + name +
"' in output "
255 "(checked output[\"channels\"][\"" + name +
"\"][\"value\"] and output[\"" + name +
"\"])");
265 if (
output.contains(
"channels") &&
output[
"channels"].contains(name)
266 &&
output[
"channels"][name].contains(
"value")) {
269 return output.contains(name);
358 const json& definition,
360 std::shared_ptr<CheckpointStore> store =
nullptr);
421 const json& resume_value = json(),
426 const std::string& thread_id,
427 const json& resume_value = json(),
437 std::optional<json>
get_state(
const std::string& thread_id)
const;
446 int limit = 100)
const;
459 const json& channel_writes,
460 const std::string& as_node =
"");
473 std::string
fork(
const std::string& source_thread_id,
474 const std::string& new_thread_id,
475 const std::string& checkpoint_id =
"");
487 void own_tools(std::vector<std::unique_ptr<Tool>> tools);
506 std::shared_ptr<Store>
get_store()
const {
return store_; }
598 void apply_input(
GraphState& state,
const json& input)
const;
605 asio::awaitable<RunResult> execute_graph_async(
608 const std::vector<std::string>& resume_from = {},
609 const json& resume_value = json());
611 RetryPolicy get_retry_policy(
const std::string& node_name)
const;
618 std::vector<ChannelDef> channel_defs_;
620 std::map<std::string, std::unique_ptr<GraphNode>> nodes_;
621 std::vector<Edge> edges_;
622 std::vector<ConditionalEdge> conditional_edges_;
628 std::unique_ptr<Scheduler> scheduler_;
634 std::unique_ptr<NodeExecutor> executor_;
636 std::set<std::string> interrupt_before_;
637 std::set<std::string> interrupt_after_;
639 std::shared_ptr<CheckpointStore> checkpoint_store_;
640 std::shared_ptr<Store> store_;
641 std::vector<std::unique_ptr<Tool>> owned_tools_;
644 RetryPolicy default_retry_policy_;
645 std::map<std::string, RetryPolicy> node_retry_policies_;
654 std::unique_ptr<asio::thread_pool> pool_;
659 NodeCache node_cache_;
666 std::atomic<int> active_runs_{0};
NEOGRAPH_API export/import macro for shared-library builds.
Cooperative cancellation primitive for graph runs.
Checkpoint system for graph execution state persistence and time-travel.
Super-step loop execution engine for graph-based agent workflows.
void set_node_retry_policy(const std::string &node_name, const RetryPolicy &policy)
Set a retry policy for a specific node (overrides default).
std::string fork(const std::string &source_thread_id, const std::string &new_thread_id, const std::string &checkpoint_id="")
Fork a thread, creating a new thread from an existing checkpoint.
RunResult run_stream(const RunConfig &config, const GraphStreamCallback &cb)
Execute the graph with streaming event callbacks.
const std::string & get_graph_name() const
Get the graph name (from the JSON definition).
const NodeCache & node_cache() const
Borrow the engine's NodeCache for stats inspection.
std::vector< Checkpoint > get_state_history(const std::string &thread_id, int limit=100) const
Get the checkpoint history for a thread (time-travel).
void clear_node_cache()
Drop all cached entries (per-node enable state preserved).
void set_store(std::shared_ptr< Store > store)
Set the cross-thread shared memory store.
void set_checkpoint_store(std::shared_ptr< CheckpointStore > store)
Set the checkpoint persistence store.
void set_worker_count(std::size_t n)
Resize (or install) the engine-owned worker pool for parallel fan-out.
RunResult resume(const std::string &thread_id, const json &resume_value=json(), const GraphStreamCallback &cb=nullptr)
Resume execution from a HITL interrupt.
RunResult run(const RunConfig &config)
Execute the graph synchronously (blocking).
void update_state(const std::string &thread_id, const json &channel_writes, const std::string &as_node="")
Update the state for a thread by applying channel writes.
asio::awaitable< RunResult > run_async(RunConfig config)
Async peer of run() — returns an awaitable yielding the result.
void set_retry_policy(const RetryPolicy &policy)
Set the default retry policy for all nodes.
static std::unique_ptr< GraphEngine > compile(const json &definition, const NodeContext &default_context, std::shared_ptr< CheckpointStore > store=nullptr)
Compile a graph from a JSON definition.
asio::awaitable< RunResult > run_stream_async(RunConfig config, GraphStreamCallback cb)
Async peer of run_stream — non-blocking coroutine surface.
asio::awaitable< RunResult > resume_async(const std::string &thread_id, const json &resume_value=json(), const GraphStreamCallback &cb=nullptr)
Async peer of resume — non-blocking coroutine surface.
void set_worker_count_auto()
Opt into a hardware_concurrency()-sized worker pool.
std::optional< json > get_state(const std::string &thread_id) const
Get the current state for a thread.
void set_node_cache_enabled(const std::string &node_name, bool enabled)
Enable or disable per-node result caching.
void own_tools(std::vector< std::unique_ptr< Tool > > tools)
Transfer tool ownership to the engine.
std::shared_ptr< Store > get_store() const
Get the cross-thread shared memory store.
Thread-safe container for all graph state channels.
Pure JSON-definition → CompiledGraph parser, extracted from GraphEngine.
CheckpointCoordinator — owns the super-step checkpoint lifecycle.
NodeExecutor — owns per-super-step node invocation.
Graph engine type definitions: channels, edges, nodes, events, and control flow.
std::function< void(const GraphEvent &)> GraphStreamCallback
Callback for receiving graph execution events.
StreamMode
Bitfield flags for selecting which events to stream during execution.
Graph node implementations: base class and built-in node types.
Per-node result cache (opt-in).
Pure routing / next-step-planning logic extracted from GraphEngine.
Thread-safe mutable graph state management.
Cross-thread shared memory store (LangGraph Store equivalent).
Dependency injection context passed to nodes during construction.
Retry policy for node execution with exponential backoff.
Configuration for a graph execution run.
int max_steps
Safety limit for maximum super-steps per run.
StreamMode stream_mode
Which event types to emit during streaming.
std::string thread_id
Thread ID for checkpoint association.
bool resume_if_exists
Auto-resume from latest checkpoint for thread_id (v0.3.1+).
json input
Initial channel writes (e.g., {"messages": [...]}).
std::shared_ptr< CancelToken > cancel_token
Optional cooperative cancel handle (v0.3+).
Per-run dispatch metadata threaded through the engine and executor.
std::shared_ptr< CancelToken > cancel_token
Cooperative cancel handle.
int step
Current super-step index.
std::string thread_id
Mirrors RunConfig::thread_id so executor-side logic (e.g.
std::optional< std::chrono::steady_clock::time_point > deadline
Optional absolute wall-clock deadline.
std::shared_ptr< Store > store
Cross-thread shared memory (issue #27).
StreamMode stream_mode
Mirrors RunConfig::stream_mode.
std::string trace_id
Per-run trace correlator.
Result of a graph execution run.
std::string interrupt_node
Name of the node that triggered the interrupt.
json channel_raw(const std::string &name) const
Read a channel value as a raw json node (issue #25).
bool interrupted
True if execution was interrupted (HITL).
bool has_channel(const std::string &name) const noexcept
Test whether a channel value exists in either shape.
std::string checkpoint_id
ID of the last checkpoint saved.
json interrupt_value
Value associated with the interrupt.
json output
Final serialized graph state. See struct docstring for shape details.
std::vector< std::string > execution_trace
Ordered list of executed node names.
T channel(const std::string &name) const
Read a channel value as type T (issue #25).