NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
postgres_checkpoint.h File Reference

PostgreSQL-backed CheckpointStore — durable, multi-process, blob-deduplicated, connection-pooled. More...

#include <neograph/graph/checkpoint.h>
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <vector>
Include dependency graph for postgres_checkpoint.h:

Go to the source code of this file.

Classes

struct  neograph::graph::PgConn
 RAII wrapper around a libpq PGconn. More...
 
class  neograph::graph::PostgresCheckpointStore
 Persistent CheckpointStore backed by PostgreSQL via libpq. More...
 

Detailed Description

PostgreSQL-backed CheckpointStore — durable, multi-process, blob-deduplicated, connection-pooled.

Mirrors LangGraph's PostgresSaver schema so a single PG database can host NeoGraph and LangGraph state side-by-side (NeoGraph uses a neograph_ table prefix to avoid collisions with LangGraph's unprefixed tables, which carry a different column shape).

Stage 3 / Sem 3.3: migrated off libpqxx onto libpq directly. The libpqxx-7.8t64 package on Ubuntu 24.04 has a C++17/C++20 ABI split (link error on pqxx::conversion_error(..., std::source_location)) that made it unbuildable against NeoGraph 2.0's C++20 baseline. libpq has a stable C ABI and is what libpqxx wraps internally, so the migration is a net simplification.

Schema

Three tables, all auto-created on first use via ensure_schema():

  • neograph_checkpoints — cp metadata + per-channel version map. Channel VALUES are NOT stored here (just versions).
  • neograph_checkpoint_blobs(thread_id, channel, version) → value. Identical (channel, version) writes collapse via ON CONFLICT DO NOTHING, so a long-running thread that touches only a few channels per super-step pays linear blob storage in distinct writes, not in steps × channels.
  • neograph_checkpoint_writes — pending intra-super-step writes, keyed (thread_id, parent_checkpoint_id, task_id, seq).

Concurrency — connection pool

The store owns a fixed-size pool of libpq connections sized by the constructor's pool_size parameter (default 8). Each method acquires one connection for its work and releases it back to the pool when done. With N workers ≤ pool_size, there is no serialisation: each worker commits in parallel, scaling roughly linearly until PG itself (WAL fsync, lock contention) becomes the limit.

Why a pool and not a single mutex'd connection: PG commit is dominated by synchronous_commit=on WAL fsync (~10ms per commit). One connection serializes those into a sequential queue and caps throughput at ~70 saves/sec. A pool of 8 lets 8 commits flush concurrently — measured ~6× speedup at 8 worker threads.

Pool sizing guidance:

  • Embedded / single-process: pool_size = 1 is fine.
  • Server / multi-tenant: match your worker thread count, capped at max_connections on the PG side (PG default 100; pgbouncer in front is recommended for very large pools).

Failure model

A broken connection (PG restart, network blip, pgbouncer idle timeout) is detected after each operation via PQstatus(conn) != CONNECTION_OK or by matching the 08xxx SQLSTATE class on an error result. The dead connection is replaced with a fresh one (using the original connection string) and the operation is retried once. reconnect_count() exposes the cumulative number of slot replacements for monitoring. Other errors (constraint violations, query errors) propagate as std::runtime_error with the PG server's error message.

Definition in file postgres_checkpoint.h.