diff --git a/examples/delay.rs b/examples/delay.rs index 95b21f4..0473ebe 100644 --- a/examples/delay.rs +++ b/examples/delay.rs @@ -32,6 +32,7 @@ pub struct Sleep(u64); #[async_trait] impl Step for Sleep { async fn step(self, _db: &PgPool) -> StepResult { + std::fs::read_to_string("non-existing")?; println!("Sleeping for {} sec", self.0); NextStep::delay(Wakeup(self.0), Duration::from_secs(self.0)) } diff --git a/src/error.rs b/src/error.rs index 855abd7..2c867e3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,8 +6,13 @@ use std::{error::Error as StdError, result::Result as StdResult}; pub enum Error { /// can't add task AddTask(#[source] sqlx::Error), - /// can't serialize step - SerializeStep(#[source] serde_json::Error), + /// can't serialize step: {1} + SerializeStep(#[source] serde_json::Error, String), + /** + can't deserialize step (the task was likely changed between the + scheduling and running of the step): {1} + */ + DeserializeStep(#[source] serde_json::Error, String), /// can't unlock stale tasks UnlockStaleTasks(#[source] sqlx::Error), /// waiter can't connect to the db @@ -16,6 +21,8 @@ pub enum Error { WaiterListen(#[source] sqlx::Error), /// unreachable: worker semaphore is closed UnreachableWorkerSemaphoreClosed(#[source] tokio::sync::AcquireError), + /// db error: {1} + Db(#[source] sqlx::Error, String), } /// The crate result diff --git a/src/lib.rs b/src/lib.rs index 4579b7e..4c08b2f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,3 +36,16 @@ pub async fn delay(db: &PgPool, task: &impl Scheduler, delay: Duration) -> Resul pub async fn schedule(db: &PgPool, task: &impl Scheduler, at: DateTime) -> Result { task.schedule(db, at).await } + +mod private_macros { + macro_rules! db_error { + () => { + |e| $crate::Error::Db(e, code_path!().into()) + }; + ($desc:expr) => { + |e| $crate::Error::Db(e, format!("{} {}", code_path!(), $desc)) + }; + } + + pub(crate) use db_error; +} diff --git a/src/traits.rs b/src/traits.rs index 7e0f42e..d904bf4 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -49,7 +49,8 @@ pub trait Scheduler: fmt::Debug + DeserializeOwned + Serialize + Sized + Sync { /// Schedules a task to run at a specified time in the future async fn schedule(&self, db: &PgPool, at: DateTime) -> crate::Result { - let step = serde_json::to_string(self).map_err(Error::SerializeStep)?; + let step = serde_json::to_string(self) + .map_err(|e| Error::SerializeStep(e, format!("{self:?}")))?; sqlx::query!( "INSERT INTO pg_task (step, wakeup_at) VALUES ($1, $2) RETURNING id", step, diff --git a/src/util.rs b/src/util.rs index d197802..7622f9b 100644 --- a/src/util.rs +++ b/src/util.rs @@ -14,12 +14,12 @@ pub fn std_duration_to_chrono(std_duration: std::time::Duration) -> chrono::Dura /// Returns the ordinal string of a given integer pub fn ordinal(n: i32) -> String { match n.abs() { - 11..=13 => format!("{}-th", n), + 11..=13 => format!("{}th", n), _ => match n % 10 { - 1 => format!("{}-st", n), - 2 => format!("{}-nd", n), - 3 => format!("{}-rd", n), - _ => format!("{}-th", n), + 1 => format!("{}st", n), + 2 => format!("{}nd", n), + 3 => format!("{}rd", n), + _ => format!("{}th", n), }, } } diff --git a/src/worker.rs b/src/worker.rs index a8ed5ee..145bcf8 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,4 +1,7 @@ -use crate::{util, waiter::Waiter, NextStep, Step, StepError, LOST_CONNECTION_SLEEP}; +use crate::{ + private_macros::db_error, util, waiter::Waiter, Error, NextStep, Result, Step, StepError, + LOST_CONNECTION_SLEEP, +}; use chrono::{DateTime, Utc}; use code_path::code_path; use serde::Serialize; @@ -10,31 +13,6 @@ use std::{fmt, marker::PhantomData, sync::Arc, time::Duration}; use tokio::{sync::Semaphore, time::sleep}; use tracing::{debug, error, info, trace, warn}; -/// An error report to log from the worker -#[derive(Debug, displaydoc::Display, thiserror::Error)] -enum ErrorReport { - /// db error: {1} - Sqlx(#[source] sqlx::Error, String), - /// can't serialize step: {1} - SerializeStep(#[source] serde_json::Error, String), - /** - can't deserialize step (the task was likely changed between the - scheduling and running of the step): {1} - */ - DeserializeStep(#[source] serde_json::Error, String), -} - -type ReportResult = std::result::Result; - -macro_rules! sqlx_error { - () => { - |e| ErrorReport::Sqlx(e, code_path!().into()) - }; - ($desc:expr) => { - |e| ErrorReport::Sqlx(e, format!("{} {}", code_path!(), $desc)) - }; -} - /// A worker for processing tasks pub struct Worker { db: PgPool, @@ -51,12 +29,6 @@ struct Task { wakeup_at: DateTime, } -impl ErrorReport { - fn log(self) { - error!("{}", source_chain::to_string(&self)); - } -} - impl> Worker { /// Creates a new worker pub fn new(db: PgPool) -> Self { @@ -77,7 +49,7 @@ impl> Worker { } /// Runs all ready tasks to completion and waits for new ones - pub async fn run(&self) -> crate::Result<()> { + pub async fn run(&self) -> Result<()> { self.unlock_stale_tasks().await?; self.waiter.listen(self.db.clone()).await?; @@ -89,10 +61,12 @@ impl> Worker { .clone() .acquire_owned() .await - .map_err(crate::Error::UnreachableWorkerSemaphoreClosed)?; + .map_err(Error::UnreachableWorkerSemaphoreClosed)?; let db = self.db.clone(); tokio::spawn(async move { - task.run_step::(&db).await.map_err(ErrorReport::log).ok(); + if let Err(e) = task.run_step::(&db).await { + error!("[{}] {}", task.id, source_chain::to_string(&e)); + }; drop(permit); }); } @@ -112,12 +86,12 @@ impl> Worker { /// Unlocks all tasks. This is intended to run at the start of the worker as /// some tasks could remain locked as running indefinitely if the /// previous run ended due to some kind of crash. - async fn unlock_stale_tasks(&self) -> crate::Result<()> { + async fn unlock_stale_tasks(&self) -> Result<()> { let unlocked = sqlx::query!("UPDATE pg_task SET is_running = false WHERE is_running = true") .execute(&self.db) .await - .map_err(crate::Error::UnlockStaleTasks)? + .map_err(Error::UnlockStaleTasks)? .rows_affected(); if unlocked == 0 { debug!("No stale tasks to unlock") @@ -128,31 +102,31 @@ impl> Worker { } /// Waits until the next task is ready, marks it as running and returns it. - async fn recv_task(&self) -> ReportResult { + async fn recv_task(&self) -> Result { trace!("Receiving the next task"); loop { let table_changes = self.waiter.subscribe(); - let mut tx = self.db.begin().await.map_err(sqlx_error!("begin"))?; + let mut tx = self.db.begin().await.map_err(db_error!("begin"))?; if let Some(task) = Task::fetch_closest(&mut tx).await? { let time_to_run = task.wakeup_at - Utc::now(); if time_to_run <= chrono::Duration::zero() { task.mark_running(&mut tx).await?; tx.commit() .await - .map_err(sqlx_error!("commit on task return"))?; + .map_err(db_error!("commit on task return"))?; return Ok(task); } tx.commit() .await - .map_err(sqlx_error!("commit on wait for a period"))?; + .map_err(db_error!("commit on wait for a period"))?; table_changes .wait_for(util::chrono_duration_to_std(time_to_run)) .await; } else { tx.commit() .await - .map_err(sqlx_error!("commit on wait forever"))?; + .map_err(db_error!("commit on wait forever"))?; table_changes.wait_forever().await; } } @@ -161,7 +135,7 @@ impl> Worker { impl Task { /// Fetches the closest task to run - async fn fetch_closest(con: &mut PgConnection) -> ReportResult> { + async fn fetch_closest(con: &mut PgConnection) -> Result> { trace!("Fetching the closest task to run"); let task = sqlx::query_as!( Task, @@ -181,7 +155,7 @@ impl Task { ) .fetch_optional(con) .await - .map_err(sqlx_error!("select"))?; + .map_err(db_error!("select"))?; if let Some(ref task) = task { let delay = task.delay(); @@ -196,7 +170,7 @@ impl Task { Ok(task) } - async fn mark_running(&self, con: &mut PgConnection) -> ReportResult<()> { + async fn mark_running(&self, con: &mut PgConnection) -> Result<()> { sqlx::query!( " UPDATE pg_task @@ -208,7 +182,7 @@ impl Task { ) .execute(con) .await - .map_err(sqlx_error!())?; + .map_err(db_error!())?; Ok(()) } @@ -223,7 +197,7 @@ impl Task { } /// Runs the current step of the task to completion - async fn run_step>(&self, db: &PgPool) -> ReportResult<()> { + async fn run_step>(&self, db: &PgPool) -> Result<()> { info!( "[{}]{} run step {}", self.id, @@ -235,14 +209,11 @@ impl Task { self.step ); let step: S = match serde_json::from_str(&self.step) - .map_err(|e| ErrorReport::DeserializeStep(e, format!("{:?}", self.step))) + .map_err(|e| Error::DeserializeStep(e, format!("{:?}", self.step))) { Ok(x) => x, Err(e) => { - self.save_error(db, e.into()) - .await - .map_err(ErrorReport::log) - .ok(); + self.save_error(db, e.into()).await.ok(); return Ok(()); } }; @@ -262,10 +233,9 @@ impl Task { } /// Saves the task error - async fn save_error(&self, db: &PgPool, err: StepError) -> ReportResult<()> { - trace!("[{}] saving error", self.id); + async fn save_error(&self, db: &PgPool, err: StepError) -> Result<()> { + let err_str = source_chain::to_string(&*err); - let err = source_chain::to_string(&*err); let (tried, step) = sqlx::query!( r#" UPDATE pg_task @@ -277,16 +247,18 @@ impl Task { RETURNING tried, step::TEXT as "step!" "#, self.id, - &err, + &err_str, Utc::now(), ) .fetch_one(db) .await .map(|r| (r.tried, r.step)) - .map_err(sqlx_error!())?; + .map_err(db_error!())?; + error!( - "[{}] resulted in an error at step {step} after {tried} attempts: {}", - self.id, err + "[{id}] resulted in an error at step {step} on {attempt} attempt: {err_str}", + id = self.id, + attempt = util::ordinal(tried + 1) ); Ok(()) } @@ -297,16 +269,13 @@ impl Task { db: &PgPool, step: impl Serialize + fmt::Debug, delay: Duration, - ) -> ReportResult<()> { + ) -> Result<()> { let step = match serde_json::to_string(&step) - .map_err(|e| ErrorReport::SerializeStep(e, format!("{:?}", step))) + .map_err(|e| Error::SerializeStep(e, format!("{:?}", step))) { Ok(x) => x, Err(e) => { - self.save_error(db, e.into()) - .await - .map_err(ErrorReport::log) - .ok(); + self.save_error(db, e.into()).await.ok(); return Ok(()); } }; @@ -328,19 +297,19 @@ impl Task { ) .execute(db) .await - .map_err(sqlx_error!())?; + .map_err(db_error!())?; debug!("[{}] step is done", self.id); Ok(()) } /// Removes the finished task - async fn complete(&self, db: &PgPool) -> ReportResult<()> { + async fn complete(&self, db: &PgPool) -> Result<()> { info!("[{}] is successfully completed", self.id); sqlx::query!("DELETE FROM pg_task WHERE id = $1", self.id) .execute(db) .await - .map_err(sqlx_error!())?; + .map_err(db_error!())?; Ok(()) } @@ -352,7 +321,7 @@ impl Task { retry_limit: i32, retry_delay: Duration, err: StepError, - ) -> ReportResult<()> { + ) -> Result<()> { if tried < retry_limit { self.retry(db, tried, retry_limit, retry_delay, err).await } else { @@ -368,7 +337,7 @@ impl Task { retry_limit: i32, delay: Duration, err: StepError, - ) -> ReportResult<()> { + ) -> Result<()> { trace!("[{}] scheduling a retry", self.id); let delay = util::std_duration_to_chrono(delay); @@ -387,13 +356,13 @@ impl Task { ) .execute(db) .await - .map_err(sqlx_error!())?; + .map_err(db_error!())?; debug!( - "[{}] scheduled {attempt} of {retry_limit} retries in {delay:?} on error: {}", - self.id, - source_chain::to_string(&*err), - attempt = util::ordinal(tried + 1) + "[{id}] scheduled {attempt} of {retry_limit} retries in {delay:?} on error: {err}", + id = self.id, + attempt = util::ordinal(tried + 1), + err = source_chain::to_string(&*err), ); Ok(()) }