Files
Resume-python/telegram-scraper.py
2026-04-27 13:23:23 +08:00

1507 lines
66 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import sqlite3
import json
import csv
import asyncio
import time
import sys
import uuid
import warnings
import tempfile
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from pathlib import Path
from io import StringIO
from telethon import TelegramClient, utils
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, MessageMediaWebPage, User, PeerChannel, Channel, Chat, MessageService
from telethon.errors import FloodWaitError, SessionPasswordNeededError
import qrcode
# Silence the "Using async sessions support is an experimental feature"
# warning so it does not clutter the console output of this tool.
warnings.filterwarnings(action="ignore", message="Using async sessions support is an experimental feature")
def load_env_file(env_path: str = ".env"):
    """Load ``KEY=VALUE`` lines from a dotenv-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are skipped.
    Whitespace and surrounding quote characters are stripped from values.
    Variables already present in the environment are never overwritten, so
    real environment settings take precedence over the file.

    The file is decoded as ``utf-8-sig``: a UTF-8 byte-order mark (written by
    some Windows editors) would otherwise be glued to the first key and that
    variable would silently never be found.

    A missing file is a no-op. A file that cannot be read or decoded is
    reported and otherwise ignored, so loading stays best-effort.
    """
    if not os.path.exists(env_path):
        return
    try:
        with open(env_path, "r", encoding="utf-8-sig") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                key = key.strip()
                value = value.strip().strip("'").strip('"')
                if key and key not in os.environ:
                    os.environ[key] = value
    except (OSError, ValueError) as exc:
        # ValueError covers UnicodeDecodeError and "embedded null byte" from os.environ.
        print(f"读取环境文件 {env_path} 失败:{exc}")
def display_ascii_art():
    """Print the ASCII-art banner in bright white, then reset the terminal colour."""
    bright_white, reset = "\033[97m", "\033[0m"
    banner = r"""
___________________ _________
\__ ___/ _____/ / _____/
| | / \ ___ \_____ \
| | \ \_\ \/ \
|____| \______ /_______ /
\/ \/
"""
    print(f"{bright_white}{banner}{reset}")
@dataclass
class MessageData:
    """One scraped message, flattened into the columns of the ``messages`` table.

    Field order matches the column order used by ``batch_insert_messages``.
    """

    message_id: int  # Telegram message id within its chat
    date: str  # formatted by the scraper as '%Y-%m-%d %H:%M:%S'
    sender_id: Optional[int]  # Telethon's Message.sender_id can be None
    first_name: Optional[str]  # only filled when the sender is a User
    last_name: Optional[str]  # only filled when the sender is a User
    username: Optional[str]  # only filled when the sender is a User
    message: str  # message text ('' when there is none)
    media_type: Optional[str]  # class name of message.media, e.g. 'MessageMediaPhoto'
    media_path: Optional[str]  # local file path once the media is downloaded
    reply_to: Optional[int]  # id of the message this one replies to
    post_author: Optional[str]
    views: Optional[int]
    forwards: Optional[int]
    reactions: Optional[str]  # e.g. "👍 3 ❤ 1"; None when there are no emoji reactions
class OptimizedTelegramScraper:
def __init__(self):
    """Load configuration and persisted state; the Telegram client is created later.

    ``.env`` is loaded into the environment first so that env-driven settings
    (e.g. FORWARD_DELAY_SECONDS) are visible. After that, ``state.json`` is
    loaded and the tuning defaults are set.
    """
    load_env_file()
    self.STATE_FILE = 'state.json'
    self.state = self.load_state()
    # Assigned by initialize_client() once API credentials are available.
    self.client = None
    # Set/cleared by continuous_scraping(); its loop and the heartbeat poll it.
    self.continuous_scraping_active = False
    # Cached SQLite connections keyed by channel (see get_db_connection).
    self.db_connections = {}
    # Tuning knobs: parallel media downloads, DB insert batch size, and the
    # number of processed messages between state checkpoints.
    self.max_concurrent_downloads = 5
    self.batch_size = 100
    self.state_save_interval = 50
    self._forward_delay_seconds = self._read_forward_delay()
def _read_forward_delay(self) -> float:
raw = os.getenv("FORWARD_DELAY_SECONDS", "").strip()
if not raw:
return 0.0
try:
return max(0.0, float(raw))
except ValueError:
return 0.0
def _apply_scrape_media_from_env(self) -> None:
raw = os.getenv("SCRAPE_MEDIA", "").strip().lower()
if not raw:
return
if raw in ("0", "false", "no", "off"):
self.state["scrape_media"] = False
elif raw in ("1", "true", "yes", "on"):
self.state["scrape_media"] = True
else:
return
self.save_state()
print(
" 已从环境变量 SCRAPE_MEDIA 写入 state"
f"{'开启' if self.state['scrape_media'] else '关闭'}媒体下载(关闭时只存文字与元数据,不抓文件)"
)
def _heartbeat_enabled(self) -> bool:
return self._env_flag_true("HEARTBEAT_ENABLED", "SCRAPER_HEARTBEAT")
def _heartbeat_interval_seconds(self) -> float:
raw = os.getenv("HEARTBEAT_INTERVAL_SECONDS", "30").strip()
try:
return max(5.0, float(raw))
except ValueError:
return 30.0
def _heartbeat_target_spec(self) -> Optional[str]:
s = (os.getenv("HEARTBEAT_TO_CHAT") or os.getenv("HEARTBEAT_TO") or "").strip()
if s:
return s
t = self._forward_targets_from_env()
return t[0] if t else None
async def _heartbeat_loop(self) -> None:
spec = self._heartbeat_target_spec()
if not spec:
return
try:
entity = await self._resolve_forward_entity(spec)
except Exception as e:
print(f"💓 心跳目标无法解析({spec}{e}")
return
while self.continuous_scraping_active:
try:
ts = time.strftime("%Y-%m-%d %H:%M:%S")
await self.client.send_message(
entity, f"💓 抓取心跳 · 持续运行中 · {ts}"
)
except asyncio.CancelledError:
break
except FloodWaitError as e:
await asyncio.sleep(min(e.seconds, 300))
except Exception as e:
print(f"\n💓 心跳发送失败:{e}")
try:
await asyncio.sleep(self._heartbeat_interval_seconds())
except asyncio.CancelledError:
break
def _forward_targets_from_env(self) -> List[str]:
raw = (os.getenv("FORWARD_TO_CHAT") or os.getenv("FORWARD_TO") or "").strip()
if not raw:
return []
parts = re.split(r"[,;]+", raw)
out: List[str] = []
for p in parts:
s = p.strip().strip("\ufeff").strip("\u200b")
if s:
out.append(s)
return out
def _monitor_chats_specs_from_env(self) -> List[str]:
raw = (os.getenv("MONITOR_CHATS") or os.getenv("MONITOR_GROUPS") or "").strip()
if not raw:
return []
parts = re.split(r"[,;]+", raw)
return [p.strip().strip("\ufeff").strip("\u200b") for p in parts if p.strip()]
async def sync_monitor_chats_from_env(self) -> None:
    """Add every chat listed in MONITOR_CHATS to ``state['channels']``.

    The effect is the same as adding each chat through the [L] menu. A new chat
    starts at offset 0. For a chat already tracked, only the stored username
    and title are refreshed. Entries that cannot be resolved are reported and
    skipped. State is saved whenever anything changed — a new chat *or* an
    updated username/title — not only when a chat was added. It is
    a no-op when nothing is configured or there is no client yet.
    """
    specs = self._monitor_chats_specs_from_env()
    if not specs or not self.client:
        return
    channels = self.state.setdefault("channels", {})
    names = self.state.setdefault("channel_names", {})
    titles = self.state.setdefault("channel_titles", {})
    changed = False
    for spec in specs:
        try:
            ent = await self._resolve_forward_entity(spec)
            cid = str(utils.get_peer_id(ent))
            uname = getattr(ent, "username", None) or "no_username"
            title = getattr(ent, "title", None) or cid
            title_text = str(title) if title else ""
            is_new = cid not in channels
            if is_new:
                channels[cid] = 0
            if names.get(cid) != uname or titles.get(cid) != title_text:
                changed = True
            names[cid] = uname
            titles[cid] = title_text
            if is_new:
                print(f"📌 已从 MONITOR_CHATS 加入监控源:{title}{cid}")
                changed = True
        except Exception as e:
            print(f"⚠️ MONITOR_CHATS 无法解析「{spec}」:{e}")
    if changed:
        self.save_state()
def _normalize_entity_input(self, spec: str) -> str:
"""把 t.me / telegram.me 链接规范成 @username便于 get_entity 解析。"""
s = (spec or "").strip().strip("\ufeff").strip("\u200b")
if not s:
return s
# 邀请链接保留原样,交给 get_entity 处理
if re.search(r"(?:t\.me|telegram\.me)/(?:joinchat/|\+)", s, re.I):
return s
m = re.search(
r"(?:https?://)?(?:t\.me|telegram\.me)/([A-Za-z0-9_]+)(?:[/?#].*)?$",
s,
re.I,
)
if m:
slug = m.group(1)
if slug.isdigit():
return s
reserved = {"addstickers", "share", "iv", "socks", "setlanguage", "proxy"}
if slug.lower() in reserved:
return s
return "@" + slug
return s
async def _resolve_forward_entity(self, spec: str):
"""解析转发目标:-100 超级群用 PeerChannel避免 get_entity(str) 偶发失败。"""
s = self._normalize_entity_input(spec)
if not s:
raise ValueError("转发目标为空")
if s.startswith("@"):
return await self.client.get_entity(s)
if re.fullmatch(r"-100\d+", s):
inner = int(s[4:])
try:
return await self.client.get_entity(PeerChannel(inner))
except Exception:
return await self.client.get_entity(int(s))
if re.fullmatch(r"-?\d+", s):
num = int(s)
try:
return await self.client.get_entity(num)
except Exception:
sn = str(num)
if sn.startswith("-100") and len(sn) > 4 and sn[4:].isdigit():
return await self.client.get_entity(PeerChannel(int(sn[4:])))
raise
return await self.client.get_entity(s)
def _env_flag_true(self, *keys: str) -> bool:
for key in keys:
v = os.getenv(key, "").strip().lower()
if v in ("1", "true", "yes", "on"):
return True
return False
def _forward_only_new_messages(self) -> bool:
return self._env_flag_true(
"FORWARD_ONLY_NEW_MESSAGES",
"FORWARD_ONLY_IF_OFFSET_POSITIVE",
)
def _forward_my_username(self) -> Optional[str]:
raw = (os.getenv("FORWARD_MY_USERNAME") or os.getenv("FORWARD_AS_USERNAME") or "").strip()
if not raw:
return None
return raw.lstrip("@").strip() or None
def _env_flag_false(self, *keys: str) -> bool:
"""为真时表示显式关闭(用于默认开启的行为)。"""
for key in keys:
v = os.getenv(key, "").strip().lower()
if v in ("0", "false", "no", "off"):
return True
return False
def _forward_replace_mentions_enabled(self) -> bool:
if self._env_flag_false("FORWARD_RAW_MENTIONS", "FORWARD_NO_MENTION_REPLACE"):
return False
return True
def _sanitize_mentions_text(self, text: str, replacement: str) -> str:
if not text:
return ""
if not replacement:
return text
# Telegram 用户名一般为 532 位 [A-Za-z0-9_]4 位兼容边界情况
return re.sub(r"@[A-Za-z0-9_]{4,32}\b", lambda _: replacement, text)
async def _resolve_forward_mention_replacement(self, me) -> str:
"""推送到汇总群时,用于替换正文中所有 @用户名的展示串(优先环境变量,否则为当前登录号)。"""
custom = self._forward_my_username()
if custom:
return "@" + custom.lstrip("@")
if getattr(me, "username", None):
return "@" + me.username
parts = [
(getattr(me, "first_name", None) or "").strip(),
(getattr(me, "last_name", None) or "").strip(),
]
name = " ".join(p for p in parts if p)
return name or ""
async def _forward_scraped_message(
    self,
    target,
    _source_entity,
    message,
    *,
    replacement: str,
    replace_mentions: bool,
) -> None:
    """Re-post *message* to *target* as the current account (a copy, not a forward).

    Media is downloaded to a temporary file and re-uploaded, so the copy has no
    "Forwarded from …" card. Service messages are skipped. When
    *replace_mentions* is true, ``@username`` mentions in the text are
    replaced with *replacement*.

    If the media cannot be downloaded, the text (if any) is sent on its own.
    With no text either, the message is skipped with a notice. A FloodWaitError
    triggers one retry after the requested wait. Any other error is printed and
    swallowed, so one bad message does not stop the scrape. After each
    successful send (first try or retry), ``self._forward_delay_seconds`` is
    slept.

    The temporary download is always deleted, including a partially written
    file left behind when the download itself raises.
    """
    if isinstance(message, MessageService):
        return
    text_raw = message.message or ""
    text_out = (
        self._sanitize_mentions_text(text_raw, replacement)
        if replace_mentions
        else text_raw
    )
    # Web-page previews cannot be re-uploaded; such messages are sent as text.
    has_media = bool(
        message.media and not isinstance(message.media, MessageMediaWebPage)
    )

    def _temp_suffix() -> str:
        """Best-effort file extension for the temporary download."""
        if isinstance(message.media, MessageMediaPhoto):
            suffix = ".jpg"
        else:
            suffix = ".bin"
        if isinstance(message.media, MessageMediaDocument) and message.file:
            ext = getattr(message.file, "ext", None) or ""
            nm = getattr(message.file, "name", None) or ""
            if nm:
                suf = Path(nm).suffix
                if suf:
                    suffix = suf
            elif ext:
                suffix = "." + ext.lstrip(".")
        return suffix

    async def _send_copy() -> None:
        cap = text_out.strip() or None
        if not has_media:
            if cap:
                await self.client.send_message(target, cap)
            return
        tmp_path = str(
            Path(tempfile.gettempdir()) / f"tgfwd_{uuid.uuid4().hex}{_temp_suffix()}"
        )
        saved: Optional[str] = None
        try:
            saved = await self.client.download_media(message, file=tmp_path)
            if saved and Path(saved).exists():
                await self.client.send_file(target, saved, caption=cap)
                return
        finally:
            # Remove the requested path as well as whatever path was reported, so a
            # partial download (download raised, `saved` still None) is not leaked.
            for leftover in {tmp_path, saved}:
                if leftover:
                    try:
                        Path(leftover).unlink(missing_ok=True)
                    except OSError:
                        pass
        if cap:
            await self.client.send_message(target, cap)
        else:
            print(
                f"\n⚠️ 消息 {message.id} 有媒体但下载失败,且无可用文字说明,已跳过推送。"
            )

    try:
        await _send_copy()
        if self._forward_delay_seconds > 0:
            await asyncio.sleep(self._forward_delay_seconds)
    except FloodWaitError as e:
        await asyncio.sleep(e.seconds)
        try:
            await _send_copy()
            if self._forward_delay_seconds > 0:
                await asyncio.sleep(self._forward_delay_seconds)
        except Exception as err:
            print(f"\n推送消息 {message.id} 时仍失败FloodWait 后):{err}")
    except Exception as e:
        print(f"\n推送消息 {message.id} 失败(可能无发言权限或媒体无法下载):{e}")
def build_proxy_config(self):
    """Build Telethon's ``proxy`` argument from environment variables.

    Policy:
    - Web UI (TELEGRAM_WEB_UI truthy): direct connection by default. A proxy
      is used only when PROXY_ENABLED is explicitly 1/true/yes/on, so a
      server that merely has PROXY_HOST set does not route through it.
    - Command line: PROXY_ENABLED=0/false/no/off/none disables the proxy. When
      PROXY_ENABLED is unset, PROXY_HOST + PROXY_PORT alone enable it (kept
      for older configurations). Any other PROXY_ENABLED value is reported
      as invalid and the proxy is skipped.

    Returns None for a direct connection, otherwise
    ``(type, host, port)`` or ``(type, host, port, True, username, password)``
    when credentials are set. The ``True`` slot is presumably the rdns flag
    (confirm against Telethon's proxy docs). PROXY_TYPE defaults to socks5.
    """
    truthy = ("1", "true", "yes", "on")
    enabled_flag = (os.getenv("PROXY_ENABLED") or "").strip().lower()
    web_ui = (os.getenv("TELEGRAM_WEB_UI") or "").strip().lower() in truthy
    if web_ui and enabled_flag not in truthy:
        return None
    if not web_ui and enabled_flag in ("0", "false", "no", "off", "none"):
        return None
    host = (os.getenv("PROXY_HOST") or "").strip()
    port_text = (os.getenv("PROXY_PORT") or "").strip()
    if not (host and port_text):
        return None
    if not web_ui and enabled_flag not in ("",) + truthy:
        print(f"PROXY_ENABLED 取值无效({enabled_flag!r}),已跳过代理。")
        return None
    try:
        port = int(port_text)
    except ValueError:
        print("环境变量中的 PROXY_PORT 无效,已跳过代理设置。")
        return None
    scheme = os.getenv("PROXY_TYPE", "socks5").lower()
    user = os.getenv("PROXY_USERNAME")
    password = os.getenv("PROXY_PASSWORD")
    if not (user or password):
        return (scheme, host, port)
    return (scheme, host, port, True, user, password)
def load_state(self) -> Dict[str, Any]:
    """Load persisted state from ``self.STATE_FILE`` and fill in missing keys.

    The result always contains api_id, api_hash, channels (channel → last
    scraped message id), channel_names, channel_titles and scrape_media.
    Keys absent from an older or hand-edited file get their defaults, so
    lookups such as ``state['scrape_media']`` cannot raise KeyError. A missing
    file yields the defaults. An unreadable, malformed or non-object file is
    reported and replaced by the defaults.
    """
    defaults: Dict[str, Any] = {
        'api_id': None,
        'api_hash': None,
        'channels': {},
        'channel_names': {},
        'channel_titles': {},
        'scrape_media': True,
    }
    if not os.path.exists(self.STATE_FILE):
        return defaults
    try:
        with open(self.STATE_FILE, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"读取状态文件失败,将使用默认配置:{e}")
        return defaults
    if not isinstance(loaded, dict):
        return defaults
    return {**defaults, **loaded}
def save_state(self):
    """Persist ``self.state`` to ``self.STATE_FILE`` as indented JSON.

    The JSON is first written to a uniquely named temporary file next to the
    target and then moved over it with ``os.replace``. An interruption
    mid-write (crash, Ctrl+C, full disk) therefore leaves the previous state
    file intact rather than truncated. A truncated file would make
    ``load_state`` fall back to defaults and lose all channel progress.
    Errors are printed, not raised, and the temporary file is cleaned up.
    """
    target = Path(self.STATE_FILE)
    tmp_path: Optional[Path] = target.with_name(f"{target.name}.{uuid.uuid4().hex}.tmp")
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, indent=2)
        os.replace(tmp_path, target)
        tmp_path = None
    except Exception as e:
        print(f"保存状态失败:{e}")
    finally:
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                pass
def get_db_connection(self, channel: str) -> sqlite3.Connection:
    """Return the cached SQLite connection for *channel*, opening it on first use.

    The database is ``<channel>/<channel>.db`` relative to the working
    directory. When it is first opened, the ``messages`` table and its indexes
    are created if absent, WAL journaling and ``synchronous=NORMAL`` are
    enabled, and ``migrate_database`` adds columns missing from older
    databases. If any of that setup fails, the half-initialised connection is
    closed before the error propagates, so it is neither leaked nor cached.
    """
    cached = self.db_connections.get(channel)
    if cached is not None:
        return cached
    channel_dir = Path(channel)
    channel_dir.mkdir(exist_ok=True)
    db_file = channel_dir / f'{channel}.db'
    # A generous lock wait reduces OperationalError when channels are written in
    # turn or another process / backup tool briefly holds the database lock.
    conn = sqlite3.connect(str(db_file), check_same_thread=False, timeout=60.0)
    try:
        conn.execute("PRAGMA busy_timeout=60000")
        conn.execute('''CREATE TABLE IF NOT EXISTS messages
                     (id INTEGER PRIMARY KEY, message_id INTEGER UNIQUE, date TEXT,
                     sender_id INTEGER, first_name TEXT, last_name TEXT, username TEXT,
                     message TEXT, media_type TEXT, media_path TEXT, reply_to INTEGER,
                     post_author TEXT, views INTEGER, forwards INTEGER, reactions TEXT)''')
        conn.execute('CREATE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)')
        conn.execute('CREATE INDEX IF NOT EXISTS idx_date ON messages(date)')
        conn.execute('PRAGMA journal_mode=WAL')
        conn.execute('PRAGMA synchronous=NORMAL')
        conn.commit()
        self.migrate_database(conn)
    except BaseException:
        conn.close()
        raise
    self.db_connections[channel] = conn
    return conn
def migrate_database(self, conn: sqlite3.Connection):
    """Add columns introduced after the original schema to the ``messages`` table.

    Each of post_author, views, forwards and reactions that is missing is
    added with ALTER TABLE, followed by one commit. A "duplicate column" error
    means the column appeared between the check and the ALTER (for example
    another connection migrated first) and is ignored. Any other database
    error is raised instead of silently swallowed, because
    ``batch_insert_messages`` would otherwise fail later on the missing column.
    """
    existing = {row[1] for row in conn.execute("PRAGMA table_info(messages)")}
    added_columns = (
        ("post_author", "TEXT"),
        ("views", "INTEGER"),
        ("forwards", "INTEGER"),
        ("reactions", "TEXT"),
    )
    altered = False
    for name, sql_type in added_columns:
        if name in existing:
            continue
        try:
            conn.execute(f"ALTER TABLE messages ADD COLUMN {name} {sql_type}")
        except sqlite3.OperationalError as exc:
            if "duplicate column" not in str(exc).lower():
                raise
        altered = True
    if altered:
        conn.commit()
def close_db_connections(self):
    """Close every cached SQLite connection, then empty the cache."""
    open_connections = list(self.db_connections.values())
    for connection in open_connections:
        connection.close()
    self.db_connections.clear()
def batch_insert_messages(self, channel: str, messages: List[MessageData]):
    """Bulk-insert *messages* into *channel*'s ``messages`` table and commit.

    Rows whose ``message_id`` is already stored are skipped (INSERT OR IGNORE
    against the UNIQUE message_id column), so re-scraping does not duplicate
    rows. An empty list does nothing and does not open the database.
    """
    if not messages:
        return
    conn = self.get_db_connection(channel)
    columns = (
        "message_id", "date", "sender_id", "first_name", "last_name", "username",
        "message", "media_type", "media_path", "reply_to", "post_author", "views",
        "forwards", "reactions",
    )
    rows = [tuple(getattr(msg, name) for name in columns) for msg in messages]
    sql = "INSERT OR IGNORE INTO messages ({}) VALUES ({})".format(
        ", ".join(columns), ", ".join("?" for _ in columns)
    )
    conn.executemany(sql, rows)
    conn.commit()
async def download_media(self, channel: str, message) -> Optional[str]:
    """Download *message*'s photo or document into ``<channel>/media/`` and return its path.

    Returns None when media scraping is disabled, the message has no media,
    the media is a web-page preview or another unsupported type, or every
    attempt failed. Files are named ``<message id>-<original name>``. If a
    completed file for this message id already exists, it is reused.

    The download is written to a ``.part`` file, which is renamed into place
    only after Telethon returns. An interrupted download therefore cannot
    leave a truncated file that a later run would mistake for a complete one
    (``.part`` leftovers are ignored and overwritten on the next attempt).
    FloodWaitError is retried after the requested wait. Other errors are
    retried after 1 s and then 2 s. There are at most three attempts in total.
    """
    if not message.media or not self.state['scrape_media']:
        return None
    if isinstance(message.media, MessageMediaWebPage):
        return None
    try:
        media_folder = Path(channel) / 'media'
        media_folder.mkdir(parents=True, exist_ok=True)
        if isinstance(message.media, MessageMediaPhoto):
            original_name = getattr(message.file, 'name', None) or "photo.jpg"
            ext = "jpg"
        elif isinstance(message.media, MessageMediaDocument):
            raw_ext = getattr(message.file, 'ext', None) if message.file else None
            # File.ext may carry a leading dot (".pdf"); normalise it so the
            # fallback name is "document.pdf" rather than "document..pdf".
            ext = (raw_ext or 'bin').lstrip('.') or 'bin'
            original_name = getattr(message.file, 'name', None) or f"document.{ext}"
        else:
            return None
        base_name = Path(original_name).stem
        extension = Path(original_name).suffix or f".{ext}"
        media_path = media_folder / f"{message.id}-{base_name}{extension}"
        existing_files = [
            p for p in media_folder.glob(f"{message.id}-*") if p.suffix != ".part"
        ]
        if existing_files:
            return str(existing_files[0])
        part_path = media_path.with_name(media_path.name + ".part")
        for attempt in range(3):
            try:
                downloaded_path = await message.download_media(file=str(part_path))
                if downloaded_path and Path(downloaded_path).exists():
                    os.replace(downloaded_path, media_path)
                    return str(media_path)
                return None
            except FloodWaitError as e:
                if attempt < 2:
                    await asyncio.sleep(e.seconds)
                else:
                    return None
            except Exception:
                if attempt < 2:
                    await asyncio.sleep(2 ** attempt)
                else:
                    return None
        return None
    except Exception:
        return None
async def update_media_path(self, channel: str, message_id: int, media_path: str):
    """Record *media_path* as the downloaded file of *message_id* and commit."""
    sql = 'UPDATE messages SET media_path = :path WHERE message_id = :mid'
    connection = self.get_db_connection(channel)
    connection.execute(sql, {"path": media_path, "mid": message_id})
    connection.commit()
async def scrape_channel(self, channel: str, offset_id: int):
    """Scrape messages newer than *offset_id* from *channel* into its SQLite DB.

    Steps:
    1. Resolve the chat (ids starting with '-' go through PeerChannel) and
       print the message count reported by Telegram. Nothing further happens
       if it is 0.
    2. Optionally prepare push targets from FORWARD_TO_CHAT. In "only new
       messages" mode nothing is pushed while ``offset_id <= 0``.
    3. Walk messages oldest to newest. Each is stored in batches, re-posted to
       the push targets, and queued for media download when enabled.
    4. Download queued media with bounded concurrency and record the paths.
    5. Save the newest processed message id as the channel's offset.

    Progress is checkpointed into state every ``state_save_interval``
    messages. The pending batch is inserted *before* each checkpoint. If the
    offset were saved first, a crash right after it would resume past
    messages that never reached the DB, silently losing them.

    An error on one message is printed and that message is skipped. Any other
    error aborts this channel's scrape with a printed message; it is not
    re-raised.
    """
    try:
        entity = await self.client.get_entity(PeerChannel(int(channel)) if channel.startswith('-') else channel)
        result = await self.client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0)
        # NOTE(review): `.total` looks like the chat's overall message count rather than
        # the number after offset_id — confirm; if so the percentage below is low on
        # incremental runs.
        total_messages = result.total
        if total_messages == 0:
            print(f"频道 {channel} 中未找到消息")
            return
        print(f"在频道 {channel} 中找到 {total_messages} 条消息")
        forward_specs = self._forward_targets_from_env()
        forward_pairs: List[Tuple[Any, str]] = []
        mention_replacement = ""
        replace_mentions = True
        if forward_specs:
            for spec in forward_specs:
                try:
                    ent = await self._resolve_forward_entity(spec)
                    forward_pairs.append((ent, spec))
                except Exception as e:
                    print(f"⚠️ 转发目标解析失败:{spec}{e}")
            forward_entities = [p[0] for p in forward_pairs]
            if not forward_entities:
                print("⚠️ 没有可用的转发目标(已跳过转发)")
            else:
                dest_titles = [getattr(ent, "title", None) or sp for ent, sp in forward_pairs]
                only_new = self._forward_only_new_messages()
                if only_new and offset_id <= 0:
                    # First (history) scrape in "only new" mode: store but do not push.
                    joined = "".join(dest_titles)
                    print(
                        f"📤 已配置 {len(forward_entities)} 个转发目标({joined}),但仅新消息模式开启且当前 offset_id={offset_id}"
                        "本次为历史抓取,不向群组推送;待 state 中已有进度后再次抓取时才会转发新消息。"
                    )
                    forward_entities = []
                else:
                    me = await self.client.get_me()
                    mention_replacement = await self._resolve_forward_mention_replacement(me)
                    replace_mentions = self._forward_replace_mentions_enabled()
                    hints = []
                    if only_new:
                        hints.append("仅新消息")
                    hints.append("以当前账号重发(非转发卡片)")
                    if replace_mentions:
                        hints.append(f"正文 @用户名 → {mention_replacement}")
                    else:
                        hints.append("保留正文 @FORWARD_RAW_MENTIONS=1")
                    hint_str = "" + "".join(hints) + "" if hints else ""
                    joined = "".join(dest_titles)
                    print(f"📤 推送已开启{hint_str}:共 {len(forward_entities)} 个目标 → {joined}")
        else:
            forward_entities = []
        message_batch = []
        media_tasks = []
        processed_messages = 0
        last_message_id = offset_id
        async for message in self.client.iter_messages(entity, offset_id=offset_id, reverse=True):
            try:
                sender = await message.get_sender()
                reactions_str = None
                if message.reactions and message.reactions.results:
                    reactions_parts = []
                    for reaction in message.reactions.results:
                        emoji = getattr(reaction.reaction, 'emoticon', '')
                        count = reaction.count
                        if emoji:
                            reactions_parts.append(f"{emoji} {count}")
                    if reactions_parts:
                        reactions_str = ' '.join(reactions_parts)
                msg_data = MessageData(
                    message_id=message.id,
                    date=message.date.strftime('%Y-%m-%d %H:%M:%S'),
                    sender_id=message.sender_id,
                    first_name=getattr(sender, 'first_name', None) if isinstance(sender, User) else None,
                    last_name=getattr(sender, 'last_name', None) if isinstance(sender, User) else None,
                    username=getattr(sender, 'username', None) if isinstance(sender, User) else None,
                    message=message.message or '',
                    media_type=message.media.__class__.__name__ if message.media else None,
                    media_path=None,
                    reply_to=message.reply_to_msg_id if message.reply_to else None,
                    post_author=message.post_author,
                    views=message.views,
                    forwards=message.forwards,
                    reactions=reactions_str
                )
                message_batch.append(msg_data)
                for fwd_ent in forward_entities:
                    await self._forward_scraped_message(
                        fwd_ent,
                        entity,
                        message,
                        replacement=mention_replacement,
                        replace_mentions=replace_mentions,
                    )
                if self.state['scrape_media'] and message.media and not isinstance(message.media, MessageMediaWebPage):
                    media_tasks.append(message)
                last_message_id = message.id
                processed_messages += 1
                if len(message_batch) >= self.batch_size:
                    self.batch_insert_messages(channel, message_batch)
                    message_batch.clear()
                if processed_messages % self.state_save_interval == 0:
                    # Flush first so the saved offset never runs ahead of the DB.
                    if message_batch:
                        self.batch_insert_messages(channel, message_batch)
                        message_batch.clear()
                    self.state['channels'][channel] = last_message_id
                    self.save_state()
                progress = (processed_messages / total_messages) * 100
                bar_length = 30
                filled_length = int(bar_length * processed_messages // total_messages)
                bar = '' * filled_length + '' * (bar_length - filled_length)
                sys.stdout.write(f"\r📄 Messages: [{bar}] {progress:.1f}% ({processed_messages}/{total_messages})")
                sys.stdout.flush()
            except Exception as e:
                print(f"\n处理消息 {message.id} 时出错:{e}")
        if message_batch:
            self.batch_insert_messages(channel, message_batch)
        if media_tasks:
            total_media = len(media_tasks)
            completed_media = 0
            successful_downloads = 0
            print(f"\n📥 Downloading {total_media} media files...")
            semaphore = asyncio.Semaphore(self.max_concurrent_downloads)

            async def download_single_media(message):
                async with semaphore:
                    return await self.download_media(channel, message)

            batch_size = 10
            for i in range(0, len(media_tasks), batch_size):
                batch = media_tasks[i:i + batch_size]
                tasks = [asyncio.create_task(download_single_media(msg)) for msg in batch]
                for j, task in enumerate(tasks):
                    try:
                        media_path = await task
                        if media_path:
                            await self.update_media_path(channel, batch[j].id, media_path)
                            successful_downloads += 1
                    except Exception:
                        pass
                    completed_media += 1
                    progress = (completed_media / total_media) * 100
                    bar_length = 30
                    filled_length = int(bar_length * completed_media // total_media)
                    bar = '' * filled_length + '' * (bar_length - filled_length)
                    sys.stdout.write(f"\r📥 Media: [{bar}] {progress:.1f}% ({completed_media}/{total_media})")
                    sys.stdout.flush()
            print(f"\n✅ 媒体下载完成!(成功 {successful_downloads}/{total_media}")
        self.state['channels'][channel] = last_message_id
        self.save_state()
        print(f"\nCompleted scraping channel {channel}")
    except Exception as e:
        print(f"处理频道 {channel} 时出错:{e}")
async def rescrape_media(self, channel: str):
    """Retry downloading media for messages whose file is not recorded yet.

    Rows count as missing when they have a non-web-page media_type and a
    NULL or empty media_path, which is the same definition
    ``fix_missing_media`` uses. The string literal is bound as a parameter,
    because SQLite treats double-quoted text as an identifier first.
    Messages are re-fetched in batches of 10 and downloaded concurrently,
    bounded by ``max_concurrent_downloads``, and each saved path is recorded.
    Ids that no longer exist or no longer carry media still count as
    processed, so the progress bar reaches 100%.
    """
    conn = self.get_db_connection(channel)
    cursor = conn.execute(
        "SELECT message_id FROM messages"
        " WHERE media_type IS NOT NULL AND media_type != ?"
        " AND (media_path IS NULL OR media_path = '')"
        " ORDER BY message_id",
        ("MessageMediaWebPage",),
    )
    message_ids = [row[0] for row in cursor.fetchall()]
    channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown')
    if not message_ids:
        print(f"{channel_name}ID: {channel})没有可补抓的媒体文件")
        return
    total = len(message_ids)
    print(f"📥 Reprocessing {total} media files for {channel_name} (ID: {channel})")
    try:
        if channel.lstrip('-').isdigit():
            entity = await self.client.get_entity(PeerChannel(int(channel)))
        else:
            entity = await self.client.get_entity(channel)
        semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
        completed_media = 0
        successful_downloads = 0

        async def download_single_media(message):
            async with semaphore:
                return await self.download_media(channel, message)

        def show_progress():
            progress = (completed_media / total) * 100
            bar_length = 30
            filled_length = int(bar_length * completed_media // total)
            bar = '' * filled_length + '' * (bar_length - filled_length)
            sys.stdout.write(f"\r🔄 Rescrape: [{bar}] {progress:.1f}% ({completed_media}/{total})")
            sys.stdout.flush()

        batch_size = 10
        for i in range(0, total, batch_size):
            batch_ids = message_ids[i:i + batch_size]
            messages = await self.client.get_messages(entity, ids=batch_ids)
            valid_messages = [msg for msg in messages if msg and msg.media and not isinstance(msg.media, MessageMediaWebPage)]
            tasks = [asyncio.create_task(download_single_media(msg)) for msg in valid_messages]
            for msg, task in zip(valid_messages, tasks):
                try:
                    media_path = await task
                    if media_path:
                        await self.update_media_path(channel, msg.id, media_path)
                        successful_downloads += 1
                except Exception:
                    pass
                completed_media += 1
                show_progress()
            skipped = len(batch_ids) - len(valid_messages)
            if skipped:
                # Deleted messages / messages without media still count as processed.
                completed_media += skipped
                show_progress()
        print(f"\n✅ 媒体补抓完成!(成功 {successful_downloads}/{total}")
    except Exception as e:
        print(f"媒体补抓出错:{e}")
async def fix_missing_media(self, channel: str):
    """Report media statistics for *channel* and download files that are missing.

    A row "has media" when media_type is set and is not a web-page preview.
    It is "missing" its file when media_path is NULL or empty. The counts
    printed and the rows fetched now use this one definition; previously the
    counts treated '' as downloaded while the fetch treated it as missing.
    SQL string literals use single quotes, because SQLite treats double
    quotes as identifiers first. Missing messages are re-fetched in batches
    of 10 and downloaded with bounded concurrency, and each saved path is
    recorded. Ids that no longer exist or carry media count as processed for
    the progress bar.
    """
    conn = self.get_db_connection(channel)
    has_media = "media_type IS NOT NULL AND media_type != 'MessageMediaWebPage'"
    not_downloaded = "(media_path IS NULL OR media_path = '')"
    total_with_media = conn.execute(
        f"SELECT COUNT(*) FROM messages WHERE {has_media}"
    ).fetchone()[0]
    missing_media = conn.execute(
        f"SELECT message_id, media_type FROM messages WHERE {has_media} AND {not_downloaded}"
        " ORDER BY message_id"
    ).fetchall()
    missing_count = len(missing_media)
    total_with_files = total_with_media - missing_count
    channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown')
    print(f"\n📊 {channel_name}ID: {channel})媒体分析:")
    print(f"含媒体的消息数:{total_with_media}")
    print(f"已下载媒体文件数:{total_with_files}")
    print(f"缺失媒体文件数:{missing_count}")
    if missing_count == 0:
        print("✅ 所有媒体文件都已下载!")
        return
    print(f"\n🔧 Attempting to download {missing_count} missing media files...")
    try:
        if channel.lstrip('-').isdigit():
            entity = await self.client.get_entity(PeerChannel(int(channel)))
        else:
            entity = await self.client.get_entity(channel)
        semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
        completed_media = 0
        successful_downloads = 0

        async def download_single_media(message):
            async with semaphore:
                return await self.download_media(channel, message)

        def show_progress():
            progress = (completed_media / missing_count) * 100
            bar_length = 30
            filled_length = int(bar_length * completed_media // missing_count)
            bar = '' * filled_length + '' * (bar_length - filled_length)
            sys.stdout.write(f"\r🔧 Fix Media: [{bar}] {progress:.1f}% ({completed_media}/{missing_count})")
            sys.stdout.flush()

        batch_size = 10
        for i in range(0, missing_count, batch_size):
            batch = missing_media[i:i + batch_size]
            message_ids = [row[0] for row in batch]
            messages = await self.client.get_messages(entity, ids=message_ids)
            valid_messages = [msg for msg in messages if msg and msg.media and not isinstance(msg.media, MessageMediaWebPage)]
            tasks = [asyncio.create_task(download_single_media(msg)) for msg in valid_messages]
            for msg, task in zip(valid_messages, tasks):
                try:
                    media_path = await task
                    if media_path:
                        await self.update_media_path(channel, msg.id, media_path)
                        successful_downloads += 1
                except Exception:
                    pass
                completed_media += 1
                show_progress()
            skipped = len(message_ids) - len(valid_messages)
            if skipped:
                # Deleted messages / messages without media still count as processed.
                completed_media += skipped
                show_progress()
        print(f"\n✅ 缺失媒体修复完成!(成功 {successful_downloads}/{missing_count}")
    except Exception as e:
        print(f"修复缺失媒体时出错:{e}")
async def continuous_scraping(self):
    """Scrape every tracked channel about once per minute until stopped.

    The loop sets ``continuous_scraping_active`` and optionally starts the
    heartbeat task. Each cycle scrapes a snapshot of ``state['channels']``,
    so channels added or removed meanwhile (for example by
    sync_monitor_chats_from_env) neither trigger "dictionary changed size
    during iteration" nor a KeyError. Removed channels are skipped, and
    added ones are picked up next cycle. The wait up to the next cycle runs
    in short steps, so clearing ``continuous_scraping_active`` takes effect
    within about a second. On exit, whether stopped or cancelled, the flag is
    cleared and the heartbeat task is cancelled and awaited. Cancellation is
    re-raised after printing a notice.
    """
    self.continuous_scraping_active = True
    hb_task = None
    if self._heartbeat_enabled():
        if self._heartbeat_target_spec():
            hb_task = asyncio.create_task(self._heartbeat_loop())
            print(
                f"💓 抓取心跳已开启:启动后先发一条,之后约每 "
                f"{self._heartbeat_interval_seconds():.0f} 秒一条(未配置 HEARTBEAT_TO_CHAT 时用 FORWARD_TO_CHAT 的首个目标)"
            )
        else:
            print(
                "⚠️ HEARTBEAT_ENABLED=1 但未配置 HEARTBEAT_TO_CHAT且 FORWARD_TO_CHAT 为空,"
                "已跳过心跳。"
            )
    try:
        while self.continuous_scraping_active:
            start_time = time.time()
            for channel in list(self.state['channels']):
                if not self.continuous_scraping_active:
                    break
                offset = self.state['channels'].get(channel)
                if offset is None:
                    continue  # removed since the snapshot was taken
                print(f"\nChecking for new messages in channel: {channel}")
                await self.scrape_channel(channel, offset)
            deadline = start_time + 60
            while self.continuous_scraping_active:
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                await asyncio.sleep(min(1.0, remaining))
    except asyncio.CancelledError:
        print("持续抓取已停止")
        raise
    finally:
        self.continuous_scraping_active = False
        if hb_task:
            hb_task.cancel()
            try:
                await hb_task
            except asyncio.CancelledError:
                pass
def get_export_filename(self, channel: str):
    """Return the export base name ``<channel>_<username>``; username defaults to 'no_username'."""
    names = self.state.get('channel_names', {})
    return "{}_{}".format(channel, names.get(channel, 'no_username'))
def export_to_csv(self, channel: str):
    """Write all of *channel*'s messages, ordered by date, to ``<channel>/<export name>.csv``.

    The first row holds the column names. Rows are streamed 1000 at a time.
    """
    conn = self.get_db_connection(channel)
    target = Path(channel) / f'{self.get_export_filename(channel)}.csv'
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM messages ORDER BY date')
    header = [col[0] for col in cursor.description]
    with open(target, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        for chunk in iter(lambda: cursor.fetchmany(1000), []):
            writer.writerows(chunk)
def export_to_json(self, channel: str):
    """Write all of *channel*'s messages, ordered by date, as a JSON array to ``<channel>/<export name>.json``.

    Each row becomes an object keyed by column name (indent 2, non-ASCII kept
    as-is). Rows are streamed 1000 at a time, so the full result is never held
    in memory.
    """
    conn = self.get_db_connection(channel)
    target = Path(channel) / f'{self.get_export_filename(channel)}.json'
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM messages ORDER BY date')
    columns = [col[0] for col in cursor.description]
    with open(target, 'w', encoding='utf-8') as handle:
        handle.write('[\n')
        separator = ''
        for chunk in iter(lambda: cursor.fetchmany(1000), []):
            for row in chunk:
                handle.write(separator)
                json.dump(dict(zip(columns, row)), handle, ensure_ascii=False, indent=2)
                separator = ',\n'
        handle.write('\n]')
async def export_data(self):
    """Export every tracked channel to CSV and then JSON, reporting success or failure per channel.

    A failure in one channel is printed and does not stop the others.
    """
    channels = self.state['channels']
    if not channels:
        print("没有可导出的频道")
        return
    for channel in channels:
        print(f"正在导出频道 {channel} 的数据...")
        try:
            for exporter in (self.export_to_csv, self.export_to_json):
                exporter(channel)
        except Exception as e:
            print(f"❌ 频道 {channel} 导出失败:{e}")
        else:
            print(f"✅ 频道 {channel} 导出完成")
async def view_channels(self):
    """Print every tracked channel with its saved offset and stored message count.

    If the channel's database cannot be opened or queried, the channel is
    still listed, just without a count. Only ``Exception`` is caught;
    the former bare ``except`` also swallowed KeyboardInterrupt/SystemExit.
    """
    channels = self.state['channels']
    if not channels:
        print("还没有保存任何频道")
        return
    print("\n当前频道列表:")
    names = self.state.get('channel_names', {})
    for i, (channel, last_id) in enumerate(channels.items(), 1):
        channel_name = names.get(channel, 'Unknown')
        try:
            conn = self.get_db_connection(channel)
            count = conn.execute('SELECT COUNT(*) FROM messages').fetchone()[0]
        except Exception:
            print(f"[{i}] {channel_name} (ID: {channel}), Last Message ID: {last_id}")
        else:
            print(f"[{i}] {channel_name} (ID: {channel}), Last Message ID: {last_id}, Messages: {count}")
def _account_list_exclude_substrings(self) -> List[str]:
"""账号可见列表中不展示的会话标题子串(如自有招聘群),环境变量 ACCOUNT_LIST_EXCLUDE_SUBSTRINGS 逗号分隔;* 表示不隐藏。"""
raw = (os.getenv("ACCOUNT_LIST_EXCLUDE_SUBSTRINGS") or "").strip()
if raw in ("*", "none", "off"):
return []
if raw:
return [p.strip() for p in re.split(r"[,;]+", raw) if p.strip()]
return ["远程-到岗-技术招聘"]
def _skip_dialog_in_account_channel_list(self, title: Optional[str]) -> bool:
if not title:
return False
t = str(title).strip()
for sub in self._account_list_exclude_substrings():
if sub and sub in t:
return True
return False
async def list_channels(self):
    """List the channels and groups the account has joined, and save them to channels_list.csv.

    Dialogs that are not Channel/Chat entities, the 777000 dialog, and titles
    matching the exclusion list are skipped. Each remaining dialog is printed
    with a running number. The CSV is written only when something was found.
    Returns the list of row dicts, or [] after printing an error.
    """
    try:
        print("\n当前账号加入的频道/群组:")
        found = []
        async for dialog in self.client.iter_dialogs():
            entity = dialog.entity
            if dialog.id == 777000 or not isinstance(entity, (Channel, Chat)):
                continue
            title = getattr(dialog, "title", None) or getattr(entity, "title", None)
            if self._skip_dialog_in_account_channel_list(title):
                continue
            kind = "频道" if isinstance(entity, Channel) and entity.broadcast else "群组"
            username = getattr(entity, 'username', None) or 'no_username'
            number = len(found) + 1
            print(f"[{number}] {dialog.title}ID: {dialog.id},类型:{kind},用户名:@{username}")
            found.append({
                'number': number,
                'channel_name': dialog.title,
                'channel_id': str(dialog.id),
                'username': username,
                'type': kind,
            })
        if found:
            csv_path = Path('channels_list.csv')
            with open(csv_path, 'w', newline='', encoding='utf-8') as handle:
                writer = csv.DictWriter(handle, fieldnames=['number', 'channel_name', 'channel_id', 'username', 'type'])
                writer.writeheader()
                writer.writerows(found)
            print(f"\n✅ 频道列表已保存到 {csv_path}")
        return found
    except Exception as e:
        print(f"列出频道时出错:{e}")
        return []
def display_qr_code_ascii(self, qr_login):
    """Print *qr_login*'s login URL as a compact ASCII-art QR code."""
    code = qrcode.QRCode(box_size=1, border=1)
    code.add_data(qr_login.url)
    code.make()
    buffer = StringIO()
    code.print_ascii(out=buffer)
    print(buffer.getvalue())
def _stdin_interactive(self) -> bool:
try:
if (os.getenv("TELEGRAM_WEB_UI") or "").strip().lower() in ("1", "true", "yes", "on"):
return False
return sys.stdin is not None and sys.stdin.isatty()
except Exception:
return False
async def qr_code_auth(self, headless_hint: bool = False):
    """Log in by QR code. Returns True on success and False otherwise.

    The login URL and an ASCII QR code are printed, then the method waits for
    the code to be scanned from a logged-in Telegram app. With
    *headless_hint*, an extra note points Web/Docker users to the logs.

    For accounts with two-step verification, the password is read from
    TELEGRAM_2FA_PASSWORD. Otherwise, on an interactive terminal, it is
    prompted for without echo via getpass. A wrong password or other sign-in
    error is reported and yields False; it no longer escapes as an exception
    raised from inside the error handler.
    """
    print("\n正在使用二维码登录...")
    print("请用 Telegram 手机端扫描二维码:")
    print("1. 在手机上打开 Telegram")
    print("2. 进入 设置 > 设备 > 扫描二维码")
    print("3. 扫描下方二维码(或打开下方链接)\n")
    qr_login = await self.client.qr_login()
    print(f"二维码链接(可复制到手机浏览器):\n{qr_login.url}\n")
    self.display_qr_code_ascii(qr_login)
    if headless_hint:
        print("Web/Docker 环境请查看本页「运行日志」或容器日志中的上述链接。)")
    try:
        await qr_login.wait()
        print("\n✅ 二维码登录成功!")
        return True
    except SessionPasswordNeededError:
        pass  # two-step verification required; handled below
    except Exception as e:
        print(f"\n❌ 二维码登录失败:{e}")
        return False
    password = (os.getenv("TELEGRAM_2FA_PASSWORD") or "").strip()
    if not password:
        if not self._stdin_interactive():
            print("已开启两步验证:请在 .env 中设置 TELEGRAM_2FA_PASSWORD 后重试(勿提交到 Git")
            return False
        import getpass

        password = getpass.getpass("已开启两步验证,请输入密码:")
    try:
        await self.client.sign_in(password=password)
    except Exception as e:
        print(f"\n❌ 两步验证登录失败:{e}")
        return False
    print("\n✅ 两步验证登录成功!")
    return True
async def phone_auth(self):
    """Log in with a phone number and the login code sent to it.

    The number and code are prompted for on stdin. If the account has
    two-step verification enabled, the password is prompted for as well.
    Exceptions from the prompts (e.g. EOF) or from the Telethon calls are
    printed and reported as a failed login instead of propagating.

    Returns:
        True on successful login, False otherwise.
    """
    try:
        phone = input("请输入手机号:").strip()
        await self.client.send_code_request(phone)
        code = input("请输入收到的验证码:").strip()
        await self.client.sign_in(phone, code)
        print("\n✅ 手机号登录成功!")
        return True
    except SessionPasswordNeededError:
        # The code was accepted, but the account also needs its 2FA password.
        pass
    except Exception as e:
        print(f"\n❌ 手机号登录失败:{e}")
        return False

    try:
        password = input("已开启两步验证,请输入密码:")
        await self.client.sign_in(password=password)
    except Exception as e:
        print(f"\n❌ 两步验证失败:{e}")
        return False
    print("\n✅ 两步验证登录成功!")
    return True
async def initialize_client(self):
    """Create, connect and, if needed, log in the Telegram client.

    Steps:
      1. Resolve API credentials (see ``_resolve_api_credentials``).
      2. Build ``TelegramClient('session', ...)`` with the optional proxy from
         ``build_proxy_config()`` and connect.
      3. If the session is not authorised, log in. On an interactive stdin
         the user picks QR or phone login; otherwise only QR login is
         attempted, and only when ``TELEGRAM_HEADLESS_QR`` is enabled.

    Returns:
        True when the client is connected and authorised, False otherwise.
        On any login failure, including exceptions raised by the login
        helpers or ``is_user_authorized()``, the client is disconnected
        before returning False. Callers therefore never get back a connected
        but unauthorised client.
    """
    if not self._resolve_api_credentials():
        return False
    proxy = self.build_proxy_config()
    if proxy:
        print(f"正在使用代理:{proxy[0]}://{proxy[1]}:{proxy[2]}")
    self.client = TelegramClient('session', self.state['api_id'], self.state['api_hash'], proxy=proxy)
    try:
        await self.client.connect()
    except Exception as e:
        print(f"连接失败:{e}")
        return False
    try:
        if await self.client.is_user_authorized():
            print("✅ 已登录!")
            return True
        if self._stdin_interactive():
            logged_in = await self._authorize_interactively()
        else:
            logged_in = await self._authorize_headless()
    except Exception as e:
        # Report the error and fall through so the connection is closed
        # below instead of being leaked.
        print(f"登录过程中出错:{e}")
        logged_in = False
    if not logged_in:
        await self.client.disconnect()
        return False
    return True

def _resolve_api_credentials(self):
    """Ensure ``self.state`` holds a truthy 'api_id' and 'api_hash'.

    Missing values are filled in this order:
      1. the API_ID / API_HASH environment variables (API_ID must parse as int);
      2. when stdin is interactive, a prompt, persisted with ``save_state()``.

    Returns:
        True if both credentials are available, False otherwise.
    """
    env_api_id = os.getenv("API_ID")
    if not self.state.get('api_id') and env_api_id:
        try:
            self.state['api_id'] = int(env_api_id)
        except ValueError:
            print("环境变量中的 API_ID 无效,将改为手动输入。")
    env_api_hash = os.getenv("API_HASH")
    if not self.state.get('api_hash') and env_api_hash:
        self.state['api_hash'] = env_api_hash
    if self.state.get('api_id') and self.state.get('api_hash'):
        return True

    print("\n=== 需要配置 API ===")
    print("请提供来自 https://my.telegram.org 的 API 凭据")
    if not self._stdin_interactive():
        print("当前为非交互环境:请在 .env 或网页「环境配置」中填写 API_ID / API_HASH。")
        return False
    try:
        self.state['api_id'] = int(input("请输入 API ID"))
        self.state['api_hash'] = input("请输入 API Hash")
        self.save_state()
    except ValueError:
        print("API ID 无效,必须是数字。")
        return False
    except EOFError:
        print("无法读取输入EOF。请在 .env 中配置 API_ID / API_HASH。")
        return False
    return True

async def _authorize_headless(self):
    """Log in without a console (Web/Docker).

    Prints setup instructions. QR login is then attempted only when
    TELEGRAM_HEADLESS_QR is 1/true/yes/on. Returns True on successful login.
    """
    print(
        "\n=== Web / Docker 等非交互环境 ===\n"
        "无法使用「手机号登录」或控制台选项。\n"
        "任选其一:\n"
        " A) 将本机已登录生成的 session.session 复制到程序目录(与 app_web.py 同级),再点「初始化/连接」。\n"
        " B) 在 .env 设置 TELEGRAM_HEADLESS_QR=1重启后点「初始化/连接」,在「运行日志」里打开「二维码链接」用手机 Telegram 扫码。\n"
        " 若开启了两步验证,可设 TELEGRAM_2FA_PASSWORD勿提交到 Git\n"
    )
    headless_qr = (os.getenv("TELEGRAM_HEADLESS_QR") or "").strip().lower() in (
        "1",
        "true",
        "yes",
        "on",
    )
    if not headless_qr:
        return False
    return bool(await self.qr_code_auth(headless_hint=True))

async def _authorize_interactively(self):
    """Let the console user choose QR or phone login and run it.

    Returns True on successful login. Returns False if stdin hits EOF during
    the choice or the chosen login fails.
    """
    print("\n=== 请选择登录方式 ===")
    print("[1] 二维码登录(推荐,无需手机号)")
    print("[2] 手机号登录(传统方式)")
    while True:
        try:
            choice = input("请输入选项1 或 2").strip()
        except EOFError:
            print("输入结束,已取消登录。")
            return False
        if choice in ("1", "2"):
            break
        print("请输入 1 或 2")
    success = await self.qr_code_auth() if choice == "1" else await self.phone_auth()
    if not success:
        print("登录失败,请重试。")
        return False
    return True
def parse_channel_selection(self, choice):
    """Translate a user's channel selection string into tracked channel IDs.

    ``choice`` is either ``all`` (case-insensitive, surrounding whitespace
    ignored), meaning every tracked channel in stored order, or a
    comma-separated mix of:

    * a full channel ID that is a key of ``self.state['channels']``
      (e.g. ``-1001234567890``);
    * a 1-based position in the order of ``self.state['channels']``.

    Blank entries, e.g. from a trailing comma, are ignored. Each channel
    appears at most once, in first-mention order, so repeating a channel
    never makes callers scrape or remove it twice. Invalid entries are
    reported on stdout and skipped.

    Returns:
        A list of channel ID keys, possibly empty.
    """
    channels = self.state['channels']
    channels_list = list(channels.keys())
    if choice.strip().lower() == 'all':
        return channels_list
    selected_channels = []
    seen = set()
    for selection in (x.strip() for x in choice.split(',')):
        if not selection:
            continue
        if selection in channels:
            # An exact ID match wins. This also covers positive IDs.
            channel = selection
        elif selection.startswith('-'):
            print(f"你的频道列表中不存在频道 ID {selection}")
            continue
        else:
            try:
                num = int(selection)
            except ValueError:
                print(f"输入无效:{selection}。请用编号1,2,3或完整频道ID-100123...")
                continue
            if not 1 <= num <= len(channels_list):
                print(f"频道编号无效:{num}。有效范围1-{len(channels_list)}")
                continue
            channel = channels_list[num - 1]
        if channel not in seen:
            seen.add(channel)
            selected_channels.append(channel)
    return selected_channels
async def scrape_specific_channels(self):
    """Interactively pick tracked channels and scrape them one after another.

    Shows the tracked channels and reads a selection from stdin, parsed by
    ``parse_channel_selection``. It then awaits
    ``scrape_channel(channel_id, self.state['channels'][channel_id])`` for
    each selected channel in order, printing progress along the way.
    """
    if not self.state['channels']:
        print("当前没有可用频道,请先用 [L] 添加频道")
        return
    await self.view_channels()
    for hint in (
        "\n📥 抓取选项:",
        "• 单个1 或 -1001234567890",
        "• 多个1,3,5 或混合输入",
        "• 全部频道all",
    ):
        print(hint)
    targets = self.parse_channel_selection(input("\n请输入选择:").strip())
    if not targets:
        print("❌ 未选择有效频道")
        return
    total = len(targets)
    print(f"\n🚀 开始抓取 {total} 个频道...")
    for position, channel_id in enumerate(targets, start=1):
        print(f"\n[{position}/{total}] 正在抓取:{channel_id}")
        await self.scrape_channel(channel_id, self.state['channels'][channel_id])
    print(f"\n✅ 已完成 {total} 个频道的抓取!")
async def manage_channels(self):
    """Run the interactive main menu until the user quits.

    Each iteration prints the menu, reads one option from stdin and
    dispatches on it:

      S  scrape selected channels       C  continuous scraping
      M  toggle media scraping          L  list dialogs and add channels
      R  remove tracked channels        E  export data
      T  re-scrape media (1 channel)    F  fix missing media (1 channel)
      Q  close DB connections, disconnect the client and ``sys.exit()``

    An ``Exception`` raised by an option handler is printed and the menu is
    shown again, so one failing action does not end the session.
    """

    def add_listed_channel(channel_info):
        """Start tracking one entry returned by ``list_channels()``.

        Returns True if the channel was newly added, False if it was already
        tracked.
        """
        channel_id = channel_info['channel_id']
        if channel_id in self.state['channels']:
            print(f"频道 {channel_info['channel_name']} 已添加过")
            return False
        # New channels start at 0. Presumably this means "nothing scraped
        # yet" (it is passed to scrape_channel()); confirm there.
        self.state['channels'][channel_id] = 0
        self.state.setdefault('channel_names', {})[channel_id] = channel_info['username']
        self.state.setdefault('channel_titles', {})[channel_id] = channel_info['channel_name']
        print(f"✅ 已添加频道 {channel_info['channel_name']}ID: {channel_id}")
        return True

    def resolve_listed_channel(token, channels_data):
        """Map one selection token to a ``list_channels()`` entry, or None.

        An exact channel-ID match wins. Otherwise a token starting with '-'
        is an unknown ID, and anything else is a 1-based list position.
        Blank tokens are ignored silently.
        """
        if not token:
            return None
        match = next((c for c in channels_data if c['channel_id'] == token), None)
        if match is not None:
            return match
        if token.startswith('-'):
            print(f"未找到频道 ID {token}")
            return None
        try:
            num = int(token)
        except ValueError:
            print(f"输入无效:{token}")
            return None
        if 1 <= num <= len(channels_data):
            return channels_data[num - 1]
        print(f"编号无效:{num}。请选择 1-{len(channels_data)}")
        return None

    async def add_channels():
        """[L]: list dialogs, then track the ones the user selects."""
        channels_data = await self.list_channels()
        if not channels_data:
            return
        print("\n从上方列表添加频道:")
        print("• 单个1 或 -1001234567890")
        print("• 多个1,3,5 或混合输入")
        print("• 全部频道all")
        print("• 直接回车可跳过添加")
        selection = input("\n请输入选择(或回车跳过):").strip()
        if not selection:
            return
        if selection.lower() == 'all':
            candidates = channels_data
        else:
            # A generator keeps "resolve, then add" interleaved per token,
            # so messages appear in input order.
            candidates = (resolve_listed_channel(tok.strip(), channels_data)
                          for tok in selection.split(','))
        added_count = 0
        for channel_info in candidates:
            if channel_info is not None and add_listed_channel(channel_info):
                added_count += 1
        if added_count > 0:
            self.save_state()
            print(f"\n🎉 已新增 {added_count} 个频道!")
            await self.view_channels()
        else:
            print("没有新增频道")

    async def remove_channels():
        """[R]: stop tracking the channels the user selects."""
        if not self.state['channels']:
            print("没有可移除的频道")
            return
        await self.view_channels()
        print("\n移除频道说明:")
        print("• 单个1 或 -1001234567890")
        print("• 多个1,2,3 或混合输入")
        selected_channels = self.parse_channel_selection(input("请输入选择:").strip())
        if not selected_channels:
            print("未选择有效频道")
            return
        removed_count = 0
        for channel in selected_channels:
            if channel not in self.state['channels']:
                print(f"❌ 未找到频道 {channel}")
                continue
            del self.state['channels'][channel]
            # The metadata maps may be absent from self.state; drop the
            # entry only where a map exists.
            for meta_key in ('channel_names', 'channel_titles'):
                if meta_key in self.state:
                    self.state[meta_key].pop(channel, None)
            print(f"✅ 已移除频道 {channel}")
            removed_count += 1
        if removed_count > 0:
            self.save_state()
            print(f"\n🎉 已移除 {removed_count} 个频道!")
            await self.view_channels()
        else:
            print("未移除任何频道")

    async def pick_single_channel(too_many_msg):
        """Prompt for exactly one tracked channel and return its ID or None.

        Returns None after printing the reason when nothing is tracked, more
        than one channel was selected (prints ``too_many_msg``), or the
        selection was invalid.
        """
        if not self.state['channels']:
            print("当前没有可用频道,请先添加频道")
            return None
        await self.view_channels()
        print("\n请输入频道编号1,2,3...或完整频道ID-100123...")
        selected_channels = self.parse_channel_selection(input("请输入选择:").strip())
        if len(selected_channels) == 1:
            return selected_channels[0]
        print(too_many_msg if selected_channels else "未选择有效频道")
        return None

    async def run_continuous_scraping():
        """[C]: run continuous_scraping() until it ends or is interrupted."""
        task = asyncio.create_task(self.continuous_scraping())
        print("持续抓取已启动,按 Ctrl+C 停止。")
        try:
            # Await the task itself rather than sleeping forever. If
            # continuous_scraping() returns, control comes back to the menu;
            # if it raises, the outer handler reports the error instead of
            # it being silently lost.
            await task
        except KeyboardInterrupt:
            # NOTE(review): under asyncio.run() on Python 3.11+, Ctrl+C
            # cancels the main task, so it usually arrives as CancelledError
            # (which propagates and also cancels `task`) rather than here.
            self.continuous_scraping_active = False
            task.cancel()
            print("\n正在停止持续抓取...")
            try:
                await task
            except asyncio.CancelledError:
                pass

    while True:
        print("\n" + "="*40)
        print(" Telegram 抓取工具")
        print("="*40)
        print("[S] 抓取频道")
        print("[C] 持续抓取")
        print(f"[M] 媒体抓取:{'开启' if self.state['scrape_media'] else '关闭'}")
        print("[L] 列出并添加频道")
        print("[R] 移除频道")
        print("[E] 导出数据")
        print("[T] 补抓媒体")
        print("[F] 修复缺失媒体")
        print("[Q] 退出")
        print("="*40)
        choice = input("请输入选项:").lower().strip()
        try:
            if choice == 'r':
                await remove_channels()
            elif choice == 's':
                await self.scrape_specific_channels()
            elif choice == 'm':
                self.state['scrape_media'] = not self.state['scrape_media']
                self.save_state()
                print(f"\n✅ 媒体抓取已{'开启' if self.state['scrape_media'] else '关闭'}")
            elif choice == 'c':
                await run_continuous_scraping()
            elif choice == 'e':
                await self.export_data()
            elif choice == 'l':
                await add_channels()
            elif choice == 't':
                channel = await pick_single_channel("补抓媒体时一次只能选择一个频道")
                if channel is not None:
                    print(f"正在为频道补抓媒体:{channel}")
                    await self.rescrape_media(channel)
            elif choice == 'f':
                channel = await pick_single_channel("修复缺失媒体时一次只能选择一个频道")
                if channel is not None:
                    await self.fix_missing_media(channel)
            elif choice == 'q':
                print("\n👋 再见!")
                self.close_db_connections()
                if self.client:
                    await self.client.disconnect()
                sys.exit()
            else:
                print("无效选项")
        except Exception as e:
            print(f"错误:{e}")
async def run(self):
    """Top-level session: banner, connect/log in, then the interactive menu.

    If ``initialize_client()`` fails, a message is printed and nothing else
    happens. Otherwise:
      1. the env-driven settings are applied (monitored chats, media toggle);
      2. hints about the heartbeat and about multiple forward targets are
         printed;
      3. ``manage_channels()`` runs.
    DB connections are closed and the client is disconnected when that
    finishes, even if it raises.
    """
    display_ascii_art()
    if not await self.initialize_client():
        print("客户端初始化失败,程序退出。")
        return
    try:
        await self.sync_monitor_chats_from_env()
        self._apply_scrape_media_from_env()
        if self._heartbeat_enabled():
            print(
                "💡 心跳已配置开启:请选菜单 [C]「持续抓取」后才会往群里发;"
                "若只用 [S] 单次抓取,不会出现心跳。"
            )
        forward_targets = self._forward_targets_from_env()
        if len(forward_targets) > 1:
            print(
                " FORWARD_TO_CHAT 含多个目标,每条消息会发到多个群。"
                "若只需「多群监控 → 一个汇总频道」,请只保留一个转发 ID多个源请用 MONITOR_CHATS。"
            )
        await self.manage_channels()
    finally:
        self.close_db_connections()
        if self.client:
            await self.client.disconnect()
async def main():
    """Create the scraper and run its interactive session to completion."""
    await OptimizedTelegramScraper().run()
def configure_event_loop_policy():
    """On Windows, switch asyncio to the selector-based event loop policy.

    This is best-effort: if setting the policy fails, the error is ignored
    and the default policy stays in place. It does nothing on non-Windows
    platforms.
    """
    if not sys.platform.startswith("win"):
        return
    try:
        selector_policy = asyncio.WindowsSelectorEventLoopPolicy()
        asyncio.set_event_loop_policy(selector_policy)
    except Exception:
        pass
if __name__ == '__main__':
    # Script entry point: pick the event-loop policy, then run the session.
    try:
        configure_event_loop_policy()
        asyncio.run(main())
    except KeyboardInterrupt:
        # A Ctrl+C that escapes asyncio.run() prints a short notice and
        # exits, instead of leaving a traceback.
        print("\n程序已中断,正在退出...")
        sys.exit()