1505 lines
66 KiB
Python
1505 lines
66 KiB
Python
import os
|
||
import re
|
||
import sqlite3
|
||
import json
|
||
import csv
|
||
import asyncio
|
||
import time
|
||
import sys
|
||
import uuid
|
||
import warnings
|
||
import tempfile
|
||
from dataclasses import dataclass
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
from pathlib import Path
|
||
from io import StringIO
|
||
from telethon import TelegramClient, utils
|
||
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, MessageMediaWebPage, User, PeerChannel, Channel, Chat, MessageService
|
||
from telethon.errors import FloodWaitError, SessionPasswordNeededError
|
||
import qrcode
|
||
|
||
warnings.filterwarnings("ignore", message="Using async sessions support is an experimental feature")
|
||
|
||
def load_env_file(env_path: str = ".env"):
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Variables that already exist in the environment are never overwritten.
    Loading is best-effort: a missing file is ignored silently and an
    unreadable one only produces a warning on stderr.

    Args:
        env_path: Path of the file to read.
    """
    if not os.path.exists(env_path):
        return
    try:
        # utf-8-sig drops a leading byte-order mark, which would otherwise
        # become part of the first key and make it unfindable.
        with open(env_path, "r", encoding="utf-8-sig") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                key = key.strip()
                value = value.strip()
                # Remove exactly one pair of matching surrounding quotes; an
                # unbalanced quote is part of the value and must be kept.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
                    value = value[1:-1]
                if key and key not in os.environ:
                    os.environ[key] = value
    except (OSError, ValueError) as exc:
        # ValueError covers undecodable bytes and values os.environ rejects.
        print(f"Warning: could not load {env_path}: {exc}", file=sys.stderr)
|
||
|
||
def display_ascii_art():
    """Print the ASCII-art banner to stdout, wrapped in ANSI colour codes."""
    # ANSI escape sequences: switch to bright-white foreground, then reset.
    WHITE = "\033[97m"
    RESET = "\033[0m"
    # NOTE(review): the banner rows below look whitespace-collapsed; confirm
    # the intended spacing against the original artwork before relying on it.
    art = r"""
___________________ _________
\__ ___/ _____/ / _____/
| | / \ ___ \_____ \
| | \ \_\ \/ \
|____| \______ /_______ /
\/ \/
"""
    print(WHITE + art + RESET)
|
||
|
||
@dataclass
class MessageData:
    """One scraped message, flattened into the columns of the ``messages`` table.

    Field order matters: instances are built with keyword arguments by the
    scraper and unpacked column-by-column when rows are inserted.
    """

    message_id: int  # message id as reported by Telegram
    date: str  # timestamp already formatted as text
    sender_id: int  # id of the sender
    first_name: Optional[str]  # sender first name, when the sender is a user
    last_name: Optional[str]  # sender last name, when the sender is a user
    username: Optional[str]  # sender username, when the sender is a user
    message: str  # message text ('' when there is none)
    media_type: Optional[str]  # class name of the attached media, if any
    media_path: Optional[str]  # local path of the downloaded media, if any
    reply_to: Optional[int]  # id of the message this one replies to
    post_author: Optional[str]  # post author signature, if present
    views: Optional[int]  # view counter, if present
    forwards: Optional[int]  # forward counter, if present
    reactions: Optional[str]  # reactions rendered as "emoji count" pairs
|
||
|
||
class OptimizedTelegramScraper:
|
||
def __init__(self):
    """Set up scraper defaults and load persisted state.

    The ``.env`` file is loaded first so that every environment-driven
    setting read afterwards sees its values.
    """
    load_env_file()

    # Tunables for batching and concurrency.
    self.max_concurrent_downloads = 5
    self.batch_size = 100
    self.state_save_interval = 50

    # Runtime handles; the Telegram client is attached later.
    self.client = None
    self.continuous_scraping_active = False
    self.db_connections = {}

    # Persisted state lives next to the script in a JSON file.
    self.STATE_FILE = 'state.json'
    self.state = self.load_state()

    self._forward_delay_seconds = self._read_forward_delay()
|
||
|
||
def _read_forward_delay(self) -> float:
|
||
raw = os.getenv("FORWARD_DELAY_SECONDS", "").strip()
|
||
if not raw:
|
||
return 0.0
|
||
try:
|
||
return max(0.0, float(raw))
|
||
except ValueError:
|
||
return 0.0
|
||
|
||
def _apply_scrape_media_from_env(self) -> None:
|
||
raw = os.getenv("SCRAPE_MEDIA", "").strip().lower()
|
||
if not raw:
|
||
return
|
||
if raw in ("0", "false", "no", "off"):
|
||
self.state["scrape_media"] = False
|
||
elif raw in ("1", "true", "yes", "on"):
|
||
self.state["scrape_media"] = True
|
||
else:
|
||
return
|
||
self.save_state()
|
||
print(
|
||
"ℹ️ 已从环境变量 SCRAPE_MEDIA 写入 state:"
|
||
f"{'开启' if self.state['scrape_media'] else '关闭'}媒体下载(关闭时只存文字与元数据,不抓文件)"
|
||
)
|
||
|
||
def _heartbeat_enabled(self) -> bool:
|
||
return self._env_flag_true("HEARTBEAT_ENABLED", "SCRAPER_HEARTBEAT")
|
||
|
||
def _heartbeat_interval_seconds(self) -> float:
|
||
raw = os.getenv("HEARTBEAT_INTERVAL_SECONDS", "30").strip()
|
||
try:
|
||
return max(5.0, float(raw))
|
||
except ValueError:
|
||
return 30.0
|
||
|
||
def _heartbeat_target_spec(self) -> Optional[str]:
|
||
s = (os.getenv("HEARTBEAT_TO_CHAT") or os.getenv("HEARTBEAT_TO") or "").strip()
|
||
if s:
|
||
return s
|
||
t = self._forward_targets_from_env()
|
||
return t[0] if t else None
|
||
|
||
async def _heartbeat_loop(self) -> None:
|
||
spec = self._heartbeat_target_spec()
|
||
if not spec:
|
||
return
|
||
try:
|
||
entity = await self._resolve_forward_entity(spec)
|
||
except Exception as e:
|
||
print(f"💓 心跳目标无法解析({spec}):{e}")
|
||
return
|
||
while self.continuous_scraping_active:
|
||
try:
|
||
ts = time.strftime("%Y-%m-%d %H:%M:%S")
|
||
await self.client.send_message(
|
||
entity, f"💓 抓取心跳 · 持续运行中 · {ts}"
|
||
)
|
||
except asyncio.CancelledError:
|
||
break
|
||
except FloodWaitError as e:
|
||
await asyncio.sleep(min(e.seconds, 300))
|
||
except Exception as e:
|
||
print(f"\n💓 心跳发送失败:{e}")
|
||
try:
|
||
await asyncio.sleep(self._heartbeat_interval_seconds())
|
||
except asyncio.CancelledError:
|
||
break
|
||
|
||
def _forward_targets_from_env(self) -> List[str]:
|
||
raw = (os.getenv("FORWARD_TO_CHAT") or os.getenv("FORWARD_TO") or "").strip()
|
||
if not raw:
|
||
return []
|
||
parts = re.split(r"[,,;;]+", raw)
|
||
out: List[str] = []
|
||
for p in parts:
|
||
s = p.strip().strip("\ufeff").strip("\u200b")
|
||
if s:
|
||
out.append(s)
|
||
return out
|
||
|
||
def _monitor_chats_specs_from_env(self) -> List[str]:
|
||
raw = (os.getenv("MONITOR_CHATS") or os.getenv("MONITOR_GROUPS") or "").strip()
|
||
if not raw:
|
||
return []
|
||
parts = re.split(r"[,,;;]+", raw)
|
||
return [p.strip().strip("\ufeff").strip("\u200b") for p in parts if p.strip()]
|
||
|
||
async def sync_monitor_chats_from_env(self) -> None:
    """Register every chat listed in MONITOR_CHATS as a scraping source.

    Each spec is resolved to an entity and added to ``state['channels']``
    (the original docstring describes this as equivalent to adding it via
    menu option [L]). Known chats get their stored username and title
    refreshed. Specs that cannot be resolved are reported and skipped. The
    state file is written whenever anything actually changed.
    """
    specs = self._monitor_chats_specs_from_env()
    if not specs or not self.client:
        return

    # Older state files may lack any of these maps; create them up front so
    # a missing key is not misreported as an unresolvable chat below.
    channels = self.state.setdefault("channels", {})
    names = self.state.setdefault("channel_names", {})
    titles = self.state.setdefault("channel_titles", {})

    changed = False
    for spec in specs:
        try:
            ent = await self._resolve_forward_entity(spec)
            cid = str(utils.get_peer_id(ent))
            uname = getattr(ent, "username", None) or "no_username"
            title = getattr(ent, "title", None) or cid
            title_text = str(title) if title else ""

            if cid not in channels:
                channels[cid] = 0
                changed = True
                print(f"📌 已从 MONITOR_CHATS 加入监控源:{title}({cid})")

            # Refresh the display metadata for new and known chats alike.
            if names.get(cid) != uname or titles.get(cid) != title_text:
                names[cid] = uname
                titles[cid] = title_text
                changed = True
        except Exception as e:
            print(f"⚠️ MONITOR_CHATS 无法解析「{spec}」:{e}")

    if changed:
        self.save_state()
|
||
|
||
def _normalize_entity_input(self, spec: str) -> str:
|
||
"""把 t.me / telegram.me 链接规范成 @username,便于 get_entity 解析。"""
|
||
s = (spec or "").strip().strip("\ufeff").strip("\u200b")
|
||
if not s:
|
||
return s
|
||
# 邀请链接保留原样,交给 get_entity 处理
|
||
if re.search(r"(?:t\.me|telegram\.me)/(?:joinchat/|\+)", s, re.I):
|
||
return s
|
||
m = re.search(
|
||
r"(?:https?://)?(?:t\.me|telegram\.me)/([A-Za-z0-9_]+)(?:[/?#].*)?$",
|
||
s,
|
||
re.I,
|
||
)
|
||
if m:
|
||
slug = m.group(1)
|
||
if slug.isdigit():
|
||
return s
|
||
reserved = {"addstickers", "share", "iv", "socks", "setlanguage", "proxy"}
|
||
if slug.lower() in reserved:
|
||
return s
|
||
return "@" + slug
|
||
return s
|
||
|
||
async def _resolve_forward_entity(self, spec: str):
|
||
"""解析转发目标:-100 超级群用 PeerChannel,避免 get_entity(str) 偶发失败。"""
|
||
s = self._normalize_entity_input(spec)
|
||
if not s:
|
||
raise ValueError("转发目标为空")
|
||
if s.startswith("@"):
|
||
return await self.client.get_entity(s)
|
||
if re.fullmatch(r"-100\d+", s):
|
||
inner = int(s[4:])
|
||
try:
|
||
return await self.client.get_entity(PeerChannel(inner))
|
||
except Exception:
|
||
return await self.client.get_entity(int(s))
|
||
if re.fullmatch(r"-?\d+", s):
|
||
num = int(s)
|
||
try:
|
||
return await self.client.get_entity(num)
|
||
except Exception:
|
||
sn = str(num)
|
||
if sn.startswith("-100") and len(sn) > 4 and sn[4:].isdigit():
|
||
return await self.client.get_entity(PeerChannel(int(sn[4:])))
|
||
raise
|
||
return await self.client.get_entity(s)
|
||
|
||
def _env_flag_true(self, *keys: str) -> bool:
|
||
for key in keys:
|
||
v = os.getenv(key, "").strip().lower()
|
||
if v in ("1", "true", "yes", "on"):
|
||
return True
|
||
return False
|
||
|
||
def _forward_only_new_messages(self) -> bool:
|
||
return self._env_flag_true(
|
||
"FORWARD_ONLY_NEW_MESSAGES",
|
||
"FORWARD_ONLY_IF_OFFSET_POSITIVE",
|
||
)
|
||
|
||
def _forward_my_username(self) -> Optional[str]:
|
||
raw = (os.getenv("FORWARD_MY_USERNAME") or os.getenv("FORWARD_AS_USERNAME") or "").strip()
|
||
if not raw:
|
||
return None
|
||
return raw.lstrip("@").strip() or None
|
||
|
||
def _env_flag_false(self, *keys: str) -> bool:
|
||
"""为真时表示显式关闭(用于默认开启的行为)。"""
|
||
for key in keys:
|
||
v = os.getenv(key, "").strip().lower()
|
||
if v in ("0", "false", "no", "off"):
|
||
return True
|
||
return False
|
||
|
||
def _forward_replace_mentions_enabled(self) -> bool:
|
||
if self._env_flag_false("FORWARD_RAW_MENTIONS", "FORWARD_NO_MENTION_REPLACE"):
|
||
return False
|
||
return True
|
||
|
||
def _sanitize_mentions_text(self, text: str, replacement: str) -> str:
|
||
if not text:
|
||
return ""
|
||
if not replacement:
|
||
return text
|
||
# Telegram 用户名一般为 5–32 位 [A-Za-z0-9_];4 位兼容边界情况
|
||
return re.sub(r"@[A-Za-z0-9_]{4,32}\b", lambda _: replacement, text)
|
||
|
||
async def _resolve_forward_mention_replacement(self, me) -> str:
|
||
"""推送到汇总群时,用于替换正文中所有 @用户名的展示串(优先环境变量,否则为当前登录号)。"""
|
||
custom = self._forward_my_username()
|
||
if custom:
|
||
return "@" + custom.lstrip("@")
|
||
if getattr(me, "username", None):
|
||
return "@" + me.username
|
||
parts = [
|
||
(getattr(me, "first_name", None) or "").strip(),
|
||
(getattr(me, "last_name", None) or "").strip(),
|
||
]
|
||
name = " ".join(p for p in parts if p)
|
||
return name or "我"
|
||
|
||
async def _forward_scraped_message(
    self,
    target,
    _source_entity,
    message,
    *,
    replacement: str,
    replace_mentions: bool,
) -> None:
    """Re-send a copy of ``message`` to ``target`` from the current account.

    Media is first downloaded to a temporary file and then uploaded again,
    so the copy does not show up as a "forwarded from ..." card. Service
    messages are ignored. Failures are reported and never raised; after a
    flood-wait the copy is retried once.

    Args:
        target: Destination entity.
        _source_entity: Unused; kept for call compatibility.
        message: The source message.
        replacement: Text used in place of ``@username`` mentions.
        replace_mentions: Whether mentions in the text are rewritten.
    """
    if isinstance(message, MessageService):
        return

    text_raw = message.message or ""
    text_out = (
        self._sanitize_mentions_text(text_raw, replacement)
        if replace_mentions
        else text_raw
    )
    # Web-page previews are not re-uploaded as media.
    has_media = bool(
        message.media and not isinstance(message.media, MessageMediaWebPage)
    )

    def _temp_suffix() -> str:
        """Pick a file extension for the temporary download."""
        if isinstance(message.media, MessageMediaPhoto):
            return ".jpg"
        if isinstance(message.media, MessageMediaDocument) and message.file:
            name = getattr(message.file, "name", None) or ""
            ext = getattr(message.file, "ext", None) or ""
            name_suffix = Path(name).suffix if name else ""
            if name_suffix:
                return name_suffix
            if ext:
                # ext may or may not carry its leading dot.
                return "." + ext.lstrip(".")
        return ".bin"

    async def _send_copy() -> None:
        cap = text_out.strip() or None
        if not has_media:
            if cap:
                await self.client.send_message(target, cap)
            return

        tmp_path = str(
            Path(tempfile.gettempdir()) / f"tgfwd_{uuid.uuid4().hex}{_temp_suffix()}"
        )
        saved: Optional[str] = None
        try:
            saved = await self.client.download_media(message, file=tmp_path)
            if saved and Path(saved).exists():
                await self.client.send_file(target, saved, caption=cap)
                return
        finally:
            # Clean up both the requested path and the path actually
            # written: a download that raises midway leaves a partial file
            # at tmp_path while `saved` is still None.
            for leftover in (tmp_path, saved):
                if leftover:
                    try:
                        Path(leftover).unlink(missing_ok=True)
                    except OSError:
                        pass

        # The media could not be downloaded: fall back to the text, if any.
        if cap:
            await self.client.send_message(target, cap)
        else:
            print(
                f"\n⚠️ 消息 {message.id} 有媒体但下载失败,且无可用文字说明,已跳过推送。"
            )

    try:
        await _send_copy()
        if self._forward_delay_seconds > 0:
            await asyncio.sleep(self._forward_delay_seconds)
    except FloodWaitError as e:
        # Wait out the server-imposed pause, then retry exactly once.
        await asyncio.sleep(e.seconds)
        try:
            await _send_copy()
        except Exception as err:
            print(f"\n推送消息 {message.id} 时仍失败(FloodWait 后):{err}")
    except Exception as e:
        print(f"\n推送消息 {message.id} 失败(可能无发言权限或媒体无法下载):{e}")
|
||
|
||
def build_proxy_config(self):
    """Build the proxy tuple for the Telegram client from the environment.

    Policy:
    - Web UI (``TELEGRAM_WEB_UI`` on): direct connection by default; a
      proxy is used only when ``PROXY_ENABLED`` is explicitly on, so that a
      stray ``PROXY_HOST`` on a server does not route traffic by accident.
    - Command line: ``PROXY_ENABLED`` off disables the proxy; when it is
      unset, ``PROXY_HOST`` + ``PROXY_PORT`` alone still enable it
      (compatible with older configuration).

    Returns:
        ``None`` for a direct connection, ``(type, host, port)``, or
        ``(type, host, port, True, username, password)`` when credentials
        are configured.
    """
    on_values = ("1", "true", "yes", "on")
    enabled_flag = (os.getenv("PROXY_ENABLED") or "").strip().lower()
    is_web_ui = (os.getenv("TELEGRAM_WEB_UI") or "").strip().lower() in on_values

    if is_web_ui and enabled_flag not in on_values:
        return None
    if not is_web_ui and enabled_flag in ("0", "false", "no", "off", "none"):
        return None

    host = (os.getenv("PROXY_HOST") or "").strip()
    port_text = (os.getenv("PROXY_PORT") or "").strip()
    if not (host and port_text):
        return None

    # On the command line an unrecognised PROXY_ENABLED value is rejected.
    if not is_web_ui and enabled_flag not in ("",) + on_values:
        print(f"PROXY_ENABLED 取值无效({enabled_flag!r}),已跳过代理。")
        return None

    try:
        port = int(port_text)
    except ValueError:
        print("环境变量中的 PROXY_PORT 无效,已跳过代理设置。")
        return None

    kind = os.getenv("PROXY_TYPE", "socks5").lower()
    user = os.getenv("PROXY_USERNAME")
    secret = os.getenv("PROXY_PASSWORD")

    endpoint = (kind, host, port)
    if user or secret:
        return endpoint + (True, user, secret)
    return endpoint
|
||
|
||
def load_state(self) -> Dict[str, Any]:
    """Load the persisted scraper state from ``self.STATE_FILE``.

    Keys missing from the file are filled in from the defaults, so state
    files written by older versions keep working. A missing, unreadable,
    corrupt or non-object file yields the defaults.

    Returns:
        The state dictionary.
    """
    defaults: Dict[str, Any] = {
        'api_id': None,
        'api_hash': None,
        'channels': {},
        'channel_names': {},
        'channel_titles': {},
        'scrape_media': True,
    }
    if os.path.exists(self.STATE_FILE):
        try:
            with open(self.STATE_FILE, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            print(f"Warning: could not read {self.STATE_FILE} ({e}); using default state.")
        else:
            if isinstance(loaded, dict):
                return {**defaults, **loaded}
    return defaults
|
||
|
||
def save_state(self):
    """Persist ``self.state`` to ``self.STATE_FILE`` as JSON.

    The data is written to a temporary file in the same directory and then
    moved over the target, so a failure midway (serialisation error, full
    disk, crash) leaves the previous state file intact instead of a
    truncated one. Errors are reported and never raised.
    """
    tmp_path = None
    try:
        directory = os.path.dirname(os.path.abspath(self.STATE_FILE))
        fd, tmp_path = tempfile.mkstemp(prefix=".state-", suffix=".tmp", dir=directory)
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, indent=2)
        os.replace(tmp_path, self.STATE_FILE)
        tmp_path = None  # moved into place; nothing left to clean up
    except Exception as e:
        print(f"保存状态失败:{e}")
    finally:
        if tmp_path:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
||
|
||
def get_db_connection(self, channel: str) -> sqlite3.Connection:
    """Return the cached SQLite connection for ``channel``, opening it if needed.

    On first use the channel directory and ``<channel>/<channel>.db`` are
    created, the ``messages`` table and its indexes are ensured, WAL
    journaling is enabled and pending column migrations are applied. If
    any of that fails the half-initialised connection is closed before the
    error propagates, and nothing is cached.

    Args:
        channel: Channel key; also used as directory and file name.
    """
    cached = self.db_connections.get(channel)
    if cached is not None:
        return cached

    channel_dir = Path(channel)
    channel_dir.mkdir(parents=True, exist_ok=True)
    db_file = channel_dir / f'{channel}.db'

    conn = sqlite3.connect(str(db_file), check_same_thread=False)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS messages
            (id INTEGER PRIMARY KEY, message_id INTEGER UNIQUE, date TEXT,
            sender_id INTEGER, first_name TEXT, last_name TEXT, username TEXT,
            message TEXT, media_type TEXT, media_path TEXT, reply_to INTEGER,
            post_author TEXT, views INTEGER, forwards INTEGER, reactions TEXT)''')
        conn.execute('CREATE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)')
        conn.execute('CREATE INDEX IF NOT EXISTS idx_date ON messages(date)')
        conn.execute('PRAGMA journal_mode=WAL')
        conn.execute('PRAGMA synchronous=NORMAL')
        conn.commit()

        self.migrate_database(conn)
    except Exception:
        # Do not leak an open handle on a database we failed to prepare.
        conn.close()
        raise

    self.db_connections[channel] = conn
    return conn
|
||
|
||
def migrate_database(self, conn: sqlite3.Connection):
    """Add columns introduced after the first schema to an existing table.

    Looks at the current columns of ``messages`` and issues an
    ``ALTER TABLE ... ADD COLUMN`` for each of ``post_author``, ``views``,
    ``forwards`` and ``reactions`` that is missing. A failing statement is
    skipped (best-effort), but only database errors are tolerated.

    Args:
        conn: Open connection to a channel database.
    """
    cursor = conn.cursor()
    cursor.execute("PRAGMA table_info(messages)")
    columns = {row[1] for row in cursor.fetchall()}

    # Column name -> declared type, in the order they were introduced.
    added_later = (
        ('post_author', 'TEXT'),
        ('views', 'INTEGER'),
        ('forwards', 'INTEGER'),
        ('reactions', 'TEXT'),
    )
    migrations = [
        f'ALTER TABLE messages ADD COLUMN {name} {sql_type}'
        for name, sql_type in added_later
        if name not in columns
    ]

    for migration in migrations:
        try:
            conn.execute(migration)
        except sqlite3.Error:
            # Best-effort, but never swallow KeyboardInterrupt/SystemExit
            # the way a bare except would.
            pass

    if migrations:
        conn.commit()
|
||
|
||
def close_db_connections(self):
    """Close every cached database connection and empty the cache.

    A connection that fails to close is reported and skipped, so the
    remaining ones are still closed, and the cache is always cleared so no
    closed handle can be handed out again.
    """
    try:
        for channel, conn in self.db_connections.items():
            try:
                conn.close()
            except sqlite3.Error as e:
                print(f"Warning: failed to close database for {channel}: {e}")
    finally:
        self.db_connections.clear()
|
||
|
||
def batch_insert_messages(self, channel: str, messages: List[MessageData]):
    """Insert a batch of messages into the channel database in one commit.

    Rows whose ``message_id`` already exists are silently ignored. An empty
    batch is a no-op.

    Args:
        channel: Channel key identifying the database.
        messages: Records to store.
    """
    if not messages:
        return

    conn = self.get_db_connection(channel)

    rows = []
    for record in messages:
        rows.append((
            record.message_id, record.date, record.sender_id,
            record.first_name, record.last_name, record.username,
            record.message, record.media_type, record.media_path,
            record.reply_to, record.post_author, record.views,
            record.forwards, record.reactions,
        ))

    insert_sql = '''INSERT OR IGNORE INTO messages
        (message_id, date, sender_id, first_name, last_name, username,
        message, media_type, media_path, reply_to, post_author, views,
        forwards, reactions)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
    conn.executemany(insert_sql, rows)
    conn.commit()
|
||
|
||
async def download_media(self, channel: str, message) -> Optional[str]:
    """Download the photo or document attached to ``message``.

    Files are stored as ``<channel>/media/<message id>-<name><ext>``. If a
    file for this message id already exists, its path is returned without
    downloading again. The download is attempted up to three times,
    honouring flood-waits and backing off on other errors.

    Args:
        channel: Channel key; also the directory that receives the file.
        message: Message whose media should be fetched.

    Returns:
        The path of the file, or ``None`` when media scraping is disabled,
        the message has no downloadable media, or the download failed.
    """
    if not message.media or not self.state['scrape_media']:
        return None

    if isinstance(message.media, MessageMediaWebPage):
        return None

    try:
        media_folder = Path(channel) / 'media'
        # parents=True: do not depend on the channel directory having been
        # created by an earlier database call.
        media_folder.mkdir(parents=True, exist_ok=True)

        if isinstance(message.media, MessageMediaPhoto):
            original_name = getattr(message.file, 'name', None) or "photo.jpg"
            ext = "jpg"
        elif isinstance(message.media, MessageMediaDocument):
            raw_ext = getattr(message.file, 'ext', None) if message.file else None
            # The reported extension may be None and may already include the
            # leading dot; normalise so names never become "document..pdf"
            # or "document.None".
            ext = (raw_ext or 'bin').lstrip('.') or 'bin'
            original_name = getattr(message.file, 'name', None) or f"document.{ext}"
        else:
            return None

        base_name = Path(original_name).stem
        extension = Path(original_name).suffix or f".{ext}"
        unique_filename = f"{message.id}-{base_name}{extension}"
        media_path = media_folder / unique_filename

        # Any file already stored for this message id counts as downloaded.
        existing_files = list(media_folder.glob(f"{message.id}-*"))
        if existing_files:
            return str(existing_files[0])

        for attempt in range(3):
            try:
                downloaded_path = await message.download_media(file=str(media_path))
                if downloaded_path and Path(downloaded_path).exists():
                    return downloaded_path
                return None
            except FloodWaitError as e:
                if attempt < 2:
                    await asyncio.sleep(e.seconds)
                else:
                    return None
            except Exception:
                if attempt < 2:
                    # Exponential back-off: 1s, then 2s.
                    await asyncio.sleep(2 ** attempt)
                else:
                    return None

        return None
    except Exception:
        # Best-effort: a failed download must not abort the scrape.
        return None
|
||
|
||
async def update_media_path(self, channel: str, message_id: int, media_path: str):
    """Record the local media path for one stored message and commit.

    Args:
        channel: Channel key identifying the database.
        message_id: Id of the message row to update.
        media_path: Path of the downloaded file.
    """
    connection = self.get_db_connection(channel)
    statement = 'UPDATE messages SET media_path = ? WHERE message_id = ?'
    parameters = (media_path, message_id)
    connection.execute(statement, parameters)
    connection.commit()
|
||
|
||
async def scrape_channel(self, channel: str, offset_id: int):
    """Scrape messages of ``channel`` newer than ``offset_id``.

    Messages are stored in batches, optionally re-sent to the configured
    forward targets, and their media is downloaded afterwards. The last
    processed message id is saved in ``state['channels']`` periodically and
    at the end. All errors are reported and never raised.

    Args:
        channel: Channel key (numeric id as text, or a username).
        offset_id: Id of the last message already processed (0 for a full
            history scrape).
    """
    try:
        if channel.startswith('-'):
            # Marked ids cover both channels (-100...) and basic groups;
            # the shared resolver picks the right peer type instead of
            # forcing every negative id into a PeerChannel.
            entity = await self._resolve_forward_entity(channel)
        else:
            entity = await self.client.get_entity(channel)
        result = await self.client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0)
        total_messages = result.total

        if total_messages == 0:
            print(f"频道 {channel} 中未找到消息")
            return

        print(f"在频道 {channel} 中找到 {total_messages} 条消息")

        # ---- Forwarding setup -------------------------------------------
        forward_specs = self._forward_targets_from_env()
        forward_pairs: List[Tuple[Any, str]] = []
        mention_replacement = ""
        replace_mentions = True
        if forward_specs:
            for spec in forward_specs:
                try:
                    ent = await self._resolve_forward_entity(spec)
                    forward_pairs.append((ent, spec))
                except Exception as e:
                    print(f"⚠️ 转发目标解析失败:{spec}:{e}")
            forward_entities = [p[0] for p in forward_pairs]
            if not forward_entities:
                print("⚠️ 没有可用的转发目标(已跳过转发)")
            else:
                dest_titles = [getattr(ent, "title", None) or sp for ent, sp in forward_pairs]
                only_new = self._forward_only_new_messages()
                if only_new and offset_id <= 0:
                    # History scrape in "new messages only" mode: store
                    # everything but push nothing.
                    joined = "、".join(dest_titles)
                    print(
                        f"📤 已配置 {len(forward_entities)} 个转发目标({joined}),但仅新消息模式开启且当前 offset_id={offset_id},"
                        "本次为历史抓取,不向群组推送;待 state 中已有进度后再次抓取时才会转发新消息。"
                    )
                    forward_entities = []
                else:
                    me = await self.client.get_me()
                    mention_replacement = await self._resolve_forward_mention_replacement(me)
                    replace_mentions = self._forward_replace_mentions_enabled()
                    hints = []
                    if only_new:
                        hints.append("仅新消息")
                    hints.append("以当前账号重发(非转发卡片)")
                    if replace_mentions:
                        hints.append(f"正文 @用户名 → {mention_replacement}")
                    else:
                        hints.append("保留正文 @(FORWARD_RAW_MENTIONS=1)")
                    hint_str = "(" + ",".join(hints) + ")" if hints else ""
                    joined = "、".join(dest_titles)
                    print(f"📤 推送已开启{hint_str}:共 {len(forward_entities)} 个目标 → {joined}")
        else:
            forward_entities = []

        # ---- Message pass -----------------------------------------------
        message_batch = []
        media_tasks = []
        processed_messages = 0
        last_message_id = offset_id

        async for message in self.client.iter_messages(entity, offset_id=offset_id, reverse=True):
            try:
                sender = await message.get_sender()

                reactions_str = None
                if message.reactions and message.reactions.results:
                    reactions_parts = []
                    for reaction in message.reactions.results:
                        emoji = getattr(reaction.reaction, 'emoticon', '')
                        count = reaction.count
                        if emoji:
                            reactions_parts.append(f"{emoji} {count}")
                    if reactions_parts:
                        reactions_str = ' '.join(reactions_parts)

                msg_data = MessageData(
                    message_id=message.id,
                    date=message.date.strftime('%Y-%m-%d %H:%M:%S'),
                    sender_id=message.sender_id,
                    first_name=getattr(sender, 'first_name', None) if isinstance(sender, User) else None,
                    last_name=getattr(sender, 'last_name', None) if isinstance(sender, User) else None,
                    username=getattr(sender, 'username', None) if isinstance(sender, User) else None,
                    message=message.message or '',
                    media_type=message.media.__class__.__name__ if message.media else None,
                    media_path=None,
                    reply_to=message.reply_to_msg_id if message.reply_to else None,
                    post_author=message.post_author,
                    views=message.views,
                    forwards=message.forwards,
                    reactions=reactions_str
                )

                message_batch.append(msg_data)

                for fwd_ent in forward_entities:
                    await self._forward_scraped_message(
                        fwd_ent,
                        entity,
                        message,
                        replacement=mention_replacement,
                        replace_mentions=replace_mentions,
                    )

                if self.state['scrape_media'] and message.media and not isinstance(message.media, MessageMediaWebPage):
                    media_tasks.append(message)

                last_message_id = message.id
                processed_messages += 1

                if len(message_batch) >= self.batch_size:
                    self.batch_insert_messages(channel, message_batch)
                    message_batch.clear()

                if processed_messages % self.state_save_interval == 0:
                    # Store pending rows before advancing the saved offset;
                    # otherwise a crash here would make the next run skip
                    # messages that never reached the database.
                    if message_batch:
                        self.batch_insert_messages(channel, message_batch)
                        message_batch.clear()
                    self.state['channels'][channel] = last_message_id
                    self.save_state()

                progress = (processed_messages / total_messages) * 100
                bar_length = 30
                filled_length = int(bar_length * processed_messages // total_messages)
                bar = '█' * filled_length + '░' * (bar_length - filled_length)

                sys.stdout.write(f"\r📄 Messages: [{bar}] {progress:.1f}% ({processed_messages}/{total_messages})")
                sys.stdout.flush()

            except Exception as e:
                print(f"\n处理消息 {message.id} 时出错:{e}")

        if message_batch:
            self.batch_insert_messages(channel, message_batch)

        # ---- Media pass -------------------------------------------------
        if media_tasks:
            total_media = len(media_tasks)
            completed_media = 0
            successful_downloads = 0
            print(f"\n📥 Downloading {total_media} media files...")

            semaphore = asyncio.Semaphore(self.max_concurrent_downloads)

            async def download_single_media(message):
                async with semaphore:
                    return await self.download_media(channel, message)

            batch_size = 10
            for i in range(0, len(media_tasks), batch_size):
                batch = media_tasks[i:i + batch_size]
                tasks = [asyncio.create_task(download_single_media(msg)) for msg in batch]

                for j, task in enumerate(tasks):
                    try:
                        media_path = await task
                        if media_path:
                            await self.update_media_path(channel, batch[j].id, media_path)
                            successful_downloads += 1
                    except Exception:
                        # A failed file only lowers the success counter.
                        pass

                    completed_media += 1
                    progress = (completed_media / total_media) * 100
                    bar_length = 30
                    filled_length = int(bar_length * completed_media // total_media)
                    bar = '█' * filled_length + '░' * (bar_length - filled_length)

                    sys.stdout.write(f"\r📥 Media: [{bar}] {progress:.1f}% ({completed_media}/{total_media})")
                    sys.stdout.flush()

            print(f"\n✅ 媒体下载完成!(成功 {successful_downloads}/{total_media})")

        self.state['channels'][channel] = last_message_id
        self.save_state()
        print(f"\nCompleted scraping channel {channel}")

    except Exception as e:
        print(f"处理频道 {channel} 时出错:{e}")
|
||
|
||
async def rescrape_media(self, channel: str):
    """Download media for stored messages that have no media path yet.

    Selects rows with a media type (other than web-page previews) and a
    NULL ``media_path``, re-fetches those messages in batches of ten and
    stores the path of every successful download. Errors are reported and
    never raised.

    Args:
        channel: Channel key identifying the database and the chat.
    """
    conn = self.get_db_connection(channel)
    cursor = conn.cursor()
    # Standard SQL string literals use single quotes; a double-quoted token
    # is an identifier that SQLite only tolerates as a string in legacy mode.
    cursor.execute("SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_type != 'MessageMediaWebPage' AND media_path IS NULL")
    message_ids = [row[0] for row in cursor.fetchall()]

    channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown')

    if not message_ids:
        print(f"{channel_name}(ID: {channel})没有可补抓的媒体文件")
        return

    print(f"📥 Reprocessing {len(message_ids)} media files for {channel_name} (ID: {channel})")

    try:
        if channel.startswith('-') and channel[1:].isdigit():
            # Marked ids cover channels and basic groups; let the shared
            # resolver choose the peer type.
            entity = await self._resolve_forward_entity(channel)
        elif channel.isdigit():
            entity = await self.client.get_entity(PeerChannel(int(channel)))
        else:
            entity = await self.client.get_entity(channel)
        semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
        completed_media = 0
        successful_downloads = 0

        async def download_single_media(message):
            async with semaphore:
                return await self.download_media(channel, message)

        batch_size = 10
        for i in range(0, len(message_ids), batch_size):
            batch_ids = message_ids[i:i + batch_size]
            messages = await self.client.get_messages(entity, ids=batch_ids)

            # Entries without a message or without media are skipped.
            valid_messages = [msg for msg in messages if msg and msg.media and not isinstance(msg.media, MessageMediaWebPage)]
            tasks = [asyncio.create_task(download_single_media(msg)) for msg in valid_messages]

            for j, task in enumerate(tasks):
                try:
                    media_path = await task
                    if media_path:
                        await self.update_media_path(channel, valid_messages[j].id, media_path)
                        successful_downloads += 1
                except Exception:
                    # A failed file only lowers the success counter.
                    pass

                completed_media += 1
                progress = (completed_media / len(message_ids)) * 100
                bar_length = 30
                filled_length = int(bar_length * completed_media // len(message_ids))
                bar = '█' * filled_length + '░' * (bar_length - filled_length)

                sys.stdout.write(f"\r🔄 Rescrape: [{bar}] {progress:.1f}% ({completed_media}/{len(message_ids)})")
                sys.stdout.flush()

        print(f"\n✅ 媒体补抓完成!(成功 {successful_downloads}/{len(message_ids)})")

    except Exception as e:
        print(f"媒体补抓出错:{e}")
|
||
|
||
async def fix_missing_media(self, channel: str):
    """Report media coverage for ``channel`` and download what is missing.

    Prints how many stored messages carry media, how many of those have a
    file recorded and how many are missing, then re-fetches the missing
    ones in batches of ten. A NULL or empty ``media_path`` both count as
    missing. Errors are reported and never raised.

    Args:
        channel: Channel key identifying the database and the chat.
    """
    conn = self.get_db_connection(channel)
    cursor = conn.cursor()

    # Standard SQL string literals use single quotes; a double-quoted token
    # is an identifier that SQLite only tolerates as a string in legacy mode.
    cursor.execute("SELECT COUNT(*) FROM messages WHERE media_type IS NOT NULL AND media_type != 'MessageMediaWebPage'")
    total_with_media = cursor.fetchone()[0]

    # Count only non-empty paths so this figure agrees with the
    # "missing" query further down.
    cursor.execute("SELECT COUNT(*) FROM messages WHERE media_type IS NOT NULL AND media_type != 'MessageMediaWebPage' AND media_path IS NOT NULL AND media_path != ''")
    total_with_files = cursor.fetchone()[0]

    missing_count = total_with_media - total_with_files

    channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown')
    print(f"\n📊 {channel_name}(ID: {channel})媒体分析:")
    print(f"含媒体的消息数:{total_with_media}")
    print(f"已下载媒体文件数:{total_with_files}")
    print(f"缺失媒体文件数:{missing_count}")

    if missing_count == 0:
        print("✅ 所有媒体文件都已下载!")
        return

    cursor.execute("SELECT message_id, media_type FROM messages WHERE media_type IS NOT NULL AND media_type != 'MessageMediaWebPage' AND (media_path IS NULL OR media_path = '')")
    missing_media = cursor.fetchall()

    if not missing_media:
        print("✅ 未发现缺失媒体!")
        return

    print(f"\n🔧 Attempting to download {len(missing_media)} missing media files...")

    try:
        if channel.startswith('-') and channel[1:].isdigit():
            # Marked ids cover channels and basic groups; let the shared
            # resolver choose the peer type.
            entity = await self._resolve_forward_entity(channel)
        elif channel.isdigit():
            entity = await self.client.get_entity(PeerChannel(int(channel)))
        else:
            entity = await self.client.get_entity(channel)
        semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
        completed_media = 0
        successful_downloads = 0

        async def download_single_media(message):
            async with semaphore:
                return await self.download_media(channel, message)

        batch_size = 10
        for i in range(0, len(missing_media), batch_size):
            batch = missing_media[i:i + batch_size]
            message_ids = [msg[0] for msg in batch]

            messages = await self.client.get_messages(entity, ids=message_ids)
            # Entries without a message or without media are skipped.
            valid_messages = [msg for msg in messages if msg and msg.media and not isinstance(msg.media, MessageMediaWebPage)]

            tasks = [asyncio.create_task(download_single_media(msg)) for msg in valid_messages]

            for j, task in enumerate(tasks):
                try:
                    media_path = await task
                    if media_path:
                        await self.update_media_path(channel, valid_messages[j].id, media_path)
                        successful_downloads += 1
                except Exception:
                    # A failed file only lowers the success counter.
                    pass

                completed_media += 1
                progress = (completed_media / len(missing_media)) * 100
                bar_length = 30
                filled_length = int(bar_length * completed_media // len(missing_media))
                bar = '█' * filled_length + '░' * (bar_length - filled_length)

                sys.stdout.write(f"\r🔧 Fix Media: [{bar}] {progress:.1f}% ({completed_media}/{len(missing_media)})")
                sys.stdout.flush()

        print(f"\n✅ 缺失媒体修复完成!(成功 {successful_downloads}/{len(missing_media)})")

    except Exception as e:
        print(f"修复缺失媒体时出错:{e}")
|
||
|
||
async def continuous_scraping(self):
    """Poll every saved channel for new messages until stopped.

    Each cycle scrapes all channels from their saved offsets and then
    sleeps for whatever remains of a 60-second window. When the heartbeat
    is enabled and a target is available, a heartbeat task runs alongside
    and is cancelled on exit. Cancellation is reported and re-raised.
    """
    self.continuous_scraping_active = True
    heartbeat = None

    if self._heartbeat_enabled():
        if not self._heartbeat_target_spec():
            print(
                "⚠️ HEARTBEAT_ENABLED=1 但未配置 HEARTBEAT_TO_CHAT,且 FORWARD_TO_CHAT 为空,"
                "已跳过心跳。"
            )
        else:
            heartbeat = asyncio.create_task(self._heartbeat_loop())
            print(
                f"💓 抓取心跳已开启:启动后先发一条,之后约每 "
                f"{self._heartbeat_interval_seconds():.0f} 秒一条(未配置 HEARTBEAT_TO_CHAT 时用 FORWARD_TO_CHAT 的首个目标)"
            )

    try:
        while self.continuous_scraping_active:
            cycle_started = time.time()

            for channel in self.state['channels']:
                if not self.continuous_scraping_active:
                    break
                print(f"\nChecking for new messages in channel: {channel}")
                await self.scrape_channel(channel, self.state['channels'][channel])

            # Keep cycles at least 60 seconds apart.
            remaining = max(0, 60 - (time.time() - cycle_started))
            if remaining > 0:
                await asyncio.sleep(remaining)

    except asyncio.CancelledError:
        print("持续抓取已停止")
        raise
    finally:
        self.continuous_scraping_active = False
        if heartbeat:
            heartbeat.cancel()
            try:
                await heartbeat
            except asyncio.CancelledError:
                pass
|
||
|
||
def get_export_filename(self, channel: str):
    """Return the export base name ``<channel>_<username>`` for ``channel``.

    The username comes from ``state['channel_names']``; ``no_username`` is
    used when none is stored.
    """
    known_names = self.state.get('channel_names', {})
    username = known_names.get(channel, 'no_username')
    return f"{channel}_{username}"
|
||
|
||
def export_to_csv(self, channel: str):
    """Write all stored messages of ``channel`` to a CSV file, ordered by date.

    The file is created inside the channel directory, starts with a header
    row of column names and is filled 1000 rows at a time.
    """
    conn = self.get_db_connection(channel)
    target = Path(channel) / f'{self.get_export_filename(channel)}.csv'

    cursor = conn.cursor()
    cursor.execute('SELECT * FROM messages ORDER BY date')
    header = [column[0] for column in cursor.description]

    with open(target, 'w', newline='', encoding='utf-8') as out:
        writer = csv.writer(out)
        writer.writerow(header)

        chunk = cursor.fetchmany(1000)
        while chunk:
            writer.writerows(chunk)
            chunk = cursor.fetchmany(1000)
|
||
|
||
def export_to_json(self, channel: str):
    """Dump the channel's ``messages`` table to ``<channel>/<export name>.json``.

    The output is a JSON array of objects keyed by column name, ordered by
    ``date``.  Records are written one at a time (batches of 1000 rows) so the
    whole table never has to be held in memory.
    """
    connection = self.get_db_connection(channel)
    target = Path(channel) / f'{self.get_export_filename(channel)}.json'

    cur = connection.cursor()
    cur.execute('SELECT * FROM messages ORDER BY date')
    keys = [column[0] for column in cur.description]

    with open(target, 'w', encoding='utf-8') as out:
        out.write('[\n')
        # Empty before the first record, a comma + newline before all others.
        separator = ''
        batch = cur.fetchmany(1000)
        while batch:
            for record in batch:
                out.write(separator)
                separator = ',\n'
                json.dump(dict(zip(keys, record)), out, ensure_ascii=False, indent=2)
            batch = cur.fetchmany(1000)
        out.write('\n]')
async def export_data(self):
    """Export every tracked channel to CSV and JSON, reporting per-channel status.

    A failure while exporting one channel is printed and does not stop the
    remaining channels from being exported.
    """
    tracked = self.state['channels']
    if not tracked:
        print("没有可导出的频道")
        return

    for channel in tracked:
        print(f"正在导出频道 {channel} 的数据...")
        try:
            # CSV first, then JSON; the second is skipped if the first fails.
            self.export_to_csv(channel)
            self.export_to_json(channel)
            print(f"✅ 频道 {channel} 导出完成")
        except Exception as e:
            print(f"❌ 频道 {channel} 导出失败:{e}")
async def view_channels(self):
    """Print the tracked channels with their last message id and row count.

    For each channel the number of rows in its ``messages`` table is looked
    up; if that lookup fails for any ordinary reason (missing database,
    missing table, ...) the channel is still listed, just without the count.
    """
    channels = self.state['channels']
    if not channels:
        print("还没有保存任何频道")
        return

    print("\n当前频道列表:")
    names = self.state.get('channel_names', {})
    for i, (channel, last_id) in enumerate(channels.items(), 1):
        channel_name = names.get(channel, 'Unknown')
        summary = f"[{i}] {channel_name} (ID: {channel}), Last Message ID: {last_id}"
        try:
            cursor = self.get_db_connection(channel).cursor()
            cursor.execute('SELECT COUNT(*) FROM messages')
            count = cursor.fetchone()[0]
        except Exception:
            # Best effort only: the count is optional.  Catching Exception
            # (not a bare except) lets KeyboardInterrupt/SystemExit through.
            print(summary)
        else:
            print(f"{summary}, Messages: {count}")
def _account_list_exclude_substrings(self) -> List[str]:
    """Return title substrings whose dialogs are hidden from the account list.

    Read from the ``ACCOUNT_LIST_EXCLUDE_SUBSTRINGS`` environment variable,
    split on ASCII or full-width commas/semicolons.  The values ``*``,
    ``none`` and ``off`` (matched case-insensitively, like the other env
    switches in this file) mean "hide nothing".  When the variable is unset or
    blank, a single built-in default substring is returned.
    """
    raw = (os.getenv("ACCOUNT_LIST_EXCLUDE_SUBSTRINGS") or "").strip()
    if raw.lower() in ("*", "none", "off"):
        return []
    if raw:
        return [p.strip() for p in re.split(r"[,,;;]+", raw) if p.strip()]
    return ["远程-到岗-技术招聘"]
def _skip_dialog_in_account_channel_list(self, title: Optional[str]) -> bool:
    """Tell whether a dialog with this title should be left out of the listing.

    A dialog is skipped when its stripped title contains any non-empty
    substring from ``self._account_list_exclude_substrings()``.  Dialogs
    without a title are never skipped.
    """
    if not title:
        return False
    normalized = str(title).strip()
    return any(
        bool(needle) and needle in normalized
        for needle in self._account_list_exclude_substrings()
    )
async def list_channels(self):
    """List the channels/groups the account has joined and save them to CSV.

    Iterates the client's dialogs, keeps ``Channel`` and ``Chat`` entities
    (dialog id 777000 is excluded), drops titles filtered out by
    ``_skip_dialog_in_account_channel_list`` and prints one numbered line per
    remaining dialog.  When at least one dialog is found the rows are written
    to ``channels_list.csv`` in the working directory.

    Returns:
        A list of dicts with keys ``number``, ``channel_name``,
        ``channel_id`` (string), ``username`` and ``type``; an empty list if
        anything goes wrong.
    """
    try:
        print("\n当前账号加入的频道/群组:")
        found = []
        async for dialog in self.client.iter_dialogs():
            entity = dialog.entity
            if dialog.id == 777000 or not isinstance(entity, (Channel, Chat)):
                continue

            title = getattr(dialog, "title", None) or getattr(entity, "title", None)
            if self._skip_dialog_in_account_channel_list(title):
                continue

            # Only broadcast Channels are labelled as channels; everything
            # else that got this far is labelled as a group.
            if isinstance(entity, Channel) and entity.broadcast:
                kind = "频道"
            else:
                kind = "群组"
            handle = getattr(entity, 'username', None) or 'no_username'
            position = len(found) + 1
            print(f"[{position}] {dialog.title}(ID: {dialog.id},类型:{kind},用户名:@{handle})")
            found.append({
                'number': position,
                'channel_name': dialog.title,
                'channel_id': str(dialog.id),
                'username': handle,
                'type': kind,
            })

        if found:
            csv_file = Path('channels_list.csv')
            with open(csv_file, 'w', newline='', encoding='utf-8') as out:
                sheet = csv.DictWriter(
                    out,
                    fieldnames=['number', 'channel_name', 'channel_id', 'username', 'type'],
                )
                sheet.writeheader()
                sheet.writerows(found)
            print(f"\n✅ 频道列表已保存到 {csv_file}")

        return found

    except Exception as e:
        print(f"列出频道时出错:{e}")
        return []
def display_qr_code_ascii(self, qr_login):
    """Render ``qr_login.url`` as an ASCII QR code and print it to stdout."""
    code = qrcode.QRCode(box_size=1, border=1)
    code.add_data(qr_login.url)
    code.make()

    # Render into a buffer first so the whole code is emitted by one print().
    rendered = StringIO()
    code.print_ascii(out=rendered)
    print(rendered.getvalue())
def _stdin_interactive(self) -> bool:
    """Return True only when prompting the user on the console is possible.

    That is: ``TELEGRAM_WEB_UI`` is not switched on (``1``/``true``/``yes``/
    ``on``, case-insensitive) and ``sys.stdin`` exists and is a TTY.  Any
    error while checking counts as non-interactive.
    """
    try:
        web_ui_flag = (os.getenv("TELEGRAM_WEB_UI") or "").strip().lower()
        if web_ui_flag in ("1", "true", "yes", "on"):
            return False
        stream = sys.stdin
        return stream is not None and stream.isatty()
    except Exception:
        return False
async def qr_code_auth(self, headless_hint: bool = False):
    """Log in by QR code, handling an optional two-step-verification password.

    Prints the login URL and an ASCII QR code, then waits for the scan.  If
    the account has two-step verification enabled, the password is taken from
    ``TELEGRAM_2FA_PASSWORD`` or, failing that, prompted for on an interactive
    console.

    Args:
        headless_hint: when True, also print a hint telling Web/Docker users
            where to find the login link.

    Returns:
        True on successful login, False on any failure (the reason is
        printed).  Failures of the 2FA step are reported the same way instead
        of escaping as exceptions.
    """
    print("\n正在使用二维码登录...")
    print("请用 Telegram 手机端扫描二维码:")
    print("1. 在手机上打开 Telegram")
    print("2. 进入 设置 > 设备 > 扫描二维码")
    print("3. 扫描下方二维码(或打开下方链接)\n")

    try:
        qr_login = await self.client.qr_login()
        print(f"二维码链接(可复制到手机浏览器):\n{qr_login.url}\n")
        self.display_qr_code_ascii(qr_login)
        if headless_hint:
            print("(Web/Docker 环境请查看本页「运行日志」或容器日志中的上述链接。)")

        await qr_login.wait()
        print("\n✅ 二维码登录成功!")
        return True
    except SessionPasswordNeededError:
        # Scan succeeded but the account needs its 2FA password; handled
        # below, outside this handler, so that errors there are caught too.
        pass
    except Exception as e:
        print(f"\n❌ 二维码登录失败:{e}")
        return False

    password = (os.getenv("TELEGRAM_2FA_PASSWORD") or "").strip()
    if not password:
        if not self._stdin_interactive():
            print("已开启两步验证:请在 .env 中设置 TELEGRAM_2FA_PASSWORD 后重试(勿提交到 Git)。")
            return False
        try:
            password = input("已开启两步验证,请输入密码:")
        except EOFError as e:
            print(f"\n❌ 两步验证登录失败:{e}")
            return False

    try:
        await self.client.sign_in(password=password)
    except Exception as e:
        # Wrong password, flood wait, network error, ...
        print(f"\n❌ 两步验证登录失败:{e}")
        return False

    print("\n✅ 两步验证登录成功!")
    return True
async def phone_auth(self):
    """Log in interactively with a phone number and the code Telegram sends.

    Prompts on the console for the phone number, the login code and, when the
    account has two-step verification enabled, the password.

    Returns:
        True on successful login, False on any failure (the reason is
        printed).  This includes failures while requesting the code, reading
        input, or submitting the 2FA password, which are reported instead of
        escaping as exceptions.
    """
    try:
        phone = input("请输入手机号:")
        await self.client.send_code_request(phone)
        code = input("请输入收到的验证码:")

        await self.client.sign_in(phone, code)
        print("\n✅ 手机号登录成功!")
        return True
    except SessionPasswordNeededError:
        # Code accepted but a 2FA password is required; handled below, outside
        # this handler, so that errors there are caught as well.
        pass
    except Exception as e:
        print(f"\n❌ 手机号登录失败:{e}")
        return False

    try:
        password = input("已开启两步验证,请输入密码:")
        await self.client.sign_in(password=password)
    except Exception as e:
        print(f"\n❌ 两步验证登录失败:{e}")
        return False

    print("\n✅ 两步验证登录成功!")
    return True
async def initialize_client(self):
    """Create, connect and authorize the Telegram client.

    Steps, in order:
      1. Fill missing ``api_id`` / ``api_hash`` in ``self.state`` from the
         ``API_ID`` / ``API_HASH`` environment variables, else prompt for
         them (interactive consoles only) and save the state.
      2. Build ``self.client`` (session name ``session``, optional proxy from
         ``self.build_proxy_config()``) and connect.
      3. If the session is not authorized yet, log in: on a non-interactive
         run only QR login is possible and only when ``TELEGRAM_HEADLESS_QR``
         is switched on; on a console the user picks QR or phone login.

    Returns:
        True when the client is connected and authorized, False otherwise.
        After a failed login the client is disconnected before returning.
    """
    env_api_id = os.getenv("API_ID")
    if not self.state.get('api_id') and env_api_id:
        try:
            self.state['api_id'] = int(env_api_id)
        except ValueError:
            print("环境变量中的 API_ID 无效,将改为手动输入。")

    env_api_hash = os.getenv("API_HASH")
    if not self.state.get('api_hash') and env_api_hash:
        self.state['api_hash'] = env_api_hash

    if not (self.state.get('api_id') and self.state.get('api_hash')):
        print("\n=== 需要配置 API ===")
        print("请提供来自 https://my.telegram.org 的 API 凭据")
        if not self._stdin_interactive():
            print("当前为非交互环境:请在 .env 或网页「环境配置」中填写 API_ID / API_HASH。")
            return False
        try:
            self.state['api_id'] = int(input("请输入 API ID:"))
            self.state['api_hash'] = input("请输入 API Hash:")
            self.save_state()
        except ValueError:
            print("API ID 无效,必须是数字。")
            return False
        except EOFError:
            print("无法读取输入(EOF)。请在 .env 中配置 API_ID / API_HASH。")
            return False

    proxy = self.build_proxy_config()
    if proxy:
        print(f"正在使用代理:{proxy[0]}://{proxy[1]}:{proxy[2]}")

    self.client = TelegramClient(
        'session',
        self.state['api_id'],
        self.state['api_hash'],
        proxy=proxy,
    )

    try:
        await self.client.connect()
    except Exception as e:
        print(f"连接失败:{e}")
        return False

    # An existing authorized session needs no login at all.
    if await self.client.is_user_authorized():
        print("✅ 已登录!")
        return True

    if not self._stdin_interactive():
        print(
            "\n=== Web / Docker 等非交互环境 ===\n"
            "无法使用「手机号登录」或控制台选项。\n"
            "任选其一:\n"
            " A) 将本机已登录生成的 session.session 复制到程序目录(与 app_web.py 同级),再点「初始化/连接」。\n"
            " B) 在 .env 设置 TELEGRAM_HEADLESS_QR=1,重启后点「初始化/连接」,在「运行日志」里打开「二维码链接」用手机 Telegram 扫码。\n"
            " 若开启了两步验证,可设 TELEGRAM_2FA_PASSWORD(勿提交到 Git)。\n"
        )
        qr_flag = (os.getenv("TELEGRAM_HEADLESS_QR") or "").strip().lower()
        headless_qr = qr_flag in ("1", "true", "yes", "on")
        # Without a console, QR login is the only option, and only on opt-in.
        if headless_qr and await self.qr_code_auth(headless_hint=True):
            return True
        await self.client.disconnect()
        return False

    print("\n=== 请选择登录方式 ===")
    print("[1] 二维码登录(推荐,无需手机号)")
    print("[2] 手机号登录(传统方式)")

    choice = None
    while choice not in ("1", "2"):
        if choice is not None:
            print("请输入 1 或 2")
        try:
            choice = input("请输入选项(1 或 2):").strip()
        except EOFError:
            print("输入结束,已取消登录。")
            await self.client.disconnect()
            return False

    if choice == "1":
        success = await self.qr_code_auth()
    else:
        success = await self.phone_auth()

    if not success:
        print("登录失败,请重试。")
        await self.client.disconnect()
        return False

    return True
def parse_channel_selection(self, choice):
    """Turn a user's selection string into a list of tracked channel ids.

    ``choice`` is either ``all`` (case-insensitive) or a comma-separated mix
    of 1-based list positions (``1,3``) and full channel ids starting with
    ``-`` (``-100123...``).  Positions refer to the order of
    ``self.state['channels']``.

    Invalid entries are reported on stdout and skipped.  Empty entries (for
    example from a trailing comma) are ignored, and a channel selected more
    than once is returned only once, in order of first appearance, so callers
    never process the same channel twice.

    Returns:
        The selected channel ids; an empty list when nothing valid was chosen.
    """
    channels_list = list(self.state['channels'])

    if choice.strip().lower() == 'all':
        return channels_list

    # dict keys give order-preserving de-duplication.
    selected = {}
    for selection in (part.strip() for part in choice.split(',')):
        if not selection:
            continue

        if selection.startswith('-'):
            if selection not in self.state['channels']:
                print(f"你的频道列表中不存在频道 ID {selection}")
                continue
            channel = selection
        else:
            try:
                num = int(selection)
            except ValueError:
                print(f"输入无效:{selection}。请用编号(1,2,3)或完整频道ID(-100123...)")
                continue
            if not 1 <= num <= len(channels_list):
                print(f"频道编号无效:{num}。有效范围:1-{len(channels_list)}")
                continue
            channel = channels_list[num - 1]

        selected[channel] = None

    return list(selected)
async def scrape_specific_channels(self):
    """Ask which tracked channels to scrape and scrape them one after another.

    Shows the channel list, reads a selection from the console (positions,
    channel ids, or ``all``) and calls ``self.scrape_channel`` for each
    selected channel with its stored last message id.
    """
    if not self.state['channels']:
        print("当前没有可用频道,请先用 [L] 添加频道")
        return

    await self.view_channels()
    for hint in (
        "\n📥 抓取选项:",
        "• 单个:1 或 -1001234567890",
        "• 多个:1,3,5 或混合输入",
        "• 全部频道:all",
    ):
        print(hint)

    answer = input("\n请输入选择:").strip()
    targets = self.parse_channel_selection(answer)
    if not targets:
        print("❌ 未选择有效频道")
        return

    total = len(targets)
    print(f"\n🚀 开始抓取 {total} 个频道...")
    for position, channel in enumerate(targets, 1):
        print(f"\n[{position}/{total}] 正在抓取:{channel}")
        await self.scrape_channel(channel, self.state['channels'][channel])
    print(f"\n✅ 已完成 {total} 个频道的抓取!")
async def manage_channels(self):
    """Run the interactive console menu until the user quits.

    Menu keys: [S] scrape selected channels, [C] continuous scraping,
    [M] toggle media scraping, [L] list dialogs and add channels,
    [R] remove channels, [E] export, [T] re-scrape media, [F] fix missing
    media, [Q] quit.

    Ordinary exceptions raised by a menu action are printed and the menu is
    shown again.  [Q] closes the database connections, disconnects the client
    and raises ``SystemExit``.
    """

    def register(info) -> bool:
        """Start tracking one listed dialog; return True if it was new."""
        channel_id = info['channel_id']
        if channel_id in self.state['channels']:
            print(f"频道 {info['channel_name']} 已添加过")
            return False
        self.state['channels'][channel_id] = 0
        self.state.setdefault('channel_names', {})[channel_id] = info['username']
        self.state.setdefault('channel_titles', {})[channel_id] = info['channel_name']
        print(f"✅ 已添加频道 {info['channel_name']}(ID: {channel_id})")
        return True

    async def remove_channels():
        """[R] Ask which tracked channels to forget and drop them from state."""
        if not self.state['channels']:
            print("没有可移除的频道")
            return

        await self.view_channels()
        print("\n移除频道说明:")
        print("• 单个:1 或 -1001234567890")
        print("• 多个:1,2,3 或混合输入")
        selection = input("请输入选择:").strip()
        selected_channels = self.parse_channel_selection(selection)
        if not selected_channels:
            print("未选择有效频道")
            return

        removed_count = 0
        for channel in selected_channels:
            if channel not in self.state['channels']:
                print(f"❌ 未找到频道 {channel}")
                continue
            del self.state['channels'][channel]
            if 'channel_names' in self.state:
                self.state['channel_names'].pop(channel, None)
            if 'channel_titles' in self.state:
                self.state['channel_titles'].pop(channel, None)
            print(f"✅ 已移除频道 {channel}")
            removed_count += 1

        if removed_count > 0:
            self.save_state()
            print(f"\n🎉 已移除 {removed_count} 个频道!")
            await self.view_channels()
        else:
            print("未移除任何频道")

    async def add_channels():
        """[L] List the account's dialogs and add the chosen ones to state."""
        channels_data = await self.list_channels()
        if not channels_data:
            return

        print("\n从上方列表添加频道:")
        print("• 单个:1 或 -1001234567890")
        print("• 多个:1,3,5 或混合输入")
        print("• 全部频道:all")
        print("• 直接回车可跳过添加")
        selection = input("\n请输入选择(或回车跳过):").strip()
        if not selection:
            return

        added_count = 0
        if selection.lower() == 'all':
            for channel_info in channels_data:
                added_count += register(channel_info)
        else:
            for sel in (part.strip() for part in selection.split(',')):
                if sel.startswith('-'):
                    channel_info = next(
                        (c for c in channels_data if c['channel_id'] == sel), None
                    )
                    if not channel_info:
                        print(f"未找到频道 ID {sel}")
                        continue
                else:
                    try:
                        num = int(sel)
                    except ValueError:
                        print(f"输入无效:{sel}")
                        continue
                    if not 1 <= num <= len(channels_data):
                        print(f"编号无效:{num}。请选择 1-{len(channels_data)}")
                        continue
                    channel_info = channels_data[num - 1]
                added_count += register(channel_info)

        if added_count > 0:
            self.save_state()
            print(f"\n🎉 已新增 {added_count} 个频道!")
            await self.view_channels()
        else:
            print("没有新增频道")

    async def run_continuous():
        """[C] Run continuous scraping until it stops, fails or is cancelled."""
        task = asyncio.create_task(self.continuous_scraping())
        print("持续抓取已启动,按 Ctrl+C 停止。")
        try:
            # Await the task itself rather than sleeping forever: if the
            # scraping loop ends or crashes we return to the menu (and the
            # error is reported by the caller) instead of hanging, and
            # cancelling this coroutine also cancels the task.
            await task
        except KeyboardInterrupt:
            self.continuous_scraping_active = False
            task.cancel()
            print("\n正在停止持续抓取...")
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def run_on_single_channel(action, too_many_message, announce=None):
        """[T]/[F] Pick exactly one tracked channel and run ``action`` on it."""
        if not self.state['channels']:
            print("当前没有可用频道,请先添加频道")
            return

        await self.view_channels()
        print("\n请输入频道编号(1,2,3...)或完整频道ID(-100123...)")
        selection = input("请输入选择:").strip()
        selected_channels = self.parse_channel_selection(selection)

        if len(selected_channels) == 1:
            channel = selected_channels[0]
            if announce is not None:
                print(announce.format(channel))
            await action(channel)
        elif len(selected_channels) > 1:
            print(too_many_message)
        else:
            print("未选择有效频道")

    while True:
        print("\n" + "="*40)
        print(" Telegram 抓取工具")
        print("="*40)
        print("[S] 抓取频道")
        print("[C] 持续抓取")
        print(f"[M] 媒体抓取:{'开启' if self.state['scrape_media'] else '关闭'}")
        print("[L] 列出并添加频道")
        print("[R] 移除频道")
        print("[E] 导出数据")
        print("[T] 补抓媒体")
        print("[F] 修复缺失媒体")
        print("[Q] 退出")
        print("="*40)

        choice = input("请输入选项:").lower().strip()

        try:
            if choice == 'r':
                await remove_channels()

            elif choice == 's':
                await self.scrape_specific_channels()

            elif choice == 'm':
                self.state['scrape_media'] = not self.state['scrape_media']
                self.save_state()
                print(f"\n✅ 媒体抓取已{'开启' if self.state['scrape_media'] else '关闭'}")

            elif choice == 'c':
                await run_continuous()

            elif choice == 'e':
                await self.export_data()

            elif choice == 'l':
                await add_channels()

            elif choice == 't':
                await run_on_single_channel(
                    self.rescrape_media,
                    "补抓媒体时一次只能选择一个频道",
                    announce="正在为频道补抓媒体:{}",
                )

            elif choice == 'f':
                await run_on_single_channel(
                    self.fix_missing_media,
                    "修复缺失媒体时一次只能选择一个频道",
                )

            elif choice == 'q':
                print("\n👋 再见!")
                self.close_db_connections()
                if self.client:
                    await self.client.disconnect()
                sys.exit()

            else:
                print("无效选项")

        except Exception as e:
            # SystemExit from [Q] is not an Exception and passes through.
            print(f"错误:{e}")
async def run(self):
    """Top-level session: banner, client setup, env sync, then the menu.

    Database connections are closed and the client disconnected when the menu
    ends, whether normally or through an exception.
    """
    display_ascii_art()

    if not await self.initialize_client():
        print("客户端初始化失败,程序退出。")
        return

    try:
        await self.sync_monitor_chats_from_env()
        self._apply_scrape_media_from_env()

        if self._heartbeat_enabled():
            print(
                "💡 心跳已配置开启:请选菜单 [C]「持续抓取」后才会往群里发;"
                "若只用 [S] 单次抓取,不会出现心跳。"
            )

        forward_targets = self._forward_targets_from_env()
        if len(forward_targets) > 1:
            print(
                "ℹ️ FORWARD_TO_CHAT 含多个目标,每条消息会发到多个群。"
                "若只需「多群监控 → 一个汇总频道」,请只保留一个转发 ID,多个源请用 MONITOR_CHATS。"
            )

        await self.manage_channels()
    finally:
        self.close_db_connections()
        if self.client:
            await self.client.disconnect()
async def main():
    """Build the scraper and hand control to its interactive session."""
    await OptimizedTelegramScraper().run()
def configure_event_loop_policy():
    """On Windows, switch asyncio to the selector event loop policy.

    Does nothing on other platforms.  Failure to set the policy is ignored so
    that startup continues with the default policy.
    """
    if not sys.platform.startswith("win"):
        return
    try:
        policy = asyncio.WindowsSelectorEventLoopPolicy()
        asyncio.set_event_loop_policy(policy)
    except Exception:
        # Best effort: fall back to whatever policy is already active.
        pass
# Script entry point: pick the event loop policy, run the async main(), and
# turn Ctrl+C into a clean exit with a short message instead of a traceback.
if __name__ == '__main__':
    try:
        configure_event_loop_policy()
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n程序已中断,正在退出...")
        raise SystemExit