NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
postgres_checkpoint.h
Go to the documentation of this file.
1
66#pragma once
67
69#include <atomic>
70#include <condition_variable>
71#include <memory>
72#include <mutex>
73#include <queue>
74#include <string>
75#include <vector>
76
77// Forward-declare libpq's opaque PGconn. Keeps libpq-fe.h out of
78// every translation unit that includes this header.
79struct pg_conn;
80
81namespace neograph::graph {
82
86struct PgConn {
87 pg_conn* raw = nullptr;
88 PgConn() = default;
89 explicit PgConn(pg_conn* p) : raw(p) {}
90 ~PgConn();
91 PgConn(const PgConn&) = delete;
92 PgConn& operator=(const PgConn&) = delete;
93 PgConn(PgConn&&) = delete;
94 PgConn& operator=(PgConn&&) = delete;
95};
96
106class NEOGRAPH_API PostgresCheckpointStore : public CheckpointStore {
107public:
114 explicit PostgresCheckpointStore(const std::string& conn_str,
115 size_t pool_size = 8);
116
121 size_t reconnect_count() const { return reconnect_count_; }
122
125 size_t pool_size() const { return pool_.size(); }
126
127 ~PostgresCheckpointStore() override;
128
129 // Non-copyable, non-movable — mutex would need rebinding on move
130 // and connection pool slots are keyed by stable indices.
132 PostgresCheckpointStore& operator=(const PostgresCheckpointStore&) = delete;
133
134 void save(const Checkpoint& cp) override;
135 std::optional<Checkpoint> load_latest(const std::string& thread_id) override;
136 std::optional<Checkpoint> load_by_id(const std::string& id) override;
137 std::vector<Checkpoint> list(const std::string& thread_id,
138 int limit = 100) override;
139 void delete_thread(const std::string& thread_id) override;
140
141 void put_writes(const std::string& thread_id,
142 const std::string& parent_checkpoint_id,
143 const PendingWrite& write) override;
144 std::vector<PendingWrite> get_writes(
145 const std::string& thread_id,
146 const std::string& parent_checkpoint_id) override;
147 void clear_writes(const std::string& thread_id,
148 const std::string& parent_checkpoint_id) override;
149
150 // ── Async peers (Sem 4 follow-up) ───────────────────────────────────
151 //
152 // True async: libpq nonblocking mode + asio::posix::stream_descriptor
153 // on PQsocket() + co_await on write/read readiness. The io_context
154 // stays free for other coroutines during PG's commit fsync, so N
155 // concurrent save_async calls across pool slots actually overlap
156 // instead of serialising on the main worker thread.
157 //
158 // Behaviour identical to the sync peers (same retry semantics,
159 // same broken-connection auto-replacement, same schema). Only the
160 // wire-level wait is non-blocking.
161
162 asio::awaitable<void> save_async(const Checkpoint& cp) override;
163 asio::awaitable<std::optional<Checkpoint>>
164 load_latest_async(const std::string& thread_id) override;
165 asio::awaitable<std::optional<Checkpoint>>
166 load_by_id_async(const std::string& id) override;
167 asio::awaitable<std::vector<Checkpoint>>
168 list_async(const std::string& thread_id, int limit = 100) override;
169 asio::awaitable<void>
170 delete_thread_async(const std::string& thread_id) override;
171
172 asio::awaitable<void> put_writes_async(
173 const std::string& thread_id,
174 const std::string& parent_checkpoint_id,
175 const PendingWrite& write) override;
176 asio::awaitable<std::vector<PendingWrite>> get_writes_async(
177 const std::string& thread_id,
178 const std::string& parent_checkpoint_id) override;
179 asio::awaitable<void> clear_writes_async(
180 const std::string& thread_id,
181 const std::string& parent_checkpoint_id) override;
182
186
190 size_t blob_count();
191
192private:
193 void ensure_schema();
194
197 template <typename Fn>
198 auto with_conn(Fn&& fn);
199
205 template <typename Fn>
206 auto with_conn_async(Fn fn) -> decltype(fn(std::declval<pg_conn*>()));
207
210 size_t acquire_slot();
211 void release_slot(size_t idx);
212
215 std::string conn_str_;
216
219 std::vector<std::unique_ptr<PgConn>> pool_;
220
223 std::queue<size_t> free_;
224 std::mutex pool_mutex_;
225 std::condition_variable pool_cv_;
226
230 std::atomic<size_t> reconnect_count_{0};
231};
232
233} // namespace neograph::graph
Checkpoint system for graph execution state persistence and time-travel.
Abstract interface for checkpoint persistence backends.
Definition checkpoint.h:207
Persistent CheckpointStore backed by PostgreSQL via libpq.
PostgresCheckpointStore(const std::string &conn_str, size_t pool_size=8)
std::vector< PendingWrite > get_writes(const std::string &thread_id, const std::string &parent_checkpoint_id) override
Load all pending writes attached to a parent checkpoint.
size_t pool_size() const
Number of connections in the pool.
void save(const Checkpoint &cp) override
Save a checkpoint.
void clear_writes(const std::string &thread_id, const std::string &parent_checkpoint_id) override
Discard pending writes for a parent checkpoint once its successor super-step has been fully committed...
size_t reconnect_count() const
Number of times a pool slot's connection was replaced after a broken-connection detection.
void delete_thread(const std::string &thread_id) override
Delete all checkpoints for a thread.
std::vector< Checkpoint > list(const std::string &thread_id, int limit=100) override
List checkpoints for a thread, ordered by timestamp (newest first).
void drop_schema()
Drop all neograph_* tables.
std::optional< Checkpoint > load_by_id(const std::string &id) override
Load a checkpoint by its unique ID.
std::optional< Checkpoint > load_latest(const std::string &thread_id) override
Load the most recent checkpoint for a thread.
void put_writes(const std::string &thread_id, const std::string &parent_checkpoint_id, const PendingWrite &write) override
Record a successful node execution within an in-progress super-step.
size_t blob_count()
Test helper: count rows in neograph_checkpoint_blobs.
Serialized snapshot of graph execution state at a single super-step.
Definition checkpoint.h:83
Successful node writes recorded within an in-progress super-step.
Definition checkpoint.h:140
RAII wrapper around a libpq PGconn.