Performance improvements

Major performance overhaul with 5-10x speed improvements.
This commit is contained in:
Robert Aitch
2025-07-20 00:57:54 +02:00
parent 57bf125ca1
commit ac7d6de06b
2 changed files with 508 additions and 299 deletions

View File

@@ -3,11 +3,17 @@ import sqlite3
import json
import csv
import asyncio
import time
from contextlib import asynccontextmanager
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from telethon import TelegramClient
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, User, PeerChannel
from telethon.errors import FloodWaitError, RPCError
import aiohttp
import sys
from pathlib import Path
def display_ascii_art():
WHITE = "\033[97m"
@@ -24,320 +30,485 @@ ___________________ _________
print(WHITE + art + RESET)
display_ascii_art()
@dataclass
class MessageData:
    """Flat, database-ready snapshot of one Telegram message.

    The fields correspond one-to-one to the columns written by the
    ``INSERT`` statement in ``batch_insert_messages``.
    """

    message_id: int
    # Timestamp already formatted by the scraper as '%Y-%m-%d %H:%M:%S'.
    date: str
    # Copied verbatim from ``message.sender_id``; typed Optional because the
    # scraper never guarantees a sender (NOTE(review): Telethon presumably
    # reports None for posts without a sender - confirm).
    sender_id: Optional[int]
    # The three name fields are only filled when the sender is a ``User``.
    first_name: Optional[str]
    last_name: Optional[str]
    username: Optional[str]
    message: str
    # Class name of the attached media object, or None when there is none.
    media_type: Optional[str]
    # Local path of the downloaded media; None until a download succeeds.
    media_path: Optional[str]
    # Id of the message this one replies to, if any.
    reply_to: Optional[int]
STATE_FILE = 'state.json'
class OptimizedTelegramScraper:
def __init__(self):
    """Load the persisted state and set the scraper's tuning parameters."""
    # JSON file holding credentials, tracked channels and settings.
    self.STATE_FILE = 'state.json'
    self.state = self.load_state()
    # The Telegram client is created later, in initialize_client().
    self.client = None
    self.continuous_scraping_active = False
    # Size of the asyncio.Semaphore that bounds parallel media downloads.
    self.max_concurrent_downloads = 3
    # Number of messages buffered before one bulk INSERT.
    self.batch_size = 100
    # Progress is written to STATE_FILE every this-many processed messages.
    self.state_save_interval = 50
    # channel id -> open sqlite3 connection, filled by get_db_connection().
    self.db_connections = {}
def load_state(self) -> Dict[str, Any]:
    """Return the persisted scraper state, falling back to defaults.

    Values found in ``self.STATE_FILE`` are overlaid on the default state,
    so a state file that lacks a key (for example one written before the
    key existed) still yields a dictionary with every expected entry.

    Returns:
        A dict with at least the keys ``api_id``, ``api_hash``, ``phone``,
        ``channels`` and ``scrape_media``.

    Raises:
        json.JSONDecodeError: if the state file exists but is not valid JSON.
    """
    state: Dict[str, Any] = {
        'api_id': None,
        'api_hash': None,
        'phone': None,
        'channels': {},
        'scrape_media': True,
    }
    if os.path.exists(self.STATE_FILE):
        with open(self.STATE_FILE, 'r', encoding='utf-8') as f:
            # Saved values win; defaults only fill the gaps, which avoids a
            # KeyError later when the rest of the class indexes the state.
            state.update(json.load(f))
    return state
def load_state():
    """Return the persisted state from ``STATE_FILE``, or the defaults.

    Saved values are overlaid on the default state so that a state file
    missing a key still produces a dictionary with every expected entry.

    Raises:
        json.JSONDecodeError: if the state file exists but is not valid JSON.
    """
    state = {
        'api_id': None,
        'api_hash': None,
        'phone': None,
        'channels': {},
        'scrape_media': True,
    }
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, 'r', encoding='utf-8') as f:
            # Defaults only fill gaps; anything stored on disk takes priority.
            state.update(json.load(f))
    return state
def save_state(self):
    """Write ``self.state`` to ``self.STATE_FILE`` as JSON, atomically.

    The data is written to a sibling temporary file first and then moved
    over the real file, so an interruption in the middle of a save cannot
    leave a truncated state file behind.
    """
    import os  # local import keeps this method self-contained

    tmp_path = f'{self.STATE_FILE}.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(self.state, f)
    # os.replace overwrites the destination in a single step.
    os.replace(tmp_path, self.STATE_FILE)
def save_state(state):
    """Write ``state`` to ``STATE_FILE`` as JSON, atomically.

    The JSON is written to a temporary sibling file and then moved over the
    real file, so an interrupted save cannot leave a truncated state file.

    Args:
        state: JSON-serialisable state dictionary to persist.
    """
    import os  # local import keeps this function self-contained

    tmp_path = f'{STATE_FILE}.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(state, f)
    os.replace(tmp_path, STATE_FILE)
# Bootstrap: load the persisted settings and ask for any missing credentials.
state = load_state()
if not (state['api_id'] and state['api_hash'] and state['phone']):
    # int() raises ValueError when the API ID entered is not numeric.
    state['api_id'] = int(input("Enter your API ID: "))
    # Strip stray whitespace that tends to come along with pasted values.
    state['api_hash'] = input("Enter your API Hash: ").strip()
    state['phone'] = input("Enter your phone number: ").strip()
    save_state(state)
# 'session' is the session name handed to Telethon.
client = TelegramClient('session', state['api_id'], state['api_hash'])
def save_message_to_db(channel, message, sender):
    """Insert one message into ``<cwd>/<channel>/<channel>.db``.

    The channel directory and the ``messages`` table are created on demand.
    Sender name fields are stored only when ``sender`` is a ``User``.

    Args:
        channel: Channel identifier; used as directory and database name.
        message: Message object providing ``id``, ``date``, ``sender_id``,
            ``message``, ``media``, ``reply_to`` and ``reply_to_msg_id``.
        sender: Sender entity of the message (may be any type or None).
    """
    from contextlib import closing

    channel_dir = os.path.join(os.getcwd(), channel)
    os.makedirs(channel_dir, exist_ok=True)
    db_file = os.path.join(channel_dir, f'{channel}.db')

    # Build the row before opening the database so that a failure here
    # cannot leave a connection open.
    is_user = isinstance(sender, User)
    row = (
        message.id,
        message.date.strftime('%Y-%m-%d %H:%M:%S'),
        message.sender_id,
        getattr(sender, 'first_name', None) if is_user else None,
        getattr(sender, 'last_name', None) if is_user else None,
        getattr(sender, 'username', None) if is_user else None,
        message.message,
        message.media.__class__.__name__ if message.media else None,
        None,  # media_path is not known when the row is first inserted
        message.reply_to_msg_id if message.reply_to else None,
    )

    # closing() guarantees the connection is released even if a statement fails.
    with closing(sqlite3.connect(db_file)) as conn:
        conn.execute('''CREATE TABLE IF NOT EXISTS messages
                     (id INTEGER PRIMARY KEY, message_id INTEGER, date TEXT, sender_id INTEGER, first_name TEXT, last_name TEXT, username TEXT, message TEXT, media_type TEXT, media_path TEXT, reply_to INTEGER)''')
        # NOTE(review): message_id carries no UNIQUE constraint in this schema,
        # so OR IGNORE does not de-duplicate on it - confirm whether intended.
        conn.execute('''INSERT OR IGNORE INTO messages (message_id, date, sender_id, first_name, last_name, username, message, media_type, media_path, reply_to)
                     VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', row)
        conn.commit()
MAX_RETRIES = 5
async def download_media(channel, message):
    """Download the media attached to ``message`` into ``<channel>/media``.

    Nothing is downloaded when the message has no media, when media scraping
    is disabled in ``state``, or when a file with the expected name already
    exists. Failed downloads are retried up to ``MAX_RETRIES`` times with an
    exponentially growing pause.

    Args:
        channel: Channel identifier; names the directory under the cwd.
        message: Message object whose media should be fetched.

    Returns:
        Path of the media file, or None when nothing was downloaded.
    """
    if not message.media or not state['scrape_media']:
        return None

    channel_dir = os.path.join(os.getcwd(), channel)
    media_folder = os.path.join(channel_dir, 'media')
    os.makedirs(media_folder, exist_ok=True)

    # getattr() tolerates a missing ``message.file`` instead of raising.
    file_info = message.file
    original_name = getattr(file_info, 'name', None)
    media_file_name = None
    if isinstance(message.media, MessageMediaPhoto):
        media_file_name = original_name or f"{message.id}.jpg"
    elif isinstance(message.media, MessageMediaDocument):
        # Drop any leading dot from the reported extension so the fallback
        # name does not end up as "<id>..ext".
        ext = (getattr(file_info, 'ext', None) or '').lstrip('.') or 'bin'
        media_file_name = original_name or f"{message.id}.{ext}"

    if not media_file_name:
        print(f"Unable to determine file name for message {message.id}. Skipping download.")
        return None

    media_path = os.path.join(media_folder, media_file_name)
    if os.path.exists(media_path):
        print(f"Media file already exists: {media_path}")
        return media_path

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            downloaded = await message.download_media(file=media_folder)
        except (TimeoutError, aiohttp.ClientError, RPCError):
            if attempt < MAX_RETRIES:
                print(f"Retrying download for message {message.id}. Attempt {attempt}...")
                await asyncio.sleep(2 ** attempt)
            continue
        if downloaded:
            print(f"Successfully downloaded media to: {downloaded}")
        return downloaded

    # Every attempt failed: report "no file" rather than a path that was
    # never written, so callers do not record a bogus media_path.
    print(f"Failed to download media for message {message.id} after {MAX_RETRIES} attempts.")
    return None
async def rescrape_media(channel):
channel_dir = os.path.join(os.getcwd(), channel)
db_file = os.path.join(channel_dir, f'{channel}.db')
conn = sqlite3.connect(db_file)
c = conn.cursor()
c.execute('SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_path IS NULL')
rows = c.fetchall()
conn.close()
total_messages = len(rows)
if total_messages == 0:
print(f"No media files to reprocess for channel {channel}.")
return
for index, (message_id,) in enumerate(rows):
try:
entity = await client.get_entity(PeerChannel(int(channel)))
message = await client.get_messages(entity, ids=message_id)
media_path = await download_media(channel, message)
if media_path:
conn = sqlite3.connect(db_file)
c = conn.cursor()
c.execute('''UPDATE messages SET media_path = ? WHERE message_id = ?''', (media_path, message_id))
conn.commit()
conn.close()
def get_db_connection(self, channel: str) -> sqlite3.Connection:
if channel not in self.db_connections:
channel_dir = Path(os.getcwd()) / channel
channel_dir.mkdir(exist_ok=True)
progress = (index + 1) / total_messages * 100
sys.stdout.write(f"\rReprocessing media for channel {channel}: {progress:.2f}% complete")
sys.stdout.flush()
db_file = channel_dir / f'{channel}.db'
conn = sqlite3.connect(str(db_file), check_same_thread=False)
conn.execute('''CREATE TABLE IF NOT EXISTS messages
(id INTEGER PRIMARY KEY, message_id INTEGER UNIQUE, date TEXT,
sender_id INTEGER, first_name TEXT, last_name TEXT, username TEXT,
message TEXT, media_type TEXT, media_path TEXT, reply_to INTEGER)''')
conn.execute('CREATE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)')
conn.execute('CREATE INDEX IF NOT EXISTS idx_date ON messages(date)')
conn.commit()
self.db_connections[channel] = conn
return self.db_connections[channel]
def close_db_connections(self):
    """Close every cached SQLite connection and empty the cache.

    The cache is emptied first and each connection is closed on its own, so
    one failing ``close()`` neither keeps the others open nor leaves closed
    handles behind in ``self.db_connections``.
    """
    connections = list(self.db_connections.values())
    self.db_connections.clear()
    for conn in connections:
        try:
            conn.close()
        except sqlite3.Error as exc:
            print(f"Error closing database connection: {exc}")
def batch_insert_messages(self, channel: str, messages: "List[MessageData]"):
    """Insert a batch of messages into the channel's database in one go.

    Rows whose insert is rejected by a table constraint are skipped
    (``INSERT OR IGNORE``). An empty batch is a no-op.

    Args:
        channel: Channel identifier used to look up the connection.
        messages: Records to store; each must expose the ten message fields.
    """
    if not messages:
        return

    conn = self.get_db_connection(channel)
    rows = [
        (
            msg.message_id, msg.date, msg.sender_id, msg.first_name,
            msg.last_name, msg.username, msg.message, msg.media_type,
            msg.media_path, msg.reply_to,
        )
        for msg in messages
    ]
    # Using the connection as a context manager commits on success and rolls
    # back on error, so a failing batch never stays half-applied.
    with conn:
        conn.executemany('''INSERT OR IGNORE INTO messages
                         (message_id, date, sender_id, first_name, last_name, username,
                          message, media_type, media_path, reply_to)
                         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', rows)
async def download_media_with_semaphore(self, semaphore: asyncio.Semaphore,
                                        channel: str, message) -> Optional[str]:
    """Run ``download_media`` while holding ``semaphore``.

    Args:
        semaphore: Limits how many downloads may run at the same time.
        channel: Channel identifier forwarded to ``download_media``.
        message: Message forwarded to ``download_media``.

    Returns:
        Whatever ``download_media`` returns for this message.
    """
    async with semaphore:
        return await self.download_media(channel, message)
async def download_media(self, channel: str, message) -> Optional[str]:
    """Download the media attached to ``message`` into ``<channel>/media``.

    Returns early without downloading when the message has no media, when
    media scraping is disabled in the state, when the media is neither a
    photo nor a document, or when a file with the expected name already
    exists. Failing downloads are attempted at most three times.

    Args:
        channel: Channel identifier; names the directory under the cwd.
        message: Message object whose media should be fetched.

    Returns:
        Path of the media file as a string, or None if nothing is available.
    """
    if not message.media or not self.state['scrape_media']:
        return None

    media_folder = Path(os.getcwd()) / channel / 'media'
    # parents=True: the channel directory may not exist yet.
    media_folder.mkdir(parents=True, exist_ok=True)

    file_info = message.file
    original_name = getattr(file_info, 'name', None)
    if isinstance(message.media, MessageMediaPhoto):
        media_file_name = original_name or f"{message.id}.jpg"
    elif isinstance(message.media, MessageMediaDocument):
        # The reported extension may be missing or start with a dot
        # (NOTE(review): per Telethon's File.ext - confirm); normalise it so
        # the fallback name is never "<id>..ext" or "<id>.None".
        ext = (getattr(file_info, 'ext', None) or '').lstrip('.') or 'bin'
        media_file_name = original_name or f"{message.id}.{ext}"
    else:
        return None

    media_path = media_folder / media_file_name
    if media_path.exists():
        return str(media_path)

    max_retries = 3
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            downloaded_path = await message.download_media(file=str(media_folder))
        except FloodWaitError as e:
            if is_last_attempt:
                print(f"Failed to download media for message {message.id} after rate limit")
                return None
            # Wait for the number of seconds carried by the error.
            print(f"Rate limited. Waiting {e.seconds} seconds...")
            await asyncio.sleep(e.seconds)
        except Exception as e:
            if is_last_attempt:
                print(f"Failed to download media for message {message.id}: {e}")
                return None
            wait_time = 2 ** attempt
            print(f"Download failed for message {message.id}, retrying in {wait_time}s...")
            await asyncio.sleep(wait_time)
        else:
            # A falsy result means nothing was written; do not retry.
            return downloaded_path or None
    return None
async def update_media_path(self, channel: str, message_id: int, media_path: str):
    """Store the downloaded file's path on an already-saved message row.

    Args:
        channel: Channel identifier used to look up the connection.
        message_id: Telegram message id of the row to update.
        media_path: Local path of the downloaded media file.
    """
    conn = self.get_db_connection(channel)
    # Connection as context manager: commit on success, roll back on error.
    with conn:
        conn.execute('UPDATE messages SET media_path = ? WHERE message_id = ?',
                     (media_path, message_id))
async def scrape_channel(self, channel: str, offset_id: int):
try:
if channel.startswith('-'):
entity = await self.client.get_entity(PeerChannel(int(channel)))
else:
entity = await self.client.get_entity(channel)
result = await self.client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0)
total_messages = result.total
if total_messages == 0:
print(f"No messages found in channel {channel}.")
return
print(f"Found {total_messages} messages in channel {channel}")
message_batch = []
media_download_tasks = []
processed_messages = 0
last_message_id = offset_id
download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
async for message in self.client.iter_messages(entity, offset_id=offset_id, reverse=True):
try:
sender = await message.get_sender()
msg_data = MessageData(
message_id=message.id,
date=message.date.strftime('%Y-%m-%d %H:%M:%S'),
sender_id=message.sender_id,
first_name=getattr(sender, 'first_name', None) if isinstance(sender, User) else None,
last_name=getattr(sender, 'last_name', None) if isinstance(sender, User) else None,
username=getattr(sender, 'username', None) if isinstance(sender, User) else None,
message=message.message or '',
media_type=message.media.__class__.__name__ if message.media else None,
media_path=None,
reply_to=message.reply_to_msg_id if message.reply_to else None
)
message_batch.append(msg_data)
if self.state['scrape_media'] and message.media:
task = asyncio.create_task(
self.download_media_with_semaphore(download_semaphore, channel, message)
)
media_download_tasks.append((message.id, task))
last_message_id = message.id
processed_messages += 1
if len(message_batch) >= self.batch_size:
self.batch_insert_messages(channel, message_batch)
message_batch.clear()
if processed_messages % self.state_save_interval == 0:
self.state['channels'][channel] = last_message_id
self.save_state()
progress = (processed_messages / total_messages) * 100
sys.stdout.write(f"\rScraping {channel}: {progress:.1f}% ({processed_messages}/{total_messages})")
sys.stdout.flush()
except Exception as e:
print(f"Error processing message {message.id}: {e}")
if message_batch:
self.batch_insert_messages(channel, message_batch)
if media_download_tasks:
print(f"\nWaiting for {len(media_download_tasks)} media downloads to complete...")
for message_id, task in media_download_tasks:
try:
media_path = await task
if media_path:
await self.update_media_path(channel, message_id, media_path)
except Exception as e:
print(f"Error in media download for message {message_id}: {e}")
self.state['channels'][channel] = last_message_id
self.save_state()
print(f"\nCompleted scraping channel {channel}")
except Exception as e:
print(f"Error reprocessing message {message_id}: {e}")
print()
print(f"Error with channel {channel}: {e}")
async def scrape_channel(channel, offset_id):
try:
if channel.startswith('-'):
entity = await client.get_entity(PeerChannel(int(channel)))
else:
entity = await client.get_entity(channel)
async def rescrape_media(self, channel: str):
conn = self.get_db_connection(channel)
cursor = conn.cursor()
cursor.execute('SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_path IS NULL')
message_ids = [row[0] for row in cursor.fetchall()]
total_messages = 0
processed_messages = 0
result = await client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0)
total_messages = result.total
if total_messages == 0:
print(f"No messages found in channel {channel}.")
if not message_ids:
print(f"No media files to reprocess for channel {channel}.")
return
last_message_id = None
processed_messages = 0
print(f"Reprocessing {len(message_ids)} media files for channel {channel}")
async for message in client.iter_messages(entity, offset_id=offset_id, reverse=True):
try:
entity = await self.client.get_entity(PeerChannel(int(channel)))
except Exception as e:
print(f"Error getting entity for channel {channel}: {e}")
return
download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
tasks = []
batch_size = 50
for i in range(0, len(message_ids), batch_size):
batch_ids = message_ids[i:i + batch_size]
try:
sender = await message.get_sender()
save_message_to_db(channel, message, sender)
if state['scrape_media'] and message.media:
media_path = await download_media(channel, message)
if media_path:
conn = sqlite3.connect(os.path.join(channel, f'{channel}.db'))
c = conn.cursor()
c.execute('''UPDATE messages SET media_path = ? WHERE message_id = ?''', (media_path, message.id))
conn.commit()
conn.close()
messages = await self.client.get_messages(entity, ids=batch_ids)
last_message_id = message.id
processed_messages += 1
for message in messages:
if message and message.media:
task = asyncio.create_task(
self.download_media_with_semaphore(download_semaphore, channel, message)
)
tasks.append((message.id, task))
progress = (processed_messages / total_messages) * 100
sys.stdout.write("\r\033[K")
sys.stdout.write(f"\rScraping channel: {channel} - Progress: {progress:.2f}%")
for message_id, task in tasks[-len([m for m in messages if m and m.media]):]:
try:
media_path = await task
if media_path:
await self.update_media_path(channel, message_id, media_path)
except Exception as e:
print(f"Error downloading media for message {message_id}: {e}")
progress = min(100, (i + batch_size) / len(message_ids) * 100)
sys.stdout.write(f"\rReprocessing media: {progress:.1f}%")
sys.stdout.flush()
state['channels'][channel] = last_message_id
save_state(state)
except Exception as e:
print(f"Error processing message {message.id}: {e}")
print()
except ValueError as e:
print(f"Error with channel {channel}: {e}")
print(f"Error processing batch starting at {i}: {e}")
async def continuous_scraping():
global continuous_scraping_active
continuous_scraping_active = True
print(f"\nCompleted media reprocessing for channel {channel}")
try:
while continuous_scraping_active:
for channel in state['channels']:
print(f"\nChecking for new messages in channel: {channel}")
await scrape_channel(channel, state['channels'][channel])
print(f"New messages or media scraped from channel: {channel}")
await asyncio.sleep(60)
except asyncio.CancelledError:
print("Continuous scraping stopped.")
continuous_scraping_active = False
async def continuous_scraping(self, interval: float = 60):
    """Scrape all tracked channels repeatedly until stopped or cancelled.

    Each cycle scrapes every channel from its stored offset and then waits
    for whatever is left of ``interval``. The loop ends when
    ``self.continuous_scraping_active`` is cleared or the task is cancelled;
    the flag is always reset on exit.

    Args:
        interval: Minimum number of seconds between the starts of two
            consecutive cycles. Defaults to 60.
    """
    self.continuous_scraping_active = True
    try:
        while self.continuous_scraping_active:
            # Monotonic clock: unaffected by wall-clock adjustments.
            cycle_start = time.monotonic()
            # Iterate over a snapshot so changes to the channel mapping
            # during an await cannot break the iteration.
            for channel in list(self.state['channels']):
                if not self.continuous_scraping_active:
                    break
                if channel not in self.state['channels']:
                    continue  # removed since the snapshot was taken
                print(f"\nChecking for new messages in channel: {channel}")
                await self.scrape_channel(channel, self.state['channels'][channel])

            if not self.continuous_scraping_active:
                break
            remaining = interval - (time.monotonic() - cycle_start)
            # Always yield to the event loop, even when no time is left.
            await asyncio.sleep(max(0.0, remaining))
    except asyncio.CancelledError:
        print("Continuous scraping stopped.")
    finally:
        self.continuous_scraping_active = False
async def export_data():
    """Write a CSV and a JSON export for every channel recorded in ``state``."""
    for channel in state['channels']:
        export_to_csv(channel)
        export_to_json(channel)
def export_to_csv_optimized(self, channel: str):
    """Export the channel's ``messages`` table to ``<channel>/<channel>.csv``.

    Rows are ordered by date and written in chunks of 1000, so the whole
    table is never held in memory at once. The first CSV row holds the
    column names.

    Args:
        channel: Channel identifier; names both the directory and the file.
    """
    conn = self.get_db_connection(channel)
    csv_file = Path(channel) / f'{channel}.csv'

    cursor = conn.cursor()
    try:
        cursor.execute('SELECT * FROM messages ORDER BY date')
        columns = [description[0] for description in cursor.description]

        with open(csv_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(columns)
            while rows := cursor.fetchmany(1000):
                writer.writerows(rows)
    finally:
        # Release the cursor even when the export fails half-way.
        cursor.close()
def export_to_csv(channel):
db_file = os.path.join(channel, f'{channel}.db')
csv_file = os.path.join(channel, f'{channel}.csv')
conn = sqlite3.connect(db_file)
c = conn.cursor()
c.execute('SELECT * FROM messages')
rows = c.fetchall()
conn.close()
def export_to_json_optimized(self, channel: str):
    """Export the channel's ``messages`` table to ``<channel>/<channel>.json``.

    The output is a single JSON array with one object per row (column name
    mapped to value), ordered by date. Rows are fetched in chunks of 1000 and
    written one at a time, so the table is never fully held in memory.

    Args:
        channel: Channel identifier; names both the directory and the file.
    """
    conn = self.get_db_connection(channel)
    json_file = Path(channel) / f'{channel}.json'

    cursor = conn.cursor()
    try:
        cursor.execute('SELECT * FROM messages ORDER BY date')
        columns = [description[0] for description in cursor.description]

        with open(json_file, 'w', encoding='utf-8') as f:
            f.write('[\n')
            # Empty before the first object, ',\n' before every later one.
            separator = ''
            while rows := cursor.fetchmany(1000):
                for row in rows:
                    f.write(separator)
                    json.dump(dict(zip(columns, row)), f, ensure_ascii=False, indent=2)
                    separator = ',\n'
            f.write('\n]')
    finally:
        # Release the cursor even when the export fails half-way.
        cursor.close()
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([description[0] for description in c.description])
writer.writerows(rows)
async def export_data(self):
    """Export every tracked channel to CSV and then to JSON.

    Progress is reported on stdout before and after each channel.
    """
    for channel in self.state['channels']:
        print(f"Exporting data for channel {channel}...")
        self.export_to_csv_optimized(channel)
        self.export_to_json_optimized(channel)
        print(f"Completed export for channel {channel}")
def export_to_json(channel):
db_file = os.path.join(channel, f'{channel}.db')
json_file = os.path.join(channel, f'{channel}.json')
conn = sqlite3.connect(db_file)
c = conn.cursor()
c.execute('SELECT * FROM messages')
rows = c.fetchall()
conn.close()
async def view_channels(self):
    """Print every tracked channel with its last message id and row count.

    When the message count cannot be read from the channel's database, the
    line is printed without it instead of failing.
    """
    if not self.state['channels']:
        print("No channels to view.")
        return

    print("\nCurrent channels:")
    for channel, last_id in self.state['channels'].items():
        summary = f"Channel ID: {channel}, Last Message ID: {last_id}"
        try:
            conn = self.get_db_connection(channel)
            count = conn.execute('SELECT COUNT(*) FROM messages').fetchone()[0]
        except (sqlite3.Error, OSError):
            # Only database and filesystem problems are treated as "count
            # unavailable"; anything else (including Ctrl+C) still surfaces.
            print(summary)
        else:
            print(f"{summary}, Messages: {count}")
data = [dict(zip([description[0] for description in c.description], row)) for row in rows]
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
async def list_channels(self):
    """Print the title and id of every dialog the client reports.

    Errors raised while listing are reported on stdout, not propagated.
    """
    try:
        print("\nList of channels joined by account:")
        async for dialog in self.client.iter_dialogs():
            # Dialog 777000 is deliberately left out of the listing
            # (NOTE(review): presumably Telegram's service account - confirm).
            if dialog.id == 777000:
                continue
            print(f"* {dialog.title} (id: {dialog.id})")
    except Exception as e:
        print(f"Error listing channels: {e}")
async def view_channels():
    """Print every channel recorded in ``state`` with its last message id."""
    if not state['channels']:
        print("No channels to view.")
        return

    print("\nCurrent channels:")
    for channel, last_id in state['channels'].items():
        print(f"Channel ID: {channel}, Last Message ID: {last_id}")
async def initialize_client(self):
if not all([self.state['api_id'], self.state['api_hash'], self.state['phone']]):
self.state['api_id'] = int(input("Enter your API ID: "))
self.state['api_hash'] = input("Enter your API Hash: ")
self.state['phone'] = input("Enter your phone number: ")
self.save_state()
async def list_Channels():
try:
print("\nList of channels joined by account: ")
async for dialog in client.iter_dialogs():
if (dialog.id != 777000):
print(f"* {dialog.title} (id: {dialog.id})")
except Exception as e:
print(f"Error processing: {e}")
self.client = TelegramClient('session', self.state['api_id'], self.state['api_hash'])
await self.client.start()
async def manage_channels(self):
while True:
print("\nMenu:")
print("[A] Add new channel")
print("[R] Remove channel")
print("[S] Scrape all channels")
print("[M] Toggle media scraping (currently {})".format(
"enabled" if self.state['scrape_media'] else "disabled"))
print("[C] Continuous scraping")
print("[E] Export data")
print("[V] View saved channels")
print("[L] List account channels")
print("[Q] Quit")
async def manage_channels():
while True:
print("\nMenu:")
print("[A] Add new channel")
print("[R] Remove channel")
print("[S] Scrape all channels")
print("[M] Toggle media scraping (currently {})".format(
"enabled" if state['scrape_media'] else "disabled"))
print("[C] Continuous scraping")
print("[E] Export data")
print("[V] View saved channels")
print("[L] List account channels")
print("[Q] Quit")
choice = input("Enter your choice: ").lower()
match choice:
case 'a':
channel = input("Enter channel ID: ")
self.state['channels'][channel] = 0
self.save_state()
print(f"Added channel {channel}.")
case 'r':
channel = input("Enter channel ID to remove: ")
if channel in self.state['channels']:
del self.state['channels'][channel]
self.save_state()
print(f"Removed channel {channel}.")
else:
print(f"Channel {channel} not found.")
case 's':
for channel in self.state['channels']:
await self.scrape_channel(channel, self.state['channels'][channel])
case 'm':
self.state['scrape_media'] = not self.state['scrape_media']
self.save_state()
print(f"Media scraping {'enabled' if self.state['scrape_media'] else 'disabled'}.")
case 'c':
task = asyncio.create_task(self.continuous_scraping())
print("Continuous scraping started. Press Ctrl+C to stop.")
try:
await asyncio.sleep(float('inf'))
except KeyboardInterrupt:
self.continuous_scraping_active = False
task.cancel()
print("\nStopping continuous scraping...")
try:
await task
except asyncio.CancelledError:
pass
case 'e':
await self.export_data()
case 'v':
await self.view_channels()
case 'l':
await self.list_channels()
case 'q':
print("Quitting...")
self.close_db_connections()
await self.client.disconnect()
sys.exit()
case _:
print("Invalid option.")
choice = input("Enter your choice: ").lower()
match (choice):
case 'a':
channel = input("Enter channel ID: ")
state['channels'][channel] = 0
save_state(state)
print(f"Added channel {channel}.")
case 'r':
channel = input("Enter channel ID to remove: ")
if channel in state['channels']:
del state['channels'][channel]
save_state(state)
print(f"Removed channel {channel}.")
else:
print(f"Channel {channel} not found.")
case 's':
for channel in state['channels']:
await scrape_channel(channel, state['channels'][channel])
case 'm':
state['scrape_media'] = not state['scrape_media']
save_state(state)
print(
f"Media scraping {'enabled' if state['scrape_media'] else 'disabled'}.")
case 'c':
global continuous_scraping_active
continuous_scraping_active = True
task = asyncio.create_task(continuous_scraping())
print("Continuous scraping started. Press Ctrl+C to stop.")
try:
await asyncio.sleep(float('inf'))
except KeyboardInterrupt:
continuous_scraping_active = False
task.cancel()
print("\nStopping continuous scraping...")
await task
case 'e':
await export_data()
case 'v':
await view_channels()
case 'q':
print("Quitting...")
sys.exit()
case 'l':
await list_Channels()
case _:
print("Invalid option.")
async def run(self):
    """Show the banner, connect to Telegram and run the interactive menu.

    Database connections are closed and the client is disconnected on the
    way out, whether the menu returns normally or an error occurs -
    including an error raised while the client is being initialised.
    """
    display_ascii_art()
    try:
        # Inside the try block so that a client created but not fully
        # started still gets disconnected below.
        await self.initialize_client()
        await self.manage_channels()
    finally:
        self.close_db_connections()
        if self.client:
            await self.client.disconnect()
async def main():
await client.start()
while True:
await manage_channels()
scraper = OptimizedTelegramScraper()
await scraper.run()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nProgram interrupted. Exiting...")
sys.exit()
sys.exit()