LogoKalli
Chatbot

Retrieval Augmented Generation (RAG)

RAG (Retrieval Augmented Generation)

nAI'vi utilise le RAG pour répondre aux questions en s'appuyant sur des chunks et exemples Q&A stockés en PostgreSQL. Les index FAISS sont persistés en base par rôle et rechargés en mémoire à la demande.

Modèles

UsageModèleVariable d'env
Embeddingsmistral-embedRAG_EMBEDDING_MODEL_ID
Génération (chat)mistral-medium-latestRAG_QUERY_MODEL_ID

Paramètres RAG

Tous les paramètres sont configurables via variables d'environnement (voir config/rag_config.py):

ParamètreVariable d'envDefaultRangeDescription
kRAG_K_CHUNKS51–20Nombre de chunks injectés dans le prompt
nRAG_N_EXAMPLES40–10Nombre d'exemples Q&A few-shot
temperatureRAG_TEMP0.2Température LLM pour la génération
batch_sizeRAG_BATCH_SIZE64Taille des lots pour le calcul d'embeddings
few_shots_enabledFEW_SHOTS_ENABLEDtrueActiver/désactiver les exemples few-shot

Tables utilisées

TableUsage
chunksChunks category = 'dynamic' → inclus dans le RAG
qa_paircategory = 'static' → few-shot examples; category = 'dynamic' → inclus dans le RAG
indexesIndex FAISS sérialisés (bytea), un par rôle
index_chunksMapping index ↔ chunk_id (chunks + qa_pairs)
rolesRôles applicatifs — détermine quel index charger
role_tagsTags associés à un rôle — filtre les chunks inclus dans l'index
item_tagsTags attachés aux chunks/qa_pairs

Index FAISS per-role

Chaque rôle applicatif dispose d'un index FAISS dédié. Les chunks inclus dans cet index sont ceux dont les tags sont accessibles pour le rôle (via la hiérarchie role_tags).

Génération d'un index (regenerate_index)

apps/chatbot/src/chatbot/rag/indexing.pyregenerate_index(client, db_conn):

  1. Récupère tous les rôles depuis la table roles.
  2. Pour chaque rôle: a. Identifie les tag_ids accessibles (hiérarchie display_order). b. Récupère les chunks/qa_pairs avec ces tags (+ embeddings si existants). c. Calcule les embeddings manquants par lots (BATCH_SIZE=64) via Mistral mistral-embed. d. Sauvegarde les nouveaux embeddings en DB (save_chunk_embedding_to_db). e. Construit un index IndexFlatL2 FAISS. f. Sérialise l'index (pickle) et le stocke dans la table indexes (marqué is_active=true). g. Enregistre les chunk_id dans index_chunks.
  3. Retourne { "success": True, "message": "..." }.

Déclenché par: POST /regenerate-index sur le service Index API, appelé via la procédure ORPC regenerateIndex (admin).

Chargement d'un index (init_rag)

apps/chatbot/src/chatbot/rag/services.pyinit_rag(client, db_conn, role_id):

  1. Vérifie le cache mémoire ROLE_INDEX_CACHE[role_id].
  2. Si cache présent: compare db_index_id avec le dernier index actif en DB (get_latest_index_from_db).
    • Si plus récent: recharge depuis DB (invalidation du cache).
    • Sinon: retourne le cache (cache hit).
  3. Si pas de cache (cold start): charge depuis DB, peuple le cache.
  4. Fallback sur l'index global legacy si aucun index per-role n'existe.

Fonctions clés

prepare_rag_context

apps/chatbot/src/chatbot/rag/services.py

async def prepare_rag_context(
    client: Mistral,
    db_conn: psycopg2.extensions.connection,
    question: str,
    k: int = 5,
    n: int = 2,
    user_id: Optional[str] = None
) -> tuple[str, str, list] | tuple[None, None, None]:

Args:

  • client — client Mistral initialisé
  • db_conn — connexion PostgreSQL
  • question — question de l'utilisateur
  • k — nombre de chunks à récupérer (default: DEFAULT_K_CHUNKS)
  • n — nombre d'exemples few-shot (default: DEFAULT_N_EXAMPLES)
  • user_id — identifiant utilisateur (pour résolution du rôle)

Returns: (context_text, few_shots_text, sources) ou (None, None, None) en cas d'erreur.

  • context_text — chunks concaténés séparés par \n---------------------\n
  • few_shots_text — exemples Q&A formatés "question : answer\n"
  • sources — liste [{chunk_id, file_id, file_name, table_source}]

answer_question

apps/chatbot/src/chatbot/rag/services.py

async def answer_question(
    client: Mistral,
    db_conn: psycopg2.extensions.connection,
    question: str,
    model_id: str = QUERY_MODEL_ID,
    k: int = 5,
    n: int = 2,
    conversation_history: Optional[List[Dict[str, str]]] = None,
    user_id: Optional[str] = None
) -> AsyncGenerator[str, None]:

Yields: tokens de la réponse Mistral en streaming, puis __SOURCES__:{json}.

run_query_mistral

async def run_query_mistral(
    client: Mistral,
    prompt: str,
    model_id: str,
    conversation_history: Optional[List[Dict[str, str]]] = None
) -> AsyncGenerator[str, None]:

Construit la liste de messages (historique + prompt courant) et appelle client.chat.stream_async() avec temperature=DEFAULT_RAG_TEMP. Yields les delta de contenu.

create_prompt

def create_prompt(
    question: str,
    retrieved_chunks: List[str],
    retrieved_examples: List[Dict[str, Any]]
) -> str:

Formate le template BOT_SYSTEM_PROMPT (env var) avec:

  • {context} — chunks séparés par \n---------------------\n
  • {few_shots_examples} — exemples Q&A formatés
  • {question} — question de l'utilisateur

get_text_embedding

apps/chatbot/src/chatbot/rag/indexing.py

def get_text_embedding(
    client: Mistral,
    input_chunks: List[str]
) -> List[List[float]]:

Appelle client.embeddings.create(model="mistral-embed", inputs=input_chunks). Gère automatiquement la limite de tokens via splitting récursif des batches. Retourne une liste d'embeddings (vecteurs float).

Flux complet per-question

question + user_id
  → get_user_role(db_conn, user_id) → role_id
  → init_rag(client, db_conn, role_id)
      → ROLE_INDEX_CACHE[role_id] ou chargement depuis DB
  → get_text_embedding(client, [question]) → q_vector (1024-dim)
  → index.search(q_vector, k_search=min(len(chunks), k))
      → indices filtrés (idx != -1, chunk non null)
  → retrieved_chunks[0..k-1], sources[0..k-1]
  → get_few_shots_examples(db_conn, limit=n) → exemples statiques aléatoires
  → create_prompt(question, chunks, examples) → prompt formaté
  → run_query_mistral(client, prompt, QUERY_MODEL_ID, history) → stream tokens
  → yield tokens
  → yield "__SOURCES__:{json}"