top of page

Building a Production-Grade RAG Engine with LangChain, Postgres, and HNSW Indexing.

Updated: Jan 16

Most RAG systems suffer from duplicate records and slow retrieval as they grow. This robust RAG orchestrator solves the most common 'Day 2' database challenges. By integrating the langchain SQLRecordManager for duplicate vector embedding prevention and PGVECTOR’s HNSW indexing for high-speed retrieval, the system ensures data integrity and performance at scale. It features an interactive CLI for seamless document ingestion with surgical cleanup of old records, and optimized Q&A over multiple document collections. No more duplicate chunks, no more slow scans—just a robust, local-first AI storage engine.


Understanding Few concepts before deep diving:

What does a Record Manager do?


a simple PGVector vectorstore and add_document is not enough for production.


  • No Duplicate Prevention: By default, it will happily insert the same text multiple times, creating redundant vectors.

  • Limited Metadata Control: It stores everything in a generic langchain_pg_embedding table. As you scale, you may need custom columns (like user_id or tenant_id) for performance-critical filtering.

  • Memory Overhead: Large batch inserts via LangChain can lead to high memory usage in your application layer before the data even reaches the database.


A More Robust Strategy: "The Record Manager"

  • LangChain actually provides a tool specifically for this called the Record Manager. Instead of just adding docs, you "index" them. This keeps track of what has already been inserted.

  • RecordManager stores its data in a dedicated table within your PostgreSQL database. By default, this table is named upsertion_record, though you can customize this name when you initialize the manager.This table acts as a "lookup ledger" to track the state of your documents separate from the actual vector embeddings.

  • Langchain provides an index function acts as the "brain" that coordinates between the Record Manager (which remembers what you've already processed) and the Vector Store (where the searchable vectors live).


Project Overview

AgenticRAG demonstrates a production-grade RAG implementation with the following core components:


System Overview

## 1. INITIALIZATION FLOW

```
┌─────────────────────────────────────────────────────────────────┐
│                     COMMAND LINE ENTRY                          │
│                     (asyncio.run(main()))                       │
└────────────────────────┬────────────────────────────────────────┘
                         │
                    ┌────▼─────┐
                    │  Setup   │
                    └────┬─────┘
                         │
        ┌────────────────┼────────────────┐
        ▼                ▼                ▼
    Logger         Config         Env Variables
    Loader         Loader         (DB Credentials)
        │                │                │
        └────────────────┼────────────────┘
                         │
                    ┌────▼──────────────────────────┐
                    │ Build PostgreSQL Connection   │
                    │ String (DB_URL)               │
                    └────┬──────────────────────────┘
                         │
                    ┌────▼──────────────────────────┐
                    │ Display Welcome Menu          │
                    │ (3 Options)                   │
                    └────┬──────────────────────────┘
                         │
```
## 2. MAIN MENU - USER INTERACTION

```
┌────────────────────────────────────────────────────────┐
│         CONSOLE: Display Menu & Get User Choice        │
│                                                        │
│    1. Document Ingestion                              │
│    2. Q&A (Question & Answer)                         │
│    3. Exit                                            │
└────────────────────┬─────────────────────────────────┘
                     │
        ┌────────────┼────────────┐
        │            │            │
     Choice=1    Choice=2      Choice=3
        │            │            │
        ▼            ▼            ▼
     [FLOW A]   [FLOW B]        EXIT
```
## 3. FLOW A: DOCUMENT INGESTION PATH

```
┌─────────────────────────────────────────────────────────────┐
│  User Input: Document Name                                  │
│  (folder/zip file/PDF in data/ directory)                   │
└────────────────┬────────────────────────────────────────────┘
                 │
            ┌────▼──────────────────────────────────┐
            │ RAGUtility.create()                   │
            │ (Async Factory Method)                │
            └────┬───────────────────────────────────┘
                 │
     ┌───────────┼────────────────┬──────────────┐
     ▼           ▼                ▼              ▼
   Logger    SQLRecord       OpenAI          PGVector
  Instance   Manager      Embeddings         Engine
             Created       (text-embedding-
                          ada-002)
     │           │                │              │
     └───────────┼────────────────┴──────────────┘
                 │
            ┌────▼──────────────────────────────────┐
            │ _check_table_exists()                 │
            │ (Query PostgreSQL)                    │
            └────┬───────────────────────────────────┘
                 │
        ┌────────┴─────────┐
        │                  │
     Table             Table
     Exists          Not Exists
        │                  │
        │              ┌───▼──────────────────┐
        │              │ Create New Table:    │
        │              │ - Columns:           │
        │              │   • langchain_id     │
        │              │   • langchain_metadata
        │              │   • embedding (1536) │
        │              │   • content          │
        │              │ ainit_vectorstore_   │
        │              │ table()              │
        │              └───┬──────────────────┘
        │                  │
        │              ┌───▼──────────────────┐
        │              │ Create Indexes:      │
        │              │ - HNSW index         │
        │              │ - Metadata GIN index │
        │              │ _create_index()      │
        │              └───┬──────────────────┘
        │                  │
        └────────┬─────────┘
                 │
            ┌────▼──────────────────────────────────┐
            │ Initialize PGVectorStore               │
            │ (Connect embeddings to table)         │
            └────┬───────────────────────────────────┘
                 │
            ┌────▼──────────────────────────────────┐
            │ await ingest_pdfs_to_pgvector()       │
            │                                       │
            │  ├─ load_documents(doc_name)         │
            │  │  ├─ Check if dir, zip, or single │
            │  │  ├─ Extract if needed             │
            │  │  └─ Load PDFs via LangChain       │
            │  │     PyPDFLoader/                   │
            │  │     PyPDFDirectoryLoader          │
            │  │                                   │
            │  ├─ split_documents() [CHUNKING]     │
            │  │  ├─ RecursiveCharacterText       │
            │  │  │  Splitter (tiktoken encoder)  │
            │  │  ├─ chunk_size: 1000             │
            │  │  └─ overlap: 200                 │
            │  │                                   │
            │  ├─ _run_indexing(chunks)           │
            │  │  ├─ Generate embeddings for     │
            │  │  │  each chunk                   │
            │  │  ├─ Store in PostgreSQL vector   │
            │  │  ├─ Record in SQLRecordManager    │
            │  │  │  (cleanup: 'incremental')     │
            │  │  └─ Return indexing_stats        │
            │  │                                   │
            │  ├─ Check if 1000+ chunks added     │
            │  │  └─ Rebuild HNSW index if true   │
            │  │     (_maintenance_task())         │
            │  │                                   │
            │  └─ update_table_cache()             │
            │     └─ Save collection list to      │
            │        cache/vector_tables_         │
            │        cache.json                    │
            │                                       │
            └────┬───────────────────────────────────┘
                 │
            ┌────▼──────────────────────────────────┐
            │ Console Output:                       │
            │ "Successfully ingested chunks..."    │
            └────┬───────────────────────────────────┘
                 │
            ┌────▼──────────────────────────────────┐
            │ Return to Main Menu                   │
            └────────────────────────────────────────┘
```
## 5. RESPONSE & LOOP

```
┌─────────────────────────────────────────────────────────────┐
│  Console Output:                                            │
│  "Agent's response:\n{response}"                           │
└────────────────┬────────────────────────────────────────────┘
                 │
            ┌────▼──────────────────────────────┐
            │ Return to Main Menu Loop           │
            │ (Prompt user for next action)      │
            └────────────────────────────────────┘
```

Low level design components:

The system is built on the following architecture:


  • Postgres Database: For local development I have spinned up a docker image to run postgres database as a container in my machine. You can do it too using Docker Desktop.

docker run --name local-postgres -e POSTGRES_PASSWORD=xxxxxxxx -p 5432:5432 -d postgres

Verify through < using WSL if Windows >:

psql -h localhost -p 5432 -U postgres -d postgres -W
  • PDF Ingestion Pipeline: Automatically reads PDF files from designated directories

  • Embedding Generation: Converts documents into embeddings using OpenAI's text-embedding-ada-002

  • Vector Database: Stores embeddings in PostgreSQL with HNSW vector indexing

  • Intelligent Retrieval: Fetches relevant documents based on semantic similarity for user queries

  • Orchestrated Agent: Coordinates the entire workflow using LangChain agents with GPT-4o-mini


Core Components & Function Descriptions



  1. KEY DESIGN PATTERNS

### Async/Await Pattern
- `RAGUtility.create()` - Async factory for initialization
- `ingest_pdfs_to_pgvector()` - Async ingestion
- `_check_table_exists()`, `_create_index()` - Async DB operations
- `update_table_cache()` - Async cache updates

### Tool-Based Agent Pattern
- Agent has access to `search_knowledge_base` tool
- Agent decides autonomously whether to use it
- Tool retrieves context from PostgreSQL

### Vector Search Pipeline
1. **Chunking**: Split PDFs into 1000-char chunks (200 overlap)
2. **Embedding**: Convert chunks to 1536-dim vectors
3. **Storage**: Store in PostgreSQL with HNSW index
4. **Retrieval**: Cosine similarity search for top-K results

  1. FLOW DECISION TREE

```
START
  │
  ├─ User chooses 1 (Ingestion)
  │  ├─ Provide doc name
  │  ├─ Load PDFs
  │  ├─ Chunk documents
  │  ├─ Generate embeddings
  │  ├─ Store in PostgreSQL
  │  ├─ Create HNSW indexes
  │  ├─ Update cache
  │  └─ Return to menu
  │
  ├─ User chooses 2 (Q&A)
  │  ├─ Select collection
  │  ├─ Enter query
  │  ├─ Initialize Agent
  │  ├─ Agent invokes search tool (if needed)
  │  ├─ Retrieve context from PostgreSQL
  │  ├─ LLM generates response
  │  ├─ Return response to console
  │  └─ Return to menu
  │
  └─ User chooses 3 (Exit)
     └─ Log "Exiting..." & STOP
```

---
  1. CACHING MECHANISM



```
Cache File: cache/vector_tables_cache.json

Content: {"tables": ["doc_name_1", "doc_name_2", ...]}

Updated: After each successful ingestion
Loaded: When user selects option 2 (Q&A)
Purpose: Quick display of available collections without DB query
```

  1. DATABASE SCHEMA


```
Table: {table_name-> doc_name} 
Description: dynamically created per ingestion using langchain_postgres.PGEngine.ainit_vectorstore_table

Columns:
├─ langchain_id (UUID) - Primary key
├─ langchain_metadata (JSONB) - Document metadata (source, page, etc.)
├─ embedding (vector[1536]) - OpenAI embeddings
└─ content (TEXT) - Chunk text

Indexes:
├─ {table_name}_hnsw_idx (HNSW) - Vector similarity search
└─ {table_name}_metadata (GIN) - Metadata filtering
________________________________________________________________________
Table: upsertion_records 
Description: RecordManager stores its data in a dedicated table within PostgreSQL database. By default, this table is named upsertion_record and acts as a "lookup ledger" to track the state of your documents separate from the actual vector embeddings. 
langchain_classic.indexex.index integrates langchain_classic.indexes.SQLRecordManager and langchain_postgres.PGVectorStore together to create this table

Columns:
├─ key (TEXT) - A unique ID for the chunk (often a hash of the content).
├─ namespace(TEXT) - Groups records (e.g., postgres/collection_name).
├─ group_id (TEXT) - The source_id (e.g., the filename) used to identify each document source, after chunking langchain documents has this info as part of metadata->source representing the document or filename.
└─ updated_at (DOUBLE PRECISION) - Server timestamp of the last write.
```

  1. SYSTEM DEPENDENCIES


```
External Services:
├─ PostgreSQL + pgvector - Vector storage & retrieval
├─ OpenAI API - Embeddings (text-embedding-ada-002)
└─ OpenAI API - LLM (gpt-4o-mini)

Python Libraries:
├─ LangChain - RAG framework & agent orchestration
├─ LangChain_postgres - For PGVectorStore,PGEngine for async doc ingestion
├─ Psycopg3 - PostgreSQL async driver
├─ RecursiveCharacterTextSplitter - Document chunking
└─ PyPDFLoader - PDF document loading
```

Configuration

The system requires a config/config.json file with the following structure:

Data Flow



1. Ingestion Flow:

  1. User selects "Document Ingestion" → Provides document name

  2. RAGUtility.create() initializes vector store and record manager

  3. ingest_pdfs_to_pgvector() loads documents via load_documents()

  4. Documents are split into chunks using RecursiveCharacterTextSplitter

  5. Chunks are embedded and indexed in PostgreSQL vector table

  6. HNSW index is optimized if needed

  7. Table cache is updated

    Note that num_skipped: 0, so if a user tries to re-upload the same document < name should remain same>, RecordManager will check for chunks <metadata+content> and skip those which are duplicates.
    Note that num_skipped: 0, so if a user tries to re-upload the same document < name should remain same>, RecordManager will check for chunks <metadata+content> and skip those which are duplicates.
    An example of how the langchain-postgres v2 table schema and vector embedding looks like. Note there is an upsertion_record table, RecordManager stores its data in this dedicated table within PostgreSQL database and this table acts as a "lookup ledger" to track the state of our documents separate from the actual vector embeddings.
    An example of how the langchain-postgres v2 table schema and vector embedding looks like. Note there is an upsertion_record table, RecordManager stores its data in this dedicated table within PostgreSQL database and this table acts as a "lookup ledger" to track the state of our documents separate from the actual vector embeddings.



2. Query Flow:
  1. User selects "Q&A" → Selects collection and enters query

  2. RAGUtility.create() initializes vector store for selected collection

  3. AgentOrchestrator is initialized with user query

  4. Agent decides whether to call search_knowledge_base tool

  5. retrieve_docs_from_pgvector() fetches relevant document chunks

  6. LLM generates response based on retrieved context and user query

  7. Response is returned to user

The Q&A option lists all the available documents for user to choose from and then post a query.
The Q&A option lists all the available documents for user to choose from and then post a query.

The HNSW indexing on the embedding makes the pgvector search faster and accurately responds back with the content.
The HNSW indexing on the embedding makes the pgvector search faster and accurately responds back with the content.



Key Features

  • Intelligent Document Processing: Handles single PDFs, directories, and ZIP archives

  • Semantic Search: Uses vector embeddings for context-aware document retrieval

  • Agent-Driven: LLM decides whether to consult knowledge base or respond directly

  • Production Database: PostgreSQL with HNSW vector indexing for scalability

  • Caching System: Maintains cache of available collections for quick access

  • Async Operations: Non-blocking database operations for better performance

  • Comprehensive Logging: Color-coded console and file logging for debugging


Comments


bottom of page