Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce license manager and feature tiers #17396

Merged
merged 22 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ message SystemParams {
optional string wasm_storage_url = 14 [deprecated = true];
optional bool enable_tracing = 15;
optional bool use_new_object_prefix_strategy = 16;
optional string my_token = 17;
}

message GetSystemParamsRequest {}
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ hytra = { workspace = true }
itertools = { workspace = true }
itoa = "1.0"
jsonbb = { workspace = true }
jsonwebtoken = "9"
BugenZhao marked this conversation as resolved.
Show resolved Hide resolved
lru = { workspace = true }
memcomparable = { version = "0.2", features = ["decimal"] }
num-integer = "0.1"
Expand Down
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub use risingwave_common_metrics::{
register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
register_guarded_int_gauge_vec_with_registry,
};
pub mod license;
pub mod lru;
pub mod opts;
pub mod range;
Expand Down
181 changes: 181 additions & 0 deletions src/common/src/license.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::sync::{LazyLock, RwLock};
BugenZhao marked this conversation as resolved.
Show resolved Hide resolved

use jsonwebtoken::{Algorithm, DecodingKey, Validation};
use serde::Deserialize;
use thiserror::Error;

// TODO: case
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize)]
pub enum Tier {
Free,
Paid,
}

impl Default for Tier {
fn default() -> Self {
if cfg!(debug_assertions) {
Self::Paid
} else {
Self::Free
}
}
}

// TODO: case
#[derive(Debug, Clone, Deserialize)]
struct License {
#[allow(dead_code)]
sub: String,
tier: Tier,
exp: u64,
}

impl Default for License {
fn default() -> Self {
Self {
sub: "default".to_owned(),
tier: Tier::default(),
exp: u64::MAX,
}
}
}

#[derive(Debug, Clone, Error)]
#[error("invalid license")]
pub struct LicenseError(#[source] jsonwebtoken::errors::Error);

pub type Result<T> = std::result::Result<T, LicenseError>;

struct Inner {
last_token: String,
license: Result<License>,
}

pub(crate) struct LicenseManager {
inner: RwLock<Inner>,
}

impl LicenseManager {
pub fn get() -> &'static Self {
static INSTANCE: LazyLock<LicenseManager> = LazyLock::new(|| LicenseManager {
inner: RwLock::new(Inner {
last_token: String::new(),
license: Ok(License::default()),
}),
});

&INSTANCE
}

pub fn refresh(&self, token: &str) {
let mut inner = self.inner.write().unwrap();
inner.last_token = token.to_owned();

if token.is_empty() {
inner.license = Ok(License::default());
return;
}

// TODO: use asymmetric encryption
let validation = Validation::new(Algorithm::HS256);
let decoding_key = DecodingKey::from_secret(b"my-very-private-secret");

inner.license = match jsonwebtoken::decode(token, &decoding_key, &validation) {
Ok(data) => Ok(data.claims),
Err(error) => Err(LicenseError(error)),
};
}

fn license(&self) -> Result<License> {
let license = self.inner.read().unwrap().license.clone()?;

if license.exp < jsonwebtoken::get_current_timestamp() {
return Err(LicenseError(
jsonwebtoken::errors::ErrorKind::ExpiredSignature.into(),
));
}

Ok(license)
}
}

macro_rules! for_all_features {
($macro:ident) => {
$macro! {
// name min tier doc
{ MyCommonFeature, Free, "My common feature." },
{ MyAwesomeFeature, Paid, "My awesome feature." },
}
};
}

macro_rules! def_feature {
($({ $name:ident, $min_tier:ident, $doc:literal },)*) => {
#[derive(Clone, Copy, Debug)]
pub enum Feature {
$(
#[doc = $doc]
$name,
)*
}

impl Feature {
fn min_tier(self) -> Tier {
match self {
$(
Self::$name => Tier::$min_tier,
)*
}
}
}
};
}

for_all_features!(def_feature);

#[derive(Debug, Error)]
pub enum FeatureNotAvailable {
#[error(
"feature {:?} is only available for tier {:?} and above, while the current tier is {:?}",
feature, feature.min_tier(), current_tier,
)]
InsufficientTier {
feature: Feature,
current_tier: Tier,
},

#[error("feature {feature:?} is not available due to license error")]
LicenseError {
feature: Feature,
source: LicenseError,
},
}

impl Feature {
pub fn check_available(self) -> std::result::Result<(), FeatureNotAvailable> {
match LicenseManager::get().license() {
Ok(license) => {
if license.tier >= self.min_tier() {
Ok(())
} else {
Err(FeatureNotAvailable::InsufficientTier {
feature: self,
current_tier: license.tier,
})
}
}
Err(error) => {
// If there's a license error, we still try against the default license first
// and allow the feature if it's available.
if License::default().tier >= self.min_tier() {
Ok(())
} else {
Err(FeatureNotAvailable::LicenseError {
feature: self,
source: error,
})
}
}
}
}
}
7 changes: 6 additions & 1 deletion src/common/src/system_param/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use super::diff::SystemParamsDiff;
use super::reader::SystemParamsReader;
use crate::license::LicenseManager;
use crate::util::tracing::layer::toggle_otel_layer;

/// Node-independent handler for system parameter changes.
Expand All @@ -33,7 +34,11 @@ impl CommonHandler {
/// Handle the change of system parameters.
pub fn handle_change(&self, diff: &SystemParamsDiff) {
if let Some(enabled) = diff.enable_tracing {
toggle_otel_layer(enabled)
toggle_otel_layer(enabled);
}
if let Some(token) = diff.my_token.as_ref() {
LicenseManager::get().refresh(token);
tracing::debug!("token refreshed!");
BugenZhao marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
1 change: 1 addition & 0 deletions src/common/src/system_param/local_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl LocalSystemParamsManager {
let this = Self::new_inner(initial_params.clone());

// Spawn a task to run the common handler.
// TODO: this may be spawned multiple times under standalone deployment, though idempotent.
tokio::spawn({
let mut rx = this.tx.subscribe();
async move {
Expand Down
5 changes: 4 additions & 1 deletion src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ macro_rules! for_all_params {
{ max_concurrent_creating_streaming_jobs, u32, Some(1_u32), true, "Max number of concurrent creating streaming jobs.", },
{ pause_on_next_bootstrap, bool, Some(false), true, "Whether to pause all data sources on next bootstrap.", },
{ enable_tracing, bool, Some(false), true, "Whether to enable distributed tracing.", },
{ use_new_object_prefix_strategy, bool, None, false, "Whether to split object prefix.", },
{ use_new_object_prefix_strategy, bool, None, false, "Whether to split object prefix.", },
// TODO: a more serious name
// TODO: initialize with configuration file or env var
{ my_token, String, Some("".to_owned()), true, "My interesting Token.", },
}
};
}
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/system_param/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,12 @@ where
.enable_tracing
.unwrap_or_else(default::enable_tracing)
}

fn my_token(&self) -> &str {
self.inner()
.my_token
.as_ref()
.map(|s| s.as_str())
.unwrap_or_default()
}
}
7 changes: 7 additions & 0 deletions src/expr/impl/src/scalar/arithmetic_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fmt::Debug;

use chrono::{Duration, NaiveDateTime};
use num_traits::{CheckedDiv, CheckedMul, CheckedNeg, CheckedRem, CheckedSub, Zero};
use risingwave_common::license::Feature;
use risingwave_common::types::{
CheckedAdd, Date, Decimal, FloatExt, Interval, IsNegative, Time, Timestamp, F64,
};
Expand All @@ -33,6 +34,9 @@ where
T2: Into<T3> + Debug,
T3: CheckedAdd<Output = T3>,
{
Feature::MyAwesomeFeature
.check_available()
.map_err(anyhow::Error::from)?;
general_atm(l, r, |a, b| {
a.checked_add(b).ok_or(ExprError::NumericOutOfRange)
})
Expand All @@ -49,6 +53,9 @@ where
T2: Into<T3> + Debug,
T3: CheckedSub,
{
Feature::MyCommonFeature
.check_available()
.map_err(anyhow::Error::from)?;
general_atm(l, r, |a, b| {
a.checked_sub(&b).ok_or(ExprError::NumericOutOfRange)
})
Expand Down
Loading