feat: add batching
All checks were successful
perso/mcp-maildir/pipeline/head This commit looks good

This commit is contained in:
Julien Cabillot
2026-03-16 21:06:26 -04:00
parent 52fcba17e5
commit eeae628a1a

View File

@@ -5,6 +5,7 @@ Indexer script to parse emails from Maildir and push them to Qdrant.
import os
import email
import mailbox
from concurrent.futures import ThreadPoolExecutor, Future
from datetime import datetime
from email.utils import parsedate_to_datetime, parseaddr
from email.header import decode_header
@@ -33,7 +34,8 @@ if not COLLECTION_NAME:
raise ValueError("COLLECTION_NAME environment variable is required.")
EMBEDDING_MODEL_NAME = os.environ.get("EMBEDDING_MODEL_NAME", "BAAI/bge-small-en-v1.5")
BATCH_SIZE = 50
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
EMBEDDING_BATCH_SIZE = int(os.environ.get("EMBEDDING_BATCH_SIZE", "64"))
METADATA_COLLECTION = "mcp_indexer_metadata"
INCREMENTAL_DAYS = int(os.environ.get("INCREMENTAL_DAYS", "7"))
@@ -207,6 +209,7 @@ def get_recent_keys(mbox: mailbox.Maildir, days: int) -> set:
from datetime import timezone, timedelta
cutoff = datetime.now(tz=timezone.utc) - timedelta(days=days)
cutoff_ts = cutoff.timestamp()
recent = set()
maildir_root = mbox._path # type: ignore[attr-defined]
@@ -214,15 +217,15 @@ def get_recent_keys(mbox: mailbox.Maildir, days: int) -> set:
# Maildir keys are stored under cur/ or new/ — try both
for subdir in ("cur", "new"):
file_path = os.path.join(maildir_root, subdir, key)
if os.path.isfile(file_path):
try:
mtime = os.path.getmtime(file_path)
mtime_dt = datetime.fromtimestamp(mtime, tz=timezone.utc)
if mtime_dt >= cutoff:
recent.add(key)
except OSError:
pass
break # found the file, no need to check the other subdir
try:
# Single stat() call instead of isfile() + getmtime()
if os.stat(file_path).st_mtime >= cutoff_ts:
recent.add(key)
except FileNotFoundError:
continue # try next subdir
except OSError:
pass
break # found the file (stat succeeded or non-ENOENT error), stop trying subdirs
return recent
@@ -257,116 +260,138 @@ def main():
mode = "full"
print("[MODE] BOOTSTRAP — full scan of all emails (first-time indexing).")
points = []
# Pipeline:
# parse phase → accumulates (vector_text, payload, point_id) into a batch
# embed phase → model.embed() called once per batch (vectorized ONNX inference),
# in the main thread, after the previous upsert has completed
# upsert phase → submitted to a background thread so the next batch can be
# parsed while the previous one is in-flight to Qdrant
#
# pending_batch: accumulates parsed email metadata + vector_text until BATCH_SIZE
# pending_future: the in-flight ThreadPoolExecutor Future for the previous upsert
pending_batch: List[Dict[str, Any]] = []
pending_future: Future | None = None
has_error = False
total_processed = 0
# Iterate and parse over all maildir directories found in MAILDIR_PATH
for root, dirs, files in os.walk(MAILDIR_PATH):
# A valid Maildir has 'cur', 'new', and 'tmp' subdirectories
if not all(subdir in dirs for subdir in ['cur', 'new', 'tmp']):
continue
print(f"Processing Maildir found at: {root}")
mbox = mailbox.Maildir(root)
total_emails_in_dir = len(mbox)
if mode == "incremental":
keys_to_process = get_recent_keys(mbox, INCREMENTAL_DAYS)
print(
f"Found {total_emails_in_dir} emails total, "
f"{len(keys_to_process)} modified in the last {INCREMENTAL_DAYS} days."
def _flush_batch(executor: ThreadPoolExecutor, batch: List[Dict[str, Any]]) -> Future:
"""Embed a batch of pre-parsed emails and submit an async upsert to Qdrant."""
vector_texts = [item["vector_text"] for item in batch]
vectors = [v.tolist() for v in model.embed(vector_texts, batch_size=EMBEDDING_BATCH_SIZE)]
points = [
models.PointStruct(
id=item["point_id"],
vector=vectors[i],
payload=item["payload"],
)
else:
keys_to_process = set(mbox.keys())
print(f"Found {total_emails_in_dir} emails — indexing all.")
for idx, key in enumerate(keys_to_process):
try:
msg = mbox[key]
# Parse headers
subject = decode_mime_words(msg.get("Subject", "No Subject"))
sender_raw = decode_mime_words(msg.get("From", "Unknown"))
receiver_raw = decode_mime_words(msg.get("To", "Unknown"))
sender = normalize_email_address(sender_raw)
receiver = normalize_email_address(receiver_raw)
message_id = msg.get("Message-ID", str(uuid.uuid4()))
# Parse date
date_str = msg.get("Date")
dt_obj = None
if date_str:
try:
dt_obj = parsedate_to_datetime(date_str)
except Exception:
pass
if dt_obj is None:
dt_obj = datetime.now()
# Format to ISO 8601 for Qdrant DATETIME index
iso_date = dt_obj.isoformat()
# Parse Body and Attachments
body_text, attachments = parse_email_message(msg)
# Prepare Vector text
attachments_str = ", ".join(attachments) if attachments else "None"
vector_text = (
f"Date: {iso_date}\n"
f"From: {sender}\n"
f"To: {receiver}\n"
f"Subject: {subject}\n\n"
f"{body_text}\n\n"
f"Attachments: {attachments_str}"
)
# Embed the text — fastembed returns an iterable of numpy arrays
embeddings = list(model.embed([vector_text]))
vector = embeddings[0].tolist()
# Prepare payload (metadata)
payload = {
"message_id": message_id,
"date": iso_date,
"sender": sender,
"sender_raw": sender_raw,
"receiver": receiver,
"receiver_raw": receiver_raw,
"subject": subject,
"body_text": body_text,
"attachments": attachments,
}
# Assign deterministic UUID point ID based on message_id
point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, message_id))
points.append(models.PointStruct(
id=point_id,
vector=vector,
payload=payload,
))
# Push in batches
if len(points) >= BATCH_SIZE:
qdrant_client.upsert(
collection_name=COLLECTION_NAME,
points=points,
)
print(f" Upserted batch — {idx + 1}/{len(keys_to_process)} emails processed in current directory.")
points = []
except Exception as e:
print(f"Error processing email key={key}: {e}")
has_error = True
# Push remaining points
if points:
qdrant_client.upsert(
for i, item in enumerate(batch)
]
return executor.submit(
qdrant_client.upsert,
collection_name=COLLECTION_NAME,
points=points,
)
with ThreadPoolExecutor(max_workers=1) as executor:
# Iterate and parse over all maildir directories found in MAILDIR_PATH
for root, dirs, files in os.walk(MAILDIR_PATH):
# A valid Maildir has 'cur', 'new', and 'tmp' subdirectories
if not all(subdir in dirs for subdir in ['cur', 'new', 'tmp']):
continue
print(f"Processing Maildir found at: {root}")
mbox = mailbox.Maildir(root)
total_emails_in_dir = len(mbox)
if mode == "incremental":
keys_to_process = get_recent_keys(mbox, INCREMENTAL_DAYS)
print(
f"Found {total_emails_in_dir} emails total, "
f"{len(keys_to_process)} modified in the last {INCREMENTAL_DAYS} days."
)
else:
keys_to_process = set(mbox.keys())
print(f"Found {total_emails_in_dir} emails — indexing all.")
for key in keys_to_process:
try:
msg = mbox[key]
# Parse headers
subject = decode_mime_words(msg.get("Subject", "No Subject"))
sender_raw = decode_mime_words(msg.get("From", "Unknown"))
receiver_raw = decode_mime_words(msg.get("To", "Unknown"))
sender = normalize_email_address(sender_raw)
receiver = normalize_email_address(receiver_raw)
message_id = msg.get("Message-ID", str(uuid.uuid4()))
# Parse date
date_str = msg.get("Date")
dt_obj = None
if date_str:
try:
dt_obj = parsedate_to_datetime(date_str)
except Exception:
pass
if dt_obj is None:
dt_obj = datetime.now()
iso_date = dt_obj.isoformat()
# Parse body and attachments
body_text, attachments = parse_email_message(msg)
attachments_str = ", ".join(attachments) if attachments else "None"
vector_text = (
f"Date: {iso_date}\n"
f"From: {sender}\n"
f"To: {receiver}\n"
f"Subject: {subject}\n\n"
f"{body_text}\n\n"
f"Attachments: {attachments_str}"
)
pending_batch.append({
"vector_text": vector_text,
"point_id": str(uuid.uuid5(uuid.NAMESPACE_OID, message_id)),
"payload": {
"message_id": message_id,
"date": iso_date,
"sender": sender,
"sender_raw": sender_raw,
"receiver": receiver,
"receiver_raw": receiver_raw,
"subject": subject,
"body_text": body_text,
"attachments": attachments,
},
})
if len(pending_batch) >= BATCH_SIZE:
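# Wait for the previous upsert to complete before submitting the next.
# NOTE(review): Future.result() re-raises a failed upsert's exception on every
# call; pending_future and pending_batch are only reset after a successful
# flush, so one failed upsert makes every later flush attempt fail as well.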
if pending_future is not None:
pending_future.result()
pending_future = _flush_batch(executor, pending_batch)
total_processed += len(pending_batch)
print(f" Embedded+upserted batch — {total_processed} emails total so far.")
pending_batch = []
except Exception as e:
print(f"Error processing email key={key}: {e}")
has_error = True
# Flush remaining emails
if pending_batch:
if pending_future is not None:
pending_future.result()
pending_future = _flush_batch(executor, pending_batch)
total_processed += len(pending_batch)
pending_batch = []
# Wait for the last upsert to finish before exiting the executor context
if pending_future is not None:
pending_future.result()
print(f" Total emails indexed: {total_processed}")
# Record bootstrap completion so subsequent runs use incremental mode
if mode == "full" and not has_error:
mark_bootstrap_done(qdrant_client)