Skip to content

Commit

Permalink
fix(webconsole): ssh randomly exited error (#21407)
Browse files Browse the repository at this point in the history
  • Loading branch information
zexi authored Oct 14, 2024
1 parent 09e95bd commit f25c054
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 62 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
bazil.org/fuse v0.0.0-20180421153158-65cc252bf669
github.com/360EntSecGroup-Skylar/excelize v1.4.0
github.com/LeeEirc/terminalparser v0.0.0-20220328021224-de16b7643ea4
github.com/LeeEirc/terminalparser v0.0.0-20240205084113-fbf78c8480f2
github.com/aliyun/alibaba-cloud-sdk-go v1.61.684
github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778
github.com/anacrolix/torrent v0.0.0-20181129073333-cc531b8c4a80
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/DataDog/zstd v1.3.4 h1:LAGHkXuvC6yky+C2CUG2tD7w8QlrUwpue8XwIh0X4AY=
github.com/DataDog/zstd v1.3.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/LeeEirc/terminalparser v0.0.0-20220328021224-de16b7643ea4 h1:Gk7m4Nu2jqVqJAJqNlTYqkiq96WkANAtB4fVi+t7Xv8=
github.com/LeeEirc/terminalparser v0.0.0-20220328021224-de16b7643ea4/go.mod h1:tiLv6VBLH4Z3KdBSe2qIKRwQDGCVQ9/F5fOKpQGvyoA=
github.com/LeeEirc/terminalparser v0.0.0-20240205084113-fbf78c8480f2 h1:XGB3B0651J1uKOE1KJa1gsrV/DO1kthhk2NTDUHATgs=
github.com/LeeEirc/terminalparser v0.0.0-20240205084113-fbf78c8480f2/go.mod h1:tiLv6VBLH4Z3KdBSe2qIKRwQDGCVQ9/F5fOKpQGvyoA=
github.com/Microsoft/azure-vhd-utils v0.0.0-20181115010904-44cbada2ece3 h1:gImoAO1xAcC1oDlYmD/X7dggsodGf2DFJOVE5m0ssms=
github.com/Microsoft/azure-vhd-utils v0.0.0-20181115010904-44cbada2ece3/go.mod h1:u0H9gMieFLxkUy8RS0X8VbFWyPs2815qQAaitRbj6x0=
github.com/Microsoft/go-winio v0.4.15 h1:qkLXKzb1QoVatRyd/YlXZ/Kg0m5K3SPuoD82jjSOaBc=
Expand Down
150 changes: 108 additions & 42 deletions pkg/webconsole/server/ssh_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"time"

"github.com/anacrolix/sync"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/pkg/sftp"
Expand All @@ -42,12 +42,16 @@ type WebsocketServer struct {
Password string
PrivateKey string

session *ssh.Session
StdinPipe io.WriteCloser
ws *websocket.Conn
conn *ssh.Client
sftp *sftp.Client
timer *time.Timer
session *ssh.Session
StdinPipe io.WriteCloser
StdoutPipe io.Reader
StderrPipe io.Reader

ws *websocket.Conn
conn *ssh.Client
sshNetConn net.Conn
sftp *sftp.Client
timer *time.Timer
}

func NewSshServer(s *session.SSession) (*WebsocketServer, error) {
Expand All @@ -63,30 +67,28 @@ func NewSshServer(s *session.SSession) (*WebsocketServer, error) {
return server, nil
}

type WebSocketBufferWriter struct {
s *session.SSession
ws *websocket.Conn
lock sync.Mutex
}

func (w *WebSocketBufferWriter) Write(p []byte) (int, error) {
w.lock.Lock()
defer w.lock.Unlock()

go w.s.GetRecorder().Write("", string(p))
err := w.ws.WriteMessage(websocket.BinaryMessage, p)
if err != nil {
return 0, err
func writeToWebsocket(reader io.Reader, s *WebsocketServer) error {
var data = make([]byte, 1024)
for {
n, err := reader.Read(data)
if err != nil {
return errors.Wrap(err, "read data from reader")
}
out := data[:n]
go s.Session.GetRecorder().Write("", string(out))
if err := s.ws.WriteMessage(websocket.BinaryMessage, out); err != nil {
return errors.Wrapf(err, "write data to websocket, out: %s", string(out))
}
}
return len(p), nil
return nil
}

func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error {
username := s.Username
privateKey := s.PrivateKey
password := s.Password
config := &ssh.ClientConfig{
Timeout: time.Second,
Timeout: 5 * time.Second,
User: username,
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Auth: []ssh.AuthMethod{
Expand All @@ -101,7 +103,7 @@ func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error {

var err error
addr := fmt.Sprintf("%s:%d", s.Host, s.Port)
s.conn, err = ssh.Dial("tcp", addr, config)
s.conn, s.sshNetConn, err = NewSshClient("tcp", addr, config)
if err != nil {
return errors.Wrapf(err, "dial %s", addr)
}
Expand All @@ -119,7 +121,15 @@ func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error {

s.StdinPipe, err = s.session.StdinPipe()
if err != nil {
return errors.Wrapf(err, "StdinPip")
return errors.Wrapf(err, "StdinPipe")
}
s.StdoutPipe, err = s.session.StdoutPipe()
if err != nil {
return errors.Wrapf(err, "StdoutPipe")
}
s.StderrPipe, err = s.session.StderrPipe()
if err != nil {
return errors.Wrapf(err, "StderrPipe")
}

var up = websocket.Upgrader{
Expand All @@ -135,14 +145,6 @@ func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error {
return errors.Wrapf(err, "upgrade")
}

wsWriter := WebSocketBufferWriter{
s: s.Session,
ws: s.ws,
}

s.session.Stdout = &wsWriter
s.session.Stderr = &wsWriter

modes := ssh.TerminalModes{
ssh.ECHO: 1,
ssh.TTY_OP_ISPEED: 14400,
Expand All @@ -161,15 +163,55 @@ func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error {
return nil
}

// ref: https://github.com/golang/go/issues/19338#issuecomment-539057790
func NewSshClient(network, addr string, conf *ssh.ClientConfig) (*ssh.Client, net.Conn, error) {
conn, err := net.DialTimeout(network, addr, conf.Timeout)
if err != nil {
return nil, nil, errors.Wrapf(err, "dial %s %s", network, addr)
}
if conf.Timeout > 0 {
conn.SetDeadline(time.Now().Add(conf.Timeout))
}
c, chans, reqs, err := ssh.NewClientConn(conn, addr, conf)
if err != nil {
return nil, nil, errors.Wrapf(err, "new client conn %s", addr)
}
if conf.Timeout > 0 {
conn.SetDeadline(time.Time{})
}
return ssh.NewClient(c, chans, reqs), conn, nil
}

func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logPrefix := fmt.Sprintf("ssh %s@%s:%d, session_id: %s", s.Username, s.Host, s.Port, s.Session.Id)

err := s.initWs(w, r)
if err != nil {
log.Errorf("initWs error: %v", err)
log.Errorf("%s, initWs error: %v", logPrefix, err)
return
}

done := make(chan bool, 3)
setDone := func() { done <- true }
keepAliveDone := make(chan struct{})

go func() {
if err := sshKeepAlive(s.conn, s.sshNetConn, keepAliveDone); err != nil {
log.Errorf("%s, keepalive error: %v", logPrefix, err)
}
}()

setDone := func() {
done <- true
}

for _, reader := range []io.Reader{s.StdoutPipe, s.StderrPipe} {
tmpReader := reader
go func() {
if err := writeToWebsocket(tmpReader, s); err != nil {
log.Warningf("%s, writeToWebsocket error: %v", logPrefix, err)
}
}()
}

go func() {
defer setDone()
Expand All @@ -193,12 +235,12 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}{}
obj, err := jsonutils.Parse(p)
if err != nil {
log.Errorf("parse %s error: %v", string(p), err)
log.Errorf("%s, parse %s error: %v", logPrefix, string(p), err)
continue
}
err = obj.Unmarshal(&input)
if err != nil {
log.Errorf("unmarshal %s error: %v", string(p), err)
log.Errorf("%s, unmarshal %s error: %v", logPrefix, string(p), err)
continue
}

Expand All @@ -208,7 +250,7 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case "resize":
err = s.session.WindowChange(input.Data.Rows, input.Data.Cols)
if err != nil {
log.Errorf("resize %dx%d error: %v", input.Data.Cols, input.Data.Rows, err)
log.Errorf("%s, resize %dx%d error: %v", logPrefix, input.Data.Cols, input.Data.Rows, err)
}
case "input":
if input.Data.Base64 {
Expand All @@ -218,13 +260,13 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
go s.Session.GetRecorder().Write(input.Data.Data, "")
_, err = s.StdinPipe.Write([]byte(input.Data.Data))
if err != nil {
log.Errorf("write %s error: %v", input.Data.Data, err)
log.Errorf("%s, write %s error: %v", logPrefix, input.Data.Data, err)
return
}
case "heartbeat":
continue
default:
log.Errorf("unknow msg type %s for %s:%d", input.Type, s.Host, s.Port)
log.Errorf("%s, unknow msg type %s", logPrefix, input.Type)
}
}
}()
Expand All @@ -237,6 +279,7 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.sftp.Close()
s.conn.Close()
s.Session.Close()
keepAliveDone <- struct{}{}
}()

stop := make(chan bool)
Expand Down Expand Up @@ -266,12 +309,35 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {

err = s.session.Wait()
if err != nil {
log.Warningf("session %s wait error: %v", s.Session.Id, err)
log.Warningf("%s wait error: %v", logPrefix, err)
s.StdinPipe.Write([]byte(err.Error()))
}
}()

<-done
stop <- true
log.Infof("ssh %s@%s:%d complete, session_id: %s", s.Username, s.Host, s.Port, s.Session.Id)
}

func sshKeepAlive(cli *ssh.Client, conn net.Conn, done <-chan struct{}) error {
// ref:
// - https://github.com/golang/go/issues/21478
// - https://github.com/scylladb/go-sshtools/blob/master/keepalive.go#L36
const keepAliveInterval = 15 * time.Second
t := time.NewTicker(keepAliveInterval)
defer t.Stop()
for {
deadline := time.Now().Add(keepAliveInterval).Add(15 * time.Second)
if err := conn.SetDeadline(deadline); err != nil {
return errors.Wrap(err, "failed to set deadline")
}
select {
case <-t.C:
_, _, err := cli.SendRequest("[email protected]", true, nil)
if err != nil {
return errors.Wrap(err, "failed to send keep alive")
}
case <-done:
return nil
}
}
}
24 changes: 7 additions & 17 deletions vendor/github.com/LeeEirc/terminalparser/csi_func.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion vendor/github.com/LeeEirc/terminalparser/screen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ github.com/DataDog/dd-trace-go/tracer/ext
# github.com/DataDog/zstd v1.3.4
## explicit
github.com/DataDog/zstd
# github.com/LeeEirc/terminalparser v0.0.0-20220328021224-de16b7643ea4
# github.com/LeeEirc/terminalparser v0.0.0-20240205084113-fbf78c8480f2
## explicit; go 1.15
github.com/LeeEirc/terminalparser
# github.com/Microsoft/azure-vhd-utils v0.0.0-20181115010904-44cbada2ece3
Expand Down

0 comments on commit f25c054

Please sign in to comment.