Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MTG-661 Add redis pool messenger #81

Merged
merged 12 commits into from
Oct 17, 2024
2 changes: 1 addition & 1 deletion plerkle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ tracing-subscriber = { version = "0.3.16", features = [
"ansi",
] }
hex = "0.4.3"
plerkle_messenger = { path = "../plerkle_messenger", features = ["redis"] }
plerkle_messenger = { path = "../plerkle_messenger", version = "1.6.0" }
flatbuffers = "23.1.21"
plerkle_serialization = { path = "../plerkle_serialization", version = "1.6.0" }
tokio = { version = "1.23.0", features = ["full"] }
Expand Down
5 changes: 2 additions & 3 deletions plerkle/src/geyser_plugin_nft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use dashmap::DashMap;
use figment::{providers::Env, Figment};
use flatbuffers::FlatBufferBuilder;
use plerkle_messenger::{
select_messenger, MessengerConfig, ACCOUNT_STREAM, BLOCK_STREAM, SLOT_STREAM,
TRANSACTION_STREAM,
select_messenger_stream, MessengerConfig, ACCOUNT_STREAM, BLOCK_STREAM, SLOT_STREAM, TRANSACTION_STREAM
};
use plerkle_serialization::serializer::{
serialize_account, serialize_block, serialize_transaction,
Expand Down Expand Up @@ -391,7 +390,7 @@ impl GeyserPlugin for Plerkle<'static> {
let mut worker_senders = Vec::with_capacity(workers_num);
for _ in 0..workers_num {
let (send, recv) = unbounded_channel::<SerializedData>();
let mut msg = select_messenger(config.messenger_config.clone())
let mut msg = select_messenger_stream(config.messenger_config.clone())
.await
.unwrap(); // We want to fail if the messenger is not configured correctly.

Expand Down
3 changes: 2 additions & 1 deletion plerkle_messenger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ edition = "2021"
readme = "Readme.md"

[dependencies]
redis = { version = "0.22.3", features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp", "connection-manager"], optional = true}
redis = { version = "0.27.2", features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp", "connection-manager"]}
tokio = "1.40.0"
log = "0.4.11"
thiserror = "1.0.30"
async-trait = "0.1.53"
Expand Down
31 changes: 31 additions & 0 deletions plerkle_messenger/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,34 @@ A message bus agnostic Messaging Library that sends Transaction, Account, Block

The plerkle serialization API changes at 1.0.0 which is a breaking change.
This method removes confusion around the Recv data lifetime being tied back to the messenger interface. Now the data is owned.

# Env example

The Messenger can operate in two modes: a single Redis instance or multiple Redis instances.

Just to clarify, the multiple Redis instances setup doesn't create a clustered connection. It's designed to work with separate, independent instances.

You can configure the Redis client type via environment variables.

Example environment configuration for a single Redis instance:

```
export PLUGIN_MESSENGER_CONFIG='{
messenger_type="Redis",
redis_connection_str="redis://:[email protected]:6379"
}'
```

Example environment configuration for multiple Redis instances:

```
export PLUGIN_MESSENGER_CONFIG='{
messenger_type="RedisPool",
redis_connection_str=[
"redis://:[email protected]:6379",
"redis://:[email protected]:6379"
]
}'
```

To switch between modes, you'll need to update both the `messenger_type` and `redis_connection_str` values.
8 changes: 4 additions & 4 deletions plerkle_messenger/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#[cfg(feature = "redis")]
pub mod redis_messenger;

mod error;
mod metrics;
mod plerkle_messenger;

pub use crate::{error::*, plerkle_messenger::*};
pub mod redis;
pub use redis::*;

pub use {crate::error::*, plerkle_messenger::*};
51 changes: 36 additions & 15 deletions plerkle_messenger/src/plerkle_messenger.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use crate::error::MessengerError;
use crate::{error::MessengerError, redis_pool_messenger::RedisPoolMessenger};
use async_trait::async_trait;
use blake3::OUT_LEN;
use figment::value::{Dict, Value};
use serde::Deserialize;
use std::collections::BTreeMap;

#[cfg(feature = "pulsar")]
use crate::pulsar_messenger::PulsarMessenger;
#[cfg(feature = "redis")]
use crate::redis_messenger::RedisMessenger;
danenbm marked this conversation as resolved.
Show resolved Hide resolved

/// Some constants that can be used as stream key values.
Expand Down Expand Up @@ -51,14 +48,12 @@ pub enum ConsumptionType {

#[async_trait]
pub trait Messenger: Sync + Send {
async fn new(config: MessengerConfig) -> Result<Self, MessengerError>
where
Self: Sized;
fn messenger_type(&self) -> MessengerType;
async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError>;
async fn set_buffer_size(&mut self, stream_key: &'static str, max_buffer_size: usize);
async fn send(&mut self, stream_key: &'static str, bytes: &[u8]) -> Result<(), MessengerError>;
async fn recv(&mut self, stream_key: &'static str, consumption_type: ConsumptionType) -> Result<Vec<RecvData>, MessengerError>;
async fn recv(
&mut self,
stream_key: &'static str,
consumption_type: ConsumptionType,
) -> Result<Vec<RecvData>, MessengerError>;
async fn stream_size(&mut self, stream_key: &'static str) -> Result<u64, MessengerError>;

// Ack-ing messages is made a bit awkward by the current interface layout because
Expand All @@ -81,24 +76,50 @@ pub trait Messenger: Sync + Send {
) -> Result<(), MessengerError>;
}

pub async fn select_messenger(
#[async_trait]
pub trait MessageStreamer: Sync + Send {
fn messenger_type(&self) -> MessengerType;
async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError>;
async fn set_buffer_size(&mut self, stream_key: &'static str, max_buffer_size: usize);
async fn send(&mut self, stream_key: &'static str, bytes: &[u8]) -> Result<(), MessengerError>;
}

pub async fn select_messenger_read(
config: MessengerConfig,
) -> Result<Box<dyn Messenger>, MessengerError> {
match config.messenger_type {
#[cfg(feature = "redis")]
MessengerType::Redis => {
RedisMessenger::new(config).await.map(|a| Box::new(a) as Box<dyn Messenger>)
}
_ => Err(MessengerError::ConfigurationError {
msg: "This Messenger type is not valid, unimplemented or you dont have the right crate features on.".to_string()
msg: "This Messenger type is not valid or not unimplemented.".to_string()
})
}
}

pub async fn select_messenger_stream(
config: MessengerConfig,
) -> Result<Box<dyn MessageStreamer>, MessengerError> {
match config.messenger_type {
MessengerType::Redis => {
RedisMessenger::new(config).await.map(|a| Box::new(a) as Box<dyn MessageStreamer>)
}
MessengerType::RedisPool => {
RedisPoolMessenger::new(config).await.map(|a| Box::new(a) as Box<dyn MessageStreamer>)
}
_ => Err(MessengerError::ConfigurationError {
msg: "This Messenger type is not valid, unimplemented or you don't have the right crate features on.".to_string()
})
}
}

#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
pub enum MessengerType {
// Connect to one Redis instance
Redis,
Pulsar,
// Connect to few different Redis instances
// Not a cluster
RedisPool,
Invalid,
}

Expand Down
15 changes: 15 additions & 0 deletions plerkle_messenger/src/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
pub mod redis_messenger;
pub mod redis_pool_messenger;

// Redis stream values.
pub const GROUP_NAME: &str = "plerkle";
pub const DATA_KEY: &str = "data";
pub const DEFAULT_RETRIES: usize = 3;
pub const DEFAULT_MSG_BATCH_SIZE: usize = 10;
pub const MESSAGE_WAIT_TIMEOUT: usize = 10;
pub const IDLE_TIMEOUT: usize = 5000;
pub const REDIS_MAX_BYTES_COMMAND: usize = 536870912;
pub const PIPELINE_SIZE_BYTES: usize = REDIS_MAX_BYTES_COMMAND / 100;
pub const PIPELINE_MAX_TIME: u64 = 10;

pub(crate) const REDIS_CON_STR: &str = "redis_connection_str";
Loading
Loading