Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: history webhook api #536

Merged
merged 8 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions apps/doc_merger/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ fn jwst_merge(path: &str, output: &str) {
println!("status: {:?}", doc.store_status());
}
let ts = Instant::now();
let history = doc.history().parse_store(0, Default::default());
println!("history1: {:?}", ts.elapsed());
let history = doc.history().parse_store(Default::default());
println!("history: {:?}", ts.elapsed());
for history in history.iter().take(100) {
println!("history: {:?}", history);
}
Expand Down
2 changes: 1 addition & 1 deletion apps/keck/src/server/api/blocks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use axum::{extract::Query, response::Response};
use jwst_core::{constants, Any};
use serde_json::Value as JsonValue;

use super::*;
use super::{schema::InsertChildren, *};

/// Get a `Block` by id
/// - Return 200 and `Block`'s data if `Block` is exists.
Expand Down
69 changes: 69 additions & 0 deletions apps/keck/src/server/api/blocks/clients.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use axum::{extract::Path, response::Response};

use super::*;

/// Get current client id of server
///
/// When the server initializes or get the `Workspace`, a `Client` will be
/// created. This `Client` will not be destroyed until the server restarts.
/// Therefore, the `Client ID` in the history generated by modifying `Block`
/// through HTTP API will remain unchanged until the server restarts.
///
/// This interface return the client id that server will used.
#[utoipa::path(
get,
tag = "Workspace",
context_path = "/api/block",
path = "/{workspace}/client",
params(
("workspace", description = "workspace id"),
),
responses(
(status = 200, description = "Get workspace client id", body = u64),
(status = 404, description = "Workspace not found")
)
)]
pub async fn workspace_client(Extension(context): Extension<Arc<Context>>, Path(workspace): Path<String>) -> Response {
if let Ok(workspace) = context.get_workspace(&workspace).await {
Json(workspace.client_id()).into_response()
} else {
(StatusCode::NOT_FOUND, format!("Workspace({workspace:?}) not found")).into_response()
}
}

/// Get all client ids of the `Workspace`
///
/// This interface returns all `Client IDs` that includes history in the
/// `Workspace`
///
/// Every client write something into a `Workspace` will has a unique id.
///
/// For example:
/// - A user writes a new `Block` to a `Workspace` through `Client` on the
/// front end, which will generate a series of histories. A `Client ID`
/// contained in these histories will be randomly generated by the `Client`
/// and will remain unchanged until the Client instance is destroyed
/// - When the server initializes or get the `Workspace`, a `Client` will be
/// created. This `Client` will not be destroyed until the server restarts.
/// Therefore, the `Client ID` in the history generated by modifying `Block`
/// through HTTP API will remain unchanged until the server restarts.
#[utoipa::path(
get,
tag = "Workspace",
context_path = "/api/block",
path = "/{workspace}/clients",
params(
("workspace", description = "workspace id"),
),
responses(
(status = 200, description = "Get workspace client ids", body = [u64]),
(status = 500, description = "Failed to get workspace client ids")
)
)]
pub async fn workspace_clients(Extension(context): Extension<Arc<Context>>, Path(workspace): Path<String>) -> Response {
if let Ok(workspace) = context.get_workspace(&workspace).await {
Json(workspace.clients()).into_response()
} else {
(StatusCode::NOT_FOUND, format!("Workspace({workspace:?}) not found")).into_response()
}
}
54 changes: 54 additions & 0 deletions apps/keck/src/server/api/blocks/history.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use axum::{
extract::{Path, Query},
response::Response,
};
use jwst_core::HistoryOptions;
use utoipa::IntoParams;

use super::*;

/// Block History Options
#[derive(Deserialize, IntoParams)]
pub struct BlockHistoryQuery {
/// client id, is give 0 or empty then return all clients histories
client: Option<u64>,
/// skip count, available when client is set
skip: Option<usize>,
/// limit count, available when client is set
limit: Option<usize>,
}

/// Get the history generated by a specific `Client ID` of the `Workspace`
///
/// If client id set to 0, return all history of the `Workspace`.
#[utoipa::path(
get,
tag = "Workspace",
context_path = "/api/block",
path = "/{workspace}/history",
params(
("workspace", description = "workspace id"),
BlockHistoryQuery,
),
responses(
(status = 200, description = "Get workspace history", body = [History]),
(status = 400, description = "Client id invalid"),
(status = 500, description = "Failed to get workspace history")
)
)]
pub async fn history_workspace(
Extension(context): Extension<Arc<Context>>,
Path(ws_id): Path<String>,
query: Query<BlockHistoryQuery>,
) -> Response {
if let Ok(workspace) = context.get_workspace(&ws_id).await {
Json(workspace.history(HistoryOptions {
client: query.client,
skip: query.skip,
limit: query.limit,
}))
.into_response()
} else {
(StatusCode::NOT_FOUND, format!("Workspace({ws_id:?}) not found")).into_response()
}
}
49 changes: 21 additions & 28 deletions apps/keck/src/server/api/blocks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
pub mod block;
pub mod clients;
pub mod history;
pub mod schema;
pub mod subscribe;
pub mod workspace;

pub use block::{delete_block, get_block, insert_block_children, remove_block_children, set_block};
use schema::InsertChildren;
pub use schema::SubscribeWorkspace;
pub use workspace::{delete_workspace, get_workspace, set_workspace, subscribe_workspace, workspace_client};

use super::*;

fn block_apis(router: Router) -> Router {
let block_operation = Router::new()
let children_apis = Router::new()
.route(
"/children",
get(block::get_block_children).post(block::insert_block_children),
)
.route("/children/:children", delete(block::remove_block_children));

let block_apis = Router::new().route(
"/",
get(block::get_block).post(block::set_block).delete(block::delete_block),
);

doc_apis(router)
.nest("/block/:workspace/:block/", block_operation)
.route(
"/block/:workspace/:block",
get(block::get_block).post(block::set_block).delete(block::delete_block),
)
.nest("/block/:workspace/:block/", children_apis)
.nest("/block/:workspace/:block/", block_apis.clone())
.nest("/block/:workspace/:block", block_apis)
}

fn workspace_apis(router: Router) -> Router {
router
.route("/block/:workspace/client", get(workspace::workspace_client))
.route("/block/:workspace/history", get(workspace::history_workspace_clients))
.route("/block/:workspace/history/:client", get(workspace::history_workspace))
.route("/block/:workspace/client", get(clients::workspace_client))
.route("/block/:workspace/clients", get(clients::workspace_clients))
.route("/block/:workspace/history", get(history::history_workspace))
.route(
"/block/:workspace",
get(workspace::get_workspace)
Expand All @@ -43,7 +44,8 @@ fn workspace_apis(router: Router) -> Router {
// "/search/:workspace/index",
// get(workspace::get_search_index).post(workspace::set_search_index),
// )
.route("/subscribe", post(subscribe_workspace))
.route("/subscribe", post(subscribe::subscribe_workspace))
.route("/hook", post(subscribe::subscribe_test_hook))
}

pub fn blocks_apis(router: Router) -> Router {
Expand Down Expand Up @@ -73,10 +75,6 @@ mod tests {

#[tokio::test]
async fn test_workspace_apis() {
let client = Arc::new(reqwest::Client::builder().no_proxy().build().unwrap());

let hook_endpoint = Arc::new(RwLock::new(String::new()));

let ctx = Arc::new(
Context::new(
JwstStorage::new_with_migration("sqlite::memory:", BlobStorageType::DB)
Expand All @@ -85,12 +83,7 @@ mod tests {
)
.await,
);
let client = TestClient::new(
workspace_apis(Router::new())
.layer(Extension(ctx.clone()))
.layer(Extension(client.clone()))
.layer(Extension(hook_endpoint.clone())),
);
let client = TestClient::new(workspace_apis(Router::new()).layer(Extension(ctx.clone())));

// basic workspace apis
let resp = client.get("/block/test").send().await;
Expand All @@ -102,8 +95,8 @@ mod tests {
resp.text().await.parse::<u64>().unwrap(),
ctx.storage.get_workspace("test").await.unwrap().client_id()
);
// let resp = client.get("/block/test/history").send().await;
// assert_eq!(resp.json::<Vec<u64>>().await, Vec::<u64>::new());
let resp = client.get("/block/test/clients").send().await;
assert_eq!(resp.json::<Vec<u64>>().await, Vec::<u64>::new());
let resp = client.get("/block/test").send().await;
assert_eq!(resp.status(), StatusCode::OK);
let resp = client.delete("/block/test").send().await;
Expand Down Expand Up @@ -135,7 +128,7 @@ mod tests {
// assert_eq!(index, vec!["test".to_owned()]);

let body = json!({
"hookEndpoint": "localhost:3000/api/hook"
"endpoint": "localhost:3000/api/hook"
})
.to_string();
let resp = client
Expand Down
11 changes: 1 addition & 10 deletions apps/keck/src/server/api/blocks/schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use serde::Deserialize;
use utoipa::ToSchema;

#[derive(Default, Deserialize, PartialEq, Debug, ToSchema)]
Expand Down Expand Up @@ -40,12 +40,3 @@ pub enum InsertChildren {
InsertAfter { id: String, after: String },
InsertAt { id: String, pos: u64 },
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[schema(example=json!({
"hookEndpoint": "http://localhost:3000/api/hooks"
}))]
pub struct SubscribeWorkspace {
#[serde(rename = "hookEndpoint")]
pub hook_endpoint: String,
}
61 changes: 61 additions & 0 deletions apps/keck/src/server/api/blocks/subscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use axum::response::Response;
use jwst_core::History;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

use super::*;

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[schema(example=json!({
"endpoint": "http://localhost:3000/api/hook"
}))]
pub struct SubscribeWorkspace {
#[serde(rename = "endpoint")]
pub endpoint: String,
}

/// Register a webhook for all block changes from all workspace changes
#[utoipa::path(
post,
tag = "Workspace",
context_path = "/api/subscribe",
path = "",
request_body(
content_type = "application/json",
content = SubscribeWorkspace,
description = "Provide endpoint of webhook server",
),
responses(
(status = 200, description = "Subscribe workspace succeed"),
(status = 500, description = "Internal Server Error")
)
)]
pub async fn subscribe_workspace(
Extension(context): Extension<Arc<Context>>,
Json(payload): Json<SubscribeWorkspace>,
) -> Response {
info!("subscribe all workspaces, hook endpoint: {}", payload.endpoint);
context.set_webhook(payload.endpoint);
info!("successfully subscribed all workspaces");
StatusCode::OK.into_response()
}

/// The webhook receiver for debug history subscribe feature
#[utoipa::path(
post,
tag = "Workspace",
context_path = "/api/hook",
path = "",
request_body(
content_type = "application/json",
content = [History],
description = "Histories of block changes"
),
responses(
(status = 200, description = "Histories received"),
)
)]
pub async fn subscribe_test_hook(Json(payload): Json<Vec<History>>) -> Response {
info!("webhook receive {} histories", payload.len());
StatusCode::OK.into_response()
}
Loading