NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Super-step loop execution engine for graph-based agent workflows.
#include <engine.h>
Public Member Functions
| void | clear_node_cache () | Drop all cached entries (per-node enable state preserved). |
| std::string | fork (const std::string &source_thread_id, const std::string &new_thread_id, const std::string &checkpoint_id="") | Fork a thread, creating a new thread from an existing checkpoint. |
| const std::string & | get_graph_name () const | Get the graph name (from the JSON definition). |
| std::optional<json> | get_state (const std::string &thread_id) const | Get the current state for a thread. |
| std::vector<Checkpoint> | get_state_history (const std::string &thread_id, int limit=100) const | Get the checkpoint history for a thread (time-travel). |
| std::shared_ptr<Store> | get_store () const | Get the cross-thread shared memory store. |
| const NodeCache & | node_cache () const | Borrow the engine's NodeCache for stats inspection. |
| void | own_tools (std::vector<std::unique_ptr<Tool>> tools) | Transfer tool ownership to the engine. |
| RunResult | resume (const std::string &thread_id, const json &resume_value=json(), const GraphStreamCallback &cb=nullptr) | Resume execution from a HITL interrupt. |
| asio::awaitable<RunResult> | resume_async (const std::string &thread_id, const json &resume_value=json(), const GraphStreamCallback &cb=nullptr) | Async peer of resume(); non-blocking coroutine surface. |
| RunResult | run (const RunConfig &config) | Execute the graph synchronously (blocking). |
| asio::awaitable<RunResult> | run_async (RunConfig config) | Async peer of run(); returns an awaitable yielding the result. |
| RunResult | run_stream (const RunConfig &config, const GraphStreamCallback &cb) | Execute the graph with streaming event callbacks. |
| asio::awaitable<RunResult> | run_stream_async (RunConfig config, GraphStreamCallback cb) | Async peer of run_stream(); non-blocking coroutine surface. |
| void | set_checkpoint_store (std::shared_ptr<CheckpointStore> store) | Set the checkpoint persistence store. |
| void | set_node_cache_enabled (const std::string &node_name, bool enabled) | Enable or disable per-node result caching. |
| void | set_node_retry_policy (const std::string &node_name, const RetryPolicy &policy) | Set a retry policy for a specific node (overrides the default). |
| void | set_retry_policy (const RetryPolicy &policy) | Set the default retry policy for all nodes. |
| void | set_store (std::shared_ptr<Store> store) | Set the cross-thread shared memory store. |
| void | set_worker_count (std::size_t n) | Resize (or install) the engine-owned worker pool for parallel fan-out. |
| void | set_worker_count_auto () | Opt into a hardware_concurrency()-sized worker pool. |
| void | update_state (const std::string &thread_id, const json &channel_writes, const std::string &as_node="") | Update the state for a thread by applying channel writes. |
Static Public Member Functions
| static std::unique_ptr<GraphEngine> | compile (const json &definition, const NodeContext &default_context, std::shared_ptr<CheckpointStore> store=nullptr) | Compile a graph from a JSON definition. |
GraphEngine compiles a JSON graph definition into an executable workflow, then runs it using the Pregel BSP model. Key capabilities:
… asio::thread_pool; sync and async entry points share it.
Thread safety:
After compile() returns, the graph definition (nodes, edges, channels) is treated as immutable. A single GraphEngine instance is therefore safe to share across user threads that invoke run() / run_stream() / resume() concurrently with distinct thread_ids: each call constructs its own GraphState, and the bundled InMemoryCheckpointStore is mutex-guarded. This lets you host multi-tenant agent workloads on a shared engine without an external async runtime; just dispatch onto std::async, a thread pool, or your existing event loop's worker.
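As a minimal sketch of that dispatch pattern using only the standard library: `run_all` below is a hypothetical helper, not part of NeoGraph, and `run_one` stands in for a blocking call such as `engine->run(cfg)` with a RunConfig built for that thread_id (how the thread ID is set on RunConfig is not shown in this reference). Every ID passed in should be distinct, per the rule above.

```cpp
#include <future>
#include <string>
#include <type_traits>
#include <vector>

// Hypothetical helper (not part of NeoGraph): start one task per distinct
// thread_id, then collect the results in input order. `run_one` stands in for
// a blocking call such as engine->run(cfg) with cfg built for that thread_id.
template <typename Fn>
auto run_all(const std::vector<std::string>& thread_ids, Fn run_one) {
    using Result = std::invoke_result_t<Fn, std::string>;
    std::vector<std::future<Result>> futures;
    futures.reserve(thread_ids.size());
    for (const auto& id : thread_ids) {
        // std::launch::async runs each call on its own thread instead of deferring it.
        futures.push_back(std::async(std::launch::async, run_one, id));
    }
    std::vector<Result> results;
    results.reserve(futures.size());
    for (auto& f : futures) {
        results.push_back(f.get());  // blocks until done; rethrows a task's exception
    }
    return results;
}
```

With a real engine, the callable would build a RunConfig for `id` and return `engine->run(cfg)`, so one blocking run() executes per tenant and the RunResults come back in the order of `thread_ids`.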
Caveats:
- Mutator setters (…, set_retry_policy, set_node_retry_policy, set_checkpoint_store, set_store, own_tools) must be called before any concurrent run(); they are configuration, not runtime.
- Concurrent run() calls sharing the same thread_id do not crash, but they produce unspecified checkpoint interleaving (last-writer-wins on save_checkpoint, and the last saver is what a subsequent load_latest sees). Serialize per-thread access yourself if you need deterministic semantics. The same caveat applies across multiple engines that share one CheckpointStore for the same thread_id, e.g. evolving-agent patterns that tear an engine down and recompile it while a straggler run_async is still in flight: the store guarantees that each individual checkpoint operation is atomic, not that operations are sequenced in any particular order. Likewise, update_state / get_state / fork calls that overlap a live run_async on the same thread_id race against the engine's own writes. If you need the guarantee that no straggler may overwrite your admin operation, cancel the straggler via RunConfig::cancel_token first and co_await its completion before issuing the admin call.
- GraphNode subclasses must be stateless or self-synchronized: node instances are owned by the engine and shared across all runs.
- CheckpointStore / Store / Provider / Tool implementations must be thread-safe.
… GraphEngine (run…), GraphAdmin (state inspection/update/fork), and a GraphConfigBuilder consumed at compile time. The current shape is kept so existing examples and downstream consumers don't break; the class-level docs above already flag the mutator setters as "configuration, not runtime".
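The caveat about serializing per-thread access yourself can be handled with one lock per thread_id. The sketch below is a hypothetical helper (`PerThreadLocks` is not a NeoGraph type): calls for the same thread_id run one at a time, while calls for different thread_ids still proceed concurrently, because the table mutex is released before the user callable runs.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical helper (not part of NeoGraph): one mutex per thread_id, so
// run()/update_state() on the same thread_id are serialized while different
// thread_ids are not blocked by each other.
class PerThreadLocks {
public:
    // Runs fn() while holding the lock for thread_id; returns fn()'s result.
    template <typename Fn>
    auto with_lock(const std::string& thread_id, Fn&& fn) {
        std::shared_ptr<std::mutex> m = mutex_for(thread_id);
        std::lock_guard<std::mutex> guard(*m);  // held for the whole call
        return std::forward<Fn>(fn)();
    }

private:
    std::shared_ptr<std::mutex> mutex_for(const std::string& thread_id) {
        std::lock_guard<std::mutex> guard(table_mutex_);  // protects the map only
        auto& slot = locks_[thread_id];
        if (!slot) slot = std::make_shared<std::mutex>();
        return slot;
    }

    std::mutex table_mutex_;
    std::map<std::string, std::shared_ptr<std::mutex>> locks_;
};
```

You might wrap each `engine->run(cfg)` or `engine->update_state(...)` for a given thread_id in `with_lock`. This only orders calls inside one process; it does not cover the multi-engine or straggler run_async cases above, and the map keeps one entry for every thread_id it has seen.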
static std::unique_ptr<GraphEngine> neograph::graph::GraphEngine::compile(const json &definition, const NodeContext &default_context, std::shared_ptr<CheckpointStore> store = nullptr) [static]
Compile a graph from a JSON definition.
Parses the JSON graph definition, creates nodes via NodeFactory, resolves edges and conditions, and returns a ready-to-run engine.
Parameters:
| definition | JSON graph definition (nodes, edges, channels, etc.). |
| default_context | Default NodeContext providing the provider, tools, and model. |
| store | Optional checkpoint store for persistence (nullptr = no checkpointing). |
Exceptions:
| std::runtime_error | If the graph definition is invalid. |
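To illustrate the kind of check behind that `std::runtime_error`, here is a hedged sketch of edge resolution over a simplified definition. `GraphDef`, `EdgeDef`, and `validate_edges` are hypothetical; NeoGraph's real JSON schema and the exact checks compile() performs are not described in this reference.

```cpp
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, simplified graph definition; the real JSON schema is richer.
struct EdgeDef {
    std::string from;
    std::string to;
};

struct GraphDef {
    std::vector<std::string> nodes;
    std::vector<EdgeDef> edges;
};

// Sketch of one compile-time check: node names are unique and every edge
// endpoint resolves to a declared node. Throws std::runtime_error otherwise.
void validate_edges(const GraphDef& def) {
    std::set<std::string> known;
    for (const auto& n : def.nodes) {
        if (!known.insert(n).second) {
            throw std::runtime_error("duplicate node: " + n);
        }
    }
    for (const auto& e : def.edges) {
        if (known.count(e.from) == 0 || known.count(e.to) == 0) {
            throw std::runtime_error("edge " + e.from + " -> " + e.to +
                                     " references an unknown node");
        }
    }
}
```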
std::string neograph::graph::GraphEngine::fork(const std::string &source_thread_id, const std::string &new_thread_id, const std::string &checkpoint_id = "")
Fork a thread, creating a new thread from an existing checkpoint.
Copies the specified checkpoint (or the latest one) to a new thread ID, enabling branching execution paths.
Parameters:
| source_thread_id | Thread to fork from. |
| new_thread_id | Thread ID for the new fork. |
| checkpoint_id | Specific checkpoint to fork from (empty = latest). |
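A minimal in-memory model of the fork semantics described above (the latest checkpoint by default, or a named one). `History`, `CheckpointRec`, and `fork_thread` are hypothetical stand-ins, not NeoGraph's Checkpoint or CheckpointStore types. This sketch reuses the source checkpoint's ID and returns it; the meaning of fork()'s own `std::string` return value is not documented here.

```cpp
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a stored checkpoint.
struct CheckpointRec {
    std::string id;
    std::string state;  // serialized channel state (stand-in for json)
};

// thread_id -> checkpoints, oldest first.
using History = std::map<std::string, std::vector<CheckpointRec>>;

// Copy the named checkpoint (or the latest when checkpoint_id is empty) from
// source_thread_id into new_thread_id, and return the copied checkpoint's id.
std::string fork_thread(History& h, const std::string& source_thread_id,
                        const std::string& new_thread_id,
                        const std::string& checkpoint_id = "") {
    auto it = h.find(source_thread_id);
    if (it == h.end() || it->second.empty()) {
        throw std::runtime_error("no checkpoints for thread: " + source_thread_id);
    }
    const std::vector<CheckpointRec>& src = it->second;
    const CheckpointRec* chosen = &src.back();  // default: latest
    if (!checkpoint_id.empty()) {
        chosen = nullptr;
        for (const auto& cp : src) {
            if (cp.id == checkpoint_id) { chosen = &cp; break; }
        }
        if (chosen == nullptr) {
            throw std::runtime_error("unknown checkpoint: " + checkpoint_id);
        }
    }
    CheckpointRec copy = *chosen;       // copy before touching the map again
    h[new_thread_id].push_back(copy);   // the fork starts its own history
    return copy.id;
}
```

The source thread's history is left untouched, so both branches can continue independently from the same point.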
std::optional<json> neograph::graph::GraphEngine::get_state(const std::string &thread_id) const [inline]
Get the current state for a thread.
Parameters:
| thread_id | Thread ID to look up. |
std::vector<Checkpoint> neograph::graph::GraphEngine::get_state_history(const std::string &thread_id, int limit = 100) const
Get the checkpoint history for a thread (time-travel).
Parameters:
| thread_id | Thread ID to look up. |
| limit | Maximum number of checkpoints to return (default: 100). |
void neograph::graph::GraphEngine::own_tools(std::vector<std::unique_ptr<Tool>> tools) [inline]
Transfer tool ownership to the engine.
The engine takes ownership of the tools and keeps them alive for the engine's lifetime.
Parameters:
| tools | Vector of tool unique_ptrs to transfer. |
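A sketch of the ownership-transfer contract using hypothetical stand-in types (`SketchTool`, `EchoTool`, and `ToolOwner` are illustrative, not NeoGraph's Tool interface). Moving the `unique_ptr`s into the owner keeps each tool at the same address, so non-owning pointers to them stay valid for as long as the owner lives.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a tool interface.
struct SketchTool {
    virtual ~SketchTool() = default;
    virtual std::string name() const = 0;
};

struct EchoTool : SketchTool {
    std::string name() const override { return "echo"; }
};

// Mirrors the documented contract: take the unique_ptrs and keep the tools
// alive for the owner's lifetime.
class ToolOwner {
public:
    void own_tools(std::vector<std::unique_ptr<SketchTool>> tools) {
        for (auto& t : tools) {
            owned_.push_back(std::move(t));  // ownership moves; the object does not
        }
    }
    std::size_t size() const { return owned_.size(); }
    const SketchTool* get(std::size_t i) const { return owned_.at(i).get(); }

private:
    std::vector<std::unique_ptr<SketchTool>> owned_;
};
```

With the real engine, the call shape from the signature above is `engine->own_tools(std::move(tools));`.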
RunResult neograph::graph::GraphEngine::resume(const std::string &thread_id, const json &resume_value = json(), const GraphStreamCallback &cb = nullptr)
Resume execution from a HITL interrupt.
Loads the last checkpoint for the given thread, applies the resume value, and continues execution from the interrupted node.
Parameters:
| thread_id | Thread ID of the interrupted session. |
| resume_value | Optional value to inject before resuming (e.g., human input). |
| cb | Optional streaming callback. |
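The interrupt/resume flow can be sketched with a hypothetical linear workflow in which one node (`approve`) pauses the run until a resume value arrives. None of these names are NeoGraph APIs; the point is the order of operations the description gives: persist a checkpoint at the interrupt, then on resume load it, inject the value, and continue from the interrupted node rather than from the start.

```cpp
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical: the node with this name pauses the run until resumed.
const std::string kInterruptNode = "approve";

struct Saved {                      // what the checkpoint must remember
    std::size_t next_node = 0;      // index of the interrupted node
    std::vector<std::string> log;   // stand-in for the channel state
};

struct Outcome {
    bool interrupted = false;
    std::vector<std::string> log;
};

Outcome run_from(const std::vector<std::string>& nodes, std::size_t start,
                 std::vector<std::string> log, std::optional<std::string> resume_value,
                 std::optional<Saved>& checkpoint) {
    for (std::size_t i = start; i < nodes.size(); ++i) {
        if (nodes[i] == kInterruptNode) {
            if (!resume_value) {
                checkpoint = Saved{i, log};  // persist, then pause for human input
                return Outcome{true, log};
            }
            log.push_back(nodes[i] + ":" + *resume_value);  // inject the human input
            resume_value.reset();                           // consume it exactly once
            continue;
        }
        log.push_back(nodes[i]);
    }
    checkpoint.reset();  // finished: nothing left to resume
    return Outcome{false, log};
}

// Load the last checkpoint, apply the resume value, continue at the interrupted node.
Outcome resume_from(const std::vector<std::string>& nodes,
                    std::optional<Saved>& checkpoint, const std::string& resume_value) {
    if (!checkpoint) throw std::runtime_error("no interrupted run to resume");
    Saved saved = *checkpoint;
    return run_from(nodes, saved.next_node, saved.log, resume_value, checkpoint);
}
```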
RunResult neograph::graph::GraphEngine::run(const RunConfig &config)
Execute the graph synchronously (blocking).
Thread-safe across distinct thread_ids on a shared engine instance: dispatch via std::async, a thread pool, or any executor. See the class-level "Thread safety" notes for caveats.
Parameters:
| config | Run configuration with thread ID, input, and limits. |
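run() executes the graph with the super-step (Pregel BSP) loop described in the class overview. A minimal, hedged sketch of a single super-step follows. The types are hypothetical stand-ins, nodes run sequentially here rather than on a worker pool, and the conflict rule (the last write in list order wins) is this sketch's assumption, not NeoGraph's channel semantics. What it does show is the BSP property: every node in the step reads the same snapshot, and writes become visible only after the barrier.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical types for illustration only.
using State = std::map<std::string, int>;            // channel name -> value
using Writes = std::map<std::string, int>;           // writes produced by one node
using NodeFn = std::function<Writes(const State&)>;  // reads state, returns writes

// One BSP super-step: all active nodes read the SAME snapshot; their writes are
// applied together at the barrier afterwards.
State super_step(const State& snapshot, const std::vector<NodeFn>& active) {
    std::vector<Writes> pending;
    pending.reserve(active.size());
    for (const auto& node : active) {
        pending.push_back(node(snapshot));  // no node sees another's writes this step
    }
    State next = snapshot;
    for (const auto& writes : pending) {
        for (const auto& [channel, value] : writes) {
            next[channel] = value;  // barrier: apply only after every node has run
        }
    }
    return next;
}
```

In the usage below, node `b` computes `z` from the old `y` (5), not from the `y` that `a` writes in the same step.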
asio::awaitable<RunResult> neograph::graph::GraphEngine::run_async(RunConfig config)
Async peer of run(); returns an awaitable yielding the result.
Callers driving an io_context can co_await engine->run_async(cfg) alongside other coroutines (typically multiple concurrent agents). The super-step loop, node dispatch, checkpoint I/O, parallel fan-out, and retry backoff are all coroutine-native (3.0), so the caller's executor is never blocked by engine work.
The config is taken by value so that the awaitable owns its own copy in the coroutine frame. This makes the common asio::co_spawn(io.get_executor(), engine->run_async(stack_cfg), use_future) shape safe: stack_cfg may go out of scope before the awaitable resolves without leaving a dangling reference to the config.
Parameters:
| config | Run configuration (moved into the coroutine frame). |
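The by-value rule can be illustrated without Asio. In this hedged analogy, `std::async` plays the role of run_async()'s coroutine frame, and `Config` / `start_run` are hypothetical stand-ins: because the parameter is taken by value and moved into the task, the caller's `stack_cfg` can be destroyed before the result is read.

```cpp
#include <future>
#include <string>
#include <utility>

// Hypothetical stand-in for RunConfig; the real struct's fields are not listed here.
struct Config {
    std::string thread_id;
    std::string input;
};

// Taking cfg by value and moving it into the task gives the asynchronous work
// its own copy, so the caller's object may be destroyed right after the call.
std::future<std::string> start_run(Config cfg) {
    return std::async(std::launch::async, [cfg = std::move(cfg)] {
        return cfg.thread_id + ":" + cfg.input;
    });
}
```

Had `start_run` taken `const Config&` and captured it by reference, the task could read a destroyed object. That is the dangling-reference hazard the by-value signature avoids.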
RunResult neograph::graph::GraphEngine::run_stream(const RunConfig &config, const GraphStreamCallback &cb)
Execute the graph with streaming event callbacks.
Parameters:
| config | Run configuration. |
| cb | Callback invoked for each graph event (filtered by config.stream_mode). |
asio::awaitable<RunResult> neograph::graph::GraphEngine::run_stream_async(RunConfig config, GraphStreamCallback cb)
Async peer of run_stream(); non-blocking coroutine surface.
config and cb are taken by value for the same reason as in run_async(): the coroutine frame owns its own copies, so the caller's originals may go out of scope before the awaitable resolves.
void neograph::graph::GraphEngine::set_checkpoint_store(std::shared_ptr<CheckpointStore> store)
Set the checkpoint persistence store.
Parameters:
| store | Checkpoint store implementation. |
void neograph::graph::GraphEngine::set_node_cache_enabled(const std::string &node_name, bool enabled)
Enable or disable per-node result caching.
When enabled for a node, the executor hashes the input state and looks up (node_name, hash) in the engine's NodeCache. On a hit, the cached NodeResult is replayed without invoking the node: no LLM call, no tool execution. On a miss, the node runs and the result is stored.
The cache is OFF by default. Only opt in for nodes that are pure (deterministic, no external side effects, no time dependence). Streaming runs (run_stream) bypass the cache for the affected nodes because cached hits cannot replay LLM_TOKEN events.
Parameters:
| node_name | Name of the node to enable / disable. |
| enabled | True to enable caching; false to disable. |
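A minimal sketch of the documented caching rule: caching is opt-in per node, results are keyed by (node_name, hash of input state), and clearing drops entries but keeps the enable flags, as clear_node_cache() does. `SketchNodeCache` is hypothetical and works on strings rather than NeoGraph's state and NodeResult types. Keying on the hash alone means a collision would replay the wrong result; whether NeoGraph's NodeCache guards against that is not stated here.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <set>
#include <string>
#include <utility>

// Hypothetical sketch of the (node_name, hash(input state)) lookup.
class SketchNodeCache {
public:
    void set_enabled(const std::string& node, bool on) {
        if (on) enabled_.insert(node); else enabled_.erase(node);
    }
    bool enabled(const std::string& node) const { return enabled_.count(node) > 0; }

    // On a hit, replay the stored result without calling compute; on a miss,
    // call compute and store its result. Disabled nodes always call compute.
    std::string run(const std::string& node, const std::string& input_state,
                    const std::function<std::string(const std::string&)>& compute) {
        if (!enabled(node)) return compute(input_state);  // caching is opt-in
        Key key{node, std::hash<std::string>{}(input_state)};
        auto it = entries_.find(key);
        if (it != entries_.end()) { ++hits_; return it->second; }
        ++misses_;
        std::string result = compute(input_state);
        entries_.emplace(key, result);
        return result;
    }

    // Like clear_node_cache(): drop entries, keep per-node enable flags.
    void clear() { entries_.clear(); }

    std::size_t hits() const { return hits_; }
    std::size_t misses() const { return misses_; }

private:
    using Key = std::pair<std::string, std::size_t>;
    std::set<std::string> enabled_;
    std::map<Key, std::string> entries_;
    std::size_t hits_ = 0;
    std::size_t misses_ = 0;
};
```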
void neograph::graph::GraphEngine::set_node_retry_policy(const std::string &node_name, const RetryPolicy &policy)
Set a retry policy for a specific node (overrides the default).
Parameters:
| node_name | Name of the node. |
| policy | Retry policy for this specific node. |
void neograph::graph::GraphEngine::set_retry_policy(const RetryPolicy &policy)
Set the default retry policy for all nodes.
Parameters:
| policy | Retry policy with backoff configuration. |
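The precedence between set_retry_policy() and set_node_retry_policy() can be sketched as a lookup with fallback. The fields of `RetryPolicySketch` (`max_retries`, `initial_delay`, `backoff_multiplier`, `max_delay`) are assumptions: this reference does not list RetryPolicy's members or say which backoff curve it uses, so the capped exponential formula below is only one common choice.

```cpp
#include <algorithm>
#include <chrono>
#include <map>
#include <string>

// Hypothetical fields; the real RetryPolicy members are not listed in this reference.
struct RetryPolicySketch {
    int max_retries = 0;
    std::chrono::milliseconds initial_delay{100};
    double backoff_multiplier = 2.0;
    std::chrono::milliseconds max_delay{5000};
};

class RetryPolicies {
public:
    void set_default(const RetryPolicySketch& p) { default_ = p; }
    void set_for_node(const std::string& node, const RetryPolicySketch& p) { per_node_[node] = p; }

    // A per-node policy overrides the default; every other node falls back to it.
    const RetryPolicySketch& for_node(const std::string& node) const {
        auto it = per_node_.find(node);
        return it != per_node_.end() ? it->second : default_;
    }

private:
    RetryPolicySketch default_;
    std::map<std::string, RetryPolicySketch> per_node_;
};

// Delay before retry number `attempt` (1-based): initial * multiplier^(attempt-1), capped.
std::chrono::milliseconds backoff_delay(const RetryPolicySketch& p, int attempt) {
    double ms = static_cast<double>(p.initial_delay.count());
    for (int i = 1; i < attempt; ++i) ms *= p.backoff_multiplier;
    ms = std::min(ms, static_cast<double>(p.max_delay.count()));
    return std::chrono::milliseconds(static_cast<long long>(ms));
}
```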
void neograph::graph::GraphEngine::set_store(std::shared_ptr<Store> store)
Set the cross-thread shared memory store.
Parameters:
| store | Store implementation for cross-thread data sharing. |
void neograph::graph::GraphEngine::set_worker_count(std::size_t n)
Resize (or install) the engine-owned worker pool for parallel fan-out.
**compile() default is set_worker_count(1)**: no engine-owned thread pool; fan-out branches dispatch inline on the coroutine's own executor. That keeps sequential and single-Send workloads off the ~6-7 µs cross-thread submit path, and it avoids silently exposing nodes that hold non-thread-safe state to concurrent execution. See docs/migration-v0.4-to-v1.0.md (Migration 3) for the full rationale.
Call this with n >= 2 to opt into a real engine-owned asio::thread_pool for multi-Send fan-out or multiple outgoing edges (e.g. set_worker_count(4) for a 4-way Send). For hardware_concurrency() sizing, see set_worker_count_auto().
Must be called before any concurrent run(); resizing rebuilds both the pool and the internal executor and is a hard error against in-flight runs. Values < 1 are clamped to 1.
Parameters:
| n | Number of worker threads in the fan-out pool. |
void neograph::graph::GraphEngine::set_worker_count_auto()
Opt into a hardware_concurrency()-sized worker pool.
Equivalent to set_worker_count(std::thread::hardware_concurrency()), with a fallback of 4 if the runtime cannot detect the number of hardware threads (hardware_concurrency() returns 0). **compile() does NOT call this; its default is set_worker_count(1).** Call it once after compile() (and before any run()) to enable real parallel fan-out for multi-Send / multi-outgoing-edge workloads.
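The sizing rules of set_worker_count() and set_worker_count_auto() reduce to two small functions. This is a hedged sketch, not NeoGraph source: `clamp_worker_count` and `auto_worker_count` are hypothetical names that only restate the documented clamp to 1 and the fallback to 4 when `std::thread::hardware_concurrency()` reports 0 (its value when the count is not computable).

```cpp
#include <cstddef>
#include <thread>

// set_worker_count(n): values < 1 are clamped to 1 (1 = no engine-owned pool,
// fan-out runs inline on the calling coroutine's executor).
std::size_t clamp_worker_count(std::size_t n) {
    return n < 1 ? 1 : n;
}

// set_worker_count_auto(): hardware_concurrency(), or 4 when it reports 0.
std::size_t auto_worker_count(unsigned detected = std::thread::hardware_concurrency()) {
    return detected == 0 ? 4 : clamp_worker_count(detected);
}
```

In application code you would simply call `engine->set_worker_count(4)` or `engine->set_worker_count_auto()` once after compile() and before the first run().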
void neograph::graph::GraphEngine::update_state(const std::string &thread_id, const json &channel_writes, const std::string &as_node = "")
Update the state for a thread by applying channel writes.
Loads the latest checkpoint, applies the writes, and saves a new checkpoint. Useful for injecting state externally (e.g., from a UI).
Parameters:
| thread_id | Thread ID to update. |
| channel_writes | JSON object of channel_name -> value pairs to write. |
| as_node | Optional node name to attribute the update to (for tracing). |
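A minimal in-memory model of the load, apply, save sequence described above. `Channels`, `Snapshot`, `Threads`, and `update_state_sketch` are hypothetical. Values are plain strings instead of json, a write simply overwrites its channel (the engine may apply channel-specific rules), and throwing when the thread has no checkpoint is this sketch's choice, since the reference does not say what update_state() does in that case.

```cpp
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory model (not NeoGraph's CheckpointStore interface).
using Channels = std::map<std::string, std::string>;  // channel_name -> value

struct Snapshot {
    Channels channels;
    std::string written_by;  // node the update is attributed to ("" = none)
};

// thread_id -> snapshots, oldest first.
using Threads = std::map<std::string, std::vector<Snapshot>>;

// Load the latest snapshot, apply the writes, and append a NEW snapshot;
// earlier history is never edited in place.
void update_state_sketch(Threads& threads, const std::string& thread_id,
                         const Channels& channel_writes, const std::string& as_node = "") {
    auto it = threads.find(thread_id);
    if (it == threads.end() || it->second.empty()) {
        throw std::runtime_error("no checkpoint for thread: " + thread_id);
    }
    Snapshot next = it->second.back();  // copy of the latest state
    for (const auto& [name, value] : channel_writes) {
        next.channels[name] = value;    // overwrite semantics assumed
    }
    next.written_by = as_node;          // attribution for tracing
    it->second.push_back(std::move(next));
}
```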