NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
neograph::graph::GraphEngine Class Reference

Super-step loop execution engine for graph-based agent workflows.

#include <engine.h>

Public Member Functions

void clear_node_cache ()
 Drop all cached entries (per-node enable state preserved).
 
std::string fork (const std::string &source_thread_id, const std::string &new_thread_id, const std::string &checkpoint_id="")
 Fork a thread, creating a new thread from an existing checkpoint.
 
const std::string & get_graph_name () const
 Get the graph name (from the JSON definition).
 
std::optional< json > get_state (const std::string &thread_id) const
 Get the current state for a thread.
 
std::vector< Checkpoint > get_state_history (const std::string &thread_id, int limit=100) const
 Get the checkpoint history for a thread (time-travel).
 
std::shared_ptr< Store > get_store () const
 Get the cross-thread shared memory store.
 
const NodeCache & node_cache () const
 Borrow the engine's NodeCache for stats inspection.
 
void own_tools (std::vector< std::unique_ptr< Tool > > tools)
 Transfer tool ownership to the engine.
 
RunResult resume (const std::string &thread_id, const json &resume_value=json(), const GraphStreamCallback &cb=nullptr)
 Resume execution from a HITL interrupt.
 
asio::awaitable< RunResult > resume_async (const std::string &thread_id, const json &resume_value=json(), const GraphStreamCallback &cb=nullptr)
 Async peer of resume — non-blocking coroutine surface.
 
RunResult run (const RunConfig &config)
 Execute the graph synchronously (blocking).
 
asio::awaitable< RunResult > run_async (RunConfig config)
 Async peer of run() — returns an awaitable yielding the result.
 
RunResult run_stream (const RunConfig &config, const GraphStreamCallback &cb)
 Execute the graph with streaming event callbacks.
 
asio::awaitable< RunResult > run_stream_async (RunConfig config, GraphStreamCallback cb)
 Async peer of run_stream — non-blocking coroutine surface.
 
void set_checkpoint_store (std::shared_ptr< CheckpointStore > store)
 Set the checkpoint persistence store.
 
void set_node_cache_enabled (const std::string &node_name, bool enabled)
 Enable or disable per-node result caching.
 
void set_node_retry_policy (const std::string &node_name, const RetryPolicy &policy)
 Set a retry policy for a specific node (overrides default).
 
void set_retry_policy (const RetryPolicy &policy)
 Set the default retry policy for all nodes.
 
void set_store (std::shared_ptr< Store > store)
 Set the cross-thread shared memory store.
 
void set_worker_count (std::size_t n)
 Resize (or install) the engine-owned worker pool for parallel fan-out.
 
void set_worker_count_auto ()
 Opt into a hardware_concurrency()-sized worker pool.
 
void update_state (const std::string &thread_id, const json &channel_writes, const std::string &as_node="")
 Update the state for a thread by applying channel writes.
 

Static Public Member Functions

static std::unique_ptr< GraphEngine > compile (const json &definition, const NodeContext &default_context, std::shared_ptr< CheckpointStore > store=nullptr)
 Compile a graph from a JSON definition.
 

Detailed Description

Super-step loop execution engine for graph-based agent workflows.

GraphEngine compiles a JSON graph definition into an executable workflow, then runs it using the Pregel BSP model. Key capabilities:

  • Parallel execution: Multiple independent nodes run concurrently on an engine-owned asio::thread_pool; sync and async entry points share it.
  • Checkpointing: Full state snapshots at every super-step for time-travel.
  • HITL: interrupt_before/after + resume() for human-in-the-loop workflows.
  • Send/Command: Dynamic fan-out and routing overrides from nodes.
  • Retry policies: Per-node exponential backoff on failure.
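
The super-step (BSP) model named above can be illustrated with a small standard-library sketch. ToyState, ToyNode, and run_supersteps are hypothetical stand-ins, not NeoGraph types. The sketch covers only the core idea: every node scheduled in a step reads the same snapshot, and writes become visible at the barrier between steps. It runs nodes sequentially and leaves out parallel dispatch, checkpointing, interrupts, and retries.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins, not NeoGraph types.
using ToyState = std::map<std::string, int>;  // channel name -> value
using ToyWrites = std::vector<std::pair<std::string, int>>;

struct ToyNode {
    std::function<ToyWrites(const ToyState&)> fn;  // reads a snapshot, returns writes
    std::vector<std::string> next;                 // names of successor nodes
};

// Runs super-steps until nothing is scheduled or max_steps is reached.
// Returns the number of super-steps executed.
inline int run_supersteps(const std::map<std::string, ToyNode>& graph,
                          std::vector<std::string> ready,
                          ToyState& state,
                          int max_steps) {
    assert(max_steps >= 0);
    int steps = 0;
    while (!ready.empty() && steps < max_steps) {
        const ToyState snapshot = state;  // all nodes of this step see the same input
        ToyWrites pending;
        std::vector<std::string> next_ready;
        for (const auto& name : ready) {
            const ToyNode& node = graph.at(name);
            ToyWrites writes = node.fn(snapshot);
            pending.insert(pending.end(), writes.begin(), writes.end());
            next_ready.insert(next_ready.end(), node.next.begin(), node.next.end());
        }
        // Barrier: writes are applied only after every node of the step has run.
        for (const auto& [channel, value] : pending) state[channel] = value;
        // An engine with checkpointing would snapshot `state` at this point.
        ready = std::move(next_ready);
        ++steps;
    }
    return steps;
}
```

Because nodes read only the snapshot, the nodes within one step are independent of each other, which is what makes running them concurrently possible.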

Thread safety

After compile() returns, the graph definition (nodes, edges, channels) is treated as immutable. A single GraphEngine instance is therefore safe to share across user threads that invoke run() / run_stream() / resume() concurrently with distinct thread_ids — each call constructs its own GraphState and the bundled InMemoryCheckpointStore is mutex-guarded. This lets you host multi-tenant agent workloads on a shared engine without an external async runtime; just dispatch onto std::async, a thread pool, or your existing event loop's worker.

Caveats:

  • Mutator APIs (set_retry_policy, set_node_retry_policy, set_checkpoint_store, set_store, own_tools) must be called before any concurrent run() — they are configuration, not runtime.
  • Concurrent run() calls sharing the same thread_id do not crash but produce unspecified checkpoint interleaving (last-writer-wins on save_checkpoint, with the last saver visible to subsequent load_latest); serialize per-thread access yourself if you need deterministic semantics. The same caveat applies across multiple engines sharing one CheckpointStore for the same thread_id, e.g. evolving-agent patterns that tear an engine down and recompile while a straggler run_async is still in flight: the store guarantees that each individual checkpoint operation is atomic, not that operations sequence in any particular order. Likewise, update_state / get_state / fork overlapping a live run_async on the same thread_id race against the engine's own writes; if you need "no straggler may overwrite my admin op", cancel the straggler via RunConfig::cancel_token first and co_await its completion before issuing the admin call.
  • Custom GraphNode subclasses must be stateless or self-synchronized. Node instances are owned by the engine and shared across all runs.
  • User-provided CheckpointStore / Store / Provider / Tool implementations must be thread-safe.
auto engine = GraphEngine::compile(graph_json, context, checkpoint_store);
RunConfig config;
config.thread_id = "session-1";
config.input = {{"messages", json::array({{{"role","user"},{"content","Hello"}}})}};
auto result = engine->run(config);
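
One way to follow the "serialize per-thread access yourself" advice from the caveats is a per-thread_id mutex table. ThreadIdLocks below is a hypothetical helper, not part of NeoGraph. It is a minimal sketch that assumes all callers in the process go through the same table.

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical helper (not a NeoGraph type): one mutex per thread_id.
class ThreadIdLocks {
public:
    // Returns the mutex for thread_id, creating it on first use.
    std::shared_ptr<std::mutex> get(const std::string& thread_id) {
        std::lock_guard<std::mutex> guard(map_mutex_);
        auto& slot = locks_[thread_id];
        if (!slot) slot = std::make_shared<std::mutex>();
        return slot;
    }

    // Runs fn while holding the mutex for thread_id and returns its result.
    // Calls with different thread_ids do not block each other.
    template <typename Fn>
    auto with_lock(const std::string& thread_id, Fn&& fn) {
        std::shared_ptr<std::mutex> m = get(thread_id);
        std::lock_guard<std::mutex> guard(*m);
        return fn();
    }

private:
    std::mutex map_mutex_;  // protects the table itself
    std::unordered_map<std::string, std::shared_ptr<std::mutex>> locks_;
};
```

A call site could then wrap each blocking entry point, for example locks.with_lock(config.thread_id, [&] { return engine->run(config); }). The table in this sketch never shrinks, so a long-lived service would need to evict entries for finished sessions. It also does not help with run_async stragglers, for which the cancel-then-await sequence described above still applies.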
See also
RunConfig, RunResult, GraphNode
Note
Public surface size. This class exposes ~23 public methods spanning three concerns: graph execution (run/run_async/run_stream/resume), state administration (get_state/update_state/fork), and runtime configuration (set_retry_policy/set_worker_count/set_node_cache_enabled/set_checkpoint_store/own_tools). A future major version (v1.0) is expected to split it into GraphEngine (run + resume only), GraphAdmin (state inspection/update/fork), and a GraphConfigBuilder consumed at compile time. The current shape is kept so existing examples and downstream consumers don't break; the class-level docs above already flag mutator setters as "configuration, not runtime".

Definition at line 343 of file engine.h.

Member Function Documentation

◆ compile()

static std::unique_ptr< GraphEngine > neograph::graph::GraphEngine::compile ( const json &  definition,
const NodeContext &  default_context,
std::shared_ptr< CheckpointStore >  store = nullptr 
)
static

Compile a graph from a JSON definition.

Parses the JSON graph definition, creates nodes via NodeFactory, resolves edges and conditions, and returns a ready-to-run engine.

Parameters
definition       JSON graph definition (nodes, edges, channels, etc.).
default_context  Default NodeContext providing provider, tools, model.
store            Optional checkpoint store for persistence (nullptr = no checkpointing).
Returns
A compiled GraphEngine ready for execution.
Exceptions
std::runtime_error  If the graph definition is invalid.

◆ fork()

std::string neograph::graph::GraphEngine::fork ( const std::string &  source_thread_id,
const std::string &  new_thread_id,
const std::string &  checkpoint_id = "" 
)

Fork a thread, creating a new thread from an existing checkpoint.

Copies the specified checkpoint (or the latest) to a new thread ID, enabling branching execution paths.

Parameters
source_thread_id  Thread to fork from.
new_thread_id     Thread ID for the new fork.
checkpoint_id     Specific checkpoint to fork from (empty = latest).
Returns
The checkpoint ID of the forked state.

◆ get_graph_name()

const std::string & neograph::graph::GraphEngine::get_graph_name ( ) const
inline

Get the graph name (from the JSON definition).

Returns
Graph name string.

Definition at line 592 of file engine.h.

◆ get_state()

std::optional< json > neograph::graph::GraphEngine::get_state ( const std::string &  thread_id) const

Get the current state for a thread.

Parameters
thread_id  Thread ID to look up.
Returns
Serialized state JSON, or std::nullopt if no checkpoint exists.

◆ get_state_history()

std::vector< Checkpoint > neograph::graph::GraphEngine::get_state_history ( const std::string &  thread_id,
int  limit = 100 
) const

Get the checkpoint history for a thread (time-travel).

Parameters
thread_id  Thread ID to look up.
limit      Maximum number of checkpoints to return (default: 100).
Returns
Vector of Checkpoint objects, newest first.

◆ get_store()

std::shared_ptr< Store > neograph::graph::GraphEngine::get_store ( ) const
inline

Get the cross-thread shared memory store.

Returns
Shared pointer to the Store, or nullptr if not set.

Definition at line 506 of file engine.h.

◆ own_tools()

void neograph::graph::GraphEngine::own_tools ( std::vector< std::unique_ptr< Tool > >  tools)

Transfer tool ownership to the engine.

The engine takes ownership of the tools and keeps them alive for the duration of the engine's lifetime.

Parameters
tools  Vector of tool unique_ptrs to transfer.
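
The ownership transfer can be sketched with stand-in types. ToyTool and ToyEngine are hypothetical and only mirror the by-value std::vector< std::unique_ptr< ... > > parameter shape. The sketch also shows why such a transfer is convenient when other code keeps non-owning pointers to the tools. Whether NeoGraph's NodeContext holds tools that way is an assumption, not something stated on this page.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins; NeoGraph's Tool and GraphEngine are not reproduced here.
struct ToyTool {
    std::string name;
};

class ToyEngine {
public:
    // By-value parameter: the caller must std::move the vector in.
    void own_tools(std::vector<std::unique_ptr<ToyTool>> tools) {
        for (auto& tool : tools) owned_.push_back(std::move(tool));
    }
    std::size_t owned_count() const { return owned_.size(); }

private:
    std::vector<std::unique_ptr<ToyTool>> owned_;  // destroyed together with the engine
};

// Builds two tools, hands them to the engine, and returns a name read through
// a non-owning pointer that was taken before the transfer.
inline std::string transfer_and_read(ToyEngine& engine) {
    std::vector<std::unique_ptr<ToyTool>> tools;
    tools.push_back(std::make_unique<ToyTool>(ToyTool{"search"}));
    tools.push_back(std::make_unique<ToyTool>(ToyTool{"calculator"}));
    const ToyTool* borrowed = tools.front().get();
    engine.own_tools(std::move(tools));
    // The unique_ptr moved, the pointee did not: borrowed stays valid while engine lives.
    return borrowed->name;
}
```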

◆ resume()

RunResult neograph::graph::GraphEngine::resume ( const std::string &  thread_id,
const json &  resume_value = json(),
const GraphStreamCallback &  cb = nullptr 
)

Resume execution from a HITL interrupt.

Loads the last checkpoint for the given thread, applies the resume value, and continues execution from the interrupted node.

Parameters
thread_id     Thread ID of the interrupted session.
resume_value  Optional value to inject before resuming (e.g., human input).
cb            Optional streaming callback.
Returns
Execution result after resumption.

◆ run()

RunResult neograph::graph::GraphEngine::run ( const RunConfig &  config)

Execute the graph synchronously (blocking).

Thread-safe across distinct thread_ids on a shared engine instance: dispatch via std::async, a thread pool, or any executor. See the class-level "Thread safety" notes for caveats.

Parameters
config  Run configuration with thread ID, input, and limits.
Returns
Execution result with final state and metadata.

◆ run_async()

asio::awaitable< RunResult > neograph::graph::GraphEngine::run_async ( RunConfig  config)

Async peer of run() — returns an awaitable yielding the result.

Callers driving an io_context can co_await engine->run_async(cfg) alongside other coroutines (typically multiple concurrent agents). The super-step loop, node dispatch, checkpoint I/O, parallel fan-out, and retry backoff are all coroutine-native (3.0) — the caller's executor is never blocked by engine work.

The config is taken by value so the awaitable owns its own copy in the coroutine frame. This makes the common asio::co_spawn(io.get_executor(), engine->run_async(stack_cfg), use_future) shape safe: stack_cfg may go out of scope before the awaitable resolves without leaving a dangling reference to the config.

Parameters
config  Run configuration (moved into the coroutine frame).
Returns
Awaitable yielding the execution result.
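
The lifetime argument for taking the config by value can be shown without coroutines. The sketch below is only an analogue: it uses std::function as the deferred work item instead of asio::awaitable, and ToyConfig, make_deferred_run, and schedule_from_local_scope are hypothetical names. The point carries over: work that outlives the caller's scope must own its configuration.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for RunConfig.
struct ToyConfig {
    std::string thread_id;
};

// Takes the config by value and moves it into the deferred task,
// so the task owns its own copy (analogous to a coroutine frame).
inline std::function<std::string()> make_deferred_run(ToyConfig config) {
    return [cfg = std::move(config)]() { return "ran " + cfg.thread_id; };
}

// Creates the task from a stack-local config and lets that config go out of scope.
inline std::function<std::string()> schedule_from_local_scope() {
    ToyConfig stack_cfg{"session-1"};
    return make_deferred_run(stack_cfg);  // stack_cfg is copied into the task
}  // stack_cfg is destroyed here; the task still holds its copy
```

Had make_deferred_run taken a const ToyConfig& and captured it by reference, invoking the returned task after schedule_from_local_scope() returned would read a destroyed object.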

◆ run_stream()

RunResult neograph::graph::GraphEngine::run_stream ( const RunConfig &  config,
const GraphStreamCallback &  cb 
)

Execute the graph with streaming event callbacks.

Parameters
config  Run configuration.
cb      Callback invoked for each graph event (filtered by config.stream_mode).
Returns
Execution result.

◆ run_stream_async()

asio::awaitable< RunResult > neograph::graph::GraphEngine::run_stream_async ( RunConfig  config,
GraphStreamCallback  cb 
)

Async peer of run_stream — non-blocking coroutine surface.

config and cb are taken by value for the same reason as in run_async(); see the run_async() documentation.

◆ set_checkpoint_store()

void neograph::graph::GraphEngine::set_checkpoint_store ( std::shared_ptr< CheckpointStore >  store)

Set the checkpoint persistence store.

Parameters
store  Checkpoint store implementation.

◆ set_node_cache_enabled()

void neograph::graph::GraphEngine::set_node_cache_enabled ( const std::string &  node_name,
bool  enabled 
)

Enable or disable per-node result caching.

When enabled for a node, the executor hashes the input state and looks up (node_name, hash) in the engine's NodeCache. On hit, the cached NodeResult is replayed without invoking the node — no LLM call, no tool execution. On miss, the node runs and the result is stored.

Cache is OFF by default. Only opt in for nodes that are pure (deterministic, no external side effects, no time dependence). Streaming runs (run_stream) bypass the cache for the affected nodes because cached hits cannot replay LLM_TOKEN events.

Parameters
node_name  Name of the node to enable / disable.
enabled    True to enable caching; false to disable.
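
The hit/miss behaviour described above can be modelled with a small standard-library cache. ToyNodeCache is a hypothetical stand-in, not NeoGraph's NodeCache. It assumes the input state is already serialized to a string and uses std::hash, whose collisions a production cache would have to guard against.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical stand-in, not NeoGraph's NodeCache.
class ToyNodeCache {
public:
    using NodeFn = std::function<std::string(const std::string&)>;

    // Caching is off for every node until it is explicitly enabled.
    void set_enabled(const std::string& node_name, bool enabled) {
        if (enabled) enabled_.insert(node_name);
        else enabled_.erase(node_name);
    }

    // Drops cached entries but keeps the per-node enable flags.
    void clear() { entries_.clear(); }

    // Replays the cached result on a hit; otherwise runs the node and stores the result.
    std::string run(const std::string& node_name, const std::string& serialized_input,
                    const NodeFn& node) {
        if (enabled_.count(node_name) == 0) return node(serialized_input);
        // Key is (node_name, hash of input). std::hash can collide; a real cache
        // would compare the full input or use a stronger digest.
        const std::string key =
            node_name + '\n' + std::to_string(std::hash<std::string>{}(serialized_input));
        auto it = entries_.find(key);
        if (it != entries_.end()) {
            ++hits_;
            return it->second;  // node is not invoked
        }
        ++misses_;
        std::string result = node(serialized_input);
        entries_.emplace(key, result);
        return result;
    }

    std::size_t hits() const { return hits_; }
    std::size_t misses() const { return misses_; }

private:
    std::unordered_set<std::string> enabled_;
    std::unordered_map<std::string, std::string> entries_;
    std::size_t hits_ = 0;
    std::size_t misses_ = 0;
};
```

The sketch makes the purity requirement concrete: on a hit the node function is never called, so any side effect or time-dependent output it would have produced is silently skipped.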

◆ set_node_retry_policy()

void neograph::graph::GraphEngine::set_node_retry_policy ( const std::string &  node_name,
const RetryPolicy &  policy 
)

Set a retry policy for a specific node (overrides default).

Parameters
node_name  Name of the node.
policy     Retry policy for this specific node.

◆ set_retry_policy()

void neograph::graph::GraphEngine::set_retry_policy ( const RetryPolicy &  policy)

Set the default retry policy for all nodes.

Parameters
policy  Retry policy with backoff configuration.
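
How a default policy, a per-node override, and exponential backoff fit together can be sketched as follows. The real RetryPolicy fields are not shown on this page, so every field name in ToyRetryPolicy is an assumption, and ToyRetryConfig and backoff_delay_ms are hypothetical helpers.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>

// Hypothetical policy shape; the actual RetryPolicy fields may differ.
struct ToyRetryPolicy {
    int max_retries = 0;
    double initial_delay_ms = 100.0;
    double multiplier = 2.0;
    double max_delay_ms = 5000.0;
};

// Hypothetical holder mirroring the default + per-node override lookup.
class ToyRetryConfig {
public:
    void set_retry_policy(const ToyRetryPolicy& policy) { default_ = policy; }
    void set_node_retry_policy(const std::string& node_name, const ToyRetryPolicy& policy) {
        per_node_[node_name] = policy;
    }
    // A per-node policy, when present, takes precedence over the default.
    const ToyRetryPolicy& policy_for(const std::string& node_name) const {
        auto it = per_node_.find(node_name);
        return it != per_node_.end() ? it->second : default_;
    }

private:
    ToyRetryPolicy default_;
    std::map<std::string, ToyRetryPolicy> per_node_;
};

// Delay before retry number `attempt` (0-based): initial * multiplier^attempt, capped.
inline double backoff_delay_ms(const ToyRetryPolicy& policy, int attempt) {
    assert(attempt >= 0);
    double delay = policy.initial_delay_ms;
    for (int i = 0; i < attempt; ++i) {
        delay = std::min(delay * policy.multiplier, policy.max_delay_ms);
    }
    return std::min(delay, policy.max_delay_ms);
}
```

With the defaults assumed here, the delays grow as 100, 200, 400, 800 ms and stop growing at 5000 ms. Jitter is commonly added to such schedules and is left out of this sketch.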

◆ set_store()

void neograph::graph::GraphEngine::set_store ( std::shared_ptr< Store >  store)

Set the cross-thread shared memory store.

Parameters
store  Store implementation for cross-thread data sharing.
See also
Store, InMemoryStore

◆ set_worker_count()

void neograph::graph::GraphEngine::set_worker_count ( std::size_t  n)

Resize (or install) the engine-owned worker pool for parallel fan-out.

The compile() default is set_worker_count(1): there is no engine-owned thread pool, and fan-out branches dispatch inline on the coroutine's own executor. That keeps sequential and single-Send workloads off the ~6-7 µs cross-thread submit path, and avoids silently exposing nodes that hold non-thread-safe state to concurrent execution. See docs/migration-v0.4-to-v1.0.md (Migration 3) for the full rationale.

Call this with n >= 2 to opt into a real engine-owned asio::thread_pool for multi-Send fan-out or multi-outgoing edges (e.g. set_worker_count(4) for a 4-way Send). For hardware_concurrency() sizing, see set_worker_count_auto().

Must be called before any concurrent run(); resizing rebuilds both the pool and the internal executor, so resizing while runs are in flight is a hard error. Values < 1 are clamped to 1.

Parameters
n  Number of worker threads in the fan-out pool.
See also
set_worker_count_auto()

◆ set_worker_count_auto()

void neograph::graph::GraphEngine::set_worker_count_auto ( )

Opt into a hardware_concurrency()-sized worker pool.

Equivalent to set_worker_count(std::thread::hardware_concurrency()), with a fallback of 4 if the runtime cannot detect the hardware concurrency. compile() does NOT call this; its default is set_worker_count(1). Use this once after compile() (and before any run()) to enable real parallel fan-out for multi-Send / multi-outgoing-edge workloads.
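
The sizing rules stated for set_worker_count() and set_worker_count_auto() reduce to two small functions. clamp_worker_count and auto_worker_count are hypothetical names for this sketch. The only outside fact used is that std::thread::hardware_concurrency() returns 0 when the value is not computable.

```cpp
#include <cstddef>
#include <thread>

// Rule stated for set_worker_count(): values below 1 are clamped to 1.
inline std::size_t clamp_worker_count(std::size_t n) {
    return n < 1 ? 1 : n;
}

// Rule stated for set_worker_count_auto(): use the detected concurrency,
// falling back to 4 when detection fails (reported as 0).
inline std::size_t auto_worker_count(unsigned detected) {
    constexpr std::size_t kFallback = 4;
    return detected == 0 ? kFallback : static_cast<std::size_t>(detected);
}

// Convenience overload that queries the runtime.
inline std::size_t auto_worker_count() {
    return auto_worker_count(std::thread::hardware_concurrency());
}
```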

◆ update_state()

void neograph::graph::GraphEngine::update_state ( const std::string &  thread_id,
const json &  channel_writes,
const std::string &  as_node = "" 
)

Update the state for a thread by applying channel writes.

Loads the latest checkpoint, applies the writes, and saves a new checkpoint. Useful for injecting state externally (e.g., from a UI).

Parameters
thread_id       Thread ID to update.
channel_writes  JSON object of channel_name -> value pairs to write.
as_node         Optional node name to attribute the update to (for tracing).
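
The load, apply, save sequence of update_state(), together with the documented behaviour of get_state(), get_state_history(), and fork(), can be modelled with an in-memory toy. ToyStateAdmin and ToyCheckpoint are hypothetical stand-ins. The sketch assumes channels hold plain strings, that a write simply overwrites the channel (no reducers), that updating a thread without a checkpoint starts from empty state, and that fork always copies the latest checkpoint. None of these assumptions is confirmed by this page.

```cpp
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using ToyChannels = std::map<std::string, std::string>;  // channel name -> value

struct ToyCheckpoint {
    std::string id;
    ToyChannels channels;
    std::string written_by;  // node the write is attributed to
};

// Hypothetical in-memory model of the state-administration calls.
class ToyStateAdmin {
public:
    // Latest state, or std::nullopt when the thread has no checkpoint.
    std::optional<ToyChannels> get_state(const std::string& thread_id) const {
        auto it = threads_.find(thread_id);
        if (it == threads_.end() || it->second.empty()) return std::nullopt;
        return it->second.back().channels;
    }

    // Load latest, apply writes, save as a NEW checkpoint (history is preserved).
    void update_state(const std::string& thread_id, const ToyChannels& writes,
                      const std::string& as_node = "") {
        ToyChannels next = get_state(thread_id).value_or(ToyChannels{});
        for (const auto& [channel, value] : writes) next[channel] = value;
        append(thread_id, std::move(next), as_node);
    }

    // Newest first, at most `limit` entries.
    std::vector<ToyCheckpoint> get_state_history(const std::string& thread_id,
                                                 int limit = 100) const {
        std::vector<ToyCheckpoint> out;
        auto it = threads_.find(thread_id);
        if (it == threads_.end()) return out;
        for (auto r = it->second.rbegin();
             r != it->second.rend() && static_cast<int>(out.size()) < limit; ++r) {
            out.push_back(*r);
        }
        return out;
    }

    // Copies the latest checkpoint into a new thread; returns the new checkpoint id
    // (empty string when the source has no checkpoint).
    std::string fork(const std::string& source_thread_id, const std::string& new_thread_id) {
        std::optional<ToyChannels> state = get_state(source_thread_id);
        if (!state) return "";
        return append(new_thread_id, *state, "fork");
    }

private:
    std::string append(const std::string& thread_id, ToyChannels channels,
                       const std::string& written_by) {
        auto& history = threads_[thread_id];
        std::string id = thread_id + "#" + std::to_string(history.size() + 1);
        history.push_back(ToyCheckpoint{id, std::move(channels), written_by});
        return id;
    }

    std::map<std::string, std::vector<ToyCheckpoint>> threads_;
};
```

Because every update appends a checkpoint instead of editing one in place, earlier states remain available for history inspection and forking.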

The documentation for this class was generated from the following file: