diff --git a/src/storage/ck/log.rs b/src/storage/ck/log.rs index 7904358..b61c669 100644 --- a/src/storage/ck/log.rs +++ b/src/storage/ck/log.rs @@ -128,6 +128,25 @@ impl LogStorage for CKLogQuerier { } impl CKLogQuerier { + pub async fn init_labels(&self) { + let sql = format!( + "SELECT {} FROM {} WHERE {} >= now() - INTERVAL 5 MINUTE LIMIT 3000", + self.schema.projection().join(","), + self.schema.table(), + self.schema.ts_key(), + ); + let rows = + send_query(self.cli.clone(), self.ck_cfg.common.clone(), sql) + .await + .unwrap_or_default(); + let mut records = vec![]; + for row in rows { + if let Ok(record) = LogRecod::try_from(row) { + records.push(record.into()); + } + } + self.record_label(&records).await; + } async fn record_label(&self, records: &[LogItem]) { let cfg = self.ck_cfg.label.clone(); for name in Self::collect_svcname(records) { diff --git a/src/storage/ck/mod.rs b/src/storage/ck/mod.rs index 66ef097..cf53c2f 100644 --- a/src/storage/ck/mod.rs +++ b/src/storage/ck/mod.rs @@ -13,13 +13,11 @@ pub mod trace; pub async fn new_log_source(cfg: ClickhouseLog) -> Result> { let cli = Client::builder() .gzip(true) - .timeout(Duration::from_secs(60)) + .timeout(Duration::from_secs(90)) .build()?; - Ok(Box::new(log::CKLogQuerier::new( - cli, - cfg.common.table.clone(), - cfg, - ))) + let q = log::CKLogQuerier::new(cli, cfg.common.table.clone(), cfg); + q.init_labels().await; + Ok(Box::new(q)) } pub async fn new_trace_source(