fix: metadata
perso/mcp-maildir/pipeline/head Build queued...

This commit is contained in:
Julien Cabillot
2026-03-25 17:27:33 -04:00
parent 1b1ba0ef1a
commit 9cf58596c9
+37 -9
View File
@@ -5,6 +5,7 @@ Indexer script to parse emails from Maildir and push them to Qdrant.
import os import os
import email import email
import mailbox import mailbox
import warnings
from concurrent.futures import ThreadPoolExecutor, Future from concurrent.futures import ThreadPoolExecutor, Future
from datetime import datetime from datetime import datetime
from email.utils import parsedate_to_datetime, parseaddr from email.utils import parsedate_to_datetime, parseaddr
@@ -39,6 +40,7 @@ BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
EMBEDDING_BATCH_SIZE = int(os.environ.get("EMBEDDING_BATCH_SIZE", "64")) EMBEDDING_BATCH_SIZE = int(os.environ.get("EMBEDDING_BATCH_SIZE", "64"))
METADATA_COLLECTION = "mcp_indexer_metadata" METADATA_COLLECTION = "mcp_indexer_metadata"
INCREMENTAL_DAYS = int(os.environ.get("INCREMENTAL_DAYS", "7")) INCREMENTAL_DAYS = int(os.environ.get("INCREMENTAL_DAYS", "7"))
FORCE_REINDEX = os.environ.get("FORCE_REINDEX", "").lower() in ("1", "true", "yes")
def decode_mime_words(s: str) -> str: def decode_mime_words(s: str) -> str:
"""Decodes MIME encoded strings (e.g. subjects, filenames).""" """Decodes MIME encoded strings (e.g. subjects, filenames)."""
@@ -72,7 +74,7 @@ def normalize_email_address(value: str) -> str:
return (addr or value).strip().lower() return (addr or value).strip().lower()
def parse_email_message(msg: mailbox.Message) -> Tuple[str, List[str]]: def parse_email_message(msg: mailbox.Message, email_id: str = "") -> Tuple[str, List[str]]:
"""Extracts plain text body and a list of attachment filenames.""" """Extracts plain text body and a list of attachment filenames."""
body_parts = [] body_parts = []
attachments = [] attachments = []
@@ -99,16 +101,24 @@ def parse_email_message(msg: mailbox.Message) -> Tuple[str, List[str]]:
if payload: if payload:
charset = part.get_content_charset('utf-8') or 'utf-8' charset = part.get_content_charset('utf-8') or 'utf-8'
if isinstance(payload, bytes): if isinstance(payload, bytes):
text = payload.decode(charset, errors='replace') try:
text = payload.decode(charset, errors='replace')
except (LookupError, UnicodeDecodeError):
# Unknown or broken charset — fall back to utf-8
print(f" Warning: unknown charset '{charset}', falling back to utf-8 [{email_id}]")
text = payload.decode('utf-8', errors='replace')
else: else:
text = str(payload) text = str(payload)
if content_type == "text/html": if content_type == "text/html":
text = extract_text_from_html(text) with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
text = extract_text_from_html(text)
for w in caught:
print(f" Warning: {w.category.__name__}: {w.message} [{email_id}]")
body_parts.append(text) body_parts.append(text)
except Exception as e: except Exception as e:
print(f"Error extracting payload: {e}") print(f" Warning: error extracting payload: {e} [{email_id}]")
pass
return "\n".join(body_parts).strip(), attachments return "\n".join(body_parts).strip(), attachments
@@ -255,6 +265,17 @@ def main():
# Initialize Qdrant # Initialize Qdrant
print("Connecting to Qdrant...") print("Connecting to Qdrant...")
qdrant_client = QdrantClient(url=QDRANT_URL) qdrant_client = QdrantClient(url=QDRANT_URL)
# Force reindex: wipe existing collections to start from scratch
if FORCE_REINDEX:
print("[FORCE_REINDEX] Deleting existing collections for a clean re-bootstrap...")
for col_name in (COLLECTION_NAME, METADATA_COLLECTION):
try:
qdrant_client.delete_collection(collection_name=col_name)
print(f" Deleted collection '{col_name}'.")
except Exception:
print(f" Collection '{col_name}' did not exist, skipping.")
init_qdrant_collection(qdrant_client, vector_size) init_qdrant_collection(qdrant_client, vector_size)
init_metadata_collection(qdrant_client) init_metadata_collection(qdrant_client)
@@ -343,10 +364,15 @@ def main():
receiver_raw = decode_mime_words(msg.get("To", "Unknown")) receiver_raw = decode_mime_words(msg.get("To", "Unknown"))
sender = normalize_email_address(sender_raw) sender = normalize_email_address(sender_raw)
receiver = normalize_email_address(receiver_raw) receiver = normalize_email_address(receiver_raw)
message_id = msg.get("Message-ID", str(uuid.uuid4())) message_id_raw = msg.get("Message-ID")
message_id = str(message_id_raw) if message_id_raw is not None else str(uuid.uuid4())
# Parse date # Parse date — msg.get() may return an email.header.Header
date_str = msg.get("Date") # object instead of str when the header contains non-ASCII
# bytes (e.g. timezone comments like "heure d'été").
# We must coerce to str before parsing.
date_raw = msg.get("Date")
date_str = str(date_raw) if date_raw is not None else None
dt_obj = None dt_obj = None
if date_str: if date_str:
try: try:
@@ -354,11 +380,13 @@ def main():
except Exception: except Exception:
pass pass
if dt_obj is None: if dt_obj is None:
# Fallback: warn and use current time
print(f" Warning: could not parse Date header: {repr(date_raw)} [key={key}, subject={subject}]")
dt_obj = datetime.now() dt_obj = datetime.now()
iso_date = dt_obj.isoformat() iso_date = dt_obj.isoformat()
# Parse body and attachments # Parse body and attachments
body_text, attachments = parse_email_message(msg) body_text, attachments = parse_email_message(msg, email_id=f"key={key}, subject={subject}")
attachments_str = ", ".join(attachments) if attachments else "None" attachments_str = ", ".join(attachments) if attachments else "None"
vector_text = ( vector_text = (