chore add core rust project files and diesel migrations
Track required workspace crates, scripts, and historical diesel migrations so the repository contains the complete runnable backend baseline. Made-with: Cursor
This commit is contained in:
@@ -0,0 +1,110 @@
|
||||
//! Sudoer auth aligned with Java `CheckAuthFilter` + `RedisTokenCacheService` (`TS_SUDO_T_*`), not `HW_T_*`.
|
||||
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::IntoResponse;
|
||||
use axum::Json;
|
||||
use htycommons::common::{HtyErr, HtyErrCode, HtyResponse};
|
||||
use htycommons::jwt::jwt_decode_token;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::config;
|
||||
use crate::redis_store::RedisTaskStore;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct UcVerifyBody {
|
||||
r: bool,
|
||||
}
|
||||
|
||||
/// Returns `401` JSON body compatible with Java `sudoErr` when rejected.
|
||||
pub async fn verify_task_sudoer(
|
||||
redis: &RedisTaskStore,
|
||||
host: &str,
|
||||
sudo_jwt: &str,
|
||||
) -> Result<(), axum::response::Response> {
|
||||
if !config::auth_checking() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let token = match jwt_decode_token(&sudo_jwt.to_string()) {
|
||||
Ok(t) => t,
|
||||
Err(e) => return Err(sudo_reject(&e.to_string())),
|
||||
};
|
||||
|
||||
let tid = token.token_id.clone();
|
||||
let cache_key = htyts_models::sudoer_cache_redis_key(&tid);
|
||||
if let Ok(Some(s)) = redis.get_raw(&cache_key).await {
|
||||
if !s.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if !config::token_verify() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let uc_base = match std::env::var("HTYUC_URL") {
|
||||
Ok(u) => u,
|
||||
Err(_) => return Err(sudo_reject("HTYUC_URL not set (TOKEN_VERIFY=true)")),
|
||||
};
|
||||
let url = format!(
|
||||
"{}/api/v1/uc/verify_jwt_token",
|
||||
uc_base.trim_end_matches('/')
|
||||
);
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(config::http_client_timeout())
|
||||
.build()
|
||||
.map_err(|e| sudo_reject(&e.to_string()))?;
|
||||
let resp = client
|
||||
.post(&url)
|
||||
.header("HtyHost", host)
|
||||
.header("Authorization", sudo_jwt)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| sudo_reject(&e.to_string()))?;
|
||||
let status = resp.status();
|
||||
let text = resp
|
||||
.text()
|
||||
.await
|
||||
.map_err(|e| sudo_reject(&e.to_string()))?;
|
||||
let parsed: UcVerifyBody = serde_json::from_str(&text).unwrap_or(UcVerifyBody { r: false });
|
||||
if !status.is_success() || !parsed.r {
|
||||
return Err(sudo_reject(&format!(
|
||||
"uc verify_jwt_token failed: http={status} body={text}"
|
||||
)));
|
||||
}
|
||||
|
||||
let json = serde_json::to_string(&token).map_err(|e| sudo_reject(&e.to_string()))?;
|
||||
let ttl = config::exp_days_for_sudo_cache().saturating_mul(24 * 3600);
|
||||
redis
|
||||
.set_sudoer_cache_json(&tid, &json, ttl)
|
||||
.await
|
||||
.map_err(|e| sudo_reject(&e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn missing_sudo_header() -> axum::response::Response {
|
||||
let body = HtyResponse::<()> {
|
||||
r: false,
|
||||
d: None,
|
||||
e: Some("HtySudoerTokenErr -> empty `HtySudoerToken` header".to_string()),
|
||||
hty_err: Some(HtyErr {
|
||||
code: HtyErrCode::AuthenticationFailed,
|
||||
reason: Some("empty `HtySudoerToken` header".to_string()),
|
||||
}),
|
||||
};
|
||||
(StatusCode::FORBIDDEN, Json(body)).into_response()
|
||||
}
|
||||
|
||||
fn sudo_reject(msg: &str) -> axum::response::Response {
|
||||
let body = HtyResponse::<()> {
|
||||
r: false,
|
||||
d: None,
|
||||
e: Some(format!("HtySudoerTokenErr -> {msg}")),
|
||||
hty_err: Some(HtyErr {
|
||||
code: HtyErrCode::AuthenticationFailed,
|
||||
reason: Some(msg.to_string()),
|
||||
}),
|
||||
};
|
||||
(StatusCode::UNAUTHORIZED, Json(body)).into_response()
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
use anyhow::Context;
|
||||
use std::env;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Minutes before PENDING/PROCESSING tasks are treated as zombies (Java `task_server.zombie_min`).
|
||||
pub fn zombie_minutes() -> i64 {
|
||||
env::var("ZOMBIE_MIN")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(10)
|
||||
}
|
||||
|
||||
pub fn ts_database_url() -> anyhow::Result<String> {
|
||||
env::var("TS_DATABASE_URL")
|
||||
.or_else(|_| env::var("DATABASE_URL"))
|
||||
.context("TS_DATABASE_URL or DATABASE_URL must be set")
|
||||
}
|
||||
|
||||
pub fn ts_domain() -> anyhow::Result<String> {
|
||||
env::var("TS_DOMAIN").context("TS_DOMAIN must be set")
|
||||
}
|
||||
|
||||
pub fn htyuc_base_url() -> anyhow::Result<String> {
|
||||
env::var("HTYUC_URL").context("HTYUC_URL must be set for kc jobs / sudo")
|
||||
}
|
||||
|
||||
pub fn htykc_base_url() -> anyhow::Result<String> {
|
||||
env::var("HTYKC_URL").context("HTYKC_URL must be set for kc jobs")
|
||||
}
|
||||
|
||||
pub fn kc_cron_default_expr() -> String {
|
||||
env::var("KC_CRON_EXPR").unwrap_or_else(|_| "0 0 20 1/1 * ?".to_string())
|
||||
}
|
||||
|
||||
pub fn ts_port() -> u16 {
|
||||
std::env::var("TS_PORT")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(3003)
|
||||
}
|
||||
|
||||
pub fn http_client_timeout() -> Duration {
|
||||
Duration::from_secs(
|
||||
env::var("TS_HTTP_TIMEOUT_SECS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(120),
|
||||
)
|
||||
}
|
||||
|
||||
/// Mirrors Java `task_server.auth_checking` (`CheckAuthFilter`).
|
||||
pub fn auth_checking() -> bool {
|
||||
env::var("AUTH_CHECKING")
|
||||
.or_else(|_| env::var("task_server.auth_checking"))
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
/// Mirrors Java `task_server.token_verify` (UC `verify_jwt_token` on cache miss).
|
||||
pub fn token_verify() -> bool {
|
||||
env::var("TOKEN_VERIFY")
|
||||
.or_else(|_| env::var("task_server.token_verify"))
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
/// Sudoer cache TTL in days (Java `cn.alchemystudio.taskserver.exp_days`).
|
||||
pub fn exp_days_for_sudo_cache() -> u64 {
|
||||
env::var("EXP_DAYS")
|
||||
.or_else(|_| env::var("cn.alchemystudio.taskserver.exp_days"))
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(30)
|
||||
}
|
||||
@@ -0,0 +1,326 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::extract::{Path, Query, State};
|
||||
use axum::http::{HeaderMap, StatusCode};
|
||||
use axum::response::IntoResponse;
|
||||
use axum::Json;
|
||||
use chrono::Utc;
|
||||
use htycommons::common::{HtyErr, HtyErrCode, HtyResponse};
|
||||
use htycommons::web::{wrap_anyhow_err, wrap_ok_resp, HtyHostHeader};
|
||||
use htycommons::web::HtyToken;
|
||||
use htyts_models::{ReqCron, ReqTask, ReqTasksWithPage, TaskStatus};
|
||||
use serde::Deserialize;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::merge::{apply_write_defaults, inject_hty_id_for_upload_picture, row_to_req_task, split_req_for_persist};
|
||||
use crate::state::TsState;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct PageQuery {
|
||||
pub page: Option<String>,
|
||||
pub page_size: Option<String>,
|
||||
}
|
||||
|
||||
pub async fn hello_ts() -> &'static str {
|
||||
"-=Task Server=-"
|
||||
}
|
||||
|
||||
pub async fn create_task(
|
||||
State(state): State<Arc<TsState>>,
|
||||
host: HtyHostHeader,
|
||||
headers: HeaderMap,
|
||||
Json(req): Json<ReqTask>,
|
||||
) -> Result<(StatusCode, Json<HtyResponse<String>>), axum::response::Response> {
|
||||
let sudo = match headers.get("HtySudoerToken").and_then(|v| v.to_str().ok()) {
|
||||
Some(s) => s,
|
||||
None => return Err(crate::check_auth::missing_sudo_header()),
|
||||
};
|
||||
run_create_or_update(state, host, sudo, req, true).await
|
||||
}
|
||||
|
||||
pub async fn update_task(
|
||||
State(state): State<Arc<TsState>>,
|
||||
host: HtyHostHeader,
|
||||
headers: HeaderMap,
|
||||
Json(req): Json<ReqTask>,
|
||||
) -> Result<(StatusCode, Json<HtyResponse<String>>), axum::response::Response> {
|
||||
let sudo = match headers.get("HtySudoerToken").and_then(|v| v.to_str().ok()) {
|
||||
Some(s) => s,
|
||||
None => return Err(crate::check_auth::missing_sudo_header()),
|
||||
};
|
||||
run_create_or_update(state, host, sudo, req, false).await
|
||||
}
|
||||
|
||||
async fn run_create_or_update(
|
||||
state: Arc<TsState>,
|
||||
host: HtyHostHeader,
|
||||
sudo_jwt: &str,
|
||||
mut req: ReqTask,
|
||||
is_create: bool,
|
||||
) -> Result<(StatusCode, Json<HtyResponse<String>>), axum::response::Response> {
|
||||
crate::check_auth::verify_task_sudoer(&state.redis, &host.0, sudo_jwt).await?;
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
apply_write_defaults(&mut req, &host.0, now);
|
||||
|
||||
let hty_id = HtyToken::from_jwt(&sudo_jwt.to_string())
|
||||
.map(|t| t.hty_id)
|
||||
.unwrap_or(None);
|
||||
|
||||
if let Err(e) = inject_hty_id_for_upload_picture(&mut req, hty_id.clone()) {
|
||||
return Err(Json(wrap_anyhow_err::<String>(anyhow::anyhow!(e))).into_response());
|
||||
}
|
||||
|
||||
let task_id = if let Some(ref id) = req.task_id {
|
||||
if id.is_empty() {
|
||||
uuid::Uuid::new_v4().to_string()
|
||||
} else {
|
||||
id.clone()
|
||||
}
|
||||
} else {
|
||||
uuid::Uuid::new_v4().to_string()
|
||||
};
|
||||
req.task_id = Some(task_id.clone());
|
||||
|
||||
let (row, redis_payload) = match split_req_for_persist(&req, task_id.clone()) {
|
||||
Ok(x) => x,
|
||||
Err(e) => return Err(Json(wrap_anyhow_err::<String>(e)).into_response()),
|
||||
};
|
||||
|
||||
let pool = state.db.clone();
|
||||
let row_cloned = row.clone();
|
||||
if let Err(e) = tokio::task::spawn_blocking(move || crate::upsert_task_row(&pool, &row_cloned))
|
||||
.await
|
||||
.unwrap_or_else(|e| Err(anyhow::anyhow!("join: {e}")))
|
||||
{
|
||||
return Err(Json(wrap_anyhow_err::<String>(e)).into_response());
|
||||
}
|
||||
|
||||
if let Some(json) = redis_payload {
|
||||
if let Err(e) = state.redis.set_payload(&task_id, &json).await {
|
||||
let _ = state.redis.del(&task_id).await;
|
||||
return Err(Json(wrap_anyhow_err::<String>(e)).into_response());
|
||||
}
|
||||
} else if let Err(e) = state.redis.del(&task_id).await {
|
||||
tracing::warn!("redis del empty payload: {e}");
|
||||
}
|
||||
|
||||
let status = if is_create {
|
||||
StatusCode::CREATED
|
||||
} else {
|
||||
StatusCode::OK
|
||||
};
|
||||
Ok((status, Json(wrap_ok_resp(task_id))))
|
||||
}
|
||||
|
||||
pub async fn task_status(
|
||||
State(state): State<Arc<TsState>>,
|
||||
Path(task_id): Path<String>,
|
||||
) -> Json<HtyResponse<TaskStatus>> {
|
||||
let pool = state.db.clone();
|
||||
let tid = task_id.clone();
|
||||
let res = tokio::task::spawn_blocking(move || crate::find_task_by_id(&pool, &tid)).await;
|
||||
match res {
|
||||
Ok(Ok(Some(row))) => {
|
||||
let ts = TaskStatus::from_str(&row.task_status).unwrap_or(TaskStatus::Pending);
|
||||
Json(wrap_ok_resp(ts))
|
||||
}
|
||||
Ok(Ok(None)) => Json(HtyResponse {
|
||||
r: false,
|
||||
d: None,
|
||||
e: Some("not found".into()),
|
||||
hty_err: Some(HtyErr {
|
||||
code: HtyErrCode::DbErr,
|
||||
reason: Some("task not found".into()),
|
||||
}),
|
||||
}),
|
||||
Ok(Err(e)) => Json(wrap_anyhow_err(e)),
|
||||
Err(e) => Json(wrap_anyhow_err(anyhow::anyhow!(e))),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_task(
|
||||
State(state): State<Arc<TsState>>,
|
||||
Path(task_id): Path<String>,
|
||||
) -> Json<HtyResponse<ReqTask>> {
|
||||
let pool = state.db.clone();
|
||||
let tid = task_id.clone();
|
||||
let row_result = tokio::task::spawn_blocking(move || crate::find_task_by_id(&pool, &tid)).await;
|
||||
let row = match row_result {
|
||||
Ok(Ok(Some(r))) => r,
|
||||
Ok(Ok(None)) => {
|
||||
return Json(HtyResponse {
|
||||
r: false,
|
||||
d: None,
|
||||
e: Some("not found".into()),
|
||||
hty_err: Some(HtyErr {
|
||||
code: HtyErrCode::NotFoundErr,
|
||||
reason: Some("task not found".into()),
|
||||
}),
|
||||
});
|
||||
}
|
||||
Ok(Err(e)) => return Json(wrap_anyhow_err(e)),
|
||||
Err(e) => return Json(wrap_anyhow_err(anyhow::anyhow!(e))),
|
||||
};
|
||||
|
||||
let redis_payload = state.redis.get_payload(&task_id).await.ok().flatten();
|
||||
let req = match row_to_req_task(&row, redis_payload.as_deref()) {
|
||||
Ok(r) => r,
|
||||
Err(e) => return Json(wrap_anyhow_err(e)),
|
||||
};
|
||||
Json(wrap_ok_resp(req))
|
||||
}
|
||||
|
||||
pub async fn one_pending_task(
|
||||
State(state): State<Arc<TsState>>,
|
||||
) -> Result<Json<HtyResponse<ReqTask>>, StatusCode> {
|
||||
let pool = state.db.clone();
|
||||
let row = tokio::task::spawn_blocking(move || crate::one_pending_task(&pool))
|
||||
.await
|
||||
.unwrap_or_else(|e| Err(anyhow::anyhow!(e)));
|
||||
let row = match row {
|
||||
Ok(r) => r,
|
||||
Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
};
|
||||
let Some(row) = row else {
|
||||
return Err(StatusCode::NO_CONTENT);
|
||||
};
|
||||
let req = match row_to_req_task(&row, None) {
|
||||
Ok(r) => r,
|
||||
Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
};
|
||||
Ok(Json(wrap_ok_resp(req)))
|
||||
}
|
||||
|
||||
pub async fn one_zombie_task(
|
||||
State(state): State<Arc<TsState>>,
|
||||
) -> Result<Json<HtyResponse<ReqTask>>, StatusCode> {
|
||||
let pool = state.db.clone();
|
||||
let zm = state.zombie_minutes;
|
||||
let row = tokio::task::spawn_blocking(move || crate::one_zombie_task(&pool, zm))
|
||||
.await
|
||||
.unwrap_or_else(|e| Err(anyhow::anyhow!(e)));
|
||||
let row = match row {
|
||||
Ok(r) => r,
|
||||
Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
};
|
||||
let Some(row) = row else {
|
||||
return Err(StatusCode::NO_CONTENT);
|
||||
};
|
||||
let req = match row_to_req_task(&row, None) {
|
||||
Ok(r) => r,
|
||||
Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
};
|
||||
Ok(Json(wrap_ok_resp(req)))
|
||||
}
|
||||
|
||||
pub async fn all_tasks(State(state): State<Arc<TsState>>) -> Json<HtyResponse<Vec<ReqTask>>> {
|
||||
let pool = state.db.clone();
|
||||
let redis = state.redis.clone();
|
||||
let rows = match tokio::task::spawn_blocking(move || crate::all_tasks(&pool)).await {
|
||||
Ok(Ok(r)) => r,
|
||||
Ok(Err(e)) => return Json(wrap_anyhow_err(e)),
|
||||
Err(e) => return Json(wrap_anyhow_err(anyhow::anyhow!(e))),
|
||||
};
|
||||
let mut out = Vec::with_capacity(rows.len());
|
||||
for row in rows {
|
||||
let tid = row.task_id.clone();
|
||||
let payload = redis.get_payload(&tid).await.ok().flatten();
|
||||
match row_to_req_task(&row, payload.as_deref()) {
|
||||
Ok(r) => out.push(r),
|
||||
Err(e) => return Json(wrap_anyhow_err(e)),
|
||||
}
|
||||
}
|
||||
Json(wrap_ok_resp(out))
|
||||
}
|
||||
|
||||
pub async fn all_tasks_with_page(
|
||||
State(state): State<Arc<TsState>>,
|
||||
Query(q): Query<PageQuery>,
|
||||
) -> Json<HtyResponse<ReqTasksWithPage>> {
|
||||
let page: i64 = q.page.as_deref().unwrap_or("1").parse().unwrap_or(1);
|
||||
let page_size: i64 = q.page_size.as_deref().unwrap_or("20").parse().unwrap_or(20);
|
||||
let pool = state.db.clone();
|
||||
let redis = state.redis.clone();
|
||||
let (rows, total_page) = match tokio::task::spawn_blocking(move || {
|
||||
crate::all_tasks_page(&pool, page, page_size)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(r)) => r,
|
||||
Ok(Err(e)) => return Json(wrap_anyhow_err(e)),
|
||||
Err(e) => return Json(wrap_anyhow_err(anyhow::anyhow!(e))),
|
||||
};
|
||||
let mut tasks = Vec::with_capacity(rows.len());
|
||||
for row in rows {
|
||||
let tid = row.task_id.clone();
|
||||
let payload = redis.get_payload(&tid).await.ok().flatten();
|
||||
match row_to_req_task(&row, payload.as_deref()) {
|
||||
Ok(r) => tasks.push(r),
|
||||
Err(e) => return Json(wrap_anyhow_err(e)),
|
||||
}
|
||||
}
|
||||
let body = ReqTasksWithPage {
|
||||
tasks,
|
||||
total_page: total_page as i32,
|
||||
};
|
||||
Json(wrap_ok_resp(body))
|
||||
}
|
||||
|
||||
pub async fn del_task(
|
||||
State(state): State<Arc<TsState>>,
|
||||
Path(task_id): Path<String>,
|
||||
) -> Json<HtyResponse<String>> {
|
||||
let pool = state.db.clone();
|
||||
let tid = task_id.clone();
|
||||
if let Err(e) = tokio::task::spawn_blocking(move || crate::delete_task_by_id(&pool, &tid)).await
|
||||
.unwrap_or_else(|e| Err(anyhow::anyhow!(e)))
|
||||
{
|
||||
return Json(wrap_anyhow_err(e));
|
||||
}
|
||||
if let Err(e) = state.redis.del(&task_id).await {
|
||||
return Json(wrap_anyhow_err(e));
|
||||
}
|
||||
Json(wrap_ok_resp(task_id))
|
||||
}
|
||||
|
||||
pub async fn kc_start(
|
||||
State(state): State<Arc<TsState>>,
|
||||
) -> Result<StatusCode, Json<HtyResponse<()>>> {
|
||||
crate::kc::scheduler::start(&state)
|
||||
.await
|
||||
.map_err(|e| Json(wrap_anyhow_err(e)))?;
|
||||
Ok(StatusCode::OK)
|
||||
}
|
||||
|
||||
pub async fn kc_stop(
|
||||
State(state): State<Arc<TsState>>,
|
||||
) -> Result<StatusCode, Json<HtyResponse<()>>> {
|
||||
crate::kc::scheduler::stop(&state)
|
||||
.await
|
||||
.map_err(|e| Json(wrap_anyhow_err(e)))?;
|
||||
Ok(StatusCode::OK)
|
||||
}
|
||||
|
||||
pub async fn kc_status(State(state): State<Arc<TsState>>) -> Json<HtyResponse<String>> {
|
||||
Json(wrap_ok_resp(crate::kc::scheduler::status(&state)))
|
||||
}
|
||||
|
||||
pub async fn kc_curr_cron_expr(State(state): State<Arc<TsState>>) -> Json<HtyResponse<String>> {
|
||||
Json(wrap_ok_resp(crate::kc::scheduler::current_cron_expr(&state)))
|
||||
}
|
||||
|
||||
pub async fn kc_reset(
|
||||
State(state): State<Arc<TsState>>,
|
||||
Json(req): Json<ReqCron>,
|
||||
) -> Result<StatusCode, Json<HtyResponse<()>>> {
|
||||
let expr = req.expr.unwrap_or_default();
|
||||
crate::kc::scheduler::reset(&state, &expr)
|
||||
.await
|
||||
.map_err(|e| Json(wrap_anyhow_err(e)))?;
|
||||
Ok(StatusCode::OK)
|
||||
}
|
||||
|
||||
pub async fn kc_test(State(state): State<Arc<TsState>>) -> (StatusCode, String) {
|
||||
(StatusCode::OK, crate::kc::scheduler::debug_job(&state))
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
pub mod scheduler;
|
||||
pub mod uber;
|
||||
@@ -0,0 +1,92 @@
|
||||
use std::sync::atomic::{AtomicU8, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::config;
|
||||
use crate::state::TsState;
|
||||
|
||||
static ON: u8 = 1;
|
||||
static OFF: u8 = 0;
|
||||
|
||||
static KC_STATUS: AtomicU8 = AtomicU8::new(OFF);
|
||||
static CRON_EXPR: Lazy<Mutex<String>> = Lazy::new(|| Mutex::new(String::new()));
|
||||
static TICK_HANDLE: Lazy<Mutex<Option<JoinHandle<()>>>> = Lazy::new(|| Mutex::new(None));
|
||||
|
||||
fn tick_interval() -> Duration {
|
||||
let secs = std::env::var("KC_INTERVAL_SECS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(3600);
|
||||
Duration::from_secs(secs)
|
||||
}
|
||||
|
||||
pub fn status(_state: &Arc<TsState>) -> String {
|
||||
if KC_STATUS.load(Ordering::SeqCst) == ON {
|
||||
"ON".to_string()
|
||||
} else {
|
||||
"OFF".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn current_cron_expr(_state: &Arc<TsState>) -> String {
|
||||
CRON_EXPR
|
||||
.try_lock()
|
||||
.map(|g| g.clone())
|
||||
.unwrap_or_else(|_| config::kc_cron_default_expr())
|
||||
}
|
||||
|
||||
pub fn debug_job(_state: &Arc<TsState>) -> String {
|
||||
format!(
|
||||
"kc scheduler status={} cron_expr={} interval_secs={}",
|
||||
status(_state),
|
||||
current_cron_expr(_state),
|
||||
tick_interval().as_secs()
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn start(state: &Arc<TsState>) -> anyhow::Result<()> {
|
||||
let mut guard = TICK_HANDLE.lock().await;
|
||||
if guard.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
{
|
||||
let mut c = CRON_EXPR.lock().await;
|
||||
if c.is_empty() {
|
||||
*c = config::kc_cron_default_expr();
|
||||
}
|
||||
}
|
||||
let st = state.clone();
|
||||
let h = tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(tick_interval());
|
||||
interval.tick().await;
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Err(e) = super::uber::run_kecheng_uber(&st).await {
|
||||
tracing::error!("kc uber job error: {e:?}");
|
||||
}
|
||||
}
|
||||
});
|
||||
*guard = Some(h);
|
||||
KC_STATUS.store(ON, Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stop(_state: &Arc<TsState>) -> anyhow::Result<()> {
|
||||
let mut guard = TICK_HANDLE.lock().await;
|
||||
if let Some(h) = guard.take() {
|
||||
h.abort();
|
||||
}
|
||||
KC_STATUS.store(OFF, Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn reset(_state: &Arc<TsState>, cron_expr: &str) -> anyhow::Result<()> {
|
||||
stop(_state).await?;
|
||||
let mut c = CRON_EXPR.lock().await;
|
||||
*c = cron_expr.to_string();
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
//! Course notification tick: calls HTYKC similarly to Java `UberTask` (non-repeatable path stub).
|
||||
//!
|
||||
//! Uses `TS_SUDOER_TOKEN` when set; otherwise skips. Full cert-login parity is not implemented here.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::Utc;
|
||||
|
||||
use crate::config;
|
||||
use crate::state::TsState;
|
||||
|
||||
fn sudo_token() -> Option<String> {
|
||||
std::env::var("TS_SUDOER_TOKEN").ok().filter(|s| !s.is_empty())
|
||||
}
|
||||
|
||||
pub async fn run_kecheng_uber(state: &Arc<TsState>) -> anyhow::Result<()> {
|
||||
let _ = state;
|
||||
let Some(token) = sudo_token() else {
|
||||
tracing::warn!("kc uber: TS_SUDOER_TOKEN not set; skipping kecheng notification tick");
|
||||
return Ok(());
|
||||
};
|
||||
let ts_domain = config::ts_domain()?;
|
||||
let htykc = config::htykc_base_url()?;
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(config::http_client_timeout())
|
||||
.build()?;
|
||||
|
||||
let now = Utc::now().naive_local();
|
||||
let end = now + chrono::Duration::hours(48);
|
||||
let fmt = "%Y-%m-%d %H:%M:%S";
|
||||
let start_from = now.format(fmt).to_string();
|
||||
let end_by = end.format(fmt).to_string();
|
||||
|
||||
let query = url::form_urlencoded::Serializer::new(String::new())
|
||||
.append_pair("start_from", &start_from)
|
||||
.append_pair("end_by", &end_by)
|
||||
.finish();
|
||||
let url = format!(
|
||||
"{}/api/v1/kc/find_all_non_repeatable_kechengs_within_date_range?{query}",
|
||||
htykc.trim_end_matches('/')
|
||||
);
|
||||
|
||||
let resp = client
|
||||
.get(&url)
|
||||
.header("HtySudoerToken", &token)
|
||||
.header("Authorization", &token)
|
||||
.header("HtyHost", &ts_domain)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let status = resp.status();
|
||||
let text = resp.text().await.unwrap_or_default();
|
||||
tracing::info!(
|
||||
"kc uber: non_repeatable kechengs status={status} body_len={}",
|
||||
text.len()
|
||||
);
|
||||
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&text) {
|
||||
tracing::debug!("kc uber response: {v}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
//! Task service (`task_server` migration): `/api/v1/ts` HTTP API, PostgreSQL, Redis, kc scheduler.
|
||||
|
||||
pub mod check_auth;
|
||||
pub mod config;
|
||||
pub mod handlers;
|
||||
pub mod kc;
|
||||
pub mod merge;
|
||||
pub mod redis_store;
|
||||
pub mod state;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::routing::{get, post};
|
||||
use axum::Router;
|
||||
use htycommons::db::pool;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
use crate::redis_store::RedisTaskStore;
|
||||
use crate::state::TsState;
|
||||
|
||||
pub use htyts_models::schema::dbtask;
|
||||
pub use htyts_models::{
|
||||
all_tasks, all_tasks_page, delete_task_by_id, find_task_by_id, one_pending_task, one_zombie_task,
|
||||
upsert_task_row, DbTaskRow,
|
||||
};
|
||||
|
||||
pub async fn ts_router(db_url: &str, redis_url: &str, zombie_minutes: i64) -> anyhow::Result<Router> {
|
||||
let pg = pool(db_url);
|
||||
let redis = RedisTaskStore::connect(redis_url).await?;
|
||||
let st = Arc::new(TsState {
|
||||
db: Arc::new(pg),
|
||||
redis,
|
||||
zombie_minutes,
|
||||
});
|
||||
Ok(ts_router_with_state(st))
|
||||
}
|
||||
|
||||
pub fn ts_router_with_state(state: Arc<TsState>) -> Router {
|
||||
Router::new()
|
||||
.route("/api/v1/ts", get(handlers::hello_ts))
|
||||
.route("/api/v1/ts/create_task", post(handlers::create_task))
|
||||
.route("/api/v1/ts/update_task", post(handlers::update_task))
|
||||
.route(
|
||||
"/api/v1/ts/task_status/{task_id}",
|
||||
get(handlers::task_status),
|
||||
)
|
||||
.route("/api/v1/ts/task/{task_id}", get(handlers::get_task))
|
||||
.route(
|
||||
"/api/v1/ts/one_pending_task",
|
||||
get(handlers::one_pending_task),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/ts/one_zombie_task",
|
||||
get(handlers::one_zombie_task),
|
||||
)
|
||||
.route("/api/v1/ts/all_tasks", get(handlers::all_tasks))
|
||||
.route(
|
||||
"/api/v1/ts/all_tasks_with_page",
|
||||
get(handlers::all_tasks_with_page),
|
||||
)
|
||||
.route("/api/v1/ts/del_task/{task_id}", get(handlers::del_task))
|
||||
.route("/api/v1/ts/kc/start", get(handlers::kc_start))
|
||||
.route("/api/v1/ts/kc/stop", get(handlers::kc_stop))
|
||||
.route("/api/v1/ts/kc/status", get(handlers::kc_status))
|
||||
.route(
|
||||
"/api/v1/ts/kc/curr_cron_expr",
|
||||
get(handlers::kc_curr_cron_expr),
|
||||
)
|
||||
.route("/api/v1/ts/kc/reset", post(handlers::kc_reset))
|
||||
.route("/api/v1/ts/kc/test", get(handlers::kc_test))
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.with_state(state)
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
use dotenv::dotenv;
|
||||
use htycommons::logger::logger_init;
|
||||
use htycommons::web::launch_rocket;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
dotenv().ok();
|
||||
logger_init();
|
||||
|
||||
let db_url = htyts::config::ts_database_url()?;
|
||||
let redis_url = htycommons::redis_util::get_redis_url()?;
|
||||
let zombie = htyts::config::zombie_minutes();
|
||||
let pg = htycommons::db::pool(&db_url);
|
||||
let redis = htyts::redis_store::RedisTaskStore::connect(&redis_url).await?;
|
||||
let st = Arc::new(htyts::state::TsState {
|
||||
db: Arc::new(pg),
|
||||
redis,
|
||||
zombie_minutes: zombie,
|
||||
});
|
||||
let router = htyts::ts_router_with_state(st);
|
||||
let port = htyts::config::ts_port();
|
||||
launch_rocket(port, router).await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
use chrono::NaiveDateTime;
|
||||
use std::str::FromStr;
|
||||
use htyts_models::{ReqTask, TaskStatus, TaskType};
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use crate::DbTaskRow;
|
||||
|
||||
pub fn apply_write_defaults(req: &mut ReqTask, host: &str, now: NaiveDateTime) {
|
||||
if req.task_status.is_none() {
|
||||
req.task_status = Some(TaskStatus::Pending);
|
||||
}
|
||||
if req.created_by.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
|
||||
req.created_by = Some(host.to_string());
|
||||
}
|
||||
if req.created_at.is_none() {
|
||||
req.created_at = Some(now);
|
||||
}
|
||||
req.updated_by = Some(host.to_string());
|
||||
req.updated_at = Some(now);
|
||||
}
|
||||
|
||||
/// Build DB row + optional Redis JSON string from request (after [`apply_write_defaults`]).
|
||||
pub fn split_req_for_persist(req: &ReqTask, task_id: String) -> anyhow::Result<(DbTaskRow, Option<String>)> {
|
||||
let tt = req
|
||||
.task_type
|
||||
.ok_or_else(|| anyhow::anyhow!("task_type is required"))?;
|
||||
let ts = req
|
||||
.task_status
|
||||
.ok_or_else(|| anyhow::anyhow!("task_status is required"))?;
|
||||
let created_by = req
|
||||
.created_by
|
||||
.clone()
|
||||
.ok_or_else(|| anyhow::anyhow!("created_by is required"))?;
|
||||
let created_at = req
|
||||
.created_at
|
||||
.ok_or_else(|| anyhow::anyhow!("created_at is required"))?;
|
||||
let updated_by = req
|
||||
.updated_by
|
||||
.clone()
|
||||
.ok_or_else(|| anyhow::anyhow!("updated_by is required"))?;
|
||||
let updated_at = req
|
||||
.updated_at
|
||||
.ok_or_else(|| anyhow::anyhow!("updated_at is required"))?;
|
||||
|
||||
let row = DbTaskRow {
|
||||
task_id,
|
||||
hty_id: req.hty_id.clone(),
|
||||
task_type: tt.as_db_str().to_string(),
|
||||
task_status: ts.as_db_str().to_string(),
|
||||
duration: req.duration,
|
||||
created_by,
|
||||
created_at,
|
||||
updated_by,
|
||||
updated_at,
|
||||
meta: req.meta.clone(),
|
||||
};
|
||||
|
||||
let redis_str = if let Some(ref p) = req.payload {
|
||||
Some(serde_json::to_string(p)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok((row, redis_str))
|
||||
}
|
||||
|
||||
/// Injects `hty_id` into JSON object payload for `UPLOAD_PICTURE` (Java `TaskService.createOrUpdateTask`).
|
||||
pub fn inject_hty_id_for_upload_picture(
|
||||
req: &mut ReqTask,
|
||||
hty_id: Option<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
if req.task_type != Some(TaskType::UploadPicture) {
|
||||
return Ok(());
|
||||
}
|
||||
let Some(hty_id) = hty_id else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some(ref mut payload) = req.payload else {
|
||||
return Ok(());
|
||||
};
|
||||
match payload {
|
||||
Value::Object(map) => {
|
||||
map.insert("hty_id".to_string(), json!(hty_id));
|
||||
}
|
||||
_ => {
|
||||
*payload = json!({ "hty_id": hty_id });
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn row_to_req_task(row: &DbTaskRow, payload_json: Option<&str>) -> anyhow::Result<ReqTask> {
|
||||
let task_type =
|
||||
TaskType::from_str(&row.task_type).map_err(|e| anyhow::anyhow!("task_type: {e}"))?;
|
||||
let task_status =
|
||||
TaskStatus::from_str(&row.task_status).map_err(|e| anyhow::anyhow!("task_status: {e}"))?;
|
||||
let payload = if let Some(s) = payload_json {
|
||||
if s.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(serde_json::from_str(s)?)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(ReqTask {
|
||||
task_id: Some(row.task_id.clone()),
|
||||
hty_id: row.hty_id.clone(),
|
||||
created_by: Some(row.created_by.clone()),
|
||||
created_at: Some(row.created_at),
|
||||
updated_by: Some(row.updated_by.clone()),
|
||||
updated_at: Some(row.updated_at),
|
||||
task_type: Some(task_type),
|
||||
task_status: Some(task_status),
|
||||
duration: row.duration,
|
||||
meta: row.meta.clone(),
|
||||
payload,
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
use anyhow::Context;
|
||||
use htyts_models::{sudoer_cache_redis_key, task_payload_redis_key};
|
||||
use redis::AsyncCommands;
|
||||
use redis::aio::ConnectionManager;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RedisTaskStore {
|
||||
mgr: ConnectionManager,
|
||||
}
|
||||
|
||||
impl RedisTaskStore {
|
||||
pub async fn connect(redis_url: &str) -> anyhow::Result<Self> {
|
||||
let client = redis::Client::open(redis_url).context("redis client")?;
|
||||
let mgr = ConnectionManager::new(client)
|
||||
.await
|
||||
.context("redis connection manager")?;
|
||||
Ok(Self { mgr })
|
||||
}
|
||||
|
||||
pub async fn set_payload(&self, task_id: &str, json: &str) -> anyhow::Result<()> {
|
||||
let mut c = self.mgr.clone();
|
||||
let key = task_payload_redis_key(task_id);
|
||||
let _: () = c.set(&key, json).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_payload(&self, task_id: &str) -> anyhow::Result<Option<String>> {
|
||||
let mut c = self.mgr.clone();
|
||||
let key = task_payload_redis_key(task_id);
|
||||
let v: Option<String> = c.get(&key).await?;
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
pub async fn del(&self, task_id: &str) -> anyhow::Result<()> {
|
||||
let mut c = self.mgr.clone();
|
||||
let key = task_payload_redis_key(task_id);
|
||||
let _: i64 = c.del(&key).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_raw(&self, key: &str) -> anyhow::Result<Option<String>> {
|
||||
let mut c = self.mgr.clone();
|
||||
let v: Option<String> = c.get(key).await?;
|
||||
Ok(v)
|
||||
}
|
||||
|
||||
/// After UC `verify_jwt_token` succeeds, mirror Java `setRequestSudoerToken` (TTL in seconds).
|
||||
pub async fn set_sudoer_cache_json(
|
||||
&self,
|
||||
token_id: &str,
|
||||
json: &str,
|
||||
ttl_secs: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut c = self.mgr.clone();
|
||||
let key = sudoer_cache_redis_key(token_id);
|
||||
let _: () = c.set_ex(&key, json, ttl_secs).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use htycommons::db::PgPool;
|
||||
|
||||
use crate::redis_store::RedisTaskStore;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TsState {
|
||||
pub db: Arc<PgPool>,
|
||||
pub redis: RedisTaskStore,
|
||||
pub zombie_minutes: i64,
|
||||
}
|
||||
Reference in New Issue
Block a user