feat: init
This commit is contained in:
254
src/indexer.py
Normal file
254
src/indexer.py
Normal file
@@ -0,0 +1,254 @@
|
||||
"""
|
||||
Indexer script to parse emails from Maildir and push them to Qdrant.
|
||||
"""
|
||||
|
||||
import os
|
||||
import email
|
||||
import mailbox
|
||||
from datetime import datetime
|
||||
from email.utils import parsedate_to_datetime
|
||||
from email.header import decode_header
|
||||
from typing import List, Dict, Any, Tuple
|
||||
import uuid
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.http import models
|
||||
from sentence_transformers import SentenceTransformer
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# Load .env config
|
||||
load_dotenv()
|
||||
|
||||
# Configuration
|
||||
MAILDIR_PATH = os.environ.get("MAILDIR_PATH", "")
|
||||
QDRANT_URL = os.environ.get("QDRANT_URL", "")
|
||||
COLLECTION_NAME = os.environ.get("COLLECTION_NAME", "")
|
||||
|
||||
if not MAILDIR_PATH:
|
||||
raise ValueError("MAILDIR_PATH environment variable is required.")
|
||||
if not QDRANT_URL:
|
||||
raise ValueError("QDRANT_URL environment variable is required.")
|
||||
if not COLLECTION_NAME:
|
||||
raise ValueError("COLLECTION_NAME environment variable is required.")
|
||||
|
||||
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
|
||||
BATCH_SIZE = 50
|
||||
|
||||
def decode_mime_words(s: str) -> str:
|
||||
"""Decodes MIME encoded strings (e.g. subjects, filenames)."""
|
||||
if not s:
|
||||
return ""
|
||||
decoded_words = decode_header(s)
|
||||
result = []
|
||||
for word, encoding in decoded_words:
|
||||
if isinstance(word, bytes):
|
||||
try:
|
||||
result.append(word.decode(encoding or 'utf-8', errors='replace'))
|
||||
except LookupError:
|
||||
result.append(word.decode('utf-8', errors='replace'))
|
||||
else:
|
||||
result.append(word)
|
||||
return "".join(result)
|
||||
|
||||
def extract_text_from_html(html_content: str) -> str:
|
||||
"""Extracts plain text from HTML content."""
|
||||
try:
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
return soup.get_text(separator=" ", strip=True)
|
||||
except Exception:
|
||||
return html_content
|
||||
|
||||
def parse_email_message(msg: mailbox.Message) -> Tuple[str, List[str]]:
|
||||
"""Extracts plain text body and a list of attachment filenames."""
|
||||
body_parts = []
|
||||
attachments = []
|
||||
|
||||
for part in msg.walk():
|
||||
# Skip multiparts, we only care about leaf nodes
|
||||
if part.is_multipart():
|
||||
continue
|
||||
|
||||
content_type = part.get_content_type()
|
||||
content_disposition = str(part.get("Content-Disposition", ""))
|
||||
|
||||
# Check for attachments
|
||||
if "attachment" in content_disposition or part.get_filename():
|
||||
filename = part.get_filename()
|
||||
if filename:
|
||||
attachments.append(decode_mime_words(filename))
|
||||
continue
|
||||
|
||||
# Extract text body
|
||||
if content_type in ["text/plain", "text/html"]:
|
||||
try:
|
||||
payload = part.get_payload(decode=True)
|
||||
if payload:
|
||||
charset = part.get_content_charset('utf-8') or 'utf-8'
|
||||
if isinstance(payload, bytes):
|
||||
text = payload.decode(charset, errors='replace')
|
||||
else:
|
||||
text = str(payload)
|
||||
|
||||
if content_type == "text/html":
|
||||
text = extract_text_from_html(text)
|
||||
body_parts.append(text)
|
||||
except Exception as e:
|
||||
print(f"Error extracting payload: {e}")
|
||||
pass
|
||||
|
||||
return "\n".join(body_parts).strip(), attachments
|
||||
|
||||
def init_qdrant_collection(client: QdrantClient, model: SentenceTransformer):
|
||||
"""Ensures Qdrant collection exists and payload indexes are created."""
|
||||
vector_size = model.get_sentence_embedding_dimension()
|
||||
|
||||
# Check if collection exists
|
||||
collections = client.get_collections().collections
|
||||
if not any(c.name == COLLECTION_NAME for c in collections):
|
||||
print(f"Creating collection '{COLLECTION_NAME}' with vector size {vector_size}...")
|
||||
client.create_collection(
|
||||
collection_name=COLLECTION_NAME,
|
||||
vectors_config=models.VectorParams(size=vector_size, distance=models.Distance.COSINE),
|
||||
)
|
||||
else:
|
||||
print(f"Collection '{COLLECTION_NAME}' already exists.")
|
||||
|
||||
# Create payload indexes for filtering metadata deterministically
|
||||
print("Ensuring payload indexes exist...")
|
||||
|
||||
# Date index (DATETIME)
|
||||
client.create_payload_index(
|
||||
collection_name=COLLECTION_NAME,
|
||||
field_name="date",
|
||||
field_schema=models.PayloadSchemaType.DATETIME,
|
||||
)
|
||||
|
||||
# Sender index (KEYWORD)
|
||||
client.create_payload_index(
|
||||
collection_name=COLLECTION_NAME,
|
||||
field_name="sender",
|
||||
field_schema=models.PayloadSchemaType.KEYWORD,
|
||||
)
|
||||
|
||||
# Receiver index (KEYWORD)
|
||||
client.create_payload_index(
|
||||
collection_name=COLLECTION_NAME,
|
||||
field_name="receiver",
|
||||
field_schema=models.PayloadSchemaType.KEYWORD,
|
||||
)
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main ingestion function.
|
||||
Reads Maildir, extracts text, generates local embeddings, and pushes to Qdrant.
|
||||
"""
|
||||
print(f"Indexing emails from {MAILDIR_PATH} into {QDRANT_URL}...")
|
||||
|
||||
if not os.path.exists(MAILDIR_PATH):
|
||||
print(f"Error: Maildir path not found: {MAILDIR_PATH}")
|
||||
return
|
||||
|
||||
# Initialize model
|
||||
print(f"Loading embedding model: {EMBEDDING_MODEL_NAME}...")
|
||||
model = SentenceTransformer(EMBEDDING_MODEL_NAME)
|
||||
|
||||
# Initialize Qdrant
|
||||
print("Connecting to Qdrant...")
|
||||
qdrant_client = QdrantClient(url=QDRANT_URL)
|
||||
init_qdrant_collection(qdrant_client, model)
|
||||
|
||||
points = []
|
||||
|
||||
# Iterate and parse over all maildir directories found in MAILDIR_PATH
|
||||
for root, dirs, files in os.walk(MAILDIR_PATH):
|
||||
# A valid Maildir has 'cur', 'new', and 'tmp' subdirectories
|
||||
if all(subdir in dirs for subdir in ['cur', 'new', 'tmp']):
|
||||
print(f"Processing Maildir found at: {root}")
|
||||
mbox = mailbox.Maildir(root)
|
||||
total_emails_in_dir = len(mbox)
|
||||
print(f"Found {total_emails_in_dir} emails in this directory.")
|
||||
|
||||
for idx, (key, msg) in enumerate(mbox.items()):
|
||||
try:
|
||||
# Parse headers
|
||||
subject = decode_mime_words(msg.get("Subject", "No Subject"))
|
||||
sender = decode_mime_words(msg.get("From", "Unknown"))
|
||||
receiver = decode_mime_words(msg.get("To", "Unknown"))
|
||||
message_id = msg.get("Message-ID", str(uuid.uuid4()))
|
||||
|
||||
# Parse date
|
||||
date_str = msg.get("Date")
|
||||
dt_obj = None
|
||||
if date_str:
|
||||
try:
|
||||
dt_obj = parsedate_to_datetime(date_str)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if dt_obj is None:
|
||||
dt_obj = datetime.now()
|
||||
|
||||
# Format to ISO 8601 for Qdrant DATETIME index
|
||||
iso_date = dt_obj.isoformat()
|
||||
|
||||
# Parse Body and Attachments
|
||||
body_text, attachments = parse_email_message(msg)
|
||||
|
||||
# Prepare Vector text
|
||||
attachments_str = ", ".join(attachments) if attachments else "None"
|
||||
vector_text = (
|
||||
f"Date: {iso_date}\n"
|
||||
f"From: {sender}\n"
|
||||
f"To: {receiver}\n"
|
||||
f"Subject: {subject}\n\n"
|
||||
f"{body_text}\n\n"
|
||||
f"Attachments: {attachments_str}"
|
||||
)
|
||||
|
||||
# Embed the text
|
||||
vector = model.encode(vector_text).tolist()
|
||||
|
||||
# Prepare payload (metadata)
|
||||
payload = {
|
||||
"message_id": message_id,
|
||||
"date": iso_date,
|
||||
"sender": sender,
|
||||
"receiver": receiver,
|
||||
"subject": subject,
|
||||
"body_text": body_text,
|
||||
"attachments": attachments
|
||||
}
|
||||
|
||||
# Assign deterministic UUID point ID based on message_id
|
||||
point_id = str(uuid.uuid5(uuid.NAMESPACE_OID, message_id))
|
||||
|
||||
points.append(models.PointStruct(
|
||||
id=point_id,
|
||||
vector=vector,
|
||||
payload=payload
|
||||
))
|
||||
|
||||
# Push in batches
|
||||
if len(points) >= BATCH_SIZE:
|
||||
qdrant_client.upsert(
|
||||
collection_name=COLLECTION_NAME,
|
||||
points=points
|
||||
)
|
||||
print(f"Processed {idx + 1}/{total_emails_in_dir} emails in current directory...")
|
||||
points = []
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing email key={key}: {e}")
|
||||
|
||||
# Push remaining points
|
||||
if points:
|
||||
qdrant_client.upsert(
|
||||
collection_name=COLLECTION_NAME,
|
||||
points=points
|
||||
)
|
||||
|
||||
print("Indexing completed successfully!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
32
src/server.py
Normal file
32
src/server.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""
|
||||
MCP Server exposing search and read tools for the indexed emails.
|
||||
"""
|
||||
|
||||
import os
|
||||
from fastmcp import FastMCP
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Initialize FastMCP server
|
||||
mcp = FastMCP("mcp-maildir")
|
||||
|
||||
@mcp.tool()
|
||||
def search_emails(query: str, sender: str | None = None, start_date: str | None = None, end_date: str | None = None):
|
||||
"""
|
||||
Performs a hybrid search (Semantic + Exact filtering on metadata).
|
||||
"""
|
||||
# TODO: Implement Qdrant search
|
||||
return f"Searching for '{query}'..."
|
||||
|
||||
@mcp.tool()
|
||||
def read_email(message_id: str):
|
||||
"""
|
||||
Returns the full text content (cleaned of HTML) of a specific email.
|
||||
"""
|
||||
# TODO: Implement fetching email by message_id
|
||||
return f"Reading email {message_id}..."
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Start the MCP server using SSE (Server-Sent Events) over HTTP
|
||||
mcp.run(transport="sse", host="0.0.0.0", port=8000)
|
||||
Reference in New Issue
Block a user