From 2f10cebecaa3d8af64ddeede0527befda1e4c9c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=98=BF=E7=94=B7?= Date: Mon, 27 Apr 2026 00:00:48 +0800 Subject: [PATCH] htyproc: add DEBUG trace for task pipeline, TS client, and AI polling Made-with: Cursor --- htyproc/src/clients.rs | 23 ++++- htyproc/src/processor.rs | 24 +++++ htyproc/src/tasks/pipelines.rs | 159 ++++++++++++++++++++++++++++++--- 3 files changed, 192 insertions(+), 14 deletions(-) diff --git a/htyproc/src/clients.rs b/htyproc/src/clients.rs index 905a596..9c1dbae 100644 --- a/htyproc/src/clients.rs +++ b/htyproc/src/clients.rs @@ -31,6 +31,10 @@ impl TsClient { } let v: serde_json::Value = resp.json().await?; if v.get("r").and_then(|x| x.as_bool()) != Some(true) { + tracing::debug!( + response = ?v, + "TsClient.one_pending_task: r=false or missing; treating as no pending task", + ); return Ok(None); } let d = v.get("d").cloned().unwrap_or(Value::Null); @@ -48,8 +52,17 @@ impl TsClient { .json(task) .send() .await?; - if !resp.status().is_success() { - anyhow::bail!("update_task failed: {}", resp.status()); + let status = resp.status(); + if !status.is_success() { + let body = resp.text().await.unwrap_or_default(); + tracing::debug!( + task_id = ?task.task_id, + task_status = ?task.task_status, + http_status = %status, + body = %body, + "TsClient.update_task: HTTP error", + ); + anyhow::bail!("update_task failed: {}", status); } Ok(()) } @@ -61,10 +74,11 @@ pub async fn ngx_combine_image( host: &str, task: &ReqTask, ) -> anyhow::Result { + let url = format!("{}/api/ngx/image/combine", ngx_base.trim_end_matches('/')); + tracing::debug!(task_id = ?task.task_id, url = %url, "ngx_combine_image: POST"); let client = reqwest::Client::builder() .timeout(crate::config::http_timeout()) .build()?; - let url = format!("{}/api/ngx/image/combine", ngx_base.trim_end_matches('/')); let resp = client .post(&url) .header("HtySudoerToken", sudo) @@ -81,10 +95,11 @@ pub async fn ngx_convert_audio( host: &str, task: &ReqTask, ) -> anyhow::Result { + let url = format!("{}/api/ngx/audio/convert", ngx_base.trim_end_matches('/')); + tracing::debug!(task_id = ?task.task_id, url = %url, "ngx_convert_audio: POST"); let client = reqwest::Client::builder() .timeout(crate::config::http_timeout()) .build()?; - let url = format!("{}/api/ngx/audio/convert", ngx_base.trim_end_matches('/')); let resp = client .post(&url) .header("HtySudoerToken", sudo) diff --git a/htyproc/src/processor.rs b/htyproc/src/processor.rs index 48b3114..b0113c2 100644 --- a/htyproc/src/processor.rs +++ b/htyproc/src/processor.rs @@ -197,6 +197,11 @@ pub async fn processor_loop(rt: Arc) { Some(t) => t, None => continue, }; + tracing::debug!( + task_id = %task_id, + task_type = tt.as_db_str(), + "picked PENDING task; updating row to PROCESSING", + ); merge_row_status(&mut req, TaskStatus::Processing, &task_id); if let Err(e) = rt.ts.update_task(&sudo, &host, &req).await { tracing::error!("set processing: {e:?}"); @@ -222,6 +227,11 @@ async fn run_one( tt: TaskType, ) -> anyhow::Result<()> { let task_id = req.task_id.clone().unwrap_or_default(); + tracing::debug!( + task_id = %task_id, + task_type = tt.as_db_str(), + "run_one: pipeline start", + ); let res = match tt { TaskType::UploadPicture => ngx_combine_image(&rt.ngx_url, sudo, host, &req) .await @@ -239,6 +249,20 @@ async fn run_one( } TaskType::Noop | TaskType::TestUpyunRemove => Ok(()), }; + match &res { + Ok(()) => tracing::debug!( + task_id = %task_id, + task_type = tt.as_db_str(), + "run_one: pipeline ok", + ), + Err(e) => tracing::debug!( + task_id = %task_id, + task_type = tt.as_db_str(), + error = %e, + error_dbg = ?e, + "run_one: pipeline failed", + ), + } match res { Ok(()) => merge_row_status(&mut req, TaskStatus::Done, &task_id), Err(_) => merge_row_status(&mut req, TaskStatus::Failed, &task_id), diff --git a/htyproc/src/tasks/pipelines.rs b/htyproc/src/tasks/pipelines.rs index d2cee90..0c8e8b3 100644 --- a/htyproc/src/tasks/pipelines.rs +++ b/htyproc/src/tasks/pipelines.rs @@ -26,12 +26,14 @@ pub async fn run_ai_score_task( host: &str, req: &mut ReqTask, ) -> anyhow::Result<()> { + let ts_tid = req.task_id.as_deref().unwrap_or(""); let raw = req .payload .as_ref() .ok_or_else(|| anyhow::anyhow!("missing payload"))?; let mut payload = try_parse_ai_score_payload(raw) .context("AiScorePayload (aligned with Java task_commons; supports legacy nested Redis shape)")?; + tracing::debug!(ts_task_id = %ts_tid, "AI_SCORE: payload parsed; entering compare pipeline"); execute_ai_score_pipeline(rt, sudo, host, req, &mut payload).await?; req.payload = Some(serde_json::to_value(&payload)?); Ok(()) @@ -43,6 +45,7 @@ pub async fn run_watermark_task( host: &str, req: &mut ReqTask, ) -> anyhow::Result<()> { + let ts_tid = req.task_id.as_deref().unwrap_or(""); let raw = req .payload .as_ref() @@ -51,6 +54,7 @@ pub async fn run_watermark_task( .context("WatermarkPayload (aligned with Java task_commons; supports legacy nested Redis shape)")?; let base = rt.ai_url.trim_end_matches('/'); let url = format!("{base}/api/v1/ai/watermark"); + tracing::debug!(ts_task_id = %ts_tid, url = %url, "WATERMARK: POST AI watermark"); let resp = rt .http .post(&url) @@ -65,6 +69,11 @@ pub async fn run_watermark_task( .task_id .clone() .ok_or_else(|| anyhow::anyhow!("watermark: missing ai task_id"))?; + tracing::debug!( + ts_task_id = %ts_tid, + ai_task_id = %ai_task_id, + "WATERMARK: Celery job id received; polling until terminal", + ); let updated = poll_ai_until_terminal(rt, sudo, host, &ai_task_id).await?; if updated.task_status.as_deref() == Some(AI_STATUS_FAILED) { payload.task_result = Some(updated); @@ -73,6 +82,7 @@ pub async fn run_watermark_task( } payload.task_result = Some(updated); req.payload = Some(serde_json::to_value(&payload)?); + tracing::debug!(ts_task_id = %ts_tid, "WATERMARK: pipeline ok"); Ok(()) } @@ -82,12 +92,21 @@ pub async fn run_video_compression_task( host: &str, req: &mut ReqTask, ) -> anyhow::Result<()> { + let ts_tid = req.task_id.as_deref().unwrap_or(""); let raw = req .payload .as_ref() .ok_or_else(|| anyhow::anyhow!("missing payload"))?; let mut payload = try_parse_video_compression_payload(raw) .context("VideoCompressionPayload (aligned with Java task_commons; supports legacy nested Redis shape)")?; + let file_hint: String = payload.file_url.chars().take(120).collect(); + tracing::debug!( + ts_task_id = %ts_tid, + resolution = %payload.resolution_type, + hty_resource_id = ?payload.hty_resource_id, + file_url_prefix = %file_hint, + "VIDEO_COMPRESSION: parsed payload; POST compress", + ); let base = rt.ai_url.trim_end_matches('/'); let url = format!("{base}/api/v1/ai/compress"); let resp = rt @@ -104,6 +123,11 @@ pub async fn run_video_compression_task( .task_id .clone() .ok_or_else(|| anyhow::anyhow!("compress: missing ai task_id"))?; + tracing::debug!( + ts_task_id = %ts_tid, + ai_task_id = %ai_task_id, + "VIDEO_COMPRESSION: compress job accepted; polling AI", + ); payload.task_result = Some(initial); req.payload = Some(serde_json::to_value(&payload)?); @@ -129,9 +153,16 @@ pub async fn run_video_compression_task( .and_then(|v| v.as_str()) .map(str::to_string) .ok_or_else(|| anyhow::anyhow!("missing compressed_file_url"))?; + let out_hint: String = video_url.chars().take(120).collect(); + tracing::debug!( + ts_task_id = %ts_tid, + compressed_url_prefix = %out_hint, + "VIDEO_COMPRESSION: AI SUCCESS; optional UC/WS updates", + ); if payload.hty_resource_id.is_none() { req.payload = Some(serde_json::to_value(&payload)?); + tracing::debug!(ts_task_id = %ts_tid, "VIDEO_COMPRESSION: no hty_resource_id; skip UC/WS"); return Ok(()); } let hty_resource_id = payload.hty_resource_id.clone().unwrap(); @@ -185,6 +216,12 @@ pub async fn run_video_compression_task( .as_array() .cloned() .unwrap_or_else(|| Vec::new()); + tracing::debug!( + ts_task_id = %ts_tid, + hty_resource_id = %hty_resource_id, + ref_resource_rows = arr.len(), + "VIDEO_COMPRESSION: UC updated; updating ref_resource rows", + ); let ts_url_short = rt.ts_url.trim_end_matches('/'); let now = chrono::Utc::now().naive_utc(); @@ -208,6 +245,7 @@ pub async fn run_video_compression_task( } req.payload = Some(serde_json::to_value(&payload)?); + tracing::debug!(ts_task_id = %ts_tid, "VIDEO_COMPRESSION: pipeline ok"); Ok(()) } @@ -231,6 +269,11 @@ pub async fn run_audio_file_ai_score_task( let tt = req .task_type .ok_or_else(|| anyhow::anyhow!("missing task_type"))?; + tracing::debug!( + ts_task_id = %task_id, + task_type = tt.as_db_str(), + "AUDIO_FILE_AI_SCORE: ngx convert then UC/WS resource rows", + ); let convert_req = ReqTask { task_id: Some(task_id.clone()), @@ -296,6 +339,11 @@ pub async fn run_audio_file_ai_score_task( .unwrap_or(&upyun_path); combined.ai_score_payload.compare_url = compare_from_ref.to_string(); + tracing::debug!( + ts_task_id = %task_id, + hty_resource_id = %created_id, + "AUDIO_FILE_AI_SCORE: entering AI compare pipeline", + ); execute_ai_score_pipeline(rt, sudo, host, req, &mut combined.ai_score_payload).await?; req.payload = Some(serde_json::to_value(&combined)?); Ok(()) @@ -309,10 +357,15 @@ pub(crate) async fn execute_ai_score_pipeline( req: &ReqTask, payload: &mut AiScorePayload, ) -> anyhow::Result<()> { + let ts_task_id = req.task_id.as_deref().unwrap_or(""); let task_type = req.task_type.ok_or_else(|| anyhow::anyhow!("missing task_type"))?; let base = rt.ai_url.trim_end_matches('/'); let compare_url = format!("{base}/api/v1/ai/compare"); - tracing::debug!("AI_SCORE -> POST AI compare, url={compare_url}"); + tracing::debug!( + ts_task_id = %ts_task_id, + url = %compare_url, + "AI_SCORE: POST compare", + ); let resp = rt .http .post(&compare_url) @@ -328,14 +381,22 @@ pub(crate) async fn execute_ai_score_pipeline( .task_id .clone() .ok_or_else(|| anyhow::anyhow!("compare: missing ai task_id"))?; - tracing::debug!("AI_SCORE -> AI compare ok, ai_task_id={compare_ai_task_id:?}"); + tracing::debug!( + ts_task_id = %ts_task_id, + ai_task_id = %compare_ai_task_id, + "AI_SCORE: compare job accepted", + ); let ts_task = wrap_ts_common_task(req); let ai_task = wrap_common_task(&compare_result, task_type.as_db_str()); let mut req_score = build_req_score(payload, &ts_task, &ai_task, None); let ws_base = rt.htyws_url.trim_end_matches('/'); let create_url = format!("{ws_base}/api/v1/ws/create_score"); - tracing::debug!("AI_SCORE -> POST create_score, url={create_url}"); + tracing::debug!( + ts_task_id = %ts_task_id, + url = %create_url, + "AI_SCORE: POST create_score", + ); let cs_resp = rt .http .post(&create_url) @@ -348,7 +409,11 @@ pub(crate) async fn execute_ai_score_pipeline( let score_d = parse_hty_response(cs_resp).await?; let score_id = json_value_as_string(&score_d) .ok_or_else(|| anyhow::anyhow!("create_score: expected string id in d"))?; - tracing::debug!("AI_SCORE -> create_score ok, score_id={score_id}"); + tracing::debug!( + ts_task_id = %ts_task_id, + score_id = %score_id, + "AI_SCORE: create_score ok", + ); if let Some(obj) = req_score.as_object_mut() { obj.insert("id".to_string(), json!(score_id.clone())); } @@ -360,9 +425,25 @@ pub(crate) async fn execute_ai_score_pipeline( let updated = poll_ai_once(rt, sudo, host, &compare_ai_task_id).await?; let status = updated.task_status.as_deref().unwrap_or(""); - tracing::debug!("AI_SCORE -> poll ai_task_id={compare_ai_task_id} retry={retry} status={status}"); + tracing::debug!( + ts_task_id = %ts_task_id, + ai_task_id = %compare_ai_task_id, + retry, + task_status = status, + "AI_SCORE: poll get_task_result", + ); if status == AI_STATUS_FAILED { - tracing::error!("AI_SCORE -> AI returned FAILED, ai_task_id={compare_ai_task_id} result={:?}", updated.task_result); + tracing::debug!( + ts_task_id = %ts_task_id, + ai_task_id = %compare_ai_task_id, + task_result = ?updated.task_result, + "AI_SCORE: Celery compare FAILED (detail for TS/Redis payload)", + ); + tracing::error!( + ts_task_id = %ts_task_id, + ai_task_id = %compare_ai_task_id, + "AI_SCORE: AI compare FAILED", + ); let ts_t = wrap_ts_common_task(req); let ai_t = wrap_common_task(&updated, task_type.as_db_str()); let fail_score = build_req_score(payload, &ts_t, &ai_t, Some(&score_id)); @@ -379,7 +460,11 @@ pub(crate) async fn execute_ai_score_pipeline( anyhow::bail!("AI scoring FAILED for ai_task_id={compare_ai_task_id}"); } if status == AI_STATUS_SUCCESS { - tracing::debug!("AI_SCORE -> AI returned SUCCESS, ai_task_id={compare_ai_task_id}"); + tracing::debug!( + ts_task_id = %ts_task_id, + ai_task_id = %compare_ai_task_id, + "AI_SCORE: compare SUCCESS", + ); payload.task_result = Some(updated.clone()); let ts_t = wrap_ts_common_task(req); let ai_t = wrap_common_task(&updated, task_type.as_db_str()); @@ -396,12 +481,27 @@ pub(crate) async fn execute_ai_score_pipeline( .send() .await?; let _ = parse_hty_response(upd_resp).await?; - tracing::debug!("AI_SCORE -> update_score ok, score_id={score_id}"); + tracing::debug!( + ts_task_id = %ts_task_id, + score_id = %score_id, + "AI_SCORE: update_score ok", + ); return Ok(()); } retry += 1; if retry >= max_retry { - tracing::error!("AI_SCORE -> max retries exceeded, ai_task_id={compare_ai_task_id}"); + tracing::debug!( + ts_task_id = %ts_task_id, + ai_task_id = %compare_ai_task_id, + last_status = status, + max_retry, + "AI_SCORE: max poll retries exceeded", + ); + tracing::error!( + ts_task_id = %ts_task_id, + ai_task_id = %compare_ai_task_id, + "AI_SCORE: compare poll timeout", + ); let ts_t = wrap_ts_common_task(req); let ai_t = wrap_common_task(&updated, task_type.as_db_str()); let timeout_score = build_req_score(payload, &ts_t, &ai_t, Some(&score_id)); @@ -503,9 +603,19 @@ async fn parse_hty_response(resp: reqwest::Response) -> anyhow::Result { let status = resp.status(); let v: Value = resp.json().await?; if !status.is_success() { + tracing::debug!( + http_status = %status, + response = ?v, + "parse_hty_response: HTTP non-success (body may contain TS/AI error)", + ); anyhow::bail!("http {}: {}", status, v); } if v.get("r").and_then(|x| x.as_bool()) != Some(true) { + tracing::debug!( + http_status = %status, + response = ?v, + "parse_hty_response: r=false (check JSON e / d for AI get_task_result etc.)", + ); anyhow::bail!("hty r=false: {}", v); } Ok(v.get("d").cloned().unwrap_or(Value::Null)) @@ -540,14 +650,43 @@ async fn poll_ai_until_terminal( let mut retry: u32 = 0; loop { if retry >= max_retry { + tracing::debug!( + ai_task_id, + max_retry, + "poll_ai_until_terminal: exceeded max poll rounds before terminal status", + ); anyhow::bail!("poll_ai_until_terminal: exceeded max retries"); } sleep(Duration::from_secs(config::ai_poll_interval_secs())).await; - let updated = poll_ai_once(rt, sudo, host, ai_task_id).await?; + let updated = match poll_ai_once(rt, sudo, host, ai_task_id).await { + Ok(u) => u, + Err(e) => { + tracing::debug!( + ai_task_id, + poll_round = retry + 1, + error = %e, + error_dbg = ?e, + "poll_ai_until_terminal: get_task_result request failed", + ); + return Err(e); + } + }; let status = updated.task_status.as_deref().unwrap_or(""); if status == AI_STATUS_FAILED || status == AI_STATUS_SUCCESS { + tracing::debug!( + ai_task_id, + poll_round = retry + 1, + task_status = status, + "poll_ai_until_terminal: terminal status", + ); return Ok(updated); } + tracing::debug!( + ai_task_id, + poll_round = retry + 1, + task_status = status, + "poll_ai_until_terminal: still non-terminal; will sleep and poll again", + ); retry += 1; } }