From 3c36bb9547a61bf6937d80c238f67eebd26b9fbc Mon Sep 17 00:00:00 2001 From: DarkSky Date: Wed, 13 Sep 2023 17:24:14 +0800 Subject: [PATCH 1/8] feat: optional client param for history --- libs/jwst-codec/src/doc/history.rs | 58 ++++++++++++---------- libs/jwst-codec/src/doc/publisher.rs | 2 +- libs/jwst-core/src/lib.rs | 2 +- libs/jwst-core/src/workspaces/workspace.rs | 4 +- 4 files changed, 36 insertions(+), 30 deletions(-) diff --git a/libs/jwst-codec/src/doc/history.rs b/libs/jwst-codec/src/doc/history.rs index 1eb92d3c..4bc7d905 100644 --- a/libs/jwst-codec/src/doc/history.rs +++ b/libs/jwst-codec/src/doc/history.rs @@ -16,8 +16,11 @@ enum ParentNode { #[derive(Clone, Default)] pub struct HistoryOptions { - skip: Option, - limit: Option, + pub client: Option, + /// Only available when client is set + pub skip: Option, + /// Only available when client is set + pub limit: Option, } #[derive(Debug, Clone, Default)] @@ -57,7 +60,7 @@ impl StoreHistory { } } - pub fn parse_update(&self, update: &Update, client: u64) -> Vec { + pub fn parse_update(&self, update: &Update) -> Vec { let store_items = SortedNodes::new(update.structs.iter().collect::>()) .filter_map(|n| n.as_item().get().cloned()) .collect::>(); @@ -66,20 +69,28 @@ impl StoreHistory { let mut store_items = store_items.iter().collect::>(); store_items.sort_by(|a, b| a.id.cmp(&b.id)); - self.parse_items(store_items, client) + self.parse_items(store_items) } - pub fn parse_store(&self, client: u64, options: HistoryOptions) -> Vec { + pub fn parse_store(&self, options: HistoryOptions) -> Vec { let store_items = { let store = self.store.read().unwrap(); let mut sort_iter: Box> = Box::new( - SortedNodes::new(store.items.iter().collect::>()).filter_map(|n| n.as_item().get().cloned()), + SortedNodes::new(if let Some(client) = options.client.as_ref() { + store.items.get(client).map(|i| vec![(client, i)]).unwrap_or_default() + } else { + store.items.iter().collect::>() + }) + .filter_map(|n| n.as_item().get().cloned()), ); - if let Some(skip) = options.skip { - sort_iter = Box::new(sort_iter.skip(skip)); - } - if let Some(limit) = options.limit { - sort_iter = Box::new(sort_iter.take(limit)); + if options.client.is_some() { + // skip and limit only available when client is set + if let Some(skip) = options.skip { + sort_iter = Box::new(sort_iter.skip(skip)); + } + if let Some(limit) = options.limit { + sort_iter = Box::new(sort_iter.take(limit)); + } } sort_iter.collect::>() @@ -89,10 +100,10 @@ impl StoreHistory { let mut store_items = store_items.iter().collect::>(); store_items.sort_by(|a, b| a.id.cmp(&b.id)); - self.parse_items(store_items, client) + self.parse_items(store_items) } - fn parse_items(&self, store_items: Vec<&Item>, client: u64) -> Vec { + fn parse_items(&self, store_items: Vec<&Item>) -> Vec { let parents = self.parents.read().unwrap(); let mut histories = vec![]; @@ -101,15 +112,13 @@ impl StoreHistory { continue; } - if item.id.client == client || client == 0 { - histories.push(History { - id: item.id.to_string(), - parent: Self::parse_path(item, &parents), - content: Value::try_from(item.content.as_ref()) - .map(|v| v.to_string()) - .unwrap_or("unknown".to_owned()), - }) - } + histories.push(History { + id: item.id.to_string(), + parent: Self::parse_path(item, &parents), + content: Value::try_from(item.content.as_ref()) + .map(|v| v.to_string()) + .unwrap_or("unknown".to_owned()), + }) } histories @@ -250,10 +259,7 @@ mod test { let update = doc.encode_update().unwrap(); - assert_eq!( - history.parse_store(0, Default::default()), - history.parse_update(&update, 0) - ); + assert_eq!(history.parse_store(Default::default()), history.parse_update(&update,)); }); } } diff --git a/libs/jwst-codec/src/doc/publisher.rs b/libs/jwst-codec/src/doc/publisher.rs index 325d1dc7..b31cbe9c 100644 --- a/libs/jwst-codec/src/doc/publisher.rs +++ b/libs/jwst-codec/src/doc/publisher.rs @@ -80,7 +80,7 @@ impl DocPublisher { Ok(update) => { drop(store); - let history = history.parse_update(&update, 0); + let history = history.parse_update(&update); let mut encoder = RawEncoder::default(); if let Err(e) = update.write(&mut encoder) { diff --git a/libs/jwst-core/src/lib.rs b/libs/jwst-core/src/lib.rs index 63fbac05..a61923b4 100644 --- a/libs/jwst-core/src/lib.rs +++ b/libs/jwst-core/src/lib.rs @@ -7,7 +7,7 @@ mod workspaces; pub mod constants; pub use block::Block; -pub use jwst_codec::Any; +pub use jwst_codec::{Any, HistoryOptions}; pub use space::Space; pub use tracing::{debug, error, info, log::LevelFilter, trace, warn}; pub use types::{BlobMetadata, BlobStorage, BucketBlobStorage, DocStorage, JwstError, JwstResult}; diff --git a/libs/jwst-core/src/workspaces/workspace.rs b/libs/jwst-core/src/workspaces/workspace.rs index aba48e46..f388634d 100644 --- a/libs/jwst-core/src/workspaces/workspace.rs +++ b/libs/jwst-core/src/workspaces/workspace.rs @@ -81,8 +81,8 @@ impl Workspace { self.doc.clients() } - pub fn history(&self, client: u64, options: HistoryOptions) -> Vec { - self.doc.history().parse_store(client, options) + pub fn history(&self, options: HistoryOptions) -> Vec { + self.doc.history().parse_store(options) } } From 4795c2a45c4f11a0d64730a88e50b83261b09a2c Mon Sep 17 00:00:00 2001 From: DarkSky Date: Wed, 13 Sep 2023 18:55:57 +0800 Subject: [PATCH 2/8] feat: prefer smaller random client id & workspace history api --- Cargo.lock | 11 ++++++ apps/doc_merger/src/main.rs | 4 +-- apps/keck/src/server/api/blocks/mod.rs | 19 +++++----- apps/keck/src/server/api/blocks/workspace.rs | 37 ++++++++++++-------- apps/keck/src/server/api/doc.rs | 2 +- libs/jwst-codec/Cargo.toml | 1 + libs/jwst-codec/src/doc/document.rs | 20 ++++++++++- libs/jwst-codec/src/doc/history.rs | 8 +++-- 8 files changed, 74 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4af04f5..1667a01f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2311,6 +2311,7 @@ dependencies = [ "proptest-derive", "rand 0.8.5", "rand_chacha 0.3.1", + "rand_distr", "serde", "serde_json", "thiserror", @@ -3522,6 +3523,16 @@ dependencies = [ "getrandom 0.2.10", ] +[[package]] +name = "rand_distr" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31" +dependencies = [ + "num-traits", + "rand 0.8.5", +] + [[package]] name = "rand_hc" version = "0.2.0" diff --git a/apps/doc_merger/src/main.rs b/apps/doc_merger/src/main.rs index f747cc0c..ebcb13e1 100644 --- a/apps/doc_merger/src/main.rs +++ b/apps/doc_merger/src/main.rs @@ -74,8 +74,8 @@ fn jwst_merge(path: &str, output: &str) { println!("status: {:?}", doc.store_status()); } let ts = Instant::now(); - let history = doc.history().parse_store(0, Default::default()); - println!("history1: {:?}", ts.elapsed()); + let history = doc.history().parse_store(Default::default()); + println!("history: {:?}", ts.elapsed()); for history in history.iter().take(100) { println!("history: {:?}", history); } diff --git a/apps/keck/src/server/api/blocks/mod.rs b/apps/keck/src/server/api/blocks/mod.rs index 35bb2242..390e2d60 100644 --- a/apps/keck/src/server/api/blocks/mod.rs +++ b/apps/keck/src/server/api/blocks/mod.rs @@ -10,26 +10,29 @@ pub use workspace::{delete_workspace, get_workspace, set_workspace, subscribe_wo use super::*; fn block_apis(router: Router) -> Router { - let block_operation = Router::new() + let children_apis = Router::new() .route( "/children", get(block::get_block_children).post(block::insert_block_children), ) .route("/children/:children", delete(block::remove_block_children)); + let block_apis = Router::new().route( + "/", + get(block::get_block).post(block::set_block).delete(block::delete_block), + ); + doc_apis(router) - .nest("/block/:workspace/:block/", block_operation) - .route( - "/block/:workspace/:block", - get(block::get_block).post(block::set_block).delete(block::delete_block), - ) + .nest("/block/:workspace/:block/", children_apis) + .nest("/block/:workspace/:block/", block_apis.clone()) + .nest("/block/:workspace/:block", block_apis) } fn workspace_apis(router: Router) -> Router { router .route("/block/:workspace/client", get(workspace::workspace_client)) - .route("/block/:workspace/history", get(workspace::history_workspace_clients)) - .route("/block/:workspace/history/:client", get(workspace::history_workspace)) + .route("/block/:workspace/clients", get(workspace::workspace_clients)) + .route("/block/:workspace/history", get(workspace::history_workspace)) .route( "/block/:workspace", get(workspace::get_workspace) diff --git a/apps/keck/src/server/api/blocks/workspace.rs b/apps/keck/src/server/api/blocks/workspace.rs index 9fbf1850..2c279679 100644 --- a/apps/keck/src/server/api/blocks/workspace.rs +++ b/apps/keck/src/server/api/blocks/workspace.rs @@ -2,7 +2,7 @@ use axum::{ extract::{Path, Query}, response::Response, }; -use jwst_core::DocStorage; +use jwst_core::{DocStorage, HistoryOptions}; use utoipa::IntoParams; use super::*; @@ -280,10 +280,7 @@ pub async fn get_workspace_block( (status = 500, description = "Failed to get workspace history") ) )] -pub async fn history_workspace_clients( - Extension(context): Extension>, - Path(workspace): Path, -) -> Response { +pub async fn workspace_clients(Extension(context): Extension>, Path(workspace): Path) -> Response { if let Ok(workspace) = context.get_workspace(&workspace).await { Json(workspace.clients()).into_response() } else { @@ -291,6 +288,17 @@ pub async fn history_workspace_clients( } } +/// Block History Options +#[derive(Deserialize, IntoParams)] +pub struct BlockHistoryQuery { + /// client id, is give 0 or empty then return all clients histories + client: Option, + /// skip count, available when client is set + skip: Option, + /// limit count, available when client is set + limit: Option, +} + /// Get the history generated by a specific `Client ID` of the `Workspace` /// /// If client id set to 0, return all history of the `Workspace`. @@ -298,10 +306,10 @@ pub async fn history_workspace_clients( get, tag = "Workspace", context_path = "/api/block", - path = "/{workspace}/history/{client}", + path = "/{workspace}/history", params( ("workspace", description = "workspace id"), - ("client", description = "client id, is give 0 then return all clients histories"), + BlockHistoryQuery, ), responses( (status = 200, description = "Get workspace history", body = [History]), @@ -311,15 +319,16 @@ pub async fn history_workspace_clients( )] pub async fn history_workspace( Extension(context): Extension>, - Path(params): Path<(String, String)>, + Path(ws_id): Path, + query: Query, ) -> Response { - let (ws_id, client) = params; if let Ok(workspace) = context.get_workspace(&ws_id).await { - if let Ok(client) = client.parse::() { - Json(workspace.history(client, Default::default())).into_response() - } else { - StatusCode::BAD_REQUEST.into_response() - } + Json(workspace.history(HistoryOptions { + client: query.client, + skip: query.skip, + limit: query.limit, + })) + .into_response() } else { (StatusCode::NOT_FOUND, format!("Workspace({ws_id:?}) not found")).into_response() } diff --git a/apps/keck/src/server/api/doc.rs b/apps/keck/src/server/api/doc.rs index 87663cb1..745beb2c 100644 --- a/apps/keck/src/server/api/doc.rs +++ b/apps/keck/src/server/api/doc.rs @@ -15,7 +15,7 @@ use super::{ workspace::set_workspace, workspace::delete_workspace, workspace::workspace_client, - workspace::history_workspace_clients, + workspace::workspace_clients, workspace::history_workspace, workspace::get_workspace_block, // workspace::workspace_search, diff --git a/libs/jwst-codec/Cargo.toml b/libs/jwst-codec/Cargo.toml index 8cbf273f..9ac5ac93 100644 --- a/libs/jwst-codec/Cargo.toml +++ b/libs/jwst-codec/Cargo.toml @@ -15,6 +15,7 @@ nom = "7.1.3" ordered-float = "3.6.0" rand = "0.8.5" rand_chacha = "0.3.1" +rand_distr = "0.4.3" serde = { version = "1.0.183" } serde_json = "1.0.105" thiserror = "1.0.40" diff --git a/libs/jwst-codec/src/doc/document.rs b/libs/jwst-codec/src/doc/document.rs index 0eef0479..5e3cfd3a 100644 --- a/libs/jwst-codec/src/doc/document.rs +++ b/libs/jwst-codec/src/doc/document.rs @@ -42,8 +42,26 @@ impl Default for DocOptions { gc: true, } } else { + /// It tends to generate small numbers. + /// Since the client id will be included in all crdt items, the + /// small client helps to reduce the binary size. + /// + /// NOTE: The probability of 36% of the random number generated by + /// this function is greater than [u32::MAX] + fn prefer_small_random() -> u64 { + use rand::{distributions::Distribution, thread_rng}; + use rand_distr::Exp; + + let scale_factor = u16::MAX as f64; + let v: f64 = Exp::new(1.0 / scale_factor) + .map(|exp| exp.sample(&mut thread_rng())) + .unwrap_or_else(|_| rand::random()); + + (v * scale_factor) as u64 + } + Self { - client_id: rand::random(), + client_id: prefer_small_random(), guid: nanoid::nanoid!(), gc: true, } diff --git a/libs/jwst-codec/src/doc/history.rs b/libs/jwst-codec/src/doc/history.rs index 4bc7d905..976ae9ee 100644 --- a/libs/jwst-codec/src/doc/history.rs +++ b/libs/jwst-codec/src/doc/history.rs @@ -74,16 +74,20 @@ impl StoreHistory { pub fn parse_store(&self, options: HistoryOptions) -> Vec { let store_items = { + let client = options + .client + .as_ref() + .and_then(|client| client.ne(&0).then_some(client)); let store = self.store.read().unwrap(); let mut sort_iter: Box> = Box::new( - SortedNodes::new(if let Some(client) = options.client.as_ref() { + SortedNodes::new(if let Some(client) = client { store.items.get(client).map(|i| vec![(client, i)]).unwrap_or_default() } else { store.items.iter().collect::>() }) .filter_map(|n| n.as_item().get().cloned()), ); - if options.client.is_some() { + if client.is_some() { // skip and limit only available when client is set if let Some(skip) = options.skip { sort_iter = Box::new(sort_iter.skip(skip)); From f11ffb938d55a3eae3f9be1a02167a89cc0a0b00 Mon Sep 17 00:00:00 2001 From: DarkSky Date: Wed, 13 Sep 2023 23:07:36 +0800 Subject: [PATCH 3/8] feat: split keck apis & catch panic if subscribe callback throw --- apps/keck/src/server/api/blocks/clients.rs | 69 ++++++++++++++++++++++ apps/keck/src/server/api/blocks/history.rs | 54 +++++++++++++++++ apps/keck/src/server/api/blocks/mod.rs | 12 ++-- libs/jwst-codec/src/doc/publisher.rs | 9 ++- 4 files changed, 138 insertions(+), 6 deletions(-) create mode 100644 apps/keck/src/server/api/blocks/clients.rs create mode 100644 apps/keck/src/server/api/blocks/history.rs diff --git a/apps/keck/src/server/api/blocks/clients.rs b/apps/keck/src/server/api/blocks/clients.rs new file mode 100644 index 00000000..a4ef747d --- /dev/null +++ b/apps/keck/src/server/api/blocks/clients.rs @@ -0,0 +1,69 @@ +use axum::{extract::Path, response::Response}; + +use super::*; + +/// Get current client id of server +/// +/// When the server initializes or get the `Workspace`, a `Client` will be +/// created. This `Client` will not be destroyed until the server restarts. +/// Therefore, the `Client ID` in the history generated by modifying `Block` +/// through HTTP API will remain unchanged until the server restarts. +/// +/// This interface return the client id that server will used. +#[utoipa::path( + get, + tag = "Workspace", + context_path = "/api/block", + path = "/{workspace}/client", + params( + ("workspace", description = "workspace id"), + ), + responses( + (status = 200, description = "Get workspace client id", body = u64), + (status = 404, description = "Workspace not found") + ) +)] +pub async fn workspace_client(Extension(context): Extension>, Path(workspace): Path) -> Response { + if let Ok(workspace) = context.get_workspace(&workspace).await { + Json(workspace.client_id()).into_response() + } else { + (StatusCode::NOT_FOUND, format!("Workspace({workspace:?}) not found")).into_response() + } +} + +/// Get all client ids of the `Workspace` +/// +/// This interface returns all `Client IDs` that includes history in the +/// `Workspace` +/// +/// Every client write something into a `Workspace` will has a unique id. +/// +/// For example: +/// - A user writes a new `Block` to a `Workspace` through `Client` on the +/// front end, which will generate a series of histories. A `Client ID` +/// contained in these histories will be randomly generated by the `Client` +/// and will remain unchanged until the Client instance is destroyed +/// - When the server initializes or get the `Workspace`, a `Client` will be +/// created. This `Client` will not be destroyed until the server restarts. +/// Therefore, the `Client ID` in the history generated by modifying `Block` +/// through HTTP API will remain unchanged until the server restarts. +#[utoipa::path( + get, + tag = "Workspace", + context_path = "/api/block", + path = "/{workspace}/clients", + params( + ("workspace", description = "workspace id"), + ), + responses( + (status = 200, description = "Get workspace client ids", body = [u64]), + (status = 500, description = "Failed to get workspace client ids") + ) +)] +pub async fn workspace_clients(Extension(context): Extension>, Path(workspace): Path) -> Response { + if let Ok(workspace) = context.get_workspace(&workspace).await { + Json(workspace.clients()).into_response() + } else { + (StatusCode::NOT_FOUND, format!("Workspace({workspace:?}) not found")).into_response() + } +} diff --git a/apps/keck/src/server/api/blocks/history.rs b/apps/keck/src/server/api/blocks/history.rs new file mode 100644 index 00000000..25e99d67 --- /dev/null +++ b/apps/keck/src/server/api/blocks/history.rs @@ -0,0 +1,54 @@ +use axum::{ + extract::{Path, Query}, + response::Response, +}; +use jwst_core::HistoryOptions; +use utoipa::IntoParams; + +use super::*; + +/// Block History Options +#[derive(Deserialize, IntoParams)] +pub struct BlockHistoryQuery { + /// client id, is give 0 or empty then return all clients histories + client: Option, + /// skip count, available when client is set + skip: Option, + /// limit count, available when client is set + limit: Option, +} + +/// Get the history generated by a specific `Client ID` of the `Workspace` +/// +/// If client id set to 0, return all history of the `Workspace`. +#[utoipa::path( + get, + tag = "Workspace", + context_path = "/api/block", + path = "/{workspace}/history", + params( + ("workspace", description = "workspace id"), + BlockHistoryQuery, + ), + responses( + (status = 200, description = "Get workspace history", body = [History]), + (status = 400, description = "Client id invalid"), + (status = 500, description = "Failed to get workspace history") + ) +)] +pub async fn history_workspace( + Extension(context): Extension>, + Path(ws_id): Path, + query: Query, +) -> Response { + if let Ok(workspace) = context.get_workspace(&ws_id).await { + Json(workspace.history(HistoryOptions { + client: query.client, + skip: query.skip, + limit: query.limit, + })) + .into_response() + } else { + (StatusCode::NOT_FOUND, format!("Workspace({ws_id:?}) not found")).into_response() + } +} diff --git a/apps/keck/src/server/api/blocks/mod.rs b/apps/keck/src/server/api/blocks/mod.rs index 390e2d60..b668eaec 100644 --- a/apps/keck/src/server/api/blocks/mod.rs +++ b/apps/keck/src/server/api/blocks/mod.rs @@ -1,4 +1,6 @@ pub mod block; +pub mod clients; +pub mod history; pub mod schema; pub mod workspace; @@ -30,9 +32,9 @@ fn block_apis(router: Router) -> Router { fn workspace_apis(router: Router) -> Router { router - .route("/block/:workspace/client", get(workspace::workspace_client)) - .route("/block/:workspace/clients", get(workspace::workspace_clients)) - .route("/block/:workspace/history", get(workspace::history_workspace)) + .route("/block/:workspace/client", get(clients::workspace_client)) + .route("/block/:workspace/clients", get(clients::workspace_clients)) + .route("/block/:workspace/history", get(history::history_workspace)) .route( "/block/:workspace", get(workspace::get_workspace) @@ -105,8 +107,8 @@ mod tests { resp.text().await.parse::().unwrap(), ctx.storage.get_workspace("test").await.unwrap().client_id() ); - // let resp = client.get("/block/test/history").send().await; - // assert_eq!(resp.json::>().await, Vec::::new()); + let resp = client.get("/block/test/clients").send().await; + assert_eq!(resp.json::>().await, Vec::::new()); let resp = client.get("/block/test").send().await; assert_eq!(resp.status(), StatusCode::OK); let resp = client.delete("/block/test").send().await; diff --git a/libs/jwst-codec/src/doc/publisher.rs b/libs/jwst-codec/src/doc/publisher.rs index b31cbe9c..7a4b2dee 100644 --- a/libs/jwst-codec/src/doc/publisher.rs +++ b/libs/jwst-codec/src/doc/publisher.rs @@ -98,7 +98,14 @@ impl DocPublisher { last_update = update; for cb in subscribers.iter() { - cb(&binary, &history); + use std::panic::{catch_unwind, AssertUnwindSafe}; + // catch panic if callback throw + catch_unwind(AssertUnwindSafe(|| { + cb(&binary, &history); + })) + .unwrap_or_else(|e| { + warn!("Failed to call subscriber: {:?}", e); + }); } } else { drop(store); From c6866bd3d22be26370c013d34916db655630e876 Mon Sep 17 00:00:00 2001 From: DarkSky Date: Wed, 13 Sep 2023 23:10:01 +0800 Subject: [PATCH 4/8] fix: docs --- apps/keck/src/server/api/blocks/mod.rs | 2 +- apps/keck/src/server/api/blocks/workspace.rs | 114 +------------------ apps/keck/src/server/api/doc.rs | 8 +- 3 files changed, 6 insertions(+), 118 deletions(-) diff --git a/apps/keck/src/server/api/blocks/mod.rs b/apps/keck/src/server/api/blocks/mod.rs index b668eaec..4831f0c5 100644 --- a/apps/keck/src/server/api/blocks/mod.rs +++ b/apps/keck/src/server/api/blocks/mod.rs @@ -7,7 +7,7 @@ pub mod workspace; pub use block::{delete_block, get_block, insert_block_children, remove_block_children, set_block}; use schema::InsertChildren; pub use schema::SubscribeWorkspace; -pub use workspace::{delete_workspace, get_workspace, set_workspace, subscribe_workspace, workspace_client}; +pub use workspace::{delete_workspace, get_workspace, set_workspace, subscribe_workspace}; use super::*; diff --git a/apps/keck/src/server/api/blocks/workspace.rs b/apps/keck/src/server/api/blocks/workspace.rs index 2c279679..11ee2cf9 100644 --- a/apps/keck/src/server/api/blocks/workspace.rs +++ b/apps/keck/src/server/api/blocks/workspace.rs @@ -2,7 +2,7 @@ use axum::{ extract::{Path, Query}, response::Response, }; -use jwst_core::{DocStorage, HistoryOptions}; +use jwst_core::DocStorage; use utoipa::IntoParams; use super::*; @@ -88,35 +88,6 @@ pub async fn delete_workspace(Extension(context): Extension>, Path( StatusCode::NO_CONTENT.into_response() } -/// Get current client id of server -/// -/// When the server initializes or get the `Workspace`, a `Client` will be -/// created. This `Client` will not be destroyed until the server restarts. -/// Therefore, the `Client ID` in the history generated by modifying `Block` -/// through HTTP API will remain unchanged until the server restarts. -/// -/// This interface return the client id that server will used. -#[utoipa::path( - get, - tag = "Workspace", - context_path = "/api/block", - path = "/{workspace}/client", - params( - ("workspace", description = "workspace id"), - ), - responses( - (status = 200, description = "Get workspace client id", body = u64), - (status = 404, description = "Workspace not found") - ) -)] -pub async fn workspace_client(Extension(context): Extension>, Path(workspace): Path) -> Response { - if let Ok(workspace) = context.get_workspace(&workspace).await { - Json(workspace.client_id()).into_response() - } else { - (StatusCode::NOT_FOUND, format!("Workspace({workspace:?}) not found")).into_response() - } -} - /// Block search query // See doc for using utoipa search queries example here: https://github.com/juhaku/utoipa/blob/6c7f6a2d/examples/todo-axum/src/main.rs#L124-L130 #[derive(Deserialize, IntoParams)] @@ -251,89 +222,6 @@ pub async fn get_workspace_block( } } -/// Get all client ids of the `Workspace` -/// -/// This interface returns all `Client IDs` that includes history in the -/// `Workspace` -/// -/// Every client write something into a `Workspace` will has a unique id. -/// -/// For example: -/// - A user writes a new `Block` to a `Workspace` through `Client` on the -/// front end, which will generate a series of histories. A `Client ID` -/// contained in these histories will be randomly generated by the `Client` -/// and will remain unchanged until the Client instance is destroyed -/// - When the server initializes or get the `Workspace`, a `Client` will be -/// created. This `Client` will not be destroyed until the server restarts. -/// Therefore, the `Client ID` in the history generated by modifying `Block` -/// through HTTP API will remain unchanged until the server restarts. -#[utoipa::path( - get, - tag = "Workspace", - context_path = "/api/block", - path = "/{workspace}/history", - params( - ("workspace", description = "workspace id"), - ), - responses( - (status = 200, description = "Get workspace history client ids", body = [u64]), - (status = 500, description = "Failed to get workspace history") - ) -)] -pub async fn workspace_clients(Extension(context): Extension>, Path(workspace): Path) -> Response { - if let Ok(workspace) = context.get_workspace(&workspace).await { - Json(workspace.clients()).into_response() - } else { - (StatusCode::NOT_FOUND, format!("Workspace({workspace:?}) not found")).into_response() - } -} - -/// Block History Options -#[derive(Deserialize, IntoParams)] -pub struct BlockHistoryQuery { - /// client id, is give 0 or empty then return all clients histories - client: Option, - /// skip count, available when client is set - skip: Option, - /// limit count, available when client is set - limit: Option, -} - -/// Get the history generated by a specific `Client ID` of the `Workspace` -/// -/// If client id set to 0, return all history of the `Workspace`. -#[utoipa::path( - get, - tag = "Workspace", - context_path = "/api/block", - path = "/{workspace}/history", - params( - ("workspace", description = "workspace id"), - BlockHistoryQuery, - ), - responses( - (status = 200, description = "Get workspace history", body = [History]), - (status = 400, description = "Client id invalid"), - (status = 500, description = "Failed to get workspace history") - ) -)] -pub async fn history_workspace( - Extension(context): Extension>, - Path(ws_id): Path, - query: Query, -) -> Response { - if let Ok(workspace) = context.get_workspace(&ws_id).await { - Json(workspace.history(HistoryOptions { - client: query.client, - skip: query.skip, - limit: query.limit, - })) - .into_response() - } else { - (StatusCode::NOT_FOUND, format!("Workspace({ws_id:?}) not found")).into_response() - } -} - /// Register a webhook for all block changes from all workspace changes #[utoipa::path( post, diff --git a/apps/keck/src/server/api/doc.rs b/apps/keck/src/server/api/doc.rs index 745beb2c..749e6eb7 100644 --- a/apps/keck/src/server/api/doc.rs +++ b/apps/keck/src/server/api/doc.rs @@ -4,19 +4,19 @@ use utoipa_swagger_ui::{serve, Config, Url}; use super::{ blobs, - blocks::{block, schema, workspace}, + blocks::{block, clients, history, schema, workspace}, *, }; #[derive(OpenApi)] #[openapi( paths( + clients::workspace_client, + clients::workspace_clients, + history::history_workspace, workspace::get_workspace, workspace::set_workspace, workspace::delete_workspace, - workspace::workspace_client, - workspace::workspace_clients, - workspace::history_workspace, workspace::get_workspace_block, // workspace::workspace_search, // workspace::set_search_index, From a0a23ffd4cd65aad46ad34b4b4cf89c2bf2451cd Mon Sep 17 00:00:00 2001 From: DarkSky Date: Wed, 13 Sep 2023 23:53:36 +0800 Subject: [PATCH 5/8] feat: webhook for workspace histories --- apps/keck/src/server/api/blocks/block.rs | 2 +- apps/keck/src/server/api/blocks/mod.rs | 22 ++----- apps/keck/src/server/api/blocks/schema.rs | 11 +--- apps/keck/src/server/api/blocks/subscribe.rs | 61 ++++++++++++++++++++ apps/keck/src/server/api/blocks/workspace.rs | 29 ---------- apps/keck/src/server/api/doc.rs | 10 ++-- apps/keck/src/server/api/mod.rs | 39 ++++++++++++- apps/keck/src/server/mod.rs | 2 - libs/jwst-codec/src/doc/document.rs | 4 ++ libs/jwst-codec/src/doc/history.rs | 4 +- libs/jwst-codec/src/doc/publisher.rs | 4 ++ libs/jwst-core/src/lib.rs | 2 +- libs/jwst-core/src/workspaces/observe.rs | 10 +++- 13 files changed, 128 insertions(+), 72 deletions(-) create mode 100644 apps/keck/src/server/api/blocks/subscribe.rs diff --git a/apps/keck/src/server/api/blocks/block.rs b/apps/keck/src/server/api/blocks/block.rs index 9a8986ff..977550ae 100644 --- a/apps/keck/src/server/api/blocks/block.rs +++ b/apps/keck/src/server/api/blocks/block.rs @@ -2,7 +2,7 @@ use axum::{extract::Query, response::Response}; use jwst_core::{constants, Any}; use serde_json::Value as JsonValue; -use super::*; +use super::{schema::InsertChildren, *}; /// Get a `Block` by id /// - Return 200 and `Block`'s data if `Block` is exists. diff --git a/apps/keck/src/server/api/blocks/mod.rs b/apps/keck/src/server/api/blocks/mod.rs index 4831f0c5..683947b3 100644 --- a/apps/keck/src/server/api/blocks/mod.rs +++ b/apps/keck/src/server/api/blocks/mod.rs @@ -2,13 +2,9 @@ pub mod block; pub mod clients; pub mod history; pub mod schema; +pub mod subscribe; pub mod workspace; -pub use block::{delete_block, get_block, insert_block_children, remove_block_children, set_block}; -use schema::InsertChildren; -pub use schema::SubscribeWorkspace; -pub use workspace::{delete_workspace, get_workspace, set_workspace, subscribe_workspace}; - use super::*; fn block_apis(router: Router) -> Router { @@ -48,7 +44,8 @@ fn workspace_apis(router: Router) -> Router { // "/search/:workspace/index", // get(workspace::get_search_index).post(workspace::set_search_index), // ) - .route("/subscribe", post(subscribe_workspace)) + .route("/subscribe", post(subscribe::subscribe_workspace)) + .route("/hook", post(subscribe::subscribe_test_hook)) } pub fn blocks_apis(router: Router) -> Router { @@ -78,10 +75,6 @@ mod tests { #[tokio::test] async fn test_workspace_apis() { - let client = Arc::new(reqwest::Client::builder().no_proxy().build().unwrap()); - - let hook_endpoint = Arc::new(RwLock::new(String::new())); - let ctx = Arc::new( Context::new( JwstStorage::new_with_migration("sqlite::memory:", BlobStorageType::DB) @@ -90,12 +83,7 @@ mod tests { ) .await, ); - let client = TestClient::new( - workspace_apis(Router::new()) - .layer(Extension(ctx.clone())) - .layer(Extension(client.clone())) - .layer(Extension(hook_endpoint.clone())), - ); + let client = TestClient::new(workspace_apis(Router::new()).layer(Extension(ctx.clone()))); // basic workspace apis let resp = client.get("/block/test").send().await; @@ -140,7 +128,7 @@ mod tests { // assert_eq!(index, vec!["test".to_owned()]); let body = json!({ - "hookEndpoint": "localhost:3000/api/hook" + "endpoint": "localhost:3000/api/hook" }) .to_string(); let resp = client diff --git a/apps/keck/src/server/api/blocks/schema.rs b/apps/keck/src/server/api/blocks/schema.rs index c12ecf1c..a6391f8a 100644 --- a/apps/keck/src/server/api/blocks/schema.rs +++ b/apps/keck/src/server/api/blocks/schema.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use utoipa::ToSchema; #[derive(Default, Deserialize, PartialEq, Debug, ToSchema)] @@ -40,12 +40,3 @@ pub enum InsertChildren { InsertAfter { id: String, after: String }, InsertAt { id: String, pos: u64 }, } - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[schema(example=json!({ - "hookEndpoint": "http://localhost:3000/api/hooks" -}))] -pub struct SubscribeWorkspace { - #[serde(rename = "hookEndpoint")] - pub hook_endpoint: String, -} diff --git a/apps/keck/src/server/api/blocks/subscribe.rs b/apps/keck/src/server/api/blocks/subscribe.rs new file mode 100644 index 00000000..fcbd3f15 --- /dev/null +++ b/apps/keck/src/server/api/blocks/subscribe.rs @@ -0,0 +1,61 @@ +use axum::response::Response; +use jwst_core::History; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +use super::*; + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[schema(example=json!({ + "endpoint": "http://localhost:3000/api/hook" +}))] +pub struct SubscribeWorkspace { + #[serde(rename = "endpoint")] + pub endpoint: String, +} + +/// Register a webhook for all block changes from all workspace changes +#[utoipa::path( + post, + tag = "Workspace", + context_path = "/api/subscribe", + path = "", + request_body( + content_type = "application/json", + content = SubscribeWorkspace, + description = "Provide endpoint of webhook server", + ), + responses( + (status = 200, description = "Subscribe workspace succeed"), + (status = 500, description = "Internal Server Error") + ) +)] +pub async fn subscribe_workspace( + Extension(context): Extension>, + Json(payload): Json, +) -> Response { + info!("subscribe all workspaces, hook endpoint: {}", payload.endpoint); + context.set_webhook(payload.endpoint); + info!("successfully subscribed all workspaces"); + StatusCode::OK.into_response() +} + +/// The webhook receiver for debug history subscribe feature +#[utoipa::path( + post, + tag = "Workspace", + context_path = "/api/hook", + path = "", + request_body( + content_type = "application/json", + content = [History], + description = "Histories of block changes" + ), + responses( + (status = 200, description = "Histories received"), + ) +)] +pub async fn subscribe_test_hook(Json(payload): Json>) -> Response { + info!("webhook receive {} histories", payload.len()); + StatusCode::OK.into_response() +} diff --git a/apps/keck/src/server/api/blocks/workspace.rs b/apps/keck/src/server/api/blocks/workspace.rs index 11ee2cf9..7601d09a 100644 --- a/apps/keck/src/server/api/blocks/workspace.rs +++ b/apps/keck/src/server/api/blocks/workspace.rs @@ -6,7 +6,6 @@ use jwst_core::DocStorage; use utoipa::IntoParams; use super::*; -use crate::server::api::blocks::SubscribeWorkspace; /// Get a exists `Workspace` by id /// - Return 200 Ok and `Workspace`'s data if `Workspace` is exists. @@ -222,34 +221,6 @@ pub async fn get_workspace_block( } } -/// Register a webhook for all block changes from all workspace changes -#[utoipa::path( - post, - tag = "Workspace", - context_path = "/api/subscribe", - path = "", - request_body( - content_type = "application/json", - content = SubscribeWorkspace, - description = "Provide endpoint of webhook server", - ), - responses( - (status = 200, description = "Subscribe workspace succeed"), - (status = 500, description = "Internal Server Error") - ) -)] -pub async fn subscribe_workspace( - Extension(hook_endpoint): Extension>>, - Json(payload): Json, -) -> Response { - info!("subscribe all workspaces, hook endpoint: {}", payload.hook_endpoint); - - let mut write_guard = hook_endpoint.write().await; - *write_guard = payload.hook_endpoint.clone(); - info!("successfully subscribed all workspaces"); - StatusCode::OK.into_response() -} - #[cfg(all(test, feature = "sqlite"))] mod test { use super::*; diff --git a/apps/keck/src/server/api/doc.rs b/apps/keck/src/server/api/doc.rs index 749e6eb7..7130ce4a 100644 --- a/apps/keck/src/server/api/doc.rs +++ b/apps/keck/src/server/api/doc.rs @@ -4,7 +4,7 @@ use utoipa_swagger_ui::{serve, Config, Url}; use super::{ blobs, - blocks::{block, clients, history, schema, workspace}, + blocks::{block, clients, history, schema, subscribe, workspace}, *, }; @@ -14,6 +14,8 @@ use super::{ clients::workspace_client, clients::workspace_clients, history::history_workspace, + subscribe::subscribe_workspace, + subscribe::subscribe_test_hook, workspace::get_workspace, workspace::set_workspace, workspace::delete_workspace, @@ -21,7 +23,6 @@ use super::{ // workspace::workspace_search, // workspace::set_search_index, // workspace::get_search_index, - workspace::subscribe_workspace, block::get_block, block::get_block_by_flavour, block::set_block, @@ -37,9 +38,8 @@ use super::{ components( schemas( blobs::BlobStatus, schema::InsertChildren, - schema::Workspace, schema::Block, schema::BlockRawHistory, schema::SubscribeWorkspace - // jwst::BlockHistory, jwst::HistoryOperation, jwst::RawHistory, - // jwst::SearchResults, jwst::SearchResult, + schema::Workspace, schema::Block, schema::BlockRawHistory, + subscribe::SubscribeWorkspace ) ), tags( diff --git a/apps/keck/src/server/api/mod.rs b/apps/keck/src/server/api/mod.rs index 31551fdf..393ffaf2 100644 --- a/apps/keck/src/server/api/mod.rs +++ b/apps/keck/src/server/api/mod.rs @@ -43,6 +43,7 @@ pub struct PageData { pub struct Context { channel: BroadcastChannels, storage: JwstStorage, + webhook: Arc>, } impl Context { @@ -67,21 +68,55 @@ impl Context { Context { channel: RwLock::new(HashMap::new()), storage, + webhook: Arc::default(), } } + fn register_webhook(&self, workspace: Workspace) -> Workspace { + if workspace.subscribe_count() == 0 { + let client = reqwest::Client::new(); + let rt = tokio::runtime::Handle::current(); + let webhook = self.webhook.clone(); + workspace.subscribe_doc(move |_, history| { + let webhook = webhook.read().unwrap(); + if webhook.is_empty() { + return; + } + rt.block_on(async { + debug!("send {} histories to webhook {}", history.len(), webhook); + let resp = client.post(webhook.as_str()).json(history).send().await.unwrap(); + if !resp.status().is_success() { + error!("failed to send webhook: {}", resp.status()); + } + }); + }); + } + workspace + } + + pub fn set_webhook(&self, endpoint: String) { + let mut write_guard = self.webhook.write().unwrap(); + *write_guard = endpoint; + } + pub async fn get_workspace(&self, workspace_id: S) -> JwstStorageResult where S: AsRef, { - self.storage.get_workspace(workspace_id).await + self.storage + .get_workspace(workspace_id) + .await + .map(|w| self.register_webhook(w)) } pub async fn create_workspace(&self, workspace_id: S) -> JwstStorageResult where S: AsRef, { - self.storage.create_workspace(workspace_id).await + self.storage + .create_workspace(workspace_id) + .await + .map(|w| self.register_webhook(w)) } } diff --git a/apps/keck/src/server/mod.rs b/apps/keck/src/server/mod.rs index 432e6331..a1b6f6e7 100644 --- a/apps/keck/src/server/mod.rs +++ b/apps/keck/src/server/mod.rs @@ -52,7 +52,6 @@ pub async fn start_server() { .allow_origin(origins) .allow_headers(Any); - let client = Arc::new(reqwest::Client::builder().no_proxy().build().unwrap()); let hook_endpoint = Arc::new(RwLock::new(dotenvy::var("HOOK_ENDPOINT").unwrap_or_default())); let context = Arc::new(Context::new(None).await); @@ -60,7 +59,6 @@ pub async fn start_server() { let app = sync::sync_handler(api::api_handler(Router::new())) .layer(cors) .layer(Extension(context.clone())) - .layer(Extension(client)) .layer(Extension(hook_endpoint)); let addr = SocketAddr::from(( diff --git a/libs/jwst-codec/src/doc/document.rs b/libs/jwst-codec/src/doc/document.rs index 5e3cfd3a..f6710d35 100644 --- a/libs/jwst-codec/src/doc/document.rs +++ b/libs/jwst-codec/src/doc/document.rs @@ -371,6 +371,10 @@ impl Doc { self.publisher.unsubscribe_all(); } + pub fn subscribe_count(&self) -> usize { + self.publisher.count() + } + pub fn gc(&self) -> JwstCodecResult<()> { self.store.write().unwrap().optimize() } diff --git a/libs/jwst-codec/src/doc/history.rs b/libs/jwst-codec/src/doc/history.rs index 976ae9ee..35225a74 100644 --- a/libs/jwst-codec/src/doc/history.rs +++ b/libs/jwst-codec/src/doc/history.rs @@ -3,7 +3,7 @@ use std::{ sync::Arc, }; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use super::{store::StoreRef, *}; use crate::sync::RwLock; @@ -194,7 +194,7 @@ impl StoreHistory { } } -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct History { pub id: String, pub parent: Vec, diff --git a/libs/jwst-codec/src/doc/publisher.rs b/libs/jwst-codec/src/doc/publisher.rs index 7a4b2dee..6591ed3d 100644 --- a/libs/jwst-codec/src/doc/publisher.rs +++ b/libs/jwst-codec/src/doc/publisher.rs @@ -126,6 +126,10 @@ impl DocPublisher { } } + pub(crate) fn count(&self) -> usize { + self.subscribers.read().unwrap().len() + } + pub(crate) fn subscribe(&self, subscriber: impl Fn(&[u8], &[History]) + Send + Sync + 'static) { self.subscribers.write().unwrap().push(Box::new(subscriber)); } diff --git a/libs/jwst-core/src/lib.rs b/libs/jwst-core/src/lib.rs index a61923b4..0f046f4e 100644 --- a/libs/jwst-core/src/lib.rs +++ b/libs/jwst-core/src/lib.rs @@ -7,7 +7,7 @@ mod workspaces; pub mod constants; pub use block::Block; -pub use jwst_codec::{Any, HistoryOptions}; +pub use jwst_codec::{Any, History, HistoryOptions}; pub use space::Space; pub use tracing::{debug, error, info, log::LevelFilter, trace, warn}; pub use types::{BlobMetadata, BlobStorage, BucketBlobStorage, DocStorage, JwstError, JwstResult}; diff --git a/libs/jwst-core/src/workspaces/observe.rs b/libs/jwst-core/src/workspaces/observe.rs index fc972e86..cb5e8c2c 100644 --- a/libs/jwst-core/src/workspaces/observe.rs +++ b/libs/jwst-core/src/workspaces/observe.rs @@ -3,15 +3,19 @@ use jwst_codec::{Awareness, AwarenessEvent, History}; use super::*; impl Workspace { - pub async fn subscribe_awareness(&mut self, f: impl Fn(&Awareness, AwarenessEvent) + Send + Sync + 'static) { + pub async fn subscribe_awareness(&self, f: impl Fn(&Awareness, AwarenessEvent) + Send + Sync + 'static) { self.awareness.write().unwrap().on_update(f); } - pub fn subscribe_doc(&mut self, f: impl Fn(&[u8], &[History]) + Sync + Send + 'static) { + pub fn subscribe_count(&self) -> usize { + self.doc.subscribe_count() + } + + pub fn subscribe_doc(&self, f: impl Fn(&[u8], &[History]) + Sync + Send + 'static) { self.doc.subscribe(f) } - pub fn unsubscribe_all(&mut self) { + pub fn unsubscribe_all(&self) { self.doc.unsubscribe_all(); self.awareness.write().unwrap().on_update(|_, _| {}) } From 957c698667d2e611d22d80a7fdae5e2d4db59c02 Mon Sep 17 00:00:00 2001 From: X1a0t <405028157@qq.com> Date: Thu, 14 Sep 2023 14:09:42 +0800 Subject: [PATCH 6/8] feat: use default webhook endpoint (#537) --- apps/keck/src/server/api/mod.rs | 2 +- apps/keck/src/server/mod.rs | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/apps/keck/src/server/api/mod.rs b/apps/keck/src/server/api/mod.rs index 393ffaf2..de3ddabf 100644 --- a/apps/keck/src/server/api/mod.rs +++ b/apps/keck/src/server/api/mod.rs @@ -68,7 +68,7 @@ impl Context { Context { channel: RwLock::new(HashMap::new()), storage, - webhook: Arc::default(), + webhook: Arc::new(RwLock::new(dotenvy::var("HOOK_ENDPOINT").unwrap_or_default())), } } diff --git a/apps/keck/src/server/mod.rs b/apps/keck/src/server/mod.rs index a1b6f6e7..8c07c819 100644 --- a/apps/keck/src/server/mod.rs +++ b/apps/keck/src/server/mod.rs @@ -52,14 +52,11 @@ pub async fn start_server() { .allow_origin(origins) .allow_headers(Any); - let hook_endpoint = Arc::new(RwLock::new(dotenvy::var("HOOK_ENDPOINT").unwrap_or_default())); - let context = Arc::new(Context::new(None).await); let app = sync::sync_handler(api::api_handler(Router::new())) .layer(cors) .layer(Extension(context.clone())) - .layer(Extension(hook_endpoint)); let addr = SocketAddr::from(( [0, 0, 0, 0], From b5bad45a3ae4ee75b3aea1901b22a2d529a72ddd Mon Sep 17 00:00:00 2001 From: DarkSky Date: Thu, 14 Sep 2023 14:14:21 +0800 Subject: [PATCH 7/8] feat: release subscriber correctly --- apps/keck/src/server/api/mod.rs | 4 +++- libs/jwst-codec/src/doc/publisher.rs | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/keck/src/server/api/mod.rs b/apps/keck/src/server/api/mod.rs index de3ddabf..cde4cc73 100644 --- a/apps/keck/src/server/api/mod.rs +++ b/apps/keck/src/server/api/mod.rs @@ -82,9 +82,11 @@ impl Context { if webhook.is_empty() { return; } + // release the lock before move webhook + let webhook = webhook.clone(); rt.block_on(async { debug!("send {} histories to webhook {}", history.len(), webhook); - let resp = client.post(webhook.as_str()).json(history).send().await.unwrap(); + let resp = client.post(webhook).json(history).send().await.unwrap(); if !resp.status().is_success() { error!("failed to send webhook: {}", resp.status()); } diff --git a/libs/jwst-codec/src/doc/publisher.rs b/libs/jwst-codec/src/doc/publisher.rs index 6591ed3d..d6f35f89 100644 --- a/libs/jwst-codec/src/doc/publisher.rs +++ b/libs/jwst-codec/src/doc/publisher.rs @@ -148,6 +148,7 @@ impl std::fmt::Debug for DocPublisher { impl Drop for DocPublisher { fn drop(&mut self) { self.stop(); + self.unsubscribe_all(); } } From f2c33bf1fafe3ade2b7f535afb109f273e40521a Mon Sep 17 00:00:00 2001 From: DarkSky Date: Thu, 14 Sep 2023 14:16:59 +0800 Subject: [PATCH 8/8] fix: lint --- apps/keck/src/server/api/mod.rs | 4 +++- apps/keck/src/server/mod.rs | 4 ++-- libs/jwst-rpc/src/broadcast.rs | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/keck/src/server/api/mod.rs b/apps/keck/src/server/api/mod.rs index cde4cc73..7b53b5db 100644 --- a/apps/keck/src/server/api/mod.rs +++ b/apps/keck/src/server/api/mod.rs @@ -68,7 +68,9 @@ impl Context { Context { channel: RwLock::new(HashMap::new()), storage, - webhook: Arc::new(RwLock::new(dotenvy::var("HOOK_ENDPOINT").unwrap_or_default())), + webhook: Arc::new(std::sync::RwLock::new( + dotenvy::var("HOOK_ENDPOINT").unwrap_or_default(), + )), } } diff --git a/apps/keck/src/server/mod.rs b/apps/keck/src/server/mod.rs index 8c07c819..8804be4b 100644 --- a/apps/keck/src/server/mod.rs +++ b/apps/keck/src/server/mod.rs @@ -7,7 +7,7 @@ use std::{net::SocketAddr, sync::Arc}; use api::Context; use axum::{http::Method, Extension, Router, Server}; use jwst_core::Workspace; -use tokio::{signal, sync::RwLock}; +use tokio::signal; use tower_http::cors::{Any, CorsLayer}; pub use utils::*; @@ -56,7 +56,7 @@ pub async fn start_server() { let app = sync::sync_handler(api::api_handler(Router::new())) .layer(cors) - .layer(Extension(context.clone())) + .layer(Extension(context.clone())); let addr = SocketAddr::from(( [0, 0, 0, 0], diff --git a/libs/jwst-rpc/src/broadcast.rs b/libs/jwst-rpc/src/broadcast.rs index 300e32a0..9293aee9 100644 --- a/libs/jwst-rpc/src/broadcast.rs +++ b/libs/jwst-rpc/src/broadcast.rs @@ -21,7 +21,7 @@ pub enum BroadcastType { type Broadcast = Sender; pub type BroadcastChannels = RwLock>; -pub async fn subscribe(workspace: &mut Workspace, identifier: String, sender: Broadcast) { +pub async fn subscribe(workspace: &Workspace, identifier: String, sender: Broadcast) { { let sender = sender.clone(); let workspace_id = workspace.id();