From 57bf125ca13f81ca60bc6fe2545db1bf7c4db47a Mon Sep 17 00:00:00 2001 From: Robert Aitch Date: Sun, 20 Jul 2025 00:36:53 +0200 Subject: [PATCH] Delete gai.py --- gai.py | 405 --------------------------------------------------------- 1 file changed, 405 deletions(-) delete mode 100644 gai.py diff --git a/gai.py b/gai.py deleted file mode 100644 index d8b328a..0000000 --- a/gai.py +++ /dev/null @@ -1,405 +0,0 @@ -import os -import sqlite3 -import json -import csv -import asyncio -from telethon import TelegramClient -from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, User, PeerChannel -from telethon.errors import FloodWaitError, RPCError -import aiohttp -import sys -import logging -import os -from datetime import datetime -from tqdm import tqdm - -# 定义日志文件夹路径 -log_folder = 'logs' -# 如果日志文件夹不存在,则创建它 -if not os.path.exists(log_folder): - os.makedirs(log_folder) -script_name = os.path.splitext(os.path.basename(__file__))[0] -current_time = datetime.now().strftime("%Y%m%d%H%M%S") -log_file_name = f"{script_name}_{current_time}.log" -log_file_path = os.path.join(log_folder, log_file_name) -# 基本配置,将日志输出到指定文件夹下的文件 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename=log_file_path -) -# 记录日志 -logging.info('程序开始') - - -def display_ascii_art(): - WHITE = "\033[97m" - RESET = "\033[0m" - - art = r""" -___________________ _________ -\__ ___/ _____/ / _____/ - | | / \ ___ \_____ \ - | | \ \_\ \/ \ - |____| \______ /_______ / - \/ \/ - """ - - print(WHITE + art + RESET) - -display_ascii_art() - -STATE_FILE = 'state.json' - -def load_state(): - if os.path.exists(STATE_FILE): - with open(STATE_FILE, 'r') as f: - logging.info('读取状态文件') - return json.load(f) - return { - 'api_id': None, - 'api_hash': None, - 'phone': None, - 'channels': {}, - 'scrape_media': True, - } - -def save_state(state): - with open(STATE_FILE, 'w') as f: - # logging.info('保存状态文件') - json.dump(state, f) - -state = load_state() - -logging.info('状态文件加载完成') - -if not state['api_id'] or not state['api_hash'] or not state['phone']: - state['api_id'] = int(input("Enter your API ID: ")) - state['api_hash'] = input("Enter your API Hash: ") - state['phone'] = input("Enter your phone number: ") - save_state(state) - -client = TelegramClient('session', state['api_id'], state['api_hash']) -logging.info('客户端连接完成') -def save_message_to_db(channel, message, sender): - channel_dir = os.path.join(os.getcwd(), channel) - os.makedirs(channel_dir, exist_ok=True) - - db_file = os.path.join(channel_dir, f'{channel}.db') - conn = sqlite3.connect(db_file) - c = conn.cursor() - c.execute(f'''CREATE TABLE IF NOT EXISTS messages - (id INTEGER PRIMARY KEY, - message_id INTEGER, - date TEXT, - sender_id INTEGER, - first_name TEXT, - last_name TEXT, - username TEXT, - message TEXT, - media_type TEXT, - media_path TEXT, - reply_to INTEGER)''') - c.execute('''INSERT OR IGNORE INTO messages (message_id, date, sender_id, first_name, last_name, username, message, media_type, media_path, reply_to) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', - (message.id, - message.date.strftime('%Y-%m-%d %H:%M:%S'), - message.sender_id, - getattr(sender, 'first_name', None) if isinstance(sender, User) else None, - getattr(sender, 'last_name', None) if isinstance(sender, User) else None, - getattr(sender, 'username', None) if isinstance(sender, User) else None, - message.message, - message.media.__class__.__name__ if message.media else None, - None, - message.reply_to_msg_id if message.reply_to else None)) - conn.commit() - conn.close() - -MAX_RETRIES = 5 - -async def download_media(channel, message): - if not message.media or not state['scrape_media']: - return None - - channel_dir = os.path.join(os.getcwd(), channel) - media_folder = os.path.join(channel_dir, 'media') - os.makedirs(media_folder, exist_ok=True) - media_file_name = None - if isinstance(message.media, MessageMediaPhoto): - media_file_name = message.file.name or f"{message.id}.jpg" - logging.info("跳过图片下载") - return None - elif isinstance(message.media, MessageMediaDocument): - media_file_name = message.file.name or f"{message.id}.{message.file.ext if message.file.ext else 'bin'}" - - if not media_file_name: - # print(f"Unable to determine file name for message {message.id}. Skipping download.") - logging.info(f"Unable to determine file name for message {message.id}. Skipping download.") - return None - - media_path = os.path.join(media_folder, media_file_name) - - if os.path.exists(media_path): - logging.info(f"Media file already exists: {media_path}") - return media_path - - retries = 0 - while retries < MAX_RETRIES: - try: - if isinstance(message.media, MessageMediaPhoto): - logging.info("message.media为MessageMediaPhoto") - media_path = await message.download_media(file=media_folder) - elif isinstance(message.media, MessageMediaDocument): - logging.info("message.media为MessageMediaDocument") - media_path = await message.download_media(file=media_folder) - if media_path: - - logging.info(f"Successfully downloaded media to: {media_path}") - break - except (TimeoutError, aiohttp.ClientError, RPCError) as e: - retries += 1 - print(f"Retrying download for message {message.id}. Attempt {retries}...") - logging.info(f"Retrying download for message {message.id}. Attempt {retries}...") - logging.info(f"sleep {2 ** retries}s") - await asyncio.sleep(2 ** retries) - return media_path - -async def rescrape_media(channel): - channel_dir = os.path.join(os.getcwd(), channel) - db_file = os.path.join(channel_dir, f'{channel}.db') - conn = sqlite3.connect(db_file) - c = conn.cursor() - c.execute('SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_path IS NULL') - rows = c.fetchall() - conn.close() - - total_messages = len(rows) - if total_messages == 0: - print(f"No media files to reprocess for channel {channel}.") - return - - for index, (message_id,) in enumerate(rows): - try: - entity = await client.get_entity(PeerChannel(int(channel))) - message = await client.get_messages(entity, ids=message_id) - media_path = await download_media(channel, message) - if media_path: - conn = sqlite3.connect(db_file) - c = conn.cursor() - c.execute('''UPDATE messages SET media_path = ? WHERE message_id = ?''', (media_path, message_id)) - conn.commit() - conn.close() - - progress = (index + 1) / total_messages * 100 - sys.stdout.write(f"\rReprocessing media for channel {channel}: {progress:.2f}% complete") - sys.stdout.flush() - except Exception as e: - print(f"Error reprocessing message {message_id}: {e}") - print() - -async def scrape_channel(channel, offset_id): - try: - if channel.startswith('-'): - entity = await client.get_entity(PeerChannel(int(channel))) - else: - entity = await client.get_entity(channel) - logging.info("get entity end!") - total_messages = 0 - processed_messages = 0 - - logging.info("统计总信息数 ") - result = await client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0) - total_messages = result.total - - if total_messages == 0: - print(f"No messages found in channel {channel}.") - return - logging.info("开始爬取") - last_message_id = None - processed_messages = 0 - initial_value = state['channels'][channel] - with tqdm(total=total_messages, desc=f"Scraping channel: {channel}", unit="msg",initial=initial_value) as pbar: - async for message in client.iter_messages(entity, offset_id=offset_id, reverse=True): - try: - sender = await message.get_sender() - save_message_to_db(channel, message, sender) - - if state['scrape_media'] and message.media: - media_path = await download_media(channel, message) - if media_path: - conn = sqlite3.connect(os.path.join(channel, f'{channel}.db')) - c = conn.cursor() - c.execute('''UPDATE messages SET media_path = ? WHERE message_id = ?''', (media_path, message.id)) - conn.commit() - conn.close() - - last_message_id = message.id - processed_messages += 1 - - pbar.update(1) - - state['channels'][channel] = last_message_id - save_state(state) - except Exception as e: - print(f"Error processing message {message.id}: {e}") - print() - except ValueError as e: - print(f"Error with channel {channel}: {e}") - -async def continuous_scraping(): - global continuous_scraping_active - continuous_scraping_active = True - - try: - while continuous_scraping_active: - for channel in state['channels']: - print(f"\nChecking for new messages in channel: {channel}") - await scrape_channel(channel, state['channels'][channel]) - print(f"New messages or media scraped from channel: {channel}") - await asyncio.sleep(60) - except asyncio.CancelledError: - print("Continuous scraping stopped.") - continuous_scraping_active = False - -async def export_data(): - for channel in state['channels']: - if state['channels'][channel] == 0: - print(f"No messages to export for channel {channel}. Skipping export.") - continue - export_to_csv(channel) - export_to_json(channel) - -def export_to_csv(channel): - db_file = os.path.join(channel, f'{channel}.db') - csv_file = os.path.join(channel, f'{channel}.csv') - # print(f"Trying to open database file: {db_file}") - conn = sqlite3.connect(db_file) - c = conn.cursor() - c.execute('SELECT * FROM messages') - rows = c.fetchall() - conn.close() - - with open(csv_file, 'w', newline='', encoding='utf-8') as f: - writer = csv.writer(f) - writer.writerow([description[0] for description in c.description]) - writer.writerows(rows) - -def export_to_json(channel): - db_file = os.path.join(channel, f'{channel}.db') - json_file = os.path.join(channel, f'{channel}.json') - conn = sqlite3.connect(db_file) - c = conn.cursor() - c.execute('SELECT * FROM messages') - rows = c.fetchall() - conn.close() - - data = [dict(zip([description[0] for description in c.description], row)) for row in rows] - with open(json_file, 'w', encoding='utf-8') as f: - json.dump(data, f, ensure_ascii=False, indent=4) - -async def view_channels(): - if not state['channels']: - print("No channels to view.") - return - - print("\nCurrent channels:") - for channel, last_id in state['channels'].items(): - print(f"Channel ID: {channel}, Last Message ID: {last_id}") - -async def list_Channels(): - try: - print("\nList of channels joined by account: ") - async for dialog in client.iter_dialogs(): - if (dialog.id != 777000): - print(f"* {dialog.title} (id: {dialog.id})") - except Exception as e: - print(f"Error processing: {e}") - - -async def manage_channels(): - while True: - print("\nMenu:") - print("[A] Add new channel") - print("[addOnList] Add new channel from csv file") - print("[R] Remove channel") - print("[S] Scrape all channels") - print("[M] Toggle media scraping (currently {})".format( - "enabled" if state['scrape_media'] else "disabled")) - print("[C] Continuous scraping") - print("[E] Export data") - print("[V] View saved channels") - print("[L] List account channels") - - print("[Q] Quit") - - choice = input("Enter your choice: ").lower() - match (choice): - case 'a': - channel = input("Enter channel ID: ") - state['channels'][channel] = 0 - save_state(state) - print(f"Added channel {channel}.") - case 'r': - channel = input("Enter channel ID to remove: ") - if channel in state['channels']: - del state['channels'][channel] - save_state(state) - print(f"Removed channel {channel}.") - else: - print(f"Channel {channel} not found.") - case 's': - for channel in state['channels']: - await scrape_channel(channel, state['channels'][channel]) - case 'm': - state['scrape_media'] = not state['scrape_media'] - save_state(state) - print( - f"Media scraping {'enabled' if state['scrape_media'] else 'disabled'}.") - case 'c': - global continuous_scraping_active - continuous_scraping_active = True - task = asyncio.create_task(continuous_scraping()) - print("Continuous scraping started. Press Ctrl+C to stop.") - try: - await asyncio.sleep(float('inf')) - except KeyboardInterrupt: - continuous_scraping_active = False - task.cancel() - print("\nStopping continuous scraping...") - await task - case 'e': - await export_data() - case 'v': - await view_channels() - case 'q': - print("Quitting...") - sys.exit() - case 'l': - await list_Channels() - case 'addonlist': - #读取csv的第二列 - channel_list = set(state['channels'].keys()) - print("Reading csv file...") - with open('username.csv', 'r') as f: - reader = csv.reader(f) - for row in tqdm(reader, desc="Processing channels"): - row[1] = row[1].strip().lstrip('@') - if row[1] not in channel_list: - state['channels'][row[1]] = 0 - save_state(state) - - - case _: - print("Invalid option.") - -async def main(): - await client.start() - while True: - await manage_channels() - -if __name__ == '__main__': - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\nProgram interrupted. Exiting...") - sys.exit()