Implementing CAG with Qdrant and Redis on Kubernetes

As AI-powered applications scale, they demand faster data retrieval and processing. Traditional databases struggle to keep up, and even retrieval-augmented generation (RAG) has performance bottlenecks when fetching context from external sources. Cache-Augmented Generation (CAG) enhances response times by caching frequently accessed context, reducing redundant lookups.

This approach builds on proven caching mechanisms, like those in Redis, allowing large language models (LLMs) to reuse context efficiently. In this article, we explore how to implement CAG using Redis for caching, Qdrant as a vector database, and Kubernetes for scalability to optimize AI workloads.

RAG vs CAG: Comparison

Both approaches are built on retrieval, but they fetch context through different mechanisms. RAG solutions query external knowledge bases and feed the retrieved information to the LLM for response generation. With CAG, the user query is first checked against the cached context; on a cache hit, the model generates the response from the cached context. On a cache miss, the LLM takes the RAG route, generates a new response, and caches it for later requests.
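
The flow above can be sketched in a few lines. This is a minimal illustration, not the implementation used later in the article: the cache is a plain dictionary, `retrieve` is a hypothetical stand-in for the RAG lookup, and the lowercase/strip normalization of the key is an assumption made only for this sketch.

```python
from typing import Callable, Dict, Tuple


def answer_with_cag(
    query: str,
    cache: Dict[str, str],
    retrieve: Callable[[str], str],
) -> Tuple[str, bool]:
    """Return (result, cache_hit). On a miss, take the RAG route and cache the result."""
    key = query.strip().lower()  # naive normalization, assumed for this sketch
    if key in cache:
        return cache[key], True  # cache hit: no external lookup needed
    result = retrieve(query)     # cache miss: fall back to retrieval
    cache[key] = result          # store it so the next identical query is a hit
    return result, False
```

Calling the function twice with the same query should trigger `retrieve` only once; the second call is served from the dictionary.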

Image by Author

Why introduce Kubernetes into the equation?

When only a few users query the model, a small compute instance with standard RAM is enough to meet business requirements. However, Redis's capacity is bounded by the available memory, so this setup is prone to resource and scalability bottlenecks as the user base grows. Managing Redis on Kubernetes gives us several capabilities:

  1. We can start with basic resources and scale Redis instances dynamically based on demand using horizontal pod autoscaling. This keeps resource usage efficient and costs down.
  2. We can use StatefulSets for persistent storage when the LLM relies on specific context for domain-specific response generation.
  3. By deploying Redis in cluster mode, we can leverage Kubernetes capabilities to manage nodes across distributed environments, ensuring high availability of shared data.
  4. Most importantly, Kubernetes enables a declarative approach to define resource requests and limits for Redis pods, ensuring optimal performance throughout the lifecycle.
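
To make the autoscaling point in item 1 concrete, here is a small sketch of the replica calculation documented for the Kubernetes Horizontal Pod Autoscaler: desired replicas = ceil(current replicas × current metric / target metric), with scaling skipped when the ratio is within a tolerance of 1.0. The function name, the default bounds, and the idea of feeding it a memory-utilization percentage are assumptions for illustration only.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_metric: float,
    target_metric: float,
    min_replicas: int = 1,    # assumed bounds for this sketch
    max_replicas: int = 10,
    tolerance: float = 0.1,   # HPA's documented default tolerance
) -> int:
    """Replica count suggested by the HPA formula, clamped to [min, max]."""
    if target_metric <= 0:
        raise ValueError("target_metric must be positive")
    ratio = current_metric / target_metric
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas  # close enough to target: do not scale
    desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))
```

For example, two replicas running at 90% against a 60% target would suggest three replicas. Extra replicas only add usable cache capacity when keys are sharded across them, for example in the cluster mode mentioned in item 3.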

Practical Implementation

Relying solely on what the LLM learned during training can produce outdated responses, because the model's knowledge is frozen at training time. An LLM's relevance and speed therefore depend on how quickly the underlying tools can extract context from their sources. CAG delivers faster responses the more frequently the model is used, since repeated queries are served from the cache. However, keeping track of cached context and managing it can become overwhelming as the application scales. Caching only the relevant context and pruning unused or irrelevant information becomes crucial for resource utilization and efficiency.

Image by Author

Considering all the factors, our implementation will be straightforward yet performant. In a nutshell, our solution approach will operate as follows.

The user prompts the LLM with a request. Before the LLM answers from what it already knows, our tool gathers context from the knowledge bases. The tool first searches the temp and persistent caches; on a cache hit, it returns the cached context to the model. On a cache miss, it takes the RAG route and searches for context in the vector database. We maintain two caches (temp and persistent). When a context is used for the first time, it is cached in the temp cache; when it is accessed again, it is moved to the persistent cache. The temp cache is cleared daily as part of sanity checks, and contexts in the persistent cache are removed if they have not been accessed for more than 2 days.
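
Before bringing in Redis, the promotion and expiry rules can be sketched with plain dictionaries. This is an illustrative in-memory model only: the class name `TwoTierCache`, its method names, and the injectable `clock` are hypothetical and exist to make the policy easy to follow and test.

```python
import time
from typing import Callable, Dict, Optional, Tuple

ONE_DAY = 24 * 60 * 60
TWO_DAYS = 2 * ONE_DAY


class TwoTierCache:
    """In-memory model of the temp/persistent policy described above."""

    def __init__(self, clock: Callable[[], float] = time.time):
        self.clock = clock
        self.temp: Dict[str, str] = {}
        # key -> (context, last access time in seconds)
        self.persistent: Dict[str, Tuple[str, float]] = {}

    def put_first_use(self, key: str, context: str) -> None:
        # First use: the context lands in the temp cache only
        self.temp[key] = context

    def get(self, key: str) -> Optional[str]:
        now = self.clock()
        if key in self.persistent:
            context, _ = self.persistent[key]
            self.persistent[key] = (context, now)  # refresh last access
            return context
        if key in self.temp:
            context = self.temp.pop(key)           # repeat access: move it
            self.persistent[key] = (context, now)
            return context
        return None

    def clear_temp(self) -> None:
        # Daily sanity check
        self.temp.clear()

    def prune_persistent(self) -> int:
        # Drop entries not accessed for more than two days; return how many
        now = self.clock()
        stale = [k for k, (_, ts) in self.persistent.items() if now - ts > TWO_DAYS]
        for k in stale:
            del self.persistent[k]
        return len(stale)
```

Passing a fake clock makes it possible to check the two-day rule without waiting, which is also a convenient way to unit test the Redis-backed version.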

import hashlib
import logging
import time
import uuid

import redis
from kubernetes import client, config
from qdrant_client import QdrantClient
from qdrant_client.http.models import PointStruct, Distance, VectorParams

class CacheContextManager:
  def __init__(self):
      logging.basicConfig(level=logging.INFO)
      self.logger = logging.getLogger(__name__)

      # Vector database that holds the source contexts
      self.qdrant_client = QdrantClient(host="localhost", port=6333)

      # Two logical Redis databases behind the same service
      self.temp_redis = redis.Redis(host='redis-service', port=6379, db=0)  # Temp cache
      self.persistent_redis = redis.Redis(host='redis-service', port=6379, db=1)  # Persistent cache

      # Kubernetes API client; load_incluster_config() only works inside a pod
      config.load_incluster_config()
      self.k8s_apps_v1 = client.AppsV1Api()
      self.namespace = "default"
      self.deployment_name = "redis-deployment"

      self.TWO_DAYS = 2 * 24 * 60 * 60  # 2 days
      self.ONE_DAY = 24 * 60 * 60       # 1 day

      self.collection_name = "CAG_Collection"
      self._setup_qdrant_collection()

  def _setup_qdrant_collection(self):
      # Create the collection only if it does not exist yet
      try:
          self.qdrant_client.get_collection(self.collection_name)
      except Exception:
          self.qdrant_client.create_collection(
              collection_name=self.collection_name,
              vectors_config=VectorParams(size=768, distance=Distance.COSINE)
          )

  def _generate_cache_key(self, query: str) -> str:
      # Built-in hash() is salted per process, so keys would differ across pods
      # and restarts; a SHA-256 digest is stable for the same query.
      digest = hashlib.sha256(query.encode("utf-8")).hexdigest()
      return f"context:{digest}"

  def clear_temp_cache(self):
      self.temp_redis.flushdb()
      self.logger.info("Temporary cache cleared")

  def cleanup_persistent_cache(self):
      current_time = time.time()
      # scan_iter walks the keyspace incrementally; KEYS blocks Redis on large caches
      for key in self.persistent_redis.scan_iter(match="context:*"):
          last_accessed = float(self.persistent_redis.hget(key, "timestamp") or 0)
          # Remove contexts that have not been accessed for more than 2 days
          if current_time - last_accessed > self.TWO_DAYS:
              self.persistent_redis.delete(key)
      self.logger.info("Persistent cache cleaned up")

  def _check_redis_memory_usage(self) -> float:

      info = self.persistent_redis.info("memory")
      used_memory = info["used_memory"]
      max_memory = info["maxmemory"]
      if max_memory == 0:
          return 0.0
      return (used_memory / max_memory) * 100

  def _scale_redis_deployment(self, replicas: int):

      try:
          deployment = self.k8s_apps_v1.read_namespaced_deployment(
              name=self.deployment_name, namespace=self.namespace
          )
          deployment.spec.replicas = replicas
          self.k8s_apps_v1.patch_namespaced_deployment(
              name=self.deployment_name,
              namespace=self.namespace,
              body=deployment
          )
          self.logger.info(f"Scaled Redis deployment to {replicas} replicas")
      except Exception as e:
          self.logger.error(f"Failed to scale Redis deployment: {e}")

  def get_context(self, query: str, vector: list) -> str:
      cache_key = self._generate_cache_key(query)
      current_time = time.time()

      # Temp cache hit: this is a repeat access, so move the context
      # to the persistent cache
      temp_context = self.temp_redis.get(cache_key)
      if temp_context is not None:
          self.persistent_redis.hset(cache_key, mapping={
              "context": temp_context,
              "timestamp": current_time
          })
          self.temp_redis.delete(cache_key)
          return temp_context.decode('utf-8')

      # Persistent cache hit: refresh the last-access timestamp
      persistent_context = self.persistent_redis.hget(cache_key, "context")
      if persistent_context is not None:
          self.persistent_redis.hset(cache_key, "timestamp", current_time)
          return persistent_context.decode('utf-8')

      # Memory usage check with autoscaling
      memory_usage = self._check_redis_memory_usage()
      if memory_usage > 80:  # Scale if memory usage exceeds 80%
          current_replicas = self.k8s_apps_v1.read_namespaced_deployment(
              self.deployment_name, self.namespace
          ).spec.replicas
          self._scale_redis_deployment(current_replicas + 1)

      # Cache miss: take the RAG route and search Qdrant
      search_result = self.qdrant_client.search(
          collection_name=self.collection_name,
          query_vector=vector,
          limit=1
      )

      if search_result:
          context = search_result[0].payload.get("context", "")
          # First use: store in the temp cache only, expiring after one day
          self.temp_redis.setex(cache_key, self.ONE_DAY, context)
          return context

      return "No relevant context found"

  def add_context(self, context: str, vector: list):
      point_id = str(uuid.uuid4())
      self.qdrant_client.upsert(
          collection_name=self.collection_name,
          points=[PointStruct(id=point_id, vector=vector, payload={"context": context})]
      )
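
One way to call the class from an LLM tool is sketched below. The `build_prompt` helper, the `ContextSource` protocol, and the `embed` callable are hypothetical names introduced for illustration; the only thing taken from the class above is the `get_context(query, vector)` method and its "No relevant context found" fallback string. The embedding function is assumed to return vectors of the same size as the collection (768 here).

```python
from typing import Callable, List, Protocol

NO_CONTEXT = "No relevant context found"  # fallback string returned by get_context


class ContextSource(Protocol):
    """Anything exposing get_context, such as CacheContextManager."""

    def get_context(self, query: str, vector: list) -> str: ...


def build_prompt(
    query: str,
    source: ContextSource,
    embed: Callable[[str], List[float]],  # hypothetical embedding function
) -> str:
    """Fetch context (cache first, then Qdrant) and prepend it to the question."""
    vector = embed(query)
    context = source.get_context(query, vector)
    if context == NO_CONTEXT:
        return f"Question: {query}"  # nothing to ground on; send the bare question
    return f"Context:\n{context}\n\nQuestion: {query}"
```

Depending on a protocol rather than the concrete class keeps the tool testable with a stub, without a running Redis, Qdrant, or Kubernetes cluster.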

The CacheContextManager class can be exposed as an AI tool that is invoked whenever an LLM call is triggered. With that in place, we can now declare the Kubernetes configs that deploy and scale our Redis instances based on demand.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis-deployment
  namespace: default
spec:
  replicas: 1  # Start with 1 replica
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:latest
        args: ["--maxmemory", "500mb", "--maxmemory-policy", "allkeys-lru"]
        ports:
        - containerPort: 6379
        resources:
          limits:
            memory: "600Mi"
            cpu: "500m"
          requests:
            memory: "500Mi"
            cpu: "200m"
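
The manifest sets Redis `maxmemory` to 500mb while the container limit is 600Mi. As a quick sanity check, the sketch below converts both to bytes and reports the headroom left for Redis overhead before the container hits its limit. The helper names are hypothetical; the unit tables follow the Redis convention (`mb` = 1024 × 1024 bytes, `m` = 1,000,000 bytes, case-insensitive) and the Kubernetes quantity suffixes (`Mi` = 1024 × 1024 bytes, case-sensitive).

```python
import re

# Redis memory units are case-insensitive: "1m" is decimal, "1mb" is binary
REDIS_UNITS = {"b": 1, "k": 1000, "kb": 1024, "m": 1000**2,
               "mb": 1024**2, "g": 1000**3, "gb": 1024**3}
# Kubernetes quantity suffixes are case-sensitive
K8S_UNITS = {"k": 1000, "M": 1000**2, "G": 1000**3,
             "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3}


def _to_bytes(value: str, units: dict, case_sensitive: bool) -> int:
    match = re.fullmatch(r"(\d+)\s*([A-Za-z]*)", value.strip())
    if match is None:
        raise ValueError(f"cannot parse {value!r}")
    number, suffix = match.groups()
    if not case_sensitive:
        suffix = suffix.lower()
    return int(number) * (units[suffix] if suffix else 1)


def redis_bytes(value: str) -> int:
    return _to_bytes(value, REDIS_UNITS, case_sensitive=False)


def k8s_bytes(value: str) -> int:
    return _to_bytes(value, K8S_UNITS, case_sensitive=True)


def headroom_mib(maxmemory: str, container_limit: str) -> float:
    """MiB left between Redis maxmemory and the container memory limit."""
    return (k8s_bytes(container_limit) - redis_bytes(maxmemory)) / 1024**2
```

With the values from the manifest, `headroom_mib("500mb", "600Mi")` works out to 100 MiB. A negative result would mean the container can be killed for exceeding its limit before Redis starts evicting keys.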

After deploying or scaling, the Redis pods must be reachable under a stable name. The following YAML config creates a Service in the default namespace that exposes the Redis pods on a cluster IP; this is the redis-service host the Python client connects to.

apiVersion: v1
kind: Service
metadata:
  name: redis-service
  namespace: default
spec:
  ports:
  - port: 6379
    targetPort: 6379
  selector:
    app: redis
  type: ClusterIP

Conclusion

LLM response generation has come a long way in relevance and accuracy, thanks to RAG. However, RAG introduces a performance bottleneck that slows response generation at enterprise scale. Caching has long been the go-to solution for efficiently reusing information that is expensive to generate or query. Cache-augmented generation applies that idea to LLMs, speeding up responses while still grounding them in current information.
