Files
Resume-python/app_web.py
2026-04-27 01:19:38 +08:00

808 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import contextlib
import importlib.util
import io
import json
import os
import sqlite3
import time
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from fastapi import FastAPI, Form, Query, Request
from fastapi.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
# Directory containing this file; every other path below is resolved against it.
BASE_DIR = Path(__file__).resolve().parent
# Files expected next to this module: the .env settings file, the scraper
# script that is loaded dynamically, and the scraper's JSON state file.
ENV_FILE = BASE_DIR / ".env"
SCRIPT_FILE = BASE_DIR / "telegram-scraper.py"
STATE_FILE = BASE_DIR / "state.json"
# FastAPI application and the Jinja2 template loader (templates/ under BASE_DIR).
app = FastAPI(title="Telegram Scraper Web Console")
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
def read_state_channel_maps() -> Tuple[Dict[str, str], Dict[str, str]]:
    """Load the channel title and username maps from the state file.

    Returns:
        ``(titles, names)``, both keyed by channel id with keys and values
        coerced to ``str``. Two empty dicts are returned when the state file
        is missing or cannot be read/parsed as expected.
    """
    if not STATE_FILE.exists():
        return {}, {}
    try:
        state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
        title_map = state.get("channel_titles") or {}
        name_map = state.get("channel_names") or {}
        titles = {str(cid): str(title) for cid, title in title_map.items()}
        names = {str(cid): str(name) for cid, name in name_map.items()}
    except Exception:
        # Best effort: any unreadable or unexpected state yields empty maps.
        return {}, {}
    return titles, names
def channel_display_name(cid: str, titles: Dict[str, str], names: Dict[str, str]) -> str:
    """Pick a human-readable label for a channel.

    Preference order: the stored title (unless it is empty, equal to the id,
    or itself looks like a raw negative numeric id), then ``@username``, then
    a fixed placeholder.

    Args:
        cid: Channel id; coerced to ``str`` before lookup.
        titles: Map of channel id to title.
        names: Map of channel id to username (``"no_username"`` means none).

    Returns:
        The label to display.
    """
    key = str(cid)
    title = (titles.get(key) or "").strip()
    # e.g. "-1001234567890": a title that is really just an id is not useful.
    looks_like_raw_id = (
        len(title) > 6
        and title.startswith("-")
        and title[1:].replace("-", "").isdigit()
    )
    if title and title != key and not looks_like_raw_id:
        return title

    username = (names.get(key) or "no_username").strip()
    if username and username != "no_username":
        return "@" + username.lstrip("@")
    return "未命名频道"
# Settings that are stored as on/off flags; the /config handler writes "1" or
# "0" for these keys instead of the raw submitted text.
BINARY_ENV_KEYS = frozenset(
    (
        "SCRAPE_MEDIA",
        "HEARTBEAT_ENABLED",
        "FORWARD_ONLY_NEW_MESSAGES",
        "FORWARD_ONLY_IF_OFFSET_POSITIVE",
        "FORWARD_RAW_MENTIONS",
    )
)
# Editable .env settings, in display order, as (env key, form label, input type).
# The index page renders one form field per entry and /config writes them back.
CONFIG_KEYS: List[Tuple[str, str, str]] = [
    ("API_ID", "Telegram API_ID", "text"),
    ("API_HASH", "Telegram API_HASH", "text"),
    ("PROXY_TYPE", "代理类型", "text"),
    ("PROXY_HOST", "代理主机", "text"),
    ("PROXY_PORT", "代理端口", "text"),
    ("PROXY_USERNAME", "代理账号", "text"),
    ("PROXY_PASSWORD", "代理密码", "password"),
    ("SCRAPE_MEDIA", "下载媒体1开/0关", "text"),
    ("HEARTBEAT_ENABLED", "心跳开关1开/0关", "text"),
    ("HEARTBEAT_INTERVAL_SECONDS", "心跳间隔秒", "text"),
    ("HEARTBEAT_TO_CHAT", "心跳目标", "text"),
    ("ACCOUNT_LIST_EXCLUDE_SUBSTRINGS", "账号列表隐藏(标题含子串则不展示,逗号分隔;* 不隐藏)", "text"),
    ("MONITOR_CHATS", "监控源(逗号分隔)", "text"),
    ("FORWARD_TO_CHAT", "推送目标(逗号分隔)", "text"),
    ("FORWARD_ONLY_NEW_MESSAGES", "仅新消息1开/0关", "text"),
    ("FORWARD_ONLY_IF_OFFSET_POSITIVE", "仅新消息别名", "text"),
    # Label previously lacked its closing parenthesis.
    ("FORWARD_AS_USERNAME", "@替换目标用户名(不带@)", "text"),
    ("FORWARD_DELAY_SECONDS", "每条推送延迟秒", "text"),
    ("FORWARD_RAW_MENTIONS", "保留原@不替换1开/0关", "text"),
]
class _LogLineWriter(io.TextIOBase):
    """Write-only text sink that forwards each complete, non-blank line to a callback."""

    def __init__(self, emit) -> None:
        super().__init__()
        self._emit = emit
        self._pending = ""

    def writable(self) -> bool:
        return True

    def write(self, text: str) -> int:
        pieces = (self._pending + text).splitlines(keepends=True)
        self._pending = ""
        # A last piece without a line terminator is an unfinished line: hold it
        # back until the rest arrives (print() writes the newline separately).
        if pieces and pieces[-1].splitlines()[0] == pieces[-1]:
            self._pending = pieces.pop()
        for piece in pieces:
            self._forward(piece)
        return len(text)

    def flush_pending(self) -> None:
        """Forward whatever unfinished line is still buffered."""
        tail, self._pending = self._pending, ""
        self._forward(tail)

    def _forward(self, line: str) -> None:
        stripped = line.strip()
        if stripped:
            self._emit(stripped)


class WebScraperService:
    """Glue between the web handlers and the scraper loaded from ``SCRIPT_FILE``.

    Holds the lazily created scraper object, at most one background job task,
    at most one continuous-scraping task, and a bounded in-memory log (the
    newest 1200 lines) that the web UI displays.
    """

    def __init__(self) -> None:
        self.module = None
        self.scraper = None
        self.init_lock = asyncio.Lock()
        self.job_lock = asyncio.Lock()
        self.job_task: Optional[asyncio.Task] = None
        self.job_name: str = ""
        self.job_started_at: Optional[float] = None
        self.logs: deque[str] = deque(maxlen=1200)
        self.continuous_task: Optional[asyncio.Task] = None
        self.continuous_started_at: Optional[float] = None

    def _append(self, text: str) -> None:
        """Add one timestamped line to the in-memory log."""
        ts = time.strftime("%Y-%m-%d %H:%M:%S")
        self.logs.append(f"[{ts}] {text}")

    def _load_module(self):
        """Import ``SCRIPT_FILE`` once and cache the module object.

        Raises:
            RuntimeError: If no import spec/loader can be built for the script.
        """
        if self.module is not None:
            return self.module
        spec = importlib.util.spec_from_file_location("telegram_scraper_web_core", str(SCRIPT_FILE))
        if not spec or not spec.loader:
            raise RuntimeError("无法加载 telegram-scraper.py")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        self.module = module
        return module

    async def ensure_ready(self) -> None:
        """Create the scraper if needed and make sure its client is connected.

        Raises:
            RuntimeError: If the scraper reports that client initialization failed.
        """
        async with self.init_lock:
            if self.is_connected():
                return
            module = self._load_module()
            if self.scraper is None:
                self.scraper = module.OptimizedTelegramScraper()
            self._append("正在初始化 Telegram 客户端...")
            ok = await self._run_and_capture(self.scraper.initialize_client())
            if not ok:
                raise RuntimeError("客户端初始化失败,请检查 API_ID/API_HASH 或 session。")
            await self._run_and_capture(self.scraper.sync_monitor_chats_from_env())
            self.scraper._apply_scrape_media_from_env()
            self._append("Telegram 客户端已就绪。")

    async def disconnect(self) -> None:
        """Stop continuous scraping (if running) and disconnect the client."""
        await self.stop_continuous()
        if self.scraper and self.scraper.client:
            await self.scraper.client.disconnect()
            self._append("Telegram 客户端已断开。")

    def is_connected(self) -> bool:
        """Return True when a scraper exists and its client reports a connection."""
        return bool(self.scraper and self.scraper.client and self.scraper.client.is_connected())

    @staticmethod
    def _format_elapsed(started_at: float) -> str:
        """Format the time elapsed since ``started_at`` (epoch seconds) as HH:MM:SS."""
        seconds = int(time.time() - started_at)
        return f"{seconds // 3600:02d}:{(seconds % 3600) // 60:02d}:{seconds % 60:02d}"

    def current_job_uptime(self) -> str:
        """Return the running job's elapsed time as HH:MM:SS, or "-" if none runs."""
        if not self.job_started_at or not self.is_job_running():
            return "-"
        return self._format_elapsed(self.job_started_at)

    def is_job_running(self) -> bool:
        """Return True while the background job task exists and is not done."""
        return self.job_task is not None and not self.job_task.done()

    def is_continuous_running(self) -> bool:
        """Return True while the continuous-scraping task exists and is not done."""
        return self.continuous_task is not None and not self.continuous_task.done()

    def continuous_uptime(self) -> str:
        """Return continuous scraping's elapsed time as HH:MM:SS, or "-" if stopped."""
        if not self.continuous_started_at or not self.is_continuous_running():
            return "-"
        return self._format_elapsed(self.continuous_started_at)

    async def _run_and_capture(self, awaitable):
        """Await ``awaitable`` while copying its stdout/stderr output into the log.

        Each non-blank printed line is logged (stripped) as soon as it is
        complete, so long-running or cancelled awaitables still show their
        output and nothing accumulates in an unbounded buffer.

        NOTE: the redirection replaces ``sys.stdout``/``sys.stderr`` for the
        whole process, so output printed by other tasks while this awaitable
        is suspended is captured as well.

        Returns:
            Whatever ``awaitable`` returns.
        """
        sink = _LogLineWriter(self._append)
        try:
            with contextlib.redirect_stdout(sink), contextlib.redirect_stderr(sink):
                return await awaitable
        finally:
            # Also runs on cancellation or error: emit a trailing unfinished line.
            sink.flush_pending()

    async def list_account_channels(self) -> List[Dict]:
        """Return the scraper's ``list_channels()`` result, or ``[]`` if it is falsy."""
        await self.ensure_ready()
        data = await self._run_and_capture(self.scraper.list_channels())
        return data or []

    @staticmethod
    def _monitored_rows(channels, names, titles) -> List[Dict[str, str]]:
        """Build one display row per monitored channel from the three state maps."""
        return [
            {
                "channel_id": str(cid),
                "last_message_id": str(last_id),
                "username": names.get(cid, "no_username"),
                "display_name": channel_display_name(str(cid), titles, names),
            }
            for cid, last_id in channels.items()
        ]

    async def list_monitored_channels(self) -> List[Dict[str, str]]:
        """List monitored channels from the live scraper state, or from the state file.

        Without a scraper object the state file is read instead; a missing or
        unreadable file yields an empty list.
        """
        if not self.scraper:
            if not STATE_FILE.exists():
                return []
            try:
                raw = json.loads(STATE_FILE.read_text(encoding="utf-8"))
                return self._monitored_rows(
                    raw.get("channels", {}),
                    raw.get("channel_names", {}),
                    raw.get("channel_titles", {}),
                )
            except Exception:
                # Best effort: the page must still render with a broken state file.
                return []
        state = self.scraper.state
        titles = state.get("channel_titles", {})
        names = state.get("channel_names", {})
        return self._monitored_rows(state.get("channels", {}), names, titles)

    async def add_channel(self, spec: str) -> str:
        """Resolve ``spec`` to a channel and add it to the monitored state.

        Args:
            spec: Channel identifier text, passed to the scraper's entity resolver.

        Returns:
            A status message (added, or already present).

        Raises:
            ValueError: If ``spec`` is empty after stripping.
        """
        await self.ensure_ready()
        s = (spec or "").strip()
        if not s:
            raise ValueError("频道标识不能为空(可用 @用户名、-100…、t.me 链接)")
        ent = await self.scraper._resolve_forward_entity(s)
        cid = str(self.module.utils.get_peer_id(ent))
        username = getattr(ent, "username", None) or "no_username"
        title = getattr(ent, "title", None) or cid
        state = self.scraper.state
        if cid in state["channels"]:
            return f"频道已存在:{title}({cid})"
        state["channels"][cid] = 0  # start from message id 0
        state.setdefault("channel_names", {})[cid] = username
        state.setdefault("channel_titles", {})[cid] = str(title) if title else ""
        self.scraper.save_state()
        return f"已添加频道:{title}({cid})"

    async def remove_channel(self, spec: str) -> str:
        """Remove every monitored channel matched by ``spec`` and save the state.

        Raises:
            ValueError: If ``spec`` is empty or matches no monitored channel.
        """
        if not self.scraper:
            await self.ensure_ready()
        s = (spec or "").strip()
        if not s:
            raise ValueError("要移除的频道不能为空")
        targets = await self.resolve_monitored_channel_ids(s)
        state = self.scraper.state
        removed = 0
        for cid in targets:
            if cid not in state["channels"]:
                continue
            del state["channels"][cid]
            if "channel_names" in state:
                state["channel_names"].pop(cid, None)
            if "channel_titles" in state:
                state["channel_titles"].pop(cid, None)
            removed += 1
        if removed == 0:
            raise ValueError(
                "未移除任何频道。请使用all编号 1,2与下方已监控列表从上到下对应"
                "完整频道 ID -100…或已在监控列表中的 @用户名 / t.me 链接。"
            )
        self.scraper.save_state()
        return f"已移除 {removed} 个频道"

    async def resolve_monitored_channel_ids(self, selection: str) -> List[str]:
        """Resolve a selection string to ids of channels currently in the monitored state.

        The whole string is first handed to the scraper's
        ``parse_channel_selection``; if that yields nothing, each
        comma-separated token is tried the same way and then through the
        entity resolver, keeping only ids present in the state.

        Returns:
            De-duplicated ids in first-seen order; empty when no scraper exists.
        """
        if not self.scraper:
            return []
        s = (selection or "").strip() or "all"
        primary = self.scraper.parse_channel_selection(s)
        if primary:
            return primary
        if s.lower() == "all":
            return self.scraper.parse_channel_selection("all")
        out: List[str] = []
        for token in (part.strip() for part in s.split(",")):
            if not token:
                continue
            parsed = self.scraper.parse_channel_selection(token)
            if parsed:
                out.extend(parsed)
                continue
            try:
                ent = await self.scraper._resolve_forward_entity(token)
                cid = str(self.module.utils.get_peer_id(ent))
                if cid in self.scraper.state.get("channels", {}):
                    out.append(cid)
            except Exception:
                # An unresolvable token is skipped; the others are still honoured.
                continue
        return list(dict.fromkeys(out))

    async def _run_job(self, name: str, coro) -> None:
        """Run ``coro`` as the named job, logging start, completion, or failure."""
        try:
            self._append(f"任务开始:{name}")
            await self._run_and_capture(coro)
            self._append(f"任务完成:{name}")
        except Exception as e:
            self._append(f"任务失败:{name},错误:{e}")
        finally:
            self.job_name = ""
            self.job_started_at = None

    async def start_job(self, name: str, coro) -> str:
        """Schedule ``coro`` as the single background job.

        Raises:
            RuntimeError: If another job is still running; ``coro`` is closed
                first so it does not trigger a "never awaited" warning.
        """
        async with self.job_lock:
            if self.is_job_running():
                close = getattr(coro, "close", None)
                if close is not None:
                    close()
                raise RuntimeError(f"已有任务在运行:{self.job_name}")
            self.job_name = name
            self.job_started_at = time.time()
            self.job_task = asyncio.create_task(self._run_job(name, coro))
            return f"任务已启动:{name}"

    async def _channels_for_job(self, selection: str, empty_message: str) -> List[str]:
        """Ensure the client is ready and resolve ``selection``; raise ValueError if empty."""
        await self.ensure_ready()
        channels = await self.resolve_monitored_channel_ids(selection)
        if not channels:
            raise ValueError(empty_message)
        return channels

    async def start_scrape_job(self, selection: str) -> str:
        """Start a background job that scrapes each selected channel in turn."""
        channels = await self._channels_for_job(selection, "没有可抓取频道,请先添加频道。")

        async def runner():
            for cid in channels:
                await self.scraper.scrape_channel(cid, self.scraper.state["channels"][cid])

        return await self.start_job(f"抓取({len(channels)}个频道)", runner())

    async def start_export_job(self, selection: str) -> str:
        """Start a background job that exports each selected channel to CSV and JSON."""
        channels = await self._channels_for_job(selection, "没有可导出频道,请先添加频道。")

        async def runner():
            for cid in channels:
                self.scraper.export_to_csv(cid)
                self.scraper.export_to_json(cid)

        return await self.start_job(f"导出({len(channels)}个频道)", runner())

    async def start_rescrape_job(self, selection: str) -> str:
        """Start a background job that re-fetches media for each selected channel."""
        channels = await self._channels_for_job(selection, "没有可补抓频道,请先添加频道。")

        async def runner():
            for cid in channels:
                await self.scraper.rescrape_media(cid)

        return await self.start_job(f"补抓媒体({len(channels)}个频道)", runner())

    async def start_continuous(self) -> str:
        """Start the scraper's continuous-scraping loop as a background task.

        Raises:
            ValueError: If no channel is being monitored.
        """
        await self.ensure_ready()
        if self.is_continuous_running():
            return "持续抓取已在运行中"
        if not self.scraper.state.get("channels"):
            raise ValueError("当前没有监控频道,请先添加频道。")

        async def runner():
            try:
                await self._run_and_capture(self.scraper.continuous_scraping())
            except asyncio.CancelledError:
                self._append("持续抓取任务已取消。")
                raise
            except Exception as e:
                self._append(f"持续抓取异常:{e}")

        self.continuous_started_at = time.time()
        self.continuous_task = asyncio.create_task(runner())
        self._append("已启动持续抓取(含心跳逻辑)。")
        return "持续抓取已启动"

    async def stop_continuous(self) -> str:
        """Cancel the continuous-scraping task, wait for it to end, and reset its state."""
        if not self.is_continuous_running():
            return "持续抓取未运行"
        self.scraper.continuous_scraping_active = False
        self.continuous_task.cancel()
        try:
            await self.continuous_task
        except asyncio.CancelledError:
            pass
        self.continuous_task = None
        self.continuous_started_at = None
        self._append("已停止持续抓取。")
        return "持续抓取已停止"
# Single module-level service instance used by the route handlers in this file.
service = WebScraperService()
def _parse_env_lines() -> List[str]:
    """Return the raw lines of the .env file, or an empty list if it does not exist."""
    if ENV_FILE.exists():
        content = ENV_FILE.read_text(encoding="utf-8")
        return content.splitlines()
    return []
def read_env_dict() -> Dict[str, str]:
    """Parse the .env file into a ``{key: value}`` dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Keys and values are whitespace-stripped; a repeated key keeps its last value.
    """
    parsed: Dict[str, str] = {}
    for raw_line in _parse_env_lines():
        entry = raw_line.strip()
        if entry and not entry.startswith("#") and "=" in entry:
            key, _sep, value = entry.partition("=")
            parsed[key.strip()] = value.strip()
    return parsed
def write_env_updates(updates: Dict[str, str]) -> None:
    """Rewrite the .env file with ``updates`` applied.

    Existing ``KEY=value`` lines whose key is in ``updates`` are replaced in
    place; comments, blank lines and other keys are kept as they are. Keys not
    yet present are appended after a blank separator line.

    Values are flattened to a single line first: they originate from a web
    form, and an embedded line break would otherwise inject additional
    ``KEY=value`` lines into the file.

    Args:
        updates: Mapping of env key to the new value.
    """
    # Same line-splitting rule the reader uses, so one value stays one line.
    flat: Dict[str, str] = {
        key: " ".join(str(value).splitlines()) for key, value in updates.items()
    }

    out_lines: List[str] = []
    touched = set()
    for raw_line in _parse_env_lines():
        s = raw_line.strip()
        if not s or s.startswith("#") or "=" not in s:
            out_lines.append(raw_line)
            continue
        key = raw_line.split("=", 1)[0].strip()
        if key in flat:
            out_lines.append(f"{key}={flat[key]}")
            touched.add(key)
        else:
            out_lines.append(raw_line)

    # Separate the existing content from the appended keys with one blank line.
    if out_lines and out_lines[-1].strip() != "":
        out_lines.append("")
    out_lines.extend(f"{key}={value}" for key, value in flat.items() if key not in touched)
    ENV_FILE.write_text("\n".join(out_lines).rstrip() + "\n", encoding="utf-8")
# Built-in keyword list used for the "job-like message" statistics when the
# request does not supply its own keywords.
DEFAULT_STATS_KEYWORDS: Tuple[str, ...] = (
    "招聘", "急聘", "岗位", "求职",
    "诚聘", "内推", "hiring", "recruit",
)
def find_channel_databases(base: Path) -> List[Tuple[str, Path]]:
    """Find per-channel SQLite files laid out as ``<base>/<name>/<name>.db``.

    Directories whose name starts with "." and ``__pycache__`` are ignored.

    Args:
        base: Directory to scan (only its direct children are inspected).

    Returns:
        ``(name, db_path)`` pairs sorted by name; empty if ``base`` does not exist.
    """
    if not base.exists():
        return []
    candidates = (
        (entry.name, entry / f"{entry.name}.db")
        for entry in base.iterdir()
        if entry.is_dir()
        and not entry.name.startswith(".")
        and entry.name != "__pycache__"
    )
    existing = [pair for pair in candidates if pair[1].is_file()]
    existing.sort(key=lambda pair: pair[0])
    return existing
def count_state_monitored_channels() -> int:
    """Return how many channels the state file lists, or 0 if it is missing or unreadable."""
    if not STATE_FILE.exists():
        return 0
    try:
        state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
        monitored = state.get("channels") or {}
        return len(monitored)
    except Exception:
        # Best effort: a broken state file simply counts as zero channels.
        return 0
def normalize_stats_keywords_param(raw: str) -> Tuple[str, ...]:
    """Turn a comma-separated keyword string into a tuple of keywords.

    Each piece is whitespace-stripped and has every ``%`` and ``_`` removed
    (the SQL LIKE wildcards); pieces that end up empty are dropped.

    Returns:
        The cleaned keywords, or ``DEFAULT_STATS_KEYWORDS`` when none remain.
    """
    cleaned = (
        piece.strip().replace("%", "").replace("_", "")
        for piece in (raw or "").split(",")
    )
    keywords = tuple(word for word in cleaned if word)
    return keywords if keywords else DEFAULT_STATS_KEYWORDS
def _channel_day_counts(path, start_day, end_day, kw_condition, kw_params):
    """Query one channel database.

    Returns:
        ``(total_rows, per_day)`` where ``per_day`` maps a ``YYYY-MM-DD`` string
        to ``(messages, keyword_hits)`` for rows dated within the range.

    Raises:
        sqlite3.Error: If the file cannot be queried (e.g. no ``messages`` table).
    """
    conn = sqlite3.connect(str(path))
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM messages")
        total_rows = int(cursor.fetchone()[0])
        # One grouped scan yields both the per-day totals and the keyword hits.
        # The keyword placeholders come first because they appear first in the SQL.
        cursor.execute(
            "SELECT substr(date,1,10) AS d, COUNT(*), "
            f"SUM(CASE WHEN {kw_condition} THEN 1 ELSE 0 END) "
            "FROM messages "
            "WHERE substr(date,1,10) >= ? AND substr(date,1,10) <= ? "
            "GROUP BY d",
            (*kw_params, start_day, end_day),
        )
        per_day = {str(day): (int(count), int(hits)) for day, count, hits in cursor.fetchall()}
    finally:
        conn.close()
    return total_rows, per_day


def compute_storage_stats(base: Path, days: int, keyword_list: Tuple[str, ...]) -> Dict[str, Any]:
    """Aggregate message counts across every channel database under ``base``.

    Args:
        base: Directory scanned by ``find_channel_databases``.
        days: Length of the reporting window in days, ending today (UTC date);
            values below 1 are treated as 1.
        keyword_list: Substrings matched against ``message`` with SQL ``LIKE``;
            a message counts as "job-like" if it contains any of them. An empty
            tuple means no message is counted as job-like.

    Returns:
        A dict with the window bounds, overall and per-day totals, the top 24
        channels by messages in range, and ``skipped_databases`` (ids of
        databases that could not be queried and were left out of the totals).
    """
    # Local import: the file's top-level imports do not bring in ``timezone``.
    from datetime import timezone

    titles, names = read_state_channel_maps()
    dbs = find_channel_databases(base)

    # Same calendar date that the deprecated ``datetime.utcnow()`` produced.
    end = datetime.now(timezone.utc).date()
    start = end - timedelta(days=max(days, 1) - 1)
    day_keys: List[str] = [
        (start + timedelta(days=offset)).strftime("%Y-%m-%d")
        for offset in range((end - start).days + 1)
    ]
    start_day, end_day = day_keys[0], day_keys[-1]

    daily_total: Dict[str, int] = dict.fromkeys(day_keys, 0)
    daily_kw: Dict[str, int] = dict.fromkeys(day_keys, 0)
    total_all_time = 0
    in_range_total = 0
    in_range_kw = 0
    by_channel: List[Dict[str, Any]] = []
    skipped: List[str] = []

    # "0" keeps the SQL valid (and matches nothing) when no keywords are given.
    kw_condition = " OR ".join(["(message LIKE ?)"] * len(keyword_list)) or "0"
    kw_params = [f"%{k}%" for k in keyword_list]

    for channel_id, path in dbs:
        try:
            total_rows, per_day = _channel_day_counts(
                path, start_day, end_day, kw_condition, kw_params
            )
        except sqlite3.Error:
            # A foreign, corrupt or locked .db must not take down the whole overview.
            skipped.append(channel_id)
            continue

        ch_range = sum(count for count, _hits in per_day.values())
        ch_kw = sum(hits for _count, hits in per_day.values())
        total_all_time += total_rows
        in_range_total += ch_range
        in_range_kw += ch_kw
        for day, (count, hits) in per_day.items():
            # Rows whose date prefix is not a calendar day of the window count
            # toward the range totals only, not toward the per-day series.
            if day in daily_total:
                daily_total[day] += count
                daily_kw[day] += hits
        by_channel.append(
            {
                "channel_id": channel_id,
                "display_name": channel_display_name(str(channel_id), titles, names),
                "messages_in_range": ch_range,
                "job_like_in_range": ch_kw,
            }
        )

    by_channel.sort(key=lambda x: -x["messages_in_range"])
    return {
        "days": days,
        "range_start": start_day,
        "range_end": end_day,
        "keywords_used": list(keyword_list),
        "total_messages_all_time": total_all_time,
        "messages_in_range": in_range_total,
        "job_like_in_range": in_range_kw,
        "database_count": len(dbs),
        "skipped_databases": skipped,
        "by_channel": by_channel[:24],
        "daily_messages": [[d, daily_total[d]] for d in day_keys],
        "daily_job_like": [[d, daily_kw[d]] for d in day_keys],
    }
@app.get("/api/stats/overview")
async def api_stats_overview(
    days: int = Query(30, ge=1, le=366),
    keywords: str = Query("", description="逗号分隔关键词,用于估算「招聘类」消息;留空用内置词表"),
):
    """Return storage statistics for the last ``days`` days as JSON.

    The SQLite work is blocking, so it runs in the event loop's default executor.
    """
    keyword_tuple = normalize_stats_keywords_param(keywords)

    def collect() -> Dict[str, Any]:
        stats = compute_storage_stats(BASE_DIR, days, keyword_tuple)
        stats["state_monitored_count"] = count_state_monitored_channels()
        return stats

    return await asyncio.get_running_loop().run_in_executor(None, collect)
@app.get("/")
async def index(request: Request):
    """Render the console page: config form, channel lists, job state and logs.

    A failure while listing the account's channels is shown on the page
    (``error``) instead of failing the request.
    """
    current_env = read_env_dict()
    fields = []
    for key, label, field_type in CONFIG_KEYS:
        fields.append(
            {"key": key, "label": label, "type": field_type, "value": current_env.get(key, "")}
        )

    monitored = await service.list_monitored_channels()
    monitored_ids = set()
    for row in monitored:
        monitored_ids.add(str(row["channel_id"]))

    account_channels: List[Dict] = []
    error_text = ""
    try:
        account_channels = await service.list_account_channels()
    except Exception as exc:
        error_text = str(exc)

    context = {
        "request": request,
        "fields": fields,
        "binary_env_keys": list(BINARY_ENV_KEYS),
        "monitored": monitored,
        "monitored_ids": monitored_ids,
        "account_channels": account_channels,
        "connected": service.is_connected(),
        "job_running": service.is_job_running(),
        "job_name": service.job_name,
        "job_uptime": service.current_job_uptime(),
        "continuous_running": service.is_continuous_running(),
        "continuous_uptime": service.continuous_uptime(),
        "logs": "\n".join(service.logs),
        "error": error_text,
    }
    return templates.TemplateResponse("index.html", context)
@app.post("/config")
async def save_config(request: Request):
    """Write the submitted config form into the .env file, then redirect to "/".

    Every key in ``CONFIG_KEYS`` is written. Keys in ``BINARY_ENV_KEYS`` are
    stored as "1" only when exactly "1" was submitted, otherwise "0".
    """
    form = await request.form()

    def submitted_value(key: str) -> str:
        text = str(form.get(key, "")).strip()
        if key in BINARY_ENV_KEYS:
            return "1" if text == "1" else "0"
        return text

    updates: Dict[str, str] = {
        key: submitted_value(key) for key, _label, _field_type in CONFIG_KEYS
    }
    write_env_updates(updates)
    return RedirectResponse(url="/", status_code=303)
@app.post("/start")
async def start_scraper():
    """Initialize/connect the scraper client; log any failure; redirect to "/"."""
    try:
        await service.ensure_ready()
    except Exception as exc:
        service._append(f"初始化失败:{exc}")
    home = RedirectResponse(url="/", status_code=303)
    return home
@app.post("/stop")
async def stop_scraper():
    """Disconnect the scraper client (stopping continuous scraping first); redirect to "/"."""
    await service.disconnect()
    home = RedirectResponse(url="/", status_code=303)
    return home
@app.post("/channels/add")
async def add_channel(channel_spec: str = Form(...)):
    """Add one channel from the HTML form, log the outcome, and redirect to "/"."""
    try:
        outcome = await service.add_channel(channel_spec)
        service._append(outcome)
    except Exception as exc:
        service._append(f"添加频道失败:{exc}")
    return RedirectResponse(url="/", status_code=303)
@app.post("/channels/add-selected")
async def add_channels_selected(request: Request):
    """Add every channel ticked in the account list, then redirect to "/".

    Reads all ``channel_id`` form values, ignores blank ones, and adds the
    rest one by one. A failure for one channel is logged and does not stop
    the remaining ones.
    """
    form = await request.form()
    ids = [text for text in (str(x).strip() for x in form.getlist("channel_id")) if text]
    if not ids:
        service._append("未选择任何频道,请在列表中勾选后再提交。")
        return RedirectResponse(url="/", status_code=303)
    for spec in ids:
        try:
            msg = await service.add_channel(spec)
            service._append(msg)
        except Exception as e:
            # Message previously lacked the closing parenthesis and ran the
            # channel spec and the error text together.
            service._append(f"添加失败({spec}):{e}")
    return RedirectResponse(url="/", status_code=303)
@app.post("/channels/remove")
async def remove_channel(channel_spec: str = Form(...)):
    """Remove the channel(s) named in the HTML form, log the outcome, and redirect to "/"."""
    try:
        outcome = await service.remove_channel(channel_spec)
        service._append(outcome)
    except Exception as exc:
        service._append(f"移除频道失败:{exc}")
    return RedirectResponse(url="/", status_code=303)
@app.post("/jobs/scrape")
async def start_scrape(selection: str = Form("all")):
    """Start a scrape job for the selected channels, log the outcome, and redirect to "/"."""
    try:
        outcome = await service.start_scrape_job(selection)
        service._append(outcome)
    except Exception as exc:
        service._append(f"启动抓取失败:{exc}")
    return RedirectResponse(url="/", status_code=303)
@app.post("/jobs/export")
async def start_export(selection: str = Form("all")):
    """Start an export job for the selected channels, log the outcome, and redirect to "/"."""
    try:
        outcome = await service.start_export_job(selection)
        service._append(outcome)
    except Exception as exc:
        service._append(f"启动导出失败:{exc}")
    return RedirectResponse(url="/", status_code=303)
@app.post("/jobs/rescrape")
async def start_rescrape(selection: str = Form("all")):
    """Start a media re-scrape job for the selected channels, log the outcome, and redirect to "/"."""
    try:
        outcome = await service.start_rescrape_job(selection)
        service._append(outcome)
    except Exception as exc:
        service._append(f"启动补抓失败:{exc}")
    return RedirectResponse(url="/", status_code=303)
@app.post("/jobs/continuous/start")
async def start_continuous():
    """Start continuous scraping, log the outcome, and redirect to "/"."""
    try:
        outcome = await service.start_continuous()
        service._append(outcome)
    except Exception as exc:
        service._append(f"启动持续抓取失败:{exc}")
    return RedirectResponse(url="/", status_code=303)
@app.post("/jobs/continuous/stop")
async def stop_continuous():
    """Stop continuous scraping, log the outcome, and redirect to "/"."""
    try:
        outcome = await service.stop_continuous()
        service._append(outcome)
    except Exception as exc:
        service._append(f"停止持续抓取失败:{exc}")
    return RedirectResponse(url="/", status_code=303)
@app.get("/status")
async def status():
    """Return a JSON snapshot of connection, job, continuous-scrape, channel and log state."""
    return dict(
        ready=service.scraper is not None,
        connected=service.is_connected(),
        job_running=service.is_job_running(),
        job_name=service.job_name,
        job_uptime=service.current_job_uptime(),
        continuous_running=service.is_continuous_running(),
        continuous_uptime=service.continuous_uptime(),
        monitored_channels=await service.list_monitored_channels(),
        logs=list(service.logs),
    )
@app.get("/api/channels/monitored")
async def api_monitored_channels():
    """Return the monitored channels as ``{"items": [...]}``."""
    rows = await service.list_monitored_channels()
    return {"items": rows}
@app.get("/api/channels/account")
async def api_account_channels():
    """Return the account's channels as ``{"items": [...]}``.

    On failure the response is ``{"items": [], "error": <message>}``.
    """
    try:
        return {"items": await service.list_account_channels()}
    except Exception as exc:
        return {"items": [], "error": str(exc)}
@app.post("/api/channels/add")
async def api_add_channel(channel_spec: str = Form(...)):
    """Add a channel and return ``{"ok": True, "message": ...}``; service errors propagate."""
    outcome = await service.add_channel(channel_spec)
    service._append(outcome)
    return dict(ok=True, message=outcome)
@app.post("/api/channels/remove")
async def api_remove_channel(channel_spec: str = Form(...)):
    """Remove channel(s) and return ``{"ok": True, "message": ...}``; service errors propagate."""
    outcome = await service.remove_channel(channel_spec)
    service._append(outcome)
    return dict(ok=True, message=outcome)
@app.post("/api/jobs/scrape")
async def api_job_scrape(selection: str = Form("all")):
    """Start a scrape job and return ``{"ok": True, "message": ...}``; service errors propagate."""
    outcome = await service.start_scrape_job(selection)
    service._append(outcome)
    return dict(ok=True, message=outcome)
@app.post("/api/jobs/export")
async def api_job_export(selection: str = Form("all")):
    """Start an export job and return ``{"ok": True, "message": ...}``; service errors propagate."""
    outcome = await service.start_export_job(selection)
    service._append(outcome)
    return dict(ok=True, message=outcome)
@app.post("/api/jobs/rescrape")
async def api_job_rescrape(selection: str = Form("all")):
    """Start a media re-scrape job and return ``{"ok": True, "message": ...}``; service errors propagate."""
    outcome = await service.start_rescrape_job(selection)
    service._append(outcome)
    return dict(ok=True, message=outcome)
@app.get("/api/jobs/status")
async def api_job_status():
    """Return a JSON snapshot of connection, job and continuous-scrape state plus the log lines."""
    return dict(
        connected=service.is_connected(),
        job_running=service.is_job_running(),
        job_name=service.job_name,
        job_uptime=service.current_job_uptime(),
        continuous_running=service.is_continuous_running(),
        continuous_uptime=service.continuous_uptime(),
        logs=list(service.logs),
    )
@app.post("/api/jobs/continuous/start")
async def api_start_continuous():
    """Start continuous scraping and return ``{"ok": True, "message": ...}``; service errors propagate."""
    outcome = await service.start_continuous()
    service._append(outcome)
    return dict(ok=True, message=outcome)
@app.post("/api/jobs/continuous/stop")
async def api_stop_continuous():
    """Stop continuous scraping and return ``{"ok": True, "message": ...}``; service errors propagate."""
    outcome = await service.stop_continuous()
    service._append(outcome)
    return dict(ok=True, message=outcome)