NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
checkpoint.h
Go to the documentation of this file.
1
9#pragma once
10
11#include <neograph/api.h>
13
14#include <asio/awaitable.hpp>
15
16#include <cstdint>
17
18#include <optional>
19#include <mutex>
20#include <map>
21#include <set>
22#include <vector>
23#include <string_view>
24#include <chrono>
25
26namespace neograph::graph {
27
/// Current Checkpoint layout version.
///
/// Unsigned + fixed-width: schema versions are non-negative, and a
/// platform-variable `int` width is wrong for a value persisted to disk
/// and round-tripped through JSON. The old `int` type also left the door
/// open to an undocumented `-1` sentinel value.
///
/// Declared `inline` so that every translation unit including this header
/// refers to one shared object. A plain namespace-scope `constexpr` has
/// internal linkage, which turns any odr-use from an inline function
/// (e.g. binding the constant to a `const&` parameter) into an ODR
/// violation.
inline constexpr std::uint32_t CHECKPOINT_SCHEMA_VERSION = 2;
46
/// Phase at which a Checkpoint was produced.
///
/// NOTE(review): the embedded listing numbers jump from 56 to 58, so an
/// enumerator may be missing from this rendering — confirm against the
/// real header before treating this list as complete.
53enum class CheckpointPhase {
    /// Saved just before an interrupt_before node fires.
54 Before,
    /// Saved just after an interrupt_after node completed.
55 After,
    /// Default value of Checkpoint::interrupt_phase. No further description
    /// is visible in this listing.
56 Completed,
    /// Saved by update_state() injecting state externally.
58 Updated
59};
60
/// Canonical wire / log string for a CheckpointPhase.
///
/// @param phase Phase to convert.
/// @return C string naming the phase. NOTE(review): lifetime/ownership of
///         the returned pointer is not shown in this header — presumably a
///         static literal; confirm in the implementation.
66NEOGRAPH_API const char* to_string(CheckpointPhase phase);
67
/// Parse a phase string back to the enum.
///
/// @param s Phase text; presumably the form produced by to_string() —
///          confirm against the implementation.
/// @return The corresponding CheckpointPhase.
/// NOTE(review): the behaviour for unrecognised input (fallback value vs.
/// exception) is not visible in this header.
73NEOGRAPH_API CheckpointPhase parse_checkpoint_phase(std::string_view s);
74
/// Serialized snapshot of graph execution state at a single super-step.
///
/// NOTE(review): this listing is incomplete. The page's member index also
/// names `json channel_values` (serialized channel data), `json
/// channel_versions` (per-channel version counters) and `std::uint32_t
/// schema_version` (layout version of this record), whose declaration
/// lines are not shown here.
83struct Checkpoint {
    /// Unique checkpoint ID (UUID v4).
84 std::string id;
    /// Conversation/session identifier.
85 std::string thread_id;
    /// Previous checkpoint ID (for time-travel chain).
88 std::string parent_id;
    /// Node that was active at checkpoint time.
89 std::string current_node;
    /// Nodes to execute on resume.
94 std::vector<std::string> next_nodes;
    /// Phase at which this checkpoint was produced; defaults to Completed.
95 CheckpointPhase interrupt_phase = CheckpointPhase::Completed;
    /// Per-barrier accumulator: maps a declared barrier node to a set of
    /// upstream names. The page's description is truncated, so the exact
    /// meaning of set membership should be confirmed in the full docs.
105 std::map<std::string, std::set<std::string>> barrier_state;
    /// User-defined metadata.
106 json metadata;
    /// Super-step number.
    /// NOTE(review): no default member initializer — `Checkpoint cp;`
    /// leaves this indeterminate (`Checkpoint cp{};` zeroes it).
107 int64_t step;
    /// Unix epoch milliseconds.
    /// NOTE(review): no default member initializer, same caveat as `step`.
108 int64_t timestamp;
116
    /// Generate a new UUID v4 string.
121 static std::string generate_id();
122};
123
// Members of PendingWrite: successful node writes recorded within an
// in-progress super-step.
// NOTE(review): the `struct PendingWrite {` opener (listing line 140) is
// not present in this rendering; only the member lines are shown.
    /// Deterministic per-execution ID (survives replay).
141 std::string task_id;
    /// Human-readable path, e.g. "s3:executor_2" or "s3:send[0]:searcher".
142 std::string task_path;
    /// Node that produced these writes.
143 std::string node_name;
    /// Serialized ChannelWrite vector (json array of {channel, value}).
144 json writes;
    /// Serialized optional Command, or null if the node didn't emit one.
145 json command;
    /// Serialized Send vector (json array of {target_node, input}); empty if none.
146 json sends;
    /// Super-step number this write belongs to.
147 int64_t step;
    /// Unix epoch milliseconds at record time.
148 int64_t timestamp;
149};
150
/// Abstract interface for checkpoint persistence backends.
///
/// Every operation exists as a sync method and an `*_async` peer. Per the
/// in-class comments, the default sync bodies bridge to the async peers
/// and the default async bodies call back into the sync methods, so a
/// concrete store must override at least one side of each pair.
207class NEOGRAPH_API CheckpointStore {
208public:
    /// Virtual destructor so stores can be destroyed through a base pointer.
209 virtual ~CheckpointStore() = default;
210
211 // ── Sync API ────────────────────────────────────────────────────────
212 //
213 // Stage 3 / Semester 3.1: each sync method is now non-pure with a
214 // default implementation that bridges to its async peer via
215 // `neograph::async::run_sync`. The legacy stores (InMemory, SQLite,
216 // Postgres) still override these directly. Async-native stores can
217 // override only the *_async() variants and inherit these as facades.
218 // Override at least one side per method — overriding neither yields
219 // infinite mutual recursion at call time.
220
    /// Save a checkpoint.
    /// @param cp Checkpoint to persist.
225 virtual void save(const Checkpoint& cp);
226
    /// Load the most recent checkpoint for a thread.
    /// @param thread_id Thread whose latest checkpoint is requested.
    /// @return The checkpoint, or presumably an empty optional when the
    ///         thread has none — confirm in the implementation.
232 virtual std::optional<Checkpoint> load_latest(const std::string& thread_id);
233
    /// Load a checkpoint by its unique ID.
    /// @param id Checkpoint ID to look up.
    /// @return The checkpoint, or presumably an empty optional when the ID
    ///         is unknown — confirm in the implementation.
239 virtual std::optional<Checkpoint> load_by_id(const std::string& id);
240
    /// List checkpoints for a thread, ordered by timestamp (newest first).
    /// @param thread_id Thread to list.
    /// @param limit     Cap on the number of results (defaults to 100).
    /// NOTE(review): a default argument on a virtual function is bound by
    /// the static type at the call site; overrides should keep using 100.
247 virtual std::vector<Checkpoint> list(const std::string& thread_id,
248 int limit = 100);
249
    /// Delete all checkpoints for a thread.
    /// @param thread_id Thread to remove.
254 virtual void delete_thread(const std::string& thread_id);
255
256 // ── Async API ───────────────────────────────────────────────────────
257 //
258 // Each method's default body co_returns the matching sync call. Real
259 // async stores (libpq pipeline in Sem 3.3) override these to perform
260 // non-blocking I/O.
261
    /// Async peer of save().
262 virtual asio::awaitable<void> save_async(const Checkpoint& cp);
    /// Async peer of load_latest().
263 virtual asio::awaitable<std::optional<Checkpoint>>
264 load_latest_async(const std::string& thread_id);
    /// Async peer of load_by_id().
265 virtual asio::awaitable<std::optional<Checkpoint>>
266 load_by_id_async(const std::string& id);
    /// Async peer of list(); same default limit of 100.
267 virtual asio::awaitable<std::vector<Checkpoint>>
268 list_async(const std::string& thread_id, int limit = 100);
    /// Async peer of delete_thread().
269 virtual asio::awaitable<void>
270 delete_thread_async(const std::string& thread_id);
271
272 // ── Pending writes (fine-grained progress log) ──────────────────────
273
    /// Record a successful node execution within an in-progress super-step.
    /// The default implementation here is an empty body (no-op).
289 virtual void put_writes(const std::string& /*thread_id*/,
290 const std::string& /*parent_checkpoint_id*/,
291 const PendingWrite& /*write*/) {}
292
    /// Load all pending writes attached to a parent checkpoint.
    /// The default implementation here returns an empty vector.
303 virtual std::vector<PendingWrite> get_writes(
304 const std::string& /*thread_id*/,
305 const std::string& /*parent_checkpoint_id*/) { return {}; }
306
    /// Discard pending writes for a parent checkpoint once its successor
    /// super-step has been fully committed.
    /// The default implementation here is an empty body (no-op).
318 virtual void clear_writes(const std::string& /*thread_id*/,
319 const std::string& /*parent_checkpoint_id*/) {}
320
321 // Async peers for the pending-writes API. Each defaults to calling
322 // the matching sync method (which itself is a no-op for stores that
323 // don't override it), so existing custom stores keep working.
324
    /// Async peer of put_writes().
325 virtual asio::awaitable<void> put_writes_async(
326 const std::string& thread_id,
327 const std::string& parent_checkpoint_id,
328 const PendingWrite& write);
    /// Async peer of get_writes().
329 virtual asio::awaitable<std::vector<PendingWrite>> get_writes_async(
330 const std::string& thread_id,
331 const std::string& parent_checkpoint_id);
    /// Async peer of clear_writes().
332 virtual asio::awaitable<void> clear_writes_async(
333 const std::string& thread_id,
334 const std::string& parent_checkpoint_id);
335};
336
/// In-memory checkpoint store for testing and single-process use.
///
/// Overrides the sync side of every CheckpointStore operation; no
/// `*_async` overrides are declared here, so the base-class async
/// defaults apply.
///
/// NOTE(review): the private members use std::tuple and std::pair, but
/// <tuple> / <utility> are not among the includes visible in this listing
/// — confirm they arrive transitively or add them.
361class NEOGRAPH_API InMemoryCheckpointStore : public CheckpointStore {
362public:
    /// Save a checkpoint.
363 void save(const Checkpoint& cp) override;
    /// Load the most recent checkpoint for a thread.
364 std::optional<Checkpoint> load_latest(const std::string& thread_id) override;
    /// Load a checkpoint by its unique ID.
365 std::optional<Checkpoint> load_by_id(const std::string& id) override;
    /// List checkpoints for a thread, ordered by timestamp (newest first).
    /// The default limit repeats the base-class value of 100.
366 std::vector<Checkpoint> list(const std::string& thread_id,
367 int limit = 100) override;
    /// Delete all checkpoints for a thread.
368 void delete_thread(const std::string& thread_id) override;
369
    /// Record a successful node execution within an in-progress super-step.
370 void put_writes(const std::string& thread_id,
371 const std::string& parent_checkpoint_id,
372 const PendingWrite& write) override;
    /// Load all pending writes attached to a parent checkpoint.
373 std::vector<PendingWrite> get_writes(
374 const std::string& thread_id,
375 const std::string& parent_checkpoint_id) override;
    /// Discard pending writes for a parent checkpoint once its successor
    /// super-step has been fully committed.
376 void clear_writes(const std::string& thread_id,
377 const std::string& parent_checkpoint_id) override;
378
    /// Get the total number of stored checkpoints (test helper).
383 size_t size() const;
384
    /// Number of distinct channel-value blobs currently held (test helper).
391 size_t blob_count() const;
392
    /// Get the number of pending writes for a parent checkpoint (test helper).
396 size_t pending_writes_count(const std::string& thread_id,
397 const std::string& parent_checkpoint_id) const;
398
399private:
    /// Takes a Checkpoint by value and returns a Checkpoint.
    /// NOTE(review): judging by the name, presumably moves channel values
    /// into blobs_ and must be called with mutex_ held — confirm in the .cpp.
405 Checkpoint split_blobs_locked(Checkpoint cp);
406
    /// Const counterpart of split_blobs_locked().
    /// NOTE(review): presumably re-attaches blob data from blobs_ and
    /// expects mutex_ to be held — confirm in the .cpp.
413 Checkpoint join_blobs_locked(Checkpoint cp) const;
414
    /// Declared mutable so const member functions can lock it.
415 mutable std::mutex mutex_;
    /// Checkpoints grouped under a string key (presumably thread_id — confirm).
416 std::map<std::string, std::vector<Checkpoint>> by_thread_;
    /// Checkpoints under a string key (presumably the checkpoint id — confirm).
417 std::map<std::string, Checkpoint> by_id_;
    /// json blobs keyed by a (string, string, uint64_t) tuple.
    /// NOTE(review): the meaning of the key components is not shown in this
    /// listing.
421 std::map<std::tuple<std::string, std::string, uint64_t>, json> blobs_;
422 // Keyed by (thread_id, parent_checkpoint_id) → ordered list of pending writes
423 std::map<std::pair<std::string, std::string>, std::vector<PendingWrite>> pending_;
424};
425
426} // namespace neograph::graph
NEOGRAPH_API export/import macro for shared-library builds.
CheckpointPhase
Phase at which a Checkpoint was produced.
Definition checkpoint.h:53
@ After
Saved just after an interrupt_after node completed.
@ Before
Saved just before an interrupt_before node fires.
@ Updated
Saved by update_state() injecting state externally.
NEOGRAPH_API const char * to_string(CheckpointPhase phase)
Canonical wire / log string for a CheckpointPhase.
NEOGRAPH_API CheckpointPhase parse_checkpoint_phase(std::string_view s)
Parse a phase string back to the enum.
constexpr std::uint32_t CHECKPOINT_SCHEMA_VERSION
Current Checkpoint layout version.
Definition checkpoint.h:45
Abstract interface for checkpoint persistence backends.
Definition checkpoint.h:207
virtual void clear_writes(const std::string &, const std::string &)
Discard pending writes for a parent checkpoint once its successor super-step has been fully committed...
Definition checkpoint.h:318
virtual std::vector< PendingWrite > get_writes(const std::string &, const std::string &)
Load all pending writes attached to a parent checkpoint.
Definition checkpoint.h:303
virtual std::optional< Checkpoint > load_latest(const std::string &thread_id)
Load the most recent checkpoint for a thread.
virtual void put_writes(const std::string &, const std::string &, const PendingWrite &)
Record a successful node execution within an in-progress super-step.
Definition checkpoint.h:289
virtual void save(const Checkpoint &cp)
Save a checkpoint.
virtual std::optional< Checkpoint > load_by_id(const std::string &id)
Load a checkpoint by its unique ID.
virtual void delete_thread(const std::string &thread_id)
Delete all checkpoints for a thread.
virtual std::vector< Checkpoint > list(const std::string &thread_id, int limit=100)
List checkpoints for a thread, ordered by timestamp (newest first).
In-memory checkpoint store for testing and single-process use.
Definition checkpoint.h:361
size_t pending_writes_count(const std::string &thread_id, const std::string &parent_checkpoint_id) const
Get the number of pending writes for a parent checkpoint (test helper).
void delete_thread(const std::string &thread_id) override
Delete all checkpoints for a thread.
std::vector< PendingWrite > get_writes(const std::string &thread_id, const std::string &parent_checkpoint_id) override
Load all pending writes attached to a parent checkpoint.
std::optional< Checkpoint > load_latest(const std::string &thread_id) override
Load the most recent checkpoint for a thread.
size_t blob_count() const
Number of distinct channel-value blobs currently held (test helper).
std::optional< Checkpoint > load_by_id(const std::string &id) override
Load a checkpoint by its unique ID.
void put_writes(const std::string &thread_id, const std::string &parent_checkpoint_id, const PendingWrite &write) override
Record a successful node execution within an in-progress super-step.
std::vector< Checkpoint > list(const std::string &thread_id, int limit=100) override
List checkpoints for a thread, ordered by timestamp (newest first).
size_t size() const
Get the total number of stored checkpoints (test helper).
void clear_writes(const std::string &thread_id, const std::string &parent_checkpoint_id) override
Discard pending writes for a parent checkpoint once its successor super-step has been fully committed...
void save(const Checkpoint &cp) override
Save a checkpoint.
Exception thrown from inside a node to trigger a dynamic breakpoint.
Definition types.h:101
Graph engine type definitions: channels, edges, nodes, events, and control flow.
Serialized snapshot of graph execution state at a single super-step.
Definition checkpoint.h:83
std::string thread_id
Conversation/session identifier.
Definition checkpoint.h:85
static std::string generate_id()
Generate a new UUID v4 string.
int64_t step
Super-step number.
Definition checkpoint.h:107
int64_t timestamp
Unix epoch milliseconds.
Definition checkpoint.h:108
CheckpointPhase interrupt_phase
Phase at which this checkpoint was produced.
Definition checkpoint.h:95
std::map< std::string, std::set< std::string > > barrier_state
Per-barrier accumulator: each entry maps a declared barrier node to the set of upstreams that have si...
Definition checkpoint.h:105
json channel_versions
Per-channel version counters.
Definition checkpoint.h:87
std::string current_node
Node that was active at checkpoint time.
Definition checkpoint.h:89
std::string parent_id
Previous checkpoint ID (for time-travel chain).
Definition checkpoint.h:88
json metadata
User-defined metadata.
Definition checkpoint.h:106
json channel_values
Serialized channel data.
Definition checkpoint.h:86
std::uint32_t schema_version
Layout version of this record.
Definition checkpoint.h:115
std::vector< std::string > next_nodes
Nodes to execute on resume.
Definition checkpoint.h:94
std::string id
Unique checkpoint ID (UUID v4).
Definition checkpoint.h:84
Successful node writes recorded within an in-progress super-step.
Definition checkpoint.h:140
std::string task_id
Deterministic per-execution ID (survives replay).
Definition checkpoint.h:141
std::string node_name
Node that produced these writes.
Definition checkpoint.h:143
int64_t step
Super-step number this write belongs to.
Definition checkpoint.h:147
json writes
Serialized ChannelWrite vector (json array of {channel, value}).
Definition checkpoint.h:144
std::string task_path
Human-readable path, e.g. "s3:executor_2" or "s3:send[0]:searcher".
Definition checkpoint.h:142
json command
Serialized optional Command, or null if the node didn't emit one.
Definition checkpoint.h:145
json sends
Serialized Send vector (json array of {target_node, input}); empty if none.
Definition checkpoint.h:146
int64_t timestamp
Unix epoch milliseconds at record time.
Definition checkpoint.h:148