Framepack-CLI/api_server.py
2025-04-26 10:04:08 +02:00

302 lines
9.9 KiB
Python

# api_server_fixed.py - Korrigierte REST-API für den Videogenerator mit job_id-Unterstützung
# Im webui-Verzeichnis speichern
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import os
import subprocess
import uuid
from pydantic import BaseModel
from typing import Optional, List, Dict
# Datenmodell für die Videogenerierungsanfrage
class VideoGenerationRequest(BaseModel):
image_path: str
prompt: str
n_prompt: Optional[str] = ""
seed: Optional[int] = 31337
length: Optional[float] = 5.0
steps: Optional[int] = 25
use_teacache: Optional[bool] = True
output_filename: Optional[str] = "output.mp4"
app = FastAPI(title="Hunyuan Video Generator API")
# CORS für n8n-Zugriff erlauben
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Im Produktivbetrieb einschränken auf n8n-Server-IP
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Verzeichnis für Ausgaben
OUTPUT_DIR = os.path.join(os.path.dirname(__file__), "outputs")
PYTHON_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "system", "python", "python.exe")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# In-Memory Speicher für laufende Jobs
active_jobs = {}
def generate_video_task(image_path: str, prompt: str, n_prompt: str,
seed: int, length: float, steps: int,
use_teacache: bool, output_filename: str,
job_id: str):
"""Führt die Videogenerierung als Hintergrundaufgabe aus"""
# Absoluten Pfad sicherstellen
if not os.path.isabs(image_path):
# Wenn relativer Pfad, relativ zum aktuellen Verzeichnis interpretieren
image_path = os.path.abspath(os.path.join(os.path.dirname(__file__), image_path))
# Prüfen, ob die Bilddatei existiert
if not os.path.exists(image_path):
print(f"Fehler: Bilddatei nicht gefunden: {image_path}")
error_file = os.path.join(OUTPUT_DIR, f"{job_id}_error.txt")
with open(error_file, 'w') as f:
f.write(f"Fehler: Bilddatei nicht gefunden: {image_path}")
return
# Befehlszeile erstellen - WICHTIG: Wir geben die job_id als Parameter weiter
cmd = [
PYTHON_PATH,
"hunyuan_cli.py", # Aktualisierte Version mit job_id-Unterstützung
"--image", image_path,
"--prompt", prompt,
"--seed", str(seed),
"--length", str(length),
"--steps", str(steps),
"--output", output_filename,
"--job_id", job_id # Wichtig: Gebe job_id weiter
]
if n_prompt:
cmd.extend(["--n_prompt", n_prompt])
if use_teacache:
cmd.append("--teacache")
# Befehl ausführen und Ausgabe protokollieren
try:
print(f"Starte Videogenerierung mit Befehl: {' '.join(cmd)}")
process = subprocess.run(cmd,
check=True,
cwd=os.path.dirname(__file__),
capture_output=True,
text=True)
# Ausgabe in Protokolldatei schreiben
log_path = os.path.join(OUTPUT_DIR, f"{job_id}_log.txt")
with open(log_path, 'w') as log_file:
log_file.write(f"STDOUT:\n{process.stdout}\n\nSTDERR:\n{process.stderr}")
print(f"Video generation completed for job {job_id}")
# Aus aktiven Jobs entfernen
if job_id in active_jobs:
del active_jobs[job_id]
except subprocess.CalledProcessError as e:
print(f"Error generating video: {e}")
# Fehlermeldung in Protokolldatei schreiben
error_path = os.path.join(OUTPUT_DIR, f"{job_id}_error.txt")
with open(error_path, 'w') as error_file:
error_file.write(f"Command: {' '.join(cmd)}\n\nError:\n{str(e)}\n\nSTDOUT:\n{e.stdout}\n\nSTDERR:\n{e.stderr}")
# Aus aktiven Jobs entfernen
if job_id in active_jobs:
del active_jobs[job_id]
@app.post("/generate/")
async def generate_video(
request: VideoGenerationRequest,
background_tasks: BackgroundTasks
):
"""
Generiert ein Video basierend auf den angegebenen Parametern und einem lokalen Bildpfad
"""
# Eindeutige Job-ID generieren
job_id = str(uuid.uuid4())
# In aktive Jobs aufnehmen
active_jobs[job_id] = {
"status": "initializing",
"parameters": request.dict()
}
# Statusdatei sofort erstellen
status_path = os.path.join(OUTPUT_DIR, f"{job_id}_status.txt")
with open(status_path, 'w') as f:
f.write(f"Status: Initialisierung\nJob-ID: {job_id}\nBild: {request.image_path}\nPrompt: {request.prompt}\n")
# Videogenerierung als Hintergrundaufgabe starten
background_tasks.add_task(
generate_video_task,
request.image_path,
request.prompt,
request.n_prompt,
request.seed,
request.length,
request.steps,
request.use_teacache,
request.output_filename,
job_id
)
return {
"status": "processing",
"job_id": job_id,
"message": "Video generation started in background",
"result_url": f"/result/{job_id}_{request.output_filename}"
}
@app.get("/result/{filename}")
async def get_result(filename: str):
"""
Gibt das generierte Video zurück, wenn es verfügbar ist
"""
file_path = os.path.join(OUTPUT_DIR, filename)
if os.path.exists(file_path):
return FileResponse(
file_path,
media_type="video/mp4",
filename=filename.split("_", 1)[1] if "_" in filename else filename # Entferne Job-ID vom Dateinamen
)
else:
return {"status": "not_found", "message": f"Requested video not found or still processing: {file_path}"}
@app.get("/status/{job_id}")
async def check_status(job_id: str):
"""
Prüft den Status einer Videogenerierung
"""
# 1. Prüfen, ob der Job in den aktiven Jobs ist
if job_id in active_jobs:
return {
"status": "processing",
"job_id": job_id,
"details": "Job wird verarbeitet",
"active_job": True
}
# 2. Prüfen, ob eine Statusdatei existiert
status_file = os.path.join(OUTPUT_DIR, f"{job_id}_status.txt")
if os.path.exists(status_file):
with open(status_file, 'r') as f:
status_content = f.read()
# 3. Prüfen, ob ein fertiges Video existiert
video_files = [f for f in os.listdir(OUTPUT_DIR)
if f.startswith(job_id) and f.endswith('.mp4')]
if video_files:
return {
"status": "completed",
"job_id": job_id,
"files": video_files,
"download_urls": [f"/result/{file}" for file in video_files],
"status_info": status_content
}
# 4. Prüfen, ob ein Fehler aufgetreten ist
error_file = os.path.join(OUTPUT_DIR, f"{job_id}_error.txt")
if os.path.exists(error_file):
with open(error_file, 'r') as f:
error_text = f.read()
return {
"status": "error",
"job_id": job_id,
"error": error_text,
"status_info": status_content
}
# 5. Sonst läuft der Job noch
return {
"status": "processing",
"job_id": job_id,
"status_info": status_content
}
# 6. Prüfen auf alte Dateien ohne Status-Datei
video_files = [f for f in os.listdir(OUTPUT_DIR)
if f.startswith(job_id) and f.endswith('.mp4')]
if video_files:
return {
"status": "completed",
"job_id": job_id,
"files": video_files,
"download_urls": [f"/result/{file}" for file in video_files],
"note": "Status file not found, but video exists"
}
# 7. Prüfen, ob ein Fehlerprotokoll existiert ohne Status-Datei
error_file = os.path.join(OUTPUT_DIR, f"{job_id}_error.txt")
if os.path.exists(error_file):
with open(error_file, 'r') as f:
error_text = f.read()
return {
"status": "error",
"job_id": job_id,
"error": error_text,
"note": "Status file not found, but error exists"
}
# 8. Prüfen, ob ein Ausführungsprotokoll existiert ohne Status-Datei
log_file = os.path.join(OUTPUT_DIR, f"{job_id}_log.txt")
if os.path.exists(log_file):
return {
"status": "unknown",
"job_id": job_id,
"note": "Status file not found, but log exists, check logs for details"
}
# 9. Keine Dateien oder Informationen zu dieser Job-ID gefunden
return {"status": "not_found", "job_id": job_id}
@app.get("/debug/")
async def debug():
"""
Gibt Debug-Informationen zurück
"""
try:
# Alle Dateien im OUTPUT_DIR auflisten
files = os.listdir(OUTPUT_DIR)
# Gruppieren nach Präfix
file_groups = {}
for file in files:
parts = file.split('_', 1)
if len(parts) > 1:
prefix = parts[0]
if prefix not in file_groups:
file_groups[prefix] = []
file_groups[prefix].append(file)
return {
"active_jobs": active_jobs,
"output_dir": OUTPUT_DIR,
"files_count": len(files),
"file_groups": file_groups
}
except Exception as e:
return {"error": str(e)}
@app.get("/debug/files/")
async def list_files():
"""
Listet alle Dateien im OUTPUT_DIR auf
"""
try:
files = os.listdir(OUTPUT_DIR)
return {"files": files}
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)