Files
huike-back/htyproc/src/processor.rs
T
weli 44c320d8fa chore add core rust project files and diesel migrations
Track required workspace crates, scripts, and historical diesel migrations so the repository contains the complete runnable backend baseline.

Made-with: Cursor
2026-04-23 17:20:01 +08:00

190 lines
6.2 KiB
Rust

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use htyts_models::{ReqTask, TaskStatus, TaskType};
use crate::clients::{merge_row_status, ngx_combine_image, ngx_convert_audio, TsClient};
use crate::config;
use crate::redis_task::RedisTask;
use crate::tasks;
pub static PROCESSING: AtomicUsize = AtomicUsize::new(0);
/// Mirrors Java `ProcessorHelper.Status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcMachineStatus {
Pending,
Running,
Abort,
Error,
}
impl ProcMachineStatus {
pub const fn as_java_str(&self) -> &'static str {
match self {
Self::Pending => "PENDING",
Self::Running => "RUNNING",
Self::Abort => "ABORT",
Self::Error => "ERROR",
}
}
}
pub struct ProcessorRuntime {
pub machine_status: Mutex<ProcMachineStatus>,
pub ts: TsClient,
pub redis: RedisTask,
pub sudo_token: tokio::sync::Mutex<Option<String>>,
pub ts_domain: String,
pub ngx_url: String,
/// Shared HTTP client for AI / htyws / htyuc (same timeout as `TsClient`).
pub http: reqwest::Client,
pub ai_url: String,
pub htyws_url: String,
pub htyuc_url: String,
/// `TS_URL` — used as `updated_by` on ref resources (Java `tsUrl`).
pub ts_url: String,
}
impl ProcessorRuntime {
pub async fn new() -> anyhow::Result<Arc<Self>> {
let ts = TsClient::new(&config::ts_url()?);
let redis = RedisTask::connect(&config::redis_url()?).await?;
let ts_domain = config::ts_domain()?;
let ngx_url = config::ngx_url()?;
let ai_url = config::ai_url()?;
let htyws_url = config::htyws_url()?;
let htyuc_url = config::htyuc_url()?;
let ts_url = config::ts_url()?;
let http = reqwest::Client::builder()
.timeout(config::http_timeout())
.build()?;
Ok(Arc::new(Self {
machine_status: Mutex::new(ProcMachineStatus::Pending),
ts,
redis,
sudo_token: tokio::sync::Mutex::new(std::env::var("PROC_SUDOER_TOKEN").ok()),
ts_domain,
ngx_url,
http,
ai_url,
htyws_url,
htyuc_url,
ts_url,
}))
}
pub fn proc_status(&self) -> ProcMachineStatus {
*self.machine_status.lock().expect("proc status mutex poisoned")
}
pub async fn sudo_token(&self) -> Option<String> {
self.sudo_token.lock().await.clone()
}
}
pub async fn processor_loop(rt: Arc<ProcessorRuntime>) {
tracing::info!("processor loop started");
loop {
{
let st = *rt.machine_status.lock().expect("proc status mutex poisoned");
if st != ProcMachineStatus::Running {
break;
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(config::proc_wait_secs())).await;
{
let st = *rt.machine_status.lock().expect("proc status mutex poisoned");
if st != ProcMachineStatus::Running {
break;
}
}
let Some(sudo) = rt.sudo_token().await else {
tracing::warn!("PROC_SUDOER_TOKEN not set; processor idle");
continue;
};
let host = rt.ts_domain.clone();
if PROCESSING.load(Ordering::SeqCst) >= config::max_processing_tasks() {
tracing::debug!("max processing tasks reached; skip poll");
continue;
}
let pending = match rt.ts.one_pending_task(&sudo).await {
Ok(p) => p,
Err(e) => {
tracing::debug!("one_pending_task err: {e:?}");
continue;
}
};
let Some(mut req) = pending else {
continue;
};
let Some(ref tid) = req.task_id else {
continue;
};
let task_id = tid.clone();
if let Ok(Some(p)) = rt.redis.get_payload(&task_id).await {
if let Ok(v) = serde_json::from_str(&p) {
req.payload = Some(v);
}
}
let tt = match req.task_type {
Some(t) => t,
None => continue,
};
merge_row_status(&mut req, TaskStatus::Processing, &task_id);
if let Err(e) = rt.ts.update_task(&sudo, &host, &req).await {
tracing::error!("set processing: {e:?}");
continue;
}
PROCESSING.fetch_add(1, Ordering::SeqCst);
let rt2 = rt.clone();
let sudo2 = sudo.clone();
let host2 = host.clone();
tokio::spawn(async move {
let _ = run_one(&rt2, &sudo2, &host2, req, tt).await;
PROCESSING.fetch_sub(1, Ordering::SeqCst);
});
}
tracing::info!("processor loop stopped");
}
async fn run_one(
rt: &Arc<ProcessorRuntime>,
sudo: &str,
host: &str,
mut req: ReqTask,
tt: TaskType,
) -> anyhow::Result<()> {
let task_id = req.task_id.clone().unwrap_or_default();
let res = match tt {
TaskType::UploadPicture => ngx_combine_image(&rt.ngx_url, sudo, host, &req)
.await
.map(|_| ()),
TaskType::ConvertAudioFile => ngx_convert_audio(&rt.ngx_url, sudo, host, &req)
.await
.map(|_| ()),
TaskType::AiScore => tasks::run_ai_score_task(rt, sudo, host, &mut req).await,
TaskType::AudioFileAiScore => {
tasks::run_audio_file_ai_score_task(rt, sudo, host, &mut req).await
}
TaskType::Watermark => tasks::run_watermark_task(rt, sudo, host, &mut req).await,
TaskType::VideoCompression => {
tasks::run_video_compression_task(rt, sudo, host, &mut req).await
}
TaskType::Noop | TaskType::TestUpyunRemove => Ok(()),
};
match res {
Ok(()) => merge_row_status(&mut req, TaskStatus::Done, &task_id),
Err(_) => merge_row_status(&mut req, TaskStatus::Failed, &task_id),
}
if let Some(ref p) = req.payload {
let existing = rt.redis.get_payload(&task_id).await.ok().flatten();
let merged = crate::redis_payload_merge::merge_task_payload_for_redis(existing.as_deref(), p);
if let Ok(json) = serde_json::to_string(&merged) {
let _ = rt.redis.set_payload(&task_id, &json).await;
}
}
rt.ts.update_task(sudo, host, &req).await?;
Ok(())
}