Skip to content

Commit

Permalink
Rename Waiter to Listener
Browse files Browse the repository at this point in the history
  • Loading branch information
imbolc committed Jun 28, 2024
1 parent 8943f8e commit 47a84a8
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
6 changes: 3 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub enum Error {
/// can't unlock stale tasks
UnlockStaleTasks(#[source] sqlx::Error),
/// waiter can't connect to the db
WaiterConnect(#[source] sqlx::Error),
/// waiter can't start listening to tables changes
WaiterListen(#[source] sqlx::Error),
ListenerConnect(#[source] sqlx::Error),
/// can't start listening for tables changes
ListenerListen(#[source] sqlx::Error),
/// unreachable: worker semaphore is closed
UnreachableWorkerSemaphoreClosed(#[source] tokio::sync::AcquireError),
/// db error: {1}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
#![warn(clippy::all, missing_docs, nonstandard_style, future_incompatible)]

mod error;
mod listener;
mod macros;
mod next_step;
mod task;
mod traits;
mod util;
mod waiter;
mod worker;

pub use error::{Error, Result, StepError, StepResult};
Expand Down
8 changes: 4 additions & 4 deletions src/waiter.rs → src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use tokio::{
use tracing::{trace, warn};

/// Waits for tasks table to change
pub struct Waiter(Arc<Notify>);
pub struct Listener(Arc<Notify>);
pub struct Subscription<'a>(Notified<'a>);

const PG_NOTIFICATION_CHANNEL: &str = "pg_task_changed";

impl Waiter {
impl Listener {
/// Creates a waiter
pub fn new() -> Self {
let notify = Arc::new(Notify::new());
Expand All @@ -24,11 +24,11 @@ impl Waiter {
pub async fn listen(&self, db: PgPool) -> crate::Result<()> {
let mut listener = PgListener::connect_with(&db)
.await
.map_err(crate::Error::WaiterListen)?;
.map_err(crate::Error::ListenerConnect)?;
listener
.listen(PG_NOTIFICATION_CHANNEL)
.await
.map_err(crate::Error::WaiterListen)?;
.map_err(crate::Error::ListenerListen)?;

let notify = self.0.clone();
tokio::spawn(async move {
Expand Down
12 changes: 6 additions & 6 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
listener::Listener,
task::Task,
util::{chrono_duration_to_std, db_error, wait_for_reconnection},
waiter::Waiter,
Error, Result, Step, LOST_CONNECTION_SLEEP,
};
use chrono::Utc;
Expand All @@ -13,19 +13,19 @@ use tracing::{debug, error, trace, warn};
/// A worker for processing tasks
pub struct Worker<T> {
db: PgPool,
waiter: Waiter,
listener: Listener,
tasks: PhantomData<T>,
concurrency: usize,
}

impl<S: Step<S>> Worker<S> {
/// Creates a new worker
pub fn new(db: PgPool) -> Self {
let waiter = Waiter::new();
let listener = Listener::new();
let concurrency = num_cpus::get();
Self {
db,
waiter,
listener,
concurrency,
tasks: PhantomData,
}
Expand All @@ -40,7 +40,7 @@ impl<S: Step<S>> Worker<S> {
/// Runs all ready tasks to completion and waits for new ones
pub async fn run(&self) -> Result<()> {
self.unlock_stale_tasks().await?;
self.waiter.listen(self.db.clone()).await?;
self.listener.listen(self.db.clone()).await?;

let semaphore = Arc::new(Semaphore::new(self.concurrency));
loop {
Expand Down Expand Up @@ -95,7 +95,7 @@ impl<S: Step<S>> Worker<S> {
trace!("Receiving the next task");

loop {
let table_changes = self.waiter.subscribe();
let table_changes = self.listener.subscribe();
let mut tx = self.db.begin().await.map_err(db_error!("begin"))?;
if let Some(task) = Task::fetch_closest(&mut tx).await? {
let time_to_run = task.wakeup_at - Utc::now();
Expand Down

0 comments on commit 47a84a8

Please sign in to comment.