70#include <condition_variable>
81namespace neograph::graph {
/// Raw libpq connection handle held by this wrapper; defaults to nullptr
/// and is set from the constructor argument.
87 pg_conn* raw =
nullptr;
/// Wraps connection pointer `p` by storing it in `raw`. `explicit` prevents
/// an implicit conversion from a bare pg_conn*.
/// NOTE(review): PgConn is described as an RAII wrapper, so it presumably
/// takes ownership of `p` — the destructor is not visible here; confirm.
89 explicit PgConn(pg_conn* p) : raw(p) {}
115 size_t pool_size = 8);
/// Load the most recent checkpoint for a thread.
/// @param thread_id  Thread whose latest checkpoint is requested.
/// @return The checkpoint, or std::nullopt (presumably when the thread has
///         no checkpoints — confirm against the implementation).
135 std::optional<Checkpoint>
load_latest(
const std::string& thread_id)
override;
/// Load a checkpoint by its unique ID.
/// @param id  Checkpoint identifier.
/// @return The checkpoint, or std::nullopt (presumably when no checkpoint
///         has that ID — confirm against the implementation).
136 std::optional<Checkpoint>
load_by_id(
const std::string&
id)
override;
/// List checkpoints for a thread, ordered by timestamp (newest first).
/// @param thread_id  Thread whose checkpoints are listed.
/// @param limit      Defaults to 100; presumably the maximum number of
///                   checkpoints returned — confirm.
137 std::vector<Checkpoint>
list(
const std::string& thread_id,
138 int limit = 100)
override;
142 const std::string& parent_checkpoint_id,
145 const std::string& thread_id,
146 const std::string& parent_checkpoint_id)
override;
148 const std::string& parent_checkpoint_id)
override;
/// Awaitable form of saving a checkpoint: returns an asio::awaitable<void>
/// instead of a plain void.
/// NOTE(review): presumably the coroutine counterpart of save() — confirm.
/// @param cp  Checkpoint to persist.
162 asio::awaitable<void> save_async(
const Checkpoint& cp)
override;
/// Awaitable form of loading the most recent checkpoint for a thread; the
/// awaited result is a std::optional<Checkpoint>.
/// NOTE(review): presumably the coroutine counterpart of load_latest() — confirm.
/// @param thread_id  Thread whose latest checkpoint is requested.
163 asio::awaitable<std::optional<Checkpoint>>
164 load_latest_async(
const std::string& thread_id)
override;
/// Awaitable form of loading a checkpoint by its unique ID; the awaited
/// result is a std::optional<Checkpoint>.
/// NOTE(review): presumably the coroutine counterpart of load_by_id() — confirm.
/// @param id  Checkpoint identifier.
165 asio::awaitable<std::optional<Checkpoint>>
166 load_by_id_async(
const std::string&
id)
override;
/// Awaitable form of listing a thread's checkpoints; the awaited result is
/// a std::vector<Checkpoint>.
/// NOTE(review): presumably the coroutine counterpart of list(), with the
/// same newest-first ordering — confirm.
/// @param thread_id  Thread whose checkpoints are listed.
/// @param limit      Defaults to 100, matching the synchronous list().
167 asio::awaitable<std::vector<Checkpoint>>
168 list_async(
const std::string& thread_id,
int limit = 100)
override;
/// Awaitable form of deleting all checkpoints for a thread; returns an
/// asio::awaitable<void>.
/// NOTE(review): presumably the coroutine counterpart of delete_thread() — confirm.
/// @param thread_id  Thread whose checkpoints are removed.
169 asio::awaitable<void>
170 delete_thread_async(
const std::string& thread_id)
override;
172 asio::awaitable<void> put_writes_async(
173 const std::string& thread_id,
174 const std::string& parent_checkpoint_id,
/// Awaitable form of loading the pending writes attached to a parent
/// checkpoint; the awaited result is a std::vector<PendingWrite>.
/// NOTE(review): presumably the coroutine counterpart of get_writes() — confirm.
/// @param thread_id             Thread the writes belong to.
/// @param parent_checkpoint_id  Checkpoint the pending writes are attached to.
176 asio::awaitable<std::vector<PendingWrite>> get_writes_async(
177 const std::string& thread_id,
178 const std::string& parent_checkpoint_id)
override;
/// Awaitable form of discarding the pending writes for a parent checkpoint;
/// returns an asio::awaitable<void>.
/// NOTE(review): presumably the coroutine counterpart of clear_writes() — confirm.
/// @param thread_id             Thread the writes belong to.
/// @param parent_checkpoint_id  Checkpoint whose pending writes are discarded.
179 asio::awaitable<void> clear_writes_async(
180 const std::string& thread_id,
181 const std::string& parent_checkpoint_id)
override;
/// NOTE(review): presumably creates the neograph_* tables when they are
/// missing (the inverse of drop_schema()) — the definition is not visible
/// here; confirm.
193 void ensure_schema();
/// Takes a callable `fn` by forwarding reference; the return type is deduced
/// from the definition, which is not visible here.
/// NOTE(review): presumably borrows a pooled connection, invokes `fn` with
/// it, and returns the slot afterwards — confirm against the definition.
197 template <
typename Fn>
198 auto with_conn(Fn&& fn);
/// Takes a callable `fn` by value; `fn` must be invocable with a pg_conn*,
/// and this function's return type is exactly what that call returns.
/// NOTE(review): presumably the coroutine counterpart of with_conn(), with
/// `fn` returning an asio::awaitable — confirm against the definition.
205 template <
typename Fn>
206 auto with_conn_async(Fn fn) ->
decltype(fn(std::declval<pg_conn*>()));
/// Returns a slot index.
/// NOTE(review): presumably takes an index into pool_ off the free_ queue,
/// waiting on pool_cv_ when none is free — confirm against the definition.
210 size_t acquire_slot();
/// @param idx  Slot index being handed back.
/// NOTE(review): presumably returns `idx` to the free_ queue and wakes a
/// waiter in acquire_slot() — confirm against the definition.
211 void release_slot(
size_t idx);
// Connection string; presumably the conn_str passed to the constructor.
215 std::string conn_str_;
// The connection pool: one owned PgConn wrapper per slot.
219 std::vector<std::unique_ptr<PgConn>> pool_;
// Queue of slot indices; presumably the slots in pool_ that are currently free.
223 std::queue<size_t> free_;
// NOTE(review): presumably guards free_ (and possibly pool_) — confirm.
224 std::mutex pool_mutex_;
// NOTE(review): presumably signalled when a slot is released — confirm.
225 std::condition_variable pool_cv_;
// Atomic counter, starts at 0. Presumably backs reconnect_count(): the number
// of times a pool slot's connection was replaced after a broken-connection
// detection.
230 std::atomic<size_t> reconnect_count_{0};
Checkpoint system for graph execution state persistence and time-travel.
Abstract interface for checkpoint persistence backends.
Persistent CheckpointStore backed by PostgreSQL via libpq.
PostgresCheckpointStore(const std::string &conn_str, size_t pool_size=8)
std::vector< PendingWrite > get_writes(const std::string &thread_id, const std::string &parent_checkpoint_id) override
Load all pending writes attached to a parent checkpoint.
size_t pool_size() const
Number of connections in the pool.
void save(const Checkpoint &cp) override
Save a checkpoint.
void clear_writes(const std::string &thread_id, const std::string &parent_checkpoint_id) override
Discard pending writes for a parent checkpoint once its successor super-step has been fully committed...
size_t reconnect_count() const
Number of times a pool slot's connection was replaced after a broken-connection detection.
void delete_thread(const std::string &thread_id) override
Delete all checkpoints for a thread.
std::vector< Checkpoint > list(const std::string &thread_id, int limit=100) override
List checkpoints for a thread, ordered by timestamp (newest first).
void drop_schema()
Drop all neograph_* tables.
std::optional< Checkpoint > load_by_id(const std::string &id) override
Load a checkpoint by its unique ID.
std::optional< Checkpoint > load_latest(const std::string &thread_id) override
Load the most recent checkpoint for a thread.
void put_writes(const std::string &thread_id, const std::string &parent_checkpoint_id, const PendingWrite &write) override
Record a successful node execution within an in-progress super-step.
size_t blob_count()
Test helper: count rows in neograph_checkpoint_blobs.
Serialized snapshot of graph execution state at a single super-step.
Successful node writes recorded within an in-progress super-step.
RAII wrapper around a libpq PGconn.