"""FastAPI web console for the Telegram scraper.

The console loads ``telegram-scraper.py`` from the same directory as a module,
drives a single shared scraper instance, and exposes HTML form routes plus a
small JSON API. All mutating routes require a console password session.
"""
import asyncio
import contextlib
import html
import importlib.util
import inspect
import io
import json
import logging
import os
import secrets
import sqlite3
import time
import traceback
from collections import deque
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from fastapi import FastAPI, Form, Query, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates
from starlette.middleware.sessions import SessionMiddleware

BASE_DIR = Path(__file__).resolve().parent
STATIC_DIR = BASE_DIR / "static"
ENV_FILE = BASE_DIR / ".env"
SCRIPT_FILE = BASE_DIR / "telegram-scraper.py"
STATE_FILE = BASE_DIR / "state.json"
TEMPLATES_DIR = BASE_DIR / "templates"

# Web process: disable interactive stdin (avoids input() raising EOF under a
# pseudo-TTY); for the proxy policy see telegram-scraper.build_proxy_config.
os.environ.setdefault("TELEGRAM_WEB_UI", "1")

logger = logging.getLogger("uvicorn.error")


def web_url_prefix() -> str:
    """Return the reverse-proxy sub-path prefix from ``WEB_URL_PREFIX``.

    Set ``WEB_URL_PREFIX=/prefix`` (no trailing slash) to match the nginx
    ``location``. Returns an empty string when unset.
    """
    return (os.getenv("WEB_URL_PREFIX") or "").strip().rstrip("/")


def with_url_prefix(path: str) -> str:
    """Return *path* (made absolute if needed) with the URL prefix prepended."""
    path = path if path.startswith("/") else f"/{path}"
    pre = web_url_prefix()
    return f"{pre}{path}" if pre else path


def app_home_url(*, needauth: bool = False) -> str:
    """Return the home page URL, optionally flagged with ``?needauth=1``."""
    pre = web_url_prefix()
    base = f"{pre}/" if pre else "/"
    if needauth:
        return f"{base}?needauth=1"
    return base


WEB_CONSOLE_AUTH_KEY = "web_console_authenticated"

# SECURITY: these built-in fallbacks are publicly known. They are kept for
# backward compatibility; a warning is logged at startup while they are in use.
_DEFAULT_CONSOLE_PASSWORD = "Aa123456"
_DEFAULT_SESSION_SECRET = "telegram-scraper-web-console-dev-secret-change-me"


def _web_console_password() -> str:
    """Return the console password from ``WEB_CONSOLE_PASSWORD`` (or the default)."""
    return os.getenv("WEB_CONSOLE_PASSWORD", _DEFAULT_CONSOLE_PASSWORD)


def _web_session_secret() -> str:
    """Return the session signing secret from ``WEB_CONSOLE_SESSION_SECRET`` (or the default)."""
    s = (os.getenv("WEB_CONSOLE_SESSION_SECRET") or "").strip()
    if s:
        return s
    return _DEFAULT_SESSION_SECRET


def is_console_authed(request: Request) -> bool:
    """Return True when the session carries the console-authenticated flag."""
    return bool(request.session.get(WEB_CONSOLE_AUTH_KEY))


def redirect_if_console_unauthed(request: Request) -> Optional[RedirectResponse]:
    """Return a redirect to the home page (with ``needauth``) if not authenticated, else None."""
    if is_console_authed(request):
        return None
    return RedirectResponse(url=app_home_url(needauth=True), status_code=303)


def json_if_console_unauthed(request: Request) -> Optional[JSONResponse]:
    """Return a 401 JSON response if not authenticated, else None."""
    if is_console_authed(request):
        return None
    return JSONResponse({"ok": False, "error": "需要控制台密码验证"}, status_code=401)


app = FastAPI(title="Telegram Scraper Web Console")
app.add_middleware(
    SessionMiddleware,
    secret_key=_web_session_secret(),
    max_age=14 * 24 * 3600,
    same_site="lax",
)
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
templates.env.globals["app_url"] = with_url_prefix
templates.env.globals["url_prefix"] = web_url_prefix

# Strong references to fire-and-forget tasks: the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_background_tasks: Set[asyncio.Task] = set()


@app.get("/favicon.ico", include_in_schema=False)
async def favicon_ico():
    """Serve ``static/favicon.png`` if present, otherwise an empty 204."""
    path = STATIC_DIR / "favicon.png"
    if path.is_file():
        return FileResponse(path, media_type="image/png")
    return Response(status_code=204)


# Also serve the favicon under the reverse-proxy prefix when one is configured.
_favicon_prefix = web_url_prefix()
if _favicon_prefix:
    app.add_api_route(
        f"{_favicon_prefix}/favicon.ico",
        favicon_ico,
        methods=["GET"],
        include_in_schema=False,
    )

# Newer Starlette: TemplateResponse(request, name, context); older: (name, context).
_template_response_new_style: Optional[bool] = None


def template_response(request: Request, name: str, context: Dict[str, Any]):
    """Render template *name*, using whichever ``TemplateResponse`` signature is installed.

    The signature is inspected once and the result cached in a module global.
    ``request`` is always added to the context if missing.
    """
    global _template_response_new_style
    if _template_response_new_style is None:
        keys = list(inspect.signature(templates.TemplateResponse).parameters.keys())
        if keys and keys[0] == "self":
            keys = keys[1:]
        _template_response_new_style = bool(keys) and keys[0] == "request"
    ctx = dict(context)
    ctx.setdefault("request", request)
    if _template_response_new_style:
        return templates.TemplateResponse(request, name, ctx)
    return templates.TemplateResponse(name, ctx)


@app.on_event("startup")
async def _verify_runtime_files() -> None:
    """Fail fast when required files are missing; warn about insecure defaults.

    Raises:
        RuntimeError: if ``templates/index.html`` or ``telegram-scraper.py`` is absent.
    """
    idx = TEMPLATES_DIR / "index.html"
    if not idx.is_file():
        raise RuntimeError(f"缺少模板文件(请确认挂载目录含 templates/):{idx.resolve()}")
    if not SCRIPT_FILE.is_file():
        raise RuntimeError(f"缺少 telegram-scraper.py:{SCRIPT_FILE.resolve()}")
    if _web_console_password() in ("", _DEFAULT_CONSOLE_PASSWORD):
        logger.warning("WEB_CONSOLE_PASSWORD 未设置、为空或仍为默认值,请尽快修改。")
    if _web_session_secret() == _DEFAULT_SESSION_SECRET:
        logger.warning("WEB_CONSOLE_SESSION_SECRET 未设置,正在使用内置默认密钥,请尽快修改。")


def read_state_channel_maps() -> Tuple[Dict[str, str], Dict[str, str]]:
    """Read ``(channel_titles, channel_names)`` from the state file.

    Returns two empty dicts when the file is missing, unreadable, or malformed.
    Keys and values are coerced to ``str``.
    """
    if not STATE_FILE.exists():
        return {}, {}
    try:
        raw = json.loads(STATE_FILE.read_text(encoding="utf-8"))
        titles = raw.get("channel_titles") or {}
        names = raw.get("channel_names") or {}
        if not isinstance(titles, dict):
            titles = {}
        if not isinstance(names, dict):
            names = {}
        return (
            {str(k): str(v) for k, v in titles.items()},
            {str(k): str(v) for k, v in names.items()},
        )
    except Exception:
        # Best effort: the state file is optional display metadata here.
        return {}, {}


def _as_str(v: Union[str, int, float, None]) -> str:
    """Return *v* as a stripped string; ``None`` becomes an empty string."""
    if v is None:
        return ""
    return str(v).strip()


def channel_display_name(cid: str, titles: Dict[str, str], names: Dict[str, str]) -> str:
    """Pick a human-readable label for channel *cid*.

    Preference order: a stored title (unless it is just the id or looks like a
    long negative numeric id), then ``@username``, then a fixed placeholder.
    """
    cid = str(cid)
    t = _as_str(titles.get(cid))
    looks_like_numeric_id = len(t) > 6 and t.startswith("-") and t[1:].replace("-", "").isdigit()
    if t and t != cid and not looks_like_numeric_id:
        return t
    u = _as_str(names.get(cid)) or "no_username"
    if u and u != "no_username":
        return "@" + u.lstrip("@")
    return "未命名频道"


# Keys saved as strict "1"/"0" by the config form.
BINARY_ENV_KEYS = frozenset(
    {
        "SCRAPE_MEDIA",
        "HEARTBEAT_ENABLED",
        "FORWARD_ONLY_NEW_MESSAGES",
        "FORWARD_ONLY_IF_OFFSET_POSITIVE",
        "FORWARD_RAW_MENTIONS",
    }
)

# (env key, form label, input type) for every field on the config form.
CONFIG_KEYS: List[Tuple[str, str, str]] = [
    ("API_ID", "Telegram API_ID", "text"),
    ("API_HASH", "Telegram API_HASH", "text"),
    ("PROXY_ENABLED", "启用代理(网页端默认关:仅填 1/true/on 才走代理;CLI 本地可留空+填 HOST)", "text"),
    ("PROXY_TYPE", "代理类型", "text"),
    ("PROXY_HOST", "代理主机", "text"),
    ("PROXY_PORT", "代理端口", "text"),
    ("PROXY_USERNAME", "代理账号", "text"),
    ("PROXY_PASSWORD", "代理密码", "password"),
    ("SCRAPE_MEDIA", "下载媒体(1开/0关)", "text"),
    ("HEARTBEAT_ENABLED", "心跳开关(1开/0关)", "text"),
    ("HEARTBEAT_INTERVAL_SECONDS", "心跳间隔秒", "text"),
    ("HEARTBEAT_TO_CHAT", "心跳目标", "text"),
    ("ACCOUNT_LIST_EXCLUDE_SUBSTRINGS", "账号列表隐藏(标题含子串则不展示,逗号分隔;* 不隐藏)", "text"),
    ("MONITOR_CHATS", "监控源(逗号分隔)", "text"),
    ("FORWARD_TO_CHAT", "推送目标(逗号分隔)", "text"),
    ("FORWARD_ONLY_NEW_MESSAGES", "仅新消息(1开/0关)", "text"),
    ("FORWARD_ONLY_IF_OFFSET_POSITIVE", "仅新消息别名", "text"),
    ("FORWARD_AS_USERNAME", "@替换目标用户名(不带@)", "text"),
    ("FORWARD_DELAY_SECONDS", "每条推送延迟秒", "text"),
    ("FORWARD_RAW_MENTIONS", "保留原@不替换(1开/0关)", "text"),
    ("TELEGRAM_HEADLESS_QR", "无界面扫码登录(1开:日志里打开二维码链接;服务器/Docker 用)", "text"),
    ("TELEGRAM_2FA_PASSWORD", "两步验证密码(仅 headless 扫码需要;勿提交 Git)", "password"),
]


class _LogLineWriter(io.TextIOBase):
    """Write-only text stream that forwards each completed line to a sink.

    Lines are stripped and blank lines dropped. Text after the last line
    terminator is held until more text arrives or ``flush_pending`` is called.
    """

    def __init__(self, sink: Callable[[str], None]) -> None:
        super().__init__()
        self._sink = sink
        self._pending = ""

    def writable(self) -> bool:
        return True

    def write(self, text: str) -> int:
        chunks = (self._pending + text).splitlines(keepends=True)
        self._pending = ""
        for chunk in chunks:
            bare = chunk.splitlines()[0] if chunk else ""
            if len(bare) == len(chunk):
                # No terminator: only the final chunk can be an incomplete line.
                self._pending = chunk
            else:
                self._emit(bare)
        return len(text)

    def flush_pending(self) -> None:
        """Emit any held partial line."""
        tail, self._pending = self._pending, ""
        self._emit(tail)

    def _emit(self, line: str) -> None:
        line = line.strip()
        if line:
            self._sink(line)


class WebScraperService:
    """Owns the lazily-loaded scraper module/instance, background jobs, and the in-memory log."""

    def __init__(self) -> None:
        self.module = None  # telegram-scraper.py loaded as a module
        self.scraper = None  # module.OptimizedTelegramScraper instance
        self.init_lock = asyncio.Lock()
        self.job_lock = asyncio.Lock()
        self.job_task: Optional[asyncio.Task] = None
        self.job_name: str = ""
        self.job_started_at: Optional[float] = None
        self.logs: deque[str] = deque(maxlen=1200)  # bounded rolling log shown in the UI
        self.continuous_task: Optional[asyncio.Task] = None
        self.continuous_started_at: Optional[float] = None
        self.continuous_start_lock = asyncio.Lock()

    def _append(self, text: str) -> None:
        """Append a timestamped line to the rolling log."""
        ts = time.strftime("%Y-%m-%d %H:%M:%S")
        self.logs.append(f"[{ts}] {text}")

    def _load_module(self):
        """Import ``telegram-scraper.py`` by path (once) and return the module.

        Raises:
            RuntimeError: if an import spec/loader cannot be created.
        """
        if self.module is not None:
            return self.module
        spec = importlib.util.spec_from_file_location("telegram_scraper_web_core", str(SCRIPT_FILE))
        if not spec or not spec.loader:
            raise RuntimeError("无法加载 telegram-scraper.py")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        self.module = module
        return module

    async def ensure_ready(self) -> None:
        """Create the scraper if needed and make sure its client is connected.

        Raises:
            RuntimeError: if client initialization reports failure.
        """
        async with self.init_lock:
            if self.scraper and self.scraper.client and self.scraper.client.is_connected():
                return
            module = self._load_module()
            if self.scraper is None:
                self.scraper = module.OptimizedTelegramScraper()
            self._append("正在初始化 Telegram 客户端...")
            ok = await self._run_and_capture(self.scraper.initialize_client())
            if not ok:
                raise RuntimeError("客户端初始化失败,请检查 API_ID/API_HASH 或 session。")
            await self._run_and_capture(self.scraper.sync_monitor_chats_from_env())
            self.scraper._apply_scrape_media_from_env()
            self._append("Telegram 客户端已就绪。")

    async def disconnect(self) -> None:
        """Stop continuous scraping (if running) and disconnect the client."""
        await self.stop_continuous()
        if self.scraper and self.scraper.client:
            await self.scraper.client.disconnect()
        self._append("Telegram 客户端已断开。")

    def is_connected(self) -> bool:
        """Return True when a scraper exists and its client reports connected."""
        return bool(self.scraper and self.scraper.client and self.scraper.client.is_connected())

    @staticmethod
    def _format_elapsed(started_at: float) -> str:
        """Format seconds elapsed since *started_at* as ``HH:MM:SS``."""
        seconds = int(time.time() - started_at)
        return f"{seconds // 3600:02d}:{(seconds % 3600) // 60:02d}:{seconds % 60:02d}"

    def current_job_uptime(self) -> str:
        """Return the running job's elapsed time as ``HH:MM:SS``, or ``"-"`` if none."""
        if not self.job_started_at or not self.job_task or self.job_task.done():
            return "-"
        return self._format_elapsed(self.job_started_at)

    def is_job_running(self) -> bool:
        """Return True while a one-off job task exists and has not finished."""
        return self.job_task is not None and not self.job_task.done()

    def is_continuous_running(self) -> bool:
        """Return True while the continuous-scraping task exists and has not finished."""
        return self.continuous_task is not None and not self.continuous_task.done()

    def continuous_uptime(self) -> str:
        """Return continuous-scraping elapsed time as ``HH:MM:SS``, or ``"-"`` if stopped."""
        if not self.continuous_started_at or not self.is_continuous_running():
            return "-"
        return self._format_elapsed(self.continuous_started_at)

    async def _run_and_capture(self, awaitable):
        """Await *awaitable* while routing stdout/stderr lines into the log.

        Lines are appended as they are printed rather than buffered until the
        end, so long-running work stays visible and memory stays bounded. Any
        partial last line is flushed even if the awaitable raises or is cancelled.

        NOTE: the redirection is process-global, so output printed by other
        tasks running concurrently is captured as well.
        """
        writer = _LogLineWriter(self._append)
        try:
            with contextlib.redirect_stdout(writer), contextlib.redirect_stderr(writer):
                return await awaitable
        finally:
            writer.flush_pending()

    async def list_account_channels(self) -> List[Dict]:
        """Return the account's channels as reported by the scraper (connects if needed)."""
        await self.ensure_ready()
        data = await self._run_and_capture(self.scraper.list_channels())
        return data or []

    async def list_monitored_channels(self) -> List[Dict[str, str]]:
        """Return monitored channels with id, last message id, username and display name.

        Reads the live scraper state when a scraper exists; otherwise falls back
        to the state file, returning an empty list if it is missing or invalid.
        """
        if not self.scraper:
            if not STATE_FILE.exists():
                return []
            try:
                raw = json.loads(STATE_FILE.read_text(encoding="utf-8"))
                channels = raw.get("channels") or {}
                if not isinstance(channels, dict):
                    channels = {}
                names = raw.get("channel_names") or {}
                titles = raw.get("channel_titles") or {}
                if not isinstance(names, dict):
                    names = {}
                if not isinstance(titles, dict):
                    titles = {}
                return [
                    {
                        "channel_id": str(cid),
                        "last_message_id": str(last_id),
                        "username": names.get(str(cid), "no_username"),
                        "display_name": channel_display_name(str(cid), titles, names),
                    }
                    for cid, last_id in channels.items()
                ]
            except Exception:
                # Best effort: a broken state file just yields an empty list.
                return []

        titles = self.scraper.state.get("channel_titles") or {}
        names = self.scraper.state.get("channel_names") or {}
        if not isinstance(titles, dict):
            titles = {}
        if not isinstance(names, dict):
            names = {}
        return [
            {
                # Always a string, matching the state-file branch above.
                "channel_id": str(cid),
                "last_message_id": str(last_id),
                "username": names.get(cid, "no_username"),
                "display_name": channel_display_name(str(cid), titles, names),
            }
            for cid, last_id in self.scraper.state.get("channels", {}).items()
        ]

    async def add_channel(self, spec: str) -> str:
        """Resolve *spec* to a channel and add it to the monitored state.

        Returns:
            A status message (added, or already present).

        Raises:
            ValueError: if *spec* is empty.
        """
        await self.ensure_ready()
        s = (spec or "").strip()
        if not s:
            raise ValueError("频道标识不能为空(可用 @用户名、-100…、t.me 链接)")
        ent = await self.scraper._resolve_forward_entity(s)
        cid = str(self.module.utils.get_peer_id(ent))
        username = getattr(ent, "username", None) or "no_username"
        title = getattr(ent, "title", None) or cid
        state = self.scraper.state
        if cid in state["channels"]:
            return f"频道已存在:{title}({cid})"
        state["channels"][cid] = 0
        state.setdefault("channel_names", {})[cid] = username
        state.setdefault("channel_titles", {})[cid] = str(title) if title else ""
        self.scraper.save_state()
        return f"已添加频道:{title}({cid})"

    async def remove_channel(self, spec: str) -> str:
        """Remove the monitored channels selected by *spec* and save the state.

        Raises:
            ValueError: if *spec* is empty or matches no monitored channel.
        """
        if not self.scraper:
            await self.ensure_ready()
        s = (spec or "").strip()
        if not s:
            raise ValueError("要移除的频道不能为空")
        targets = await self.resolve_monitored_channel_ids(s)
        state = self.scraper.state
        removed = 0
        for cid in targets:
            if cid in state["channels"]:
                del state["channels"][cid]
                if "channel_names" in state:
                    state["channel_names"].pop(cid, None)
                if "channel_titles" in state:
                    state["channel_titles"].pop(cid, None)
                removed += 1
        if removed == 0:
            raise ValueError(
                "未移除任何频道。请使用:all;编号 1,2(与下方已监控列表从上到下对应);"
                "完整频道 ID -100…;或已在监控列表中的 @用户名 / t.me 链接。"
            )
        self.scraper.save_state()
        return f"已移除 {removed} 个频道"

    async def resolve_monitored_channel_ids(self, selection: str) -> List[str]:
        """Resolve *selection* to ids of channels currently in the monitored state.

        Accepts ``all``, list indexes, ``-100…`` ids, ``@username`` or t.me
        links. The scraper's own parser is tried on the whole string first;
        otherwise each comma-separated token is parsed or resolved to an entity
        and kept only if monitored. Duplicates are removed, order preserved.
        """
        if not self.scraper:
            return []
        s = (selection or "").strip() or "all"
        primary = self.scraper.parse_channel_selection(s)
        if primary:
            return primary
        if s.lower() == "all":
            return self.scraper.parse_channel_selection("all")
        out: List[str] = []
        for token in [x.strip() for x in s.split(",") if x.strip()]:
            part = self.scraper.parse_channel_selection(token)
            if part:
                out.extend(part)
                continue
            try:
                ent = await self.scraper._resolve_forward_entity(token)
                cid = str(self.module.utils.get_peer_id(ent))
                if cid in self.scraper.state.get("channels", {}):
                    out.append(cid)
            except Exception:
                # Unresolvable tokens are skipped; callers handle "nothing matched".
                continue
        return list(dict.fromkeys(out))

    async def _run_job(self, name: str, coro) -> None:
        """Run *coro* with output capture, logging start/finish/failure."""
        try:
            self._append(f"任务开始:{name}")
            await self._run_and_capture(coro)
            self._append(f"任务完成:{name}")
        except Exception as e:
            self._append(f"任务失败:{name},错误:{e}")
        finally:
            self.job_name = ""
            self.job_started_at = None

    async def start_job(self, name: str, coro) -> str:
        """Schedule *coro* as the single background job.

        Raises:
            RuntimeError: if another job is still running.
        """
        async with self.job_lock:
            if self.is_job_running():
                # The coroutine will never be scheduled; close it so Python does
                # not emit a "coroutine ... was never awaited" warning.
                if inspect.iscoroutine(coro):
                    coro.close()
                raise RuntimeError(f"已有任务在运行:{self.job_name}")
            self.job_name = name
            self.job_started_at = time.time()
            self.job_task = asyncio.create_task(self._run_job(name, coro))
            return f"任务已启动:{name}"

    async def start_scrape_job(self, selection: str) -> str:
        """Start a background scrape of the selected monitored channels.

        Raises:
            ValueError: if the selection matches no monitored channel.
        """
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError("没有可抓取频道,请先添加频道。")

        async def runner():
            for cid in channels:
                await self.scraper.scrape_channel(cid, self.scraper.state["channels"][cid])

        return await self.start_job(f"抓取({len(channels)}个频道)", runner())

    async def start_export_job(self, selection: str) -> str:
        """Start a background CSV+JSON export of the selected monitored channels.

        Raises:
            ValueError: if the selection matches no monitored channel.
        """
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError("没有可导出频道,请先添加频道。")

        async def runner():
            for cid in channels:
                self.scraper.export_to_csv(cid)
                self.scraper.export_to_json(cid)

        return await self.start_job(f"导出({len(channels)}个频道)", runner())

    async def start_rescrape_job(self, selection: str) -> str:
        """Start a background media re-scrape of the selected monitored channels.

        Raises:
            ValueError: if the selection matches no monitored channel.
        """
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError("没有可补抓频道,请先添加频道。")

        async def runner():
            for cid in channels:
                await self.scraper.rescrape_media(cid)

        return await self.start_job(f"补抓媒体({len(channels)}个频道)", runner())

    async def start_continuous(self) -> str:
        """Start the continuous-scraping task unless it is already running.

        Raises:
            ValueError: if no channel is monitored.
        """
        async with self.continuous_start_lock:
            await self.ensure_ready()
            if self.is_continuous_running():
                return "持续抓取已在运行中"
            if not self.scraper.state.get("channels"):
                raise ValueError("当前没有监控频道,请先添加频道。")

            async def runner():
                try:
                    await self._run_and_capture(self.scraper.continuous_scraping())
                except asyncio.CancelledError:
                    self._append("持续抓取任务已取消。")
                    raise
                except Exception as e:
                    self._append(f"持续抓取异常:{e}")

            self.continuous_started_at = time.time()
            self.continuous_task = asyncio.create_task(runner())
            self._append("已启动持续抓取(含心跳逻辑)。")
            return "持续抓取已启动"

    async def stop_continuous(self) -> str:
        """Cancel the continuous-scraping task and wait for it to finish."""
        if not self.is_continuous_running():
            return "持续抓取未运行"
        self.scraper.continuous_scraping_active = False
        self.continuous_task.cancel()
        try:
            await self.continuous_task
        except asyncio.CancelledError:
            pass
        self.continuous_task = None
        self.continuous_started_at = None
        self._append("已停止持续抓取。")
        return "持续抓取已停止"


service = WebScraperService()


def _parse_env_lines() -> List[str]:
    """Return the raw lines of the ``.env`` file, or an empty list if it is missing."""
    if not ENV_FILE.exists():
        return []
    return ENV_FILE.read_text(encoding="utf-8").splitlines()


def read_env_dict() -> Dict[str, str]:
    """Parse ``KEY=VALUE`` pairs from ``.env``, skipping blanks, comments and lines without ``=``."""
    values: Dict[str, str] = {}
    for line in _parse_env_lines():
        s = line.strip()
        if not s or s.startswith("#") or "=" not in s:
            continue
        k, v = s.split("=", 1)
        values[k.strip()] = v.strip()
    return values


def _env_line(key: str, value: str) -> str:
    """Format one ``KEY=VALUE`` line, flattening CR/LF so a value cannot inject extra lines."""
    flat = str(value).replace("\r", " ").replace("\n", " ")
    return f"{key}={flat}"


def write_env_updates(updates: Dict[str, str]) -> None:
    """Rewrite ``.env`` applying *updates*.

    Existing keys are replaced in place (comments, blank lines and unrelated
    keys are preserved); keys not yet present are appended after a blank line.
    """
    out_lines: List[str] = []
    touched = set()
    for raw_line in _parse_env_lines():
        s = raw_line.strip()
        if not s or s.startswith("#") or "=" not in s:
            out_lines.append(raw_line)
            continue
        key = raw_line.split("=", 1)[0].strip()
        if key in updates:
            out_lines.append(_env_line(key, updates[key]))
            touched.add(key)
        else:
            out_lines.append(raw_line)
    if out_lines and out_lines[-1].strip() != "":
        out_lines.append("")
    for key, value in updates.items():
        if key not in touched:
            out_lines.append(_env_line(key, value))
    ENV_FILE.write_text("\n".join(out_lines).rstrip() + "\n", encoding="utf-8")


DEFAULT_STATS_KEYWORDS: Tuple[str, ...] = (
    "招聘",
    "急聘",
    "岗位",
    "求职",
    "诚聘",
    "内推",
    "hiring",
    "recruit",
)


def find_channel_databases(base: Path) -> List[Tuple[str, Path]]:
    """Return sorted ``(dir_name, db_path)`` for each ``<base>/<name>/<name>.db`` that exists.

    Hidden directories and ``__pycache__`` are ignored.
    """
    out: List[Tuple[str, Path]] = []
    if not base.exists():
        return out
    for d in base.iterdir():
        if not d.is_dir():
            continue
        name = d.name
        if name.startswith(".") or name == "__pycache__":
            continue
        cand = d / f"{name}.db"
        if cand.is_file():
            out.append((name, cand))
    return sorted(out, key=lambda x: x[0])


def count_state_monitored_channels() -> int:
    """Return the number of channels in the state file (0 if missing or invalid)."""
    if not STATE_FILE.exists():
        return 0
    try:
        data = json.loads(STATE_FILE.read_text(encoding="utf-8"))
        return len(data.get("channels") or {})
    except Exception:
        return 0


def normalize_stats_keywords_param(raw: str) -> Tuple[str, ...]:
    """Split a comma-separated keyword string, dropping SQL LIKE wildcards (``%``, ``_``).

    Falls back to ``DEFAULT_STATS_KEYWORDS`` when nothing usable remains.
    """
    parts: List[str] = []
    for x in (raw or "").split(","):
        s = x.strip().replace("%", "").replace("_", "")
        if s:
            parts.append(s)
    return tuple(parts) if parts else DEFAULT_STATS_KEYWORDS


def _query_channel_db_stats(
    path: Path,
    start_day: str,
    end_day: str,
    kw_sql: str,
    kw_params: List[str],
) -> Tuple[int, int, int, List[Tuple[Any, Any]], List[Tuple[Any, Any]]]:
    """Query one channel database.

    Returns ``(total, in_range, keyword_in_range, daily_rows, daily_keyword_rows)``
    where the ``*_rows`` lists hold ``(day, count)`` pairs. *kw_sql* must contain
    only placeholders; keyword text is always bound through *kw_params*.
    """
    in_range = "substr(date,1,10) >= ? AND substr(date,1,10) <= ?"
    conn = sqlite3.connect(str(path))
    try:
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) FROM messages")
        total = int(cur.fetchone()[0])
        cur.execute(f"SELECT COUNT(*) FROM messages WHERE {in_range}", (start_day, end_day))
        ch_range = int(cur.fetchone()[0])
        cur.execute(
            f"SELECT COUNT(*) FROM messages WHERE {in_range} AND ({kw_sql})",
            (start_day, end_day, *kw_params),
        )
        ch_kw = int(cur.fetchone()[0])
        cur.execute(
            f"SELECT substr(date,1,10) AS d, COUNT(*) FROM messages WHERE {in_range} GROUP BY d",
            (start_day, end_day),
        )
        daily_rows = cur.fetchall()
        cur.execute(
            f"SELECT substr(date,1,10) AS d, COUNT(*) FROM messages WHERE {in_range} AND ({kw_sql}) GROUP BY d",
            (start_day, end_day, *kw_params),
        )
        daily_kw_rows = cur.fetchall()
        return total, ch_range, ch_kw, daily_rows, daily_kw_rows
    finally:
        conn.close()


def compute_storage_stats(base: Path, days: int, keyword_list: Tuple[str, ...]) -> Dict[str, Any]:
    """Aggregate message counts across all channel databases under *base*.

    The range is the last ``max(days, 1)`` UTC calendar days ending today.
    A database that cannot be queried is skipped with a warning instead of
    failing the whole overview. ``by_channel`` is sorted by in-range message
    count (descending) and truncated to 24 entries.
    """
    titles, names = read_state_channel_maps()
    dbs = find_channel_databases(base)
    end = datetime.now(timezone.utc).date()
    start = end - timedelta(days=max(days, 1) - 1)
    start_day = start.strftime("%Y-%m-%d")
    end_day = end.strftime("%Y-%m-%d")

    day_keys: List[str] = []
    cur = start
    while cur <= end:
        day_keys.append(cur.strftime("%Y-%m-%d"))
        cur += timedelta(days=1)
    daily_total: Dict[str, int] = {d: 0 for d in day_keys}
    daily_kw: Dict[str, int] = {d: 0 for d in day_keys}

    total_all_time = 0
    in_range_total = 0
    in_range_kw = 0
    by_channel: List[Dict[str, Any]] = []
    # An empty keyword list would otherwise yield the invalid SQL "AND ()";
    # "0" is a constant-false condition, i.e. no keyword matches.
    kw_sql = " OR ".join(["(message LIKE ?)"] * len(keyword_list)) or "0"
    kw_params = [f"%{k}%" for k in keyword_list]

    for channel_id, path in dbs:
        try:
            total, ch_range, ch_kw, daily_rows, daily_kw_rows = _query_channel_db_stats(
                path, start_day, end_day, kw_sql, kw_params
            )
        except sqlite3.Error as exc:
            logger.warning("统计时跳过无法读取的数据库 %s:%s", path, exc)
            continue
        total_all_time += total
        in_range_total += ch_range
        in_range_kw += ch_kw
        for d, c in daily_rows:
            ds = str(d)
            if ds in daily_total:
                daily_total[ds] += int(c)
        for d, c in daily_kw_rows:
            ds = str(d)
            if ds in daily_kw:
                daily_kw[ds] += int(c)
        by_channel.append(
            {
                "channel_id": channel_id,
                "display_name": channel_display_name(str(channel_id), titles, names),
                "messages_in_range": ch_range,
                "job_like_in_range": ch_kw,
            }
        )

    by_channel.sort(key=lambda x: -x["messages_in_range"])
    return {
        "days": days,
        "range_start": start_day,
        "range_end": end_day,
        "keywords_used": list(keyword_list),
        "total_messages_all_time": total_all_time,
        "messages_in_range": in_range_total,
        "job_like_in_range": in_range_kw,
        "database_count": len(dbs),
        "by_channel": by_channel[:24],
        "daily_messages": [[d, daily_total[d]] for d in day_keys],
        "daily_job_like": [[d, daily_kw[d]] for d in day_keys],
    }


@app.get("/auth/console/status")
async def auth_console_status(request: Request):
    """Report whether the current session is console-authenticated."""
    return {"ok": is_console_authed(request)}


@app.post("/auth/console/login")
async def auth_console_login(request: Request):
    """Check the submitted password (JSON or form body) and mark the session authenticated."""
    ct = (request.headers.get("content-type") or "").lower()
    password = ""
    if "application/json" in ct:
        try:
            body = await request.json()
            password = str(body.get("password", ""))
        except Exception:
            password = ""
    else:
        form = await request.form()
        password = str(form.get("password", ""))
    expected = _web_console_password()
    try:
        # Constant-time comparison to avoid leaking the password through timing.
        ok = secrets.compare_digest(password.encode("utf-8"), expected.encode("utf-8"))
    except Exception:
        ok = False
    if ok:
        request.session[WEB_CONSOLE_AUTH_KEY] = True
        return {"ok": True}
    return JSONResponse({"ok": False, "error": "密码错误"}, status_code=401)


@app.post("/auth/console/logout")
async def auth_console_logout(request: Request):
    """Clear the session's authenticated flag and redirect home."""
    request.session.pop(WEB_CONSOLE_AUTH_KEY, None)
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.get("/api/stats/overview")
async def api_stats_overview(
    days: int = Query(30, ge=1, le=366),
    keywords: str = Query("", description="逗号分隔关键词,用于估算「招聘类」消息;留空用内置词表"),
):
    """Return storage statistics, computed in a worker thread to keep the event loop free."""
    kw = normalize_stats_keywords_param(keywords)

    def run() -> Dict[str, Any]:
        data = compute_storage_stats(BASE_DIR, days, kw)
        data["state_monitored_count"] = count_state_monitored_channels()
        return data

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, run)


@app.get("/")
async def index(request: Request):
    """Render the console home page; on any failure return an HTML 500 page with the traceback."""
    try:
        env_vals = read_env_dict()
        fields = [
            {"key": k, "label": label, "type": field_type, "value": env_vals.get(k, "")}
            for k, label, field_type in CONFIG_KEYS
        ]
        monitored = await service.list_monitored_channels()
        monitored_ids = {str(row["channel_id"]) for row in monitored}
        account_channels: List[Dict] = []
        err = ""
        authed = is_console_authed(request)
        if authed:
            try:
                account_channels = await service.list_account_channels()
            except Exception as e:
                err = str(e)
        # Logs are only exposed to authenticated sessions.
        logs_text = "\n".join(service.logs) if authed else ""
        return template_response(
            request,
            "index.html",
            {
                "fields": fields,
                "binary_env_keys": list(BINARY_ENV_KEYS),
                "monitored": monitored,
                "monitored_ids": monitored_ids,
                "account_channels": account_channels,
                "console_authed": authed,
                "need_auth_banner": request.query_params.get("needauth") == "1",
                "connected": service.is_connected(),
                "job_running": service.is_job_running(),
                "job_name": service.job_name,
                "job_uptime": service.current_job_uptime(),
                "continuous_running": service.is_continuous_running(),
                "continuous_uptime": service.continuous_uptime(),
                "logs": logs_text,
                "error": err,
            },
        )
    except Exception as e:
        logger.exception("GET / 渲染失败")
        tb = traceback.format_exc()
        try:
            service._append(f"首页异常:{e}")
            for line in tb.splitlines()[:80]:
                if line.strip():
                    service._append(line.strip())
        except Exception:
            # Logging the failure must never mask the error page itself.
            pass
        detail = tb[:12000]
        # NOTE(review): the HTML tags of this page were lost in the reviewed
        # copy of the file; the markup below is a minimal reconstruction that
        # keeps the visible text. Confirm against the deployed version.
        return HTMLResponse(
            "<!DOCTYPE html><html><head><meta charset='utf-8'><title>错误</title></head>"
            "<body>"
            "<h1>首页 Internal Server Error</h1>"
            "<p>常见原因:Docker 挂载目录里缺少 templates/telegram-scraper.py;或 state.json 格式异常。</p>"
            "<pre>"
            + html.escape(detail)
            + "</pre></body></html>",
            status_code=500,
        )


@app.post("/config")
async def save_config(request: Request):
    """Save the config form to ``.env``; binary keys are normalized to "1"/"0"."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    form = await request.form()
    updates: Dict[str, str] = {}
    for k, _label, _field_type in CONFIG_KEYS:
        submitted = str(form.get(k, "")).strip()
        if k in BINARY_ENV_KEYS:
            updates[k] = "1" if submitted == "1" else "0"
        else:
            updates[k] = submitted
    write_env_updates(updates)
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/start")
async def start_scraper(request: Request):
    """Connect the Telegram client; failures are written to the log."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    try:
        await service.ensure_ready()
    except Exception as e:
        service._append(f"初始化失败:{e}")
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/stop")
async def stop_scraper(request: Request):
    """Stop continuous scraping and disconnect the Telegram client."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    await service.disconnect()
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/channels/add")
async def add_channel(request: Request, channel_spec: str = Form(...)):
    """Add one channel from the form; the outcome is written to the log."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    try:
        msg = await service.add_channel(channel_spec)
        service._append(msg)
    except Exception as e:
        service._append(f"添加频道失败:{e}")
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/channels/add-selected")
async def add_channels_selected(request: Request):
    """Add every checked ``channel_id`` from the form, logging each outcome."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    form = await request.form()
    raw_ids = form.getlist("channel_id")
    ids = [str(x).strip() for x in raw_ids if str(x).strip()]
    if not ids:
        service._append("未选择任何频道,请在列表中勾选后再提交。")
        return RedirectResponse(url=app_home_url(), status_code=303)
    for spec in ids:
        try:
            msg = await service.add_channel(spec)
            service._append(msg)
        except Exception as e:
            service._append(f"添加失败({spec}):{e}")
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/channels/remove")
async def remove_channel(request: Request, channel_spec: str = Form(...)):
    """Remove the selected channel(s); the outcome is written to the log."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    try:
        msg = await service.remove_channel(channel_spec)
        service._append(msg)
    except Exception as e:
        service._append(f"移除频道失败:{e}")
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/jobs/scrape")
async def start_scrape(request: Request, selection: str = Form("all")):
    """Start a scrape job from the form; the outcome is written to the log."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    try:
        msg = await service.start_scrape_job(selection)
        service._append(msg)
    except Exception as e:
        service._append(f"启动抓取失败:{e}")
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/jobs/export")
async def start_export(request: Request, selection: str = Form("all")):
    """Start an export job from the form; the outcome is written to the log."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    try:
        msg = await service.start_export_job(selection)
        service._append(msg)
    except Exception as e:
        service._append(f"启动导出失败:{e}")
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/jobs/rescrape")
async def start_rescrape(request: Request, selection: str = Form("all")):
    """Start a media re-scrape job from the form; the outcome is written to the log."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    try:
        msg = await service.start_rescrape_job(selection)
        service._append(msg)
    except Exception as e:
        service._append(f"启动补抓失败:{e}")
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/jobs/continuous/start")
async def start_continuous_route(request: Request):
    """Kick off continuous scraping in the background and redirect immediately."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir

    async def _run_start_continuous() -> None:
        try:
            await service.start_continuous()
        except Exception as e:
            service._append(f"启动持续抓取失败:{e}")

    # Keep a strong reference until the task finishes so it cannot be
    # garbage-collected while still running.
    task = asyncio.create_task(_run_start_continuous())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    service._append("已接收「启动持续抓取」:正在后台连接 Telegram 并启动(请勿重复点击;进度见运行日志)。")
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.post("/jobs/continuous/stop")
async def stop_continuous(request: Request):
    """Stop continuous scraping; the outcome is written to the log."""
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    try:
        msg = await service.stop_continuous()
        service._append(msg)
    except Exception as e:
        service._append(f"停止持续抓取失败:{e}")
    return RedirectResponse(url=app_home_url(), status_code=303)


@app.get("/status")
async def status(request: Request):
    """Return full service status including monitored channels and logs (auth required)."""
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    return {
        "ready": service.scraper is not None,
        "connected": service.is_connected(),
        "job_running": service.is_job_running(),
        "job_name": service.job_name,
        "job_uptime": service.current_job_uptime(),
        "continuous_running": service.is_continuous_running(),
        "continuous_uptime": service.continuous_uptime(),
        "monitored_channels": await service.list_monitored_channels(),
        "logs": list(service.logs),
    }


@app.get("/api/channels/monitored")
async def api_monitored_channels():
    """Return the monitored channel list (no authentication check)."""
    return {"items": await service.list_monitored_channels()}


@app.get("/api/channels/account")
async def api_account_channels(request: Request):
    """Return the account's channels; errors are reported in the payload (auth required)."""
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    try:
        items = await service.list_account_channels()
        return {"items": items}
    except Exception as e:
        return {"items": [], "error": str(e)}


@app.post("/api/channels/add")
async def api_add_channel(request: Request, channel_spec: str = Form(...)):
    """Add a channel (auth required)."""
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    msg = await service.add_channel(channel_spec)
    service._append(msg)
    return {"ok": True, "message": msg}


@app.post("/api/channels/remove")
async def api_remove_channel(request: Request, channel_spec: str = Form(...)):
    """Remove channel(s) (auth required)."""
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    msg = await service.remove_channel(channel_spec)
    service._append(msg)
    return {"ok": True, "message": msg}


@app.post("/api/jobs/scrape")
async def api_job_scrape(request: Request, selection: str = Form("all")):
    """Start a scrape job (auth required)."""
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    msg = await service.start_scrape_job(selection)
    service._append(msg)
    return {"ok": True, "message": msg}


@app.post("/api/jobs/export")
async def api_job_export(request: Request, selection: str = Form("all")):
    """Start an export job (auth required)."""
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    msg = await service.start_export_job(selection)
    service._append(msg)
    return {"ok": True, "message": msg}


@app.post("/api/jobs/rescrape")
async def api_job_rescrape(request: Request, selection: str = Form("all")):
    """Start a media re-scrape job (auth required)."""
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    msg = await service.start_rescrape_job(selection)
    service._append(msg)
    return {"ok": True, "message": msg}


@app.get("/api/jobs/status")
async def api_job_status(request: Request):
    """Return job status; logs are included only for authenticated sessions."""
    payload = {
        "connected": service.is_connected(),
        "job_running": service.is_job_running(),
        "job_name": service.job_name,
        "job_uptime": service.current_job_uptime(),
        "continuous_running": service.is_continuous_running(),
        "continuous_uptime": service.continuous_uptime(),
        "logs": list(service.logs) if is_console_authed(request) else [],
    }
    return payload


@app.post("/api/jobs/continuous/start")
async def api_start_continuous(request: Request):
    """Start continuous scraping and wait for startup to finish (auth required)."""
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    msg = await service.start_continuous()
    service._append(msg)
    return {"ok": True, "message": msg}


@app.post("/api/jobs/continuous/stop")
async def api_stop_continuous(request: Request):
    """Stop continuous scraping (auth required)."""
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    msg = await service.stop_continuous()
    service._append(msg)
    return {"ok": True, "message": msg}


if __name__ == "__main__":
    import uvicorn

    _host = os.getenv("WEB_BIND_HOST", "0.0.0.0")
    _port = int(os.getenv("WEB_BIND_PORT", "8000"))
    uvicorn.run("app_web:app", host=_host, port=_port, reload=False)