514 lines
20 KiB
Python
514 lines
20 KiB
Python
import os
|
|
import sqlite3
|
|
import json
|
|
import csv
|
|
import asyncio
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from dataclasses import dataclass
|
|
from typing import Dict, List, Optional, Any
|
|
from telethon import TelegramClient
|
|
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, User, PeerChannel
|
|
from telethon.errors import FloodWaitError, RPCError
|
|
import aiohttp
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
def display_ascii_art():
    """Print the "TGS" start-up banner in bright white, then reset the colour.

    The banner is wrapped in the ANSI escape codes ``ESC[97m`` (bright white
    foreground) and ``ESC[0m`` (reset). It is printed verbatim, so the raw
    string below must keep its column alignment and must not be indented.
    """
    white = "\033[97m"
    reset = "\033[0m"

    art = r"""
___________________  _________
\__    ___/  _____/ /   _____/
  |    | /   \  ___ \_____  \
  |    | \    \_\  \/        \
  |____|  \______  /_______  /
                 \/        \/
"""

    print(white + art + reset)
|
|
|
|
@dataclass
class MessageData:
    """One scraped Telegram message, flattened to the columns of the ``messages`` table.

    Field order matches the tuple built for the INSERT in
    ``OptimizedTelegramScraper.batch_insert_messages``.
    """

    message_id: int  # Telegram message id within the channel
    date: str  # the scraper formats this as '%Y-%m-%d %H:%M:%S'
    # Copied from Telethon's ``message.sender_id``, which is not guaranteed to
    # be set, so None must be allowed (the column is nullable).
    sender_id: Optional[int]
    first_name: Optional[str]  # populated only when the sender is a User
    last_name: Optional[str]  # populated only when the sender is a User
    username: Optional[str]  # populated only when the sender is a User
    message: str  # message text; '' when the message has none
    media_type: Optional[str]  # class name of the Telethon media object, if any
    media_path: Optional[str]  # local path once the media has been downloaded
    reply_to: Optional[int]  # id of the message this one replies to, if any
|
|
|
|
class OptimizedTelegramScraper:
|
|
def __init__(self):
|
|
self.STATE_FILE = 'state.json'
|
|
self.state = self.load_state()
|
|
self.client = None
|
|
self.continuous_scraping_active = False
|
|
self.max_concurrent_downloads = 3
|
|
self.batch_size = 100
|
|
self.state_save_interval = 50
|
|
self.db_connections = {}
|
|
|
|
def load_state(self) -> Dict[str, Any]:
|
|
if os.path.exists(self.STATE_FILE):
|
|
with open(self.STATE_FILE, 'r') as f:
|
|
return json.load(f)
|
|
return {
|
|
'api_id': None,
|
|
'api_hash': None,
|
|
'phone': None,
|
|
'channels': {},
|
|
'scrape_media': True,
|
|
}
|
|
|
|
def save_state(self):
|
|
with open(self.STATE_FILE, 'w') as f:
|
|
json.dump(self.state, f)
|
|
|
|
def get_db_connection(self, channel: str) -> sqlite3.Connection:
|
|
if channel not in self.db_connections:
|
|
channel_dir = Path(os.getcwd()) / channel
|
|
channel_dir.mkdir(exist_ok=True)
|
|
|
|
db_file = channel_dir / f'{channel}.db'
|
|
conn = sqlite3.connect(str(db_file), check_same_thread=False)
|
|
conn.execute('''CREATE TABLE IF NOT EXISTS messages
|
|
(id INTEGER PRIMARY KEY, message_id INTEGER UNIQUE, date TEXT,
|
|
sender_id INTEGER, first_name TEXT, last_name TEXT, username TEXT,
|
|
message TEXT, media_type TEXT, media_path TEXT, reply_to INTEGER)''')
|
|
conn.execute('CREATE INDEX IF NOT EXISTS idx_message_id ON messages(message_id)')
|
|
conn.execute('CREATE INDEX IF NOT EXISTS idx_date ON messages(date)')
|
|
conn.commit()
|
|
self.db_connections[channel] = conn
|
|
|
|
return self.db_connections[channel]
|
|
|
|
def close_db_connections(self):
|
|
for conn in self.db_connections.values():
|
|
conn.close()
|
|
self.db_connections.clear()
|
|
|
|
def batch_insert_messages(self, channel: str, messages: List[MessageData]):
|
|
if not messages:
|
|
return
|
|
|
|
conn = self.get_db_connection(channel)
|
|
data = [(msg.message_id, msg.date, msg.sender_id, msg.first_name,
|
|
msg.last_name, msg.username, msg.message, msg.media_type,
|
|
msg.media_path, msg.reply_to) for msg in messages]
|
|
|
|
conn.executemany('''INSERT OR IGNORE INTO messages
|
|
(message_id, date, sender_id, first_name, last_name, username,
|
|
message, media_type, media_path, reply_to)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', data)
|
|
conn.commit()
|
|
|
|
async def download_media_with_semaphore(self, semaphore: asyncio.Semaphore,
|
|
channel: str, message) -> Optional[str]:
|
|
async with semaphore:
|
|
return await self.download_media(channel, message)
|
|
|
|
async def download_media(self, channel: str, message) -> Optional[str]:
|
|
if not message.media or not self.state['scrape_media']:
|
|
return None
|
|
|
|
channel_dir = Path(os.getcwd()) / channel
|
|
media_folder = channel_dir / 'media'
|
|
media_folder.mkdir(exist_ok=True)
|
|
|
|
media_file_name = None
|
|
if isinstance(message.media, MessageMediaPhoto):
|
|
media_file_name = getattr(message.file, 'name', None) or f"{message.id}.jpg"
|
|
elif isinstance(message.media, MessageMediaDocument):
|
|
ext = getattr(message.file, 'ext', 'bin') if message.file else 'bin'
|
|
media_file_name = getattr(message.file, 'name', None) or f"{message.id}.{ext}"
|
|
|
|
if not media_file_name:
|
|
return None
|
|
|
|
media_path = media_folder / media_file_name
|
|
|
|
if media_path.exists():
|
|
return str(media_path)
|
|
|
|
max_retries = 3
|
|
for attempt in range(max_retries):
|
|
try:
|
|
downloaded_path = await message.download_media(file=str(media_folder))
|
|
if downloaded_path:
|
|
return downloaded_path
|
|
break
|
|
except FloodWaitError as e:
|
|
if attempt < max_retries - 1:
|
|
print(f"Rate limited. Waiting {e.seconds} seconds...")
|
|
await asyncio.sleep(e.seconds)
|
|
else:
|
|
print(f"Failed to download media for message {message.id} after rate limit")
|
|
return None
|
|
except Exception as e:
|
|
if attempt < max_retries - 1:
|
|
wait_time = 2 ** attempt
|
|
print(f"Download failed for message {message.id}, retrying in {wait_time}s...")
|
|
await asyncio.sleep(wait_time)
|
|
else:
|
|
print(f"Failed to download media for message {message.id}: {e}")
|
|
return None
|
|
|
|
return None
|
|
|
|
async def update_media_path(self, channel: str, message_id: int, media_path: str):
|
|
conn = self.get_db_connection(channel)
|
|
conn.execute('UPDATE messages SET media_path = ? WHERE message_id = ?',
|
|
(media_path, message_id))
|
|
conn.commit()
|
|
|
|
async def scrape_channel(self, channel: str, offset_id: int):
|
|
try:
|
|
if channel.startswith('-'):
|
|
entity = await self.client.get_entity(PeerChannel(int(channel)))
|
|
else:
|
|
entity = await self.client.get_entity(channel)
|
|
|
|
result = await self.client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0)
|
|
total_messages = result.total
|
|
|
|
if total_messages == 0:
|
|
print(f"No messages found in channel {channel}.")
|
|
return
|
|
|
|
print(f"Found {total_messages} messages in channel {channel}")
|
|
|
|
message_batch = []
|
|
media_download_tasks = []
|
|
processed_messages = 0
|
|
last_message_id = offset_id
|
|
|
|
download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
|
|
|
|
async for message in self.client.iter_messages(entity, offset_id=offset_id, reverse=True):
|
|
try:
|
|
sender = await message.get_sender()
|
|
|
|
msg_data = MessageData(
|
|
message_id=message.id,
|
|
date=message.date.strftime('%Y-%m-%d %H:%M:%S'),
|
|
sender_id=message.sender_id,
|
|
first_name=getattr(sender, 'first_name', None) if isinstance(sender, User) else None,
|
|
last_name=getattr(sender, 'last_name', None) if isinstance(sender, User) else None,
|
|
username=getattr(sender, 'username', None) if isinstance(sender, User) else None,
|
|
message=message.message or '',
|
|
media_type=message.media.__class__.__name__ if message.media else None,
|
|
media_path=None,
|
|
reply_to=message.reply_to_msg_id if message.reply_to else None
|
|
)
|
|
|
|
message_batch.append(msg_data)
|
|
|
|
if self.state['scrape_media'] and message.media:
|
|
task = asyncio.create_task(
|
|
self.download_media_with_semaphore(download_semaphore, channel, message)
|
|
)
|
|
media_download_tasks.append((message.id, task))
|
|
|
|
last_message_id = message.id
|
|
processed_messages += 1
|
|
|
|
if len(message_batch) >= self.batch_size:
|
|
self.batch_insert_messages(channel, message_batch)
|
|
message_batch.clear()
|
|
|
|
if processed_messages % self.state_save_interval == 0:
|
|
self.state['channels'][channel] = last_message_id
|
|
self.save_state()
|
|
|
|
progress = (processed_messages / total_messages) * 100
|
|
sys.stdout.write(f"\rScraping {channel}: {progress:.1f}% ({processed_messages}/{total_messages})")
|
|
sys.stdout.flush()
|
|
|
|
except Exception as e:
|
|
print(f"Error processing message {message.id}: {e}")
|
|
|
|
if message_batch:
|
|
self.batch_insert_messages(channel, message_batch)
|
|
|
|
if media_download_tasks:
|
|
print(f"\nWaiting for {len(media_download_tasks)} media downloads to complete...")
|
|
for message_id, task in media_download_tasks:
|
|
try:
|
|
media_path = await task
|
|
if media_path:
|
|
await self.update_media_path(channel, message_id, media_path)
|
|
except Exception as e:
|
|
print(f"Error in media download for message {message_id}: {e}")
|
|
|
|
self.state['channels'][channel] = last_message_id
|
|
self.save_state()
|
|
|
|
print(f"\nCompleted scraping channel {channel}")
|
|
|
|
except Exception as e:
|
|
print(f"Error with channel {channel}: {e}")
|
|
|
|
async def rescrape_media(self, channel: str):
|
|
conn = self.get_db_connection(channel)
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_path IS NULL')
|
|
message_ids = [row[0] for row in cursor.fetchall()]
|
|
|
|
if not message_ids:
|
|
print(f"No media files to reprocess for channel {channel}.")
|
|
return
|
|
|
|
print(f"Reprocessing {len(message_ids)} media files for channel {channel}")
|
|
|
|
try:
|
|
entity = await self.client.get_entity(PeerChannel(int(channel)))
|
|
except Exception as e:
|
|
print(f"Error getting entity for channel {channel}: {e}")
|
|
return
|
|
|
|
download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads)
|
|
tasks = []
|
|
|
|
batch_size = 50
|
|
for i in range(0, len(message_ids), batch_size):
|
|
batch_ids = message_ids[i:i + batch_size]
|
|
|
|
try:
|
|
messages = await self.client.get_messages(entity, ids=batch_ids)
|
|
|
|
for message in messages:
|
|
if message and message.media:
|
|
task = asyncio.create_task(
|
|
self.download_media_with_semaphore(download_semaphore, channel, message)
|
|
)
|
|
tasks.append((message.id, task))
|
|
|
|
for message_id, task in tasks[-len([m for m in messages if m and m.media]):]:
|
|
try:
|
|
media_path = await task
|
|
if media_path:
|
|
await self.update_media_path(channel, message_id, media_path)
|
|
except Exception as e:
|
|
print(f"Error downloading media for message {message_id}: {e}")
|
|
|
|
progress = min(100, (i + batch_size) / len(message_ids) * 100)
|
|
sys.stdout.write(f"\rReprocessing media: {progress:.1f}%")
|
|
sys.stdout.flush()
|
|
|
|
except Exception as e:
|
|
print(f"Error processing batch starting at {i}: {e}")
|
|
|
|
print(f"\nCompleted media reprocessing for channel {channel}")
|
|
|
|
async def continuous_scraping(self):
|
|
self.continuous_scraping_active = True
|
|
|
|
try:
|
|
while self.continuous_scraping_active:
|
|
start_time = time.time()
|
|
|
|
for channel in self.state['channels']:
|
|
if not self.continuous_scraping_active:
|
|
break
|
|
|
|
print(f"\nChecking for new messages in channel: {channel}")
|
|
await self.scrape_channel(channel, self.state['channels'][channel])
|
|
|
|
elapsed = time.time() - start_time
|
|
sleep_time = max(0, 60 - elapsed)
|
|
if sleep_time > 0:
|
|
await asyncio.sleep(sleep_time)
|
|
|
|
except asyncio.CancelledError:
|
|
print("Continuous scraping stopped.")
|
|
finally:
|
|
self.continuous_scraping_active = False
|
|
|
|
def export_to_csv_optimized(self, channel: str):
|
|
conn = self.get_db_connection(channel)
|
|
csv_file = Path(channel) / f'{channel}.csv'
|
|
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT * FROM messages ORDER BY date')
|
|
|
|
columns = [description[0] for description in cursor.description]
|
|
|
|
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(columns)
|
|
|
|
while True:
|
|
rows = cursor.fetchmany(1000)
|
|
if not rows:
|
|
break
|
|
writer.writerows(rows)
|
|
|
|
def export_to_json_optimized(self, channel: str):
|
|
conn = self.get_db_connection(channel)
|
|
json_file = Path(channel) / f'{channel}.json'
|
|
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT * FROM messages ORDER BY date')
|
|
columns = [description[0] for description in cursor.description]
|
|
|
|
with open(json_file, 'w', encoding='utf-8') as f:
|
|
f.write('[\n')
|
|
first_row = True
|
|
|
|
while True:
|
|
rows = cursor.fetchmany(1000)
|
|
if not rows:
|
|
break
|
|
|
|
for row in rows:
|
|
if not first_row:
|
|
f.write(',\n')
|
|
else:
|
|
first_row = False
|
|
|
|
data = dict(zip(columns, row))
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
|
f.write('\n]')
|
|
|
|
async def export_data(self):
|
|
for channel in self.state['channels']:
|
|
print(f"Exporting data for channel {channel}...")
|
|
self.export_to_csv_optimized(channel)
|
|
self.export_to_json_optimized(channel)
|
|
print(f"Completed export for channel {channel}")
|
|
|
|
async def view_channels(self):
|
|
if not self.state['channels']:
|
|
print("No channels to view.")
|
|
return
|
|
|
|
print("\nCurrent channels:")
|
|
for channel, last_id in self.state['channels'].items():
|
|
try:
|
|
conn = self.get_db_connection(channel)
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT COUNT(*) FROM messages')
|
|
count = cursor.fetchone()[0]
|
|
print(f"Channel ID: {channel}, Last Message ID: {last_id}, Messages: {count}")
|
|
except:
|
|
print(f"Channel ID: {channel}, Last Message ID: {last_id}")
|
|
|
|
async def list_channels(self):
|
|
try:
|
|
print("\nList of channels joined by account:")
|
|
async for dialog in self.client.iter_dialogs():
|
|
if dialog.id != 777000:
|
|
print(f"* {dialog.title} (id: {dialog.id})")
|
|
except Exception as e:
|
|
print(f"Error listing channels: {e}")
|
|
|
|
async def initialize_client(self):
|
|
if not all([self.state['api_id'], self.state['api_hash'], self.state['phone']]):
|
|
self.state['api_id'] = int(input("Enter your API ID: "))
|
|
self.state['api_hash'] = input("Enter your API Hash: ")
|
|
self.state['phone'] = input("Enter your phone number: ")
|
|
self.save_state()
|
|
|
|
self.client = TelegramClient('session', self.state['api_id'], self.state['api_hash'])
|
|
await self.client.start()
|
|
|
|
async def manage_channels(self):
|
|
while True:
|
|
print("\nMenu:")
|
|
print("[A] Add new channel")
|
|
print("[R] Remove channel")
|
|
print("[S] Scrape all channels")
|
|
print("[M] Toggle media scraping (currently {})".format(
|
|
"enabled" if self.state['scrape_media'] else "disabled"))
|
|
print("[C] Continuous scraping")
|
|
print("[E] Export data")
|
|
print("[V] View saved channels")
|
|
print("[L] List account channels")
|
|
print("[Q] Quit")
|
|
|
|
choice = input("Enter your choice: ").lower()
|
|
|
|
match choice:
|
|
case 'a':
|
|
channel = input("Enter channel ID: ")
|
|
self.state['channels'][channel] = 0
|
|
self.save_state()
|
|
print(f"Added channel {channel}.")
|
|
|
|
case 'r':
|
|
channel = input("Enter channel ID to remove: ")
|
|
if channel in self.state['channels']:
|
|
del self.state['channels'][channel]
|
|
self.save_state()
|
|
print(f"Removed channel {channel}.")
|
|
else:
|
|
print(f"Channel {channel} not found.")
|
|
|
|
case 's':
|
|
for channel in self.state['channels']:
|
|
await self.scrape_channel(channel, self.state['channels'][channel])
|
|
|
|
case 'm':
|
|
self.state['scrape_media'] = not self.state['scrape_media']
|
|
self.save_state()
|
|
print(f"Media scraping {'enabled' if self.state['scrape_media'] else 'disabled'}.")
|
|
|
|
case 'c':
|
|
task = asyncio.create_task(self.continuous_scraping())
|
|
print("Continuous scraping started. Press Ctrl+C to stop.")
|
|
try:
|
|
await asyncio.sleep(float('inf'))
|
|
except KeyboardInterrupt:
|
|
self.continuous_scraping_active = False
|
|
task.cancel()
|
|
print("\nStopping continuous scraping...")
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
case 'e':
|
|
await self.export_data()
|
|
|
|
case 'v':
|
|
await self.view_channels()
|
|
|
|
case 'l':
|
|
await self.list_channels()
|
|
|
|
case 'q':
|
|
print("Quitting...")
|
|
self.close_db_connections()
|
|
await self.client.disconnect()
|
|
sys.exit()
|
|
|
|
case _:
|
|
print("Invalid option.")
|
|
|
|
async def run(self):
|
|
display_ascii_art()
|
|
await self.initialize_client()
|
|
try:
|
|
await self.manage_channels()
|
|
finally:
|
|
self.close_db_connections()
|
|
if self.client:
|
|
await self.client.disconnect()
|
|
|
|
async def main():
    """Program entry coroutine: build a scraper and hand control to its interactive session."""
    await OptimizedTelegramScraper().run()
|
|
|
|
if __name__ == '__main__':
    # A KeyboardInterrupt that escapes asyncio.run() is reported with a short
    # message and a clean (status 0) exit instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProgram interrupted. Exiting...")
        raise SystemExit