1084 lines
38 KiB
Python
1084 lines
38 KiB
Python
import asyncio
import contextlib
import html
import importlib.util
import inspect
import io
import json
import logging
import os
import secrets
import sqlite3
import time
import traceback
from collections import deque
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

from fastapi import FastAPI, Form, Query, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates
from starlette.middleware.sessions import SessionMiddleware
# Every runtime file lives next to this module.
BASE_DIR = Path(__file__).resolve().parent
STATIC_DIR = BASE_DIR.joinpath("static")
ENV_FILE = BASE_DIR.joinpath(".env")
SCRIPT_FILE = BASE_DIR.joinpath("telegram-scraper.py")  # CLI scraper, loaded as a module
STATE_FILE = BASE_DIR.joinpath("state.json")
TEMPLATES_DIR = BASE_DIR.joinpath("templates")
# Web process: disable interactive stdin prompts (input() would hit EOF under a
# pseudo-TTY). The proxy policy is decided in telegram-scraper's build_proxy_config.
if "TELEGRAM_WEB_UI" not in os.environ:
    os.environ["TELEGRAM_WEB_UI"] = "1"

# Share uvicorn's "uvicorn.error" logger for server-side diagnostics.
logger = logging.getLogger("uvicorn.error")
def web_url_prefix() -> str:
    """Return the sub-path the console is served under behind a reverse proxy.

    Read from ``WEB_URL_PREFIX`` (e.g. ``/tg``, without a trailing slash; it should match
    the nginx ``location``). Surrounding whitespace and trailing slashes are removed;
    an unset or blank variable yields ``""``.
    """
    configured = os.getenv("WEB_URL_PREFIX")
    if not configured:
        return ""
    return configured.strip().rstrip("/")
def with_url_prefix(path: str) -> str:
    """Return *path* made absolute (leading ``/``) and prefixed with ``web_url_prefix()``."""
    absolute = path if path[:1] == "/" else "/" + path
    prefix = web_url_prefix()
    if not prefix:
        return absolute
    return prefix + absolute
def app_home_url(*, needauth: bool = False) -> str:
    """Return the console home URL (honouring ``WEB_URL_PREFIX``).

    With ``needauth=True`` the URL carries ``?needauth=1`` so the page can show the
    "password required" banner.
    """
    home = web_url_prefix() + "/"
    return home + "?needauth=1" if needauth else home
# Session key set to True once the browser has entered the console password.
WEB_CONSOLE_AUTH_KEY: str = "web_console_authenticated"
def _web_console_password() -> str:
|
||
return os.getenv("WEB_CONSOLE_PASSWORD", "Aa123456")
|
||
|
||
|
||
def _web_session_secret() -> str:
|
||
s = (os.getenv("WEB_CONSOLE_SESSION_SECRET") or "").strip()
|
||
if s:
|
||
return s
|
||
return "telegram-scraper-web-console-dev-secret-change-me"
|
||
|
||
|
||
def is_console_authed(request: Request) -> bool:
    """Return True when this browser session has passed the console password check."""
    flag = request.session.get(WEB_CONSOLE_AUTH_KEY)
    return True if flag else False
def redirect_if_console_unauthed(request: Request) -> Optional[RedirectResponse]:
    """Guard for HTML form routes.

    Returns a 303 redirect to the home page with ``?needauth=1`` when the session is not
    authenticated, otherwise ``None`` (the caller proceeds).
    """
    if not is_console_authed(request):
        return RedirectResponse(url=app_home_url(needauth=True), status_code=303)
    return None
def json_if_console_unauthed(request: Request) -> Optional[JSONResponse]:
    """Guard for JSON API routes.

    Returns a 401 JSON error when the session is not authenticated, otherwise ``None``.
    """
    if not is_console_authed(request):
        return JSONResponse({"ok": False, "error": "需要控制台密码验证"}, status_code=401)
    return None
app = FastAPI(title="Telegram Scraper Web Console")

# Signed-cookie sessions carry the console-auth flag; cookies live for 14 days.
app.add_middleware(
    SessionMiddleware,
    same_site="lax",
    max_age=14 * 24 * 60 * 60,
    secret_key=_web_session_secret(),
)

templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
# Expose URL helpers to every template so links respect WEB_URL_PREFIX.
templates.env.globals.update(app_url=with_url_prefix, url_prefix=web_url_prefix)
@app.get("/favicon.ico", include_in_schema=False)
async def favicon_ico():
    """Serve ``static/favicon.png`` as the favicon, or an empty 204 when it is absent."""
    icon = STATIC_DIR / "favicon.png"
    if not icon.is_file():
        return Response(status_code=204)
    return FileResponse(icon, media_type="image/png")
# With WEB_URL_PREFIX set, also answer <prefix>/favicon.ico using the same handler.
# The prefix is read once, at import time.
_favicon_prefix = web_url_prefix()
if _favicon_prefix:
    app.add_api_route(
        path=_favicon_prefix + "/favicon.ico",
        endpoint=favicon_ico,
        methods=["GET"],
        include_in_schema=False,
    )
# Starlette 较新版本:TemplateResponse(request, name, context);旧版:(name, context)
|
||
_template_response_new_style: Optional[bool] = None
|
||
|
||
|
||
def template_response(request: Request, name: str, context: Dict[str, Any]):
    """Render template *name* with *context*, portable across Starlette versions.

    A copy of *context* is used. It gets ``request`` (unless already present), the
    ``app_url`` helper and the current ``url_prefix``.

    Calling convention detection (cached in ``_template_response_new_style``):
    older Starlette declares ``TemplateResponse(name, context, ...)``. Newer releases
    (0.29+, TODO confirm exact version) declare ``TemplateResponse(*args, **kwargs)`` with
    typed overloads and prefer ``(request, name, context)``; there the old form only
    works with a DeprecationWarning. The previous check looked solely for a parameter
    literally named ``request``, so the ``*args`` signature fell through to the deprecated
    form. A leading var-positional parameter now also selects the new form.
    """
    global _template_response_new_style
    if _template_response_new_style is None:
        params = list(inspect.signature(templates.TemplateResponse).parameters.values())
        if params and params[0].name == "self":
            params = params[1:]
        first = params[0] if params else None
        _template_response_new_style = first is not None and (
            first.name == "request" or first.kind is inspect.Parameter.VAR_POSITIONAL
        )
    ctx = dict(context)
    ctx.setdefault("request", request)
    ctx["app_url"] = with_url_prefix
    ctx["url_prefix"] = web_url_prefix()
    if _template_response_new_style:
        return templates.TemplateResponse(request, name, ctx)
    return templates.TemplateResponse(name, ctx)
@app.on_event("startup")
async def _verify_runtime_files() -> None:
    """Abort startup early when templates/index.html or telegram-scraper.py is missing."""
    index_template = TEMPLATES_DIR / "index.html"
    if not index_template.is_file():
        raise RuntimeError(f"缺少模板文件(请确认挂载目录含 templates/):{index_template.resolve()}")
    if SCRIPT_FILE.is_file():
        return
    raise RuntimeError(f"缺少 telegram-scraper.py:{SCRIPT_FILE.resolve()}")
def read_state_channel_maps() -> Tuple[Dict[str, str], Dict[str, str]]:
    """Load ``(channel_titles, channel_names)`` from state.json with string keys and values.

    A missing file, unreadable JSON, or any structural surprise yields ``({}, {})``.
    A section that is not a dict is treated as empty.
    """
    if not STATE_FILE.exists():
        return {}, {}
    try:
        state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
        sections = []
        for section_name in ("channel_titles", "channel_names"):
            section = state.get(section_name) or {}
            if not isinstance(section, dict):
                section = {}
            sections.append({str(key): str(value) for key, value in section.items()})
        return sections[0], sections[1]
    except Exception:
        return {}, {}
def _as_str(v: Union[str, int, float, None]) -> str:
|
||
if v is None:
|
||
return ""
|
||
return str(v).strip()
|
||
|
||
|
||
def channel_display_name(cid: str, titles: Dict[str, str], names: Dict[str, str]) -> str:
    """Pick a human-readable label for channel *cid*.

    Preference order:

    1. The stored title, unless it is empty, equals the id, or looks like a raw
       ``-100…`` numeric id.
    2. ``@username``, unless the username is missing or the ``no_username`` placeholder.
    3. The literal "未命名频道" ("unnamed channel").
    """
    key = str(cid)

    def _clean(value) -> str:
        return "" if value is None else str(value).strip()

    title = _clean(titles.get(key))
    looks_like_raw_id = (
        len(title) > 6 and title.startswith("-") and title[1:].replace("-", "").isdigit()
    )
    if title and title != key and not looks_like_raw_id:
        return title

    username = _clean(names.get(key)) or "no_username"
    if username != "no_username":
        return "@" + username.lstrip("@")
    return "未命名频道"
# .env keys edited as on/off switches: save_config stores them strictly as "1" or "0".
BINARY_ENV_KEYS = frozenset(
    (
        "FORWARD_ONLY_IF_OFFSET_POSITIVE",
        "FORWARD_ONLY_NEW_MESSAGES",
        "FORWARD_RAW_MENTIONS",
        "HEARTBEAT_ENABLED",
        "SCRAPE_MEDIA",
    )
)
# Editable .env settings shown on the config form, in display order.
# Each entry is (env key, form label, HTML input type).
CONFIG_KEYS: List[Tuple[str, str, str]] = [
    # Telegram API credentials
    ("API_ID", "Telegram API_ID", "text"),
    ("API_HASH", "Telegram API_HASH", "text"),
    # Proxy
    ("PROXY_ENABLED", "启用代理(网页端默认关:仅填 1/true/on 才走代理;CLI 本地可留空+填 HOST)", "text"),
    ("PROXY_TYPE", "代理类型", "text"),
    ("PROXY_HOST", "代理主机", "text"),
    ("PROXY_PORT", "代理端口", "text"),
    ("PROXY_USERNAME", "代理账号", "text"),
    ("PROXY_PASSWORD", "代理密码", "password"),
    # Scraping and heartbeat
    ("SCRAPE_MEDIA", "下载媒体(1开/0关)", "text"),
    ("HEARTBEAT_ENABLED", "心跳开关(1开/0关)", "text"),
    ("HEARTBEAT_INTERVAL_SECONDS", "心跳间隔秒", "text"),
    ("HEARTBEAT_TO_CHAT", "心跳目标", "text"),
    ("ACCOUNT_LIST_EXCLUDE_SUBSTRINGS", "账号列表隐藏(标题含子串则不展示,逗号分隔;* 不隐藏)", "text"),
    # Monitoring and forwarding
    ("MONITOR_CHATS", "监控源(逗号分隔)", "text"),
    ("FORWARD_TO_CHAT", "推送目标(逗号分隔)", "text"),
    ("FORWARD_ONLY_NEW_MESSAGES", "仅新消息(1开/0关)", "text"),
    ("FORWARD_ONLY_IF_OFFSET_POSITIVE", "仅新消息别名", "text"),
    ("FORWARD_AS_USERNAME", "@替换目标用户名(不带@)", "text"),
    ("FORWARD_DELAY_SECONDS", "每条推送延迟秒", "text"),
    ("FORWARD_RAW_MENTIONS", "保留原@不替换(1开/0关)", "text"),
    # Headless login
    ("TELEGRAM_HEADLESS_QR", "无界面扫码登录(1开:日志里打开二维码链接;服务器/Docker 用)", "text"),
    ("TELEGRAM_2FA_PASSWORD", "两步验证密码(仅 headless 扫码需要;勿提交 Git)", "password"),
]
class WebScraperService:
    """Bridge between the web console and ``OptimizedTelegramScraper`` from telegram-scraper.py.

    Loads the CLI script as a module on first use and keeps one scraper/client instance.
    Runs at most one background job at a time (scrape / export / media re-scrape),
    manages the long-running continuous-scraping task, and keeps the last 1200 log
    lines for the UI.
    """

    class _LogLineWriter(io.TextIOBase):
        """Write-only text stream that hands each completed, non-blank line to a callback.

        Used as the stdout/stderr redirect target, so scraper output reaches the console
        log while the coroutine is still running instead of only after it returns.
        """

        def __init__(self, emit) -> None:
            super().__init__()
            self._emit = emit
            self._pending = ""

        def writable(self) -> bool:
            return True

        def write(self, s: str) -> int:
            self._pending += s
            if "\n" in self._pending:
                complete, _, self._pending = self._pending.rpartition("\n")
                self._emit_lines(complete)
            return len(s)

        def drain(self) -> None:
            """Emit buffered text that was never terminated by a newline."""
            rest, self._pending = self._pending, ""
            self._emit_lines(rest)

        def _emit_lines(self, text: str) -> None:
            for line in text.splitlines():
                stripped = line.strip()
                if stripped:
                    self._emit(stripped)

    def __init__(self) -> None:
        self.module = None  # telegram-scraper.py loaded as a module (lazy)
        self.scraper = None  # OptimizedTelegramScraper instance (lazy)
        self.init_lock = asyncio.Lock()
        self.job_lock = asyncio.Lock()
        self.job_task: Optional[asyncio.Task] = None
        self.job_name: str = ""
        self.job_started_at: Optional[float] = None
        self.logs: deque[str] = deque(maxlen=1200)
        self.continuous_task: Optional[asyncio.Task] = None
        self.continuous_started_at: Optional[float] = None
        self.continuous_start_lock = asyncio.Lock()

    def _append(self, text: str) -> None:
        """Add a timestamped line to the in-memory console log."""
        ts = time.strftime("%Y-%m-%d %H:%M:%S")
        self.logs.append(f"[{ts}] {text}")

    def _load_module(self):
        """Import telegram-scraper.py once (its module-level code runs on first load)."""
        if self.module is not None:
            return self.module
        spec = importlib.util.spec_from_file_location("telegram_scraper_web_core", str(SCRIPT_FILE))
        if not spec or not spec.loader:
            raise RuntimeError("无法加载 telegram-scraper.py")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        self.module = module
        return module

    async def ensure_ready(self) -> None:
        """Create and connect the scraper client if it is not connected yet.

        Raises:
            RuntimeError: when ``initialize_client()`` reports failure.
        """
        async with self.init_lock:
            if self.is_connected():
                return
            module = self._load_module()
            if self.scraper is None:
                self.scraper = module.OptimizedTelegramScraper()
            self._append("正在初始化 Telegram 客户端...")
            ok = await self._run_and_capture(self.scraper.initialize_client())
            if not ok:
                raise RuntimeError("客户端初始化失败,请检查 API_ID/API_HASH 或 session。")
            await self._run_and_capture(self.scraper.sync_monitor_chats_from_env())
            self.scraper._apply_scrape_media_from_env()
            self._append("Telegram 客户端已就绪。")

    async def disconnect(self) -> None:
        """Stop continuous scraping, then disconnect the Telegram client if one exists."""
        await self.stop_continuous()
        if self.scraper and self.scraper.client:
            await self.scraper.client.disconnect()
            self._append("Telegram 客户端已断开。")

    def is_connected(self) -> bool:
        return bool(self.scraper and self.scraper.client and self.scraper.client.is_connected())

    @staticmethod
    def _format_elapsed(started_at: float) -> str:
        """Format the time since *started_at* (epoch seconds) as ``HH:MM:SS``."""
        seconds = int(time.time() - started_at)
        hours, remainder = divmod(seconds, 3600)
        minutes, secs = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

    def current_job_uptime(self) -> str:
        """Running time of the current one-off job, or ``"-"`` when none is running."""
        if not self.job_started_at or not self.is_job_running():
            return "-"
        return self._format_elapsed(self.job_started_at)

    def is_job_running(self) -> bool:
        return self.job_task is not None and not self.job_task.done()

    def is_continuous_running(self) -> bool:
        return self.continuous_task is not None and not self.continuous_task.done()

    def continuous_uptime(self) -> str:
        """Running time of continuous scraping, or ``"-"`` when it is not running."""
        if not self.continuous_started_at or not self.is_continuous_running():
            return "-"
        return self._format_elapsed(self.continuous_started_at)

    async def _run_and_capture(self, awaitable):
        """Await *awaitable* while routing everything it prints to the console log.

        Lines are logged as soon as they are complete, so long-running coroutines
        (continuous scraping) show progress live. Any trailing partial output is logged in
        ``finally``. Previously all output was buffered and silently dropped when the
        awaitable raised or was cancelled.

        Note: ``redirect_stdout``/``redirect_stderr`` swap ``sys.stdout``/``sys.stderr``
        process-wide. Overlapping captures from concurrent tasks may therefore pick up each
        other's output, but every sink feeds the same log.
        """
        sink = self._LogLineWriter(self._append)
        try:
            with contextlib.redirect_stdout(sink), contextlib.redirect_stderr(sink):
                return await awaitable
        finally:
            sink.drain()

    async def list_account_channels(self) -> List[Dict]:
        """Return the channel list from the scraper's ``list_channels()`` (connects if needed)."""
        await self.ensure_ready()
        data = await self._run_and_capture(self.scraper.list_channels())
        return data or []

    async def list_monitored_channels(self) -> List[Dict[str, str]]:
        """Return one row per monitored channel.

        Each row holds ``channel_id``, ``last_message_id``, ``username`` and
        ``display_name``. The live scraper state is used once the scraper exists. Before
        that, state.json is read from disk, and a missing or unreadable file yields ``[]``.
        """
        if not self.scraper:
            if not STATE_FILE.exists():
                return []
            try:
                return self._monitored_rows(json.loads(STATE_FILE.read_text(encoding="utf-8")))
            except Exception:
                return []
        return self._monitored_rows(self.scraper.state)

    @staticmethod
    def _monitored_rows(state: Dict[str, Any]) -> List[Dict[str, str]]:
        """Build monitored-channel rows from a state mapping; non-dict sections count as empty."""

        def _section(key: str) -> Dict[str, Any]:
            value = state.get(key)
            return value if isinstance(value, dict) else {}

        channels = _section("channels")
        names = _section("channel_names")
        titles = _section("channel_titles")
        rows: List[Dict[str, str]] = []
        for cid, last_id in channels.items():
            key = str(cid)
            rows.append(
                {
                    "channel_id": key,
                    "last_message_id": str(last_id),
                    "username": names.get(key, "no_username"),
                    "display_name": channel_display_name(key, titles, names),
                }
            )
        return rows

    async def add_channel(self, spec: str) -> str:
        """Resolve *spec* (@username, -100… id or t.me link) and add it to the monitoring state.

        Returns a status message. An already-monitored channel is reported, not re-added.

        Raises:
            ValueError: when *spec* is blank.
        """
        await self.ensure_ready()
        s = (spec or "").strip()
        if not s:
            raise ValueError("频道标识不能为空(可用 @用户名、-100…、t.me 链接)")
        ent = await self.scraper._resolve_forward_entity(s)
        cid = str(self.module.utils.get_peer_id(ent))
        username = getattr(ent, "username", None) or "no_username"
        title = getattr(ent, "title", None) or cid  # never empty: falls back to the id
        state = self.scraper.state
        channels = state.setdefault("channels", {})
        if cid in channels:
            return f"频道已存在:{title}({cid})"
        channels[cid] = 0  # last scraped message id
        state.setdefault("channel_names", {})[cid] = username
        state.setdefault("channel_titles", {})[cid] = str(title)
        self.scraper.save_state()
        return f"已添加频道:{title}({cid})"

    async def remove_channel(self, spec: str) -> str:
        """Remove the monitored channels matched by *spec* (see ``resolve_monitored_channel_ids``).

        Raises:
            ValueError: when *spec* is blank or matches no monitored channel.
        """
        if not self.scraper:
            await self.ensure_ready()
        s = (spec or "").strip()
        if not s:
            raise ValueError("要移除的频道不能为空")
        targets = await self.resolve_monitored_channel_ids(s)
        state = self.scraper.state
        channels = state.get("channels") or {}
        removed = 0
        for cid in targets:
            if cid not in channels:
                continue
            del channels[cid]
            for side_map in ("channel_names", "channel_titles"):
                if side_map in state:
                    state[side_map].pop(cid, None)
            removed += 1
        if removed == 0:
            raise ValueError(
                "未移除任何频道。请使用:all;编号 1,2(与下方已监控列表从上到下对应);"
                "完整频道 ID -100…;或已在监控列表中的 @用户名 / t.me 链接。"
            )
        self.scraper.save_state()
        return f"已移除 {removed} 个频道"

    async def resolve_monitored_channel_ids(self, selection: str) -> List[str]:
        """Resolve *selection* to IDs of channels already in the monitoring state.

        Accepted forms: ``all`` (also used for blank input), list positions, ``-100…`` ids,
        ``@username`` or t.me links, comma-separated. The whole selection, then each token,
        is first passed to the scraper's ``parse_channel_selection``. Tokens it does not
        match are resolved through Telegram and kept only if already monitored.
        Unresolvable tokens are ignored. Returns ``[]`` before the scraper exists.
        """
        if not self.scraper:
            return []
        s = (selection or "").strip() or "all"
        primary = self.scraper.parse_channel_selection(s)
        if primary:
            return primary
        if s.lower() == "all":
            return self.scraper.parse_channel_selection("all")
        out: List[str] = []
        for token in [x.strip() for x in s.split(",") if x.strip()]:
            part = self.scraper.parse_channel_selection(token)
            if part:
                out.extend(part)
                continue
            try:
                ent = await self.scraper._resolve_forward_entity(token)
                cid = str(self.module.utils.get_peer_id(ent))
                if cid in self.scraper.state.get("channels", {}):
                    out.append(cid)
            except Exception:
                continue
        return list(dict.fromkeys(out))  # de-duplicate, keep order

    async def _run_job(self, name: str, coro) -> None:
        """Run a one-off job, logging start/finish/failure, and clear the job bookkeeping."""
        try:
            self._append(f"任务开始:{name}")
            await self._run_and_capture(coro)
            self._append(f"任务完成:{name}")
        except Exception as e:
            self._append(f"任务失败:{name},错误:{e}")
        finally:
            self.job_name = ""
            self.job_started_at = None

    async def start_job(self, name: str, coro) -> str:
        """Schedule *coro* as the single background job.

        Raises:
            RuntimeError: when another job is still running. The rejected coroutine is
                closed first, so Python does not warn that it "was never awaited".
        """
        async with self.job_lock:
            if self.is_job_running():
                if inspect.iscoroutine(coro):
                    coro.close()
                raise RuntimeError(f"已有任务在运行:{self.job_name}")
            self.job_name = name
            self.job_started_at = time.time()
            self.job_task = asyncio.create_task(self._run_job(name, coro))
            return f"任务已启动:{name}"

    async def start_scrape_job(self, selection: str) -> str:
        """Start scraping the selected monitored channels, each from its stored offset."""
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError("没有可抓取频道,请先添加频道。")

        async def runner():
            for cid in channels:
                await self.scraper.scrape_channel(cid, self.scraper.state["channels"][cid])

        return await self.start_job(f"抓取({len(channels)}个频道)", runner())

    async def start_export_job(self, selection: str) -> str:
        """Start exporting the selected monitored channels to CSV and JSON."""
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError("没有可导出频道,请先添加频道。")

        async def runner():
            for cid in channels:
                self.scraper.export_to_csv(cid)
                self.scraper.export_to_json(cid)

        return await self.start_job(f"导出({len(channels)}个频道)", runner())

    async def start_rescrape_job(self, selection: str) -> str:
        """Start re-downloading media for the selected monitored channels."""
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError("没有可补抓频道,请先添加频道。")

        async def runner():
            for cid in channels:
                await self.scraper.rescrape_media(cid)

        return await self.start_job(f"补抓媒体({len(channels)}个频道)", runner())

    async def start_continuous(self) -> str:
        """Start the scraper's ``continuous_scraping()`` loop as a background task.

        Raises:
            ValueError: when no channel is being monitored.
        """
        async with self.continuous_start_lock:
            await self.ensure_ready()
            if self.is_continuous_running():
                return "持续抓取已在运行中"
            if not self.scraper.state.get("channels"):
                raise ValueError("当前没有监控频道,请先添加频道。")

            async def runner():
                try:
                    await self._run_and_capture(self.scraper.continuous_scraping())
                except asyncio.CancelledError:
                    self._append("持续抓取任务已取消。")
                    raise
                except Exception as e:
                    self._append(f"持续抓取异常:{e}")

            self.continuous_started_at = time.time()
            self.continuous_task = asyncio.create_task(runner())
            self._append("已启动持续抓取(含心跳逻辑)。")
            return "持续抓取已启动"

    async def stop_continuous(self) -> str:
        """Signal the scraper loop to stop, cancel its task and wait for it to finish."""
        if not self.is_continuous_running():
            return "持续抓取未运行"
        self.scraper.continuous_scraping_active = False
        self.continuous_task.cancel()
        try:
            await self.continuous_task
        except asyncio.CancelledError:
            pass
        self.continuous_task = None
        self.continuous_started_at = None
        self._append("已停止持续抓取。")
        return "持续抓取已停止"
# Process-wide singleton shared by every route handler below.
service: WebScraperService = WebScraperService()
def _parse_env_lines() -> List[str]:
    """Return the raw lines of ``.env`` without line endings, or ``[]`` if it does not exist."""
    if ENV_FILE.exists():
        return ENV_FILE.read_text(encoding="utf-8").splitlines()
    return []
def read_env_dict() -> Dict[str, str]:
    """Parse ``.env`` into ``{KEY: value}`` with both sides stripped.

    Blank lines, ``#`` comments and lines without ``=`` are skipped. The value is
    everything after the first ``=`` (quotes are not removed), and a later duplicate
    key wins.
    """
    parsed: Dict[str, str] = {}
    for entry in map(str.strip, _parse_env_lines()):
        if entry and not entry.startswith("#") and "=" in entry:
            key, _, value = entry.partition("=")
            parsed[key.strip()] = value.strip()
    return parsed
def write_env_updates(updates: Dict[str, str]) -> None:
    """Persist *updates* into ``.env``, editing existing ``KEY=...`` lines in place.

    Comments, blank lines and unrelated keys are kept verbatim. Every existing line
    whose key is in *updates* is rewritten. Keys not found are appended at the end after
    a blank separator line.

    Values are flattened to a single line first (line breaks become spaces). A value
    containing a newline, e.g. pasted into the config form, would otherwise spill onto
    extra lines and could inject arbitrary additional variables into ``.env``.
    """
    flat = {key: " ".join(str(value).splitlines()) for key, value in updates.items()}
    out_lines: List[str] = []
    touched = set()

    for raw_line in _parse_env_lines():
        s = raw_line.strip()
        if not s or s.startswith("#") or "=" not in s:
            out_lines.append(raw_line)
            continue
        key = raw_line.split("=", 1)[0].strip()
        if key in flat:
            out_lines.append(f"{key}={flat[key]}")
            touched.add(key)
        else:
            out_lines.append(raw_line)

    # Separate appended keys from the existing content with one blank line.
    if out_lines and out_lines[-1].strip() != "":
        out_lines.append("")

    out_lines.extend(f"{key}={value}" for key, value in flat.items() if key not in touched)
    ENV_FILE.write_text("\n".join(out_lines).rstrip() + "\n", encoding="utf-8")
# Built-in keyword list used to estimate "job-posting-like" messages in the stats view
# when the request supplies no keywords.
DEFAULT_STATS_KEYWORDS: Tuple[str, ...] = tuple(
    "招聘 急聘 岗位 求职 诚聘 内推 hiring recruit".split()
)
def find_channel_databases(base: Path) -> List[Tuple[str, Path]]:
    """List per-channel SQLite files laid out as ``<base>/<name>/<name>.db``.

    Hidden directories and ``__pycache__`` are ignored. Returns ``(name, db_path)`` pairs
    sorted by name, or ``[]`` when *base* does not exist.
    """
    if not base.exists():
        return []
    found: List[Tuple[str, Path]] = []
    for entry in base.iterdir():
        if not entry.is_dir() or entry.name.startswith(".") or entry.name == "__pycache__":
            continue
        db_path = entry / f"{entry.name}.db"
        if db_path.is_file():
            found.append((entry.name, db_path))
    found.sort(key=lambda pair: pair[0])
    return found
def count_state_monitored_channels() -> int:
    """Number of entries under ``channels`` in state.json (0 if missing or unreadable)."""
    if STATE_FILE.exists():
        try:
            state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
            return len(state.get("channels") or {})
        except Exception:
            return 0
    return 0
def normalize_stats_keywords_param(raw: str) -> Tuple[str, ...]:
    """Turn a comma-separated keyword string into a tuple for SQL ``LIKE`` matching.

    Each item is stripped, then the LIKE wildcards ``%`` and ``_`` are removed. Empty
    items are dropped. If nothing is left, ``DEFAULT_STATS_KEYWORDS`` is returned.
    """
    drop_wildcards = str.maketrans("", "", "%_")
    cleaned = [token.strip().translate(drop_wildcards) for token in (raw or "").split(",")]
    keywords = tuple(word for word in cleaned if word)
    return keywords or DEFAULT_STATS_KEYWORDS
def compute_storage_stats(
    base: Path,
    days: int,
    keyword_list: Tuple[str, ...],
    top_channels: int = 24,
) -> Dict[str, Any]:
    """Aggregate message counts over every per-channel SQLite DB under *base*.

    The window is the last ``max(days, 1)`` UTC calendar days, inclusive of today. Dates
    are compared on the first 10 characters of ``messages.date`` (assumed ``YYYY-MM-DD…``,
    TODO confirm against the scraper's schema). "Job-like" messages are those whose
    ``message`` contains any of *keyword_list* (SQL ``LIKE``). An empty *keyword_list*
    now counts zero job-like messages; previously it produced ``AND ()``, an SQLite
    syntax error.

    Returns totals, per-day series for the window, and the *top_channels* channels with
    the most messages in range. A database without a ``messages`` table still raises
    ``sqlite3.OperationalError``, as before.
    """
    titles, names = read_state_channel_maps()
    dbs = find_channel_databases(base)

    # Same calendar date as the deprecated datetime.utcnow().date().
    end = datetime.now(timezone.utc).date()
    start = end - timedelta(days=max(days, 1) - 1)
    start_day = start.strftime("%Y-%m-%d")
    end_day = end.strftime("%Y-%m-%d")
    day_keys = [
        (start + timedelta(days=offset)).strftime("%Y-%m-%d")
        for offset in range((end - start).days + 1)
    ]
    daily_total: Dict[str, int] = dict.fromkeys(day_keys, 0)
    daily_kw: Dict[str, int] = dict.fromkeys(day_keys, 0)

    if keyword_list:
        kw_expr = " OR ".join(["(message LIKE ?)"] * len(keyword_list))
        kw_params: List[str] = [f"%{k}%" for k in keyword_list]
    else:
        kw_expr, kw_params = "0", []

    # One grouped query per DB yields both per-day totals and per-day keyword hits;
    # channel range totals are their sums. Keyword placeholders come first (SELECT
    # clause), then the WHERE bounds.
    per_day_sql = (
        "SELECT substr(date,1,10) AS d, COUNT(*), "
        f"SUM(CASE WHEN ({kw_expr}) THEN 1 ELSE 0 END) "
        "FROM messages "
        "WHERE substr(date,1,10) >= ? AND substr(date,1,10) <= ? "
        "GROUP BY d"
    )
    per_day_params = (*kw_params, start_day, end_day)

    total_all_time = 0
    in_range_total = 0
    in_range_kw = 0
    by_channel: List[Dict[str, Any]] = []

    for channel_id, path in dbs:
        with contextlib.closing(sqlite3.connect(str(path))) as conn:
            total_all_time += int(conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0])
            ch_range = 0
            ch_kw = 0
            for day, count, kw_count in conn.execute(per_day_sql, per_day_params).fetchall():
                count = int(count)
                kw_count = int(kw_count or 0)
                ch_range += count
                ch_kw += kw_count
                ds = str(day)
                if ds in daily_total:
                    daily_total[ds] += count
                    daily_kw[ds] += kw_count
        in_range_total += ch_range
        in_range_kw += ch_kw
        by_channel.append(
            {
                "channel_id": channel_id,
                "display_name": channel_display_name(str(channel_id), titles, names),
                "messages_in_range": ch_range,
                "job_like_in_range": ch_kw,
            }
        )

    by_channel.sort(key=lambda row: -row["messages_in_range"])
    return {
        "days": days,
        "range_start": start_day,
        "range_end": end_day,
        "keywords_used": list(keyword_list),
        "total_messages_all_time": total_all_time,
        "messages_in_range": in_range_total,
        "job_like_in_range": in_range_kw,
        "database_count": len(dbs),
        "by_channel": by_channel[:top_channels],
        "daily_messages": [[d, daily_total[d]] for d in day_keys],
        "daily_job_like": [[d, daily_kw[d]] for d in day_keys],
    }
@app.get("/auth/console/status")
async def auth_console_status(request: Request):
    """Report whether this session has passed the console password (``{"ok": bool}``)."""
    authed = is_console_authed(request)
    return {"ok": authed}
@app.post("/auth/console/login")
async def auth_console_login(request: Request):
    """Check the console password and, on success, flag the session as authenticated.

    The password is read from a JSON body (``{"password": ...}``) or a form field. A
    malformed JSON body counts as an empty password. The comparison is constant-time.
    Returns ``{"ok": True}``, or a 401 JSON error on mismatch.
    """
    content_type = (request.headers.get("content-type") or "").lower()
    if "application/json" in content_type:
        try:
            payload = await request.json()
            submitted = str(payload.get("password", ""))
        except Exception:
            submitted = ""
    else:
        submitted = str((await request.form()).get("password", ""))

    expected = _web_console_password()
    try:
        matches = secrets.compare_digest(submitted.encode("utf-8"), expected.encode("utf-8"))
    except Exception:
        matches = False

    if not matches:
        return JSONResponse({"ok": False, "error": "密码错误"}, status_code=401)
    request.session[WEB_CONSOLE_AUTH_KEY] = True
    return {"ok": True}
@app.post("/auth/console/logout")
async def auth_console_logout(request: Request):
    """Clear the console-auth flag from the session and redirect to the home page."""
    session = request.session
    if WEB_CONSOLE_AUTH_KEY in session:
        del session[WEB_CONSOLE_AUTH_KEY]
    return RedirectResponse(url=app_home_url(), status_code=303)
@app.get("/api/stats/overview")
async def api_stats_overview(
    days: int = Query(30, ge=1, le=366),
    keywords: str = Query("", description="逗号分隔关键词,用于估算「招聘类」消息;留空用内置词表"),
):
    """Return storage statistics for the last *days* days plus the monitored-channel count.

    NOTE(review): unlike most /api routes this does not call json_if_console_unauthed;
    confirm the stats are meant to be public.
    """
    keyword_tuple = normalize_stats_keywords_param(keywords)

    def _collect() -> Dict[str, Any]:
        stats = compute_storage_stats(BASE_DIR, days, keyword_tuple)
        stats["state_monitored_count"] = count_state_monitored_channels()
        return stats

    # SQLite scans and file reads block, so run them in the default executor.
    return await asyncio.get_running_loop().run_in_executor(None, _collect)
@app.get("/")
async def index(request: Request):
    """Render the console home page (``index.html``).

    Config fields (from ``.env``) and the monitored channels are always loaded. The
    account's channel list and the run log are only loaded for sessions that passed the
    console password; a failure listing account channels is shown as ``error``. Any
    other failure is handled by ``_index_error_page`` (HTML page with traceback, 500).
    """
    try:
        env_values = read_env_dict()
        fields = [
            dict(key=key, label=label, type=field_type, value=env_values.get(key, ""))
            for key, label, field_type in CONFIG_KEYS
        ]
        monitored = await service.list_monitored_channels()
        monitored_ids = set(str(row["channel_id"]) for row in monitored)
        account_channels: List[Dict] = []
        err = ""
        authed = is_console_authed(request)
        if authed:
            try:
                account_channels = await service.list_account_channels()
            except Exception as exc:
                err = str(exc)
        logs_text = "\n".join(service.logs) if authed else ""
        context = {
            "fields": fields,
            "binary_env_keys": list(BINARY_ENV_KEYS),
            "monitored": monitored,
            "monitored_ids": monitored_ids,
            "account_channels": account_channels,
            "console_authed": authed,
            "need_auth_banner": request.query_params.get("needauth") == "1",
            "connected": service.is_connected(),
            "job_running": service.is_job_running(),
            "job_name": service.job_name,
            "job_uptime": service.current_job_uptime(),
            "continuous_running": service.is_continuous_running(),
            "continuous_uptime": service.continuous_uptime(),
            "logs": logs_text,
            "error": err,
        }
        return template_response(request, "index.html", context)
    except Exception as exc:
        return _index_error_page(exc)


def _index_error_page(exc: Exception) -> HTMLResponse:
    """Log a GET / failure and build the standalone HTML error page (status 500).

    Must be called while *exc* is being handled, because the traceback is taken from
    the active exception. The first 80 non-blank traceback lines are also copied into
    the console log on a best-effort basis, and the page shows up to 12000 escaped
    characters of it.
    """
    logger.exception("GET / 渲染失败")
    tb = traceback.format_exc()
    try:
        service._append(f"首页异常:{exc}")
        for line in tb.splitlines()[:80]:
            if line.strip():
                service._append(line.strip())
    except Exception:
        pass
    detail = tb[:12000]
    return HTMLResponse(
        "<!DOCTYPE html><html lang=\"zh-CN\"><head><meta charset=\"utf-8\"/><title>错误</title></head>"
        "<body style=\"font-family:system-ui;padding:16px;background:#0f172a;color:#e2e8f0;\">"
        "<h1>首页 Internal Server Error</h1>"
        "<p>常见原因:Docker 挂载目录里缺少 <code>templates/</code> 或 <code>telegram-scraper.py</code>;或 <code>state.json</code> 格式异常。</p>"
        "<pre style=\"white-space:pre-wrap;word-break:break-all;font-size:12px;\">"
        + html.escape(detail)
        + "</pre></body></html>",
        status_code=500,
    )
@app.post("/config")
async def save_config(request: Request):
    """Write the config form into ``.env`` and redirect home.

    Every key in ``CONFIG_KEYS`` is written, and a missing field becomes an empty
    value. Switch keys in ``BINARY_ENV_KEYS`` are normalised to exactly ``"1"`` or ``"0"``.
    """
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir
    form = await request.form()

    def _submitted(key: str) -> str:
        value = str(form.get(key, "")).strip()
        if key in BINARY_ENV_KEYS:
            return "1" if value == "1" else "0"
        return value

    write_env_updates({key: _submitted(key) for key, _label, _field_type in CONFIG_KEYS})
    return RedirectResponse(url=app_home_url(), status_code=303)
@app.post("/start")
async def start_scraper(request: Request):
    """Connect the Telegram client (form action); a failure is written to the console log."""
    response = redirect_if_console_unauthed(request)
    if response is None:
        try:
            await service.ensure_ready()
        except Exception as exc:
            service._append(f"初始化失败:{exc}")
        response = RedirectResponse(url=app_home_url(), status_code=303)
    return response
@app.post("/stop")
async def stop_scraper(request: Request):
    """Stop continuous scraping and disconnect the Telegram client, then redirect home."""
    response = redirect_if_console_unauthed(request)
    if response is None:
        await service.disconnect()
        response = RedirectResponse(url=app_home_url(), status_code=303)
    return response
@app.post("/channels/add")
async def add_channel(request: Request, channel_spec: str = Form(...)):
    """Add one monitored channel from the form; the outcome goes to the console log."""
    denied = redirect_if_console_unauthed(request)
    if denied:
        return denied
    try:
        service._append(await service.add_channel(channel_spec))
    except Exception as exc:
        service._append(f"添加频道失败:{exc}")
    return RedirectResponse(url=app_home_url(), status_code=303)
@app.post("/channels/add-selected")
async def add_channels_selected(request: Request):
    """Add every channel ticked in the account list (form field ``channel_id``, repeated).

    Each channel is added independently. Successes and failures are written to the
    console log, and an empty selection is reported as well.
    """
    denied = redirect_if_console_unauthed(request)
    if denied:
        return denied
    form = await request.form()
    picked = [value for value in (str(item).strip() for item in form.getlist("channel_id")) if value]
    if not picked:
        service._append("未选择任何频道,请在列表中勾选后再提交。")
    for spec in picked:
        try:
            service._append(await service.add_channel(spec))
        except Exception as exc:
            service._append(f"添加失败({spec}):{exc}")
    return RedirectResponse(url=app_home_url(), status_code=303)
@app.post("/channels/remove")
async def remove_channel(request: Request, channel_spec: str = Form(...)):
    """Remove monitored channels matched by the form selection; the outcome goes to the log."""
    denied = redirect_if_console_unauthed(request)
    if denied:
        return denied
    try:
        service._append(await service.remove_channel(channel_spec))
    except Exception as exc:
        service._append(f"移除频道失败:{exc}")
    return RedirectResponse(url=app_home_url(), status_code=303)
@app.post("/jobs/scrape")
async def start_scrape(request: Request, selection: str = Form("all")):
    """Start a one-off scrape of the selected channels (default ``all``) from the form."""
    denied = redirect_if_console_unauthed(request)
    if denied:
        return denied
    try:
        service._append(await service.start_scrape_job(selection))
    except Exception as exc:
        service._append(f"启动抓取失败:{exc}")
    return RedirectResponse(url=app_home_url(), status_code=303)
@app.post("/jobs/export")
async def start_export(request: Request, selection: str = Form("all")):
    """Start a CSV/JSON export of the selected channels (default ``all``) from the form."""
    denied = redirect_if_console_unauthed(request)
    if denied:
        return denied
    try:
        service._append(await service.start_export_job(selection))
    except Exception as exc:
        service._append(f"启动导出失败:{exc}")
    return RedirectResponse(url=app_home_url(), status_code=303)
@app.post("/jobs/rescrape")
async def start_rescrape(request: Request, selection: str = Form("all")):
    """Start a media re-download for the selected channels (default ``all``) from the form."""
    denied = redirect_if_console_unauthed(request)
    if denied:
        return denied
    try:
        service._append(await service.start_rescrape_job(selection))
    except Exception as exc:
        service._append(f"启动补抓失败:{exc}")
    return RedirectResponse(url=app_home_url(), status_code=303)
# Strong references to fire-and-forget tasks started by request handlers. The event loop
# keeps only weak references to tasks, so an unreferenced task can be garbage-collected
# before it finishes. Each task removes itself from this set when done.
_background_tasks: set = set()


@app.post("/jobs/continuous/start")
async def start_continuous_route(request: Request):
    """Kick off continuous scraping in the background and redirect home immediately.

    Connecting to Telegram can take a while, so ``service.start_continuous()`` runs in a
    separate task. Its failures are written to the console log, not the response. The
    task is kept in ``_background_tasks``; previously the ``create_task`` result was
    discarded, which the asyncio documentation warns can let the task vanish mid-run.
    """
    redir = redirect_if_console_unauthed(request)
    if redir:
        return redir

    async def _run_start_continuous() -> None:
        try:
            await service.start_continuous()
        except Exception as e:
            service._append(f"启动持续抓取失败:{e}")

    task = asyncio.create_task(_run_start_continuous())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    service._append("已接收「启动持续抓取」:正在后台连接 Telegram 并启动(请勿重复点击;进度见运行日志)。")
    return RedirectResponse(url=app_home_url(), status_code=303)
@app.post("/jobs/continuous/stop")
async def stop_continuous(request: Request):
    """Stop continuous scraping (form action); the outcome goes to the console log."""
    denied = redirect_if_console_unauthed(request)
    if denied:
        return denied
    try:
        service._append(await service.stop_continuous())
    except Exception as exc:
        service._append(f"停止持续抓取失败:{exc}")
    return RedirectResponse(url=app_home_url(), status_code=303)
@app.get("/status")
async def status(request: Request):
    """Return the full console status as JSON (authenticated sessions only).

    If ``json_if_console_unauthed`` returns a response, that denial is returned.
    Otherwise the payload reports readiness, connection state, the current job
    and its uptime, continuous-scrape state and uptime, the monitored channels,
    and a snapshot copy of the service log.
    """
    denial = json_if_console_unauthed(request)
    if denial:
        return denial

    # Fields are evaluated in this exact order; the log snapshot is taken last,
    # after the monitored-channel lookup has been awaited.
    return dict(
        ready=service.scraper is not None,
        connected=service.is_connected(),
        job_running=service.is_job_running(),
        job_name=service.job_name,
        job_uptime=service.current_job_uptime(),
        continuous_running=service.is_continuous_running(),
        continuous_uptime=service.continuous_uptime(),
        monitored_channels=await service.list_monitored_channels(),
        logs=list(service.logs),
    )
|
||
|
||
|
||
@app.get("/api/channels/monitored")
async def api_monitored_channels(request: Request):
    """Return the monitored channels as ``{"items": [...]}``.

    Requires an authenticated console session, the same as ``/status``, which
    exposes this list under ``monitored_channels``. Previously this endpoint
    had no check and leaked the list to anonymous callers. If
    ``json_if_console_unauthed`` returns a response, that denial is returned.
    """
    denied = json_if_console_unauthed(request)
    if denied:
        return denied
    return {"items": await service.list_monitored_channels()}
|
||
|
||
|
||
@app.get("/api/channels/account")
async def api_account_channels(request: Request):
    """List the channels visible to the logged-in Telegram account.

    Returns ``{"items": [...]}`` on success. If listing raises, it degrades to
    ``{"items": [], "error": "<message>"}`` instead of an HTTP error. A denial
    from ``json_if_console_unauthed`` is returned unchanged.
    """
    denial = json_if_console_unauthed(request)
    if denial:
        return denial

    try:
        channels = await service.list_account_channels()
    except Exception as exc:
        # Deliberate best-effort: the UI gets an empty list plus the reason.
        return {"items": [], "error": str(exc)}
    return {"items": channels}
|
||
|
||
|
||
@app.post("/api/channels/add")
async def api_add_channel(request: Request, channel_spec: str = Form(...)):
    """Add *channel_spec* to the monitored channels (JSON API).

    Returns ``{"ok": True, "message": ...}`` on success, after appending the
    service's message to the run log. If ``service.add_channel`` raises, the
    error is logged, appended to the run log, and returned as
    ``{"ok": False, "message": ...}`` with HTTP 500. This replaces an unhandled
    exception that produced a non-JSON error body and no run-log entry.
    A denial from ``json_if_console_unauthed`` is returned unchanged.
    """
    denied = json_if_console_unauthed(request)
    if denied:
        return denied

    try:
        msg = await service.add_channel(channel_spec)
    except Exception as e:
        logger.exception("add_channel failed for %r", channel_spec)
        err = f"添加频道失败:{e}"
        service._append(err)
        return JSONResponse(status_code=500, content={"ok": False, "message": err})

    service._append(msg)
    return {"ok": True, "message": msg}
|
||
|
||
|
||
@app.post("/api/channels/remove")
async def api_remove_channel(request: Request, channel_spec: str = Form(...)):
    """Remove *channel_spec* from the monitored channels (JSON API).

    Returns ``{"ok": True, "message": ...}`` on success, after appending the
    service's message to the run log. If ``service.remove_channel`` raises, the
    error is logged, appended to the run log, and returned as
    ``{"ok": False, "message": ...}`` with HTTP 500. This replaces an unhandled
    exception that produced a non-JSON error body and no run-log entry.
    A denial from ``json_if_console_unauthed`` is returned unchanged.
    """
    denied = json_if_console_unauthed(request)
    if denied:
        return denied

    try:
        msg = await service.remove_channel(channel_spec)
    except Exception as e:
        logger.exception("remove_channel failed for %r", channel_spec)
        err = f"移除频道失败:{e}"
        service._append(err)
        return JSONResponse(status_code=500, content={"ok": False, "message": err})

    service._append(msg)
    return {"ok": True, "message": msg}
|
||
|
||
|
||
@app.post("/api/jobs/scrape")
async def api_job_scrape(request: Request, selection: str = Form("all")):
    """Start a scrape job for *selection* (JSON API).

    Returns ``{"ok": True, "message": ...}`` on success. If starting the job
    raises, the failure is logged and appended to the run log with the same
    text the HTML ``/jobs/scrape`` form uses. It is then returned as
    ``{"ok": False, "message": ...}`` with HTTP 500 instead of an unhandled
    exception. A denial from ``json_if_console_unauthed`` is returned unchanged.
    """
    denied = json_if_console_unauthed(request)
    if denied:
        return denied

    try:
        msg = await service.start_scrape_job(selection)
    except Exception as e:
        logger.exception("start_scrape_job failed for selection %r", selection)
        err = f"启动抓取失败:{e}"
        service._append(err)
        return JSONResponse(status_code=500, content={"ok": False, "message": err})

    service._append(msg)
    return {"ok": True, "message": msg}
|
||
|
||
|
||
@app.post("/api/jobs/export")
async def api_job_export(request: Request, selection: str = Form("all")):
    """Start an export job for *selection* (JSON API).

    Returns ``{"ok": True, "message": ...}`` on success. If starting the job
    raises, the failure is logged and appended to the run log with the same
    text the HTML ``/jobs/export`` form uses. It is then returned as
    ``{"ok": False, "message": ...}`` with HTTP 500 instead of an unhandled
    exception. A denial from ``json_if_console_unauthed`` is returned unchanged.
    """
    denied = json_if_console_unauthed(request)
    if denied:
        return denied

    try:
        msg = await service.start_export_job(selection)
    except Exception as e:
        logger.exception("start_export_job failed for selection %r", selection)
        err = f"启动导出失败:{e}"
        service._append(err)
        return JSONResponse(status_code=500, content={"ok": False, "message": err})

    service._append(msg)
    return {"ok": True, "message": msg}
|
||
|
||
|
||
@app.post("/api/jobs/rescrape")
async def api_job_rescrape(request: Request, selection: str = Form("all")):
    """Start a re-scrape job for *selection* (JSON API).

    Returns ``{"ok": True, "message": ...}`` on success. If starting the job
    raises, the failure is logged and appended to the run log with the same
    text the HTML ``/jobs/rescrape`` form uses. It is then returned as
    ``{"ok": False, "message": ...}`` with HTTP 500 instead of an unhandled
    exception. A denial from ``json_if_console_unauthed`` is returned unchanged.
    """
    denied = json_if_console_unauthed(request)
    if denied:
        return denied

    try:
        msg = await service.start_rescrape_job(selection)
    except Exception as e:
        logger.exception("start_rescrape_job failed for selection %r", selection)
        err = f"启动补抓失败:{e}"
        service._append(err)
        return JSONResponse(status_code=500, content={"ok": False, "message": err})

    service._append(msg)
    return {"ok": True, "message": msg}
|
||
|
||
|
||
@app.get("/api/jobs/status")
async def api_job_status(request: Request):
    """Report connection, job, and continuous-scrape status as JSON.

    This endpoint does not require console login. The run log is included only
    when ``is_console_authed(request)`` is true; otherwise ``logs`` is an empty
    list.
    """
    snapshot = dict(
        connected=service.is_connected(),
        job_running=service.is_job_running(),
        job_name=service.job_name,
        job_uptime=service.current_job_uptime(),
        continuous_running=service.is_continuous_running(),
        continuous_uptime=service.continuous_uptime(),
    )
    # Auth is checked last, matching the original evaluation order; "logs"
    # stays the final key.
    snapshot["logs"] = list(service.logs) if is_console_authed(request) else []
    return snapshot
|
||
|
||
|
||
@app.post("/api/jobs/continuous/start")
async def api_start_continuous(request: Request):
    """Start continuous scraping and wait for it to start (JSON API).

    Unlike the HTML ``/jobs/continuous/start`` route, this awaits
    ``service.start_continuous()`` directly and returns its message as
    ``{"ok": True, "message": ...}``. If it raises, the failure is logged and
    appended to the run log with the same text the HTML route uses. It is then
    returned as ``{"ok": False, "message": ...}`` with HTTP 500. A denial from
    ``json_if_console_unauthed`` is returned unchanged.
    """
    denied = json_if_console_unauthed(request)
    if denied:
        return denied

    try:
        msg = await service.start_continuous()
    except Exception as e:
        logger.exception("start_continuous failed")
        err = f"启动持续抓取失败:{e}"
        service._append(err)
        return JSONResponse(status_code=500, content={"ok": False, "message": err})

    service._append(msg)
    return {"ok": True, "message": msg}
|
||
|
||
|
||
@app.post("/api/jobs/continuous/stop")
async def api_stop_continuous(request: Request):
    """Stop continuous scraping (JSON API).

    Returns ``{"ok": True, "message": ...}`` on success. If
    ``service.stop_continuous()`` raises, the failure is logged and appended to
    the run log with the same text the HTML ``/jobs/continuous/stop`` route
    uses. It is then returned as ``{"ok": False, "message": ...}`` with HTTP
    500 instead of an unhandled exception. A denial from
    ``json_if_console_unauthed`` is returned unchanged.
    """
    denied = json_if_console_unauthed(request)
    if denied:
        return denied

    try:
        msg = await service.stop_continuous()
    except Exception as e:
        logger.exception("stop_continuous failed")
        err = f"停止持续抓取失败:{e}"
        service._append(err)
        return JSONResponse(status_code=500, content={"ok": False, "message": err})

    service._append(msg)
    return {"ok": True, "message": msg}
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: serve this module's FastAPI app with uvicorn.
    import uvicorn

    # Empty or blank env values fall back to the defaults, instead of binding to
    # "" or crashing on int("").
    _host = (os.getenv("WEB_BIND_HOST") or "").strip() or "0.0.0.0"
    _port = int((os.getenv("WEB_BIND_PORT") or "").strip() or "8000")

    # Pass the app object rather than the "app_web:app" import string. The
    # string form makes uvicorn import this file again under a second module
    # name, re-running module-level setup and creating a second app/service. It
    # also fails if the file is not named app_web.py. Passing an app instance is
    # supported because reload is off.
    uvicorn.run(app, host=_host, port=_port, reload=False)
|