NeoGraph 0.10.0
A C++17 Graph Agent Engine Library — LangGraph for C++
Loading...
Searching...
No Matches
schema_provider.h
Go to the documentation of this file.
1
26#pragma once
27
28#include <neograph/api.h>
29#include <neograph/provider.h>
31#include <asio/executor_work_guard.hpp>
32#include <asio/io_context.hpp>
33#include <fstream>
34#include <memory>
35#include <mutex>
36#include <optional>
37#include <set>
38#include <string>
39#include <thread>
40#include <map>
41
42namespace neograph::async { class ConnPool; class CurlH2Pool; }
43
44namespace neograph::llm {
45
46namespace test_access { class SchemaProviderTestAccess; } // fwd-decl for friend
47
65class NEOGRAPH_API SchemaProvider : public Provider {
66 public:
68 struct Config {
69 std::string schema_path;
70 std::string api_key;
71 std::string default_model = "gpt-4o-mini";
72 int timeout_seconds = 60;
73 std::string base_url_override;
74
83 bool use_websocket = false;
84
95 bool prefer_libcurl = false;
96 };
97
108 static std::unique_ptr<SchemaProvider> create(const Config& config);
109
114
120 asio::awaitable<ChatCompletion>
121 complete_async(const CompletionParams& params) override;
122
125
144 const StreamCallback& on_chunk) override;
145
160 asio::awaitable<ChatCompletion>
162 const StreamCallback& on_chunk) override;
163
171 asio::awaitable<ChatCompletion>
172 invoke(const CompletionParams& params, StreamCallback on_chunk) override;
173
176 std::string get_name() const override;
177
178 private:
179 explicit SchemaProvider(Config config, json schema);
180
181 // --- Strategies (internal) ---
182 enum class SystemPromptStrategy { IN_MESSAGES, TOP_LEVEL, TOP_LEVEL_PARTS };
183 // FLAT_ITEMS: OpenAI Responses — tool calls are separate top-level items in input[] (not nested in a message).
184 enum class ToolCallStrategy { TOOL_CALLS_ARRAY, CONTENT_ARRAY, PARTS_ARRAY, FLAT_ITEMS };
185 // FLAT_ITEM: OpenAI Responses — {type:"function_call_output", call_id, output} as a top-level input[] item.
186 enum class ToolResultStrategy { FLAT, CONTENT_ARRAY, PARTS_ARRAY, FLAT_ITEM };
187 // FLAT_FUNCTION: OpenAI Responses — [{type:"function", name, description, parameters}] (no nesting under "function").
188 enum class ToolDefWrapper { FUNCTION, NONE, FUNCTION_DECLARATIONS, FLAT_FUNCTION };
189 // OUTPUT_ARRAY: OpenAI Responses — output[] with mixed message/function_call items.
190 enum class ResponseStrategy { CHOICES_MESSAGE, CONTENT_ARRAY, CANDIDATES_PARTS, OUTPUT_ARRAY };
191 enum class StreamFormat { SSE_DATA, SSE_EVENTS };
192
193 // --- Internal config parsed from schema ---
194 struct ConnectionConfig {
195 std::string base_url;
196 std::string endpoint;
197 std::string stream_endpoint;
198 std::string auth_header;
199 std::string auth_prefix;
200 std::string api_key_env;
201 std::string auth_query_param;
202 std::map<std::string, std::string> extra_headers;
203 };
204
205 struct RequestConfig {
206 std::string model_field;
207 std::string messages_field;
208 std::string tools_field;
209 std::string temperature_path;
210 std::string max_tokens_path;
211 bool max_tokens_required = false;
212 int max_tokens_default = -1;
213 std::string stream_field;
214 json extra_fields;
215
224 std::set<std::string> per_call_fields;
225 };
226
227 struct SystemPromptConfig {
228 SystemPromptStrategy strategy;
229 std::string field;
230 std::string role_name;
231 std::string parts_field;
232 std::string text_field;
233 };
234
235 struct MessagesConfig {
236 std::string role_field;
237 std::string content_field;
238 std::map<std::string, std::string> role_map;
239 bool content_is_parts = false;
240 json text_part_template;
241 };
242
243 struct ToolDefConfig {
244 ToolDefWrapper wrapper;
245 std::string name_field;
246 std::string description_field;
247 std::string parameters_field;
248 };
249
250 struct ToolCallConfig {
251 ToolCallStrategy strategy;
252 std::string field;
253 json item_template;
254 json text_item_template;
255 };
256
257 struct ToolResultConfig {
258 std::string role;
259 ToolResultStrategy strategy;
260 std::string id_field;
261 std::string content_field;
262 json item_template;
263 };
264
265 struct ImageConfig {
266 std::string strategy;
267 json item_template;
268 json text_part_template;
269 };
270
271 struct ResponseConfig {
272 ResponseStrategy strategy;
273 std::string message_path;
274 std::string content_field;
275 std::string role_field;
276 std::string tool_calls_field;
277 std::string tool_call_id_field;
278 std::string tool_call_name_path;
279 std::string tool_call_args_path;
280 bool tool_call_args_is_string = true;
281 std::string content_path;
282 std::string text_type;
283 std::string text_field;
284 std::string tool_use_type;
285 std::string tool_call_name_field;
286 std::string tool_call_args_field;
287 std::string parts_path;
288 std::string function_call_field;
289 // OUTPUT_ARRAY (OpenAI Responses)
290 std::string output_path;
291 std::string message_item_type;
292 std::string function_call_item_type;
293 std::string message_content_field;
294 std::string function_call_id_field;
295 std::string usage_path;
296 std::string prompt_tokens_field;
297 std::string completion_tokens_field;
298 std::string total_tokens_field;
299 };
300
301 struct StreamConfig {
302 StreamFormat format;
303 std::string prefix;
304 std::string done_signal;
305 std::string delta_path;
306 std::string content_field;
307 std::string tool_calls_field;
308 std::string tool_call_index_field;
309 std::string tool_call_id_field;
310 std::string tool_call_name_path;
311 std::string tool_call_args_path;
312 std::string delta_strategy;
313 std::string delta_parts_path;
314 std::string delta_text_field;
315 std::string delta_function_call_field;
316 std::string delta_tool_call_name_field;
317 std::string delta_tool_call_args_field;
318 json events_config;
319 };
320
321 // Serializes access to the schema-derived json templates (schema_,
322 // tool_call_.item_template, tool_result_.item_template, req_.extra_fields,
323 // etc.). These are backed by shared yyjson_mut_doc handles — even
324 // read-only traversal of a yyjson_mut_val from multiple threads at once
325 // trips internal iterator state that yyjson explicitly disclaims as
326 // thread-unsafe for mutable docs. HTTP I/O is issued OUTSIDE this lock
327 // so concurrent fan-out requests still overlap on the network.
328 mutable std::mutex schema_mutex_;
329
330 // --- Connection pool for HTTP keep-alive ---
331 //
332 // Each Provider::complete() goes through run_sync, which creates a
333 // fresh asio::io_context per call. A ConnPool bound to that
334 // throw-away executor would survive only one request — defeating
335 // its purpose. So SchemaProvider owns its own long-lived
336 // io_context + worker thread; the pool is bound to that, and
337 // every complete_async dispatches through it. Successive calls to
338 // the same host then amortise TCP connect + TLS handshake.
339 std::unique_ptr<asio::io_context> http_io_;
340 std::optional<asio::executor_work_guard<asio::io_context::executor_type>> http_work_;
341 std::thread http_thread_;
342 std::unique_ptr<async::ConnPool> conn_pool_;
343
344 // --- Long-lived "sync-bridge" thread for streaming (issue #16) ---
345 //
346 // The streaming HTTP/SSE path is implemented as a synchronous
347 // httplib::Client::Post call inside `complete_stream`. The previous
348 // `complete_stream_async` default ran that on a *fresh* `std::thread`
349 // per call, which exposed cold thread-local resolver / NSS init in
350 // glibc. The wild ptr in `internal_strlen` reported in #16 had this
351 // shape: outer io.run() driven from an HTTP server worker thread →
352 // fresh-spawn NeoGraph worker → first getaddrinfo on cold TLS.
353 //
354 // Fix: own one long-lived bridge thread (mirror of `http_thread_`
355 // for ConnPool). `complete_stream_async` HTTP/SSE branch dispatches
356 // each call onto this thread instead of spawning fresh. After the
357 // first call warms the thread-local resolver state, every
358 // subsequent call reuses the warm state — same robustness profile
359 // as the working `complete_async` path.
360 std::unique_ptr<asio::io_context> bridge_io_;
361 std::optional<asio::executor_work_guard<asio::io_context::executor_type>> bridge_work_;
362 std::thread bridge_thread_;
363 // libcurl-backed HTTP/2 pool with multiplexing. Default transport
364 // for SchemaProvider — passes Cloudflare/anti-bot WAFs (it IS curl)
365 // and gives us native HTTP/2 stream multiplexing for parallel
366 // fan-out workloads.
367 std::unique_ptr<async::CurlH2Pool> curl_pool_;
368
369 // --- Parsed config ---
370 Config user_config_;
371 json schema_;
372 std::string provider_name_;
373 ConnectionConfig conn_;
374 RequestConfig req_;
375 SystemPromptConfig sys_;
376 MessagesConfig msgs_;
377 ToolDefConfig tool_def_;
378 ToolCallConfig tool_call_;
379 ToolResultConfig tool_result_;
380 ImageConfig image_;
381 ResponseConfig resp_;
382 StreamConfig stream_;
383
384 // --- Internal methods ---
385 void parse_schema();
386
387 json build_body(const CompletionParams& params) const;
388 json serialize_messages(const std::vector<ChatMessage>& messages) const;
389 json serialize_tools(const std::vector<ChatTool>& tools) const;
390 json serialize_single_message(const ChatMessage& msg) const;
391
395 asio::awaitable<ChatCompletion>
396 complete_stream_ws_responses(const CompletionParams& params,
397 const StreamCallback& on_chunk);
398
399 ChatMessage parse_response(const json& resp_json) const;
400 ChatCompletion::Usage parse_usage(const json& resp_json) const;
401
402 std::string build_endpoint(const std::string& model, bool streaming) const;
403 std::map<std::string, std::string> build_headers() const;
404 std::string get_api_key() const;
405
406 static std::pair<std::string, std::string> parse_data_url(const std::string& url);
407 static json substitute(const json& tmpl, const std::map<std::string, json>& vars);
408 static std::string generate_tool_call_id();
409
410 // Test-only access to private internals. The helper class lives in a
411 // separate `test_access` namespace to discourage accidental use; tests
412 // that need to inspect build_body / serialize_messages output for
413 // contract verification (e.g. issue #34, #35 regression coverage)
414 // pull it in explicitly. NOT a public API surface — may change without
415 // notice between versions.
417};
418
419namespace test_access {
420
425 public:
426 static json build_body(const SchemaProvider& sp,
427 const CompletionParams& params) {
428 return sp.build_body(params);
429 }
430};
431
432} // namespace test_access
433
434} // namespace neograph::llm
NEOGRAPH_API export/import macro for shared-library builds.
Abstract base class for LLM providers.
Definition provider.h:127
LLM provider that adapts to any API via a JSON schema.
std::string get_name() const override
Get the provider name (from the schema's "name" field).
~SchemaProvider()
Destructor — shuts down the background HTTP loop + worker thread held alongside the ConnPool.
asio::awaitable< ChatCompletion > complete_async(const CompletionParams &params) override
Async completion — single wire path implemented over the owned ConnPool (HTTP keep-alive).
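ChatCompletion complete_stream(const CompletionParams &params, const StreamCallback &on_chunk) override
Synchronous streaming completion: invokes on_chunk per streamed token and returns the final ChatCompletion (the HTTP/SSE path is a synchronous httplib::Client::Post call). Non-streaming sync completion is not overridden; it is inherited from Provider::complete(), which drives complete_async through run_sync.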
asio::awaitable< ChatCompletion > invoke(const CompletionParams &params, StreamCallback on_chunk) override
v1.0 single-dispatch override (Candidate 6 PR6).
asio::awaitable< ChatCompletion > complete_stream_async(const CompletionParams &params, const StreamCallback &on_chunk) override
Async streaming completion — native override (issue #4).
static std::unique_ptr< SchemaProvider > create(const Config &config)
Create a schema-based provider instance.
JSON dot-path navigation utilities.
Abstract LLM provider interface.
std::function< void(const std::string &chunk)> StreamCallback
Callback invoked per token during streaming completion.
Definition provider.h:52
LLM completion response including the message and token usage.
Definition types.h:60
Parameters for an LLM completion request.
Definition provider.h:57
Configuration for schema-based provider.
std::string api_key
API key (overrides env var if set).
std::string base_url_override
If non-empty, overrides the schema's connection.base_url. Useful for test doubles and self-hosted Ope...
std::string schema_path
Path to schema file, or built-in name ("openai", "claude", "gemini").