import os
import re
import sqlite3
import json
import csv
import asyncio
import time
import sys
import uuid
import warnings
import tempfile
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from pathlib import Path
from io import StringIO

from telethon import TelegramClient, utils
from telethon.tl.types import (
    MessageMediaPhoto,
    MessageMediaDocument,
    MessageMediaWebPage,
    User,
    PeerChannel,
    Channel,
    Chat,
    MessageService,
)
from telethon.errors import FloodWaitError, SessionPasswordNeededError
import qrcode

warnings.filterwarnings("ignore", message="Using async sessions support is an experimental feature")


def load_env_file(env_path: str = ".env"):
    """Load ``KEY=VALUE`` pairs from *env_path* into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are skipped.
    Surrounding single/double quotes are stripped from values. Variables
    that already exist in the environment are never overwritten. A missing
    file is silently ignored; read errors are reported and otherwise ignored.
    """
    if not os.path.exists(env_path):
        return
    try:
        # utf-8-sig drops a leading BOM, which would otherwise become part of
        # the first key and make that variable unreachable.
        with open(env_path, "r", encoding="utf-8-sig") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                key = key.strip()
                value = value.strip().strip("'").strip('"')
                if key and key not in os.environ:
                    os.environ[key] = value
    except (OSError, ValueError) as e:
        # ValueError covers decoding errors and invalid environment entries.
        print(f"读取 {env_path} 失败:{e}")


def display_ascii_art():
    """Print the banner in bright white."""
    WHITE = "\033[97m"
    RESET = "\033[0m"
    # NOTE(review): the banner's line breaks and spacing were reconstructed
    # from a whitespace-collapsed source; confirm against the original file.
    art = r"""
___________________  _________
\__    ___/  _____/ /   _____/
  |    | /   \  ___ \_____  \
  |    | \    \_\  \/        \
  |____|  \______  /_______  /
                 \/        \/
"""
    print(WHITE + art + RESET)


@dataclass
class MessageData:
    """One scraped message, shaped like a row of the ``messages`` table."""

    message_id: int
    date: str
    sender_id: int
    first_name: Optional[str]
    last_name: Optional[str]
    username: Optional[str]
    message: str
    media_type: Optional[str]
    media_path: Optional[str]
    reply_to: Optional[int]
    post_author: Optional[str]
    views: Optional[int]
    forwards: Optional[int]
    reactions: Optional[str]


class OptimizedTelegramScraper:
    """Scrape Telegram chats into per-channel SQLite databases.

    Optionally downloads media, re-sends scraped messages to configured
    target chats, and emits periodic heartbeat messages. Configuration comes
    from ``state.json`` and environment variables (loaded from ``.env``).
    """

    def __init__(self):
        load_env_file()
        self.STATE_FILE = 'state.json'
        self.state = self.load_state()
        self.client = None
        self.continuous_scraping_active = False
        self.max_concurrent_downloads = 5
        self.batch_size = 100
        self.state_save_interval = 50
        self.db_connections = {}
        self._forward_delay_seconds = self._read_forward_delay()

    # ------------------------------------------------------------------
    # Environment helpers
    # ------------------------------------------------------------------

    def _read_forward_delay(self) -> float:
        """Return FORWARD_DELAY_SECONDS as a non-negative float (0.0 if unset/invalid)."""
        raw = os.getenv("FORWARD_DELAY_SECONDS", "").strip()
        if not raw:
            return 0.0
        try:
            return max(0.0, float(raw))
        except ValueError:
            return 0.0

    def _apply_scrape_media_from_env(self) -> None:
        """Copy the SCRAPE_MEDIA env flag into ``state['scrape_media']`` and persist it.

        Unset or unrecognised values leave the state untouched.
        """
        raw = os.getenv("SCRAPE_MEDIA", "").strip().lower()
        if not raw:
            return
        if raw in ("0", "false", "no", "off"):
            self.state["scrape_media"] = False
        elif raw in ("1", "true", "yes", "on"):
            self.state["scrape_media"] = True
        else:
            return
        self.save_state()
        print(
            "ℹ️ 已从环境变量 SCRAPE_MEDIA 写入 state:"
            f"{'开启' if self.state['scrape_media'] else '关闭'}媒体下载(关闭时只存文字与元数据,不抓文件)"
        )

    def _heartbeat_enabled(self) -> bool:
        """Return True when HEARTBEAT_ENABLED or SCRAPER_HEARTBEAT is truthy."""
        return self._env_flag_true("HEARTBEAT_ENABLED", "SCRAPER_HEARTBEAT")

    def _heartbeat_interval_seconds(self) -> float:
        """Return HEARTBEAT_INTERVAL_SECONDS, clamped to at least 5 s (default 30 s)."""
        raw = os.getenv("HEARTBEAT_INTERVAL_SECONDS", "30").strip()
        try:
            return max(5.0, float(raw))
        except ValueError:
            return 30.0

    def _heartbeat_target_spec(self) -> Optional[str]:
        """Return the heartbeat destination spec.

        Uses HEARTBEAT_TO_CHAT / HEARTBEAT_TO when set, otherwise the first
        forward target; ``None`` when neither is configured.
        """
        s = (os.getenv("HEARTBEAT_TO_CHAT") or os.getenv("HEARTBEAT_TO") or "").strip()
        if s:
            return s
        t = self._forward_targets_from_env()
        return t[0] if t else None

    async def _heartbeat_loop(self) -> None:
        """Send a heartbeat message to the heartbeat target until scraping stops.

        Returns immediately when no target is configured or it cannot be
        resolved. Send failures are printed and do not end the loop.
        """
        spec = self._heartbeat_target_spec()
        if not spec:
            return
        try:
            entity = await self._resolve_forward_entity(spec)
        except Exception as e:
            print(f"💓 心跳目标无法解析({spec}):{e}")
            return
        while self.continuous_scraping_active:
            try:
                ts = time.strftime("%Y-%m-%d %H:%M:%S")
                await self.client.send_message(
                    entity, f"💓 抓取心跳 · 持续运行中 · {ts}"
                )
            except asyncio.CancelledError:
                break
            except FloodWaitError as e:
                # Cap the wait so one long flood-wait cannot stall the loop.
                await asyncio.sleep(min(e.seconds, 300))
            except Exception as e:
                print(f"\n💓 心跳发送失败:{e}")
            try:
                await asyncio.sleep(self._heartbeat_interval_seconds())
            except asyncio.CancelledError:
                break

    def _forward_targets_from_env(self) -> List[str]:
        """Return forward targets from FORWARD_TO_CHAT / FORWARD_TO.

        Entries are separated by ASCII or full-width commas/semicolons; BOM
        and zero-width-space characters around each entry are removed.
        """
        raw = (os.getenv("FORWARD_TO_CHAT") or os.getenv("FORWARD_TO") or "").strip()
        if not raw:
            return []
        parts = re.split(r"[,,;;]+", raw)
        out: List[str] = []
        for p in parts:
            s = p.strip().strip("\ufeff").strip("\u200b")
            if s:
                out.append(s)
        return out

    def _monitor_chats_specs_from_env(self) -> List[str]:
        """Return chat specs from MONITOR_CHATS / MONITOR_GROUPS (same separators as forward targets)."""
        raw = (os.getenv("MONITOR_CHATS") or os.getenv("MONITOR_GROUPS") or "").strip()
        if not raw:
            return []
        parts = re.split(r"[,,;;]+", raw)
        return [p.strip().strip("\ufeff").strip("\u200b") for p in parts if p.strip()]

    async def sync_monitor_chats_from_env(self) -> None:
        """Add every chat listed in MONITOR_CHATS to ``state['channels']``.

        New chats start at offset 0. Stored username/title are refreshed for
        chats that are already present. State is saved only when at least one
        chat was added. Specs that fail to resolve are reported and skipped.
        """
        specs = self._monitor_chats_specs_from_env()
        if not specs or not self.client:
            return
        channels = self.state.setdefault("channels", {})
        names = self.state.setdefault("channel_names", {})
        titles = self.state.setdefault("channel_titles", {})
        added = 0
        for spec in specs:
            try:
                ent = await self._resolve_forward_entity(spec)
                cid = str(utils.get_peer_id(ent))
                uname = getattr(ent, "username", None) or "no_username"
                title = getattr(ent, "title", None) or cid
                is_new = cid not in channels
                if is_new:
                    channels[cid] = 0
                names[cid] = uname
                titles[cid] = str(title)
                if is_new:
                    print(f"📌 已从 MONITOR_CHATS 加入监控源:{title}({cid})")
                    added += 1
            except Exception as e:
                print(f"⚠️ MONITOR_CHATS 无法解析「{spec}」:{e}")
        if added:
            self.save_state()

    def _normalize_entity_input(self, spec: str) -> str:
        """Normalise a t.me / telegram.me link to ``@username``.

        Invite links (``joinchat/`` or ``+``), purely numeric slugs and
        reserved path names are returned unchanged, as is any input that is
        not a recognised link.
        """
        s = (spec or "").strip().strip("\ufeff").strip("\u200b")
        if not s:
            return s
        # Invite links are left as-is for get_entity to handle.
        if re.search(r"(?:t\.me|telegram\.me)/(?:joinchat/|\+)", s, re.I):
            return s
        m = re.search(
            r"(?:https?://)?(?:t\.me|telegram\.me)/([A-Za-z0-9_]+)(?:[/?#].*)?$",
            s,
            re.I,
        )
        if m:
            slug = m.group(1)
            if slug.isdigit():
                return s
            reserved = {"addstickers", "share", "iv", "socks", "setlanguage", "proxy"}
            if slug.lower() in reserved:
                return s
            return "@" + slug
        return s

    async def _resolve_forward_entity(self, spec: str):
        """Resolve a chat spec (``@name``, link, or numeric id) to a Telethon entity.

        ``-100…`` ids are tried as ``PeerChannel`` first, then as a plain int.

        Raises:
            ValueError: when *spec* is empty after normalisation.
            Exception: whatever ``get_entity`` raises when resolution fails.
        """
        s = self._normalize_entity_input(spec)
        if not s:
            raise ValueError("转发目标为空")
        if s.startswith("@"):
            return await self.client.get_entity(s)
        if re.fullmatch(r"-100\d+", s):
            inner = int(s[4:])
            try:
                return await self.client.get_entity(PeerChannel(inner))
            except Exception:
                return await self.client.get_entity(int(s))
        if re.fullmatch(r"-?\d+", s):
            num = int(s)
            try:
                return await self.client.get_entity(num)
            except Exception:
                sn = str(num)
                if sn.startswith("-100") and len(sn) > 4 and sn[4:].isdigit():
                    return await self.client.get_entity(PeerChannel(int(sn[4:])))
                raise
        return await self.client.get_entity(s)

    def _env_flag_true(self, *keys: str) -> bool:
        """Return True if any of the env vars *keys* is 1/true/yes/on (case-insensitive)."""
        for key in keys:
            v = os.getenv(key, "").strip().lower()
            if v in ("1", "true", "yes", "on"):
                return True
        return False

    def _forward_only_new_messages(self) -> bool:
        """Return True when forwarding is restricted to runs with a positive offset."""
        return self._env_flag_true(
            "FORWARD_ONLY_NEW_MESSAGES",
            "FORWARD_ONLY_IF_OFFSET_POSITIVE",
        )

    def _forward_my_username(self) -> Optional[str]:
        """Return FORWARD_MY_USERNAME / FORWARD_AS_USERNAME without a leading ``@``, or None."""
        raw = (os.getenv("FORWARD_MY_USERNAME") or os.getenv("FORWARD_AS_USERNAME") or "").strip()
        if not raw:
            return None
        return raw.lstrip("@").strip() or None

    def _env_flag_false(self, *keys: str) -> bool:
        """Return True if any of *keys* is explicitly 0/false/no/off.

        Intended for behaviours that are on by default and are switched off
        by setting their own flag to a falsy value.
        """
        for key in keys:
            v = os.getenv(key, "").strip().lower()
            if v in ("0", "false", "no", "off"):
                return True
        return False

    def _forward_replace_mentions_enabled(self) -> bool:
        """Return whether ``@username`` mentions are rewritten when re-sending.

        Replacement is on by default and is turned off by setting
        FORWARD_RAW_MENTIONS or FORWARD_NO_MENTION_REPLACE to a truthy value,
        which is what the status hint printed by ``scrape_channel``
        ("FORWARD_RAW_MENTIONS=1") tells the user to do.
        """
        # These are "opt-out" flags: truthy means keep mentions raw.
        return not self._env_flag_true("FORWARD_RAW_MENTIONS", "FORWARD_NO_MENTION_REPLACE")

    def _sanitize_mentions_text(self, text: str, replacement: str) -> str:
        """Replace every ``@username`` in *text* with *replacement*.

        Returns ``""`` for empty text and *text* unchanged when *replacement*
        is empty.
        """
        if not text:
            return ""
        if not replacement:
            return text
        # Telegram usernames are normally 5-32 chars of [A-Za-z0-9_]; 4 is
        # accepted to cover edge cases. A callable avoids backslash/group
        # interpretation of the replacement string.
        return re.sub(r"@[A-Za-z0-9_]{4,32}\b", lambda _: replacement, text)

    async def _resolve_forward_mention_replacement(self, me) -> str:
        """Return the display string that replaces mentions in re-sent messages.

        Preference order: the env-configured username, the logged-in account's
        username, its first/last name, then the literal fallback.
        """
        custom = self._forward_my_username()
        if custom:
            return "@" + custom.lstrip("@")
        if getattr(me, "username", None):
            return "@" + me.username
        parts = [
            (getattr(me, "first_name", None) or "").strip(),
            (getattr(me, "last_name", None) or "").strip(),
        ]
        name = " ".join(p for p in parts if p)
        return name or "我"

    # ------------------------------------------------------------------
    # Forwarding
    # ------------------------------------------------------------------

    async def _forward_scraped_message(
        self,
        target,
        _source_entity,
        message,
        *,
        replacement: str,
        replace_mentions: bool,
    ) -> None:
        """Re-send *message* to *target* as a fresh copy from the current account.

        Media is downloaded to a temporary file and uploaded again, so the
        copy does not carry a "forwarded from" header. Service messages are
        skipped. After a ``FloodWaitError`` the send is retried once; all
        other failures are printed and swallowed.
        """
        if isinstance(message, MessageService):
            return
        text_raw = message.message or ""
        text_out = (
            self._sanitize_mentions_text(text_raw, replacement)
            if replace_mentions
            else text_raw
        )
        has_media = bool(
            message.media and not isinstance(message.media, MessageMediaWebPage)
        )

        async def _send_copy() -> None:
            cap = text_out.strip() or None
            if not has_media:
                if cap:
                    await self.client.send_message(target, cap)
                return

            # Choose a temp-file suffix: photo -> .jpg; document -> suffix of
            # its file name, else its extension, else .bin.
            if isinstance(message.media, MessageMediaPhoto):
                suffix = ".jpg"
            else:
                suffix = ".bin"
                if isinstance(message.media, MessageMediaDocument) and message.file:
                    ext = getattr(message.file, "ext", None) or ""
                    nm = getattr(message.file, "name", None) or ""
                    name_suffix = Path(nm).suffix if nm else ""
                    if name_suffix:
                        suffix = name_suffix
                    elif ext:
                        suffix = "." + ext.lstrip(".")
            tmp_path = str(
                Path(tempfile.gettempdir()) / f"tgfwd_{uuid.uuid4().hex}{suffix}"
            )
            saved: Optional[str] = None
            try:
                saved = await self.client.download_media(message, file=tmp_path)
                if saved and Path(saved).exists():
                    await self.client.send_file(target, saved, caption=cap)
                    return
            finally:
                # Remove the requested path as well as the reported one so a
                # download that raised part-way leaves no temp file behind.
                for leftover in {tmp_path, saved}:
                    if not leftover:
                        continue
                    try:
                        Path(leftover).unlink(missing_ok=True)
                    except OSError:
                        pass
            # Download produced nothing: fall back to text only.
            if cap:
                await self.client.send_message(target, cap)
            else:
                print(
                    f"\n⚠️ 消息 {message.id} 有媒体但下载失败,且无可用文字说明,已跳过推送。"
                )

        try:
            await _send_copy()
            if self._forward_delay_seconds > 0:
                await asyncio.sleep(self._forward_delay_seconds)
        except FloodWaitError as e:
            await asyncio.sleep(e.seconds)
            try:
                await _send_copy()
            except Exception as err:
                print(f"\n推送消息 {message.id} 时仍失败(FloodWait 后):{err}")
        except Exception as e:
            print(f"\n推送消息 {message.id} 失败(可能无发言权限或媒体无法下载):{e}")

    # ------------------------------------------------------------------
    # Proxy / state
    # ------------------------------------------------------------------

    def build_proxy_config(self):
        """Build the proxy tuple for ``TelegramClient`` from the environment.

        Policy:
        - With TELEGRAM_WEB_UI set, connections are direct unless
          PROXY_ENABLED is 1/true/yes/on (so a stray PROXY_HOST on a server
          does not enable the proxy by accident).
        - On the command line, PROXY_ENABLED=0/false/no/off/none disables the
          proxy; when PROXY_ENABLED is unset, PROXY_HOST + PROXY_PORT alone
          still enable it (compatibility with older configs).

        Returns:
            ``None`` for a direct connection, ``(type, host, port)`` or
            ``(type, host, port, True, username, password)`` otherwise.
        """
        truthy = ("1", "true", "yes", "on")
        pe = (os.getenv("PROXY_ENABLED") or "").strip().lower()
        web_ui = (os.getenv("TELEGRAM_WEB_UI") or "").strip().lower() in truthy
        if web_ui:
            if pe not in truthy:
                return None
        else:
            if pe in ("0", "false", "no", "off", "none"):
                return None
        proxy_host = (os.getenv("PROXY_HOST") or "").strip()
        proxy_port_raw = (os.getenv("PROXY_PORT") or "").strip()
        if not proxy_host or not proxy_port_raw:
            return None
        if not web_ui and pe not in ("",) + truthy:
            print(f"PROXY_ENABLED 取值无效({pe!r}),已跳过代理。")
            return None
        try:
            proxy_port = int(proxy_port_raw)
        except ValueError:
            print("环境变量中的 PROXY_PORT 无效,已跳过代理设置。")
            return None
        proxy_type = os.getenv("PROXY_TYPE", "socks5").lower()
        proxy_username = os.getenv("PROXY_USERNAME")
        proxy_password = os.getenv("PROXY_PASSWORD")
        if proxy_username or proxy_password:
            return (proxy_type, proxy_host, proxy_port, True, proxy_username, proxy_password)
        return (proxy_type, proxy_host, proxy_port)

    def load_state(self) -> Dict[str, Any]:
        """Load persisted state from ``STATE_FILE``.

        Keys missing from the file are filled with defaults so that state
        written by an older version does not cause ``KeyError`` later. An
        unreadable or malformed file is reported and yields the defaults.
        """
        state: Dict[str, Any] = {
            'api_id': None,
            'api_hash': None,
            'channels': {},
            'channel_names': {},
            'channel_titles': {},
            'scrape_media': True,
        }
        if os.path.exists(self.STATE_FILE):
            try:
                with open(self.STATE_FILE, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers JSONDecodeError and decoding errors.
                print(f"读取状态文件失败,已使用默认状态:{e}")
            else:
                if isinstance(loaded, dict):
                    state.update(loaded)
        return state

    def save_state(self):
        """Persist ``self.state`` to ``STATE_FILE``; failures are printed, not raised."""
        tmp_file = f"{self.STATE_FILE}.tmp"
        try:
            # Write to a sibling file and swap it in, so an interrupted write
            # cannot leave a truncated state file behind.
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(self.state, f, indent=2)
            os.replace(tmp_file, self.STATE_FILE)
        except Exception as e:
            print(f"保存状态失败:{e}")

    # ------------------------------------------------------------------
    # Database
    # ------------------------------------------------------------------

    def get_db_connection(self, channel: str) -> sqlite3.Connection:
        """Return the cached SQLite connection for *channel*, creating it on first use.

        The database lives at ``<channel>/<channel>.db``. The ``messages``
        table and its indexes are created if absent and missing columns are
        added by ``migrate_database``.
        """
        if channel not in self.db_connections:
            channel_dir = Path(channel)
            channel_dir.mkdir(exist_ok=True)
            db_file = channel_dir / f'{channel}.db'
            conn = sqlite3.connect(str(db_file), check_same_thread=False)
            conn.execute('''CREATE TABLE IF NOT EXISTS messages
                            (id INTEGER PRIMARY KEY, message_id INTEGER UNIQUE, date TEXT,
                             sender_id INTEGER, first_name TEXT, last_name TEXT, username TEXT,
                             message TEXT, media_type TEXT, media_path TEXT, reply_to INTEGER,
                             post_author TEXT, views INTEGER, forwards INTEGER, reactions TEXT)''')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_date ON messages(date)')
            conn.execute('PRAGMA journal_mode=WAL')
            conn.execute('PRAGMA synchronous=NORMAL')
            conn.commit()
            self.migrate_database(conn)
            self.db_connections[channel] = conn
        return self.db_connections[channel]

    def migrate_database(self, conn: sqlite3.Connection):
        """Add the ``post_author``/``views``/``forwards``/``reactions`` columns when missing."""
        cursor = conn.cursor()
        cursor.execute("PRAGMA table_info(messages)")
        columns = {row[1] for row in cursor.fetchall()}
        wanted = (
            ('post_author', 'TEXT'),
            ('views', 'INTEGER'),
            ('forwards', 'INTEGER'),
            ('reactions', 'TEXT'),
        )
        migrations = [
            f'ALTER TABLE messages ADD COLUMN {name} {sql_type}'
            for name, sql_type in wanted
            if name not in columns
        ]
        for migration in migrations:
            try:
                conn.execute(migration)
            except sqlite3.OperationalError as e:
                # Best effort: report and continue with the other columns.
                print(f"数据库迁移失败({migration}):{e}")
        if migrations:
            conn.commit()

    def close_db_connections(self):
        """Close and forget every cached database connection."""
        for conn in self.db_connections.values():
            conn.close()
        self.db_connections.clear()

    def batch_insert_messages(self, channel: str, messages: List[MessageData]):
        """Insert *messages* into the channel database, ignoring already-stored message ids."""
        if not messages:
            return
        conn = self.get_db_connection(channel)
        data = [
            (msg.message_id, msg.date, msg.sender_id, msg.first_name, msg.last_name,
             msg.username, msg.message, msg.media_type, msg.media_path, msg.reply_to,
             msg.post_author, msg.views, msg.forwards, msg.reactions)
            for msg in messages
        ]
        conn.executemany('''INSERT OR IGNORE INTO messages
                            (message_id, date, sender_id, first_name, last_name, username,
                             message, media_type, media_path, reply_to, post_author, views,
                             forwards, reactions)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', data)
        conn.commit()

    # ------------------------------------------------------------------
    # Media
    # ------------------------------------------------------------------

    async def download_media(self, channel: str, message) -> Optional[str]:
        """Download the photo/document of *message* into ``<channel>/media``.

        Returns the file path, or ``None`` when media scraping is disabled,
        the message has no downloadable media, or every attempt failed. A
        file already present for this message id is reused. Up to three
        attempts are made (waiting out flood-waits, otherwise backing off
        exponentially).
        """
        if not message.media or not self.state.get('scrape_media', True):
            return None
        if isinstance(message.media, MessageMediaWebPage):
            return None
        try:
            media_folder = Path(channel) / 'media'
            media_folder.mkdir(parents=True, exist_ok=True)
            if isinstance(message.media, MessageMediaPhoto):
                original_name = getattr(message.file, 'name', None) or "photo.jpg"
                ext = "jpg"
            elif isinstance(message.media, MessageMediaDocument):
                # The file extension may carry a leading dot or be None;
                # normalise it so the fallback name is "document.<ext>".
                raw_ext = getattr(message.file, 'ext', None) if message.file else None
                ext = (raw_ext or 'bin').lstrip('.') or 'bin'
                original_name = getattr(message.file, 'name', None) or f"document.{ext}"
            else:
                return None
            base_name = Path(original_name).stem
            extension = Path(original_name).suffix or f".{ext}"
            media_path = media_folder / f"{message.id}-{base_name}{extension}"

            existing_files = list(media_folder.glob(f"{message.id}-*"))
            if existing_files:
                return str(existing_files[0])

            last_attempt = 2
            for attempt in range(last_attempt + 1):
                try:
                    downloaded_path = await message.download_media(file=str(media_path))
                    if downloaded_path and Path(downloaded_path).exists():
                        return downloaded_path
                    return None
                except FloodWaitError as e:
                    if attempt == last_attempt:
                        return None
                    await asyncio.sleep(e.seconds)
                except Exception:
                    if attempt == last_attempt:
                        return None
                    await asyncio.sleep(2 ** attempt)
            return None
        except Exception:
            return None

    async def update_media_path(self, channel: str, message_id: int, media_path: str):
        """Store *media_path* on the row for *message_id*."""
        conn = self.get_db_connection(channel)
        conn.execute('UPDATE messages SET media_path = ? WHERE message_id = ?', (media_path, message_id))
        conn.commit()

    def _write_progress(self, label: str, done: int, total: int) -> None:
        """Redraw a 30-cell progress bar for *label* on the current terminal line."""
        if total <= 0:
            return
        progress = (done / total) * 100
        bar_length = 30
        filled_length = int(bar_length * done // total)
        bar = '█' * filled_length + '░' * (bar_length - filled_length)
        sys.stdout.write(f"\r{label}: [{bar}] {progress:.1f}% ({done}/{total})")
        sys.stdout.flush()

    async def _download_batch(
        self,
        channel: str,
        messages: List[Any],
        semaphore: asyncio.Semaphore,
        label: str,
        completed: int,
        total: int,
    ) -> Tuple[int, int]:
        """Download media for *messages* concurrently and record the paths.

        Returns ``(completed, succeeded)``: the updated running total and the
        number of files stored by this batch.
        """
        async def fetch(msg):
            async with semaphore:
                return await self.download_media(channel, msg)

        tasks = [asyncio.create_task(fetch(msg)) for msg in messages]
        succeeded = 0
        for msg, task in zip(messages, tasks):
            try:
                media_path = await task
                if media_path:
                    await self.update_media_path(channel, msg.id, media_path)
                    succeeded += 1
            except Exception as e:
                print(f"\n⚠️ 媒体 {msg.id} 处理失败:{e}")
            completed += 1
            self._write_progress(label, completed, total)
        return completed, succeeded

    async def _redownload_media(self, channel: str, message_ids: List[int], label: str) -> int:
        """Re-fetch *message_ids* from *channel* and download their media.

        Progress is measured against ``len(message_ids)``; messages that no
        longer exist or no longer carry media are not counted. Returns the
        number of files stored.
        """
        if channel.lstrip('-').isdigit():
            entity = await self.client.get_entity(PeerChannel(int(channel)))
        else:
            entity = await self.client.get_entity(channel)
        semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
        total = len(message_ids)
        completed = 0
        succeeded = 0
        batch_size = 10
        for start in range(0, total, batch_size):
            batch_ids = message_ids[start:start + batch_size]
            messages = await self.client.get_messages(entity, ids=batch_ids)
            valid_messages = [
                msg for msg in messages
                if msg and msg.media and not isinstance(msg.media, MessageMediaWebPage)
            ]
            completed, ok = await self._download_batch(
                channel, valid_messages, semaphore, label, completed, total
            )
            succeeded += ok
        return succeeded

    # ------------------------------------------------------------------
    # Scraping
    # ------------------------------------------------------------------

    async def scrape_channel(self, channel: str, offset_id: int):
        """Scrape messages newer than *offset_id* from *channel*.

        Messages are stored in the channel database, optionally re-sent to
        the configured forward targets, and their media is downloaded after
        the message pass. The last processed id is written to
        ``state['channels']``. Errors are printed, never raised.
        """
        try:
            entity = await self.client.get_entity(
                PeerChannel(int(channel)) if channel.startswith('-') else channel
            )
            result = await self.client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0)
            total_messages = result.total
            if total_messages == 0:
                print(f"频道 {channel} 中未找到消息")
                return
            print(f"在频道 {channel} 中找到 {total_messages} 条消息")

            # --- resolve forward targets -------------------------------
            forward_specs = self._forward_targets_from_env()
            forward_pairs: List[Tuple[Any, str]] = []
            forward_entities: List[Any] = []
            mention_replacement = ""
            replace_mentions = True
            if forward_specs:
                for spec in forward_specs:
                    try:
                        ent = await self._resolve_forward_entity(spec)
                        forward_pairs.append((ent, spec))
                    except Exception as e:
                        print(f"⚠️ 转发目标解析失败:{spec}:{e}")
                forward_entities = [p[0] for p in forward_pairs]
                if not forward_entities:
                    print("⚠️ 没有可用的转发目标(已跳过转发)")
                else:
                    dest_titles = [getattr(ent, "title", None) or sp for ent, sp in forward_pairs]
                    joined = "、".join(dest_titles)
                    only_new = self._forward_only_new_messages()
                    if only_new and offset_id <= 0:
                        print(
                            f"📤 已配置 {len(forward_entities)} 个转发目标({joined}),但仅新消息模式开启且当前 offset_id={offset_id},"
                            "本次为历史抓取,不向群组推送;待 state 中已有进度后再次抓取时才会转发新消息。"
                        )
                        forward_entities = []
                    else:
                        me = await self.client.get_me()
                        mention_replacement = await self._resolve_forward_mention_replacement(me)
                        replace_mentions = self._forward_replace_mentions_enabled()
                        hints = []
                        if only_new:
                            hints.append("仅新消息")
                        hints.append("以当前账号重发(非转发卡片)")
                        if replace_mentions:
                            hints.append(f"正文 @用户名 → {mention_replacement}")
                        else:
                            hints.append("保留正文 @(FORWARD_RAW_MENTIONS=1)")
                        hint_str = "(" + ",".join(hints) + ")" if hints else ""
                        print(f"📤 推送已开启{hint_str}:共 {len(forward_entities)} 个目标 → {joined}")

            # --- message pass ------------------------------------------
            message_batch = []
            media_tasks = []
            processed_messages = 0
            last_message_id = offset_id
            scrape_media = self.state.get('scrape_media', True)

            async for message in self.client.iter_messages(entity, offset_id=offset_id, reverse=True):
                try:
                    sender = await message.get_sender()
                    is_user = isinstance(sender, User)

                    reactions_str = None
                    if message.reactions and message.reactions.results:
                        reactions_parts = []
                        for reaction in message.reactions.results:
                            emoji = getattr(reaction.reaction, 'emoticon', '')
                            if emoji:
                                reactions_parts.append(f"{emoji} {reaction.count}")
                        if reactions_parts:
                            reactions_str = ' '.join(reactions_parts)

                    msg_data = MessageData(
                        message_id=message.id,
                        date=message.date.strftime('%Y-%m-%d %H:%M:%S'),
                        sender_id=message.sender_id,
                        first_name=getattr(sender, 'first_name', None) if is_user else None,
                        last_name=getattr(sender, 'last_name', None) if is_user else None,
                        username=getattr(sender, 'username', None) if is_user else None,
                        message=message.message or '',
                        media_type=message.media.__class__.__name__ if message.media else None,
                        media_path=None,
                        reply_to=message.reply_to_msg_id if message.reply_to else None,
                        post_author=message.post_author,
                        views=message.views,
                        forwards=message.forwards,
                        reactions=reactions_str,
                    )
                    message_batch.append(msg_data)

                    for fwd_ent in forward_entities:
                        await self._forward_scraped_message(
                            fwd_ent,
                            entity,
                            message,
                            replacement=mention_replacement,
                            replace_mentions=replace_mentions,
                        )

                    if scrape_media and message.media and not isinstance(message.media, MessageMediaWebPage):
                        media_tasks.append(message)

                    last_message_id = message.id
                    processed_messages += 1

                    if len(message_batch) >= self.batch_size:
                        self.batch_insert_messages(channel, message_batch)
                        message_batch.clear()

                    if processed_messages % self.state_save_interval == 0:
                        # Flush pending rows first: the persisted offset must
                        # never run ahead of what is stored in the database,
                        # or a crash would skip the unsaved messages forever.
                        if message_batch:
                            self.batch_insert_messages(channel, message_batch)
                            message_batch.clear()
                        self.state['channels'][channel] = last_message_id
                        self.save_state()

                    self._write_progress("📄 Messages", processed_messages, total_messages)
                except Exception as e:
                    print(f"\n处理消息 {message.id} 时出错:{e}")

            if message_batch:
                self.batch_insert_messages(channel, message_batch)

            # --- media pass --------------------------------------------
            if media_tasks:
                total_media = len(media_tasks)
                completed_media = 0
                successful_downloads = 0
                print(f"\n📥 Downloading {total_media} media files...")
                semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
                batch_size = 10
                for start in range(0, total_media, batch_size):
                    completed_media, ok = await self._download_batch(
                        channel,
                        media_tasks[start:start + batch_size],
                        semaphore,
                        "📥 Media",
                        completed_media,
                        total_media,
                    )
                    successful_downloads += ok
                print(f"\n✅ 媒体下载完成!(成功 {successful_downloads}/{total_media})")

            self.state['channels'][channel] = last_message_id
            self.save_state()
            print(f"\nCompleted scraping channel {channel}")
        except Exception as e:
            print(f"处理频道 {channel} 时出错:{e}")

    async def rescrape_media(self, channel: str):
        """Download media for stored messages that have a media type but no file path."""
        conn = self.get_db_connection(channel)
        cursor = conn.cursor()
        # SQL string literals use single quotes; double quotes denote
        # identifiers and only work as strings through a compatibility quirk.
        cursor.execute(
            "SELECT message_id FROM messages WHERE media_type IS NOT NULL "
            "AND media_type != 'MessageMediaWebPage' AND media_path IS NULL"
        )
        message_ids = [row[0] for row in cursor.fetchall()]
        channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown')
        if not message_ids:
            print(f"{channel_name}(ID: {channel})没有可补抓的媒体文件")
            return
        print(f"📥 Reprocessing {len(message_ids)} media files for {channel_name} (ID: {channel})")
        try:
            successful_downloads = await self._redownload_media(channel, message_ids, "🔄 Rescrape")
            print(f"\n✅ 媒体补抓完成!(成功 {successful_downloads}/{len(message_ids)})")
        except Exception as e:
            print(f"媒体补抓出错:{e}")

    async def fix_missing_media(self, channel: str):
        """Report media coverage for *channel* and download whatever is missing.

        A row counts as missing when ``media_path`` is NULL or empty.
        """
        conn = self.get_db_connection(channel)
        cursor = conn.cursor()
        has_media = "media_type IS NOT NULL AND media_type != 'MessageMediaWebPage'"
        cursor.execute(f"SELECT COUNT(*) FROM messages WHERE {has_media}")
        total_with_media = cursor.fetchone()[0]
        cursor.execute(
            f"SELECT COUNT(*) FROM messages WHERE {has_media} "
            "AND media_path IS NOT NULL AND media_path != ''"
        )
        total_with_files = cursor.fetchone()[0]
        missing_count = total_with_media - total_with_files
        channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown')
        print(f"\n📊 {channel_name}(ID: {channel})媒体分析:")
        print(f"含媒体的消息数:{total_with_media}")
        print(f"已下载媒体文件数:{total_with_files}")
        print(f"缺失媒体文件数:{missing_count}")
        if missing_count == 0:
            print("✅ 所有媒体文件都已下载!")
            return
        cursor.execute(
            f"SELECT message_id, media_type FROM messages WHERE {has_media} "
            "AND (media_path IS NULL OR media_path = '')"
        )
        missing_media = cursor.fetchall()
        if not missing_media:
            print("✅ 未发现缺失媒体!")
            return
        print(f"\n🔧 Attempting to download {len(missing_media)} missing media files...")
        try:
            message_ids = [row[0] for row in missing_media]
            successful_downloads = await self._redownload_media(channel, message_ids, "🔧 Fix Media")
            print(f"\n✅ 缺失媒体修复完成!(成功 {successful_downloads}/{len(missing_media)})")
        except Exception as e:
            print(f"修复缺失媒体时出错:{e}")

    async def continuous_scraping(self):
        """Scrape every saved channel in a loop, one pass per minute at most.

        Starts the heartbeat task when enabled and a target is available.
        The loop ends when ``continuous_scraping_active`` is cleared or the
        task is cancelled (``CancelledError`` is re-raised); the heartbeat
        task is cancelled on exit.
        """
        self.continuous_scraping_active = True
        hb_task = None
        if self._heartbeat_enabled():
            if self._heartbeat_target_spec():
                hb_task = asyncio.create_task(self._heartbeat_loop())
                print(
                    f"💓 抓取心跳已开启:启动后先发一条,之后约每 "
                    f"{self._heartbeat_interval_seconds():.0f} 秒一条(未配置 HEARTBEAT_TO_CHAT 时用 FORWARD_TO_CHAT 的首个目标)"
                )
            else:
                print(
                    "⚠️ HEARTBEAT_ENABLED=1 但未配置 HEARTBEAT_TO_CHAT,且 FORWARD_TO_CHAT 为空,"
                    "已跳过心跳。"
                )
        try:
            while self.continuous_scraping_active:
                start_time = time.time()
                # Iterate a snapshot: the dict may gain or lose channels
                # while this coroutine is suspended inside scrape_channel.
                for channel in list(self.state['channels']):
                    if not self.continuous_scraping_active:
                        break
                    if channel not in self.state['channels']:
                        continue
                    print(f"\nChecking for new messages in channel: {channel}")
                    await self.scrape_channel(channel, self.state['channels'][channel])
                elapsed = time.time() - start_time
                sleep_time = max(0, 60 - elapsed)
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
        except asyncio.CancelledError:
            print("持续抓取已停止")
            raise
        finally:
            self.continuous_scraping_active = False
            if hb_task:
                hb_task.cancel()
                try:
                    await hb_task
                except asyncio.CancelledError:
                    pass

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    def get_export_filename(self, channel: str):
        """Return the export base name ``<channel>_<username>``."""
        username = self.state.get('channel_names', {}).get(channel, 'no_username')
        return f"{channel}_{username}"

    def export_to_csv(self, channel: str):
        """Write all stored messages of *channel*, ordered by date, to a CSV file in its folder."""
        conn = self.get_db_connection(channel)
        filename = self.get_export_filename(channel)
        csv_file = Path(channel) / f'{filename}.csv'
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM messages ORDER BY date')
        columns = [description[0] for description in cursor.description]
        with open(csv_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(columns)
            # Stream in chunks so large tables are never fully in memory.
            while True:
                rows = cursor.fetchmany(1000)
                if not rows:
                    break
                writer.writerows(rows)

    def export_to_json(self, channel: str):
        """Write all stored messages of *channel*, ordered by date, to a JSON array file."""
        conn = self.get_db_connection(channel)
        filename = self.get_export_filename(channel)
        json_file = Path(channel) / f'{filename}.json'
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM messages ORDER BY date')
        columns = [description[0] for description in cursor.description]
        with open(json_file, 'w', encoding='utf-8') as f:
            f.write('[\n')
            first_row = True
            while True:
                rows = cursor.fetchmany(1000)
                if not rows:
                    break
                for row in rows:
                    if first_row:
                        first_row = False
                    else:
                        f.write(',\n')
                    json.dump(dict(zip(columns, row)), f, ensure_ascii=False, indent=2)
            f.write('\n]')

    async def export_data(self):
        """Export every saved channel to CSV and JSON, reporting per-channel failures."""
        if not self.state['channels']:
            print("没有可导出的频道")
            return
        for channel in self.state['channels']:
            print(f"正在导出频道 {channel} 的数据...")
            try:
                self.export_to_csv(channel)
                self.export_to_json(channel)
                print(f"✅ 频道 {channel} 导出完成")
            except Exception as e:
                print(f"❌ 频道 {channel} 导出失败:{e}")

    async def view_channels(self):
        """Print the saved channels with their last message id and stored message count."""
        if not self.state['channels']:
            print("还没有保存任何频道")
            return
        print("\n当前频道列表:")
        for i, (channel, last_id) in enumerate(self.state['channels'].items(), 1):
            channel_name = self.state.get('channel_names', {}).get(channel, 'Unknown')
            try:
                conn = self.get_db_connection(channel)
                cursor = conn.cursor()
                cursor.execute('SELECT COUNT(*) FROM messages')
                count = cursor.fetchone()[0]
                print(f"[{i}] {channel_name} (ID: {channel}), Last Message ID: {last_id}, Messages: {count}")
            except Exception:
                # Database unavailable: list the channel without a count.
                print(f"[{i}] {channel_name} (ID: {channel}), Last Message ID: {last_id}")

    # ------------------------------------------------------------------
    # Account dialogs
    # ------------------------------------------------------------------

    def _account_list_exclude_substrings(self) -> List[str]:
        """Return title substrings that hide a dialog from the account listing.

        Read from ACCOUNT_LIST_EXCLUDE_SUBSTRINGS (comma/semicolon separated);
        ``*``, ``none`` or ``off`` disable hiding; a built-in default applies
        when the variable is unset.
        """
        raw = (os.getenv("ACCOUNT_LIST_EXCLUDE_SUBSTRINGS") or "").strip()
        if raw in ("*", "none", "off"):
            return []
        if raw:
            return [p.strip() for p in re.split(r"[,,;;]+", raw) if p.strip()]
        return ["远程-到岗-技术招聘"]

    def _skip_dialog_in_account_channel_list(self, title: Optional[str]) -> bool:
        """Return True when *title* contains any configured exclude substring."""
        if not title:
            return False
        t = str(title).strip()
        return any(sub and sub in t for sub in self._account_list_exclude_substrings())

    async def list_channels(self):
        """List the account's channels/groups, save them to ``channels_list.csv`` and return them.

        Returns a list of dicts (``number``, ``channel_name``, ``channel_id``,
        ``username``, ``type``); an empty list on error.
        """
        try:
            print("\n当前账号加入的频道/群组:")
            channels_data = []
            async for dialog in self.client.iter_dialogs():
                entity = dialog.entity
                if dialog.id == 777000 or not isinstance(entity, (Channel, Chat)):
                    continue
                title = getattr(dialog, "title", None) or getattr(entity, "title", None)
                if self._skip_dialog_in_account_channel_list(title):
                    continue
                channel_type = "频道" if isinstance(entity, Channel) and entity.broadcast else "群组"
                username = getattr(entity, 'username', None) or 'no_username'
                n = len(channels_data) + 1
                print(f"[{n}] {dialog.title}(ID: {dialog.id},类型:{channel_type},用户名:@{username})")
                channels_data.append({
                    'number': n,
                    'channel_name': dialog.title,
                    'channel_id': str(dialog.id),
                    'username': username,
                    'type': channel_type,
                })
            if channels_data:
                csv_file = Path('channels_list.csv')
                with open(csv_file, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.DictWriter(
                        f, fieldnames=['number', 'channel_name', 'channel_id', 'username', 'type']
                    )
                    writer.writeheader()
                    writer.writerows(channels_data)
                print(f"\n✅ 频道列表已保存到 {csv_file}")
            return channels_data
        except Exception as e:
            print(f"列出频道时出错:{e}")
            return []

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def display_qr_code_ascii(self, qr_login):
        """Print the login URL of *qr_login* as an ASCII QR code."""
        qr = qrcode.QRCode(box_size=1, border=1)
        qr.add_data(qr_login.url)
        qr.make()
        f = StringIO()
        qr.print_ascii(out=f)
        f.seek(0)
        print(f.read())

    def _stdin_interactive(self) -> bool:
        """Return True when prompts can be answered: stdin is a TTY and TELEGRAM_WEB_UI is not set."""
        try:
            if (os.getenv("TELEGRAM_WEB_UI") or "").strip().lower() in ("1", "true", "yes", "on"):
                return False
            return sys.stdin is not None and sys.stdin.isatty()
        except Exception:
            return False

    async def qr_code_auth(self, headless_hint: bool = False):
        """Log in by QR code; returns True on success.

        When two-step verification is required, the password is taken from
        TELEGRAM_2FA_PASSWORD, or prompted for when stdin is interactive.
        """
        print("\n正在使用二维码登录...")
        print("请用 Telegram 手机端扫描二维码:")
        print("1. 在手机上打开 Telegram")
        print("2. 进入 设置 > 设备 > 扫描二维码")
        print("3. 扫描下方二维码(或打开下方链接)\n")
        qr_login = await self.client.qr_login()
        print(f"二维码链接(可复制到手机浏览器):\n{qr_login.url}\n")
        self.display_qr_code_ascii(qr_login)
        if headless_hint:
            print("(Web/Docker 环境请查看本页「运行日志」或容器日志中的上述链接。)")
        try:
            await qr_login.wait()
            print("\n✅ 二维码登录成功!")
            return True
        except SessionPasswordNeededError:
            pwd = (os.getenv("TELEGRAM_2FA_PASSWORD") or "").strip()
            if pwd:
                await self.client.sign_in(password=pwd)
                print("\n✅ 两步验证登录成功!")
                return True
            if self._stdin_interactive():
                password = input("已开启两步验证,请输入密码:")
                await self.client.sign_in(password=password)
                print("\n✅ 两步验证登录成功!")
                return True
            print("已开启两步验证:请在 .env 中设置 TELEGRAM_2FA_PASSWORD 后重试(勿提交到 Git)。")
            return False
        except Exception as e:
            print(f"\n❌ 二维码登录失败:{e}")
            return False

    async def phone_auth(self):
        """Log in with phone number and code (plus 2FA password if required); returns True on success."""
        phone = input("请输入手机号:")
        await self.client.send_code_request(phone)
        code = input("请输入收到的验证码:")
        try:
            await self.client.sign_in(phone, code)
            print("\n✅ 手机号登录成功!")
            return True
        except SessionPasswordNeededError:
            password = input("已开启两步验证,请输入密码:")
            await self.client.sign_in(password=password)
            print("\n✅ 两步验证登录成功!")
            return True
        except Exception as e:
            print(f"\n❌ 手机号登录失败:{e}")
            return False

    async def initialize_client(self):
        """Create, connect and authorise the Telegram client; returns True when ready.

        API credentials come from state, then the environment, then (only in
        an interactive terminal) a prompt. In non-interactive environments an
        unauthorised session can only log in by QR code, and only when
        TELEGRAM_HEADLESS_QR is truthy.
        """
        if not self.state.get('api_id') and os.getenv("API_ID"):
            try:
                self.state['api_id'] = int(os.getenv("API_ID"))
            except ValueError:
                print("环境变量中的 API_ID 无效,将改为手动输入。")
        if not self.state.get('api_hash') and os.getenv("API_HASH"):
            self.state['api_hash'] = os.getenv("API_HASH")

        if not all([self.state.get('api_id'), self.state.get('api_hash')]):
            print("\n=== 需要配置 API ===")
            print("请提供来自 https://my.telegram.org 的 API 凭据")
            if not self._stdin_interactive():
                print("当前为非交互环境:请在 .env 或网页「环境配置」中填写 API_ID / API_HASH。")
                return False
            try:
                self.state['api_id'] = int(input("请输入 API ID:"))
                self.state['api_hash'] = input("请输入 API Hash:")
                self.save_state()
            except ValueError:
                print("API ID 无效,必须是数字。")
                return False
            except EOFError:
                print("无法读取输入(EOF)。请在 .env 中配置 API_ID / API_HASH。")
                return False

        proxy = self.build_proxy_config()
        if proxy:
            print(f"正在使用代理:{proxy[0]}://{proxy[1]}:{proxy[2]}")
        self.client = TelegramClient('session', self.state['api_id'], self.state['api_hash'], proxy=proxy)
        try:
            await self.client.connect()
        except Exception as e:
            print(f"连接失败:{e}")
            return False

        if await self.client.is_user_authorized():
            print("✅ 已登录!")
            return True

        if not self._stdin_interactive():
            print(
                "\n=== Web / Docker 等非交互环境 ===\n"
                "无法使用「手机号登录」或控制台选项。\n"
                "任选其一:\n"
                " A) 将本机已登录生成的 session.session 复制到程序目录(与 app_web.py 同级),再点「初始化/连接」。\n"
                " B) 在 .env 设置 TELEGRAM_HEADLESS_QR=1,重启后点「初始化/连接」,在「运行日志」里打开「二维码链接」用手机 Telegram 扫码。\n"
                " 若开启了两步验证,可设 TELEGRAM_2FA_PASSWORD(勿提交到 Git)。\n"
            )
            headless_qr = (os.getenv("TELEGRAM_HEADLESS_QR") or "").strip().lower() in (
                "1",
                "true",
                "yes",
                "on",
            )
            if not headless_qr:
                await self.client.disconnect()
                return False
            success = await self.qr_code_auth(headless_hint=True)
            if not success:
                await self.client.disconnect()
                return False
            return True

        print("\n=== 请选择登录方式 ===")
        print("[1] 二维码登录(推荐,无需手机号)")
        print("[2] 手机号登录(传统方式)")
        while True:
            try:
                choice = input("请输入选项(1 或 2):").strip()
            except EOFError:
                print("输入结束,已取消登录。")
                await self.client.disconnect()
                return False
            if choice in ["1", "2"]:
                break
            print("请输入 1 或 2")
        success = await self.qr_code_auth() if choice == "1" else await self.phone_auth()
        if not success:
            print("登录失败,请重试。")
            await self.client.disconnect()
            return False
        return True

    def parse_channel_selection(self, choice):
        """Parse a comma-separated selection of list numbers and/or channel ids.

        ``all`` selects every saved channel. Entries starting with ``-`` are
        treated as channel ids and must exist in state; other entries are
        1-based positions in the saved channel list. Invalid entries are
        reported and skipped.
        """
        channels_list = list(self.state['channels'].keys())
        selected_channels = []
        if choice.lower() == 'all':
            return channels_list
        for selection in [x.strip() for x in choice.split(',')]:
            try:
                if selection.startswith('-'):
                    if selection in self.state['channels']:
                        selected_channels.append(selection)
                    else:
                        print(f"你的频道列表中不存在频道 ID {selection}")
                else:
                    num = int(selection)
                    if 1 <= num <= len(channels_list):
                        selected_channels.append(channels_list[num - 1])
                    else:
                        print(f"频道编号无效:{num}。有效范围:1-{len(channels_list)}")
            except ValueError:
                print(f"输入无效:{selection}。请用编号(1,2,3)或完整频道ID(-100123...)")
        # NOTE(review): the visible source ends in the middle of this return
        # statement; the returned expression (presumably selected_channels)
        # lies outside the reviewed range and must be restored from the file.
        return
selected_channels async def scrape_specific_channels(self): if not self.state['channels']: print("当前没有可用频道,请先用 [L] 添加频道") return await self.view_channels() print("\n📥 抓取选项:") print("• 单个:1 或 -1001234567890") print("• 多个:1,3,5 或混合输入") print("• 全部频道:all") choice = input("\n请输入选择:").strip() selected_channels = self.parse_channel_selection(choice) if selected_channels: print(f"\n🚀 开始抓取 {len(selected_channels)} 个频道...") for i, channel in enumerate(selected_channels, 1): print(f"\n[{i}/{len(selected_channels)}] 正在抓取:{channel}") await self.scrape_channel(channel, self.state['channels'][channel]) print(f"\n✅ 已完成 {len(selected_channels)} 个频道的抓取!") else: print("❌ 未选择有效频道") async def manage_channels(self): while True: print("\n" + "="*40) print(" Telegram 抓取工具") print("="*40) print("[S] 抓取频道") print("[C] 持续抓取") print(f"[M] 媒体抓取:{'开启' if self.state['scrape_media'] else '关闭'}") print("[L] 列出并添加频道") print("[R] 移除频道") print("[E] 导出数据") print("[T] 补抓媒体") print("[F] 修复缺失媒体") print("[Q] 退出") print("="*40) choice = input("请输入选项:").lower().strip() try: if choice == 'r': if not self.state['channels']: print("没有可移除的频道") continue await self.view_channels() print("\n移除频道说明:") print("• 单个:1 或 -1001234567890") print("• 多个:1,2,3 或混合输入") selection = input("请输入选择:").strip() selected_channels = self.parse_channel_selection(selection) if selected_channels: removed_count = 0 for channel in selected_channels: if channel in self.state['channels']: del self.state['channels'][channel] if 'channel_names' in self.state: self.state['channel_names'].pop(channel, None) if 'channel_titles' in self.state: self.state['channel_titles'].pop(channel, None) print(f"✅ 已移除频道 {channel}") removed_count += 1 else: print(f"❌ 未找到频道 {channel}") if removed_count > 0: self.save_state() print(f"\n🎉 已移除 {removed_count} 个频道!") await self.view_channels() else: print("未移除任何频道") else: print("未选择有效频道") elif choice == 's': await self.scrape_specific_channels() elif choice == 'm': self.state['scrape_media'] = not 
self.state['scrape_media'] self.save_state() print(f"\n✅ 媒体抓取已{'开启' if self.state['scrape_media'] else '关闭'}") elif choice == 'c': task = asyncio.create_task(self.continuous_scraping()) print("持续抓取已启动,按 Ctrl+C 停止。") try: await asyncio.sleep(float('inf')) except KeyboardInterrupt: self.continuous_scraping_active = False task.cancel() print("\n正在停止持续抓取...") try: await task except asyncio.CancelledError: pass elif choice == 'e': await self.export_data() elif choice == 'l': channels_data = await self.list_channels() if not channels_data: continue print("\n从上方列表添加频道:") print("• 单个:1 或 -1001234567890") print("• 多个:1,3,5 或混合输入") print("• 全部频道:all") print("• 直接回车可跳过添加") selection = input("\n请输入选择(或回车跳过):").strip() if selection: added_count = 0 if selection.lower() == 'all': for channel_info in channels_data: channel_id = channel_info['channel_id'] if channel_id not in self.state['channels']: self.state['channels'][channel_id] = 0 if 'channel_names' not in self.state: self.state['channel_names'] = {} if 'channel_titles' not in self.state: self.state['channel_titles'] = {} self.state['channel_names'][channel_id] = channel_info['username'] self.state['channel_titles'][channel_id] = channel_info['channel_name'] print(f"✅ 已添加频道 {channel_info['channel_name']}(ID: {channel_id})") added_count += 1 else: print(f"频道 {channel_info['channel_name']} 已添加过") else: for sel in [x.strip() for x in selection.split(',')]: try: if sel.startswith('-'): channel_id = sel channel_info = next((c for c in channels_data if c['channel_id'] == channel_id), None) if not channel_info: print(f"未找到频道 ID {channel_id}") continue else: num = int(sel) if 1 <= num <= len(channels_data): channel_info = channels_data[num - 1] channel_id = channel_info['channel_id'] else: print(f"编号无效:{num}。请选择 1-{len(channels_data)}") continue if channel_id in self.state['channels']: print(f"频道 {channel_info['channel_name']} 已添加过") else: self.state['channels'][channel_id] = 0 if 'channel_names' not in self.state: 
self.state['channel_names'] = {} if 'channel_titles' not in self.state: self.state['channel_titles'] = {} self.state['channel_names'][channel_id] = channel_info['username'] self.state['channel_titles'][channel_id] = channel_info['channel_name'] print(f"✅ 已添加频道 {channel_info['channel_name']}(ID: {channel_id})") added_count += 1 except ValueError: print(f"输入无效:{sel}") if added_count > 0: self.save_state() print(f"\n🎉 已新增 {added_count} 个频道!") await self.view_channels() else: print("没有新增频道") elif choice == 't': if not self.state['channels']: print("当前没有可用频道,请先添加频道") continue await self.view_channels() print("\n请输入频道编号(1,2,3...)或完整频道ID(-100123...)") selection = input("请输入选择:").strip() selected_channels = self.parse_channel_selection(selection) if len(selected_channels) == 1: channel = selected_channels[0] print(f"正在为频道补抓媒体:{channel}") await self.rescrape_media(channel) elif len(selected_channels) > 1: print("补抓媒体时一次只能选择一个频道") else: print("未选择有效频道") elif choice == 'f': if not self.state['channels']: print("当前没有可用频道,请先添加频道") continue await self.view_channels() print("\n请输入频道编号(1,2,3...)或完整频道ID(-100123...)") selection = input("请输入选择:").strip() selected_channels = self.parse_channel_selection(selection) if len(selected_channels) == 1: channel = selected_channels[0] await self.fix_missing_media(channel) elif len(selected_channels) > 1: print("修复缺失媒体时一次只能选择一个频道") else: print("未选择有效频道") elif choice == 'q': print("\n👋 再见!") self.close_db_connections() if self.client: await self.client.disconnect() sys.exit() else: print("无效选项") except Exception as e: print(f"错误:{e}") async def run(self): display_ascii_art() if await self.initialize_client(): try: await self.sync_monitor_chats_from_env() self._apply_scrape_media_from_env() if self._heartbeat_enabled(): print( "💡 心跳已配置开启:请选菜单 [C]「持续抓取」后才会往群里发;" "若只用 [S] 单次抓取,不会出现心跳。" ) dests = self._forward_targets_from_env() if len(dests) > 1: print( "ℹ️ FORWARD_TO_CHAT 含多个目标,每条消息会发到多个群。" "若只需「多群监控 → 一个汇总频道」,请只保留一个转发 ID,多个源请用 MONITOR_CHATS。" ) 
await self.manage_channels() finally: self.close_db_connections() if self.client: await self.client.disconnect() else: print("客户端初始化失败,程序退出。") async def main(): scraper = OptimizedTelegramScraper() await scraper.run() def configure_event_loop_policy(): if sys.platform.startswith("win"): try: asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) except Exception: pass if __name__ == '__main__': try: configure_event_loop_policy() asyncio.run(main()) except KeyboardInterrupt: print("\n程序已中断,正在退出...") sys.exit()