
Building a Production-Grade RAG Engine with LangChain, Postgres, and HNSW Indexing

  • Neha Singh
  • 3 days ago
  • 6 min read

Updated: 2 hours ago

AgenticRAG is an AI-powered Retrieval-Augmented Generation (RAG) system that serves as an intelligent document research assistant. The project implements an orchestrated agent architecture with a comprehensive RAG pipeline for ingesting PDF documents and retrieving relevant information from a PostgreSQL vector database.


Most RAG systems suffer from duplicate records and slow retrieval as they grow. I've developed a robust RAG orchestrator that solves the most common "Day 2" database challenges. By integrating LangChain's SQLRecordManager to prevent duplicate vector embeddings and pgvector's HNSW indexing for high-speed retrieval, the system ensures data integrity and performance at scale. It features an interactive CLI for seamless document ingestion with surgical cleanup of old records, plus optimized Q&A over multiple document collections. No more duplicate chunks, no more slow scans: just a robust, local-first AI storage engine.


Project Overview

AgenticRAG demonstrates a production-grade RAG implementation with the following core components:


Architecture

The system is built on the following architecture:


  • Postgres Database: For local development, I spun up a Postgres container on my machine from the official Docker image. You can do the same using Docker Desktop.

docker run --name local-postgres -e POSTGRES_PASSWORD=xxxxxxxx -p 5432:5432 -d postgres

Verify the connection from WSL (Ubuntu):

psql -h localhost -p 5432 -U postgres -d postgres -W
  • PDF Ingestion Pipeline: Automatically reads PDF files from designated directories

  • Embedding Generation: Converts documents into embeddings using OpenAI's text-embedding-ada-002

  • Vector Database: Stores embeddings in PostgreSQL with HNSW vector indexing

  • Intelligent Retrieval: Fetches relevant documents based on semantic similarity for user queries

  • Orchestrated Agent: Coordinates the entire workflow using LangChain agents with GPT-4o-mini


Core Components & Function Descriptions


1. Main Application (main.py)

Entry point for the RAG system with an interactive menu system.


Functions:

  • get_cached_tables()

    • Reads the cached list of vector tables from disk for immediate UI display

    • Returns a list of available collections

    • Returns empty list if cache file doesn't exist

  • main() (async)

    • Primary orchestration function that initializes the system and manages user interactions

    • Loads environment variables, database configuration, and system logger

    • Displays interactive menu with three options:

    • Document Ingestion: Ingest PDFs into the vector database

    • Q&A: Query the knowledge base

    • Exit: Terminate the application

    • Handles connection string construction for PostgreSQL

    • Manages the flow between RAGUtility and AgentOrchestrator based on user choice
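One of main()'s jobs is assembling the PostgreSQL connection string from the loaded configuration. A minimal sketch of that step; the function name, config keys, and the psycopg driver prefix are assumptions, not the project's exact code:

```python
def build_connection_string(cfg: dict) -> str:
    """Assemble a SQLAlchemy/psycopg-style connection string from config
    values. The key names here are illustrative; the project's
    config/config.json may use different ones."""
    return (
        f"postgresql+psycopg://{cfg['user']}:{cfg['password']}"
        f"@{cfg['host']}:{cfg['port']}/{cfg['database']}"
    )

conn = build_connection_string({
    "user": "postgres", "password": "secret",
    "host": "localhost", "port": 5432, "database": "postgres",
})
print(conn)  # postgresql+psycopg://postgres:secret@localhost:5432/postgres
```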


2. Agent Orchestrator (agents/agent.py)

Manages the LLM-based agent that decides when and how to use the RAG tools.


Class: AgentOrchestrator

Constructor: __init__(rag_utility, query: str)

  • Initializes the agent with a RAGUtility instance and user query

  • Sets up GPT-4o-mini as the LLM with temperature=0 for deterministic responses

  • Defines system prompt to guide agent behavior

  • Registers available tools (search_knowledge_base)


Methods:
_initialize_tools() → List[Any]
  • Defines and returns the list of tools available to the agent

  • Currently implements one tool: search_knowledge_base

  • Tool definition uses LangChain's @tool decorator


search_knowledge_base(query: str) (tool)
  • Consults the internal PDF documentation by querying the RAG system

  • Acts as a bridge between the LLM and the vector database

  • Returns relevant document chunks that match the query


chat() → str
  • Main entry point for user interactions with the agent

  • Invokes the LangChain agent with the user query wrapped in HumanMessage

  • Logs all tool calls made by the agent for debugging

  • Extracts and returns the final text response from the agent

  • Handles exceptions gracefully with error logging
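The last two chat() responsibilities, logging tool calls and pulling the final text out of the agent result, can be sketched with plain Python. The Msg class below is a stand-in for LangChain's message objects, and extract_final_response is an illustrative helper, not the project's actual method:

```python
from dataclasses import dataclass, field

@dataclass
class Msg:
    # Stand-in for langchain_core.messages.AIMessage / HumanMessage.
    content: str
    tool_calls: list = field(default_factory=list)

def extract_final_response(result: dict) -> str:
    """Log any tool calls in the message history, then return the last
    message's text. `result` mirrors the {"messages": [...]} shape a
    LangChain agent returns."""
    for msg in result["messages"]:
        for call in msg.tool_calls:
            print(f"tool call: {call}")  # debugging aid, as chat() logs tool use
    return result["messages"][-1].content

result = {"messages": [
    Msg("", tool_calls=[{"name": "search_knowledge_base", "args": {"query": "HNSW"}}]),
    Msg("HNSW is a graph-based approximate nearest-neighbour index."),
]}
print(extract_final_response(result))
```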


3. RAG Utility Module (tools/rag_utility.py)

Core engine for all RAG operations including document ingestion and retrieval.


Class: RAGUtility

Constructor: __init__(vector_store, record_manager, logger, connection_string, engine, db_url)

  • Initializes RAG system with PostgreSQL vector store configuration

  • Stores references to LangChain vector store, record manager, and database connections


Class Methods:
@classmethod
async def create(cls, connection_string, table_name)
  • Factory method that initializes the complete RAG pipeline

  • Creates or validates PostgreSQL vector table

  • Initializes embeddings using OpenAI's text-embedding-ada-002

  • Sets up Record Manager for document lifecycle tracking

  • Creates HNSW indexes for vector similarity search

  • Handles async PostgreSQL connections with proper error handling
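Because __init__ cannot await anything, setup that needs async database calls lives in the create() factory. A skeleton of that pattern, with the real wiring (embeddings, vector store, record manager, HNSW indexes) stubbed out:

```python
import asyncio

class RAGUtilitySketch:
    """Illustrative skeleton of RAGUtility's async-factory pattern; the
    class name and attributes are assumptions for this sketch."""

    def __init__(self, connection_string: str, table_name: str):
        self.connection_string = connection_string
        self.table_name = table_name
        self.ready = False

    @classmethod
    async def create(cls, connection_string: str, table_name: str):
        self = cls(connection_string, table_name)
        # Stand-in for the real async setup: create/validate the table,
        # initialise embeddings, set up the record manager, build indexes.
        await asyncio.sleep(0)
        self.ready = True
        return self

rag = asyncio.run(RAGUtilitySketch.create("postgresql://...", "my_docs"))
print(rag.ready)  # True
```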


Static/Async Methods:
_check_table_exists(db_url: str, table_name: str) → bool (async)
  • Checks if a vector table exists in PostgreSQL

  • Queries information_schema to verify table presence

  • Returns boolean indicating table existence
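The existence check boils down to one information_schema query. A sketch of the SQL such a check would run (the exact statement and parameter style in the project may differ):

```python
def table_exists_query(table_name: str) -> tuple:
    """Build a parameterised information_schema query of the kind
    _check_table_exists would run against PostgreSQL."""
    sql = (
        "SELECT EXISTS ("
        "  SELECT 1 FROM information_schema.tables"
        "  WHERE table_schema = 'public' AND table_name = $1"
        ")"
    )
    return sql, (table_name,)

sql, params = table_exists_query("my_docs")
print(params)  # ('my_docs',)
```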



_create_index(db_url: str, table_name: str) (async)
  • Creates HNSW index for vector similarity search

  • Creates GIN index for metadata queries

  • Converts langchain_metadata column to JSONB type

  • Enables efficient vector and metadata-based queries
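The DDL behind those bullets can be sketched as three statements. Column and index names follow langchain-postgres conventions and are assumptions about the project's actual schema:

```python
def index_ddl(table_name: str) -> list:
    """Illustrative DDL of the kind _create_index would issue."""
    return [
        # Convert the metadata column to JSONB so a GIN index applies.
        f"ALTER TABLE {table_name} ALTER COLUMN langchain_metadata "
        f"TYPE jsonb USING langchain_metadata::jsonb",
        # HNSW index for cosine-distance similarity search.
        f"CREATE INDEX IF NOT EXISTS {table_name}_embedding_hnsw "
        f"ON {table_name} USING hnsw (embedding vector_cosine_ops)",
        # GIN index for fast metadata filtering.
        f"CREATE INDEX IF NOT EXISTS {table_name}_metadata_gin "
        f"ON {table_name} USING gin (langchain_metadata)",
    ]

for stmt in index_ddl("my_docs"):
    print(stmt)
```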



Instance Methods:

def _run_indexing(self, docs) → dict
  • Runs the indexing pipeline for document chunks

  • Uses LangChain's index function with SQLRecordManager

  • Implements incremental cleanup to avoid duplicating existing content

  • Returns indexing statistics (num_added, num_updated, etc.)
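The real pipeline delegates this bookkeeping to LangChain's index() function backed by a SQLRecordManager. To make the dedup logic visible, here is a pure-Python stand-in where a SHA-256 over content plus metadata plays the record manager's role (this mimics the behaviour; it is not the library's implementation):

```python
import hashlib
import json

def run_indexing(docs: list, seen_hashes: set) -> dict:
    """Mimic incremental indexing: hash each chunk, skip ones already
    recorded, and report stats like LangChain's index() does."""
    stats = {"num_added": 0, "num_skipped": 0}
    for doc in docs:
        key = hashlib.sha256(
            json.dumps([doc["page_content"], doc["metadata"]],
                       sort_keys=True).encode()
        ).hexdigest()
        if key in seen_hashes:
            stats["num_skipped"] += 1   # chunk already indexed: skip it
        else:
            seen_hashes.add(key)        # new chunk: would be embedded + stored
            stats["num_added"] += 1
    return stats

seen = set()
docs = [{"page_content": "chunk one", "metadata": {"source": "a.pdf"}}]
print(run_indexing(docs, seen))  # {'num_added': 1, 'num_skipped': 0}
print(run_indexing(docs, seen))  # {'num_added': 0, 'num_skipped': 1}
```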


async def _maintenance_task(self, table_name)
  • Optimizes HNSW index after large batch ingestions

  • Reindexes concurrently to allow reads/writes during optimization

  • Prevents HNSW bloating from dead rows and nodes


async def update_table_cache(self)
  • Saves the current list of vector tables to disk cache

  • Queries PostgreSQL for all vector-type columns

  • Stores table names in JSON cache file for immediate UI display


async def ingest_pdfs_to_pgvector(self, doc_name)
  • Main PDF ingestion pipeline orchestrator

  • Loads documents from specified document name

  • Splits documents into chunks using RecursiveCharacterTextSplitter (1000 chunk size, 200 overlap)

  • Runs the indexing pipeline (self._run_indexing(docs)) to store chunks in PostgreSQL

  • Triggers the maintenance task (self._maintenance_task(doc_name)) if more than 1000 chunks were added

  • Updates the table cache (self.update_table_cache()) so the collection is immediately available in the UI

  • Logs all operations for debugging
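The chunking step uses a 1000-character chunk size with a 200-character overlap. A sliding-window stand-in shows the size/overlap arithmetic; the real RecursiveCharacterTextSplitter additionally prefers paragraph and sentence boundaries:

```python
def split_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list:
    """Naive sliding-window splitter standing in for LangChain's
    RecursiveCharacterTextSplitter (which is boundary-aware)."""
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks

chunks = split_text("x" * 2500)
print([len(c) for c in chunks])  # [1000, 1000, 900]
```

Each chunk repeats the last 200 characters of the previous one, so a sentence cut at a chunk boundary still appears whole in the next chunk.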


def retrieve_docs_from_pgvector(self, user_query: str, k: int = 3) → str
  • Retrieves relevant document chunks from the vector database

  • Performs semantic similarity search using the user's query

  • Returns top k results (default 3) concatenated as context string

  • Returns empty string on error with exception logging

  • Acts as the retrieval backend for the agent
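Conceptually, retrieval ranks stored embeddings by similarity to the query embedding and concatenates the top-k texts. In the project that ranking happens inside pgvector's HNSW index; the pure-Python sketch below just makes the cosine-ranking idea concrete:

```python
import math

def cosine(a: list, b: list) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def retrieve_docs(query_vec: list, store: list, k: int = 3) -> str:
    """Rank (embedding, text) pairs by similarity to the query vector and
    join the top-k texts into one context string."""
    ranked = sorted(store, key=lambda item: cosine(query_vec, item[0]),
                    reverse=True)
    return "\n\n".join(text for _, text in ranked[:k])

store = [
    ([1.0, 0.0], "doc about HNSW indexes"),
    ([0.0, 1.0], "doc about logging"),
    ([0.9, 0.1], "doc about pgvector"),
]
print(retrieve_docs([1.0, 0.0], store, k=2))
```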



4. Utility Modules


  1. Logging module

Provides consistent, color-coded logging across the application.


Class: ColorFormatter(logging.Formatter)


Custom formatter that adds ANSI color codes based on log level

Maps: DEBUG (cyan), INFO (green), WARNING (yellow), ERROR (red), CRITICAL (bold red)

Function: get_logger(name: str, level: int = logging.INFO) → logging.Logger
  • Creates and configures a logger instance with the specified name

  • Sets up console handler with ColorFormatter for terminal output

  • Sets up file handler for persistent logging (unless name starts with "test")

  • Creates logs directory automatically if it doesn't exist

  • Returns fully configured logger ready for use
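A condensed sketch of the logging module's two pieces. The ANSI codes match the level-to-color mapping above; the file handler and logs/ directory handling are omitted for brevity:

```python
import logging
import sys

COLORS = {
    logging.DEBUG: "\033[36m",       # cyan
    logging.INFO: "\033[32m",        # green
    logging.WARNING: "\033[33m",     # yellow
    logging.ERROR: "\033[31m",       # red
    logging.CRITICAL: "\033[1;31m",  # bold red
}
RESET = "\033[0m"

class ColorFormatter(logging.Formatter):
    """Wrap each record's formatted text in the ANSI color for its level."""
    def format(self, record):
        color = COLORS.get(record.levelno, "")
        return f"{color}{super().format(record)}{RESET}"

def get_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """Console-only version of the project's get_logger; the real one also
    attaches a file handler unless the name starts with 'test'."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(ColorFormatter("%(levelname)s - %(message)s"))
        logger.addHandler(handler)
    logger.setLevel(level)
    return logger

get_logger("demo").info("ingestion complete")
```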


  2. config_loader.py

Manages configuration loading from JSON configuration files.


Function: load_config() → dict
  • Reads configuration from config/config.json

  • Loads and sets environment variables for:

  • OpenAI API key and base URL

  • PostgreSQL connection parameters (host, port, database, user, password)

  • Returns the complete configuration dictionary

  • Called at application startup to initialize system settings
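A minimal sketch of that load-and-export step. The JSON key names and environment-variable names are assumptions about the project's config/config.json, not confirmed details:

```python
import json
import os
import tempfile

def load_config(path: str) -> dict:
    """Read the JSON config and export selected values as environment
    variables, returning the full config dict."""
    with open(path) as fh:
        cfg = json.load(fh)
    os.environ["OPENAI_API_KEY"] = cfg["openai"]["api_key"]
    os.environ["PGHOST"] = cfg["postgres"]["host"]
    os.environ["PGPORT"] = str(cfg["postgres"]["port"])
    return cfg

# Demonstrate with a throwaway config file.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as fh:
    json.dump({"openai": {"api_key": "sk-test"},
               "postgres": {"host": "localhost", "port": 5432}}, fh)
cfg = load_config(fh.name)
print(os.environ["PGHOST"])  # localhost
```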


  3. data_ingestion.py

Handles PDF document loading from various sources.


Function: load_documents(doc_name: str = None) → List[Document]
  • Intelligent document loader that handles multiple input formats:

  • Directory of PDFs: Uses PyPDFDirectoryLoader if folder exists

  • ZIP file: Extracts contents to data folder and loads all PDFs

  • Single PDF file: Uses PyPDFLoader for individual file

  • Not found: Returns empty Document with error logging

  • Converts all PDF pages into LangChain Document objects

  • Logs document loading progress and file count

  • Handles file path construction relative to project structure

  • Returns list of Document objects ready for chunking and ingestion
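The interesting part of load_documents is the dispatch between directory, ZIP, and single-file inputs. This sketch returns a label naming which loader would be chosen; the real function instead returns LangChain Document objects via PyPDFDirectoryLoader / PyPDFLoader:

```python
import tempfile
from pathlib import Path

def choose_loader(doc_name: str, data_dir) -> str:
    """Show load_documents' dispatch logic as labels (illustrative only)."""
    path = Path(data_dir) / doc_name
    if path.is_dir():
        return "PyPDFDirectoryLoader"            # a folder of PDFs
    if path.suffix == ".zip" and path.is_file():
        return "zip: extract, then load all PDFs"
    if path.suffix == ".pdf" and path.is_file():
        return "PyPDFLoader"                     # a single PDF
    return "not found: empty Document + error log"

base = Path(tempfile.mkdtemp())
(base / "reports").mkdir()
(base / "manual.pdf").write_bytes(b"%PDF-1.4")
print(choose_loader("reports", base))      # PyPDFDirectoryLoader
print(choose_loader("manual.pdf", base))   # PyPDFLoader
print(choose_loader("missing.pdf", base))  # not found: empty Document + error log
```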



Configuration

The system requires a config/config.json file with the following structure:
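A minimal example of the expected shape, with field names inferred from the parameters listed earlier in this post (your actual keys may differ):

```json
{
  "openai": {
    "api_key": "sk-...",
    "base_url": "https://api.openai.com/v1"
  },
  "postgres": {
    "host": "localhost",
    "port": 5432,
    "database": "postgres",
    "user": "postgres",
    "password": "xxxxxxxx"
  }
}
```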

Data Flow

  1. Ingestion Flow:
    1. User selects "Document Ingestion" → Provides document name

    2. RAGUtility.create() initializes vector store and record manager

    3. ingest_pdfs_to_pgvector() loads documents via load_documents()

    4. Documents are split into chunks using RecursiveCharacterTextSplitter

    5. Chunks are embedded and indexed in PostgreSQL vector table

    6. HNSW index is optimized if needed

    7. Table cache is updated

      Note the num_skipped count (0 on the first upload): if a user tries to re-upload the same document (the document name must remain the same), the RecordManager checks each chunk (metadata + content) and skips the duplicates.
      An example of what the langchain-postgres v2 table schema and vector embedding look like: note the upsertion_record table. The RecordManager stores its data in this dedicated table within the PostgreSQL database, and it acts as a "lookup ledger" that tracks the state of our documents separately from the actual vector embeddings.



2. Query Flow:
  1. User selects "Q&A" → Selects collection and enters query

  2. RAGUtility.create() initializes vector store for selected collection

  3. AgentOrchestrator is initialized with user query

  4. Agent decides whether to call search_knowledge_base tool

  5. retrieve_docs_from_pgvector() fetches relevant document chunks

  6. LLM generates response based on retrieved context and user query

  7. Response is returned to user

The Q&A option lists all available document collections for the user to choose from before posting a query.

The HNSW index on the embedding column makes pgvector searches fast while still returning accurate content.



Key Features

  • Intelligent Document Processing: Handles single PDFs, directories, and ZIP archives

  • Semantic Search: Uses vector embeddings for context-aware document retrieval

  • Agent-Driven: LLM decides whether to consult knowledge base or respond directly

  • Production Database: PostgreSQL with HNSW vector indexing for scalability

  • Caching System: Maintains cache of available collections for quick access

  • Async Operations: Non-blocking database operations for better performance

  • Comprehensive Logging: Color-coded console and file logging for debugging

