NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
engine.h
Go to the documentation of this file.
1
9#pragma once
10
11#include <neograph/api.h>
17#include <neograph/graph/node.h>
23
24#include <asio/awaitable.hpp>
25#include <asio/thread_pool.hpp>
26
27#include <atomic>
28#include <chrono>
29#include <cstddef>
30#include <memory>
31#include <optional>
32#include <set>
33#include <string>
34
35namespace neograph::graph {
36
40struct RunConfig {
41 std::string thread_id;
42 json input;
43 int max_steps = 50;
44 StreamMode stream_mode = StreamMode::ALL;
45
58 std::shared_ptr<CancelToken> cancel_token;
59
88 bool resume_if_exists = false;
89};
90
121 std::shared_ptr<CancelToken> cancel_token;
122
126 std::optional<std::chrono::steady_clock::time_point> deadline;
127
131 std::string trace_id;
132
136 std::string thread_id;
137
141 int step = 0;
142
144 StreamMode stream_mode = StreamMode::ALL;
145
181 std::shared_ptr<Store> store;
182};
183
209struct RunResult {
210 json output;
211 bool interrupted = false;
212 std::string interrupt_node;
214 std::string checkpoint_id;
215 std::vector<std::string> execution_trace;
216
233 template <typename T>
234 T channel(const std::string& name) const {
235 return channel_raw(name).get<T>();
236 }
237
247 json channel_raw(const std::string& name) const {
248 if (output.contains("channels") && output["channels"].contains(name)) {
249 const auto wrapped = output["channels"][name];
250 if (wrapped.contains("value")) return wrapped["value"];
251 }
252 if (output.contains(name)) return output[name];
253 throw json::out_of_range(
254 "RunResult::channel: no such channel '" + name + "' in output "
255 "(checked output[\"channels\"][\"" + name + "\"][\"value\"] and output[\"" + name + "\"])");
256 }
257
264 bool has_channel(const std::string& name) const noexcept {
265 if (output.contains("channels") && output["channels"].contains(name)
266 && output["channels"][name].contains("value")) {
267 return true;
268 }
269 return output.contains(name);
270 }
271};
272
343class NEOGRAPH_API GraphEngine {
344public:
357 static std::unique_ptr<GraphEngine> compile(
358 const json& definition,
359 const NodeContext& default_context,
360 std::shared_ptr<CheckpointStore> store = nullptr);
361
372 RunResult run(const RunConfig& config);
373
392 asio::awaitable<RunResult> run_async(RunConfig config);
393
401 const GraphStreamCallback& cb);
402
406 asio::awaitable<RunResult> run_stream_async(
407 RunConfig config, GraphStreamCallback cb);
408
420 RunResult resume(const std::string& thread_id,
421 const json& resume_value = json(),
422 const GraphStreamCallback& cb = nullptr);
423
425 asio::awaitable<RunResult> resume_async(
426 const std::string& thread_id,
427 const json& resume_value = json(),
428 const GraphStreamCallback& cb = nullptr);
429
430 // ── State inspection & manipulation (LangGraph Checkpointer API) ──
431
437 std::optional<json> get_state(const std::string& thread_id) const;
438
445 std::vector<Checkpoint> get_state_history(const std::string& thread_id,
446 int limit = 100) const;
447
458 void update_state(const std::string& thread_id,
459 const json& channel_writes,
460 const std::string& as_node = "");
461
473 std::string fork(const std::string& source_thread_id,
474 const std::string& new_thread_id,
475 const std::string& checkpoint_id = "");
476
477 // ── Configuration ──
478
487 void own_tools(std::vector<std::unique_ptr<Tool>> tools);
488
493 void set_checkpoint_store(std::shared_ptr<CheckpointStore> store);
494
500 void set_store(std::shared_ptr<Store> store);
501
506 std::shared_ptr<Store> get_store() const { return store_; }
507
512 void set_retry_policy(const RetryPolicy& policy);
513
519 void set_node_retry_policy(const std::string& node_name, const RetryPolicy& policy);
520
546 void set_worker_count(std::size_t n);
547
560
578 void set_node_cache_enabled(const std::string& node_name, bool enabled);
579
584
586 const NodeCache& node_cache() const { return node_cache_; }
587
592 const std::string& get_graph_name() const { return name_; }
593
594private:
595 GraphEngine() = default;
596
597 void init_state(GraphState& state) const;
598 void apply_input(GraphState& state, const json& input) const;
599
605 asio::awaitable<RunResult> execute_graph_async(
606 const RunConfig& config,
607 const GraphStreamCallback& cb,
608 const std::vector<std::string>& resume_from = {},
609 const json& resume_value = json());
610
611 RetryPolicy get_retry_policy(const std::string& node_name) const;
612
613 // --- Graph definition ---
614 std::string name_;
615
618 std::vector<ChannelDef> channel_defs_;
619
620 std::map<std::string, std::unique_ptr<GraphNode>> nodes_;
621 std::vector<Edge> edges_;
622 std::vector<ConditionalEdge> conditional_edges_;
623
628 std::unique_ptr<Scheduler> scheduler_;
629
634 std::unique_ptr<NodeExecutor> executor_;
635
636 std::set<std::string> interrupt_before_;
637 std::set<std::string> interrupt_after_;
638
639 std::shared_ptr<CheckpointStore> checkpoint_store_;
640 std::shared_ptr<Store> store_;
641 std::vector<std::unique_ptr<Tool>> owned_tools_;
642
643 // Retry policies
644 RetryPolicy default_retry_policy_;
645 std::map<std::string, RetryPolicy> node_retry_policies_;
646
654 std::unique_ptr<asio::thread_pool> pool_;
655
659 NodeCache node_cache_;
660
666 std::atomic<int> active_runs_{0};
667};
668
669} // namespace neograph::graph
NEOGRAPH_API export/import macro for shared-library builds.
Cooperative cancellation primitive for graph runs.
Checkpoint system for graph execution state persistence and time-travel.
Super-step loop execution engine for graph-based agent workflows.
Definition engine.h:343
void set_node_retry_policy(const std::string &node_name, const RetryPolicy &policy)
Set a retry policy for a specific node (overrides default).
std::string fork(const std::string &source_thread_id, const std::string &new_thread_id, const std::string &checkpoint_id="")
Fork a thread, creating a new thread from an existing checkpoint.
RunResult run_stream(const RunConfig &config, const GraphStreamCallback &cb)
Execute the graph with streaming event callbacks.
const std::string & get_graph_name() const
Get the graph name (from the JSON definition).
Definition engine.h:592
const NodeCache & node_cache() const
Borrow the engine's NodeCache for stats inspection.
Definition engine.h:586
std::vector< Checkpoint > get_state_history(const std::string &thread_id, int limit=100) const
Get the checkpoint history for a thread (time-travel).
void clear_node_cache()
Drop all cached entries (per-node enable state preserved).
void set_store(std::shared_ptr< Store > store)
Set the cross-thread shared memory store.
void set_checkpoint_store(std::shared_ptr< CheckpointStore > store)
Set the checkpoint persistence store.
void set_worker_count(std::size_t n)
Resize (or install) the engine-owned worker pool for parallel fan-out.
RunResult resume(const std::string &thread_id, const json &resume_value=json(), const GraphStreamCallback &cb=nullptr)
Resume execution from a HITL interrupt.
RunResult run(const RunConfig &config)
Execute the graph synchronously (blocking).
void update_state(const std::string &thread_id, const json &channel_writes, const std::string &as_node="")
Update the state for a thread by applying channel writes.
asio::awaitable< RunResult > run_async(RunConfig config)
Async peer of run() — returns an awaitable yielding the result.
void set_retry_policy(const RetryPolicy &policy)
Set the default retry policy for all nodes.
static std::unique_ptr< GraphEngine > compile(const json &definition, const NodeContext &default_context, std::shared_ptr< CheckpointStore > store=nullptr)
Compile a graph from a JSON definition.
asio::awaitable< RunResult > run_stream_async(RunConfig config, GraphStreamCallback cb)
Async peer of run_stream — non-blocking coroutine surface.
asio::awaitable< RunResult > resume_async(const std::string &thread_id, const json &resume_value=json(), const GraphStreamCallback &cb=nullptr)
Async peer of resume — non-blocking coroutine surface.
void set_worker_count_auto()
Opt into a hardware_concurrency()-sized worker pool.
std::optional< json > get_state(const std::string &thread_id) const
Get the current state for a thread.
void set_node_cache_enabled(const std::string &node_name, bool enabled)
Enable or disable per-node result caching.
void own_tools(std::vector< std::unique_ptr< Tool > > tools)
Transfer tool ownership to the engine.
std::shared_ptr< Store > get_store() const
Get the cross-thread shared memory store.
Definition engine.h:506
Thread-safe container for all graph state channels.
Definition state.h:26
Per-node result cache.
Definition node_cache.h:38
Pure JSON-definition → CompiledGraph parser, extracted from GraphEngine.
CheckpointCoordinator — owns the super-step checkpoint lifecycle.
NodeExecutor — owns per-super-step node invocation.
Graph engine type definitions: channels, edges, nodes, events, and control flow.
std::function< void(const GraphEvent &)> GraphStreamCallback
Callback for receiving graph execution events.
Definition types.h:287
StreamMode
Bitfield flags for selecting which events to stream during execution.
Definition types.h:194
Graph node implementations: base class and built-in node types.
Per-node result cache (opt-in).
Pure routing / next-step-planning logic extracted from GraphEngine.
Thread-safe mutable graph state management.
Cross-thread shared memory store (LangGraph Store equivalent).
Dependency injection context passed to nodes during construction.
Definition types.h:251
Retry policy for node execution with exponential backoff.
Definition types.h:172
Configuration for a graph execution run.
Definition engine.h:40
int max_steps
Safety limit for maximum super-steps per run.
Definition engine.h:43
StreamMode stream_mode
Which event types to emit during streaming.
Definition engine.h:44
std::string thread_id
Thread ID for checkpoint association.
Definition engine.h:41
bool resume_if_exists
Auto-resume from latest checkpoint for thread_id (v0.3.1+).
Definition engine.h:88
json input
Initial channel writes (e.g., {"messages": [...]}).
Definition engine.h:42
std::shared_ptr< CancelToken > cancel_token
Optional cooperative cancel handle (v0.3+).
Definition engine.h:58
Per-run dispatch metadata threaded through the engine and executor.
Definition engine.h:118
std::shared_ptr< CancelToken > cancel_token
Cooperative cancel handle.
Definition engine.h:121
int step
Current super-step index.
Definition engine.h:141
std::string thread_id
Mirrors RunConfig::thread_id so that executor-side logic can read it.
Definition engine.h:136
std::optional< std::chrono::steady_clock::time_point > deadline
Optional absolute wall-clock deadline.
Definition engine.h:126
std::shared_ptr< Store > store
Cross-thread shared memory (issue #27).
Definition engine.h:181
StreamMode stream_mode
Mirrors RunConfig::stream_mode.
Definition engine.h:144
std::string trace_id
Per-run trace correlator.
Definition engine.h:131
Result of a graph execution run.
Definition engine.h:209
std::string interrupt_node
Name of the node that triggered the interrupt.
Definition engine.h:212
json channel_raw(const std::string &name) const
Read a channel value as a raw json node (issue #25).
Definition engine.h:247
bool interrupted
True if execution was interrupted (HITL).
Definition engine.h:211
bool has_channel(const std::string &name) const noexcept
Test whether a channel value exists in either shape.
Definition engine.h:264
std::string checkpoint_id
ID of the last checkpoint saved.
Definition engine.h:214
json interrupt_value
Value associated with the interrupt.
Definition engine.h:213
json output
Final serialized graph state. See struct docstring for shape details.
Definition engine.h:210
std::vector< std::string > execution_trace
Ordered list of executed node names.
Definition engine.h:215
T channel(const std::string &name) const
Read a channel value as type T (issue #25).
Definition engine.h:234