Skip to content

Commit

Permalink
MTG-661 Add redis pool messenger (#81)
Browse files Browse the repository at this point in the history
Co-authored-by: rwwwx <[email protected]>
  • Loading branch information
n00m4d and rwwwx authored Oct 17, 2024
1 parent 4ce246f commit 63ae0d3
Show file tree
Hide file tree
Showing 9 changed files with 465 additions and 196 deletions.
2 changes: 1 addition & 1 deletion plerkle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ tracing-subscriber = { version = "0.3.16", features = [
"ansi",
] }
hex = "0.4.3"
plerkle_messenger = { path = "../plerkle_messenger", features = ["redis"] }
plerkle_messenger = { path = "../plerkle_messenger", version = "1.6.0" }
flatbuffers = "23.1.21"
plerkle_serialization = { path = "../plerkle_serialization", version = "1.6.0" }
tokio = { version = "1.23.0", features = ["full"] }
Expand Down
5 changes: 2 additions & 3 deletions plerkle/src/geyser_plugin_nft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use dashmap::DashMap;
use figment::{providers::Env, Figment};
use flatbuffers::FlatBufferBuilder;
use plerkle_messenger::{
select_messenger, MessengerConfig, ACCOUNT_STREAM, BLOCK_STREAM, SLOT_STREAM,
TRANSACTION_STREAM,
select_messenger_stream, MessengerConfig, ACCOUNT_STREAM, BLOCK_STREAM, SLOT_STREAM, TRANSACTION_STREAM
};
use plerkle_serialization::serializer::{
serialize_account, serialize_block, serialize_transaction,
Expand Down Expand Up @@ -391,7 +390,7 @@ impl GeyserPlugin for Plerkle<'static> {
let mut worker_senders = Vec::with_capacity(workers_num);
for _ in 0..workers_num {
let (send, recv) = unbounded_channel::<SerializedData>();
let mut msg = select_messenger(config.messenger_config.clone())
let mut msg = select_messenger_stream(config.messenger_config.clone())
.await
.unwrap(); // We want to fail if the messenger is not configured correctly.

Expand Down
3 changes: 2 additions & 1 deletion plerkle_messenger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ edition = "2021"
readme = "Readme.md"

[dependencies]
redis = { version = "0.22.3", features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp", "connection-manager"], optional = true}
redis = { version = "0.27.2", features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp", "connection-manager"]}
tokio = "1.40.0"
log = "0.4.11"
thiserror = "1.0.30"
async-trait = "0.1.53"
Expand Down
31 changes: 31 additions & 0 deletions plerkle_messenger/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,34 @@ A message bus agnostic Messaging Library that sends Transaction, Account, Block

The plerkle serialization API changes at 1.0.0 which is a breaking change.
This method removes confusion around the Recv data lifetime being tied back to the messenger interface. Now the data is owned.

# Env example

The Messenger can operate in two modes: a single Redis instance or multiple Redis instances.

Just to clarify, the multiple Redis instances setup doesn't create a clustered connection. It's designed to work with separate, independent instances.

You can configure the Redis client type via environment variables.

Example environment configuration for a single Redis instance:

```
export PLUGIN_MESSENGER_CONFIG='{
messenger_type="Redis",
redis_connection_str="redis://:[email protected]:6379"
}'
```

Example environment configuration for multiple Redis instances:

```
export PLUGIN_MESSENGER_CONFIG='{
messenger_type="RedisPool",
redis_connection_str=[
"redis://:[email protected]:6379",
"redis://:[email protected]:6379"
]
}'
```

To switch between modes, you'll need to update both the `messenger_type` and `redis_connection_str` values.
8 changes: 4 additions & 4 deletions plerkle_messenger/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#[cfg(feature = "redis")]
pub mod redis_messenger;

mod error;
mod metrics;
mod plerkle_messenger;

pub use crate::{error::*, plerkle_messenger::*};
pub mod redis;
pub use redis::*;

pub use {crate::error::*, plerkle_messenger::*};
51 changes: 36 additions & 15 deletions plerkle_messenger/src/plerkle_messenger.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use crate::error::MessengerError;
use crate::{error::MessengerError, redis_pool_messenger::RedisPoolMessenger};
use async_trait::async_trait;
use blake3::OUT_LEN;
use figment::value::{Dict, Value};
use serde::Deserialize;
use std::collections::BTreeMap;

#[cfg(feature = "pulsar")]
use crate::pulsar_messenger::PulsarMessenger;
#[cfg(feature = "redis")]
use crate::redis_messenger::RedisMessenger;

/// Some constants that can be used as stream key values.
Expand Down Expand Up @@ -51,14 +48,12 @@ pub enum ConsumptionType {

#[async_trait]
pub trait Messenger: Sync + Send {
async fn new(config: MessengerConfig) -> Result<Self, MessengerError>
where
Self: Sized;
fn messenger_type(&self) -> MessengerType;
async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError>;
async fn set_buffer_size(&mut self, stream_key: &'static str, max_buffer_size: usize);
async fn send(&mut self, stream_key: &'static str, bytes: &[u8]) -> Result<(), MessengerError>;
async fn recv(&mut self, stream_key: &'static str, consumption_type: ConsumptionType) -> Result<Vec<RecvData>, MessengerError>;
async fn recv(
&mut self,
stream_key: &'static str,
consumption_type: ConsumptionType,
) -> Result<Vec<RecvData>, MessengerError>;
async fn stream_size(&mut self, stream_key: &'static str) -> Result<u64, MessengerError>;

// Ack-ing messages is made a bit awkward by the current interface layout because
Expand All @@ -81,24 +76,50 @@ pub trait Messenger: Sync + Send {
) -> Result<(), MessengerError>;
}

pub async fn select_messenger(
#[async_trait]
pub trait MessageStreamer: Sync + Send {
fn messenger_type(&self) -> MessengerType;
async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError>;
async fn set_buffer_size(&mut self, stream_key: &'static str, max_buffer_size: usize);
async fn send(&mut self, stream_key: &'static str, bytes: &[u8]) -> Result<(), MessengerError>;
}

pub async fn select_messenger_read(
config: MessengerConfig,
) -> Result<Box<dyn Messenger>, MessengerError> {
match config.messenger_type {
#[cfg(feature = "redis")]
MessengerType::Redis => {
RedisMessenger::new(config).await.map(|a| Box::new(a) as Box<dyn Messenger>)
}
_ => Err(MessengerError::ConfigurationError {
msg: "This Messenger type is not valid, unimplemented or you dont have the right crate features on.".to_string()
msg: "This Messenger type is not valid or not unimplemented.".to_string()
})
}
}

pub async fn select_messenger_stream(
config: MessengerConfig,
) -> Result<Box<dyn MessageStreamer>, MessengerError> {
match config.messenger_type {
MessengerType::Redis => {
RedisMessenger::new(config).await.map(|a| Box::new(a) as Box<dyn MessageStreamer>)
}
MessengerType::RedisPool => {
RedisPoolMessenger::new(config).await.map(|a| Box::new(a) as Box<dyn MessageStreamer>)
}
_ => Err(MessengerError::ConfigurationError {
msg: "This Messenger type is not valid, unimplemented or you don't have the right crate features on.".to_string()
})
}
}

#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
pub enum MessengerType {
// Connect to one Redis instance
Redis,
Pulsar,
// Connect to few different Redis instances
// Not a cluster
RedisPool,
Invalid,
}

Expand Down
15 changes: 15 additions & 0 deletions plerkle_messenger/src/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
pub mod redis_messenger;
pub mod redis_pool_messenger;

// Redis stream values.
pub const GROUP_NAME: &str = "plerkle";
pub const DATA_KEY: &str = "data";
pub const DEFAULT_RETRIES: usize = 3;
pub const DEFAULT_MSG_BATCH_SIZE: usize = 10;
pub const MESSAGE_WAIT_TIMEOUT: usize = 10;
pub const IDLE_TIMEOUT: usize = 5000;
pub const REDIS_MAX_BYTES_COMMAND: usize = 536870912;
pub const PIPELINE_SIZE_BYTES: usize = REDIS_MAX_BYTES_COMMAND / 100;
pub const PIPELINE_MAX_TIME: u64 = 10;

pub(crate) const REDIS_CON_STR: &str = "redis_connection_str";
Loading

0 comments on commit 63ae0d3

Please sign in to comment.