Skip to content
This repository has been archived by the owner on Jun 10, 2024. It is now read-only.

Commit

Permalink
fix to #43 - subscriptions not getting delivered from messaging provi…
Browse files Browse the repository at this point in the history
…der to actor (#44)

* migrate wascc_codec to wasmcloud-actor-core

* migrate from wasc_codec to provider-core; fix some clippy warningss

* fixes #43 - subscriptions not getting delivered to actor

Co-authored-by: steve <[email protected]>
  • Loading branch information
stevelr and steve authored Mar 9, 2021
1 parent 3107dd1 commit df1b55e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 52 deletions.
3 changes: 1 addition & 2 deletions logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ crate-type = ["cdylib", "rlib"]
static_plugin = [] # Enable to statically compile this into a host

[dependencies]
wascc-codec = "0.9.0"
log = "0.4.14"
env_logger = "0.8.2"
wasmcloud-actor-core = "0.2.0"
wasmcloud-provider-core = "0.1.0"
wasmcloud-actor-logging = "0.1.0"
19 changes: 8 additions & 11 deletions logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[macro_use]
extern crate wascc_codec as codec;
#[macro_use]
extern crate log;
use log::{debug, error, info, trace, warn};

use codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher};
use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
use std::error::Error;
use std::sync::{Arc, RwLock};
use wasmcloud_actor_core::deserialize;
use wasmcloud_actor_logging::{WriteLogArgs, OP_LOG};
use wasmcloud_provider_core::{
capabilities::{CapabilityProvider, Dispatcher, NullDispatcher},
capability_provider,
core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR},
deserialize,
};

#[cfg(not(feature = "static_plugin"))]
capability_provider!(LoggingProvider, LoggingProvider::new);
Expand All @@ -45,10 +45,7 @@ pub struct LoggingProvider {

impl Default for LoggingProvider {
fn default() -> Self {
match env_logger::try_init() {
Ok(_) => {}
Err(_) => {}
}
if env_logger::try_init().is_err() {}

LoggingProvider {
dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
Expand Down
6 changes: 3 additions & 3 deletions nats/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wasmcloud-nats"
version = "0.10.1"
version = "0.10.2"
authors = ["wasmCloud Team"]
edition = "2018"
homepage = "https://wasmcloud.dev"
Expand All @@ -24,9 +24,9 @@ static_plugin = []

[dependencies]
wascap = "0.6.0"
wascc-codec = "0.9.0"
wasmcloud-actor-messaging = "0.1.0"
wasmcloud-actor-core = "0.2.0"
wasmcloud-actor-core = "0.2.2"
wasmcloud-provider-core = "0.1.0"
log = "0.4.14"
env_logger = "0.8.2"
crossbeam-channel = "0.5.0"
Expand Down
43 changes: 17 additions & 26 deletions nats/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,28 @@
#[macro_use]
extern crate wascc_codec as codec;

extern crate wasmcloud_actor_core as actorcore;
extern crate wasmcloud_actor_messaging as messaging;
use actorcore::{CapabilityConfiguration, HealthCheckResponse};
use log::{info, trace};
use messaging::{BrokerMessage, RequestArgs};
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use std::sync::RwLock;
use wasmcloud_actor_core as actorcore;
use wasmcloud_actor_messaging as messaging;
use wasmcloud_provider_core::{
capabilities::{CapabilityProvider, Dispatcher, NullDispatcher},
capability_provider,
core::{OP_BIND_ACTOR, OP_HEALTH_REQUEST, OP_REMOVE_ACTOR},
deserialize, serialize,
};

mod natsprov;

#[allow(unused)] // used by the Makefile
const CAPABILITY_ID: &str = "wasmcloud:messaging";

#[macro_use]
extern crate log;

use codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher};

pub const OP_DELIVER_MESSAGE: &str = "DeliverMessage";
pub use messaging::OP_HANDLE_MESSAGE;
pub const OP_PUBLISH_MESSAGE: &str = "Publish";
pub const OP_PERFORM_REQUEST: &str = "Request";

use codec::core::{OP_BIND_ACTOR, OP_HEALTH_REQUEST, OP_REMOVE_ACTOR};
use messaging::{BrokerMessage, RequestArgs};

use actorcore::{CapabilityConfiguration, HealthCheckResponse};
use std::collections::HashMap;
use wascc_codec::{deserialize, serialize};

use std::error::Error;
use std::sync::Arc;
use std::sync::RwLock;

#[cfg(not(feature = "static_plugin"))]
capability_provider!(NatsProvider, NatsProvider::new);

Expand All @@ -41,10 +35,7 @@ pub struct NatsProvider {

impl Default for NatsProvider {
fn default() -> Self {
match env_logger::try_init() {
Ok(_) => {}
Err(_) => {}
};
if env_logger::try_init().is_err() {}

NatsProvider {
dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
Expand Down
19 changes: 9 additions & 10 deletions nats/src/natsprov.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use crate::messaging::{BrokerMessage, PublishResponse, RequestArgs};
use crate::OP_HANDLE_MESSAGE;
use log::{error, info, trace};
use nats::Connection;
use std::error::Error;
use std::sync::Arc;
use std::sync::RwLock;
use std::{collections::HashMap, time::Duration};
use wascc_codec::capabilities::Dispatcher;

use crate::OP_DELIVER_MESSAGE;
use nats::Connection;
use wascc_codec::serialize;

use wascap::prelude::KeyPair;
use wasmcloud_provider_core::capabilities::Dispatcher;
use wasmcloud_provider_core::serialize;

const ENV_NATS_SUBSCRIPTION: &str = "SUBSCRIPTION";
const ENV_NATS_URL: &str = "URL";
Expand All @@ -28,7 +27,7 @@ pub(crate) fn publish(
&msg.body.len()
);

let res = if msg.reply_to.len() > 0 {
let res = if !msg.reply_to.is_empty() {
client.publish_with_reply_or_headers(&msg.subject, Some(&msg.reply_to), None, &msg.body)
} else {
client.publish(&msg.subject, &msg.body)
Expand Down Expand Up @@ -102,7 +101,7 @@ fn create_subscription(
let buf = serialize(&dm).unwrap();

let d = dispatcher.read().unwrap();
if let Err(e) = d.dispatch(&actor, OP_DELIVER_MESSAGE, &buf) {
if let Err(e) = d.dispatch(&actor, OP_HANDLE_MESSAGE, &buf) {
error!("Dispatch failed: {}", e);
}
Ok(())
Expand All @@ -115,7 +114,7 @@ fn create_subscription(
let dm = delivermessage_for_natsmessage(&msg);
let buf = serialize(&dm).unwrap();
let d = dispatcher.read().unwrap();
if let Err(e) = d.dispatch(&actor, OP_DELIVER_MESSAGE, &buf) {
if let Err(e) = d.dispatch(&actor, OP_HANDLE_MESSAGE, &buf) {
error!("Dispatch failed: {}", e);
}
Ok(())
Expand Down Expand Up @@ -151,7 +150,7 @@ fn get_connection(
.clone()
.unwrap_or(&"".to_string())
.to_string();
if jwt.len() > 0 {
if !jwt.is_empty() {
let seed = values
.get(ENV_NATS_CLIENT_SEED)
.clone()
Expand Down

0 comments on commit df1b55e

Please sign in to comment.