htyproc: add DEBUG trace for task pipeline, TS client, and AI polling

Made-with: Cursor
This commit is contained in:
2026-04-27 00:00:48 +08:00
parent d9b4171771
commit 2f10cebeca
3 changed files with 192 additions and 14 deletions
+19 -4
View File
@@ -31,6 +31,10 @@ impl TsClient {
}
let v: serde_json::Value = resp.json().await?;
if v.get("r").and_then(|x| x.as_bool()) != Some(true) {
tracing::debug!(
response = ?v,
"TsClient.one_pending_task: r=false or missing; treating as no pending task",
);
return Ok(None);
}
let d = v.get("d").cloned().unwrap_or(Value::Null);
@@ -48,8 +52,17 @@ impl TsClient {
.json(task)
.send()
.await?;
if !resp.status().is_success() {
anyhow::bail!("update_task failed: {}", resp.status());
let status = resp.status();
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
tracing::debug!(
task_id = ?task.task_id,
task_status = ?task.task_status,
http_status = %status,
body = %body,
"TsClient.update_task: HTTP error",
);
anyhow::bail!("update_task failed: {}", status);
}
Ok(())
}
@@ -61,10 +74,11 @@ pub async fn ngx_combine_image(
host: &str,
task: &ReqTask,
) -> anyhow::Result<String> {
let url = format!("{}/api/ngx/image/combine", ngx_base.trim_end_matches('/'));
tracing::debug!(task_id = ?task.task_id, url = %url, "ngx_combine_image: POST");
let client = reqwest::Client::builder()
.timeout(crate::config::http_timeout())
.build()?;
let url = format!("{}/api/ngx/image/combine", ngx_base.trim_end_matches('/'));
let resp = client
.post(&url)
.header("HtySudoerToken", sudo)
@@ -81,10 +95,11 @@ pub async fn ngx_convert_audio(
host: &str,
task: &ReqTask,
) -> anyhow::Result<String> {
let url = format!("{}/api/ngx/audio/convert", ngx_base.trim_end_matches('/'));
tracing::debug!(task_id = ?task.task_id, url = %url, "ngx_convert_audio: POST");
let client = reqwest::Client::builder()
.timeout(crate::config::http_timeout())
.build()?;
let url = format!("{}/api/ngx/audio/convert", ngx_base.trim_end_matches('/'));
let resp = client
.post(&url)
.header("HtySudoerToken", sudo)
+24
View File
@@ -197,6 +197,11 @@ pub async fn processor_loop(rt: Arc<ProcessorRuntime>) {
Some(t) => t,
None => continue,
};
tracing::debug!(
task_id = %task_id,
task_type = tt.as_db_str(),
"picked PENDING task; updating row to PROCESSING",
);
merge_row_status(&mut req, TaskStatus::Processing, &task_id);
if let Err(e) = rt.ts.update_task(&sudo, &host, &req).await {
tracing::error!("set processing: {e:?}");
@@ -222,6 +227,11 @@ async fn run_one(
tt: TaskType,
) -> anyhow::Result<()> {
let task_id = req.task_id.clone().unwrap_or_default();
tracing::debug!(
task_id = %task_id,
task_type = tt.as_db_str(),
"run_one: pipeline start",
);
let res = match tt {
TaskType::UploadPicture => ngx_combine_image(&rt.ngx_url, sudo, host, &req)
.await
@@ -239,6 +249,20 @@ async fn run_one(
}
TaskType::Noop | TaskType::TestUpyunRemove => Ok(()),
};
match &res {
Ok(()) => tracing::debug!(
task_id = %task_id,
task_type = tt.as_db_str(),
"run_one: pipeline ok",
),
Err(e) => tracing::debug!(
task_id = %task_id,
task_type = tt.as_db_str(),
error = %e,
error_dbg = ?e,
"run_one: pipeline failed",
),
}
match res {
Ok(()) => merge_row_status(&mut req, TaskStatus::Done, &task_id),
Err(_) => merge_row_status(&mut req, TaskStatus::Failed, &task_id),
+149 -10
View File
@@ -26,12 +26,14 @@ pub async fn run_ai_score_task(
host: &str,
req: &mut ReqTask,
) -> anyhow::Result<()> {
let ts_tid = req.task_id.as_deref().unwrap_or("");
let raw = req
.payload
.as_ref()
.ok_or_else(|| anyhow::anyhow!("missing payload"))?;
let mut payload = try_parse_ai_score_payload(raw)
.context("AiScorePayload (aligned with Java task_commons; supports legacy nested Redis shape)")?;
tracing::debug!(ts_task_id = %ts_tid, "AI_SCORE: payload parsed; entering compare pipeline");
execute_ai_score_pipeline(rt, sudo, host, req, &mut payload).await?;
req.payload = Some(serde_json::to_value(&payload)?);
Ok(())
@@ -43,6 +45,7 @@ pub async fn run_watermark_task(
host: &str,
req: &mut ReqTask,
) -> anyhow::Result<()> {
let ts_tid = req.task_id.as_deref().unwrap_or("");
let raw = req
.payload
.as_ref()
@@ -51,6 +54,7 @@ pub async fn run_watermark_task(
.context("WatermarkPayload (aligned with Java task_commons; supports legacy nested Redis shape)")?;
let base = rt.ai_url.trim_end_matches('/');
let url = format!("{base}/api/v1/ai/watermark");
tracing::debug!(ts_task_id = %ts_tid, url = %url, "WATERMARK: POST AI watermark");
let resp = rt
.http
.post(&url)
@@ -65,6 +69,11 @@ pub async fn run_watermark_task(
.task_id
.clone()
.ok_or_else(|| anyhow::anyhow!("watermark: missing ai task_id"))?;
tracing::debug!(
ts_task_id = %ts_tid,
ai_task_id = %ai_task_id,
"WATERMARK: Celery job id received; polling until terminal",
);
let updated = poll_ai_until_terminal(rt, sudo, host, &ai_task_id).await?;
if updated.task_status.as_deref() == Some(AI_STATUS_FAILED) {
payload.task_result = Some(updated);
@@ -73,6 +82,7 @@ pub async fn run_watermark_task(
}
payload.task_result = Some(updated);
req.payload = Some(serde_json::to_value(&payload)?);
tracing::debug!(ts_task_id = %ts_tid, "WATERMARK: pipeline ok");
Ok(())
}
@@ -82,12 +92,21 @@ pub async fn run_video_compression_task(
host: &str,
req: &mut ReqTask,
) -> anyhow::Result<()> {
let ts_tid = req.task_id.as_deref().unwrap_or("");
let raw = req
.payload
.as_ref()
.ok_or_else(|| anyhow::anyhow!("missing payload"))?;
let mut payload = try_parse_video_compression_payload(raw)
.context("VideoCompressionPayload (aligned with Java task_commons; supports legacy nested Redis shape)")?;
let file_hint: String = payload.file_url.chars().take(120).collect();
tracing::debug!(
ts_task_id = %ts_tid,
resolution = %payload.resolution_type,
hty_resource_id = ?payload.hty_resource_id,
file_url_prefix = %file_hint,
"VIDEO_COMPRESSION: parsed payload; POST compress",
);
let base = rt.ai_url.trim_end_matches('/');
let url = format!("{base}/api/v1/ai/compress");
let resp = rt
@@ -104,6 +123,11 @@ pub async fn run_video_compression_task(
.task_id
.clone()
.ok_or_else(|| anyhow::anyhow!("compress: missing ai task_id"))?;
tracing::debug!(
ts_task_id = %ts_tid,
ai_task_id = %ai_task_id,
"VIDEO_COMPRESSION: compress job accepted; polling AI",
);
payload.task_result = Some(initial);
req.payload = Some(serde_json::to_value(&payload)?);
@@ -129,9 +153,16 @@ pub async fn run_video_compression_task(
.and_then(|v| v.as_str())
.map(str::to_string)
.ok_or_else(|| anyhow::anyhow!("missing compressed_file_url"))?;
let out_hint: String = video_url.chars().take(120).collect();
tracing::debug!(
ts_task_id = %ts_tid,
compressed_url_prefix = %out_hint,
"VIDEO_COMPRESSION: AI SUCCESS; optional UC/WS updates",
);
if payload.hty_resource_id.is_none() {
req.payload = Some(serde_json::to_value(&payload)?);
tracing::debug!(ts_task_id = %ts_tid, "VIDEO_COMPRESSION: no hty_resource_id; skip UC/WS");
return Ok(());
}
let hty_resource_id = payload.hty_resource_id.clone().unwrap();
@@ -185,6 +216,12 @@ pub async fn run_video_compression_task(
.as_array()
.cloned()
.unwrap_or_else(|| Vec::new());
tracing::debug!(
ts_task_id = %ts_tid,
hty_resource_id = %hty_resource_id,
ref_resource_rows = arr.len(),
"VIDEO_COMPRESSION: UC updated; updating ref_resource rows",
);
let ts_url_short = rt.ts_url.trim_end_matches('/');
let now = chrono::Utc::now().naive_utc();
@@ -208,6 +245,7 @@ pub async fn run_video_compression_task(
}
req.payload = Some(serde_json::to_value(&payload)?);
tracing::debug!(ts_task_id = %ts_tid, "VIDEO_COMPRESSION: pipeline ok");
Ok(())
}
@@ -231,6 +269,11 @@ pub async fn run_audio_file_ai_score_task(
let tt = req
.task_type
.ok_or_else(|| anyhow::anyhow!("missing task_type"))?;
tracing::debug!(
ts_task_id = %task_id,
task_type = tt.as_db_str(),
"AUDIO_FILE_AI_SCORE: ngx convert then UC/WS resource rows",
);
let convert_req = ReqTask {
task_id: Some(task_id.clone()),
@@ -296,6 +339,11 @@ pub async fn run_audio_file_ai_score_task(
.unwrap_or(&upyun_path);
combined.ai_score_payload.compare_url = compare_from_ref.to_string();
tracing::debug!(
ts_task_id = %task_id,
hty_resource_id = %created_id,
"AUDIO_FILE_AI_SCORE: entering AI compare pipeline",
);
execute_ai_score_pipeline(rt, sudo, host, req, &mut combined.ai_score_payload).await?;
req.payload = Some(serde_json::to_value(&combined)?);
Ok(())
@@ -309,10 +357,15 @@ pub(crate) async fn execute_ai_score_pipeline(
req: &ReqTask,
payload: &mut AiScorePayload,
) -> anyhow::Result<()> {
let ts_task_id = req.task_id.as_deref().unwrap_or("");
let task_type = req.task_type.ok_or_else(|| anyhow::anyhow!("missing task_type"))?;
let base = rt.ai_url.trim_end_matches('/');
let compare_url = format!("{base}/api/v1/ai/compare");
tracing::debug!("AI_SCORE -> POST AI compare, url={compare_url}");
tracing::debug!(
ts_task_id = %ts_task_id,
url = %compare_url,
"AI_SCORE: POST compare",
);
let resp = rt
.http
.post(&compare_url)
@@ -328,14 +381,22 @@ pub(crate) async fn execute_ai_score_pipeline(
.task_id
.clone()
.ok_or_else(|| anyhow::anyhow!("compare: missing ai task_id"))?;
tracing::debug!("AI_SCORE -> AI compare ok, ai_task_id={compare_ai_task_id:?}");
tracing::debug!(
ts_task_id = %ts_task_id,
ai_task_id = %compare_ai_task_id,
"AI_SCORE: compare job accepted",
);
let ts_task = wrap_ts_common_task(req);
let ai_task = wrap_common_task(&compare_result, task_type.as_db_str());
let mut req_score = build_req_score(payload, &ts_task, &ai_task, None);
let ws_base = rt.htyws_url.trim_end_matches('/');
let create_url = format!("{ws_base}/api/v1/ws/create_score");
tracing::debug!("AI_SCORE -> POST create_score, url={create_url}");
tracing::debug!(
ts_task_id = %ts_task_id,
url = %create_url,
"AI_SCORE: POST create_score",
);
let cs_resp = rt
.http
.post(&create_url)
@@ -348,7 +409,11 @@ pub(crate) async fn execute_ai_score_pipeline(
let score_d = parse_hty_response(cs_resp).await?;
let score_id = json_value_as_string(&score_d)
.ok_or_else(|| anyhow::anyhow!("create_score: expected string id in d"))?;
tracing::debug!("AI_SCORE -> create_score ok, score_id={score_id}");
tracing::debug!(
ts_task_id = %ts_task_id,
score_id = %score_id,
"AI_SCORE: create_score ok",
);
if let Some(obj) = req_score.as_object_mut() {
obj.insert("id".to_string(), json!(score_id.clone()));
}
@@ -360,9 +425,25 @@ pub(crate) async fn execute_ai_score_pipeline(
let updated = poll_ai_once(rt, sudo, host, &compare_ai_task_id).await?;
let status = updated.task_status.as_deref().unwrap_or("");
tracing::debug!("AI_SCORE -> poll ai_task_id={compare_ai_task_id} retry={retry} status={status}");
tracing::debug!(
ts_task_id = %ts_task_id,
ai_task_id = %compare_ai_task_id,
retry,
task_status = status,
"AI_SCORE: poll get_task_result",
);
if status == AI_STATUS_FAILED {
tracing::error!("AI_SCORE -> AI returned FAILED, ai_task_id={compare_ai_task_id} result={:?}", updated.task_result);
tracing::debug!(
ts_task_id = %ts_task_id,
ai_task_id = %compare_ai_task_id,
task_result = ?updated.task_result,
"AI_SCORE: Celery compare FAILED (detail for TS/Redis payload)",
);
tracing::error!(
ts_task_id = %ts_task_id,
ai_task_id = %compare_ai_task_id,
"AI_SCORE: AI compare FAILED",
);
let ts_t = wrap_ts_common_task(req);
let ai_t = wrap_common_task(&updated, task_type.as_db_str());
let fail_score = build_req_score(payload, &ts_t, &ai_t, Some(&score_id));
@@ -379,7 +460,11 @@ pub(crate) async fn execute_ai_score_pipeline(
anyhow::bail!("AI scoring FAILED for ai_task_id={compare_ai_task_id}");
}
if status == AI_STATUS_SUCCESS {
tracing::debug!("AI_SCORE -> AI returned SUCCESS, ai_task_id={compare_ai_task_id}");
tracing::debug!(
ts_task_id = %ts_task_id,
ai_task_id = %compare_ai_task_id,
"AI_SCORE: compare SUCCESS",
);
payload.task_result = Some(updated.clone());
let ts_t = wrap_ts_common_task(req);
let ai_t = wrap_common_task(&updated, task_type.as_db_str());
@@ -396,12 +481,27 @@ pub(crate) async fn execute_ai_score_pipeline(
.send()
.await?;
let _ = parse_hty_response(upd_resp).await?;
tracing::debug!("AI_SCORE -> update_score ok, score_id={score_id}");
tracing::debug!(
ts_task_id = %ts_task_id,
score_id = %score_id,
"AI_SCORE: update_score ok",
);
return Ok(());
}
retry += 1;
if retry >= max_retry {
tracing::error!("AI_SCORE -> max retries exceeded, ai_task_id={compare_ai_task_id}");
tracing::debug!(
ts_task_id = %ts_task_id,
ai_task_id = %compare_ai_task_id,
last_status = status,
max_retry,
"AI_SCORE: max poll retries exceeded",
);
tracing::error!(
ts_task_id = %ts_task_id,
ai_task_id = %compare_ai_task_id,
"AI_SCORE: compare poll timeout",
);
let ts_t = wrap_ts_common_task(req);
let ai_t = wrap_common_task(&updated, task_type.as_db_str());
let timeout_score = build_req_score(payload, &ts_t, &ai_t, Some(&score_id));
@@ -503,9 +603,19 @@ async fn parse_hty_response(resp: reqwest::Response) -> anyhow::Result<Value> {
let status = resp.status();
let v: Value = resp.json().await?;
if !status.is_success() {
tracing::debug!(
http_status = %status,
response = ?v,
"parse_hty_response: HTTP non-success (body may contain TS/AI error)",
);
anyhow::bail!("http {}: {}", status, v);
}
if v.get("r").and_then(|x| x.as_bool()) != Some(true) {
tracing::debug!(
http_status = %status,
response = ?v,
"parse_hty_response: r=false (check JSON e / d for AI get_task_result etc.)",
);
anyhow::bail!("hty r=false: {}", v);
}
Ok(v.get("d").cloned().unwrap_or(Value::Null))
@@ -540,14 +650,43 @@ async fn poll_ai_until_terminal(
let mut retry: u32 = 0;
loop {
if retry >= max_retry {
tracing::debug!(
ai_task_id,
max_retry,
"poll_ai_until_terminal: exceeded max poll rounds before terminal status",
);
anyhow::bail!("poll_ai_until_terminal: exceeded max retries");
}
sleep(Duration::from_secs(config::ai_poll_interval_secs())).await;
let updated = poll_ai_once(rt, sudo, host, ai_task_id).await?;
let updated = match poll_ai_once(rt, sudo, host, ai_task_id).await {
Ok(u) => u,
Err(e) => {
tracing::debug!(
ai_task_id,
poll_round = retry + 1,
error = %e,
error_dbg = ?e,
"poll_ai_until_terminal: get_task_result request failed",
);
return Err(e);
}
};
let status = updated.task_status.as_deref().unwrap_or("");
if status == AI_STATUS_FAILED || status == AI_STATUS_SUCCESS {
tracing::debug!(
ai_task_id,
poll_round = retry + 1,
task_status = status,
"poll_ai_until_terminal: terminal status",
);
return Ok(updated);
}
tracing::debug!(
ai_task_id,
poll_round = retry + 1,
task_status = status,
"poll_ai_until_terminal: still non-terminal; will sleep and poll again",
);
retry += 1;
}
}