#!/usr/bin/env python3 import asyncio import logging import os import queue import threading import requests from dotenv import load_dotenv from telegram import Update from telegram.ext import ( Application, CommandHandler, ContextTypes, MessageHandler, filters, ) load_dotenv() logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) logger = logging.getLogger(__name__) OPENCODE_API_URL = os.getenv("OPENCODE_API_URL", "http://127.0.0.1:4096") OPENCODE_SERVER_USERNAME = os.getenv("OPENCODE_SERVER_USERNAME", "opencode") OPENCODE_SERVER_PASSWORD = os.getenv("OPENCODE_SERVER_PASSWORD") BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") ALLOWED_CHAT_ID = os.getenv("TELEGRAM_ALLOWED_CHAT_ID") SESSION_ID = None message_queue = queue.Queue() is_processing = False current_task = "Idle" processing_lock = threading.Lock() def is_authorized(update: Update) -> bool: """Check if the update comes from the allowed chat.""" chat_id = str(update.message.chat.id) return chat_id == ALLOWED_CHAT_ID def get_session(): """Get or create a session.""" global SESSION_ID if SESSION_ID: return SESSION_ID try: r = requests.get( f"{OPENCODE_API_URL}/session", timeout=10, auth=(OPENCODE_SERVER_USERNAME, OPENCODE_SERVER_PASSWORD) if OPENCODE_SERVER_PASSWORD else None, ) if r.ok: sessions = r.json() if sessions: SESSION_ID = sessions[0]["id"] return SESSION_ID except Exception: logger.exception("Failed to fetch existing sessions") try: r = requests.post( f"{OPENCODE_API_URL}/session", json={}, timeout=10, auth=(OPENCODE_SERVER_USERNAME, OPENCODE_SERVER_PASSWORD) if OPENCODE_SERVER_PASSWORD else None, ) if r.ok: SESSION_ID = r.json()["id"] return SESSION_ID except Exception: logger.exception("Failed to create new session") return None def send_to_opencode(message): """Send message to opencode and return response.""" session_id = get_session() if not session_id: return "Could not connect to opencode session." try: r = requests.post( f"{OPENCODE_API_URL}/session/{session_id}/message", json={"parts": [{"type": "text", "text": f"[Telegram] {message}"}]}, timeout=1200, auth=(OPENCODE_SERVER_USERNAME, OPENCODE_SERVER_PASSWORD) if OPENCODE_SERVER_PASSWORD else None, ) if r.ok: data = r.json() parts = data.get("parts", []) text_parts = [ p["text"] for p in parts if p.get("type") == "text" and p.get("text") ] return ( "\n".join(text_parts) if text_parts else "Message sent, no text response." ) else: logger.error("opencode returned %d: %s", r.status_code, r.text[:500]) return "opencode returned an error. Check server logs." except requests.exceptions.ConnectionError: logger.error("Connection error to opencode at %s", OPENCODE_API_URL) return "Can't connect to opencode. Is it running?" except requests.exceptions.Timeout: logger.warning("opencode request timed out") return "opencode took too long to respond. Please try again." except Exception: logger.exception("Unexpected error sending message to opencode") return "An unexpected error occurred. Check server logs." async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE): if not is_authorized(update): return await update.message.reply_text( "opencode-dispatch bot\n\n" "Send any message and opencode will process it.\n" "Commands: /start, /help, /status, /working, /clear" ) async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): if not is_authorized(update): return await update.message.reply_text( "How to use:\n\n" "1. Make sure opencode is running\n" "2. Send me any message\n" "3. I'll forward it to opencode and relay the response\n\n" "Commands: /start, /help, /status, /working, /clear" ) async def status_command(update: Update, context: ContextTypes.DEFAULT_TYPE): if not is_authorized(update): return try: r = requests.get( f"{OPENCODE_API_URL}/global/health", timeout=5, auth=(OPENCODE_SERVER_USERNAME, OPENCODE_SERVER_PASSWORD) if OPENCODE_SERVER_PASSWORD else None, ) healthy = r.ok except Exception: healthy = False queue_size = message_queue.qsize() await update.message.reply_text( f"opencode: {'connected' if healthy else 'unreachable'}\n" f"Session: {'active' if SESSION_ID else 'none'}\n" f"Queue: {queue_size} messages" ) async def clear_command(update: Update, context: ContextTypes.DEFAULT_TYPE): if not is_authorized(update): return with processing_lock: if is_processing: await update.message.reply_text( "Can't clear queue while processing. Wait for current task to finish." ) else: while not message_queue.empty(): try: message_queue.get_nowait() except queue.Empty: break await update.message.reply_text("Queue cleared.") async def working_command(update: Update, context: ContextTypes.DEFAULT_TYPE): if not is_authorized(update): return if is_processing: await update.message.reply_text( f'Currently working on:\n"{current_task}"\n\nQueue: {message_queue.qsize()} messages' ) else: await update.message.reply_text("Currently idle. No task in progress.") async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): global is_processing, current_task if not is_authorized(update): return user_message = update.message.text user_id = update.effective_user.id if update.effective_user else "unknown" with processing_lock: currently_processing = is_processing if currently_processing: sent = await update.message.reply_text( "opencode is busy. Your message has been added to the queue.\n" "I'll respond when ready. Use /status to check queue position." ) message_queue.put((user_id, sent.message_id, user_message)) else: current_task = ( user_message[:50] + "..." if len(user_message) > 50 else user_message ) await update.message.chat.send_action("typing") sent = await update.message.reply_text("Processing...") with processing_lock: is_processing = True try: loop = asyncio.get_event_loop() reply = await loop.run_in_executor(None, send_to_opencode, user_message) await sent.edit_text(reply[:4000]) except Exception: logger.exception("Error processing message") await sent.edit_text("An error occurred. Check server logs.") finally: with processing_lock: is_processing = False current_task = "Idle" async def handle_voice(update: Update, context: ContextTypes.DEFAULT_TYPE): if not is_authorized(update): return await update.message.reply_text("Voice messages not yet supported. Send text.") async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): if not is_authorized(update): return await update.message.reply_text("File handling not yet supported. Send text.") async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): if not is_authorized(update): return await update.message.reply_text("Image handling not yet supported. Send text.") def main(): if not BOT_TOKEN: logger.error("TELEGRAM_BOT_TOKEN not set in .env file") return if not ALLOWED_CHAT_ID: logger.error( "TELEGRAM_ALLOWED_CHAT_ID not set in .env file. " "Refusing to start without access control." ) return app = Application.builder().token(BOT_TOKEN).build() app.add_handler(CommandHandler("start", start_command)) app.add_handler(CommandHandler("help", help_command)) app.add_handler(CommandHandler("status", status_command)) app.add_handler(CommandHandler("working", working_command)) app.add_handler(CommandHandler("clear", clear_command)) app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) app.add_handler(MessageHandler(filters.VOICE, handle_voice)) app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) app.add_handler(MessageHandler(filters.PHOTO, handle_photo)) logger.info("opencode-dispatch bot starting...") logger.info("Press Ctrl+C to stop") app.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": main()