NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
coordinator.h
Go to the documentation of this file.
1
26#pragma once
27
28#include <neograph/api.h>
32#include <memory>
33#include <optional>
34#include <string>
35#include <unordered_map>
36#include <vector>
37
38namespace neograph::graph {
39
40class GraphState;
41
50 bool have_cp = false;
51
52 std::string checkpoint_id;
54 int start_step = 0;
55 CheckpointPhase phase = CheckpointPhase::Completed;
56 std::vector<std::string> next_nodes;
57
61 std::unordered_map<std::string, NodeResult> replay_results;
62
66};
67
75class NEOGRAPH_API CheckpointCoordinator {
76public:
79 CheckpointCoordinator(std::shared_ptr<CheckpointStore> store,
80 std::string thread_id);
81
83 bool enabled() const noexcept { return store_ != nullptr && !thread_id_.empty(); }
84
85 const std::shared_ptr<CheckpointStore>& store() const noexcept { return store_; }
86 const std::string& thread_id() const noexcept { return thread_id_; }
87
92 std::string save_super_step(const GraphState& state,
93 const std::string& current_node,
94 const std::vector<std::string>& next_nodes,
95 CheckpointPhase phase,
96 int step,
97 const std::string& parent_id,
98 const BarrierState& barrier_state) const;
99
108
112 asio::awaitable<ResumeContext> load_for_resume_async() const;
113
121 void record_pending_write(const std::string& parent_cp_id,
122 const std::string& task_id,
123 const std::string& task_path,
124 const std::string& node_name,
125 const NodeResult& nr,
126 int step) const;
127
136 void clear_pending_writes(const std::string& parent_cp_id) const;
137
138 // ── Async peers (Stage 3 / Sem 3.6 incremental) ─────────────────────
139 //
140 // Each routes to the matching CheckpointStore::*_async so the
141 // coroutine engine path doesn't block the io_context on
142 // checkpoint I/O. Behaviour identical to the sync versions.
143
144 asio::awaitable<std::string> save_super_step_async(
145 const GraphState& state,
146 const std::string& current_node,
147 const std::vector<std::string>& next_nodes,
148 CheckpointPhase phase,
149 int step,
150 const std::string& parent_id,
151 const BarrierState& barrier_state) const;
152
153 asio::awaitable<void> record_pending_write_async(
154 const std::string& parent_cp_id,
155 const std::string& task_id,
156 const std::string& task_path,
157 const std::string& node_name,
158 const NodeResult& nr,
159 int step) const;
160
161 asio::awaitable<void> clear_pending_writes_async(
162 const std::string& parent_cp_id) const;
163
164private:
165 std::shared_ptr<CheckpointStore> store_;
166 std::string thread_id_;
167};
168
169} // namespace neograph::graph
NEOGRAPH_API export/import macro for shared-library builds.
Checkpoint system for graph execution state persistence and time-travel.
CheckpointPhase
Phase at which a Checkpoint was produced.
Definition checkpoint.h:53
Per-run coordinator for checkpoint lifecycle operations.
Definition coordinator.h:75
CheckpointCoordinator(std::shared_ptr< CheckpointStore > store, std::string thread_id)
ResumeContext load_for_resume() const
Load the latest checkpoint + all pending writes attached to it.
void record_pending_write(const std::string &parent_cp_id, const std::string &task_id, const std::string &task_path, const std::string &node_name, const NodeResult &nr, int step) const
Durably record a completed node's writes under parent_cp_id.
void clear_pending_writes(const std::string &parent_cp_id) const
Drop the pending-writes log for a parent cp after its super-step has been successfully committed.
asio::awaitable< ResumeContext > load_for_resume_async() const
Async peer of load_for_resume (Sem 3.7.5).
std::string save_super_step(const GraphState &state, const std::string &current_node, const std::vector< std::string > &next_nodes, CheckpointPhase phase, int step, const std::string &parent_id, const BarrierState &barrier_state) const
Write a super-step snapshot.
Thread-safe container for all graph state channels.
Definition state.h:26
Graph engine type definitions: channels, edges, nodes, events, and control flow.
Pure routing / next-step-planning logic extracted from GraphEngine.
std::map< std::string, std::set< std::string > > BarrierState
Per-barrier signal bookkeeping.
Definition scheduler.h:70
Extended result returned by node execution.
Definition types.h:297
Context needed to resume execution from the last saved snapshot.
Definition coordinator.h:49
BarrierState barrier_state
Barrier accumulators for in-flight AND-joins.
Definition coordinator.h:65
json channel_values
Serialized GraphState at cp time.
Definition coordinator.h:53
std::unordered_map< std::string, NodeResult > replay_results
Partially-completed super-step writes, keyed by task_id.
Definition coordinator.h:61
int start_step
Phase-adjusted step to re-enter at.
Definition coordinator.h:54