Update telegram-scraper.py
v3.0
This commit is contained in:
@@ -4,21 +4,23 @@ import json
|
||||
import csv
|
||||
import asyncio
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import sys
|
||||
import uuid
|
||||
import warnings
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional, Any
|
||||
from telethon import TelegramClient
|
||||
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, User, PeerChannel
|
||||
from telethon.errors import FloodWaitError, RPCError
|
||||
import aiohttp
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from io import StringIO
|
||||
from telethon import TelegramClient
|
||||
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, MessageMediaWebPage, User, PeerChannel
|
||||
from telethon.errors import FloodWaitError, SessionPasswordNeededError
|
||||
import qrcode
|
||||
|
||||
warnings.filterwarnings("ignore", message="Using async sessions support is an experimental feature")
|
||||
|
||||
def display_ascii_art():
|
||||
WHITE = "\033[97m"
|
||||
RESET = "\033[0m"
|
||||
|
||||
art = r"""
|
||||
___________________ _________
|
||||
\__ ___/ _____/ / _____/
|
||||
@@ -27,7 +29,6 @@ ___________________ _________
|
||||
|____| \______ /_______ /
|
||||
\/ \/
|
||||
"""
|
||||
|
||||
print(WHITE + art + RESET)
|
||||
|
||||
@dataclass
|
||||
@@ -49,30 +50,35 @@ class OptimizedTelegramScraper:
|
||||
self.state = self.load_state()
|
||||
self.client = None
|
||||
self.continuous_scraping_active = False
|
||||
self.max_concurrent_downloads = 3
|
||||
self.max_concurrent_downloads = 5
|
||||
self.batch_size = 100
|
||||
self.state_save_interval = 50
|
||||
self.db_connections = {}
|
||||
|
||||
def load_state(self) -> Dict[str, Any]:
    """Load the persisted scraper state, falling back to defaults.

    Reads ``self.STATE_FILE`` as JSON and overlays it on the default
    state, so keys missing from an older or hand-edited file still exist
    when the rest of the class indexes ``self.state[...]`` directly.

    Returns:
        The state dictionary. The defaults alone are returned when the
        file is missing, unreadable, not valid JSON, or not a JSON object.
    """
    defaults: Dict[str, Any] = {
        'api_id': None,
        'api_hash': None,
        'phone': None,
        'channels': {},
        'scrape_media': True,
    }

    try:
        with open(self.STATE_FILE, 'r') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been saved yet.
        return defaults
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError
        # subclasses. Warn instead of failing silently: the next
        # save_state() will overwrite the unreadable file.
        print(f"Failed to load state from {self.STATE_FILE}: {e}")
        return defaults

    if not isinstance(loaded, dict):
        print(f"Ignoring state file {self.STATE_FILE}: expected a JSON object")
        return defaults

    return {**defaults, **loaded}
|
||||
|
||||
def save_state(self):
    """Persist ``self.state`` to ``self.STATE_FILE`` as indented JSON.

    The JSON is written to a sibling ``.tmp`` file first and then moved
    over the real file with ``os.replace``. A failure while serialising
    or writing therefore leaves the previously saved state untouched
    instead of a truncated file.

    Failures are reported on stdout and otherwise ignored, so a save
    error never aborts a running scrape.
    """
    tmp_path = f"{self.STATE_FILE}.tmp"
    try:
        with open(tmp_path, 'w') as f:
            json.dump(self.state, f, indent=2)
        os.replace(tmp_path, self.STATE_FILE)
    except Exception as e:
        print(f"Failed to save state: {e}")
        # Best-effort cleanup of the partial temp file; it may not exist.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
|
||||
|
||||
def get_db_connection(self, channel: str) -> sqlite3.Connection:
|
||||
if channel not in self.db_connections:
|
||||
channel_dir = Path(os.getcwd()) / channel
|
||||
channel_dir = Path(channel)
|
||||
channel_dir.mkdir(exist_ok=True)
|
||||
|
||||
db_file = channel_dir / f'{channel}.db'
|
||||
@@ -83,6 +89,8 @@ class OptimizedTelegramScraper:
|
||||
message TEXT, media_type TEXT, media_path TEXT, reply_to INTEGER)''')
|
||||
conn.execute('CREATE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)')
|
||||
conn.execute('CREATE INDEX IF NOT EXISTS idx_date ON messages(date)')
|
||||
conn.execute('PRAGMA journal_mode=WAL')
|
||||
conn.execute('PRAGMA synchronous=NORMAL')
|
||||
conn.commit()
|
||||
self.db_connections[channel] = conn
|
||||
|
||||
@@ -108,58 +116,57 @@ class OptimizedTelegramScraper:
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', data)
|
||||
conn.commit()
|
||||
|
||||
async def download_media_with_semaphore(self, semaphore: asyncio.Semaphore,
|
||||
channel: str, message) -> Optional[str]:
|
||||
async with semaphore:
|
||||
return await self.download_media(channel, message)
|
||||
|
||||
async def download_media(self, channel: str, message) -> Optional[str]:
    """Download the photo or document attached to *message*.

    Files are stored in ``<channel>/media/`` and named
    ``<message id>-<original name>``. A file whose name already starts
    with ``<message id>-`` is reused instead of being downloaded again.

    Args:
        channel: Channel identifier; also used as the output directory.
        message: Telethon message whose ``media`` should be saved.

    Returns:
        Path of the downloaded (or already present) file, or ``None``
        when media scraping is disabled, the message has no
        photo/document media, or every download attempt failed.
    """
    if not message.media or not self.state['scrape_media']:
        return None

    # Web page previews are skipped; only photos and documents are saved.
    if isinstance(message.media, MessageMediaWebPage):
        return None

    max_attempts = 3
    try:
        media_folder = Path(channel) / 'media'
        # parents=True: the channel directory may not exist yet.
        media_folder.mkdir(parents=True, exist_ok=True)

        if isinstance(message.media, MessageMediaPhoto):
            ext = "jpg"
            original_name = getattr(message.file, 'name', None) or "photo.jpg"
        elif isinstance(message.media, MessageMediaDocument):
            # Telethon's File.ext carries a leading dot and may be None;
            # normalise it so we never build "document..pdf" or
            # "document.None".
            raw_ext = getattr(message.file, 'ext', None) if message.file else None
            ext = (raw_ext or 'bin').lstrip('.') or 'bin'
            original_name = getattr(message.file, 'name', None) or f"document.{ext}"
        else:
            return None

        name_parts = Path(original_name)
        extension = name_parts.suffix or f".{ext}"
        media_path = media_folder / f"{message.id}-{name_parts.stem}{extension}"

        already_there = next(media_folder.glob(f"{message.id}-*"), None)
        if already_there is not None:
            return str(already_there)

        for attempt in range(max_attempts):
            is_last_attempt = attempt == max_attempts - 1
            try:
                downloaded_path = await message.download_media(file=str(media_path))
            except FloodWaitError as e:
                if is_last_attempt:
                    return None
                # Telegram tells us how long to back off.
                await asyncio.sleep(e.seconds)
            except Exception:
                if is_last_attempt:
                    return None
                # Exponential backoff: 1s, then 2s.
                await asyncio.sleep(2 ** attempt)
            else:
                if downloaded_path and Path(downloaded_path).exists():
                    return downloaded_path
                return None

        return None
    except Exception as e:
        # Stay best-effort (callers treat None as "not downloaded"), but
        # report why instead of failing silently. The leading newline
        # keeps the message off the in-place progress bar line.
        print(f"\nFailed to download media for message {message.id}: {e}")
        return None
|
||||
|
||||
async def update_media_path(self, channel: str, message_id: int, media_path: str):
|
||||
conn = self.get_db_connection(channel)
|
||||
@@ -169,26 +176,21 @@ class OptimizedTelegramScraper:
|
||||
|
||||
async def scrape_channel(self, channel: str, offset_id: int):
|
||||
try:
|
||||
if channel.startswith('-'):
|
||||
entity = await self.client.get_entity(PeerChannel(int(channel)))
|
||||
else:
|
||||
entity = await self.client.get_entity(channel)
|
||||
|
||||
entity = await self.client.get_entity(PeerChannel(int(channel)) if channel.startswith('-') else channel)
|
||||
result = await self.client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0)
|
||||
total_messages = result.total
|
||||
|
||||
if total_messages == 0:
|
||||
print(f"No messages found in channel {channel}.")
|
||||
print(f"No messages found in channel {channel}")
|
||||
return
|
||||
|
||||
print(f"Found {total_messages} messages in channel {channel}")
|
||||
|
||||
message_batch = []
|
||||
media_download_tasks = []
|
||||
media_tasks = []
|
||||
processed_messages = 0
|
||||
last_message_id = offset_id
|
||||
|
||||
download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
|
||||
semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
|
||||
|
||||
async for message in self.client.iter_messages(entity, offset_id=offset_id, reverse=True):
|
||||
try:
|
||||
@@ -209,11 +211,8 @@ class OptimizedTelegramScraper:
|
||||
|
||||
message_batch.append(msg_data)
|
||||
|
||||
if self.state['scrape_media'] and message.media:
|
||||
task = asyncio.create_task(
|
||||
self.download_media_with_semaphore(download_semaphore, channel, message)
|
||||
)
|
||||
media_download_tasks.append((message.id, task))
|
||||
if self.state['scrape_media'] and message.media and not isinstance(message.media, MessageMediaWebPage):
|
||||
media_tasks.append(message)
|
||||
|
||||
last_message_id = message.id
|
||||
processed_messages += 1
|
||||
@@ -227,28 +226,58 @@ class OptimizedTelegramScraper:
|
||||
self.save_state()
|
||||
|
||||
progress = (processed_messages / total_messages) * 100
|
||||
sys.stdout.write(f"\rScraping {channel}: {progress:.1f}% ({processed_messages}/{total_messages})")
|
||||
bar_length = 30
|
||||
filled_length = int(bar_length * processed_messages // total_messages)
|
||||
bar = '█' * filled_length + '░' * (bar_length - filled_length)
|
||||
|
||||
sys.stdout.write(f"\r📄 Messages: [{bar}] {progress:.1f}% ({processed_messages}/{total_messages})")
|
||||
sys.stdout.flush()
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing message {message.id}: {e}")
|
||||
print(f"\nError processing message {message.id}: {e}")
|
||||
|
||||
if message_batch:
|
||||
self.batch_insert_messages(channel, message_batch)
|
||||
|
||||
if media_download_tasks:
|
||||
print(f"\nWaiting for {len(media_download_tasks)} media downloads to complete...")
|
||||
for message_id, task in media_download_tasks:
|
||||
try:
|
||||
media_path = await task
|
||||
if media_path:
|
||||
await self.update_media_path(channel, message_id, media_path)
|
||||
except Exception as e:
|
||||
print(f"Error in media download for message {message_id}: {e}")
|
||||
if media_tasks:
|
||||
total_media = len(media_tasks)
|
||||
completed_media = 0
|
||||
successful_downloads = 0
|
||||
print(f"\n📥 Downloading {total_media} media files...")
|
||||
|
||||
semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
|
||||
|
||||
async def download_single_media(message):
|
||||
async with semaphore:
|
||||
return await self.download_media(channel, message)
|
||||
|
||||
batch_size = 10
|
||||
for i in range(0, len(media_tasks), batch_size):
|
||||
batch = media_tasks[i:i + batch_size]
|
||||
tasks = [asyncio.create_task(download_single_media(msg)) for msg in batch]
|
||||
|
||||
for j, task in enumerate(tasks):
|
||||
try:
|
||||
media_path = await task
|
||||
if media_path:
|
||||
await self.update_media_path(channel, batch[j].id, media_path)
|
||||
successful_downloads += 1
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
completed_media += 1
|
||||
progress = (completed_media / total_media) * 100
|
||||
bar_length = 30
|
||||
filled_length = int(bar_length * completed_media // total_media)
|
||||
bar = '█' * filled_length + '░' * (bar_length - filled_length)
|
||||
|
||||
sys.stdout.write(f"\r📥 Media: [{bar}] {progress:.1f}% ({completed_media}/{total_media})")
|
||||
sys.stdout.flush()
|
||||
|
||||
print(f"\n✅ Media download complete! ({successful_downloads}/{total_media} successful)")
|
||||
|
||||
self.state['channels'][channel] = last_message_id
|
||||
self.save_state()
|
||||
|
||||
print(f"\nCompleted scraping channel {channel}")
|
||||
|
||||
except Exception as e:
|
||||
@@ -257,54 +286,128 @@ class OptimizedTelegramScraper:
|
||||
async def rescrape_media(self, channel: str):
|
||||
conn = self.get_db_connection(channel)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_path IS NULL')
|
||||
cursor.execute('SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_type != "MessageMediaWebPage" AND media_path IS NULL')
|
||||
message_ids = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
if not message_ids:
|
||||
print(f"No media files to reprocess for channel {channel}.")
|
||||
print(f"No media files to reprocess for channel {channel}")
|
||||
return
|
||||
|
||||
print(f"Reprocessing {len(message_ids)} media files for channel {channel}")
|
||||
print(f"📥 Reprocessing {len(message_ids)} media files for channel {channel}")
|
||||
|
||||
try:
|
||||
entity = await self.client.get_entity(PeerChannel(int(channel)))
|
||||
except Exception as e:
|
||||
print(f"Error getting entity for channel {channel}: {e}")
|
||||
return
|
||||
|
||||
download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
|
||||
tasks = []
|
||||
|
||||
batch_size = 50
|
||||
for i in range(0, len(message_ids), batch_size):
|
||||
batch_ids = message_ids[i:i + batch_size]
|
||||
semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
|
||||
completed_media = 0
|
||||
successful_downloads = 0
|
||||
|
||||
try:
|
||||
async def download_single_media(message):
|
||||
async with semaphore:
|
||||
return await self.download_media(channel, message)
|
||||
|
||||
batch_size = 10
|
||||
for i in range(0, len(message_ids), batch_size):
|
||||
batch_ids = message_ids[i:i + batch_size]
|
||||
messages = await self.client.get_messages(entity, ids=batch_ids)
|
||||
|
||||
for message in messages:
|
||||
if message and message.media:
|
||||
task = asyncio.create_task(
|
||||
self.download_media_with_semaphore(download_semaphore, channel, message)
|
||||
)
|
||||
tasks.append((message.id, task))
|
||||
valid_messages = [msg for msg in messages if msg and msg.media and not isinstance(msg.media, MessageMediaWebPage)]
|
||||
tasks = [asyncio.create_task(download_single_media(msg)) for msg in valid_messages]
|
||||
|
||||
for message_id, task in tasks[-len([m for m in messages if m and m.media]):]:
|
||||
for j, task in enumerate(tasks):
|
||||
try:
|
||||
media_path = await task
|
||||
if media_path:
|
||||
await self.update_media_path(channel, message_id, media_path)
|
||||
except Exception as e:
|
||||
print(f"Error downloading media for message {message_id}: {e}")
|
||||
await self.update_media_path(channel, valid_messages[j].id, media_path)
|
||||
successful_downloads += 1
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
completed_media += 1
|
||||
progress = (completed_media / len(message_ids)) * 100
|
||||
bar_length = 30
|
||||
filled_length = int(bar_length * completed_media // len(message_ids))
|
||||
bar = '█' * filled_length + '░' * (bar_length - filled_length)
|
||||
|
||||
sys.stdout.write(f"\r🔄 Rescrape: [{bar}] {progress:.1f}% ({completed_media}/{len(message_ids)})")
|
||||
sys.stdout.flush()
|
||||
|
||||
progress = min(100, (i + batch_size) / len(message_ids) * 100)
|
||||
sys.stdout.write(f"\rReprocessing media: {progress:.1f}%")
|
||||
sys.stdout.flush()
|
||||
print(f"\n✅ Media reprocessing complete! ({successful_downloads}/{len(message_ids)} successful)")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing batch starting at {i}: {e}")
|
||||
except Exception as e:
|
||||
print(f"Error reprocessing media: {e}")
|
||||
|
||||
print(f"\nCompleted media reprocessing for channel {channel}")
|
||||
async def fix_missing_media(self, channel: str):
    """Report and re-download media that has no stored file path.

    A message counts as missing when its ``media_type`` is set (and is
    not a web page preview) while ``media_path`` is NULL or empty. The
    same definition drives both the printed summary and the download
    list, so the early "nothing to do" exit cannot hide rows whose path
    is an empty string.

    Args:
        channel: Channel identifier as stored in ``self.state['channels']``.
    """
    conn = self.get_db_connection(channel)
    cursor = conn.cursor()

    # Single-quoted literal: double quotes denote identifiers in SQL and
    # only work as strings through a SQLite compatibility quirk.
    media_filter = "media_type IS NOT NULL AND media_type != 'MessageMediaWebPage'"

    cursor.execute(f"SELECT COUNT(*) FROM messages WHERE {media_filter}")
    total_with_media = cursor.fetchone()[0]

    cursor.execute(
        f"SELECT message_id FROM messages WHERE {media_filter} "
        "AND (media_path IS NULL OR media_path = '')"
    )
    missing_ids = [row[0] for row in cursor.fetchall()]
    total_missing = len(missing_ids)

    print(f"\n📊 Media Analysis for {channel}:")
    print(f"Messages with media: {total_with_media}")
    print(f"Media files downloaded: {total_with_media - total_missing}")
    print(f"Missing media files: {total_missing}")

    if not missing_ids:
        print("✅ All media files are already downloaded!")
        return

    print(f"\n🔧 Attempting to download {total_missing} missing media files...")

    def show_progress(done: int) -> None:
        """Redraw the in-place progress bar for *done* handled messages."""
        bar_length = 30
        filled_length = bar_length * done // total_missing
        bar = '█' * filled_length + '░' * (bar_length - filled_length)
        progress = done / total_missing * 100
        sys.stdout.write(f"\r🔧 Fix Media: [{bar}] {progress:.1f}% ({done}/{total_missing})")
        sys.stdout.flush()

    try:
        # Mirror scrape_channel: only numeric ids ("-100...") are wrapped
        # in PeerChannel; anything else is resolved by Telethon as given.
        peer = PeerChannel(int(channel)) if channel.startswith('-') else channel
        entity = await self.client.get_entity(peer)
        semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
        completed_media = 0
        successful_downloads = 0

        async def download_single_media(message):
            async with semaphore:
                return await self.download_media(channel, message)

        batch_size = 10
        for start in range(0, total_missing, batch_size):
            batch_ids = missing_ids[start:start + batch_size]
            messages = await self.client.get_messages(entity, ids=batch_ids)
            valid_messages = [
                msg for msg in messages
                if msg and msg.media and not isinstance(msg.media, MessageMediaWebPage)
            ]
            tasks = [asyncio.create_task(download_single_media(msg)) for msg in valid_messages]

            for msg, task in zip(valid_messages, tasks):
                try:
                    media_path = await task
                    if media_path:
                        await self.update_media_path(channel, msg.id, media_path)
                        successful_downloads += 1
                except Exception as e:
                    print(f"\nError downloading media for message {msg.id}: {e}")

                completed_media += 1
                show_progress(completed_media)

            # Messages that no longer exist or no longer carry media were
            # never turned into tasks; count them as handled so the bar
            # can reach 100%.
            skipped = len(batch_ids) - len(valid_messages)
            if skipped:
                completed_media += skipped
                show_progress(completed_media)

        print(f"\n✅ Media fix complete! ({successful_downloads}/{total_missing} successful)")

    except Exception as e:
        print(f"Error fixing missing media: {e}")
|
||||
|
||||
async def continuous_scraping(self):
|
||||
self.continuous_scraping_active = True
|
||||
@@ -316,7 +419,6 @@ class OptimizedTelegramScraper:
|
||||
for channel in self.state['channels']:
|
||||
if not self.continuous_scraping_active:
|
||||
break
|
||||
|
||||
print(f"\nChecking for new messages in channel: {channel}")
|
||||
await self.scrape_channel(channel, self.state['channels'][channel])
|
||||
|
||||
@@ -326,17 +428,16 @@ class OptimizedTelegramScraper:
|
||||
await asyncio.sleep(sleep_time)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
print("Continuous scraping stopped.")
|
||||
print("Continuous scraping stopped")
|
||||
finally:
|
||||
self.continuous_scraping_active = False
|
||||
|
||||
def export_to_csv_optimized(self, channel: str):
|
||||
def export_to_csv(self, channel: str):
|
||||
conn = self.get_db_connection(channel)
|
||||
csv_file = Path(channel) / f'{channel}.csv'
|
||||
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('SELECT * FROM messages ORDER BY date')
|
||||
|
||||
columns = [description[0] for description in cursor.description]
|
||||
|
||||
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
|
||||
@@ -349,7 +450,7 @@ class OptimizedTelegramScraper:
|
||||
break
|
||||
writer.writerows(rows)
|
||||
|
||||
def export_to_json_optimized(self, channel: str):
|
||||
def export_to_json(self, channel: str):
|
||||
conn = self.get_db_connection(channel)
|
||||
json_file = Path(channel) / f'{channel}.json'
|
||||
|
||||
@@ -378,89 +479,246 @@ class OptimizedTelegramScraper:
|
||||
f.write('\n]')
|
||||
|
||||
async def export_data(self):
    """Export every saved channel to CSV and JSON.

    Each channel is exported independently: a failure is reported and
    the loop continues with the next channel.
    """
    channels = self.state['channels']
    if not channels:
        print("No channels to export")
        return

    for channel in channels:
        print(f"Exporting data for channel {channel}...")
        try:
            self.export_to_csv(channel)
            self.export_to_json(channel)
        except Exception as e:
            print(f"❌ Export failed for channel {channel}: {e}")
        else:
            # Only reached when both exports succeeded.
            print(f"✅ Completed export for channel {channel}")
|
||||
|
||||
async def view_channels(self):
    """Print the saved channels as a numbered list.

    Each line shows the channel id, the last scraped message id and,
    when the channel database can be queried, the stored message count.
    The ``[n]`` numbers are the 1-based positions accepted by
    ``parse_channel_selection``.
    """
    channels = self.state['channels']
    if not channels:
        print("No channels saved")
        return

    print("\nCurrent channels:")
    for i, (channel, last_id) in enumerate(channels.items(), 1):
        try:
            cursor = self.get_db_connection(channel).cursor()
            cursor.execute('SELECT COUNT(*) FROM messages')
            count = cursor.fetchone()[0]
        except Exception:
            # Not a bare ``except``: task cancellation and Ctrl+C must
            # still propagate. Any database problem just drops the count.
            print(f"[{i}] Channel ID: {channel}, Last Message ID: {last_id}")
        else:
            print(f"[{i}] Channel ID: {channel}, Last Message ID: {last_id}, Messages: {count}")
|
||||
|
||||
async def list_channels(self):
    """Print a numbered list of the dialogs visible to the account.

    The dialog with id 777000 is skipped and does not consume a number.
    Errors while iterating are reported on stdout rather than raised.
    """
    # Presumably Telegram's service-notification account — confirm.
    excluded_dialog_id = 777000

    try:
        print("\nList of channels joined by account:")
        count = 1
        async for dialog in self.client.iter_dialogs():
            if dialog.id == excluded_dialog_id:
                continue
            print(f"[{count}] {dialog.title} (id: {dialog.id})")
            count += 1
    except Exception as e:
        print(f"Error listing channels: {e}")
|
||||
|
||||
def display_qr_code_ascii(self, qr_login):
    """Print the login URL of *qr_login* to stdout as an ASCII QR code."""
    code = qrcode.QRCode(box_size=1, border=1)
    code.add_data(qr_login.url)
    code.make()

    # Render into memory first, then emit the whole picture in one print.
    rendered = StringIO()
    code.print_ascii(out=rendered)
    print(rendered.getvalue())
|
||||
|
||||
async def qr_code_auth(self):
    """Log in by having the user scan a QR code with the Telegram app.

    Prints the instructions and the QR code, then waits for the scan. If
    the account has two-factor authentication enabled, the password is
    requested (without echoing it) and used to finish the sign-in.

    Returns:
        ``True`` when the client ends up logged in, ``False`` when
        creating the QR login, the scan, or the 2FA sign-in fails.
    """
    # Function-scope import: used only on the 2FA path.
    from getpass import getpass

    print("\nChoosing QR Code authentication...")
    print("Please scan the QR code with your Telegram app:")
    print("1. Open Telegram on your phone")
    print("2. Go to Settings > Devices > Scan QR")
    print("3. Scan the code below\n")

    try:
        # Inside the try so a failure to create the QR login returns
        # False like every other failure, instead of raising.
        qr_login = await self.client.qr_login()
        self.display_qr_code_ascii(qr_login)
        await qr_login.wait()
    except SessionPasswordNeededError:
        password = getpass("Two-factor authentication enabled. Enter your password: ")
        try:
            await self.client.sign_in(password=password)
        except Exception as e:
            # An error raised here would escape the sibling
            # ``except Exception`` clause, so handle it explicitly.
            print(f"\n❌ Two-factor authentication failed: {e}")
            return False
        print("\n✅ Successfully logged in with 2FA!")
        return True
    except Exception as e:
        print(f"\n❌ QR code authentication failed: {e}")
        return False

    print("\n✅ Successfully logged in via QR code!")
    return True
|
||||
|
||||
async def phone_auth(self):
    """Log in with a phone number and the code Telegram sends to it.

    If the account has two-factor authentication enabled, the password
    is requested (without echoing it) and used to finish the sign-in.

    Returns:
        ``True`` when the client ends up logged in, ``False`` when
        requesting the code, the code sign-in, or the 2FA sign-in fails.
    """
    # Function-scope import: used only on the 2FA path.
    from getpass import getpass

    phone = input("Enter your phone number: ").strip()
    try:
        # A failed code request is an authentication failure, not a crash.
        await self.client.send_code_request(phone)
    except Exception as e:
        print(f"\n❌ Phone authentication failed: {e}")
        return False

    code = input("Enter the code you received: ").strip()

    try:
        await self.client.sign_in(phone, code)
    except SessionPasswordNeededError:
        password = getpass("Two-factor authentication enabled. Enter your password: ")
        try:
            await self.client.sign_in(password=password)
        except Exception as e:
            # An error raised here would escape the sibling
            # ``except Exception`` clause, so handle it explicitly.
            print(f"\n❌ Two-factor authentication failed: {e}")
            return False
        print("\n✅ Successfully logged in with 2FA!")
        return True
    except Exception as e:
        print(f"\n❌ Phone authentication failed: {e}")
        return False

    print("\n✅ Successfully logged in via phone!")
    return True
|
||||
|
||||
async def initialize_client(self):
    """Create, connect and authenticate the Telegram client.

    Prompts for API credentials when none are saved, connects, and, if
    the session is not yet authorised, lets the user choose between QR
    code and phone number login.

    Returns:
        ``True`` when the client is connected and authorised, ``False``
        when the credentials are invalid, the connection fails, or
        authentication fails (the client is disconnected in that case).
    """
    if not all([self.state.get('api_id'), self.state.get('api_hash')]):
        print("\n=== API Configuration Required ===")
        print("You need to provide API credentials from https://my.telegram.org")
        try:
            api_id = int(input("Enter your API ID: ").strip())
        except ValueError:
            print("Invalid API ID. Must be a number.")
            return False

        api_hash = input("Enter your API Hash: ").strip()
        if not api_hash:
            print("Invalid API Hash. Must not be empty.")
            return False

        # Store and persist only once both values are valid, so a
        # half-entered configuration is never saved.
        self.state['api_id'] = api_id
        self.state['api_hash'] = api_hash
        self.save_state()

    self.client = TelegramClient('session', self.state['api_id'], self.state['api_hash'])

    # connect() rather than start(): the login method is chosen below.
    try:
        await self.client.connect()
    except Exception as e:
        print(f"Failed to connect: {e}")
        return False

    if await self.client.is_user_authorized():
        print("✅ Already authenticated!")
        return True

    print("\n=== Choose Authentication Method ===")
    print("[1] QR Code (Recommended - No phone number needed)")
    print("[2] Phone Number (Traditional method)")

    while (choice := input("Enter your choice (1 or 2): ").strip()) not in ('1', '2'):
        print("Please enter 1 or 2")

    success = await self.qr_code_auth() if choice == '1' else await self.phone_auth()

    if not success:
        print("Authentication failed. Please try again.")
        await self.client.disconnect()
        return False

    return True
|
||||
|
||||
def parse_channel_selection(self, choice):
    """Turn a user selection string into a list of saved channel ids.

    Accepted forms, separated by commas and freely mixed:

    * ``all`` (any case) - every saved channel;
    * a 1-based position in the saved channel list, as printed by
      ``view_channels``;
    * a full channel id starting with ``-`` (e.g. ``-1001234567890``);
    * any other text that exactly matches a saved channel key.

    Invalid entries are reported on stdout and skipped. Empty entries
    (such as a trailing comma) are ignored, and a channel selected more
    than once is returned only once, in first-seen order.

    Args:
        choice: Raw text typed by the user.

    Returns:
        The selected channel ids; empty when nothing valid was selected.
    """
    saved = self.state['channels']
    channels_list = list(saved.keys())

    choice = choice.strip()
    if choice.lower() == 'all':
        return channels_list

    selected_channels = []
    for selection in (part.strip() for part in choice.split(',')):
        if not selection:
            continue

        if selection.startswith('-'):
            if selection not in saved:
                print(f"Channel ID {selection} not found in your channels")
                continue
            channel = selection
        else:
            try:
                num = int(selection)
            except ValueError:
                num = None

            if num is not None and 1 <= num <= len(channels_list):
                channel = channels_list[num - 1]
            elif selection in saved:
                # A saved key that is neither a valid position nor a
                # "-" id (e.g. a positive numeric id) is still selectable.
                channel = selection
            elif num is None:
                print(f"Invalid input: {selection}. Use numbers (1,2,3) or full IDs (-100123...)")
                continue
            else:
                print(f"Invalid channel number: {num}. Valid range: 1-{len(channels_list)}")
                continue

        # Avoid acting on the same channel twice (e.g. "1,1" or "1,-100...").
        if channel not in selected_channels:
            selected_channels.append(channel)

    return selected_channels
|
||||
|
||||
async def scrape_specific_channels(self):
    """Ask which saved channels to scrape and scrape them in order.

    Shows the saved channels, reads one selection line, and resumes each
    selected channel from its stored last message id.
    """
    if not self.state['channels']:
        print("No channels available. Use [L] to add channels first")
        return

    await self.view_channels()
    for help_line in (
        "\n📥 Scrape Options:",
        "• Single: 1 or -1001234567890",
        "• Multiple: 1,3,5 or mix formats",
        "• All channels: all",
    ):
        print(help_line)

    raw_selection = input("\nEnter selection: ").strip()
    chosen = self.parse_channel_selection(raw_selection)

    if not chosen:
        print("❌ No valid channels selected")
        return

    total = len(chosen)
    print(f"\n🚀 Starting scrape of {total} channel(s)...")
    for position, channel in enumerate(chosen, 1):
        print(f"\n[{position}/{total}] Scraping: {channel}")
        await self.scrape_channel(channel, self.state['channels'][channel])
    print(f"\n✅ Completed scraping {total} channel(s)!")
|
||||
|
||||
async def manage_channels(self):
|
||||
while True:
|
||||
print("\nMenu:")
|
||||
print("[A] Add new channel")
|
||||
print("[R] Remove channel")
|
||||
print("[S] Scrape all channels")
|
||||
print("[M] Toggle media scraping (currently {})".format(
|
||||
"enabled" if self.state['scrape_media'] else "disabled"))
|
||||
print("\n" + "="*40)
|
||||
print(" TELEGRAM SCRAPER")
|
||||
print("="*40)
|
||||
print("[S] Scrape channels")
|
||||
print("[C] Continuous scraping")
|
||||
print(f"[M] Media scraping: {'ON' if self.state['scrape_media'] else 'OFF'}")
|
||||
print("[L] List & add channels")
|
||||
print("[R] Remove channels")
|
||||
print("[E] Export data")
|
||||
print("[V] View saved channels")
|
||||
print("[L] List account channels")
|
||||
print("[T] Rescrape media")
|
||||
print("[F] Fix missing media")
|
||||
print("[Q] Quit")
|
||||
print("="*40)
|
||||
|
||||
choice = input("Enter your choice: ").lower()
|
||||
choice = input("Enter your choice: ").lower().strip()
|
||||
|
||||
match choice:
|
||||
case 'a':
|
||||
channel = input("Enter channel ID: ")
|
||||
self.state['channels'][channel] = 0
|
||||
self.save_state()
|
||||
print(f"Added channel {channel}.")
|
||||
try:
|
||||
if choice == 'r':
|
||||
if not self.state['channels']:
|
||||
print("No channels to remove")
|
||||
continue
|
||||
|
||||
await self.view_channels()
|
||||
print("\nTo remove channels:")
|
||||
print("• Single: 1 or -1001234567890")
|
||||
print("• Multiple: 1,2,3 or mix formats")
|
||||
selection = input("Enter selection: ").strip()
|
||||
selected_channels = self.parse_channel_selection(selection)
|
||||
|
||||
case 'r':
|
||||
channel = input("Enter channel ID to remove: ")
|
||||
if channel in self.state['channels']:
|
||||
del self.state['channels'][channel]
|
||||
self.save_state()
|
||||
print(f"Removed channel {channel}.")
|
||||
if selected_channels:
|
||||
removed_count = 0
|
||||
for channel in selected_channels:
|
||||
if channel in self.state['channels']:
|
||||
del self.state['channels'][channel]
|
||||
print(f"✅ Removed channel {channel}")
|
||||
removed_count += 1
|
||||
else:
|
||||
print(f"❌ Channel {channel} not found")
|
||||
|
||||
if removed_count > 0:
|
||||
self.save_state()
|
||||
print(f"\n🎉 Removed {removed_count} channel(s)!")
|
||||
await self.view_channels()
|
||||
else:
|
||||
print("No channels were removed")
|
||||
else:
|
||||
print(f"Channel {channel} not found.")
|
||||
print("No valid channels selected")
|
||||
|
||||
case 's':
|
||||
for channel in self.state['channels']:
|
||||
await self.scrape_channel(channel, self.state['channels'][channel])
|
||||
|
||||
case 'm':
|
||||
elif choice == 's':
|
||||
await self.scrape_specific_channels()
|
||||
|
||||
elif choice == 'm':
|
||||
self.state['scrape_media'] = not self.state['scrape_media']
|
||||
self.save_state()
|
||||
print(f"Media scraping {'enabled' if self.state['scrape_media'] else 'disabled'}.")
|
||||
print(f"\n✅ Media scraping {'enabled' if self.state['scrape_media'] else 'disabled'}")
|
||||
|
||||
case 'c':
|
||||
elif choice == 'c':
|
||||
task = asyncio.create_task(self.continuous_scraping())
|
||||
print("Continuous scraping started. Press Ctrl+C to stop.")
|
||||
try:
|
||||
@@ -474,33 +732,114 @@ class OptimizedTelegramScraper:
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
case 'e':
|
||||
elif choice == 'e':
|
||||
await self.export_data()
|
||||
|
||||
case 'v':
|
||||
await self.view_channels()
|
||||
elif choice == 'l':
|
||||
channels_list = []
|
||||
async for dialog in self.client.iter_dialogs():
|
||||
if dialog.id != 777000:
|
||||
channels_list.append(str(dialog.id))
|
||||
|
||||
case 'l':
|
||||
await self.list_channels()
|
||||
print("\nTo add channels from the list above:")
|
||||
print("• Single: 1 or -1001234567890")
|
||||
print("• Multiple: 1,3,5 or mix formats")
|
||||
print("• Press Enter to skip adding")
|
||||
selection = input("\nEnter selection (or Enter to skip): ").strip()
|
||||
|
||||
case 'q':
|
||||
print("Quitting...")
|
||||
if selection:
|
||||
added_count = 0
|
||||
for sel in [x.strip() for x in selection.split(',')]:
|
||||
try:
|
||||
if sel.startswith('-'):
|
||||
channel = sel
|
||||
else:
|
||||
num = int(sel)
|
||||
if 1 <= num <= len(channels_list):
|
||||
channel = channels_list[num - 1]
|
||||
else:
|
||||
print(f"Invalid number: {num}. Choose 1-{len(channels_list)}")
|
||||
continue
|
||||
|
||||
if channel in self.state['channels']:
|
||||
print(f"Channel {channel} already added")
|
||||
else:
|
||||
self.state['channels'][channel] = 0
|
||||
self.save_state()
|
||||
print(f"✅ Added channel {channel}")
|
||||
added_count += 1
|
||||
|
||||
except ValueError:
|
||||
print(f"Invalid input: {sel}")
|
||||
|
||||
if added_count > 0:
|
||||
print(f"\n🎉 Added {added_count} new channel(s)!")
|
||||
await self.view_channels()
|
||||
else:
|
||||
print("No new channels were added")
|
||||
|
||||
elif choice == 't':
|
||||
if not self.state['channels']:
|
||||
print("No channels available. Add channels first")
|
||||
continue
|
||||
|
||||
await self.view_channels()
|
||||
print("\nEnter channel NUMBER (1,2,3...) or full channel ID (-100123...)")
|
||||
selection = input("Enter your selection: ").strip()
|
||||
selected_channels = self.parse_channel_selection(selection)
|
||||
|
||||
if len(selected_channels) == 1:
|
||||
channel = selected_channels[0]
|
||||
print(f"Rescaping media for channel: {channel}")
|
||||
await self.rescrape_media(channel)
|
||||
elif len(selected_channels) > 1:
|
||||
print("Please select only one channel for media rescaping")
|
||||
else:
|
||||
print("No valid channel selected")
|
||||
|
||||
elif choice == 'f':
|
||||
if not self.state['channels']:
|
||||
print("No channels available. Add channels first")
|
||||
continue
|
||||
|
||||
await self.view_channels()
|
||||
print("\nEnter channel NUMBER (1,2,3...) or full channel ID (-100123...)")
|
||||
selection = input("Enter your selection: ").strip()
|
||||
selected_channels = self.parse_channel_selection(selection)
|
||||
|
||||
if len(selected_channels) == 1:
|
||||
channel = selected_channels[0]
|
||||
await self.fix_missing_media(channel)
|
||||
elif len(selected_channels) > 1:
|
||||
print("Please select only one channel for fixing missing media")
|
||||
else:
|
||||
print("No valid channel selected")
|
||||
|
||||
elif choice == 'q':
|
||||
print("\n👋 Goodbye!")
|
||||
self.close_db_connections()
|
||||
await self.client.disconnect()
|
||||
if self.client:
|
||||
await self.client.disconnect()
|
||||
sys.exit()
|
||||
|
||||
case _:
|
||||
print("Invalid option.")
|
||||
else:
|
||||
print("Invalid option")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
async def run(self):
    """Application entry point: banner, login, then the interactive menu.

    The menu only starts when ``initialize_client`` reports success.
    Database connections are closed and the client, if one was created,
    is disconnected on every exit path, including a failed or
    interrupted initialisation.
    """
    display_ascii_art()
    try:
        if await self.initialize_client():
            await self.manage_channels()
        else:
            print("Failed to initialize client. Exiting.")
    finally:
        self.close_db_connections()
        if self.client:
            await self.client.disconnect()
|
||||
|
||||
async def main():
|
||||
scraper = OptimizedTelegramScraper()
|
||||
@@ -511,4 +850,4 @@ if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
print("\nProgram interrupted. Exiting...")
|
||||
sys.exit()
|
||||
sys.exit()
|
||||
|
||||
Reference in New Issue
Block a user