diff --git a/src/indexer.py b/src/indexer.py
index 32b69a1..4301fb3 100644
--- a/src/indexer.py
+++ b/src/indexer.py
@@ -5,6 +5,7 @@ Indexer script to parse emails from Maildir and push them to Qdrant.
 import os
 import email
 import mailbox
+from concurrent.futures import ThreadPoolExecutor, Future
 from datetime import datetime
 from email.utils import parsedate_to_datetime, parseaddr
 from email.header import decode_header
@@ -33,7 +34,8 @@ if not COLLECTION_NAME:
     raise ValueError("COLLECTION_NAME environment variable is required.")
 
 EMBEDDING_MODEL_NAME = os.environ.get("EMBEDDING_MODEL_NAME", "BAAI/bge-small-en-v1.5")
-BATCH_SIZE = 50
+BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
+EMBEDDING_BATCH_SIZE = int(os.environ.get("EMBEDDING_BATCH_SIZE", "64"))
 METADATA_COLLECTION = "mcp_indexer_metadata"
 INCREMENTAL_DAYS = int(os.environ.get("INCREMENTAL_DAYS", "7"))
 
@@ -207,6 +209,7 @@ def get_recent_keys(mbox: mailbox.Maildir, days: int) -> set:
     from datetime import timezone, timedelta
 
     cutoff = datetime.now(tz=timezone.utc) - timedelta(days=days)
+    cutoff_ts = cutoff.timestamp()
     recent = set()
     maildir_root = mbox._path  # type: ignore[attr-defined]
 
@@ -214,15 +217,15 @@ def get_recent_keys(mbox: mailbox.Maildir, days: int) -> set:
         # Maildir keys are stored under cur/ or new/ — try both
         for subdir in ("cur", "new"):
             file_path = os.path.join(maildir_root, subdir, key)
-            if os.path.isfile(file_path):
-                try:
-                    mtime = os.path.getmtime(file_path)
-                    mtime_dt = datetime.fromtimestamp(mtime, tz=timezone.utc)
-                    if mtime_dt >= cutoff:
-                        recent.add(key)
-                except OSError:
-                    pass
-                break  # found the file, no need to check the other subdir
+            try:
+                # Single stat() call instead of isfile() + getmtime()
+                if os.stat(file_path).st_mtime >= cutoff_ts:
+                    recent.add(key)
+            except FileNotFoundError:
+                continue  # try next subdir
+            except OSError:
+                pass
+            break  # found the file (stat succeeded or non-ENOENT error), stop trying subdirs
 
     return recent
 
@@ -257,116 +260,146 @@ def main():
         mode = "full"
         print("[MODE] BOOTSTRAP — full scan of all emails (first-time indexing).")
 
-    points = []
+    # Pipeline:
+    #   parse phase  → accumulates (vector_text, payload, point_id) into a batch
+    #   embed phase  → model.embed() called once per batch (vectorized ONNX inference)
+    #   upsert phase → submitted to a background thread so the next batch can be
+    #                  parsed+embedded while the previous one is in-flight to Qdrant
+    #
+    # pending_batch: accumulates parsed email metadata + vector_text until BATCH_SIZE
+    # pending_future: the in-flight ThreadPoolExecutor Future for the previous upsert
+    pending_batch: List[Dict[str, Any]] = []
+    pending_future: Future | None = None
     has_error = False
+    total_processed = 0
 
-    # Iterate and parse over all maildir directories found in MAILDIR_PATH
-    for root, dirs, files in os.walk(MAILDIR_PATH):
-        # A valid Maildir has 'cur', 'new', and 'tmp' subdirectories
-        if not all(subdir in dirs for subdir in ['cur', 'new', 'tmp']):
-            continue
-
-        print(f"Processing Maildir found at: {root}")
-        mbox = mailbox.Maildir(root)
-        total_emails_in_dir = len(mbox)
-
-        if mode == "incremental":
-            keys_to_process = get_recent_keys(mbox, INCREMENTAL_DAYS)
-            print(
-                f"Found {total_emails_in_dir} emails total, "
-                f"{len(keys_to_process)} modified in the last {INCREMENTAL_DAYS} days."
+    def _flush_batch(executor: ThreadPoolExecutor, batch: List[Dict[str, Any]]) -> Future:
+        """Embed a batch of pre-parsed emails and submit an async upsert to Qdrant.
+
+        Embedding runs synchronously in the calling thread; only the Qdrant
+        upsert is handed to *executor*. The returned Future completes when the
+        upsert does, and its .result() re-raises any error the upsert hit.
+        """
+        vector_texts = [item["vector_text"] for item in batch]
+        vectors = [v.tolist() for v in model.embed(vector_texts, batch_size=EMBEDDING_BATCH_SIZE)]
+        points = [
+            models.PointStruct(
+                id=item["point_id"],
+                vector=vectors[i],
+                payload=item["payload"],
             )
-        else:
-            keys_to_process = set(mbox.keys())
-            print(f"Found {total_emails_in_dir} emails — indexing all.")
-
-        for idx, key in enumerate(keys_to_process):
-            try:
-                msg = mbox[key]
-
-                # Parse headers
-                subject = decode_mime_words(msg.get("Subject", "No Subject"))
-                sender_raw = decode_mime_words(msg.get("From", "Unknown"))
-                receiver_raw = decode_mime_words(msg.get("To", "Unknown"))
-                sender = normalize_email_address(sender_raw)
-                receiver = normalize_email_address(receiver_raw)
-                message_id = msg.get("Message-ID", str(uuid.uuid4()))
-
-                # Parse date
-                date_str = msg.get("Date")
-                dt_obj = None
-                if date_str:
-                    try:
-                        dt_obj = parsedate_to_datetime(date_str)
-                    except Exception:
-                        pass
-
-                if dt_obj is None:
-                    dt_obj = datetime.now()
-
-                # Format to ISO 8601 for Qdrant DATETIME index
-                iso_date = dt_obj.isoformat()
-
-                # Parse Body and Attachments
-                body_text, attachments = parse_email_message(msg)
-
-                # Prepare Vector text
-                attachments_str = ", ".join(attachments) if attachments else "None"
-                vector_text = (
-                    f"Date: {iso_date}\n"
-                    f"From: {sender}\n"
-                    f"To: {receiver}\n"
-                    f"Subject: {subject}\n\n"
-                    f"{body_text}\n\n"
-                    f"Attachments: {attachments_str}"
-                )
-
-                # Embed the text — fastembed returns an iterable of numpy arrays
-                embeddings = list(model.embed([vector_text]))
-                vector = embeddings[0].tolist()
-
-                # Prepare payload (metadata)
-                payload = {
-                    "message_id": message_id,
-                    "date": iso_date,
-                    "sender": sender,
-                    "sender_raw": sender_raw,
-                    "receiver": receiver,
-                    "receiver_raw": receiver_raw,
-                    "subject": subject,
-                    "body_text": body_text,
-                    "attachments": attachments,
-                }
-
-                # Assign deterministic UUID point ID based on message_id
-                point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, message_id))
-
-                points.append(models.PointStruct(
-                    id=point_id,
-                    vector=vector,
-                    payload=payload,
-                ))
-
-                # Push in batches
-                if len(points) >= BATCH_SIZE:
-                    qdrant_client.upsert(
-                        collection_name=COLLECTION_NAME,
-                        points=points,
-                    )
-                    print(f" Upserted batch — {idx + 1}/{len(keys_to_process)} emails processed in current directory.")
-                    points = []
-
-            except Exception as e:
-                print(f"Error processing email key={key}: {e}")
-                has_error = True
-
-    # Push remaining points
-    if points:
-        qdrant_client.upsert(
+            for i, item in enumerate(batch)
+        ]
+        return executor.submit(
+            qdrant_client.upsert,
             collection_name=COLLECTION_NAME,
             points=points,
         )
 
+    with ThreadPoolExecutor(max_workers=1) as executor:
+        # Iterate and parse over all maildir directories found in MAILDIR_PATH
+        for root, dirs, files in os.walk(MAILDIR_PATH):
+            # A valid Maildir has 'cur', 'new', and 'tmp' subdirectories
+            if not all(subdir in dirs for subdir in ['cur', 'new', 'tmp']):
+                continue
+
+            print(f"Processing Maildir found at: {root}")
+            mbox = mailbox.Maildir(root)
+            total_emails_in_dir = len(mbox)
+
+            if mode == "incremental":
+                keys_to_process = get_recent_keys(mbox, INCREMENTAL_DAYS)
+                print(
+                    f"Found {total_emails_in_dir} emails total, "
+                    f"{len(keys_to_process)} modified in the last {INCREMENTAL_DAYS} days."
+                )
+            else:
+                keys_to_process = set(mbox.keys())
+                print(f"Found {total_emails_in_dir} emails — indexing all.")
+
+            for key in keys_to_process:
+                try:
+                    msg = mbox[key]
+
+                    # Parse headers
+                    subject = decode_mime_words(msg.get("Subject", "No Subject"))
+                    sender_raw = decode_mime_words(msg.get("From", "Unknown"))
+                    receiver_raw = decode_mime_words(msg.get("To", "Unknown"))
+                    sender = normalize_email_address(sender_raw)
+                    receiver = normalize_email_address(receiver_raw)
+                    message_id = msg.get("Message-ID", str(uuid.uuid4()))
+
+                    # Parse date
+                    date_str = msg.get("Date")
+                    dt_obj = None
+                    if date_str:
+                        try:
+                            dt_obj = parsedate_to_datetime(date_str)
+                        except Exception:
+                            pass
+                    if dt_obj is None:
+                        dt_obj = datetime.now()
+                    iso_date = dt_obj.isoformat()
+
+                    # Parse body and attachments
+                    body_text, attachments = parse_email_message(msg)
+
+                    attachments_str = ", ".join(attachments) if attachments else "None"
+                    vector_text = (
+                        f"Date: {iso_date}\n"
+                        f"From: {sender}\n"
+                        f"To: {receiver}\n"
+                        f"Subject: {subject}\n\n"
+                        f"{body_text}\n\n"
+                        f"Attachments: {attachments_str}"
+                    )
+
+                    pending_batch.append({
+                        "vector_text": vector_text,
+                        "point_id": str(uuid.uuid5(uuid.NAMESPACE_OID, message_id)),
+                        "payload": {
+                            "message_id": message_id,
+                            "date": iso_date,
+                            "sender": sender,
+                            "sender_raw": sender_raw,
+                            "receiver": receiver,
+                            "receiver_raw": receiver_raw,
+                            "subject": subject,
+                            "body_text": body_text,
+                            "attachments": attachments,
+                        },
+                    })
+                except Exception as e:
+                    print(f"Error processing email key={key}: {e}")
+                    has_error = True
+                else:
+                    # Batch-level work stays outside the per-email try: an embedding or
+                    # upsert failure is not a problem with this key, and catching it here
+                    # would leave the failed future/batch in place to be re-raised and
+                    # retried on every later email. Let it propagate and abort the run.
+                    if len(pending_batch) >= BATCH_SIZE:
+                        # Wait for the previous upsert to complete before submitting the next
+                        if pending_future is not None:
+                            pending_future.result()
+                        pending_future = _flush_batch(executor, pending_batch)
+                        total_processed += len(pending_batch)
+                        print(f" Embedded+upserted batch — {total_processed} emails total so far.")
+                        pending_batch = []
+
+        # Flush remaining emails
+        if pending_batch:
+            if pending_future is not None:
+                pending_future.result()
+            pending_future = _flush_batch(executor, pending_batch)
+            total_processed += len(pending_batch)
+            pending_batch = []
+
+        # Wait for the last upsert to finish before exiting the executor context
+        if pending_future is not None:
+            pending_future.result()
+
+    print(f" Total emails indexed: {total_processed}")
+
     # Record bootstrap completion so subsequent runs use incremental mode
     if mode == "full" and not has_error:
         mark_bootstrap_done(qdrant_client)