This commit is contained in:
38
Jenkinsfile
vendored
Normal file
38
Jenkinsfile
vendored
Normal file
@@ -0,0 +1,38 @@
|
||||
pipeline {
|
||||
environment {
|
||||
registry = 'https://registry.hub.docker.com'
|
||||
registryCredential = 'dockerhub_jcabillot'
|
||||
dockerImage = 'jcabillot/mcp-maildir'
|
||||
}
|
||||
|
||||
agent any
|
||||
|
||||
triggers {
|
||||
cron('@midnight')
|
||||
}
|
||||
|
||||
stages {
|
||||
stage('Clone repository') {
|
||||
steps{
|
||||
checkout scm
|
||||
}
|
||||
}
|
||||
|
||||
stage('Build image') {
|
||||
steps{
|
||||
sh 'docker build --force-rm=true --no-cache=true --pull -t ${dockerImage} -f pkg/Dockerfile .'
|
||||
}
|
||||
}
|
||||
|
||||
stage('Deploy Image') {
|
||||
steps{
|
||||
script {
|
||||
withCredentials([usernamePassword(credentialsId: 'dockerhub_jcabillot', usernameVariable: 'DOCKER_USER', passwordVariable: 'DOCKER_PASS')]) {
|
||||
sh 'docker login --username ${DOCKER_USER} --password ${DOCKER_PASS}'
|
||||
sh 'docker push ${dockerImage}'
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,8 +3,8 @@ services:
|
||||
image: docker.io/qdrant/qdrant:latest
|
||||
container_name: mcp_maildir_qdrant
|
||||
ports:
|
||||
- "6333:6333" # REST API
|
||||
- "6334:6334" # gRPC API
|
||||
- "127.0.0.1:6333:6333" # REST API
|
||||
- "127.0.0.1:6334:6334" # gRPC API
|
||||
volumes:
|
||||
- ./qdrant_storage:/qdrant/storage:z
|
||||
restart: unless-stopped
|
||||
@@ -15,7 +15,7 @@ services:
|
||||
dockerfile: pkg/Dockerfile
|
||||
container_name: mcp_maildir_server
|
||||
ports:
|
||||
- "8000:8000" # Expose the MCP HTTP (SSE) server
|
||||
- "127.0.0.1:8000:8000" # Expose the MCP HTTP (SSE) server
|
||||
env_file:
|
||||
- .env
|
||||
environment:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Use Python 3.14 as requested
|
||||
FROM docker.io/library/python:3.14-slim
|
||||
# 3.13, because with 3.14 it requires gcc (wheels not available)
|
||||
FROM docker.io/library/python:3.13-slim
|
||||
|
||||
# Set environment variables
|
||||
ENV PYTHONDONTWRITEBYTECODE=1
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
mcp
|
||||
fastmcp
|
||||
qdrant-client
|
||||
sentence-transformers
|
||||
fastembed
|
||||
python-dotenv
|
||||
uvicorn
|
||||
starlette
|
||||
|
||||
@@ -6,7 +6,7 @@ import os
|
||||
import email
|
||||
import mailbox
|
||||
from datetime import datetime
|
||||
from email.utils import parsedate_to_datetime
|
||||
from email.utils import parsedate_to_datetime, parseaddr
|
||||
from email.header import decode_header
|
||||
from typing import List, Dict, Any, Tuple
|
||||
import uuid
|
||||
@@ -14,7 +14,7 @@ import uuid
|
||||
from dotenv import load_dotenv
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.http import models
|
||||
from sentence_transformers import SentenceTransformer
|
||||
from fastembed import TextEmbedding
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# Load .env config
|
||||
@@ -32,7 +32,7 @@ if not QDRANT_URL:
|
||||
if not COLLECTION_NAME:
|
||||
raise ValueError("COLLECTION_NAME environment variable is required.")
|
||||
|
||||
EMBEDDING_MODEL_NAME = "all-MiniLM-L6-v2"
|
||||
EMBEDDING_MODEL_NAME = os.environ.get("EMBEDDING_MODEL_NAME", "BAAI/bge-small-en-v1.5")
|
||||
BATCH_SIZE = 50
|
||||
|
||||
def decode_mime_words(s: str) -> str:
|
||||
@@ -59,6 +59,14 @@ def extract_text_from_html(html_content: str) -> str:
|
||||
except Exception:
|
||||
return html_content
|
||||
|
||||
def normalize_email_address(value: str) -> str:
|
||||
"""Extracts and normalizes the bare email address from a header value."""
|
||||
if not value:
|
||||
return ""
|
||||
_, addr = parseaddr(value)
|
||||
return (addr or value).strip().lower()
|
||||
|
||||
|
||||
def parse_email_message(msg: mailbox.Message) -> Tuple[str, List[str]]:
|
||||
"""Extracts plain text body and a list of attachment filenames."""
|
||||
body_parts = []
|
||||
@@ -99,9 +107,8 @@ def parse_email_message(msg: mailbox.Message) -> Tuple[str, List[str]]:
|
||||
|
||||
return "\n".join(body_parts).strip(), attachments
|
||||
|
||||
def init_qdrant_collection(client: QdrantClient, model: SentenceTransformer):
|
||||
def init_qdrant_collection(client: QdrantClient, vector_size: int):
|
||||
"""Ensures Qdrant collection exists and payload indexes are created."""
|
||||
vector_size = model.get_sentence_embedding_dimension()
|
||||
|
||||
# Check if collection exists
|
||||
collections = client.get_collections().collections
|
||||
@@ -151,12 +158,13 @@ def main():
|
||||
|
||||
# Initialize model
|
||||
print(f"Loading embedding model: {EMBEDDING_MODEL_NAME}...")
|
||||
model = SentenceTransformer(EMBEDDING_MODEL_NAME)
|
||||
model = TextEmbedding(model_name=EMBEDDING_MODEL_NAME)
|
||||
vector_size = len(next(iter(model.embed(["dimension_probe"])) ))
|
||||
|
||||
# Initialize Qdrant
|
||||
print("Connecting to Qdrant...")
|
||||
qdrant_client = QdrantClient(url=QDRANT_URL)
|
||||
init_qdrant_collection(qdrant_client, model)
|
||||
init_qdrant_collection(qdrant_client, vector_size)
|
||||
|
||||
points = []
|
||||
|
||||
@@ -173,8 +181,10 @@ def main():
|
||||
try:
|
||||
# Parse headers
|
||||
subject = decode_mime_words(msg.get("Subject", "No Subject"))
|
||||
sender = decode_mime_words(msg.get("From", "Unknown"))
|
||||
receiver = decode_mime_words(msg.get("To", "Unknown"))
|
||||
sender_raw = decode_mime_words(msg.get("From", "Unknown"))
|
||||
receiver_raw = decode_mime_words(msg.get("To", "Unknown"))
|
||||
sender = normalize_email_address(sender_raw)
|
||||
receiver = normalize_email_address(receiver_raw)
|
||||
message_id = msg.get("Message-ID", str(uuid.uuid4()))
|
||||
|
||||
# Parse date
|
||||
@@ -207,14 +217,18 @@ def main():
|
||||
)
|
||||
|
||||
# Embed the text
|
||||
vector = model.encode(vector_text).tolist()
|
||||
# Fastembed returns an iterable of numpy arrays
|
||||
embeddings = list(model.embed([vector_text]))
|
||||
vector = embeddings[0].tolist()
|
||||
|
||||
# Prepare payload (metadata)
|
||||
payload = {
|
||||
"message_id": message_id,
|
||||
"date": iso_date,
|
||||
"sender": sender,
|
||||
"sender_raw": sender_raw,
|
||||
"receiver": receiver,
|
||||
"receiver_raw": receiver_raw,
|
||||
"subject": subject,
|
||||
"body_text": body_text,
|
||||
"attachments": attachments
|
||||
|
||||
326
src/server.py
326
src/server.py
@@ -2,31 +2,345 @@
|
||||
MCP Server exposing search and read tools for the indexed emails.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from fastmcp import FastMCP
|
||||
from typing import Any, Optional
|
||||
|
||||
import qdrant_client as qdrant_pkg
|
||||
from dateutil import parser as date_parser
|
||||
from email.utils import parseaddr
|
||||
from dotenv import load_dotenv
|
||||
from fastmcp import FastMCP
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.http import models
|
||||
from fastembed import TextEmbedding
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Verbose MCP diagnostics (useful for -32602 / validation flow)
|
||||
logging.getLogger("mcp").setLevel(logging.DEBUG)
|
||||
logging.getLogger("mcp.server").setLevel(logging.DEBUG)
|
||||
logging.getLogger("mcp.server.lowlevel.server").setLevel(logging.DEBUG)
|
||||
|
||||
QDRANT_URL = os.environ.get("QDRANT_URL", "")
|
||||
COLLECTION_NAME = os.environ.get("COLLECTION_NAME", "")
|
||||
EMBEDDING_MODEL_NAME = os.environ.get("EMBEDDING_MODEL_NAME", "BAAI/bge-small-en-v1.5")
|
||||
DEFAULT_LIMIT = int(os.environ.get("SEARCH_LIMIT", "50"))
|
||||
|
||||
if not QDRANT_URL:
|
||||
raise ValueError("QDRANT_URL environment variable is required.")
|
||||
if not COLLECTION_NAME:
|
||||
raise ValueError("COLLECTION_NAME environment variable is required.")
|
||||
|
||||
logger.info(f"Starting MCP server with collection: {COLLECTION_NAME}")
|
||||
|
||||
# Initialize FastMCP server
|
||||
mcp = FastMCP("mcp-maildir")
|
||||
|
||||
# Lazy singletons
|
||||
_qdrant_client: Optional[QdrantClient] = None
|
||||
_embedding_model: Optional[TextEmbedding] = None
|
||||
|
||||
|
||||
def get_qdrant_client() -> QdrantClient:
|
||||
"""Returns a singleton Qdrant client."""
|
||||
global _qdrant_client
|
||||
if _qdrant_client is None:
|
||||
logger.info(f"Connecting to Qdrant at {QDRANT_URL}")
|
||||
_qdrant_client = QdrantClient(url=QDRANT_URL)
|
||||
return _qdrant_client
|
||||
|
||||
|
||||
def get_embedding_model() -> TextEmbedding:
|
||||
"""Returns a singleton sentence-transformer model."""
|
||||
global _embedding_model
|
||||
if _embedding_model is None:
|
||||
logger.info(f"Loading embedding model: {EMBEDDING_MODEL_NAME}")
|
||||
_embedding_model = TextEmbedding(model_name=EMBEDDING_MODEL_NAME)
|
||||
return _embedding_model
|
||||
|
||||
|
||||
def log_qdrant_diagnostics(client: QdrantClient) -> None:
|
||||
"""Logs Qdrant client capabilities to diagnose API/version mismatch issues."""
|
||||
version = getattr(qdrant_pkg, "__version__", "unknown")
|
||||
logger.info(
|
||||
"Qdrant diagnostics: client_type=%s, qdrant_client_version=%s, has_search=%s, has_query_points=%s, has_scroll=%s",
|
||||
type(client).__name__,
|
||||
version,
|
||||
hasattr(client, "search"),
|
||||
hasattr(client, "query_points"),
|
||||
hasattr(client, "scroll"),
|
||||
)
|
||||
|
||||
|
||||
def normalize_email_address(value: Optional[str]) -> str:
|
||||
"""Extracts and normalizes the bare email address from a value."""
|
||||
if not value:
|
||||
return ""
|
||||
_, addr = parseaddr(value)
|
||||
return (addr or value).strip().lower()
|
||||
|
||||
|
||||
def payload_matches_participant(payload: dict[str, Any], participant: str) -> bool:
|
||||
"""Checks if participant matches either sender or receiver on normalized and raw payload values."""
|
||||
participant_norm = normalize_email_address(participant)
|
||||
|
||||
# Sender checks
|
||||
payload_sender = normalize_email_address(payload.get("sender"))
|
||||
payload_sender_raw = normalize_email_address(payload.get("sender_raw"))
|
||||
payload_sender_text = str(payload.get("sender", "")).strip().lower()
|
||||
payload_sender_raw_text = str(payload.get("sender_raw", "")).strip().lower()
|
||||
|
||||
# Receiver checks
|
||||
payload_receiver = normalize_email_address(payload.get("receiver"))
|
||||
payload_receiver_raw = normalize_email_address(payload.get("receiver_raw"))
|
||||
payload_receiver_text = str(payload.get("receiver", "")).strip().lower()
|
||||
payload_receiver_raw_text = str(payload.get("receiver_raw", "")).strip().lower()
|
||||
|
||||
return participant_norm in {
|
||||
payload_sender,
|
||||
payload_sender_raw,
|
||||
payload_receiver,
|
||||
payload_receiver_raw,
|
||||
} or participant_norm in payload_sender_text \
|
||||
or participant_norm in payload_sender_raw_text \
|
||||
or participant_norm in payload_receiver_text \
|
||||
or participant_norm in payload_receiver_raw_text
|
||||
|
||||
|
||||
def build_filter(participant: Optional[str], start_date: Optional[str], end_date: Optional[str]) -> Optional[models.Filter]:
|
||||
"""Builds Qdrant payload filters from optional parameters."""
|
||||
conditions: list[models.FieldCondition] = []
|
||||
|
||||
if participant:
|
||||
# Filter for either sender or receiver matching the participant email
|
||||
normalized_participant = normalize_email_address(participant)
|
||||
|
||||
# In Qdrant, to do an OR condition, we use a Should clause within a Filter
|
||||
participant_filter = models.Filter(
|
||||
should=[
|
||||
models.FieldCondition(
|
||||
key="sender",
|
||||
match=models.MatchValue(value=normalized_participant),
|
||||
),
|
||||
models.FieldCondition(
|
||||
key="receiver",
|
||||
match=models.MatchValue(value=normalized_participant),
|
||||
)
|
||||
]
|
||||
)
|
||||
# We append this compound filter as a requirement
|
||||
conditions.append(participant_filter)
|
||||
|
||||
if start_date or end_date:
|
||||
gte = date_parser.parse(start_date).isoformat() if start_date else None
|
||||
lte = date_parser.parse(end_date).isoformat() if end_date else None
|
||||
conditions.append(
|
||||
models.FieldCondition(
|
||||
key="date",
|
||||
range=models.DatetimeRange(gte=gte, lte=lte),
|
||||
)
|
||||
)
|
||||
|
||||
return models.Filter(must=conditions) if conditions else None
|
||||
|
||||
|
||||
def format_search_result(point: Any) -> dict[str, Any]:
|
||||
"""Formats a Qdrant point into a compact response."""
|
||||
payload = point.payload or {}
|
||||
return {
|
||||
"message_id": payload.get("message_id"),
|
||||
"date": payload.get("date"),
|
||||
"sender": payload.get("sender"),
|
||||
"receiver": payload.get("receiver"),
|
||||
"subject": payload.get("subject"),
|
||||
"attachments": payload.get("attachments", []),
|
||||
"score": getattr(point, "score", None),
|
||||
}
|
||||
|
||||
|
||||
def vector_search_points(
|
||||
client: QdrantClient,
|
||||
*,
|
||||
query_vector: list[float],
|
||||
query_filter: Optional[models.Filter],
|
||||
limit: int,
|
||||
) -> list[Any]:
|
||||
"""Executes vector search across Qdrant client API variants."""
|
||||
if hasattr(client, "query_points"):
|
||||
response = client.query_points(
|
||||
collection_name=COLLECTION_NAME,
|
||||
query=query_vector,
|
||||
query_filter=query_filter,
|
||||
limit=limit,
|
||||
with_payload=True,
|
||||
with_vectors=False,
|
||||
)
|
||||
points = getattr(response, "points", None)
|
||||
if points is None:
|
||||
if isinstance(response, list):
|
||||
return response
|
||||
return []
|
||||
return points
|
||||
|
||||
if hasattr(client, "search"):
|
||||
return client.search(
|
||||
collection_name=COLLECTION_NAME,
|
||||
query_vector=query_vector,
|
||||
query_filter=query_filter,
|
||||
limit=limit,
|
||||
with_payload=True,
|
||||
with_vectors=False,
|
||||
)
|
||||
|
||||
raise AttributeError("Qdrant client exposes neither 'query_points' nor 'search'.")
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def search_emails(query: str, sender: str | None = None, start_date: str | None = None, end_date: str | None = None):
|
||||
def search_emails(
|
||||
query: str,
|
||||
participant: str = "",
|
||||
start_date: str = "",
|
||||
end_date: str = "",
|
||||
):
|
||||
"""
|
||||
Performs a hybrid search (Semantic + Exact filtering on metadata).
|
||||
"""
|
||||
# TODO: Implement Qdrant search
|
||||
return f"Searching for '{query}'..."
|
||||
logger.info(
|
||||
"Tool search_emails input diagnostics: query=%r(type=%s,len=%d), participant=%r(type=%s), start_date=%r(type=%s), end_date=%r(type=%s)",
|
||||
query,
|
||||
type(query).__name__,
|
||||
len(query) if isinstance(query, str) else -1,
|
||||
participant,
|
||||
type(participant).__name__,
|
||||
start_date,
|
||||
type(start_date).__name__,
|
||||
end_date,
|
||||
type(end_date).__name__,
|
||||
)
|
||||
|
||||
# Convert empty strings to None for internal logic
|
||||
p_val = normalize_email_address(participant) if participant else None
|
||||
sd_val = start_date if start_date else None
|
||||
ed_val = end_date if end_date else None
|
||||
|
||||
logger.info(
|
||||
"Tool search_emails normalized filters: participant=%r, start_date=%r, end_date=%r",
|
||||
p_val,
|
||||
sd_val,
|
||||
ed_val,
|
||||
)
|
||||
try:
|
||||
model = get_embedding_model()
|
||||
qdrant = get_qdrant_client()
|
||||
log_qdrant_diagnostics(qdrant)
|
||||
|
||||
query_vector = list(model.embed([query]))[0].tolist()
|
||||
query_filter = build_filter(participant=p_val, start_date=sd_val, end_date=ed_val)
|
||||
logger.info("Tool search_emails built filter: %s", query_filter)
|
||||
|
||||
points = vector_search_points(
|
||||
qdrant,
|
||||
query_vector=query_vector,
|
||||
query_filter=query_filter,
|
||||
limit=DEFAULT_LIMIT,
|
||||
)
|
||||
|
||||
# Backward-compatible fallback for legacy indexed payloads where participant may
|
||||
# still be stored as full "Display Name <email@domain>".
|
||||
if p_val and not points:
|
||||
logger.info("No result with exact participant filter, trying fallback matching...")
|
||||
fallback_filter = build_filter(participant=None, start_date=sd_val, end_date=ed_val)
|
||||
fallback_points = vector_search_points(
|
||||
qdrant,
|
||||
query_vector=query_vector,
|
||||
query_filter=fallback_filter,
|
||||
limit=max(DEFAULT_LIMIT * 5, 50),
|
||||
)
|
||||
points = [
|
||||
p for p in fallback_points
|
||||
if payload_matches_participant(p.payload or {}, p_val)
|
||||
][:DEFAULT_LIMIT]
|
||||
|
||||
logger.info(f"Found {len(points)} results")
|
||||
return {
|
||||
"query": query,
|
||||
"filters": {
|
||||
"participant": participant,
|
||||
"start_date": start_date,
|
||||
"end_date": end_date,
|
||||
},
|
||||
"count": len(points),
|
||||
"results": [format_search_result(point) for point in points],
|
||||
}
|
||||
except Exception as exc:
|
||||
logger.error(f"Error in search_emails: {exc}", exc_info=True)
|
||||
return {"error": f"search_emails failed: {exc}"}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def read_email(message_id: str):
|
||||
"""
|
||||
Returns the full text content (cleaned of HTML) of a specific email.
|
||||
"""
|
||||
# TODO: Implement fetching email by message_id
|
||||
return f"Reading email {message_id}..."
|
||||
logger.info(
|
||||
"Tool read_email input diagnostics: message_id=%r(type=%s,len=%d)",
|
||||
message_id,
|
||||
type(message_id).__name__,
|
||||
len(message_id) if isinstance(message_id, str) else -1,
|
||||
)
|
||||
try:
|
||||
qdrant = get_qdrant_client()
|
||||
|
||||
points, _ = qdrant.scroll(
|
||||
collection_name=COLLECTION_NAME,
|
||||
scroll_filter=models.Filter(
|
||||
must=[
|
||||
models.FieldCondition(
|
||||
key="message_id",
|
||||
match=models.MatchValue(value=message_id),
|
||||
)
|
||||
]
|
||||
),
|
||||
limit=1,
|
||||
with_payload=True,
|
||||
with_vectors=False,
|
||||
)
|
||||
|
||||
if not points:
|
||||
logger.warning(f"No email found for message_id={message_id}")
|
||||
return {"error": f"No email found for message_id={message_id}"}
|
||||
|
||||
payload = points[0].payload or {}
|
||||
return {
|
||||
"message_id": payload.get("message_id"),
|
||||
"date": payload.get("date"),
|
||||
"sender": payload.get("sender"),
|
||||
"receiver": payload.get("receiver"),
|
||||
"subject": payload.get("subject"),
|
||||
"attachments": payload.get("attachments", []),
|
||||
"body_text": payload.get("body_text", ""),
|
||||
}
|
||||
except Exception as exc:
|
||||
logger.error(f"Error in read_email: {exc}", exc_info=True)
|
||||
return {"error": f"read_email failed: {exc}"}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.info("Initializing models before starting server...")
|
||||
try:
|
||||
get_embedding_model()
|
||||
logger.info("Models loaded successfully.")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load models: {e}")
|
||||
|
||||
logger.info("Starting SSE server on 0.0.0.0:8000...")
|
||||
# Start the MCP server using SSE (Server-Sent Events) over HTTP
|
||||
mcp.run(transport="sse", host="0.0.0.0", port=8000)
|
||||
|
||||
Reference in New Issue
Block a user