Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global request response handling #250

Merged
merged 2 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions russh/src/client/encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::client::{Handler, Msg, Prompt, Reply, Session};
use crate::key::PubKey;
use crate::negotiation::{Named, Select};
use crate::parsing::{ChannelOpenConfirmation, ChannelType, OpenChannelMessage};
use crate::session::{Encrypted, EncryptedState, Kex, KexInit};
use crate::session::{Encrypted, EncryptedState, GlobalRequestResponse, Kex, KexInit};
use crate::{
auth, msg, negotiation, strict_kex_violation, Channel, ChannelId, ChannelMsg,
ChannelOpenFailure, ChannelParams, Sig,
Expand Down Expand Up @@ -815,15 +815,57 @@ impl Session {
Err(crate::Error::Inconsistent.into())
}
}
Some(&msg::REQUEST_SUCCESS | &msg::REQUEST_FAILURE)
if self.common.alive_timeouts > 0 =>
{
// TODO what other things might need to happen in response to these two opcodes?
self.common.alive_timeouts = 0;
Some(&msg::REQUEST_SUCCESS) => {
trace!("Global Request Success");
match self.open_global_requests.pop_front() {
Some(GlobalRequestResponse::Keepalive) => {
// ignore keepalives
}
Some(GlobalRequestResponse::TcpIpForward(return_channel)) => {
let result = if buf.len() == 1 {
// If a specific port was requested, the reply has no data
Some(0)
} else {
let mut r = buf.reader(1);
match r.read_u32() {
Ok(port) => Some(port),
Err(e) => {
error!("Error parsing port for TcpIpForward request: {e:?}");
None
}
}
};
let _ = return_channel.send(result);
}
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
let _ = return_channel.send(true);
}
None => {
error!("Received global request failure for unknown request!")
}
}
Ok(())
}
Some(&msg::REQUEST_FAILURE) => {
trace!("global request failure");
match self.open_global_requests.pop_front() {
Some(GlobalRequestResponse::Keepalive) => {
// ignore keepalives
}
Some(GlobalRequestResponse::TcpIpForward(return_channel)) => {
let _ = return_channel.send(None);
}
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
let _ = return_channel.send(false);
}
None => {
error!("Received global request failure for unknown request!")
}
}
Ok(())
}
_ => {
info!("Unhandled packet: {:?}", buf);
m => {
debug!("unknown message received: {:?}", m);
Ok(())
}
}
Expand Down
59 changes: 42 additions & 17 deletions russh/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//! [Session]: client::Session

use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::num::Wrapping;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -55,12 +55,15 @@ use tokio::pin;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::Mutex;
use tokio::sync::{oneshot, Mutex};

use crate::channels::{Channel, ChannelMsg, ChannelRef};
use crate::cipher::{self, clear, CipherPair, OpeningKey};
use crate::key::PubKey;
use crate::session::{CommonSession, EncryptedState, Exchange, Kex, KexDhDone, KexInit, NewKeys};
use crate::session::{
CommonSession, EncryptedState, Exchange, GlobalRequestResponse, Kex, KexDhDone, KexInit,
NewKeys,
};
use crate::ssh_read::SshRead;
use crate::sshbuffer::{SSHBuffer, SshId};
use crate::{
Expand All @@ -87,6 +90,7 @@ pub struct Session {
pending_len: u32,
inbound_channel_sender: Sender<Msg>,
inbound_channel_receiver: Receiver<Msg>,
open_global_requests: VecDeque<GlobalRequestResponse>,
}

const STRICT_KEX_MSG_ORDER: &[u8] = &[msg::KEXINIT, msg::KEX_ECDH_REPLY, msg::NEWKEYS];
Expand Down Expand Up @@ -146,12 +150,14 @@ pub enum Msg {
channel_ref: ChannelRef,
},
TcpIpForward {
want_reply: bool,
/// Provide a channel for the reply result to request a reply from the server
reply_channel: Option<oneshot::Sender<Option<u32>>>,
address: String,
port: u32,
},
CancelTcpIpForward {
want_reply: bool,
/// Provide a channel for the reply result to request a reply from the server
reply_channel: Option<oneshot::Sender<bool>>,
address: String,
port: u32,
},
Expand Down Expand Up @@ -507,39 +513,57 @@ impl<H: Handler> Handle<H> {
.await
}

/// Requests the server to open a TCP/IP forward channel
///
/// If port == 0 the server will choose a port that will be returned, returns 0 otherwise
pub async fn tcpip_forward<A: Into<String>>(
&mut self,
address: A,
port: u32,
) -> Result<bool, crate::Error> {
) -> Result<u32, crate::Error> {
let (reply_send, reply_recv) = oneshot::channel();
self.sender
.send(Msg::TcpIpForward {
want_reply: true,
reply_channel: Some(reply_send),
address: address.into(),
port,
})
.await
.map_err(|_| crate::Error::SendError)?;
if port == 0 {
self.wait_recv_reply().await?;

match reply_recv.await {
Ok(Some(port)) => Ok(port),
Ok(None) => Err(crate::Error::RequestDenied),
Err(e) => {
error!("Unable to receive TcpIpForward result: {e:?}");
Err(crate::Error::Disconnect)
}
}
Ok(true)
}

pub async fn cancel_tcpip_forward<A: Into<String>>(
&self,
address: A,
port: u32,
) -> Result<bool, crate::Error> {
) -> Result<(), crate::Error> {
let (reply_send, reply_recv) = oneshot::channel();
self.sender
.send(Msg::CancelTcpIpForward {
want_reply: true,
reply_channel: Some(reply_send),
address: address.into(),
port,
})
.await
.map_err(|_| crate::Error::SendError)?;
Ok(true)

match reply_recv.await {
Ok(true) => Ok(()),
Ok(false) => Err(crate::Error::RequestDenied),
Err(e) => {
error!("Unable to receive CancelTcpIpForward result: {e:?}");
Err(crate::Error::Disconnect)
}
}
}

/// Sends a disconnect message.
Expand Down Expand Up @@ -707,6 +731,7 @@ impl Session {
channels: HashMap::new(),
pending_reads: Vec::new(),
pending_len: 0,
open_global_requests: VecDeque::new(),
}
}

Expand Down Expand Up @@ -931,15 +956,15 @@ impl Session {
self.channels.insert(id, channel_ref);
}
Msg::TcpIpForward {
want_reply,
reply_channel,
address,
port,
} => self.tcpip_forward(want_reply, &address, port),
} => self.tcpip_forward(reply_channel, &address, port),
Msg::CancelTcpIpForward {
want_reply,
reply_channel,
address,
port,
} => self.cancel_tcpip_forward(want_reply, &address, port),
} => self.cancel_tcpip_forward(reply_channel, &address, port),
Msg::Disconnect {
reason,
description,
Expand Down
39 changes: 36 additions & 3 deletions russh/src/client/session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use log::error;
use russh_cryptovec::CryptoVec;
use russh_keys::encoding::Encoding;
use tokio::sync::oneshot;

use crate::client::Session;
use crate::session::EncryptedState;
Expand Down Expand Up @@ -264,8 +265,23 @@ impl Session {
}
}

pub fn tcpip_forward(&mut self, want_reply: bool, address: &str, port: u32) {
/// Requests a TCP/IP forwarding from the server
///
/// If `reply_channel` is not None, sets want_reply and returns the server's response via the channel,
/// Some<u32> for a success message with port, or None for failure
pub fn tcpip_forward(
&mut self,
reply_channel: Option<oneshot::Sender<Option<u32>>>,
address: &str,
port: u32,
) {
if let Some(ref mut enc) = self.common.encrypted {
let want_reply = reply_channel.is_some();
if let Some(reply_channel) = reply_channel {
self.open_global_requests.push_back(
crate::session::GlobalRequestResponse::TcpIpForward(reply_channel),
);
}
push_packet!(enc.write, {
enc.write.push(msg::GLOBAL_REQUEST);
enc.write.extend_ssh_string(b"tcpip-forward");
Expand All @@ -276,8 +292,23 @@ impl Session {
}
}

pub fn cancel_tcpip_forward(&mut self, want_reply: bool, address: &str, port: u32) {
/// Requests cancellation of TCP/IP forwarding from the server
///
/// If `want_reply` is `true`, returns a oneshot receiveing the server's reply:
/// `true` for a success message, or `false` for failure
pub fn cancel_tcpip_forward(
&mut self,
reply_channel: Option<oneshot::Sender<bool>>,
address: &str,
port: u32,
) {
if let Some(ref mut enc) = self.common.encrypted {
let want_reply = reply_channel.is_some();
if let Some(reply_channel) = reply_channel {
self.open_global_requests.push_back(
crate::session::GlobalRequestResponse::CancelTcpIpForward(reply_channel),
);
}
push_packet!(enc.write, {
enc.write.push(msg::GLOBAL_REQUEST);
enc.write.extend_ssh_string(b"cancel-tcpip-forward");
Expand All @@ -289,10 +320,12 @@ impl Session {
}

pub fn send_keepalive(&mut self, want_reply: bool) {
self.open_global_requests
.push_back(crate::session::GlobalRequestResponse::Keepalive);
if let Some(ref mut enc) = self.common.encrypted {
push_packet!(enc.write, {
enc.write.push(msg::GLOBAL_REQUEST);
enc.write.extend_ssh_string(b"keepalive@openssh.org");
enc.write.extend_ssh_string(b"keepalive@openssh.com");
enc.write.push(want_reply as u8);
});
}
Expand Down
3 changes: 3 additions & 0 deletions russh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ pub enum Error {
#[error("Failed to decrypt a packet")]
DecryptionError,

#[error("The request was rejected by the other party")]
RequestDenied,

#[error(transparent)]
Keys(#[from] russh_keys::Error),

Expand Down
49 changes: 49 additions & 0 deletions russh/src/server/encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,55 @@ impl Session {

Ok(())
}
Some(&msg::REQUEST_SUCCESS) => {
trace!("Global Request Success");
match self.open_global_requests.pop_front() {
Some(GlobalRequestResponse::Keepalive) => {
// ignore keepalives
}
Some(GlobalRequestResponse::TcpIpForward(return_channel)) => {
let result = if buf.len() == 1 {
// If a specific port was requested, the reply has no data
Some(0)
} else {
let mut r = buf.reader(1);
match r.read_u32() {
Ok(port) => Some(port),
Err(e) => {
error!("Error parsing port for TcpIpForward request: {e:?}");
None
}
}
};
let _ = return_channel.send(result);
}
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
let _ = return_channel.send(true);
}
None => {
error!("Received global request failure for unknown request!")
}
}
Ok(())
}
Some(&msg::REQUEST_FAILURE) => {
trace!("global request failure");
match self.open_global_requests.pop_front() {
Some(GlobalRequestResponse::Keepalive) => {
// ignore keepalives
}
Some(GlobalRequestResponse::TcpIpForward(return_channel)) => {
let _ = return_channel.send(None);
}
Some(GlobalRequestResponse::CancelTcpIpForward(return_channel)) => {
let _ = return_channel.send(false);
}
None => {
error!("Received global request failure for unknown request!")
}
}
Ok(())
}
m => {
debug!("unknown message received: {:?}", m);
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion russh/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! * Serving `ratatui` based TUI app to clients: [per-client](https://github.com/warp-tech/russh/blob/main/russh/examples/ratatui_app.rs), [shared](https://github.com/warp-tech/russh/blob/main/russh/examples/ratatui_shared_app.rs)

use std;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::num::Wrapping;
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -678,6 +678,7 @@ where
pending_reads: Vec::new(),
pending_len: 0,
channels: HashMap::new(),
open_global_requests: VecDeque::new(),
};
let join = tokio::spawn(session.run(stream, handler));

Expand Down
Loading
Loading