diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ca45bc8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,41 @@ +# ========== 密钥与登录(切勿提交到远程)========== +.env +.env.* +*.session +*.session-journal + +# ========== 运行状态与抓取进度(与频道数据配套,勿提交)========== +state.json + +# ========== 按频道存放的抓取结果(SQLite、媒体、导出文件)========== +# 目录名一般为 Telegram 超级群/频道 ID(-100xxxxxxxxxx) +-100*/ + +# ========== 脚本生成的列表(可随时再生成)========== +channels_list.csv + +# ========== Python ========== +__pycache__/ +*.py[cod] +*$py.class +.Python +venv/ +.venv/ +*.egg-info/ +.eggs/ +dist/ +build/ + +# ========== 编辑器 / 本地工具 ========== +.cursor/ +.vscode/ +.idea/ +*.swp +*.swo +.DS_Store +Thumbs.db + +# ========== 日志与临时文件 =========== +*.log +*.tmp +*.temp diff --git a/README.md b/README.md index 5895af3..04b7673 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,26 @@ python telegram-scraper.py - **QR Code** (Recommended) - Scan with your phone (no phone number needed) - **Phone Number** - Traditional SMS verification +## Web Console (MVP) 🌐 + +You can run a simple web control panel that manages `.env` configuration and starts/stops the scraper process: + +```bash +pip install -r requirements.txt +uvicorn app_web:app --host 0.0.0.0 --port 8000 --reload +``` + +Then open: + +```text +http://127.0.0.1:8000 +``` + +Features: +- Edit core config values from the web page (saved back to `.env`) +- Start / stop scraper process from browser +- View recent runtime logs + ## Usage 📝 The script provides a clean interactive menu: diff --git a/app_web.py b/app_web.py new file mode 100644 index 0000000..50bd13d --- /dev/null +++ b/app_web.py @@ -0,0 +1,807 @@ +import asyncio +import contextlib +import importlib.util +import io +import json +import os +import sqlite3 +import time +from collections import deque +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from fastapi import FastAPI, Form, Query, Request +from fastapi.responses import 
def channel_display_name(cid: str, titles: Dict[str, str], names: Dict[str, str]) -> str:
    """Return a human-readable label for a monitored channel.

    Preference order:
      1. the stored title, unless it is empty, equal to the channel ID, or
         looks like a raw numeric ID (a "-"-prefixed digit string longer than
         six characters);
      2. the stored username rendered as ``@username``, unless it is missing
         or the ``no_username`` placeholder;
      3. the fixed "unnamed channel" label.

    Args:
        cid: Channel ID; coerced to ``str`` before lookup.
        titles: Mapping of channel ID -> title. May be ``None``; values may be
            non-strings because callers pass data read straight from JSON.
        names: Mapping of channel ID -> username, with the same tolerance.

    Returns:
        The display label (never empty).
    """
    cid = str(cid)

    # Values come from state.json and are not guaranteed to be strings, so
    # coerce before calling string methods instead of raising AttributeError.
    title = str((titles or {}).get(cid) or "").strip()
    looks_like_raw_id = (
        len(title) > 6
        and title.startswith("-")
        and title[1:].replace("-", "").isdigit()
    )
    if title and title != cid and not looks_like_raw_id:
        return title

    # Strip the "@" before validating so that "@" alone or "@no_username"
    # is treated as "no username" rather than rendered literally.
    username = str((names or {}).get(cid) or "").strip().lstrip("@").strip()
    if username and username != "no_username":
        return "@" + username
    return "未命名频道"
class WebScraperService:
    """Single owner of the Telegram scraper instance used by the web console.

    The service lazily loads ``telegram-scraper.py`` as a module, keeps one
    scraper object, and tracks two kinds of background work: at most one
    one-off job (scrape / export / media re-scrape) and at most one
    continuous-scraping task. Text printed by the scraper is copied into a
    bounded in-memory log (``self.logs``) that the web pages display.
    """

    def __init__(self) -> None:
        self.module = None  # lazily imported telegram-scraper.py module
        self.scraper = None  # scraper instance created from that module
        self.init_lock = asyncio.Lock()  # serialises client initialisation
        self.job_lock = asyncio.Lock()  # serialises one-off job start-up
        self.job_task: Optional[asyncio.Task] = None
        self.job_name: str = ""
        self.job_started_at: Optional[float] = None
        self.logs: deque[str] = deque(maxlen=1200)
        self.continuous_task: Optional[asyncio.Task] = None
        self.continuous_started_at: Optional[float] = None

    def _append(self, text: str) -> None:
        """Append one timestamped line to the in-memory log."""
        ts = time.strftime("%Y-%m-%d %H:%M:%S")
        self.logs.append(f"[{ts}] {text}")

    @staticmethod
    def _format_uptime(started_at: float) -> str:
        """Render the time elapsed since *started_at* as ``HH:MM:SS``."""
        seconds = int(time.time() - started_at)
        return f"{seconds // 3600:02d}:{(seconds % 3600) // 60:02d}:{seconds % 60:02d}"

    def _load_module(self):
        """Import ``telegram-scraper.py`` once and cache the module object.

        The file name contains a hyphen, so it cannot be imported with a
        normal ``import`` statement; it is loaded by path instead.

        Raises:
            RuntimeError: if no import spec/loader can be built for the file.
        """
        if self.module is not None:
            return self.module
        spec = importlib.util.spec_from_file_location("telegram_scraper_web_core", str(SCRIPT_FILE))
        if not spec or not spec.loader:
            raise RuntimeError("无法加载 telegram-scraper.py")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        self.module = module
        return module

    async def ensure_ready(self) -> None:
        """Create the scraper if needed and make sure its client is connected.

        Raises:
            RuntimeError: if the module cannot be loaded or the client fails
                to initialise.
        """
        async with self.init_lock:
            if self.scraper and self.scraper.client and self.scraper.client.is_connected():
                return
            module = self._load_module()
            if self.scraper is None:
                self.scraper = module.OptimizedTelegramScraper()
            self._append("正在初始化 Telegram 客户端...")
            ok = await self._run_and_capture(self.scraper.initialize_client())
            if not ok:
                raise RuntimeError("客户端初始化失败,请检查 API_ID/API_HASH 或 session。")
            await self._run_and_capture(self.scraper.sync_monitor_chats_from_env())
            self.scraper._apply_scrape_media_from_env()
            self._append("Telegram 客户端已就绪。")

    async def disconnect(self) -> None:
        """Stop continuous scraping (if running) and disconnect the client."""
        await self.stop_continuous()
        if self.scraper and self.scraper.client:
            await self.scraper.client.disconnect()
        self._append("Telegram 客户端已断开。")

    def is_connected(self) -> bool:
        """Return True when a scraper exists and its client reports connected."""
        return bool(self.scraper and self.scraper.client and self.scraper.client.is_connected())

    def current_job_uptime(self) -> str:
        """Return the running one-off job's uptime as ``HH:MM:SS``, or ``-``."""
        if not self.job_started_at or not self.is_job_running():
            return "-"
        return self._format_uptime(self.job_started_at)

    def is_job_running(self) -> bool:
        """Return True while a one-off job task exists and has not finished."""
        return self.job_task is not None and not self.job_task.done()

    def is_continuous_running(self) -> bool:
        """Return True while the continuous-scraping task is alive."""
        return self.continuous_task is not None and not self.continuous_task.done()

    def continuous_uptime(self) -> str:
        """Return the continuous task's uptime as ``HH:MM:SS``, or ``-``."""
        if not self.continuous_started_at or not self.is_continuous_running():
            return "-"
        return self._format_uptime(self.continuous_started_at)

    async def _run_and_capture(self, awaitable):
        """Await *awaitable* while copying what it prints into the log.

        The captured text is flushed to the log in a ``finally`` block so the
        diagnostics printed before a failure (or cancellation) are not lost.

        NOTE: ``contextlib.redirect_stdout`` replaces ``sys.stdout`` for the
        whole process, so text printed by any other task that runs while this
        awaitable is suspended is captured here as well, and the lines only
        reach the log once the awaitable finishes.
        """
        buf = io.StringIO()
        try:
            with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
                return await awaitable
        finally:
            for line in buf.getvalue().splitlines():
                text = line.strip()
                if text:
                    self._append(text)

    async def list_account_channels(self) -> List[Dict]:
        """Return the channels visible to the logged-in account (may be empty)."""
        await self.ensure_ready()
        data = await self._run_and_capture(self.scraper.list_channels())
        return data or []

    @staticmethod
    def _monitored_rows(state: Dict[str, Any]) -> List[Dict[str, str]]:
        """Build the monitored-channel rows from a state mapping."""
        channels = state.get("channels") or {}
        names = state.get("channel_names") or {}
        titles = state.get("channel_titles") or {}
        return [
            {
                "channel_id": str(cid),
                "last_message_id": str(last_id),
                "username": names.get(str(cid), "no_username"),
                "display_name": channel_display_name(str(cid), titles, names),
            }
            for cid, last_id in channels.items()
        ]

    async def list_monitored_channels(self) -> List[Dict[str, str]]:
        """Return one row per monitored channel.

        Uses the live scraper state when a scraper exists; otherwise falls
        back to reading ``state.json`` so the page works before the client is
        started. Both paths produce rows of the same shape (string IDs).
        An unreadable or malformed state file yields an empty list.
        """
        if self.scraper:
            return self._monitored_rows(self.scraper.state)
        if not STATE_FILE.exists():
            return []
        try:
            raw = json.loads(STATE_FILE.read_text(encoding="utf-8"))
            return self._monitored_rows(raw)
        except (OSError, ValueError, AttributeError, TypeError):
            # Best effort: a broken state file must not break the page.
            return []

    async def add_channel(self, spec: str) -> str:
        """Resolve *spec* and add it to the monitored channels.

        Args:
            spec: ``@username``, a ``-100…`` ID, or a t.me link.

        Returns:
            A status message (also used when the channel already exists).

        Raises:
            ValueError: if *spec* is empty.
        """
        await self.ensure_ready()
        s = (spec or "").strip()
        if not s:
            raise ValueError("频道标识不能为空(可用 @用户名、-100…、t.me 链接)")
        ent = await self.scraper._resolve_forward_entity(s)
        cid = str(self.module.utils.get_peer_id(ent))
        username = getattr(ent, "username", None) or "no_username"
        title = getattr(ent, "title", None) or cid
        state = self.scraper.state
        if cid in state["channels"]:
            return f"频道已存在:{title}({cid})"
        state["channels"][cid] = 0  # 0 = nothing scraped yet
        state.setdefault("channel_names", {})[cid] = username
        state.setdefault("channel_titles", {})[cid] = str(title) if title else ""
        self.scraper.save_state()
        return f"已添加频道:{title}({cid})"

    async def remove_channel(self, spec: str) -> str:
        """Remove the channels selected by *spec* from the monitored set.

        Raises:
            ValueError: if *spec* is empty or matches no monitored channel.
        """
        if not self.scraper:
            await self.ensure_ready()
        s = (spec or "").strip()
        if not s:
            raise ValueError("要移除的频道不能为空")
        targets = await self.resolve_monitored_channel_ids(s)
        state = self.scraper.state
        removed = 0
        for cid in targets:
            if cid not in state["channels"]:
                continue
            del state["channels"][cid]
            if "channel_names" in state:
                state["channel_names"].pop(cid, None)
            if "channel_titles" in state:
                state["channel_titles"].pop(cid, None)
            removed += 1
        if removed == 0:
            raise ValueError(
                "未移除任何频道。请使用:all;编号 1,2(与下方已监控列表从上到下对应);"
                "完整频道 ID -100…;或已在监控列表中的 @用户名 / t.me 链接。"
            )
        self.scraper.save_state()
        return f"已移除 {removed} 个频道"

    async def resolve_monitored_channel_ids(self, selection: str) -> List[str]:
        """Resolve *selection* to IDs of channels that are currently monitored.

        Accepted forms: ``all``, list indexes, ``-100…`` IDs, ``@username``
        and t.me links. The scraper's own selection parser is tried first
        on the whole string, then per comma-separated token; tokens it cannot
        parse are resolved through Telegram and kept only if the resulting ID
        is in the monitored set. Tokens that fail to resolve are skipped.
        Returns an order-preserving, de-duplicated list (empty when no
        scraper exists).
        """
        if not self.scraper:
            return []
        s = (selection or "").strip() or "all"
        primary = self.scraper.parse_channel_selection(s)
        if primary:
            return primary
        if s.lower() == "all":
            return self.scraper.parse_channel_selection("all")
        out: List[str] = []
        for token in [x.strip() for x in s.split(",") if x.strip()]:
            part = self.scraper.parse_channel_selection(token)
            if part:
                out.extend(part)
                continue
            try:
                ent = await self.scraper._resolve_forward_entity(token)
                cid = str(self.module.utils.get_peer_id(ent))
                if cid in self.scraper.state.get("channels", {}):
                    out.append(cid)
            except Exception:
                # Unresolvable token: ignore it and keep the rest.
                continue
        return list(dict.fromkeys(out))

    async def _run_job(self, name: str, coro) -> None:
        """Run one one-off job, logging start / completion / failure."""
        try:
            self._append(f"任务开始:{name}")
            await self._run_and_capture(coro)
            self._append(f"任务完成:{name}")
        except Exception as e:
            self._append(f"任务失败:{name},错误:{e}")
        finally:
            self.job_name = ""
            self.job_started_at = None

    async def start_job(self, name: str, coro) -> str:
        """Schedule *coro* as the single one-off job.

        Raises:
            RuntimeError: if another one-off job is still running. In that
                case *coro* is closed so it does not linger as a
                never-awaited coroutine.
        """
        async with self.job_lock:
            if self.is_job_running():
                close = getattr(coro, "close", None)
                if callable(close):
                    close()
                raise RuntimeError(f"已有任务在运行:{self.job_name}")
            self.job_name = name
            self.job_started_at = time.time()
            self.job_task = asyncio.create_task(self._run_job(name, coro))
            return f"任务已启动:{name}"

    async def start_scrape_job(self, selection: str) -> str:
        """Start a job that scrapes every channel matched by *selection*.

        Raises:
            ValueError: if the selection matches no monitored channel.
            RuntimeError: if another one-off job is running.
        """
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError("没有可抓取频道,请先添加频道。")

        async def runner():
            for cid in channels:
                await self.scraper.scrape_channel(cid, self.scraper.state["channels"][cid])

        return await self.start_job(f"抓取({len(channels)}个频道)", runner())

    async def start_export_job(self, selection: str) -> str:
        """Start a job that exports the selected channels to CSV and JSON.

        Raises:
            ValueError: if the selection matches no monitored channel.
            RuntimeError: if another one-off job is running.
        """
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError("没有可导出频道,请先添加频道。")

        async def runner():
            # NOTE(review): the export calls are synchronous, so they run on
            # the event loop; confirm they are fast enough for large channels.
            for cid in channels:
                self.scraper.export_to_csv(cid)
                self.scraper.export_to_json(cid)

        return await self.start_job(f"导出({len(channels)}个频道)", runner())

    async def start_rescrape_job(self, selection: str) -> str:
        """Start a job that re-scrapes media for the selected channels.

        Raises:
            ValueError: if the selection matches no monitored channel.
            RuntimeError: if another one-off job is running.
        """
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError("没有可补抓频道,请先添加频道。")

        async def runner():
            for cid in channels:
                await self.scraper.rescrape_media(cid)

        return await self.start_job(f"补抓媒体({len(channels)}个频道)", runner())

    async def start_continuous(self) -> str:
        """Start the continuous-scraping task unless it is already running.

        Raises:
            ValueError: if no channel is monitored.
        """
        await self.ensure_ready()
        if self.is_continuous_running():
            return "持续抓取已在运行中"
        if not self.scraper.state.get("channels"):
            raise ValueError("当前没有监控频道,请先添加频道。")

        async def runner():
            try:
                await self._run_and_capture(self.scraper.continuous_scraping())
            except asyncio.CancelledError:
                self._append("持续抓取任务已取消。")
                raise
            except Exception as e:
                self._append(f"持续抓取异常:{e}")

        self.continuous_started_at = time.time()
        self.continuous_task = asyncio.create_task(runner())
        self._append("已启动持续抓取(含心跳逻辑)。")
        return "持续抓取已启动"

    async def stop_continuous(self) -> str:
        """Cancel the continuous-scraping task and wait for it to finish."""
        if not self.is_continuous_running():
            return "持续抓取未运行"
        # Clear the scraper's loop flag first, then cancel the task itself.
        self.scraper.continuous_scraping_active = False
        self.continuous_task.cancel()
        try:
            await self.continuous_task
        except asyncio.CancelledError:
            pass
        self.continuous_task = None
        self.continuous_started_at = None
        self._append("已停止持续抓取。")
        return "持续抓取已停止"
def normalize_stats_keywords_param(raw: str, max_keywords: int = 50) -> Tuple[str, ...]:
    """Parse the comma-separated ``keywords`` query parameter.

    Each keyword has the SQL ``LIKE`` wildcards ``%`` and ``_`` removed and is
    trimmed; empty results are dropped, duplicates are removed (first
    occurrence wins) and the list is capped at *max_keywords*.

    The cap exists because every keyword becomes one ``message LIKE ?`` term
    in an ``OR`` chain run against each channel database: an unbounded list
    makes the query arbitrarily expensive and can exceed SQLite's
    expression limits, turning a request into a server error.

    Args:
        raw: Raw query-string value; ``None`` or empty selects the defaults.
        max_keywords: Maximum number of keywords kept (values below 1 are
            treated as 1).

    Returns:
        The cleaned keywords, or ``DEFAULT_STATS_KEYWORDS`` when none remain.
    """
    limit = max(1, int(max_keywords))
    cleaned: Dict[str, None] = {}
    for chunk in (raw or "").split(","):
        # Strip again after removing wildcards: "% %" must not become " ",
        # which would match almost every message.
        keyword = chunk.replace("%", "").replace("_", "").strip()
        if keyword:
            cleaned.setdefault(keyword, None)
            if len(cleaned) >= limit:
                break
    return tuple(cleaned) if cleaned else DEFAULT_STATS_KEYWORDS
datetime.utcnow().date() + start = end - timedelta(days=max(days, 1) - 1) + start_day = start.strftime("%Y-%m-%d") + end_day = end.strftime("%Y-%m-%d") + + day_keys: List[str] = [] + cur = start + while cur <= end: + day_keys.append(cur.strftime("%Y-%m-%d")) + cur += timedelta(days=1) + + daily_total: Dict[str, int] = {d: 0 for d in day_keys} + daily_kw: Dict[str, int] = {d: 0 for d in day_keys} + + total_all_time = 0 + in_range_total = 0 + in_range_kw = 0 + by_channel: List[Dict[str, Any]] = [] + + kw_sql = " OR ".join(["(message LIKE ?)"] * len(keyword_list)) + kw_params = [f"%{k}%" for k in keyword_list] + + for channel_id, path in dbs: + conn = sqlite3.connect(str(path)) + try: + cur_sql = conn.cursor() + cur_sql.execute("SELECT COUNT(*) FROM messages") + total_all_time += int(cur_sql.fetchone()[0]) + + cur_sql.execute( + "SELECT COUNT(*) FROM messages WHERE substr(date,1,10) >= ? AND substr(date,1,10) <= ?", + (start_day, end_day), + ) + ch_range = int(cur_sql.fetchone()[0]) + in_range_total += ch_range + + cur_sql.execute( + f"SELECT COUNT(*) FROM messages WHERE substr(date,1,10) >= ? AND substr(date,1,10) <= ? AND ({kw_sql})", + (start_day, end_day, *kw_params), + ) + ch_kw = int(cur_sql.fetchone()[0]) + in_range_kw += ch_kw + + cur_sql.execute( + "SELECT substr(date,1,10) AS d, COUNT(*) FROM messages WHERE substr(date,1,10) >= ? AND substr(date,1,10) <= ? GROUP BY d", + (start_day, end_day), + ) + for d, c in cur_sql.fetchall(): + ds = str(d) + if ds in daily_total: + daily_total[ds] += int(c) + + cur_sql.execute( + f"SELECT substr(date,1,10) AS d, COUNT(*) FROM messages WHERE substr(date,1,10) >= ? AND substr(date,1,10) <= ? 
AND ({kw_sql}) GROUP BY d", + (start_day, end_day, *kw_params), + ) + for d, c in cur_sql.fetchall(): + ds = str(d) + if ds in daily_kw: + daily_kw[ds] += int(c) + + by_channel.append( + { + "channel_id": channel_id, + "display_name": channel_display_name(str(channel_id), titles, names), + "messages_in_range": ch_range, + "job_like_in_range": ch_kw, + } + ) + finally: + conn.close() + + by_channel.sort(key=lambda x: -x["messages_in_range"]) + return { + "days": days, + "range_start": start_day, + "range_end": end_day, + "keywords_used": list(keyword_list), + "total_messages_all_time": total_all_time, + "messages_in_range": in_range_total, + "job_like_in_range": in_range_kw, + "database_count": len(dbs), + "by_channel": by_channel[:24], + "daily_messages": [[d, daily_total[d]] for d in day_keys], + "daily_job_like": [[d, daily_kw[d]] for d in day_keys], + } + + +@app.get("/api/stats/overview") +async def api_stats_overview( + days: int = Query(30, ge=1, le=366), + keywords: str = Query("", description="逗号分隔关键词,用于估算「招聘类」消息;留空用内置词表"), +): + kw = normalize_stats_keywords_param(keywords) + + def run() -> Dict[str, Any]: + data = compute_storage_stats(BASE_DIR, days, kw) + data["state_monitored_count"] = count_state_monitored_channels() + return data + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, run) + + +@app.get("/") +async def index(request: Request): + env_vals = read_env_dict() + fields = [ + {"key": k, "label": label, "type": field_type, "value": env_vals.get(k, "")} + for k, label, field_type in CONFIG_KEYS + ] + monitored = await service.list_monitored_channels() + monitored_ids = {str(row["channel_id"]) for row in monitored} + account_channels: List[Dict] = [] + err = "" + try: + account_channels = await service.list_account_channels() + except Exception as e: + err = str(e) + return templates.TemplateResponse( + "index.html", + { + "request": request, + "fields": fields, + "binary_env_keys": list(BINARY_ENV_KEYS), + 
@app.post("/config")
async def save_config(request: Request):
    """Persist the submitted configuration form to ``.env`` and redirect home.

    Rules per key in ``CONFIG_KEYS``:
      * binary keys are written as ``"1"`` only when the form value is
        exactly ``"1"``, otherwise ``"0"`` (an unchecked checkbox is simply
        absent from the form);
      * text keys are written only when present in the form, so a partial
        submission cannot blank out credentials that were not sent;
      * any line break inside a value is collapsed to a space, because the
        ``.env`` file is parsed line by line and an embedded line break
        would otherwise add extra ``KEY=value`` lines to the file.
    """
    form = await request.form()
    updates: Dict[str, str] = {}
    for key, _label, _field_type in CONFIG_KEYS:
        if key in BINARY_ENV_KEYS:
            updates[key] = "1" if str(form.get(key, "")).strip() == "1" else "0"
            continue
        if key not in form:
            continue
        # str.splitlines() recognises the same boundaries the .env reader
        # uses, so the stored value is guaranteed to stay on one line.
        updates[key] = " ".join(str(form.get(key, "")).splitlines()).strip()
    write_env_updates(updates)
    return RedirectResponse(url="/", status_code=303)
+@app.post("/channels/remove") +async def remove_channel(channel_spec: str = Form(...)): + try: + msg = await service.remove_channel(channel_spec) + service._append(msg) + except Exception as e: + service._append(f"移除频道失败:{e}") + return RedirectResponse(url="/", status_code=303) + + +@app.post("/jobs/scrape") +async def start_scrape(selection: str = Form("all")): + try: + msg = await service.start_scrape_job(selection) + service._append(msg) + except Exception as e: + service._append(f"启动抓取失败:{e}") + return RedirectResponse(url="/", status_code=303) + + +@app.post("/jobs/export") +async def start_export(selection: str = Form("all")): + try: + msg = await service.start_export_job(selection) + service._append(msg) + except Exception as e: + service._append(f"启动导出失败:{e}") + return RedirectResponse(url="/", status_code=303) + + +@app.post("/jobs/rescrape") +async def start_rescrape(selection: str = Form("all")): + try: + msg = await service.start_rescrape_job(selection) + service._append(msg) + except Exception as e: + service._append(f"启动补抓失败:{e}") + return RedirectResponse(url="/", status_code=303) + + +@app.post("/jobs/continuous/start") +async def start_continuous(): + try: + msg = await service.start_continuous() + service._append(msg) + except Exception as e: + service._append(f"启动持续抓取失败:{e}") + return RedirectResponse(url="/", status_code=303) + + +@app.post("/jobs/continuous/stop") +async def stop_continuous(): + try: + msg = await service.stop_continuous() + service._append(msg) + except Exception as e: + service._append(f"停止持续抓取失败:{e}") + return RedirectResponse(url="/", status_code=303) + + +@app.get("/status") +async def status(): + return { + "ready": service.scraper is not None, + "connected": service.is_connected(), + "job_running": service.is_job_running(), + "job_name": service.job_name, + "job_uptime": service.current_job_uptime(), + "continuous_running": service.is_continuous_running(), + "continuous_uptime": service.continuous_uptime(), + 
@app.post("/api/channels/add")
async def api_add_channel(channel_spec: str = Form(...)):
    """JSON endpoint: add one channel to the monitored set.

    Returns ``{"ok": True, "message": ...}`` on success.

    Raises:
        HTTPException: 400 when the channel spec is rejected with a
            ``ValueError`` (e.g. it is empty), 503 when the service raises
            ``RuntimeError`` (e.g. the Telegram client cannot be
            initialised). Without this mapping those expected failures reach
            the client as an opaque HTTP 500.
    """
    # Imported locally so this handler does not depend on a change to the
    # file's import block.
    from fastapi import HTTPException

    try:
        msg = await service.add_channel(channel_spec)
    except ValueError as exc:
        service._append(f"添加频道失败:{exc}")
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except RuntimeError as exc:
        service._append(f"添加频道失败:{exc}")
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    service._append(msg)
    return {"ok": True, "message": msg}
def load_env_file(env_path: str = ".env"):
    """Load ``KEY=value`` pairs from *env_path* into ``os.environ``.

    Behaviour:
      * a missing file is ignored;
      * blank lines, ``#`` comment lines and lines without ``=`` are skipped;
      * an optional leading ``export `` is accepted;
      * one pair of matching surrounding quotes (``'…'`` or ``"…"``) is
        removed from the value; unbalanced or inner quotes are kept, so a
        value such as ``"it's"`` loads as ``it's`` instead of losing
        characters;
      * variables already present in the environment are never overwritten;
      * a UTF-8 byte-order mark at the start of the file is ignored.

    Loading stays best-effort: a read or decode error is reported on stderr
    and the function returns without raising.
    """
    if not os.path.exists(env_path):
        return
    try:
        # utf-8-sig: otherwise a BOM becomes part of the first key.
        with open(env_path, "r", encoding="utf-8-sig") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                key = key.strip()
                if key.startswith("export "):
                    key = key[len("export "):].strip()
                value = value.strip()
                if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
                    value = value[1:-1]
                if key and key not in os.environ:
                    os.environ[key] = value
    except (OSError, UnicodeDecodeError) as e:
        print(f"⚠️ 无法读取 {env_path}:{e}", file=sys.stderr)
return + try: + entity = await self._resolve_forward_entity(spec) + except Exception as e: + print(f"💓 心跳目标无法解析({spec}):{e}") + return + while self.continuous_scraping_active: + try: + ts = time.strftime("%Y-%m-%d %H:%M:%S") + await self.client.send_message( + entity, f"💓 抓取心跳 · 持续运行中 · {ts}" + ) + except asyncio.CancelledError: + break + except FloodWaitError as e: + await asyncio.sleep(min(e.seconds, 300)) + except Exception as e: + print(f"\n💓 心跳发送失败:{e}") + try: + await asyncio.sleep(self._heartbeat_interval_seconds()) + except asyncio.CancelledError: + break + + def _forward_targets_from_env(self) -> List[str]: + raw = (os.getenv("FORWARD_TO_CHAT") or os.getenv("FORWARD_TO") or "").strip() + if not raw: + return [] + parts = re.split(r"[,,;;]+", raw) + out: List[str] = [] + for p in parts: + s = p.strip().strip("\ufeff").strip("\u200b") + if s: + out.append(s) + return out + + def _monitor_chats_specs_from_env(self) -> List[str]: + raw = (os.getenv("MONITOR_CHATS") or os.getenv("MONITOR_GROUPS") or "").strip() + if not raw: + return [] + parts = re.split(r"[,,;;]+", raw) + return [p.strip().strip("\ufeff").strip("\u200b") for p in parts if p.strip()] + + async def sync_monitor_chats_from_env(self) -> None: + """把 MONITOR_CHATS 中的多个群/频道加入 state['channels'],与菜单 [L] 添加效果一致。""" + specs = self._monitor_chats_specs_from_env() + if not specs or not self.client: + return + if "channel_names" not in self.state: + self.state["channel_names"] = {} + if "channel_titles" not in self.state: + self.state["channel_titles"] = {} + added = 0 + for spec in specs: + try: + ent = await self._resolve_forward_entity(spec) + cid = str(utils.get_peer_id(ent)) + uname = getattr(ent, "username", None) or "no_username" + title = getattr(ent, "title", None) or cid + if cid not in self.state["channels"]: + self.state["channels"][cid] = 0 + self.state["channel_names"][cid] = uname + self.state["channel_titles"][cid] = str(title) if title else "" + print(f"📌 已从 MONITOR_CHATS 
加入监控源:{title}({cid})") + added += 1 + else: + self.state["channel_names"][cid] = uname + self.state["channel_titles"][cid] = str(title) if title else "" + except Exception as e: + print(f"⚠️ MONITOR_CHATS 无法解析「{spec}」:{e}") + if added: + self.save_state() + + def _normalize_entity_input(self, spec: str) -> str: + """把 t.me / telegram.me 链接规范成 @username,便于 get_entity 解析。""" + s = (spec or "").strip().strip("\ufeff").strip("\u200b") + if not s: + return s + # 邀请链接保留原样,交给 get_entity 处理 + if re.search(r"(?:t\.me|telegram\.me)/(?:joinchat/|\+)", s, re.I): + return s + m = re.search( + r"(?:https?://)?(?:t\.me|telegram\.me)/([A-Za-z0-9_]+)(?:[/?#].*)?$", + s, + re.I, + ) + if m: + slug = m.group(1) + if slug.isdigit(): + return s + reserved = {"addstickers", "share", "iv", "socks", "setlanguage", "proxy"} + if slug.lower() in reserved: + return s + return "@" + slug + return s + + async def _resolve_forward_entity(self, spec: str): + """解析转发目标:-100 超级群用 PeerChannel,避免 get_entity(str) 偶发失败。""" + s = self._normalize_entity_input(spec) + if not s: + raise ValueError("转发目标为空") + if s.startswith("@"): + return await self.client.get_entity(s) + if re.fullmatch(r"-100\d+", s): + inner = int(s[4:]) + try: + return await self.client.get_entity(PeerChannel(inner)) + except Exception: + return await self.client.get_entity(int(s)) + if re.fullmatch(r"-?\d+", s): + num = int(s) + try: + return await self.client.get_entity(num) + except Exception: + sn = str(num) + if sn.startswith("-100") and len(sn) > 4 and sn[4:].isdigit(): + return await self.client.get_entity(PeerChannel(int(sn[4:]))) + raise + return await self.client.get_entity(s) + + def _env_flag_true(self, *keys: str) -> bool: + for key in keys: + v = os.getenv(key, "").strip().lower() + if v in ("1", "true", "yes", "on"): + return True + return False + + def _forward_only_new_messages(self) -> bool: + return self._env_flag_true( + "FORWARD_ONLY_NEW_MESSAGES", + "FORWARD_ONLY_IF_OFFSET_POSITIVE", + ) + + def 
_forward_my_username(self) -> Optional[str]: + raw = (os.getenv("FORWARD_MY_USERNAME") or os.getenv("FORWARD_AS_USERNAME") or "").strip() + if not raw: + return None + return raw.lstrip("@").strip() or None + + def _env_flag_false(self, *keys: str) -> bool: + """为真时表示显式关闭(用于默认开启的行为)。""" + for key in keys: + v = os.getenv(key, "").strip().lower() + if v in ("0", "false", "no", "off"): + return True + return False + + def _forward_replace_mentions_enabled(self) -> bool: + if self._env_flag_false("FORWARD_RAW_MENTIONS", "FORWARD_NO_MENTION_REPLACE"): + return False + return True + + def _sanitize_mentions_text(self, text: str, replacement: str) -> str: + if not text: + return "" + if not replacement: + return text + # Telegram 用户名一般为 5–32 位 [A-Za-z0-9_];4 位兼容边界情况 + return re.sub(r"@[A-Za-z0-9_]{4,32}\b", lambda _: replacement, text) + + async def _resolve_forward_mention_replacement(self, me) -> str: + """推送到汇总群时,用于替换正文中所有 @用户名的展示串(优先环境变量,否则为当前登录号)。""" + custom = self._forward_my_username() + if custom: + return "@" + custom.lstrip("@") + if getattr(me, "username", None): + return "@" + me.username + parts = [ + (getattr(me, "first_name", None) or "").strip(), + (getattr(me, "last_name", None) or "").strip(), + ] + name = " ".join(p for p in parts if p) + return name or "我" + + async def _forward_scraped_message( + self, + target, + _source_entity, + message, + *, + replacement: str, + replace_mentions: bool, + ) -> None: + """以当前账号「重发」副本:先本地落盘再上传,避免出现「转发自 …」卡片。""" + if isinstance(message, MessageService): + return + text_raw = message.message or "" + text_out = ( + self._sanitize_mentions_text(text_raw, replacement) + if replace_mentions + else text_raw + ) + has_media = bool( + message.media and not isinstance(message.media, MessageMediaWebPage) + ) + + async def _send_copy() -> None: + cap = text_out.strip() or None + if has_media: + if isinstance(message.media, MessageMediaPhoto): + suffix = ".jpg" + else: + suffix = ".bin" + if isinstance(message.media, 
MessageMediaDocument) and message.file: + ext = getattr(message.file, "ext", None) or "" + nm = getattr(message.file, "name", None) or "" + if nm: + suf = Path(nm).suffix + if suf: + suffix = suf + elif ext: + suffix = "." + ext.lstrip(".") + tmp_path = str( + Path(tempfile.gettempdir()) / f"tgfwd_{uuid.uuid4().hex}{suffix}" + ) + saved: Optional[str] = None + try: + saved = await self.client.download_media(message, file=tmp_path) + if saved and Path(saved).exists(): + await self.client.send_file(target, saved, caption=cap) + return + finally: + if saved: + try: + Path(saved).unlink(missing_ok=True) + except OSError: + pass + if cap: + await self.client.send_message(target, cap) + else: + print( + f"\n⚠️ 消息 {message.id} 有媒体但下载失败,且无可用文字说明,已跳过推送。" + ) + return + if cap: + await self.client.send_message(target, cap) + + try: + await _send_copy() + if self._forward_delay_seconds > 0: + await asyncio.sleep(self._forward_delay_seconds) + except FloodWaitError as e: + await asyncio.sleep(e.seconds) + try: + await _send_copy() + except Exception as err: + print(f"\n推送消息 {message.id} 时仍失败(FloodWait 后):{err}") + except Exception as e: + print(f"\n推送消息 {message.id} 失败(可能无发言权限或媒体无法下载):{e}") + + def build_proxy_config(self): + proxy_host = os.getenv("PROXY_HOST") + proxy_port = os.getenv("PROXY_PORT") + if not proxy_host or not proxy_port: + return None + + try: + proxy_port = int(proxy_port) + except ValueError: + print("环境变量中的 PROXY_PORT 无效,已跳过代理设置。") + return None + + proxy_type = os.getenv("PROXY_TYPE", "socks5").lower() + proxy_username = os.getenv("PROXY_USERNAME") + proxy_password = os.getenv("PROXY_PASSWORD") + + if proxy_username or proxy_password: + return (proxy_type, proxy_host, proxy_port, True, proxy_username, proxy_password) + return (proxy_type, proxy_host, proxy_port) def load_state(self) -> Dict[str, Any]: if os.path.exists(self.STATE_FILE): @@ -71,6 +406,7 @@ class OptimizedTelegramScraper: 'api_hash': None, 'channels': {}, 'channel_names': {}, + 
'channel_titles': {}, 'scrape_media': True, } @@ -79,7 +415,7 @@ class OptimizedTelegramScraper: with open(self.STATE_FILE, 'w') as f: json.dump(self.state, f, indent=2) except Exception as e: - print(f"Failed to save state: {e}") + print(f"保存状态失败:{e}") def get_db_connection(self, channel: str) -> sqlite3.Connection: if channel not in self.db_connections: @@ -216,10 +552,52 @@ class OptimizedTelegramScraper: total_messages = result.total if total_messages == 0: - print(f"No messages found in channel {channel}") + print(f"频道 {channel} 中未找到消息") return - print(f"Found {total_messages} messages in channel {channel}") + print(f"在频道 {channel} 中找到 {total_messages} 条消息") + + forward_specs = self._forward_targets_from_env() + forward_pairs: List[Tuple[Any, str]] = [] + mention_replacement = "" + replace_mentions = True + if forward_specs: + for spec in forward_specs: + try: + ent = await self._resolve_forward_entity(spec) + forward_pairs.append((ent, spec)) + except Exception as e: + print(f"⚠️ 转发目标解析失败:{spec}:{e}") + forward_entities = [p[0] for p in forward_pairs] + if not forward_entities: + print("⚠️ 没有可用的转发目标(已跳过转发)") + else: + dest_titles = [getattr(ent, "title", None) or sp for ent, sp in forward_pairs] + only_new = self._forward_only_new_messages() + if only_new and offset_id <= 0: + joined = "、".join(dest_titles) + print( + f"📤 已配置 {len(forward_entities)} 个转发目标({joined}),但仅新消息模式开启且当前 offset_id={offset_id}," + "本次为历史抓取,不向群组推送;待 state 中已有进度后再次抓取时才会转发新消息。" + ) + forward_entities = [] + else: + me = await self.client.get_me() + mention_replacement = await self._resolve_forward_mention_replacement(me) + replace_mentions = self._forward_replace_mentions_enabled() + hints = [] + if only_new: + hints.append("仅新消息") + hints.append("以当前账号重发(非转发卡片)") + if replace_mentions: + hints.append(f"正文 @用户名 → {mention_replacement}") + else: + hints.append("保留正文 @(FORWARD_RAW_MENTIONS=1)") + hint_str = "(" + ",".join(hints) + ")" if hints else "" + joined = "、".join(dest_titles) + 
print(f"📤 推送已开启{hint_str}:共 {len(forward_entities)} 个目标 → {joined}") + else: + forward_entities = [] message_batch = [] media_tasks = [] @@ -261,6 +639,15 @@ class OptimizedTelegramScraper: message_batch.append(msg_data) + for fwd_ent in forward_entities: + await self._forward_scraped_message( + fwd_ent, + entity, + message, + replacement=mention_replacement, + replace_mentions=replace_mentions, + ) + if self.state['scrape_media'] and message.media and not isinstance(message.media, MessageMediaWebPage): media_tasks.append(message) @@ -284,7 +671,7 @@ class OptimizedTelegramScraper: sys.stdout.flush() except Exception as e: - print(f"\nError processing message {message.id}: {e}") + print(f"\n处理消息 {message.id} 时出错:{e}") if message_batch: self.batch_insert_messages(channel, message_batch) @@ -324,14 +711,14 @@ class OptimizedTelegramScraper: sys.stdout.write(f"\r📥 Media: [{bar}] {progress:.1f}% ({completed_media}/{total_media})") sys.stdout.flush() - print(f"\n✅ Media download complete! ({successful_downloads}/{total_media} successful)") + print(f"\n✅ 媒体下载完成!(成功 {successful_downloads}/{total_media})") self.state['channels'][channel] = last_message_id self.save_state() print(f"\nCompleted scraping channel {channel}") except Exception as e: - print(f"Error with channel {channel}: {e}") + print(f"处理频道 {channel} 时出错:{e}") async def rescrape_media(self, channel: str): conn = self.get_db_connection(channel) @@ -342,7 +729,7 @@ class OptimizedTelegramScraper: channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown') if not message_ids: - print(f"No media files to reprocess for {channel_name} (ID: {channel})") + print(f"{channel_name}(ID: {channel})没有可补抓的媒体文件") return print(f"📥 Reprocessing {len(message_ids)} media files for {channel_name} (ID: {channel})") @@ -386,10 +773,10 @@ class OptimizedTelegramScraper: sys.stdout.write(f"\r🔄 Rescrape: [{bar}] {progress:.1f}% ({completed_media}/{len(message_ids)})") sys.stdout.flush() - print(f"\n✅ Media reprocessing 
complete! ({successful_downloads}/{len(message_ids)} successful)") + print(f"\n✅ 媒体补抓完成!(成功 {successful_downloads}/{len(message_ids)})") except Exception as e: - print(f"Error reprocessing media: {e}") + print(f"媒体补抓出错:{e}") async def fix_missing_media(self, channel: str): conn = self.get_db_connection(channel) @@ -404,20 +791,20 @@ class OptimizedTelegramScraper: missing_count = total_with_media - total_with_files channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown') - print(f"\n📊 Media Analysis for {channel_name} (ID: {channel}):") - print(f"Messages with media: {total_with_media}") - print(f"Media files downloaded: {total_with_files}") - print(f"Missing media files: {missing_count}") + print(f"\n📊 {channel_name}(ID: {channel})媒体分析:") + print(f"含媒体的消息数:{total_with_media}") + print(f"已下载媒体文件数:{total_with_files}") + print(f"缺失媒体文件数:{missing_count}") if missing_count == 0: - print("✅ All media files are already downloaded!") + print("✅ 所有媒体文件都已下载!") return cursor.execute('SELECT message_id, media_type FROM messages WHERE media_type IS NOT NULL AND media_type != "MessageMediaWebPage" AND (media_path IS NULL OR media_path = "")') missing_media = cursor.fetchall() if not missing_media: - print("✅ No missing media found!") + print("✅ 未发现缺失媒体!") return print(f"\n🔧 Attempting to download {len(missing_media)} missing media files...") @@ -463,33 +850,53 @@ class OptimizedTelegramScraper: sys.stdout.write(f"\r🔧 Fix Media: [{bar}] {progress:.1f}% ({completed_media}/{len(missing_media)})") sys.stdout.flush() - print(f"\n✅ Media fix complete! 
({successful_downloads}/{len(missing_media)} successful)") + print(f"\n✅ 缺失媒体修复完成!(成功 {successful_downloads}/{len(missing_media)})") except Exception as e: - print(f"Error fixing missing media: {e}") + print(f"修复缺失媒体时出错:{e}") async def continuous_scraping(self): self.continuous_scraping_active = True - + hb_task = None + if self._heartbeat_enabled(): + if self._heartbeat_target_spec(): + hb_task = asyncio.create_task(self._heartbeat_loop()) + print( + f"💓 抓取心跳已开启:启动后先发一条,之后约每 " + f"{self._heartbeat_interval_seconds():.0f} 秒一条(未配置 HEARTBEAT_TO_CHAT 时用 FORWARD_TO_CHAT 的首个目标)" + ) + else: + print( + "⚠️ HEARTBEAT_ENABLED=1 但未配置 HEARTBEAT_TO_CHAT,且 FORWARD_TO_CHAT 为空," + "已跳过心跳。" + ) + try: while self.continuous_scraping_active: start_time = time.time() - + for channel in self.state['channels']: if not self.continuous_scraping_active: break print(f"\nChecking for new messages in channel: {channel}") await self.scrape_channel(channel, self.state['channels'][channel]) - + elapsed = time.time() - start_time sleep_time = max(0, 60 - elapsed) if sleep_time > 0: await asyncio.sleep(sleep_time) - + except asyncio.CancelledError: - print("Continuous scraping stopped") + print("持续抓取已停止") + raise finally: self.continuous_scraping_active = False + if hb_task: + hb_task.cancel() + try: + await hb_task + except asyncio.CancelledError: + pass def get_export_filename(self, channel: str): username = self.state.get('channel_names', {}).get(channel, 'no_username') @@ -545,24 +952,24 @@ class OptimizedTelegramScraper: async def export_data(self): if not self.state['channels']: - print("No channels to export") + print("没有可导出的频道") return for channel in self.state['channels']: - print(f"Exporting data for channel {channel}...") + print(f"正在导出频道 {channel} 的数据...") try: self.export_to_csv(channel) self.export_to_json(channel) - print(f"✅ Completed export for channel {channel}") + print(f"✅ 频道 {channel} 导出完成") except Exception as e: - print(f"❌ Export failed for channel {channel}: {e}") + 
print(f"❌ 频道 {channel} 导出失败:{e}") async def view_channels(self): if not self.state['channels']: - print("No channels saved") + print("还没有保存任何频道") return - print("\nCurrent channels:") + print("\n当前频道列表:") for i, (channel, last_id) in enumerate(self.state['channels'].items(), 1): try: conn = self.get_db_connection(channel) @@ -575,25 +982,44 @@ class OptimizedTelegramScraper: channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown') print(f"[{i}] {channel_name} (ID: {channel}), Last Message ID: {last_id}") + def _account_list_exclude_substrings(self) -> List[str]: + """账号可见列表中不展示的会话标题子串(如自有招聘群),环境变量 ACCOUNT_LIST_EXCLUDE_SUBSTRINGS 逗号分隔;* 表示不隐藏。""" + raw = (os.getenv("ACCOUNT_LIST_EXCLUDE_SUBSTRINGS") or "").strip() + if raw in ("*", "none", "off"): + return [] + if raw: + return [p.strip() for p in re.split(r"[,,;;]+", raw) if p.strip()] + return ["远程-到岗-技术招聘"] + + def _skip_dialog_in_account_channel_list(self, title: Optional[str]) -> bool: + if not title: + return False + t = str(title).strip() + for sub in self._account_list_exclude_substrings(): + if sub and sub in t: + return True + return False + async def list_channels(self): try: - print("\nList of channels and groups joined by account:") - count = 1 + print("\n当前账号加入的频道/群组:") channels_data = [] async for dialog in self.client.iter_dialogs(): entity = dialog.entity if dialog.id != 777000 and (isinstance(entity, Channel) or isinstance(entity, Chat)): - channel_type = "Channel" if isinstance(entity, Channel) and entity.broadcast else "Group" + if self._skip_dialog_in_account_channel_list(getattr(dialog, "title", None) or getattr(entity, "title", None)): + continue + channel_type = "频道" if isinstance(entity, Channel) and entity.broadcast else "群组" username = getattr(entity, 'username', None) or 'no_username' - print(f"[{count}] {dialog.title} (ID: {dialog.id}, Type: {channel_type}, Username: @{username})") + n = len(channels_data) + 1 + print(f"[{n}] {dialog.title}(ID: 
{dialog.id},类型:{channel_type},用户名:@{username})") channels_data.append({ - 'number': count, + 'number': n, 'channel_name': dialog.title, 'channel_id': str(dialog.id), 'username': username, 'type': channel_type }) - count += 1 if channels_data: csv_file = Path('channels_list.csv') @@ -601,12 +1027,12 @@ class OptimizedTelegramScraper: writer = csv.DictWriter(f, fieldnames=['number', 'channel_name', 'channel_id', 'username', 'type']) writer.writeheader() writer.writerows(channels_data) - print(f"\n✅ Saved channels list to {csv_file}") + print(f"\n✅ 频道列表已保存到 {csv_file}") return channels_data except Exception as e: - print(f"Error listing channels: {e}") + print(f"列出频道时出错:{e}") return [] def display_qr_code_ascii(self, qr_login): @@ -620,85 +1046,98 @@ class OptimizedTelegramScraper: print(f.read()) async def qr_code_auth(self): - print("\nChoosing QR Code authentication...") - print("Please scan the QR code with your Telegram app:") - print("1. Open Telegram on your phone") - print("2. Go to Settings > Devices > Scan QR") - print("3. Scan the code below\n") + print("\n正在使用二维码登录...") + print("请用 Telegram 手机端扫描二维码:") + print("1. 在手机上打开 Telegram") + print("2. 进入 设置 > 设备 > 扫描二维码") + print("3. 扫描下方二维码\n") qr_login = await self.client.qr_login() self.display_qr_code_ascii(qr_login) try: await qr_login.wait() - print("\n✅ Successfully logged in via QR code!") + print("\n✅ 二维码登录成功!") return True except SessionPasswordNeededError: - password = input("Two-factor authentication enabled. 
Enter your password: ") + password = input("已开启两步验证,请输入密码:") await self.client.sign_in(password=password) - print("\n✅ Successfully logged in with 2FA!") + print("\n✅ 两步验证登录成功!") return True except Exception as e: - print(f"\n❌ QR code authentication failed: {e}") + print(f"\n❌ 二维码登录失败:{e}") return False async def phone_auth(self): - phone = input("Enter your phone number: ") + phone = input("请输入手机号:") await self.client.send_code_request(phone) - code = input("Enter the code you received: ") + code = input("请输入收到的验证码:") try: await self.client.sign_in(phone, code) - print("\n✅ Successfully logged in via phone!") + print("\n✅ 手机号登录成功!") return True except SessionPasswordNeededError: - password = input("Two-factor authentication enabled. Enter your password: ") + password = input("已开启两步验证,请输入密码:") await self.client.sign_in(password=password) - print("\n✅ Successfully logged in with 2FA!") + print("\n✅ 两步验证登录成功!") return True except Exception as e: - print(f"\n❌ Phone authentication failed: {e}") + print(f"\n❌ 手机号登录失败:{e}") return False async def initialize_client(self): - if not all([self.state.get('api_id'), self.state.get('api_hash')]): - print("\n=== API Configuration Required ===") - print("You need to provide API credentials from https://my.telegram.org") + if not self.state.get('api_id') and os.getenv("API_ID"): try: - self.state['api_id'] = int(input("Enter your API ID: ")) - self.state['api_hash'] = input("Enter your API Hash: ") + self.state['api_id'] = int(os.getenv("API_ID")) + except ValueError: + print("环境变量中的 API_ID 无效,将改为手动输入。") + + if not self.state.get('api_hash') and os.getenv("API_HASH"): + self.state['api_hash'] = os.getenv("API_HASH") + + if not all([self.state.get('api_id'), self.state.get('api_hash')]): + print("\n=== 需要配置 API ===") + print("请提供来自 https://my.telegram.org 的 API 凭据") + try: + self.state['api_id'] = int(input("请输入 API ID:")) + self.state['api_hash'] = input("请输入 API Hash:") self.save_state() except ValueError: - print("Invalid API 
ID. Must be a number.") + print("API ID 无效,必须是数字。") return False - self.client = TelegramClient('session', self.state['api_id'], self.state['api_hash']) + proxy = self.build_proxy_config() + if proxy: + print(f"正在使用代理:{proxy[0]}://{proxy[1]}:{proxy[2]}") + + self.client = TelegramClient('session', self.state['api_id'], self.state['api_hash'], proxy=proxy) try: await self.client.connect() except Exception as e: - print(f"Failed to connect: {e}") + print(f"连接失败:{e}") return False if not await self.client.is_user_authorized(): - print("\n=== Choose Authentication Method ===") - print("[1] QR Code (Recommended - No phone number needed)") - print("[2] Phone Number (Traditional method)") + print("\n=== 请选择登录方式 ===") + print("[1] 二维码登录(推荐,无需手机号)") + print("[2] 手机号登录(传统方式)") while True: - choice = input("Enter your choice (1 or 2): ").strip() + choice = input("请输入选项(1 或 2):").strip() if choice in ['1', '2']: break - print("Please enter 1 or 2") + print("请输入 1 或 2") success = await self.qr_code_auth() if choice == '1' else await self.phone_auth() if not success: - print("Authentication failed. Please try again.") + print("登录失败,请重试。") await self.client.disconnect() return False else: - print("✅ Already authenticated!") + print("✅ 已登录!") return True @@ -715,70 +1154,70 @@ class OptimizedTelegramScraper: if selection in self.state['channels']: selected_channels.append(selection) else: - print(f"Channel ID {selection} not found in your channels") + print(f"你的频道列表中不存在频道 ID {selection}") else: num = int(selection) if 1 <= num <= len(channels_list): selected_channels.append(channels_list[num - 1]) else: - print(f"Invalid channel number: {num}. Valid range: 1-{len(channels_list)}") + print(f"频道编号无效:{num}。有效范围:1-{len(channels_list)}") except ValueError: - print(f"Invalid input: {selection}. 
Use numbers (1,2,3) or full IDs (-100123...)") + print(f"输入无效:{selection}。请用编号(1,2,3)或完整频道ID(-100123...)") return selected_channels async def scrape_specific_channels(self): if not self.state['channels']: - print("No channels available. Use [L] to add channels first") + print("当前没有可用频道,请先用 [L] 添加频道") return await self.view_channels() - print("\n📥 Scrape Options:") - print("• Single: 1 or -1001234567890") - print("• Multiple: 1,3,5 or mix formats") - print("• All channels: all") + print("\n📥 抓取选项:") + print("• 单个:1 或 -1001234567890") + print("• 多个:1,3,5 或混合输入") + print("• 全部频道:all") - choice = input("\nEnter selection: ").strip() + choice = input("\n请输入选择:").strip() selected_channels = self.parse_channel_selection(choice) if selected_channels: - print(f"\n🚀 Starting scrape of {len(selected_channels)} channel(s)...") + print(f"\n🚀 开始抓取 {len(selected_channels)} 个频道...") for i, channel in enumerate(selected_channels, 1): - print(f"\n[{i}/{len(selected_channels)}] Scraping: {channel}") + print(f"\n[{i}/{len(selected_channels)}] 正在抓取:{channel}") await self.scrape_channel(channel, self.state['channels'][channel]) - print(f"\n✅ Completed scraping {len(selected_channels)} channel(s)!") + print(f"\n✅ 已完成 {len(selected_channels)} 个频道的抓取!") else: - print("❌ No valid channels selected") + print("❌ 未选择有效频道") async def manage_channels(self): while True: print("\n" + "="*40) - print(" TELEGRAM SCRAPER") + print(" Telegram 抓取工具") print("="*40) - print("[S] Scrape channels") - print("[C] Continuous scraping") - print(f"[M] Media scraping: {'ON' if self.state['scrape_media'] else 'OFF'}") - print("[L] List & add channels") - print("[R] Remove channels") - print("[E] Export data") - print("[T] Rescrape media") - print("[F] Fix missing media") - print("[Q] Quit") + print("[S] 抓取频道") + print("[C] 持续抓取") + print(f"[M] 媒体抓取:{'开启' if self.state['scrape_media'] else '关闭'}") + print("[L] 列出并添加频道") + print("[R] 移除频道") + print("[E] 导出数据") + print("[T] 补抓媒体") + print("[F] 修复缺失媒体") + print("[Q] 
退出") print("="*40) - choice = input("Enter your choice: ").lower().strip() + choice = input("请输入选项:").lower().strip() try: if choice == 'r': if not self.state['channels']: - print("No channels to remove") + print("没有可移除的频道") continue await self.view_channels() - print("\nTo remove channels:") - print("• Single: 1 or -1001234567890") - print("• Multiple: 1,2,3 or mix formats") - selection = input("Enter selection: ").strip() + print("\n移除频道说明:") + print("• 单个:1 或 -1001234567890") + print("• 多个:1,2,3 或混合输入") + selection = input("请输入选择:").strip() selected_channels = self.parse_channel_selection(selection) if selected_channels: @@ -786,19 +1225,23 @@ class OptimizedTelegramScraper: for channel in selected_channels: if channel in self.state['channels']: del self.state['channels'][channel] - print(f"✅ Removed channel {channel}") + if 'channel_names' in self.state: + self.state['channel_names'].pop(channel, None) + if 'channel_titles' in self.state: + self.state['channel_titles'].pop(channel, None) + print(f"✅ 已移除频道 {channel}") removed_count += 1 else: - print(f"❌ Channel {channel} not found") + print(f"❌ 未找到频道 {channel}") if removed_count > 0: self.save_state() - print(f"\n🎉 Removed {removed_count} channel(s)!") + print(f"\n🎉 已移除 {removed_count} 个频道!") await self.view_channels() else: - print("No channels were removed") + print("未移除任何频道") else: - print("No valid channels selected") + print("未选择有效频道") elif choice == 's': await self.scrape_specific_channels() @@ -806,17 +1249,17 @@ class OptimizedTelegramScraper: elif choice == 'm': self.state['scrape_media'] = not self.state['scrape_media'] self.save_state() - print(f"\n✅ Media scraping {'enabled' if self.state['scrape_media'] else 'disabled'}") + print(f"\n✅ 媒体抓取已{'开启' if self.state['scrape_media'] else '关闭'}") elif choice == 'c': task = asyncio.create_task(self.continuous_scraping()) - print("Continuous scraping started. 
Press Ctrl+C to stop.") + print("持续抓取已启动,按 Ctrl+C 停止。") try: await asyncio.sleep(float('inf')) except KeyboardInterrupt: self.continuous_scraping_active = False task.cancel() - print("\nStopping continuous scraping...") + print("\n正在停止持续抓取...") try: await task except asyncio.CancelledError: @@ -831,12 +1274,12 @@ class OptimizedTelegramScraper: if not channels_data: continue - print("\nTo add channels from the list above:") - print("• Single: 1 or -1001234567890") - print("• Multiple: 1,3,5 or mix formats") - print("• All channels: all") - print("• Press Enter to skip adding") - selection = input("\nEnter selection (or Enter to skip): ").strip() + print("\n从上方列表添加频道:") + print("• 单个:1 或 -1001234567890") + print("• 多个:1,3,5 或混合输入") + print("• 全部频道:all") + print("• 直接回车可跳过添加") + selection = input("\n请输入选择(或回车跳过):").strip() if selection: added_count = 0 @@ -848,11 +1291,14 @@ class OptimizedTelegramScraper: self.state['channels'][channel_id] = 0 if 'channel_names' not in self.state: self.state['channel_names'] = {} + if 'channel_titles' not in self.state: + self.state['channel_titles'] = {} self.state['channel_names'][channel_id] = channel_info['username'] - print(f"✅ Added channel {channel_info['channel_name']} (ID: {channel_id})") + self.state['channel_titles'][channel_id] = channel_info['channel_name'] + print(f"✅ 已添加频道 {channel_info['channel_name']}(ID: {channel_id})") added_count += 1 else: - print(f"Channel {channel_info['channel_name']} already added") + print(f"频道 {channel_info['channel_name']} 已添加过") else: for sel in [x.strip() for x in selection.split(',')]: try: @@ -860,7 +1306,7 @@ class OptimizedTelegramScraper: channel_id = sel channel_info = next((c for c in channels_data if c['channel_id'] == channel_id), None) if not channel_info: - print(f"Channel ID {channel_id} not found") + print(f"未找到频道 ID {channel_id}") continue else: num = int(sel) @@ -868,98 +1314,122 @@ class OptimizedTelegramScraper: channel_info = channels_data[num - 1] channel_id = 
channel_info['channel_id'] else: - print(f"Invalid number: {num}. Choose 1-{len(channels_data)}") + print(f"编号无效:{num}。请选择 1-{len(channels_data)}") continue if channel_id in self.state['channels']: - print(f"Channel {channel_info['channel_name']} already added") + print(f"频道 {channel_info['channel_name']} 已添加过") else: self.state['channels'][channel_id] = 0 if 'channel_names' not in self.state: self.state['channel_names'] = {} + if 'channel_titles' not in self.state: + self.state['channel_titles'] = {} self.state['channel_names'][channel_id] = channel_info['username'] - print(f"✅ Added channel {channel_info['channel_name']} (ID: {channel_id})") + self.state['channel_titles'][channel_id] = channel_info['channel_name'] + print(f"✅ 已添加频道 {channel_info['channel_name']}(ID: {channel_id})") added_count += 1 except ValueError: - print(f"Invalid input: {sel}") + print(f"输入无效:{sel}") if added_count > 0: self.save_state() - print(f"\n🎉 Added {added_count} new channel(s)!") + print(f"\n🎉 已新增 {added_count} 个频道!") await self.view_channels() else: - print("No new channels were added") + print("没有新增频道") elif choice == 't': if not self.state['channels']: - print("No channels available. Add channels first") + print("当前没有可用频道,请先添加频道") continue await self.view_channels() - print("\nEnter channel NUMBER (1,2,3...) or full channel ID (-100123...)") - selection = input("Enter your selection: ").strip() + print("\n请输入频道编号(1,2,3...)或完整频道ID(-100123...)") + selection = input("请输入选择:").strip() selected_channels = self.parse_channel_selection(selection) if len(selected_channels) == 1: channel = selected_channels[0] - print(f"Rescaping media for channel: {channel}") + print(f"正在为频道补抓媒体:{channel}") await self.rescrape_media(channel) elif len(selected_channels) > 1: - print("Please select only one channel for media rescaping") + print("补抓媒体时一次只能选择一个频道") else: - print("No valid channel selected") + print("未选择有效频道") elif choice == 'f': if not self.state['channels']: - print("No channels available. 
Add channels first") + print("当前没有可用频道,请先添加频道") continue await self.view_channels() - print("\nEnter channel NUMBER (1,2,3...) or full channel ID (-100123...)") - selection = input("Enter your selection: ").strip() + print("\n请输入频道编号(1,2,3...)或完整频道ID(-100123...)") + selection = input("请输入选择:").strip() selected_channels = self.parse_channel_selection(selection) if len(selected_channels) == 1: channel = selected_channels[0] await self.fix_missing_media(channel) elif len(selected_channels) > 1: - print("Please select only one channel for fixing missing media") + print("修复缺失媒体时一次只能选择一个频道") else: - print("No valid channel selected") + print("未选择有效频道") elif choice == 'q': - print("\n👋 Goodbye!") + print("\n👋 再见!") self.close_db_connections() if self.client: await self.client.disconnect() sys.exit() else: - print("Invalid option") + print("无效选项") except Exception as e: - print(f"Error: {e}") + print(f"错误:{e}") async def run(self): display_ascii_art() if await self.initialize_client(): try: + await self.sync_monitor_chats_from_env() + self._apply_scrape_media_from_env() + if self._heartbeat_enabled(): + print( + "💡 心跳已配置开启:请选菜单 [C]「持续抓取」后才会往群里发;" + "若只用 [S] 单次抓取,不会出现心跳。" + ) + dests = self._forward_targets_from_env() + if len(dests) > 1: + print( + "ℹ️ FORWARD_TO_CHAT 含多个目标,每条消息会发到多个群。" + "若只需「多群监控 → 一个汇总频道」,请只保留一个转发 ID,多个源请用 MONITOR_CHATS。" + ) await self.manage_channels() finally: self.close_db_connections() if self.client: await self.client.disconnect() else: - print("Failed to initialize client. Exiting.") + print("客户端初始化失败,程序退出。") async def main(): scraper = OptimizedTelegramScraper() await scraper.run() +def configure_event_loop_policy(): + if sys.platform.startswith("win"): + try: + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + except Exception: + pass + if __name__ == '__main__': try: + configure_event_loop_policy() asyncio.run(main()) except KeyboardInterrupt: - print("\nProgram interrupted. 
Exiting...") + print("\n程序已中断,正在退出...") sys.exit() diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..7257624 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,1293 @@ + + +
+ + +数据概览 · 本地 SQLite 聚合(与是否在线无关)
+区间内按天汇总(所有已抓取入库的频道)
+ +正文包含任一关键词即计入(非 NLP 分类)
+ +横轴为消息条数,纵轴为频道显示名(来自 state 中的标题 / @用户名)
+ +