Home Resources Scaling RAG Ingestion with Ragbits, Ray and Qdrant

Scaling RAG Ingestion with Ragbits, Ray and Qdrant

Introduction

RAG (retrieval augmented generation) systems have become quite popular in recent times. These systems are designed to combine the benefits of LLMs and information retrieval. The idea is to first retrieve relevant documents from a large corpus and then use the retrieved documents to generate the final response. This approach is particularly useful when the answer to a question is not present in the training data of the LLM and it allows for significantly reduce LLM hallucinations.

There are several potential issues that might arise when scaling up RAG systems. In this cookbook we will focus on one such issue: ingestion of large amounts of document data. We will show how you can use Ragbits and Ray to easily scale up the ingestion of document data. Beside these elements we will also use Qdrant and VLLM (in case of GPU-assisted deployment).

High level overview

In this cookbook, we will show you how to use Ragbits and Ray to ingest large amounts of document data into Qdrant vector database. As an example input data we will use several WHO documents (all cancer related), but this approach can be easily extended to any other document data. We will show you two different approaches to deployment: on-premises and cloud-based. This specific cookbook will focus on the on-premises deployment, while the cloud-based deployment will be covered in a separate cookbook.

Tools used

Ragbits

Ragbits is an open-source modular framework by deepsense.ai that streamlines the development of AI applications by supporting over 100 large language models and various vector stores. It processes more than 20 document formats and offers a powerful command-line interface for efficient execution, prompt testing, and vector store management. With built-in observability and integrated testing tools, Ragbits enhances monitoring and evaluation, simplifying the deployment process.

Ray

Ray is a fast, simple, and popular framework for building and running distributed workloads. It provides a simple API for building distributed programs and is designed to be scalable and efficient. Ray is widely used in the machine learning community for scaling up training and inference workloads.

Qdrant

Qdrant is a vector database and search engine. It provides a fast and scalable solution for storing, searching, and managing vectors with additional payload. Qdrant supports various distance metrics and filtering capabilities, making it ideal for building efficient vector search applications like semantic search, recommendation systems, and other AI-powered solutions.

VLLM

vLLM is an open-source inference and serving engine designed for efficient deployment of large language models (LLMs). It utilizes PagedAttention to optimize memory management, significantly reducing memory waste and enabling higher throughput compared to frameworks like Hugging Face Transformers. With features such as continuous batching, OpenAI-compatible APIs, and support for distributed inference, vLLM provides a scalable, cost-efficient solution for real-time and large-scale AI applications.

Pre-requisites

Before we start, make sure you have the following installed on your machine:

  • Docker
  • Docker Compose
  • Python 3.12 or higher
  • Nvidia Container Toolkit (if you have a GPU and want to use it for inference)

On-premises deployment

In this approach, the entire system can be set up on your local machine or group of machines. We will start small by setting up the entire infrastructure on a single machine, using Docker Compose. This will allow you to test the system locally before deploying it on a larger scale.

In general we will set up a system that consists of the following containers:

  • Qdrant DB
  • Ray cluster head
  • Ray cluster worker (x3)
  • LLM inference server (only if you have a GPU)

Step 0: Choose whether to use a local or external LLM provide API keys

Before we start, you need to decide whether you want to use a local LLM:

  • If you want to use a local LLM, you need a GPU and an API key for Hugging Face Hub, so the model can be downloaded.
  • If you want to use an OpenAI model via API, you need to provide the API key for OpenAI endpoint.

Please choose one of the following options and provide the necessary API key in the next cell:

USE_GPU = False
HUGGING_FACE_HUB_TOKEN=""
OPENAI_API_KEY=""

if USE_GPU:
    assert HUGGING_FACE_HUB_TOKEN, "Please set HUGGING_FACE_HUB_TOKEN"
else:
    assert OPENAI_API_KEY, "Please set OPENAI_API_KEY"

Step 1: Set up the environment

Create Docker Compose containers based on the following compose YML (it’s a copy of the docker-compose.yml file):

services:
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"      # API port
      - "6334:6334"      # Web UI port
    volumes:
      - qdrant_data:/qdrant/storage
    environment:
      - QDRANT_ALLOW_RECOVERY_ON_CORRUPTED_DB=true
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:6333/readiness"]
      interval: 5s
      timeout: 5s
      retries: 3
      start_period: 5s

  ray_head:
    build:
      context: .
      dockerfile: Dockerfile.ray
    ports:
      - "8265:8265"      # Ray dashboard
      - "10001:10001"    # Ray client server
      - "6379:6379"      # Redis
    command: >
      ray start
      --head
      --port=6379
      --redis-password=password
      --dashboard-host=0.0.0.0
      --num-cpus=0
      --block
    healthcheck:
      test: ["CMD", "ray", "status"]
      interval: 10s
      timeout: 10s
      retries: 5

  ray_worker:
    build:
      context: .
      dockerfile: Dockerfile.ray
    depends_on:
      - ray_head
    deploy:
      replicas: 3
    command: >
      ray start
      --address=ray_head:6379
      --redis-password=password
      --block
    environment:
      - OPENAI_API_KEY=$OPENAI_API_KEY

  llm:
    image: vllm/vllm-openai:v0.7.2
    restart: always
    ipc: host
    profiles:
      - gpu
    volumes:
      - ~/.hf_cache:/root/.cache/huggingface/hub
    ports:
      - "8000:8000"
    environment:
      - HUGGING_FACE_HUB_TOKEN=$HUGGING_FACE_HUB_TOKEN
      - PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              device_ids: ["all"]
              capabilities: [ gpu ]
    command:
      - "--model=alpindale/Llama-3.2-11B-Vision-Instruct"
      - "--dtype=half"
      - "--tensor-parallel-size=1" # If you have multiple GPUs, you might want to adjust this value
      - "--gpu-memory-utilization=0.9"
      - "--max-num-seqs=20"
      - "--max-model-len=10000"
      - "--tokenizer-mode=auto"
      - "--config-format=auto"
      - "--load-format=auto"
      - "--tool-call-parser=pythonic"
      - "--chat-template=examples/tool_chat_template_llama3.2_pythonic.jinja"
      - "--enable-auto-tool-choice"

volumes:
  qdrant_data:

As you might notice, we use a custom Dockerfile for Ray nodes. It’s necessary, because we need to install several system dependencies (beside Python libraries) for the processing to work. Here you can find the content of Dockerfile.ray:

FROM rayproject/ray:latest-py312

USER root

# Install dependencies
RUN apt update && \
    apt install -y \
    libgl1 \
    libglib2.0-0 \
    tesseract-ocr \
    libtesseract-dev \
    poppler-utils && \
    apt clean && \
    rm -rf /var/lib/apt/lists/*

# Install Python packages
RUN pip install \
    "ragbits-core[qdrant,fastembed]==0.13.0" \
    "ragbits-document-search[ray]==0.13.0" \
    "unstructured[pdf]==0.17.2"

USER 1000

# Set environment variable to suppress Docker CPU warning
ENV RAY_DISABLE_DOCKER_CPU_WARNING=1

Finally, you can deploy the containers simply by running docker compose --profile gpu up -d (omit profile if using only CPU).
In fact, you can do it inside this notebook by running the next cell. Note the cell will take some time to run as it will download and build the necessary Docker images and set up the containers. The containers will then run in the background.

if USE_GPU:
    !HUGGING_FACE_HUB_TOKEN={HUGGING_FACE_HUB_TOKEN} docker compose --profile gpu up -d
    print("GPU-powered deployment has been started")
else:
    !OPENAI_API_KEY={OPENAI_API_KEY} docker compose up -d
    print("CPU-only deployment has been started")

Step 2: Prepare ingestion job

There are several ways you can work with Ray clusters, but not all of them are created equal. While it might be tempting to use Ray Client directly in a notebook, it is not recommended for production workloads and can result in version mismatch issues. Instead, we will use a more robust approach by creating a Python script that will be executed on the Ray cluster as a Ray Job.

Document search client

First, we need to define the document search client. We’ll keep it in a separate document_search.py file to make it easier to reuse in e.g. this notebook:

from qdrant_client import AsyncQdrantClient
from ragbits.core.embeddings.fastembed import FastEmbedEmbedder
from ragbits.core.llms import LLM
from ragbits.core.vector_stores.qdrant import QdrantVectorStore
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.element import ImageElement
from ragbits.document_search.ingestion.enrichers.image import ImageElementEnricher
from ragbits.document_search.ingestion.strategies import RayDistributedIngestStrategy


def get_document_search(qdrant_host: str, image_description_llm: LLM) -> DocumentSearch:
    embedder = FastEmbedEmbedder(model_name="BAAI/bge-base-en")
    vector_store = QdrantVectorStore(
        embedder=embedder,
        client=AsyncQdrantClient(qdrant_host),
        index_name="documents",
    )

    image_enricher_routing = {
        ImageElement: ImageElementEnricher(llm=image_description_llm)
    }

    processing_strategy = RayDistributedIngestStrategy()
    return DocumentSearch(
        vector_store=vector_store,
        ingest_strategy=processing_strategy,
        enricher_router=image_enricher_routing,
    )

The script above defines a function to instantiate a DocumentSearch instance, which facilitates both document ingestion into Qdrant and subsequent retrieval operations. Let’s examine the key components that make up this implementation:

  • FastEmbedEmbedder is used to embed the documents into vectors. We use the BAAI/bge-base-en model, but you can use any other model supported by fastembed or use any other embedder supported by Ragbits.
  • QdrantVectorStore is basically a wrapper around Qdrant client that handles the embedding and indexing of the documents. Just like with the embedder, Ragbits supports several vector stores.
  • image_enricher_routing is a dictionary that maps document elements to their respective handlers. In this case, we overwrite the default handler for ImageElement to use a specific LLM model for image description.
  • RayDistributedIngestStrategy is a strategy that allows you to distribute the ingestion process across multiple Ray workers.

Ingestion job

As soon as we have the client, we can work on the ingestion script. Let’s call it ingest_files.py. The script will look like this:

import argparse
import asyncio
import pprint

from ragbits.core.llms import LiteLLM
from ragbits.document_search.documents.sources import WebSource

from document_search import get_document_search

urls = [
    "https://iris.who.int/bitstream/handle/10665/379245/9789240101050-eng.pdf",
    "https://iris.who.int/bitstream/handle/10665/379225/9789240100954-eng.pdf",
    "https://iris.who.int/bitstream/handle/10665/378875/9789290211709-eng.pdf",
    ... # Full list available in the source file
]

headers = {
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:136.0) Gecko/20100101 Firefox/136.0"
}

documents = [WebSource(url=url, headers=headers) for url in urls]


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--use_gpu", action="store_true")
    return parser.parse_args()


async def main(use_gpu: bool = False):
    if use_gpu:
        llm = LiteLLM(
            model_name="hosted_vllm/alpindale/Llama-3.2-11B-Vision-Instruct",
            base_url="http://llm:8000/v1",
            use_structured_output=True,
            # Necessary, otherwise the model will not report the vision capabilities
            custom_model_cost_config={
                "alpindale/Llama-3.2-11B-Vision-Instruct": {
                    "litellm_provider": "hosted_vllm",
                    "supports_vision": True,
                }
            }
        )
    else:
        llm = LiteLLM(model_name="openai/gpt-4o-mini", use_structured_output=True)

    document_search_client = get_document_search("http://qdrant:6333", llm)
    results = await document_search_client.ingest(documents)
    pprint.pprint(results)


if __name__ == "__main__":
    args = parse_args()
    asyncio.run(main(args.use_gpu))

The script first defines a list of documents that we want to ingest. In this case, we use the WHO website to get documents about cancer.
Note that Ragbits supports various document sources, including local files, URLs, and text strings and more. WebSource is by far the easiest to work with on Ray clusters, as you don’t need to set up any additional file sharing (e.g. Docker volumes or NFS).

The script then selects GPU/remote LLM variant, creates the DocumentSearch client and ingests the documents into Qdrant. In the end the ingestion results are printed to the console.
Keep in mind Ragbits is designed with asynchrony in mind, that’s why we need to use asyncio.run to run the main function.
Also, take a look at the address we’re using to connect to Qdrant and local LLM server. These addresses must be reachable from the Ray cluster, so in this case they’re container names (qdrant and llm).

Step 3: Submit the ingestion job

Now that we have the ingestion script ready, we can submit it to the Ray cluster. You can do this by using Ray Job Submission API or CLI. In this case, we will use the Python API, which we can run directly in this notebook.

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://localhost:8265")
job_id = client.submit_job(
    entrypoint=f"python ingest_files.py {'--use_gpu' if USE_GPU else ''}",
    runtime_env={"working_dir": "./"}
)

As you can see it’s actually quite simple to submit a Ragbits ingestion job to the Ray cluster. The only thing you need to keep in mind is to correctly specify the pip libraries that need to be installed on the Ray workers. For simple, one-off jobs, you can use the pip field of runtime_env argument of the ray.submit function. For more complex jobs, you might want to use a tailored Docker image, just like the one we defined before.

Let’s wait for the job to finish before we move on:

from ray.job_submission import JobStatus
import time

def wait_until_status(job_id, status_to_wait_for, timeout_seconds=3600):
    start = time.time()
    while time.time() - start <= timeout_seconds:
        status = client.get_job_status(job_id)
        print(f"status: {status}")
        if status in status_to_wait_for:
            break
        time.sleep(5)


wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})

Note that even if you close the notebook, the job will continue running on the Ray cluster (this is one of the main advantages of Ray Jobs). You can check the status of the job by using the Ray dashboard or the Ray CLI.

Step 4: Retrieve the documents

Now that we have ingested the documents into Qdrant, we can check if the retrieval works as expected. We can do this by querying Qdrant for the documents and then printing the results. We will create a DocumentSearch object and use it to retrieve the documents:

from ragbits.core.llms import LiteLLM
from document_search import get_document_search
from pprint import pprint

if USE_GPU:
    llm = LiteLLM(
        model_name="alpindale/Llama-3.2-3B-Instruct",
        base_url="http://localhost:8000",
        use_structured_output=True
    )
else:
    llm = LiteLLM(
        model_name="openai/gpt-4o-mini",
        use_structured_output=True
    )

search_client = get_document_search("http://localhost:6333", llm)
pprint(await search_client.search("cancer"))

A keen eye will notice that even though we are using the same get_document_search function, we provide different addresses to the Qdrant instance and the LLM. This is because the retrieval process is done on the local machine, not on the Ray cluster.

With a working retrieval you are able to connect your vector search engine with any LLM, but this is out of scope of this cookbook – look up one of our other Ragbits cookbooks to check how easy it is!

Authors

Posted in