From 52fcba17e5f1a9025e7b6023064f5a7248234fc8 Mon Sep 17 00:00:00 2001
From: Julien Cabillot
Date: Mon, 16 Mar 2026 16:34:55 -0400
Subject: [PATCH] feat: add incremental

---
 src/indexer.py | 283 ++++++++++++++++++++++++++++++++++---------------
 1 file changed, 196 insertions(+), 87 deletions(-)

diff --git a/src/indexer.py b/src/indexer.py
index c054381..32b69a1 100644
--- a/src/indexer.py
+++ b/src/indexer.py
@@ -34,6 +34,8 @@ if not COLLECTION_NAME:
 
 EMBEDDING_MODEL_NAME = os.environ.get("EMBEDDING_MODEL_NAME", "BAAI/bge-small-en-v1.5")
 BATCH_SIZE = 50
+METADATA_COLLECTION = "mcp_indexer_metadata"
+INCREMENTAL_DAYS = int(os.environ.get("INCREMENTAL_DAYS", "7"))
 
 def decode_mime_words(s: str) -> str:
     """Decodes MIME encoded strings (e.g. subjects, filenames)."""
@@ -145,13 +147,93 @@
         field_schema=models.PayloadSchemaType.KEYWORD,
     )
 
+def init_metadata_collection(client: QdrantClient):
+    """Ensures the indexer metadata collection exists in Qdrant."""
+    collections = client.get_collections().collections
+    if not any(c.name == METADATA_COLLECTION for c in collections):
+        print(f"Creating metadata collection '{METADATA_COLLECTION}'...")
+        client.create_collection(
+            collection_name=METADATA_COLLECTION,
+            # Minimal vector (size=1) — we only use this collection for payload storage
+            vectors_config=models.VectorParams(size=1, distance=models.Distance.COSINE),
+        )
+
+
+def is_bootstrap_done(client: QdrantClient) -> bool:
+    """Returns True if a successful full bootstrap has already been recorded."""
+    try:
+        results, _ = client.scroll(
+            collection_name=METADATA_COLLECTION,
+            scroll_filter=models.Filter(
+                must=[
+                    models.FieldCondition(
+                        key="event",
+                        match=models.MatchValue(value="bootstrap_complete"),
+                    )
+                ]
+            ),
+            limit=1,
+        )
+        return len(results) > 0
+    except Exception as e:
+        print(f"Warning: could not check bootstrap state: {e}")
+        return False
+
+
+def mark_bootstrap_done(client: QdrantClient):
+    """Records a bootstrap_complete event in the metadata collection."""
+    point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, "bootstrap_complete"))
+    client.upsert(
+        collection_name=METADATA_COLLECTION,
+        points=[
+            models.PointStruct(
+                id=point_id,
+                vector=[0.0],  # placeholder — collection is payload-only
+                payload={
+                    "event": "bootstrap_complete",
+                    "timestamp": datetime.now().isoformat(),
+                },
+            )
+        ],
+    )
+    print("Bootstrap state recorded in Qdrant metadata collection.")
+
+
+def get_recent_keys(mbox: mailbox.Maildir, days: int) -> set:
+    """
+    Returns the set of Maildir keys whose backing file has been modified
+    within the last `days` days (based on filesystem mtime).
+    """
+    from datetime import timezone, timedelta
+
+    cutoff = datetime.now(tz=timezone.utc) - timedelta(days=days)
+    recent = set()
+    maildir_root = mbox._path  # type: ignore[attr-defined]
+
+    for key in mbox.keys():
+        # Resolve the key to its actual backing file via Maildir._lookup():
+        # the returned path (relative to the Maildir root) includes the
+        # ":2,<flags>" info suffix that messages in cur/ carry — joining
+        # the bare key would silently miss every flagged message there.
+        try:
+            file_path = os.path.join(maildir_root, mbox._lookup(key))  # type: ignore[attr-defined]
+            mtime = os.path.getmtime(file_path)
+        except (KeyError, OSError):
+            continue  # message removed between keys() and stat — skip it
+        mtime_dt = datetime.fromtimestamp(mtime, tz=timezone.utc)
+        if mtime_dt >= cutoff:
+            recent.add(key)
+
+    return recent
+
+
 def main():
     """
     Main ingestion function.
     Reads Maildir, extracts text, generates local embeddings, and pushes to Qdrant.
     """
     print(f"Indexing emails from {MAILDIR_PATH} into {QDRANT_URL}...")
-    
+
     if not os.path.exists(MAILDIR_PATH):
         print(f"Error: Maildir path not found: {MAILDIR_PATH}")
         return
@@ -159,109 +241,136 @@ def main():
     # Initialize model
     print(f"Loading embedding model: {EMBEDDING_MODEL_NAME}...")
     model = TextEmbedding(model_name=EMBEDDING_MODEL_NAME)
-    vector_size = len(next(iter(model.embed(["dimension_probe"])) ))
-    
+    vector_size = len(next(iter(model.embed(["dimension_probe"]))))
+
     # Initialize Qdrant
     print("Connecting to Qdrant...")
     qdrant_client = QdrantClient(url=QDRANT_URL)
     init_qdrant_collection(qdrant_client, vector_size)
+    init_metadata_collection(qdrant_client)
+
+    # Determine indexing mode: full bootstrap or incremental update
+    if is_bootstrap_done(qdrant_client):
+        mode = "incremental"
+        print(f"[MODE] INCREMENTAL — scanning only files modified in the last {INCREMENTAL_DAYS} days.")
+    else:
+        mode = "full"
+        print("[MODE] BOOTSTRAP — full scan of all emails (first-time indexing).")
 
     points = []
+    has_error = False
 
     # Iterate and parse over all maildir directories found in MAILDIR_PATH
     for root, dirs, files in os.walk(MAILDIR_PATH):
         # A valid Maildir has 'cur', 'new', and 'tmp' subdirectories
-        if all(subdir in dirs for subdir in ['cur', 'new', 'tmp']):
-            print(f"Processing Maildir found at: {root}")
-            mbox = mailbox.Maildir(root)
-            total_emails_in_dir = len(mbox)
-            print(f"Found {total_emails_in_dir} emails in this directory.")
-
-            for idx, (key, msg) in enumerate(mbox.items()):
-                try:
-                    # Parse headers
-                    subject = decode_mime_words(msg.get("Subject", "No Subject"))
-                    sender_raw = decode_mime_words(msg.get("From", "Unknown"))
-                    receiver_raw = decode_mime_words(msg.get("To", "Unknown"))
-                    sender = normalize_email_address(sender_raw)
-                    receiver = normalize_email_address(receiver_raw)
-                    message_id = msg.get("Message-ID", str(uuid.uuid4()))
-
-                    # Parse date
-                    date_str = msg.get("Date")
-                    dt_obj = None
-                    if date_str:
-                        try:
-                            dt_obj = parsedate_to_datetime(date_str)
-                        except Exception:
-                            pass
-
-                    if dt_obj is None:
-                        dt_obj = datetime.now()
-
-                    # Format to ISO 8601 for Qdrant DATETIME index
-                    iso_date = dt_obj.isoformat()
-
-                    # Parse Body and Attachments
-                    body_text, attachments = parse_email_message(msg)
-
-                    # Prepare Vector text
-                    attachments_str = ", ".join(attachments) if attachments else "None"
-                    vector_text = (
-                        f"Date: {iso_date}\n"
-                        f"From: {sender}\n"
-                        f"To: {receiver}\n"
-                        f"Subject: {subject}\n\n"
-                        f"{body_text}\n\n"
-                        f"Attachments: {attachments_str}"
+        if not all(subdir in dirs for subdir in ['cur', 'new', 'tmp']):
+            continue
+
+        print(f"Processing Maildir found at: {root}")
+        mbox = mailbox.Maildir(root)
+        total_emails_in_dir = len(mbox)
+
+        if mode == "incremental":
+            keys_to_process = get_recent_keys(mbox, INCREMENTAL_DAYS)
+            print(
+                f"Found {total_emails_in_dir} emails total, "
+                f"{len(keys_to_process)} modified in the last {INCREMENTAL_DAYS} days."
+            )
+        else:
+            keys_to_process = set(mbox.keys())
+            print(f"Found {total_emails_in_dir} emails — indexing all.")
+
+        for idx, key in enumerate(keys_to_process):
+            try:
+                msg = mbox[key]
+
+                # Parse headers
+                subject = decode_mime_words(msg.get("Subject", "No Subject"))
+                sender_raw = decode_mime_words(msg.get("From", "Unknown"))
+                receiver_raw = decode_mime_words(msg.get("To", "Unknown"))
+                sender = normalize_email_address(sender_raw)
+                receiver = normalize_email_address(receiver_raw)
+                message_id = msg.get("Message-ID", str(uuid.uuid4()))
+
+                # Parse date
+                date_str = msg.get("Date")
+                dt_obj = None
+                if date_str:
+                    try:
+                        dt_obj = parsedate_to_datetime(date_str)
+                    except Exception:
+                        pass
+
+                if dt_obj is None:
+                    dt_obj = datetime.now()
+
+                # Format to ISO 8601 for Qdrant DATETIME index
+                iso_date = dt_obj.isoformat()
+
+                # Parse Body and Attachments
+                body_text, attachments = parse_email_message(msg)
+
+                # Prepare Vector text
+                attachments_str = ", ".join(attachments) if attachments else "None"
+                vector_text = (
+                    f"Date: {iso_date}\n"
+                    f"From: {sender}\n"
+                    f"To: {receiver}\n"
+                    f"Subject: {subject}\n\n"
+                    f"{body_text}\n\n"
+                    f"Attachments: {attachments_str}"
+                )
+
+                # Embed the text — fastembed returns an iterable of numpy arrays
+                embeddings = list(model.embed([vector_text]))
+                vector = embeddings[0].tolist()
+
+                # Prepare payload (metadata)
+                payload = {
+                    "message_id": message_id,
+                    "date": iso_date,
+                    "sender": sender,
+                    "sender_raw": sender_raw,
+                    "receiver": receiver,
+                    "receiver_raw": receiver_raw,
+                    "subject": subject,
+                    "body_text": body_text,
+                    "attachments": attachments,
+                }
+
+                # Assign deterministic UUID point ID based on message_id
+                point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, message_id))
+
+                points.append(models.PointStruct(
+                    id=point_id,
+                    vector=vector,
+                    payload=payload,
+                ))
+
+                # Push in batches
+                if len(points) >= BATCH_SIZE:
+                    qdrant_client.upsert(
+                        collection_name=COLLECTION_NAME,
+                        points=points,
                     )
-
-                    # Embed the text
-                    # Fastembed returns an iterable of numpy arrays
-                    embeddings = list(model.embed([vector_text]))
-                    vector = embeddings[0].tolist()
-
-                    # Prepare payload (metadata)
-                    payload = {
-                        "message_id": message_id,
-                        "date": iso_date,
-                        "sender": sender,
-                        "sender_raw": sender_raw,
-                        "receiver": receiver,
-                        "receiver_raw": receiver_raw,
-                        "subject": subject,
-                        "body_text": body_text,
-                        "attachments": attachments
-                    }
-
-                    # Assign deterministic UUID point ID based on message_id
-                    point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, message_id))
-
-                    points.append(models.PointStruct(
-                        id=point_id,
-                        vector=vector,
-                        payload=payload
-                    ))
-
-                    # Push in batches
-                    if len(points) >= BATCH_SIZE:
-                        qdrant_client.upsert(
-                            collection_name=COLLECTION_NAME,
-                            points=points
-                        )
-                        print(f"Processed {idx + 1}/{total_emails_in_dir} emails in current directory...")
-                        points = []
-
-                except Exception as e:
-                    print(f"Error processing email key={key}: {e}")
+                    print(f"  Upserted batch — {idx + 1}/{len(keys_to_process)} emails processed in current directory.")
+                    points = []
+
+            except Exception as e:
+                print(f"Error processing email key={key}: {e}")
+                has_error = True
 
     # Push remaining points
     if points:
         qdrant_client.upsert(
             collection_name=COLLECTION_NAME,
-            points=points
+            points=points,
         )
-    
+
+    # Record bootstrap completion so subsequent runs use incremental mode
+    if mode == "full" and not has_error:
+        mark_bootstrap_done(qdrant_client)
+
     print("Indexing completed successfully!")
 
 if __name__ == "__main__":