Skip to content

Commit

Permalink
Refactor error
Browse files Browse the repository at this point in the history
  • Loading branch information
imbolc committed Jun 28, 2024
1 parent 179cc1b commit 8797b24
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 83 deletions.
1 change: 1 addition & 0 deletions examples/delay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct Sleep(u64);
#[async_trait]
impl Step<Sleeper> for Sleep {
/// Runs the `Sleep` step: prints how long it is going to sleep and hands the
/// follow-up `Wakeup` step to `NextStep::delay` with a delay of `self.0`
/// seconds.
///
/// The database pool is not used by this step.
async fn step(self, _db: &PgPool) -> StepResult<Sleeper> {
    // No fallible work happens here. A leftover debugging read of a
    // non-existing file used to sit at this point; it made every run of the
    // example fail before reaching the delay and performed blocking file I/O
    // inside an async fn.
    println!("Sleeping for {} sec", self.0);
    NextStep::delay(Wakeup(self.0), Duration::from_secs(self.0))
}
Expand Down
11 changes: 9 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ use std::{error::Error as StdError, result::Result as StdResult};
pub enum Error {
/// can't add task
AddTask(#[source] sqlx::Error),
/// can't serialize step
SerializeStep(#[source] serde_json::Error),
/// can't serialize step: {1}
SerializeStep(#[source] serde_json::Error, String),
/**
can't deserialize step (the task was likely changed between the
scheduling and running of the step): {1}
*/
DeserializeStep(#[source] serde_json::Error, String),
/// can't unlock stale tasks
UnlockStaleTasks(#[source] sqlx::Error),
/// waiter can't connect to the db
Expand All @@ -16,6 +21,8 @@ pub enum Error {
WaiterListen(#[source] sqlx::Error),
/// unreachable: worker semaphore is closed
UnreachableWorkerSemaphoreClosed(#[source] tokio::sync::AcquireError),
/// db error: {1}
Db(#[source] sqlx::Error, String),
}

/// The crate result
Expand Down
13 changes: 13 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,16 @@ pub async fn delay(db: &PgPool, task: &impl Scheduler, delay: Duration) -> Resul
pub async fn schedule(db: &PgPool, task: &impl Scheduler, at: DateTime<Utc>) -> Result<Uuid> {
task.schedule(db, at).await
}

/// Crate-internal helper macros.
///
/// The macros are defined inside this module and re-exported with
/// `pub(crate) use`, so other modules of the crate can import them by path
/// (e.g. `use crate::private_macros::db_error;`).
mod private_macros {
/// Expands to a closure, suitable for `Result::map_err`, that wraps the
/// incoming error into `Error::Db` together with a location string.
///
/// - `db_error!()` uses the output of `code_path!()` as the location string.
/// - `db_error!(desc)` appends `desc` to the output of `code_path!()`,
///   separated by a single space.
///
/// NOTE(review): `code_path!` is presumably resolved where `db_error!` is
/// invoked, so callers need `code_path::code_path` in scope - confirm.
macro_rules! db_error {
// No description: the location string is just the code path.
() => {
|e| $crate::Error::Db(e, code_path!().into())
};
// With a description: "<code path> <description>".
($desc:expr) => {
|e| $crate::Error::Db(e, format!("{} {}", code_path!(), $desc))
};
}

// Make the macro importable by path from the rest of the crate.
pub(crate) use db_error;
}
3 changes: 2 additions & 1 deletion src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ pub trait Scheduler: fmt::Debug + DeserializeOwned + Serialize + Sized + Sync {

/// Schedules a task to run at a specified time in the future
async fn schedule(&self, db: &PgPool, at: DateTime<Utc>) -> crate::Result<Uuid> {
let step = serde_json::to_string(self).map_err(Error::SerializeStep)?;
let step = serde_json::to_string(self)
.map_err(|e| Error::SerializeStep(e, format!("{self:?}")))?;
sqlx::query!(
"INSERT INTO pg_task (step, wakeup_at) VALUES ($1, $2) RETURNING id",
step,
Expand Down
10 changes: 5 additions & 5 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ pub fn std_duration_to_chrono(std_duration: std::time::Duration) -> chrono::Dura
/// Returns the English ordinal string of a given integer, e.g. `1st`, `2nd`,
/// `3rd`, `4th`, `11th`, `112th`.
///
/// The suffix is chosen from the magnitude of `n`, so a negative number gets
/// the same suffix as its positive counterpart (`-1` gives `-1st`).
pub fn ordinal(n: i32) -> String {
    // `unsigned_abs` is defined for every `i32`, including `i32::MIN`, where
    // `abs` would overflow.
    let magnitude = n.unsigned_abs();
    let suffix = match (magnitude % 100, magnitude % 10) {
        // Numbers ending in 11, 12 or 13 are irregular (11th, 112th, 213th).
        (11..=13, _) => "th",
        (_, 1) => "st",
        (_, 2) => "nd",
        (_, 3) => "rd",
        _ => "th",
    };
    format!("{n}{suffix}")
}
Expand Down
119 changes: 44 additions & 75 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{util, waiter::Waiter, NextStep, Step, StepError, LOST_CONNECTION_SLEEP};
use crate::{
private_macros::db_error, util, waiter::Waiter, Error, NextStep, Result, Step, StepError,
LOST_CONNECTION_SLEEP,
};
use chrono::{DateTime, Utc};
use code_path::code_path;
use serde::Serialize;
Expand All @@ -10,31 +13,6 @@ use std::{fmt, marker::PhantomData, sync::Arc, time::Duration};
use tokio::{sync::Semaphore, time::sleep};
use tracing::{debug, error, info, trace, warn};

/// An error report to log from the worker
#[derive(Debug, displaydoc::Display, thiserror::Error)]
enum ErrorReport {
/// db error: {1}
Sqlx(#[source] sqlx::Error, String),
/// can't serialize step: {1}
SerializeStep(#[source] serde_json::Error, String),
/**
can't deserialize step (the task was likely changed between the
scheduling and running of the step): {1}
*/
DeserializeStep(#[source] serde_json::Error, String),
}

type ReportResult<T> = std::result::Result<T, ErrorReport>;

macro_rules! sqlx_error {
() => {
|e| ErrorReport::Sqlx(e, code_path!().into())
};
($desc:expr) => {
|e| ErrorReport::Sqlx(e, format!("{} {}", code_path!(), $desc))
};
}

/// A worker for processing tasks
pub struct Worker<T> {
db: PgPool,
Expand All @@ -51,12 +29,6 @@ struct Task {
wakeup_at: DateTime<Utc>,
}

impl ErrorReport {
fn log(self) {
error!("{}", source_chain::to_string(&self));
}
}

impl<S: Step<S>> Worker<S> {
/// Creates a new worker
pub fn new(db: PgPool) -> Self {
Expand All @@ -77,7 +49,7 @@ impl<S: Step<S>> Worker<S> {
}

/// Runs all ready tasks to completion and waits for new ones
pub async fn run(&self) -> crate::Result<()> {
pub async fn run(&self) -> Result<()> {
self.unlock_stale_tasks().await?;
self.waiter.listen(self.db.clone()).await?;

Expand All @@ -89,10 +61,12 @@ impl<S: Step<S>> Worker<S> {
.clone()
.acquire_owned()
.await
.map_err(crate::Error::UnreachableWorkerSemaphoreClosed)?;
.map_err(Error::UnreachableWorkerSemaphoreClosed)?;
let db = self.db.clone();
tokio::spawn(async move {
task.run_step::<S>(&db).await.map_err(ErrorReport::log).ok();
if let Err(e) = task.run_step::<S>(&db).await {
error!("[{}] {}", task.id, source_chain::to_string(&e));
};
drop(permit);
});
}
Expand All @@ -112,12 +86,12 @@ impl<S: Step<S>> Worker<S> {
/// Unlocks all tasks. This is intended to run at the start of the worker as
/// some tasks could remain locked as running indefinitely if the
/// previous run ended due to some kind of crash.
async fn unlock_stale_tasks(&self) -> crate::Result<()> {
async fn unlock_stale_tasks(&self) -> Result<()> {
let unlocked =
sqlx::query!("UPDATE pg_task SET is_running = false WHERE is_running = true")
.execute(&self.db)
.await
.map_err(crate::Error::UnlockStaleTasks)?
.map_err(Error::UnlockStaleTasks)?
.rows_affected();
if unlocked == 0 {
debug!("No stale tasks to unlock")
Expand All @@ -128,31 +102,31 @@ impl<S: Step<S>> Worker<S> {
}

/// Waits until the next task is ready, marks it as running and returns it.
async fn recv_task(&self) -> ReportResult<Task> {
async fn recv_task(&self) -> Result<Task> {
trace!("Receiving the next task");

loop {
let table_changes = self.waiter.subscribe();
let mut tx = self.db.begin().await.map_err(sqlx_error!("begin"))?;
let mut tx = self.db.begin().await.map_err(db_error!("begin"))?;
if let Some(task) = Task::fetch_closest(&mut tx).await? {
let time_to_run = task.wakeup_at - Utc::now();
if time_to_run <= chrono::Duration::zero() {
task.mark_running(&mut tx).await?;
tx.commit()
.await
.map_err(sqlx_error!("commit on task return"))?;
.map_err(db_error!("commit on task return"))?;
return Ok(task);
}
tx.commit()
.await
.map_err(sqlx_error!("commit on wait for a period"))?;
.map_err(db_error!("commit on wait for a period"))?;
table_changes
.wait_for(util::chrono_duration_to_std(time_to_run))
.await;
} else {
tx.commit()
.await
.map_err(sqlx_error!("commit on wait forever"))?;
.map_err(db_error!("commit on wait forever"))?;
table_changes.wait_forever().await;
}
}
Expand All @@ -161,7 +135,7 @@ impl<S: Step<S>> Worker<S> {

impl Task {
/// Fetches the closest task to run
async fn fetch_closest(con: &mut PgConnection) -> ReportResult<Option<Self>> {
async fn fetch_closest(con: &mut PgConnection) -> Result<Option<Self>> {
trace!("Fetching the closest task to run");
let task = sqlx::query_as!(
Task,
Expand All @@ -181,7 +155,7 @@ impl Task {
)
.fetch_optional(con)
.await
.map_err(sqlx_error!("select"))?;
.map_err(db_error!("select"))?;

if let Some(ref task) = task {
let delay = task.delay();
Expand All @@ -196,7 +170,7 @@ impl Task {
Ok(task)
}

async fn mark_running(&self, con: &mut PgConnection) -> ReportResult<()> {
async fn mark_running(&self, con: &mut PgConnection) -> Result<()> {
sqlx::query!(
"
UPDATE pg_task
Expand All @@ -208,7 +182,7 @@ impl Task {
)
.execute(con)
.await
.map_err(sqlx_error!())?;
.map_err(db_error!())?;
Ok(())
}

Expand All @@ -223,7 +197,7 @@ impl Task {
}

/// Runs the current step of the task to completion
async fn run_step<S: Step<S>>(&self, db: &PgPool) -> ReportResult<()> {
async fn run_step<S: Step<S>>(&self, db: &PgPool) -> Result<()> {
info!(
"[{}]{} run step {}",
self.id,
Expand All @@ -235,14 +209,11 @@ impl Task {
self.step
);
let step: S = match serde_json::from_str(&self.step)
.map_err(|e| ErrorReport::DeserializeStep(e, format!("{:?}", self.step)))
.map_err(|e| Error::DeserializeStep(e, format!("{:?}", self.step)))
{
Ok(x) => x,
Err(e) => {
self.save_error(db, e.into())
.await
.map_err(ErrorReport::log)
.ok();
self.save_error(db, e.into()).await.ok();
return Ok(());
}
};
Expand All @@ -262,10 +233,9 @@ impl Task {
}

/// Saves the task error
async fn save_error(&self, db: &PgPool, err: StepError) -> ReportResult<()> {
trace!("[{}] saving error", self.id);
async fn save_error(&self, db: &PgPool, err: StepError) -> Result<()> {
let err_str = source_chain::to_string(&*err);

let err = source_chain::to_string(&*err);
let (tried, step) = sqlx::query!(
r#"
UPDATE pg_task
Expand All @@ -277,16 +247,18 @@ impl Task {
RETURNING tried, step::TEXT as "step!"
"#,
self.id,
&err,
&err_str,
Utc::now(),
)
.fetch_one(db)
.await
.map(|r| (r.tried, r.step))
.map_err(sqlx_error!())?;
.map_err(db_error!())?;

error!(
"[{}] resulted in an error at step {step} after {tried} attempts: {}",
self.id, err
"[{id}] resulted in an error at step {step} on {attempt} attempt: {err_str}",
id = self.id,
attempt = util::ordinal(tried + 1)
);
Ok(())
}
Expand All @@ -297,16 +269,13 @@ impl Task {
db: &PgPool,
step: impl Serialize + fmt::Debug,
delay: Duration,
) -> ReportResult<()> {
) -> Result<()> {
let step = match serde_json::to_string(&step)
.map_err(|e| ErrorReport::SerializeStep(e, format!("{:?}", step)))
.map_err(|e| Error::SerializeStep(e, format!("{:?}", step)))
{
Ok(x) => x,
Err(e) => {
self.save_error(db, e.into())
.await
.map_err(ErrorReport::log)
.ok();
self.save_error(db, e.into()).await.ok();
return Ok(());
}
};
Expand All @@ -328,19 +297,19 @@ impl Task {
)
.execute(db)
.await
.map_err(sqlx_error!())?;
.map_err(db_error!())?;

debug!("[{}] step is done", self.id);
Ok(())
}

/// Removes the finished task
async fn complete(&self, db: &PgPool) -> ReportResult<()> {
async fn complete(&self, db: &PgPool) -> Result<()> {
info!("[{}] is successfully completed", self.id);
sqlx::query!("DELETE FROM pg_task WHERE id = $1", self.id)
.execute(db)
.await
.map_err(sqlx_error!())?;
.map_err(db_error!())?;
Ok(())
}

Expand All @@ -352,7 +321,7 @@ impl Task {
retry_limit: i32,
retry_delay: Duration,
err: StepError,
) -> ReportResult<()> {
) -> Result<()> {
if tried < retry_limit {
self.retry(db, tried, retry_limit, retry_delay, err).await
} else {
Expand All @@ -368,7 +337,7 @@ impl Task {
retry_limit: i32,
delay: Duration,
err: StepError,
) -> ReportResult<()> {
) -> Result<()> {
trace!("[{}] scheduling a retry", self.id);

let delay = util::std_duration_to_chrono(delay);
Expand All @@ -387,13 +356,13 @@ impl Task {
)
.execute(db)
.await
.map_err(sqlx_error!())?;
.map_err(db_error!())?;

debug!(
"[{}] scheduled {attempt} of {retry_limit} retries in {delay:?} on error: {}",
self.id,
source_chain::to_string(&*err),
attempt = util::ordinal(tried + 1)
"[{id}] scheduled {attempt} of {retry_limit} retries in {delay:?} on error: {err}",
id = self.id,
attempt = util::ordinal(tried + 1),
err = source_chain::to_string(&*err),
);
Ok(())
}
Expand Down

0 comments on commit 8797b24

Please sign in to comment.