Delete gai.py

This commit is contained in:
Robert Aitch
2025-07-20 00:36:53 +02:00
committed by GitHub
parent f383f222c4
commit 57bf125ca1

405
gai.py
View File

@@ -1,405 +0,0 @@
import os
import sqlite3
import json
import csv
import asyncio
from telethon import TelegramClient
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument, User, PeerChannel
from telethon.errors import FloodWaitError, RPCError
import aiohttp
import sys
import logging
import os
from datetime import datetime
from tqdm import tqdm
# Folder that holds one log file per run.
log_folder = 'logs'
# Create the folder if needed; exist_ok avoids the race of a separate
# "exists?" check followed by a create.
os.makedirs(log_folder, exist_ok=True)
# The log file is named "<script name>_<start time as YYYYmmddHHMMSS>.log".
script_name = os.path.splitext(os.path.basename(__file__))[0]
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
log_file_name = f"{script_name}_{current_time}.log"
log_file_path = os.path.join(log_folder, log_file_name)
# Basic configuration: write log records to the file inside the log folder.
# The messages logged by this script contain non-ASCII text, so the file
# encoding is pinned to UTF-8 instead of relying on the locale default.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=log_file_path,
    encoding='utf-8',
)
# Record the start of the run.
logging.info('程序开始')
def display_ascii_art():
    """Print the start-up banner, wrapped in ANSI bright-white / reset codes."""
    banner = r"""
___________________ _________
\__ ___/ _____/ / _____/
| | / \ ___ \_____ \
| | \ \_\ \/ \
|____| \______ /_______ /
\/ \/
"""
    bright_white, reset = "\033[97m", "\033[0m"
    print(f"{bright_white}{banner}{reset}")


# Show the banner once, as soon as the script starts.
display_ascii_art()
# JSON file (relative to the working directory) that persists the settings.
STATE_FILE = 'state.json'


def load_state():
    """Return the persisted scraper state, falling back to defaults.

    Keys that are missing from an existing state file are filled in from the
    defaults, so code indexing e.g. ``state['scrape_media']`` keeps working
    when the file only holds part of the expected keys.

    Returns:
        dict with the keys 'api_id', 'api_hash', 'phone', 'channels'
        (channel identifier -> last scraped message id) and 'scrape_media'.

    Raises:
        json.JSONDecodeError: if the state file exists but is not valid JSON.
    """
    loaded = {
        'api_id': None,
        'api_hash': None,
        'phone': None,
        'channels': {},
        'scrape_media': True,
    }
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, 'r') as f:
            logging.info('读取状态文件')
            loaded.update(json.load(f))
    return loaded


def save_state(state):
    """Write *state* to STATE_FILE as JSON.

    The data is first written to a sibling temporary file which is then moved
    over STATE_FILE, so an interruption in the middle of a write cannot leave
    a truncated state file behind.

    Args:
        state: JSON-serialisable dict to persist.
    """
    tmp_path = STATE_FILE + '.tmp'
    with open(tmp_path, 'w') as f:
        json.dump(state, f)
    os.replace(tmp_path, STATE_FILE)


# Module-wide state shared by every function below.
state = load_state()
logging.info('状态文件加载完成')
# Ask for any Telegram API credential that is not stored yet, then persist them.
if not state['api_id'] or not state['api_hash'] or not state['phone']:
    # The API ID must be an integer; ask again instead of crashing on a typo.
    while True:
        try:
            state['api_id'] = int(input("Enter your API ID: "))
            break
        except ValueError:
            print("API ID must be a number.")
    state['api_hash'] = input("Enter your API Hash: ")
    state['phone'] = input("Enter your phone number: ")
    save_state(state)
# Create the client bound to the 'session' session name. The log-in itself
# happens later, when main() awaits client.start().
client = TelegramClient('session', state['api_id'], state['api_hash'])
logging.info('客户端连接完成')
def save_message_to_db(channel, message, sender):
    """Store one message in the channel's SQLite database, once per message id.

    The database lives at ``<cwd>/<channel>/<channel>.db``; the folder and the
    ``messages`` table are created on first use. ``media_path`` is always
    written as NULL here.

    Args:
        channel: Channel identifier; used as folder name and database name.
        message: Message object; its ``id``, ``date``, ``sender_id``,
            ``message``, ``media``, ``reply_to`` and ``reply_to_msg_id``
            attributes are read.
        sender: Sender of the message. The name columns are filled only when
            it is a ``User``; otherwise they are stored as NULL.
    """
    channel_dir = os.path.join(os.getcwd(), channel)
    os.makedirs(channel_dir, exist_ok=True)
    db_file = os.path.join(channel_dir, f'{channel}.db')

    is_user = isinstance(sender, User)
    row = (
        message.id,
        message.date.strftime('%Y-%m-%d %H:%M:%S'),
        message.sender_id,
        getattr(sender, 'first_name', None) if is_user else None,
        getattr(sender, 'last_name', None) if is_user else None,
        getattr(sender, 'username', None) if is_user else None,
        message.message,
        message.media.__class__.__name__ if message.media else None,
        None,
        message.reply_to_msg_id if message.reply_to else None,
    )

    conn = sqlite3.connect(db_file)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS messages
                        (id INTEGER PRIMARY KEY,
                         message_id INTEGER,
                         date TEXT,
                         sender_id INTEGER,
                         first_name TEXT,
                         last_name TEXT,
                         username TEXT,
                         message TEXT,
                         media_type TEXT,
                         media_path TEXT,
                         reply_to INTEGER)''')
        # Non-unique on purpose: it also works on databases that already hold
        # duplicate message ids, and keeps the existence check below cheap.
        conn.execute('''CREATE INDEX IF NOT EXISTS idx_messages_message_id
                        ON messages (message_id)''')
        # message_id carries no UNIQUE constraint, so "INSERT OR IGNORE" alone
        # never skips anything; guard explicitly against storing a message twice.
        conn.execute('''INSERT INTO messages (message_id, date, sender_id, first_name, last_name, username, message, media_type, media_path, reply_to)
                        SELECT ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
                        WHERE NOT EXISTS (SELECT 1 FROM messages WHERE message_id = ?)''',
                     row + (message.id,))
        conn.commit()
    finally:
        # Release the connection even when building or running a statement fails.
        conn.close()
# Maximum number of download attempts per media file.
MAX_RETRIES = 5


async def download_media(channel, message):
    """Download the document attached to *message* into ``<cwd>/<channel>/media``.

    Photos are deliberately skipped and media kinds other than documents are
    ignored. Failed attempts are retried up to MAX_RETRIES times, waiting
    ``2 ** attempt`` seconds in between (or the server-requested time for a
    flood wait).

    Args:
        channel: Channel identifier; names the folder the media is stored in.
        message: Message object whose ``media`` / ``file`` are inspected.

    Returns:
        The path of the media file (already present or freshly downloaded),
        or None when nothing was downloaded: media scraping is disabled, the
        message has no media, the media kind is skipped, or every attempt
        failed.
    """
    if not message.media or not state['scrape_media']:
        return None
    media_folder = os.path.join(os.getcwd(), channel, 'media')
    os.makedirs(media_folder, exist_ok=True)

    if isinstance(message.media, MessageMediaPhoto):
        # Photos are intentionally not downloaded.
        logging.info("跳过图片下载")
        return None
    if not isinstance(message.media, MessageMediaDocument):
        logging.info(f"Unable to determine file name for message {message.id}. Skipping download.")
        return None

    media_file_name = message.file.name or f"{message.id}.{message.file.ext if message.file.ext else 'bin'}"
    expected_path = os.path.join(media_folder, media_file_name)
    if os.path.exists(expected_path):
        logging.info(f"Media file already exists: {expected_path}")
        return expected_path

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            logging.info("message.media为MessageMediaDocument")
            downloaded = await message.download_media(file=media_folder)
        except FloodWaitError as e:
            # The server states how long to back off; retrying sooner cannot succeed.
            delay = e.seconds
        except (TimeoutError, asyncio.TimeoutError, aiohttp.ClientError, RPCError):
            delay = 2 ** attempt
        else:
            if downloaded:
                logging.info(f"Successfully downloaded media to: {downloaded}")
            return downloaded
        if attempt == MAX_RETRIES:
            logging.info(f"Giving up on media for message {message.id} after {attempt} attempts.")
            break
        print(f"Retrying download for message {message.id}. Attempt {attempt}...")
        logging.info(f"Retrying download for message {message.id}. Attempt {attempt}...")
        logging.info(f"sleep {delay}s")
        await asyncio.sleep(delay)
    # Every attempt failed: report "no file" rather than a path that was never
    # written, so callers do not record a bogus media_path.
    return None
async def rescrape_media(channel):
    """Retry media downloads for stored messages that have media but no file.

    Selects every row of the channel's ``messages`` table whose ``media_type``
    is set and whose ``media_path`` is NULL, downloads the media again through
    ``download_media`` and stores the resulting path. Progress is written to
    stdout as a percentage; per-message failures are printed and skipped.

    Args:
        channel: Channel identifier; a numeric id or a username.
    """
    channel_dir = os.path.join(os.getcwd(), channel)
    db_file = os.path.join(channel_dir, f'{channel}.db')
    conn = sqlite3.connect(db_file)
    try:
        rows = conn.execute(
            'SELECT message_id FROM messages WHERE media_type IS NOT NULL AND media_path IS NULL'
        ).fetchall()
    finally:
        conn.close()

    total_messages = len(rows)
    if total_messages == 0:
        print(f"No media files to reprocess for channel {channel}.")
        return

    # Resolve the channel once instead of once per message. Numeric ids are
    # wrapped in PeerChannel as before; anything else is passed through as is
    # (e.g. a username).
    try:
        peer = PeerChannel(int(channel))
    except ValueError:
        peer = channel
    try:
        entity = await client.get_entity(peer)
    except Exception as e:
        print(f"Error with channel {channel}: {e}")
        return

    for index, (message_id,) in enumerate(rows):
        try:
            message = await client.get_messages(entity, ids=message_id)
            # Nothing to download when no message came back for this id.
            media_path = await download_media(channel, message) if message else None
            if media_path:
                conn = sqlite3.connect(db_file)
                try:
                    conn.execute('''UPDATE messages SET media_path = ? WHERE message_id = ?''', (media_path, message_id))
                    conn.commit()
                finally:
                    conn.close()
            progress = (index + 1) / total_messages * 100
            sys.stdout.write(f"\rReprocessing media for channel {channel}: {progress:.2f}% complete")
            sys.stdout.flush()
        except Exception as e:
            # Best effort: report the failure and carry on with the next message.
            print(f"Error reprocessing message {message_id}: {e}")
    print()
async def scrape_channel(channel, offset_id):
    """Scrape the messages of *channel* starting after *offset_id*.

    Each message is stored through ``save_message_to_db``; when media scraping
    is enabled its media is downloaded and the path recorded. After every
    processed message its id is saved to the state file as the channel's
    resume point. A failure on one message is reported and the loop continues
    with the next one.

    Args:
        channel: Channel identifier; values starting with '-' are treated as
            numeric channel ids, anything else is handed to the client as is.
        offset_id: Message id to continue from (0 for a fresh channel).
    """
    try:
        if channel.startswith('-'):
            entity = await client.get_entity(PeerChannel(int(channel)))
        else:
            entity = await client.get_entity(channel)
        logging.info("get entity end!")

        logging.info("统计总信息数 ")
        # limit=0 fetches no messages; only the reported total is used.
        result = await client.get_messages(entity, offset_id=offset_id, reverse=True, limit=0)
        total_messages = result.total
        if total_messages == 0:
            print(f"No messages found in channel {channel}.")
            return

        logging.info("开始爬取")
        db_file = os.path.join(channel, f'{channel}.db')
        # NOTE(review): the bar starts at the stored last message id rather
        # than at a count of processed messages - presumably as an
        # approximation of how far the channel was already scraped; confirm.
        initial_value = state['channels'][channel]
        with tqdm(total=total_messages, desc=f"Scraping channel: {channel}", unit="msg",initial=initial_value) as pbar:
            async for message in client.iter_messages(entity, offset_id=offset_id, reverse=True):
                try:
                    sender = await message.get_sender()
                    save_message_to_db(channel, message, sender)
                    if state['scrape_media'] and message.media:
                        media_path = await download_media(channel, message)
                        if media_path:
                            conn = sqlite3.connect(db_file)
                            try:
                                conn.execute('''UPDATE messages SET media_path = ? WHERE message_id = ?''', (media_path, message.id))
                                conn.commit()
                            finally:
                                # Do not leak the connection when the update fails.
                                conn.close()
                    pbar.update(1)
                    # Persist the resume point after every message so an
                    # interrupted run continues where it stopped.
                    state['channels'][channel] = message.id
                    save_state(state)
                except Exception as e:
                    # Best effort: keep the traceback in the log file, tell
                    # the user, and move on to the next message.
                    logging.exception(f"Error processing message {message.id}")
                    print(f"Error processing message {message.id}: {e}")
        print()
    except ValueError as e:
        print(f"Error with channel {channel}: {e}")
async def continuous_scraping():
    """Scrape every saved channel over and over, pausing 60 seconds per pass.

    Runs while the module-level ``continuous_scraping_active`` flag is true.
    When the task is cancelled, a notice is printed and the flag is cleared.
    """
    global continuous_scraping_active
    continuous_scraping_active = True
    try:
        while continuous_scraping_active:
            for channel_name, last_id in state['channels'].items():
                print(f"\nChecking for new messages in channel: {channel_name}")
                await scrape_channel(channel_name, last_id)
                print(f"New messages or media scraped from channel: {channel_name}")
            # Wait a minute before the next pass over all channels.
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Continuous scraping stopped.")
        continuous_scraping_active = False
async def export_data():
    """Export every saved channel to CSV and JSON.

    Channels whose last message id is still 0 are reported and skipped.
    """
    for channel, last_id in state['channels'].items():
        if last_id != 0:
            export_to_csv(channel)
            export_to_json(channel)
        else:
            print(f"No messages to export for channel {channel}. Skipping export.")
def export_to_csv(channel):
    """Dump the channel's ``messages`` table to ``<channel>/<channel>.csv``.

    The first CSV row holds the column names, followed by one row per stored
    message. The file is written as UTF-8.

    Args:
        channel: Channel identifier; names both the folder and the files.

    Raises:
        sqlite3.OperationalError: if the database cannot be opened or has no
            ``messages`` table.
    """
    db_file = os.path.join(channel, f'{channel}.db')
    csv_file = os.path.join(channel, f'{channel}.csv')

    conn = sqlite3.connect(db_file)
    try:
        cursor = conn.execute('SELECT * FROM messages')
        # Read the column names while the connection is still open.
        header = [description[0] for description in cursor.description]
        rows = cursor.fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
def export_to_json(channel):
    """Dump the channel's ``messages`` table to ``<channel>/<channel>.json``.

    The output is a JSON array with one object per stored message, keyed by
    column name, written as UTF-8 with non-ASCII text kept as is and an
    indent of 4.

    Args:
        channel: Channel identifier; names both the folder and the files.

    Raises:
        sqlite3.OperationalError: if the database cannot be opened or has no
            ``messages`` table.
    """
    db_file = os.path.join(channel, f'{channel}.db')
    json_file = os.path.join(channel, f'{channel}.json')

    conn = sqlite3.connect(db_file)
    try:
        cursor = conn.execute('SELECT * FROM messages')
        # Read the column names while the connection is still open.
        columns = [description[0] for description in cursor.description]
        data = [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        # Close the connection even when the query fails.
        conn.close()

    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
async def view_channels():
    """Print every saved channel together with its last scraped message id."""
    saved = state['channels']
    if saved:
        print("\nCurrent channels:")
        for channel_id in saved:
            print(f"Channel ID: {channel_id}, Last Message ID: {saved[channel_id]}")
    else:
        print("No channels to view.")
async def list_Channels():
    """Print title and id of every dialog of the logged-in account.

    Any error raised while listing is printed instead of propagated.
    """
    try:
        print("\nList of channels joined by account: ")
        async for dialog in client.iter_dialogs():
            # Dialog 777000 is left out of the listing - presumably Telegram's
            # own service-notification chat; confirm.
            if dialog.id == 777000:
                continue
            print(f"* {dialog.title} (id: {dialog.id})")
    except Exception as e:
        print(f"Error processing: {e}")
async def manage_channels():
while True:
print("\nMenu:")
print("[A] Add new channel")
print("[addOnList] Add new channel from csv file")
print("[R] Remove channel")
print("[S] Scrape all channels")
print("[M] Toggle media scraping (currently {})".format(
"enabled" if state['scrape_media'] else "disabled"))
print("[C] Continuous scraping")
print("[E] Export data")
print("[V] View saved channels")
print("[L] List account channels")
print("[Q] Quit")
choice = input("Enter your choice: ").lower()
match (choice):
case 'a':
channel = input("Enter channel ID: ")
state['channels'][channel] = 0
save_state(state)
print(f"Added channel {channel}.")
case 'r':
channel = input("Enter channel ID to remove: ")
if channel in state['channels']:
del state['channels'][channel]
save_state(state)
print(f"Removed channel {channel}.")
else:
print(f"Channel {channel} not found.")
case 's':
for channel in state['channels']:
await scrape_channel(channel, state['channels'][channel])
case 'm':
state['scrape_media'] = not state['scrape_media']
save_state(state)
print(
f"Media scraping {'enabled' if state['scrape_media'] else 'disabled'}.")
case 'c':
global continuous_scraping_active
continuous_scraping_active = True
task = asyncio.create_task(continuous_scraping())
print("Continuous scraping started. Press Ctrl+C to stop.")
try:
await asyncio.sleep(float('inf'))
except KeyboardInterrupt:
continuous_scraping_active = False
task.cancel()
print("\nStopping continuous scraping...")
await task
case 'e':
await export_data()
case 'v':
await view_channels()
case 'q':
print("Quitting...")
sys.exit()
case 'l':
await list_Channels()
case 'addonlist':
#读取csv的第二列
channel_list = set(state['channels'].keys())
print("Reading csv file...")
with open('username.csv', 'r') as f:
reader = csv.reader(f)
for row in tqdm(reader, desc="Processing channels"):
row[1] = row[1].strip().lstrip('@')
if row[1] not in channel_list:
state['channels'][row[1]] = 0
save_state(state)
case _:
print("Invalid option.")
async def main():
    """Log the client in and run the interactive menu.

    The phone number stored in the state is handed to ``client.start`` so the
    user is not asked for it a second time; when none is stored, the client's
    own interactive prompt is used.
    """
    phone = state['phone']
    if phone:
        await client.start(phone=phone)
    else:
        await client.start()
    while True:
        await manage_channels()
if __name__ == '__main__':
    # Script entry point: run the async main loop and turn Ctrl+C into a
    # short notice followed by a normal exit.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProgram interrupted. Exiting...")
        raise SystemExit