feat: add incremental
All checks were successful
perso/mcp-maildir/pipeline/head This commit looks good
All checks were successful
perso/mcp-maildir/pipeline/head This commit looks good
This commit is contained in:
283
src/indexer.py
283
src/indexer.py
@@ -34,6 +34,8 @@ if not COLLECTION_NAME:
|
||||
|
||||
EMBEDDING_MODEL_NAME = os.environ.get("EMBEDDING_MODEL_NAME", "BAAI/bge-small-en-v1.5")
|
||||
BATCH_SIZE = 50
|
||||
METADATA_COLLECTION = "mcp_indexer_metadata"
|
||||
INCREMENTAL_DAYS = int(os.environ.get("INCREMENTAL_DAYS", "7"))
|
||||
|
||||
def decode_mime_words(s: str) -> str:
|
||||
"""Decodes MIME encoded strings (e.g. subjects, filenames)."""
|
||||
@@ -145,13 +147,93 @@ def init_qdrant_collection(client: QdrantClient, vector_size: int):
|
||||
field_schema=models.PayloadSchemaType.KEYWORD,
|
||||
)
|
||||
|
||||
def init_metadata_collection(client: QdrantClient):
    """Ensures the indexer metadata collection exists in Qdrant."""
    existing = {c.name for c in client.get_collections().collections}
    if METADATA_COLLECTION in existing:
        return
    print(f"Creating metadata collection '{METADATA_COLLECTION}'...")
    client.create_collection(
        collection_name=METADATA_COLLECTION,
        # Minimal vector (size=1) — we only use this collection for payload storage
        vectors_config=models.VectorParams(size=1, distance=models.Distance.COSINE),
    )
|
||||
|
||||
def is_bootstrap_done(client: QdrantClient) -> bool:
    """Returns True if a successful full bootstrap has already been recorded."""
    try:
        hits, _ = client.scroll(
            collection_name=METADATA_COLLECTION,
            scroll_filter=models.Filter(
                must=[
                    models.FieldCondition(
                        key="event",
                        match=models.MatchValue(value="bootstrap_complete"),
                    )
                ]
            ),
            limit=1,
        )
        # Any hit at all means the bootstrap_complete event was recorded.
        return bool(hits)
    except Exception as e:
        print(f"Warning: could not check bootstrap state: {e}")
        return False
|
||||
|
||||
def mark_bootstrap_done(client: QdrantClient):
    """Records a bootstrap_complete event in the metadata collection.

    The point ID is a deterministic UUIDv5 of the event name, so repeated
    bootstraps overwrite the same record instead of accumulating duplicates.
    """
    from datetime import timezone  # local import, mirrors get_recent_keys

    point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, "bootstrap_complete"))
    client.upsert(
        collection_name=METADATA_COLLECTION,
        points=[
            models.PointStruct(
                id=point_id,
                vector=[0.0],  # placeholder — collection is payload-only
                payload={
                    "event": "bootstrap_complete",
                    # Timezone-aware UTC timestamp: the naive local-time value
                    # recorded previously is ambiguous across hosts and DST,
                    # and the incremental cutoff logic already works in UTC.
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                },
            )
        ],
    )
    print("Bootstrap state recorded in Qdrant metadata collection.")
|
||||
|
||||
def get_recent_keys(mbox: mailbox.Maildir, days: int) -> set:
|
||||
"""
|
||||
Returns the set of Maildir keys whose backing file has been modified
|
||||
within the last `days` days (based on filesystem mtime).
|
||||
"""
|
||||
from datetime import timezone, timedelta
|
||||
|
||||
cutoff = datetime.now(tz=timezone.utc) - timedelta(days=days)
|
||||
recent = set()
|
||||
maildir_root = mbox._path # type: ignore[attr-defined]
|
||||
|
||||
for key in mbox.keys():
|
||||
# Maildir keys are stored under cur/ or new/ — try both
|
||||
for subdir in ("cur", "new"):
|
||||
file_path = os.path.join(maildir_root, subdir, key)
|
||||
if os.path.isfile(file_path):
|
||||
try:
|
||||
mtime = os.path.getmtime(file_path)
|
||||
mtime_dt = datetime.fromtimestamp(mtime, tz=timezone.utc)
|
||||
if mtime_dt >= cutoff:
|
||||
recent.add(key)
|
||||
except OSError:
|
||||
pass
|
||||
break # found the file, no need to check the other subdir
|
||||
|
||||
return recent
|
||||
|
||||
|
||||
def main():
    """
    Main ingestion function.

    Reads Maildir, extracts text, generates local embeddings, and pushes to
    Qdrant. The first successful run performs a full bootstrap over every
    email; subsequent runs only re-index files modified within the last
    INCREMENTAL_DAYS days.
    """
    print(f"Indexing emails from {MAILDIR_PATH} into {QDRANT_URL}...")

    if not os.path.exists(MAILDIR_PATH):
        print(f"Error: Maildir path not found: {MAILDIR_PATH}")
        return

    # Initialize model
    print(f"Loading embedding model: {EMBEDDING_MODEL_NAME}...")
    model = TextEmbedding(model_name=EMBEDDING_MODEL_NAME)
    # Embed a throwaway string once to discover the vector dimensionality.
    vector_size = len(next(iter(model.embed(["dimension_probe"]))))

    # Initialize Qdrant
    print("Connecting to Qdrant...")
    qdrant_client = QdrantClient(url=QDRANT_URL)
    init_qdrant_collection(qdrant_client, vector_size)
    init_metadata_collection(qdrant_client)

    # Determine indexing mode: full bootstrap or incremental update
    if is_bootstrap_done(qdrant_client):
        mode = "incremental"
        print(f"[MODE] INCREMENTAL — scanning only files modified in the last {INCREMENTAL_DAYS} days.")
    else:
        mode = "full"
        print("[MODE] BOOTSTRAP — full scan of all emails (first-time indexing).")

    points = []
    # Tracks per-email failures so an incomplete bootstrap is never recorded
    # as complete (which would permanently skip the failed emails).
    has_error = False

    # Iterate and parse over all maildir directories found in MAILDIR_PATH
    for root, dirs, files in os.walk(MAILDIR_PATH):
        # A valid Maildir has 'cur', 'new', and 'tmp' subdirectories
        if not all(subdir in dirs for subdir in ['cur', 'new', 'tmp']):
            continue

        print(f"Processing Maildir found at: {root}")
        mbox = mailbox.Maildir(root)
        total_emails_in_dir = len(mbox)

        if mode == "incremental":
            keys_to_process = get_recent_keys(mbox, INCREMENTAL_DAYS)
            print(
                f"Found {total_emails_in_dir} emails total, "
                f"{len(keys_to_process)} modified in the last {INCREMENTAL_DAYS} days."
            )
        else:
            keys_to_process = set(mbox.keys())
            print(f"Found {total_emails_in_dir} emails — indexing all.")

        for idx, key in enumerate(keys_to_process):
            try:
                msg = mbox[key]

                # Parse headers
                subject = decode_mime_words(msg.get("Subject", "No Subject"))
                sender_raw = decode_mime_words(msg.get("From", "Unknown"))
                receiver_raw = decode_mime_words(msg.get("To", "Unknown"))
                sender = normalize_email_address(sender_raw)
                receiver = normalize_email_address(receiver_raw)
                message_id = msg.get("Message-ID", str(uuid.uuid4()))

                # Parse date; fall back to "now" when absent or unparsable.
                date_str = msg.get("Date")
                dt_obj = None
                if date_str:
                    try:
                        dt_obj = parsedate_to_datetime(date_str)
                    except Exception:
                        pass
                if dt_obj is None:
                    dt_obj = datetime.now()

                # Format to ISO 8601 for Qdrant DATETIME index
                iso_date = dt_obj.isoformat()

                # Parse Body and Attachments
                body_text, attachments = parse_email_message(msg)

                # Prepare Vector text
                attachments_str = ", ".join(attachments) if attachments else "None"
                vector_text = (
                    f"Date: {iso_date}\n"
                    f"From: {sender}\n"
                    f"To: {receiver}\n"
                    f"Subject: {subject}\n\n"
                    f"{body_text}\n\n"
                    f"Attachments: {attachments_str}"
                )

                # Embed the text — fastembed returns an iterable of numpy arrays
                embeddings = list(model.embed([vector_text]))
                vector = embeddings[0].tolist()

                # Prepare payload (metadata)
                payload = {
                    "message_id": message_id,
                    "date": iso_date,
                    "sender": sender,
                    "sender_raw": sender_raw,
                    "receiver": receiver,
                    "receiver_raw": receiver_raw,
                    "subject": subject,
                    "body_text": body_text,
                    "attachments": attachments,
                }

                # Assign deterministic UUID point ID based on message_id, so
                # re-indexing the same email overwrites instead of duplicating.
                point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, message_id))

                points.append(models.PointStruct(
                    id=point_id,
                    vector=vector,
                    payload=payload,
                ))

                # Push in batches
                if len(points) >= BATCH_SIZE:
                    qdrant_client.upsert(
                        collection_name=COLLECTION_NAME,
                        points=points,
                    )
                    print(f"  Upserted batch — {idx + 1}/{len(keys_to_process)} emails processed in current directory.")
                    points = []

            except Exception as e:
                print(f"Error processing email key={key}: {e}")
                has_error = True

    # Push remaining points
    if points:
        qdrant_client.upsert(
            collection_name=COLLECTION_NAME,
            points=points,
        )

    # Record bootstrap completion so subsequent runs use incremental mode
    if mode == "full" and not has_error:
        mark_bootstrap_done(qdrant_client)

    print("Indexing completed successfully!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user