From 8d4e092b1b5704866afb76a658eadf67ba1ce894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9D=93=BE=F0=9D=93=B7=F0=9D=93=B7=F0=9D=93=B8?= =?UTF-8?q?=F0=9D=93=B1=F0=9D=94=80=F0=9D=93=B7?= Date: Thu, 11 Sep 2025 17:34:59 +0200 Subject: [PATCH] Update telegram-scraper.py v3.0 --- telegram-scraper.py | 705 ++++++++++++++++++++++++++++++++------------ 1 file changed, 522 insertions(+), 183 deletions(-) diff --git a/telegram-scraper.py b/telegram-scraper.py index e9f27cc..900910a 100644 --- a/telegram-scraper.py +++ b/telegram-scraper.py @@ -4,21 +4,23 @@ import json import csv import asyncio import time -from contextlib import asynccontextmanager -from concurrent.futures import ThreadPoolExecutor +import sys +import uuid +import warnings from dataclasses import dataclass from typing import Dict, List, Optional, Any -from telethon import TelegramClient -from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, User, PeerChannel -from telethon.errors import FloodWaitError, RPCError -import aiohttp -import sys from pathlib import Path +from io import StringIO +from telethon import TelegramClient +from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, MessageMediaWebPage, User, PeerChannel +from telethon.errors import FloodWaitError, SessionPasswordNeededError +import qrcode + +warnings.filterwarnings("ignore", message="Using async sessions support is an experimental feature") def display_ascii_art(): WHITE = "\033[97m" RESET = "\033[0m" - art = r""" ___________________ _________ \__ ___/ _____/ / _____/ @@ -27,7 +29,6 @@ ___________________ _________ |____| \______ /_______ / \/ \/ """ - print(WHITE + art + RESET) @dataclass @@ -49,30 +50,35 @@ class OptimizedTelegramScraper: self.state = self.load_state() self.client = None self.continuous_scraping_active = False - self.max_concurrent_downloads = 3 + self.max_concurrent_downloads = 5 self.batch_size = 100 self.state_save_interval = 50 self.db_connections = {} def load_state(self) -> Dict[str, Any]: if os.path.exists(self.STATE_FILE): - with open(self.STATE_FILE, 'r') as f: - return json.load(f) + try: + with open(self.STATE_FILE, 'r') as f: + return json.load(f) + except: + pass return { 'api_id': None, 'api_hash': None, - 'phone': None, 'channels': {}, 'scrape_media': True, } def save_state(self): - with open(self.STATE_FILE, 'w') as f: - json.dump(self.state, f) + try: + with open(self.STATE_FILE, 'w') as f: + json.dump(self.state, f, indent=2) + except Exception as e: + print(f"Failed to save state: {e}") def get_db_connection(self, channel: str) -> sqlite3.Connection: if channel not in self.db_connections: - channel_dir = Path(os.getcwd()) / channel + channel_dir = Path(channel) channel_dir.mkdir(exist_ok=True) db_file = channel_dir / f'{channel}.db' @@ -83,6 +89,8 @@ class OptimizedTelegramScraper: message TEXT, media_type TEXT, media_path TEXT, reply_to INTEGER)''') conn.execute('CREATE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)') conn.execute('CREATE INDEX IF NOT EXISTS idx_date ON messages(date)') + conn.execute('PRAGMA journal_mode=WAL') + conn.execute('PRAGMA synchronous=NORMAL') conn.commit() self.db_connections[channel] = conn @@ -108,58 +116,57 @@ class OptimizedTelegramScraper: VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', data) conn.commit() - async def download_media_with_semaphore(self, semaphore: asyncio.Semaphore, - channel: str, message) -> Optional[str]: - async with semaphore: - return await self.download_media(channel, message) - async def download_media(self, channel: str, message) -> Optional[str]: if not message.media or not self.state['scrape_media']: return None - channel_dir = Path(os.getcwd()) / channel - media_folder = channel_dir / 'media' - media_folder.mkdir(exist_ok=True) - - media_file_name = None - if isinstance(message.media, MessageMediaPhoto): - media_file_name = getattr(message.file, 'name', None) or f"{message.id}.jpg" - elif isinstance(message.media, MessageMediaDocument): - ext = getattr(message.file, 'ext', 'bin') if message.file else 'bin' - media_file_name = getattr(message.file, 'name', None) or f"{message.id}.{ext}" - - if not media_file_name: + if isinstance(message.media, MessageMediaWebPage): return None - - media_path = media_folder / media_file_name - - if media_path.exists(): - return str(media_path) - max_retries = 3 - for attempt in range(max_retries): - try: - downloaded_path = await message.download_media(file=str(media_folder)) - if downloaded_path: - return downloaded_path - break - except FloodWaitError as e: - if attempt < max_retries - 1: - print(f"Rate limited. Waiting {e.seconds} seconds...") - await asyncio.sleep(e.seconds) - else: - print(f"Failed to download media for message {message.id} after rate limit") - return None - except Exception as e: - if attempt < max_retries - 1: - wait_time = 2 ** attempt - print(f"Download failed for message {message.id}, retrying in {wait_time}s...") - await asyncio.sleep(wait_time) - else: - print(f"Failed to download media for message {message.id}: {e}") - return None - - return None + try: + channel_dir = Path(channel) + media_folder = channel_dir / 'media' + media_folder.mkdir(exist_ok=True) + + if isinstance(message.media, MessageMediaPhoto): + original_name = getattr(message.file, 'name', None) or "photo.jpg" + ext = "jpg" + elif isinstance(message.media, MessageMediaDocument): + ext = getattr(message.file, 'ext', 'bin') if message.file else 'bin' + original_name = getattr(message.file, 'name', None) or f"document.{ext}" + else: + return None + + base_name = Path(original_name).stem + extension = Path(original_name).suffix or f".{ext}" + unique_filename = f"{message.id}-{base_name}{extension}" + media_path = media_folder / unique_filename + + existing_files = list(media_folder.glob(f"{message.id}-*")) + if existing_files: + return str(existing_files[0]) + + for attempt in range(3): + try: + downloaded_path = await message.download_media(file=str(media_path)) + if downloaded_path and Path(downloaded_path).exists(): + return downloaded_path + else: + return None + except FloodWaitError as e: + if attempt < 2: + await asyncio.sleep(e.seconds) + else: + return None + except Exception: + if attempt < 2: + await asyncio.sleep(2 ** attempt) + else: + return None + + return None + except Exception: + return None async def update_media_path(self, channel: str, message_id: int, media_path: str): conn = self.get_db_connection(channel) @@ -169,26 +176,21 @@ class OptimizedTelegramScraper: async def scrape_channel(self, channel: str, offset_id: int): try: - if channel.startswith('-'): - entity = await self.client.get_entity(PeerChannel(int(channel))) - else: - entity = await self.client.get_entity(channel) - + entity = await self.client.get_entity(PeerChannel(int(channel)) if channel.startswith('-') else channel) result = await self.client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0) total_messages = result.total if total_messages == 0: - print(f"No messages found in channel {channel}.") + print(f"No messages found in channel {channel}") return print(f"Found {total_messages} messages in channel {channel}") message_batch = [] - media_download_tasks = [] + media_tasks = [] processed_messages = 0 last_message_id = offset_id - - download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + semaphore = asyncio.Semaphore(self.max_concurrent_downloads) async for message in self.client.iter_messages(entity, offset_id=offset_id, reverse=True): try: @@ -209,11 +211,8 @@ class OptimizedTelegramScraper: message_batch.append(msg_data) - if self.state['scrape_media'] and message.media: - task = asyncio.create_task( - self.download_media_with_semaphore(download_semaphore, channel, message) - ) - media_download_tasks.append((message.id, task)) + if self.state['scrape_media'] and message.media and not isinstance(message.media, MessageMediaWebPage): + media_tasks.append(message) last_message_id = message.id processed_messages += 1 @@ -227,28 +226,58 @@ class OptimizedTelegramScraper: self.save_state() progress = (processed_messages / total_messages) * 100 - sys.stdout.write(f"\rScraping {channel}: {progress:.1f}% ({processed_messages}/{total_messages})") + bar_length = 30 + filled_length = int(bar_length * processed_messages // total_messages) + bar = 'ā–ˆ' * filled_length + 'ā–‘' * (bar_length - filled_length) + + sys.stdout.write(f"\ršŸ“„ Messages: [{bar}] {progress:.1f}% ({processed_messages}/{total_messages})") sys.stdout.flush() except Exception as e: - print(f"Error processing message {message.id}: {e}") + print(f"\nError processing message {message.id}: {e}") if message_batch: self.batch_insert_messages(channel, message_batch) - if media_download_tasks: - print(f"\nWaiting for {len(media_download_tasks)} media downloads to complete...") - for message_id, task in media_download_tasks: - try: - media_path = await task - if media_path: - await self.update_media_path(channel, message_id, media_path) - except Exception as e: - print(f"Error in media download for message {message_id}: {e}") + if media_tasks: + total_media = len(media_tasks) + completed_media = 0 + successful_downloads = 0 + print(f"\nšŸ“„ Downloading {total_media} media files...") + + semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + + async def download_single_media(message): + async with semaphore: + return await self.download_media(channel, message) + + batch_size = 10 + for i in range(0, len(media_tasks), batch_size): + batch = media_tasks[i:i + batch_size] + tasks = [asyncio.create_task(download_single_media(msg)) for msg in batch] + + for j, task in enumerate(tasks): + try: + media_path = await task + if media_path: + await self.update_media_path(channel, batch[j].id, media_path) + successful_downloads += 1 + except Exception: + pass + + completed_media += 1 + progress = (completed_media / total_media) * 100 + bar_length = 30 + filled_length = int(bar_length * completed_media // total_media) + bar = 'ā–ˆ' * filled_length + 'ā–‘' * (bar_length - filled_length) + + sys.stdout.write(f"\ršŸ“„ Media: [{bar}] {progress:.1f}% ({completed_media}/{total_media})") + sys.stdout.flush() + + print(f"\nāœ… Media download complete! ({successful_downloads}/{total_media} successful)") self.state['channels'][channel] = last_message_id self.save_state() - print(f"\nCompleted scraping channel {channel}") except Exception as e: @@ -257,54 +286,128 @@ class OptimizedTelegramScraper: async def rescrape_media(self, channel: str): conn = self.get_db_connection(channel) cursor = conn.cursor() - cursor.execute('SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_path IS NULL') + cursor.execute('SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_type != "MessageMediaWebPage" AND media_path IS NULL') message_ids = [row[0] for row in cursor.fetchall()] if not message_ids: - print(f"No media files to reprocess for channel {channel}.") + print(f"No media files to reprocess for channel {channel}") return - print(f"Reprocessing {len(message_ids)} media files for channel {channel}") + print(f"šŸ“„ Reprocessing {len(message_ids)} media files for channel {channel}") try: entity = await self.client.get_entity(PeerChannel(int(channel))) - except Exception as e: - print(f"Error getting entity for channel {channel}: {e}") - return - - download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads) - tasks = [] - - batch_size = 50 - for i in range(0, len(message_ids), batch_size): - batch_ids = message_ids[i:i + batch_size] + semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + completed_media = 0 + successful_downloads = 0 - try: + async def download_single_media(message): + async with semaphore: + return await self.download_media(channel, message) + + batch_size = 10 + for i in range(0, len(message_ids), batch_size): + batch_ids = message_ids[i:i + batch_size] messages = await self.client.get_messages(entity, ids=batch_ids) - for message in messages: - if message and message.media: - task = asyncio.create_task( - self.download_media_with_semaphore(download_semaphore, channel, message) - ) - tasks.append((message.id, task)) + valid_messages = [msg for msg in messages if msg and msg.media and not isinstance(msg.media, MessageMediaWebPage)] + tasks = [asyncio.create_task(download_single_media(msg)) for msg in valid_messages] - for message_id, task in tasks[-len([m for m in messages if m and m.media]):]: + for j, task in enumerate(tasks): try: media_path = await task if media_path: - await self.update_media_path(channel, message_id, media_path) - except Exception as e: - print(f"Error downloading media for message {message_id}: {e}") + await self.update_media_path(channel, valid_messages[j].id, media_path) + successful_downloads += 1 + except Exception: + pass + + completed_media += 1 + progress = (completed_media / len(message_ids)) * 100 + bar_length = 30 + filled_length = int(bar_length * completed_media // len(message_ids)) + bar = 'ā–ˆ' * filled_length + 'ā–‘' * (bar_length - filled_length) + + sys.stdout.write(f"\ršŸ”„ Rescrape: [{bar}] {progress:.1f}% ({completed_media}/{len(message_ids)})") + sys.stdout.flush() - progress = min(100, (i + batch_size) / len(message_ids) * 100) - sys.stdout.write(f"\rReprocessing media: {progress:.1f}%") - sys.stdout.flush() + print(f"\nāœ… Media reprocessing complete! ({successful_downloads}/{len(message_ids)} successful)") - except Exception as e: - print(f"Error processing batch starting at {i}: {e}") + except Exception as e: + print(f"Error reprocessing media: {e}") - print(f"\nCompleted media reprocessing for channel {channel}") + async def fix_missing_media(self, channel: str): + conn = self.get_db_connection(channel) + cursor = conn.cursor() + + cursor.execute('SELECT COUNT(*) FROM messages WHERE media_type IS NOT NULL AND media_type != "MessageMediaWebPage"') + total_with_media = cursor.fetchone()[0] + + cursor.execute('SELECT COUNT(*) FROM messages WHERE media_type IS NOT NULL AND media_type != "MessageMediaWebPage" AND media_path IS NOT NULL') + total_with_files = cursor.fetchone()[0] + + missing_count = total_with_media - total_with_files + + print(f"\nšŸ“Š Media Analysis for {channel}:") + print(f"Messages with media: {total_with_media}") + print(f"Media files downloaded: {total_with_files}") + print(f"Missing media files: {missing_count}") + + if missing_count == 0: + print("āœ… All media files are already downloaded!") + return + + cursor.execute('SELECT message_id, media_type FROM messages WHERE media_type IS NOT NULL AND media_type != "MessageMediaWebPage" AND (media_path IS NULL OR media_path = "")') + missing_media = cursor.fetchall() + + if not missing_media: + print("āœ… No missing media found!") + return + + print(f"\nšŸ”§ Attempting to download {len(missing_media)} missing media files...") + + try: + entity = await self.client.get_entity(PeerChannel(int(channel))) + semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + completed_media = 0 + successful_downloads = 0 + + async def download_single_media(message): + async with semaphore: + return await self.download_media(channel, message) + + batch_size = 10 + for i in range(0, len(missing_media), batch_size): + batch = missing_media[i:i + batch_size] + message_ids = [msg[0] for msg in batch] + + messages = await self.client.get_messages(entity, ids=message_ids) + valid_messages = [msg for msg in messages if msg and msg.media and not isinstance(msg.media, MessageMediaWebPage)] + + tasks = [asyncio.create_task(download_single_media(msg)) for msg in valid_messages] + + for j, task in enumerate(tasks): + try: + media_path = await task + if media_path: + await self.update_media_path(channel, valid_messages[j].id, media_path) + successful_downloads += 1 + except Exception: + pass + + completed_media += 1 + progress = (completed_media / len(missing_media)) * 100 + bar_length = 30 + filled_length = int(bar_length * completed_media // len(missing_media)) + bar = 'ā–ˆ' * filled_length + 'ā–‘' * (bar_length - filled_length) + + sys.stdout.write(f"\ršŸ”§ Fix Media: [{bar}] {progress:.1f}% ({completed_media}/{len(missing_media)})") + sys.stdout.flush() + + print(f"\nāœ… Media fix complete! ({successful_downloads}/{len(missing_media)} successful)") + + except Exception as e: + print(f"Error fixing missing media: {e}") async def continuous_scraping(self): self.continuous_scraping_active = True @@ -316,7 +419,6 @@ class OptimizedTelegramScraper: for channel in self.state['channels']: if not self.continuous_scraping_active: break - print(f"\nChecking for new messages in channel: {channel}") await self.scrape_channel(channel, self.state['channels'][channel]) @@ -326,17 +428,16 @@ class OptimizedTelegramScraper: await asyncio.sleep(sleep_time) except asyncio.CancelledError: - print("Continuous scraping stopped.") + print("Continuous scraping stopped") finally: self.continuous_scraping_active = False - def export_to_csv_optimized(self, channel: str): + def export_to_csv(self, channel: str): conn = self.get_db_connection(channel) csv_file = Path(channel) / f'{channel}.csv' cursor = conn.cursor() cursor.execute('SELECT * FROM messages ORDER BY date') - columns = [description[0] for description in cursor.description] with open(csv_file, 'w', newline='', encoding='utf-8') as f: @@ -349,7 +450,7 @@ class OptimizedTelegramScraper: break writer.writerows(rows) - def export_to_json_optimized(self, channel: str): + def export_to_json(self, channel: str): conn = self.get_db_connection(channel) json_file = Path(channel) / f'{channel}.json' @@ -378,89 +479,246 @@ class OptimizedTelegramScraper: f.write('\n]') async def export_data(self): + if not self.state['channels']: + print("No channels to export") + return + for channel in self.state['channels']: print(f"Exporting data for channel {channel}...") - self.export_to_csv_optimized(channel) - self.export_to_json_optimized(channel) - print(f"Completed export for channel {channel}") + try: + self.export_to_csv(channel) + self.export_to_json(channel) + print(f"āœ… Completed export for channel {channel}") + except Exception as e: + print(f"āŒ Export failed for channel {channel}: {e}") async def view_channels(self): if not self.state['channels']: - print("No channels to view.") + print("No channels saved") return print("\nCurrent channels:") - for channel, last_id in self.state['channels'].items(): + for i, (channel, last_id) in enumerate(self.state['channels'].items(), 1): try: conn = self.get_db_connection(channel) cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM messages') count = cursor.fetchone()[0] - print(f"Channel ID: {channel}, Last Message ID: {last_id}, Messages: {count}") + print(f"[{i}] Channel ID: {channel}, Last Message ID: {last_id}, Messages: {count}") except: - print(f"Channel ID: {channel}, Last Message ID: {last_id}") + print(f"[{i}] Channel ID: {channel}, Last Message ID: {last_id}") async def list_channels(self): try: print("\nList of channels joined by account:") + count = 1 async for dialog in self.client.iter_dialogs(): if dialog.id != 777000: - print(f"* {dialog.title} (id: {dialog.id})") + print(f"[{count}] {dialog.title} (id: {dialog.id})") + count += 1 except Exception as e: print(f"Error listing channels: {e}") + def display_qr_code_ascii(self, qr_login): + qr = qrcode.QRCode(box_size=1, border=1) + qr.add_data(qr_login.url) + qr.make() + + f = StringIO() + qr.print_ascii(out=f) + f.seek(0) + print(f.read()) + + async def qr_code_auth(self): + print("\nChoosing QR Code authentication...") + print("Please scan the QR code with your Telegram app:") + print("1. Open Telegram on your phone") + print("2. Go to Settings > Devices > Scan QR") + print("3. Scan the code below\n") + + qr_login = await self.client.qr_login() + self.display_qr_code_ascii(qr_login) + + try: + await qr_login.wait() + print("\nāœ… Successfully logged in via QR code!") + return True + except SessionPasswordNeededError: + password = input("Two-factor authentication enabled. Enter your password: ") + await self.client.sign_in(password=password) + print("\nāœ… Successfully logged in with 2FA!") + return True + except Exception as e: + print(f"\nāŒ QR code authentication failed: {e}") + return False + + async def phone_auth(self): + phone = input("Enter your phone number: ") + await self.client.send_code_request(phone) + code = input("Enter the code you received: ") + + try: + await self.client.sign_in(phone, code) + print("\nāœ… Successfully logged in via phone!") + return True + except SessionPasswordNeededError: + password = input("Two-factor authentication enabled. Enter your password: ") + await self.client.sign_in(password=password) + print("\nāœ… Successfully logged in with 2FA!") + return True + except Exception as e: + print(f"\nāŒ Phone authentication failed: {e}") + return False + async def initialize_client(self): - if not all([self.state['api_id'], self.state['api_hash'], self.state['phone']]): - self.state['api_id'] = int(input("Enter your API ID: ")) - self.state['api_hash'] = input("Enter your API Hash: ") - self.state['phone'] = input("Enter your phone number: ") - self.save_state() + if not all([self.state.get('api_id'), self.state.get('api_hash')]): + print("\n=== API Configuration Required ===") + print("You need to provide API credentials from https://my.telegram.org") + try: + self.state['api_id'] = int(input("Enter your API ID: ")) + self.state['api_hash'] = input("Enter your API Hash: ") + self.save_state() + except ValueError: + print("Invalid API ID. Must be a number.") + return False self.client = TelegramClient('session', self.state['api_id'], self.state['api_hash']) - await self.client.start() + + try: + await self.client.connect() + except Exception as e: + print(f"Failed to connect: {e}") + return False + + if not await self.client.is_user_authorized(): + print("\n=== Choose Authentication Method ===") + print("[1] QR Code (Recommended - No phone number needed)") + print("[2] Phone Number (Traditional method)") + + while True: + choice = input("Enter your choice (1 or 2): ").strip() + if choice in ['1', '2']: + break + print("Please enter 1 or 2") + + success = await self.qr_code_auth() if choice == '1' else await self.phone_auth() + + if not success: + print("Authentication failed. Please try again.") + await self.client.disconnect() + return False + else: + print("āœ… Already authenticated!") + + return True + + def parse_channel_selection(self, choice): + channels_list = list(self.state['channels'].keys()) + selected_channels = [] + + if choice.lower() == 'all': + return channels_list + + for selection in [x.strip() for x in choice.split(',')]: + try: + if selection.startswith('-'): + if selection in self.state['channels']: + selected_channels.append(selection) + else: + print(f"Channel ID {selection} not found in your channels") + else: + num = int(selection) + if 1 <= num <= len(channels_list): + selected_channels.append(channels_list[num - 1]) + else: + print(f"Invalid channel number: {num}. Valid range: 1-{len(channels_list)}") + except ValueError: + print(f"Invalid input: {selection}. Use numbers (1,2,3) or full IDs (-100123...)") + + return selected_channels + + async def scrape_specific_channels(self): + if not self.state['channels']: + print("No channels available. Use [L] to add channels first") + return + + await self.view_channels() + print("\nšŸ“„ Scrape Options:") + print("• Single: 1 or -1001234567890") + print("• Multiple: 1,3,5 or mix formats") + print("• All channels: all") + + choice = input("\nEnter selection: ").strip() + selected_channels = self.parse_channel_selection(choice) + + if selected_channels: + print(f"\nšŸš€ Starting scrape of {len(selected_channels)} channel(s)...") + for i, channel in enumerate(selected_channels, 1): + print(f"\n[{i}/{len(selected_channels)}] Scraping: {channel}") + await self.scrape_channel(channel, self.state['channels'][channel]) + print(f"\nāœ… Completed scraping {len(selected_channels)} channel(s)!") + else: + print("āŒ No valid channels selected") async def manage_channels(self): while True: - print("\nMenu:") - print("[A] Add new channel") - print("[R] Remove channel") - print("[S] Scrape all channels") - print("[M] Toggle media scraping (currently {})".format( - "enabled" if self.state['scrape_media'] else "disabled")) + print("\n" + "="*40) + print(" TELEGRAM SCRAPER") + print("="*40) + print("[S] Scrape channels") print("[C] Continuous scraping") + print(f"[M] Media scraping: {'ON' if self.state['scrape_media'] else 'OFF'}") + print("[L] List & add channels") + print("[R] Remove channels") print("[E] Export data") - print("[V] View saved channels") - print("[L] List account channels") + print("[T] Rescrape media") + print("[F] Fix missing media") print("[Q] Quit") + print("="*40) - choice = input("Enter your choice: ").lower() + choice = input("Enter your choice: ").lower().strip() - match choice: - case 'a': - channel = input("Enter channel ID: ") - self.state['channels'][channel] = 0 - self.save_state() - print(f"Added channel {channel}.") + try: + if choice == 'r': + if not self.state['channels']: + print("No channels to remove") + continue + + await self.view_channels() + print("\nTo remove channels:") + print("• Single: 1 or -1001234567890") + print("• Multiple: 1,2,3 or mix formats") + selection = input("Enter selection: ").strip() + selected_channels = self.parse_channel_selection(selection) - case 'r': - channel = input("Enter channel ID to remove: ") - if channel in self.state['channels']: - del self.state['channels'][channel] - self.save_state() - print(f"Removed channel {channel}.") + if selected_channels: + removed_count = 0 + for channel in selected_channels: + if channel in self.state['channels']: + del self.state['channels'][channel] + print(f"āœ… Removed channel {channel}") + removed_count += 1 + else: + print(f"āŒ Channel {channel} not found") + + if removed_count > 0: + self.save_state() + print(f"\nšŸŽ‰ Removed {removed_count} channel(s)!") + await self.view_channels() + else: + print("No channels were removed") else: - print(f"Channel {channel} not found.") + print("No valid channels selected") - case 's': - for channel in self.state['channels']: - await self.scrape_channel(channel, self.state['channels'][channel]) - - case 'm': + elif choice == 's': + await self.scrape_specific_channels() + + elif choice == 'm': self.state['scrape_media'] = not self.state['scrape_media'] self.save_state() - print(f"Media scraping {'enabled' if self.state['scrape_media'] else 'disabled'}.") + print(f"\nāœ… Media scraping {'enabled' if self.state['scrape_media'] else 'disabled'}") - case 'c': + elif choice == 'c': task = asyncio.create_task(self.continuous_scraping()) print("Continuous scraping started. Press Ctrl+C to stop.") try: @@ -474,33 +732,114 @@ class OptimizedTelegramScraper: except asyncio.CancelledError: pass - case 'e': + elif choice == 'e': await self.export_data() - case 'v': - await self.view_channels() + elif choice == 'l': + channels_list = [] + async for dialog in self.client.iter_dialogs(): + if dialog.id != 777000: + channels_list.append(str(dialog.id)) - case 'l': await self.list_channels() + print("\nTo add channels from the list above:") + print("• Single: 1 or -1001234567890") + print("• Multiple: 1,3,5 or mix formats") + print("• Press Enter to skip adding") + selection = input("\nEnter selection (or Enter to skip): ").strip() - case 'q': - print("Quitting...") + if selection: + added_count = 0 + for sel in [x.strip() for x in selection.split(',')]: + try: + if sel.startswith('-'): + channel = sel + else: + num = int(sel) + if 1 <= num <= len(channels_list): + channel = channels_list[num - 1] + else: + print(f"Invalid number: {num}. Choose 1-{len(channels_list)}") + continue + + if channel in self.state['channels']: + print(f"Channel {channel} already added") + else: + self.state['channels'][channel] = 0 + self.save_state() + print(f"āœ… Added channel {channel}") + added_count += 1 + + except ValueError: + print(f"Invalid input: {sel}") + + if added_count > 0: + print(f"\nšŸŽ‰ Added {added_count} new channel(s)!") + await self.view_channels() + else: + print("No new channels were added") + + elif choice == 't': + if not self.state['channels']: + print("No channels available. Add channels first") + continue + + await self.view_channels() + print("\nEnter channel NUMBER (1,2,3...) or full channel ID (-100123...)") + selection = input("Enter your selection: ").strip() + selected_channels = self.parse_channel_selection(selection) + + if len(selected_channels) == 1: + channel = selected_channels[0] + print(f"Rescaping media for channel: {channel}") + await self.rescrape_media(channel) + elif len(selected_channels) > 1: + print("Please select only one channel for media rescaping") + else: + print("No valid channel selected") + + elif choice == 'f': + if not self.state['channels']: + print("No channels available. Add channels first") + continue + + await self.view_channels() + print("\nEnter channel NUMBER (1,2,3...) or full channel ID (-100123...)") + selection = input("Enter your selection: ").strip() + selected_channels = self.parse_channel_selection(selection) + + if len(selected_channels) == 1: + channel = selected_channels[0] + await self.fix_missing_media(channel) + elif len(selected_channels) > 1: + print("Please select only one channel for fixing missing media") + else: + print("No valid channel selected") + + elif choice == 'q': + print("\nšŸ‘‹ Goodbye!") self.close_db_connections() - await self.client.disconnect() + if self.client: + await self.client.disconnect() sys.exit() - case _: - print("Invalid option.") + else: + print("Invalid option") + + except Exception as e: + print(f"Error: {e}") async def run(self): display_ascii_art() - await self.initialize_client() - try: - await self.manage_channels() - finally: - self.close_db_connections() - if self.client: - await self.client.disconnect() + if await self.initialize_client(): + try: + await self.manage_channels() + finally: + self.close_db_connections() + if self.client: + await self.client.disconnect() + else: + print("Failed to initialize client. Exiting.") async def main(): scraper = OptimizedTelegramScraper() @@ -511,4 +850,4 @@ if __name__ == '__main__': asyncio.run(main()) except KeyboardInterrupt: print("\nProgram interrupted. Exiting...") - sys.exit() \ No newline at end of file + sys.exit()