diff --git a/go.mod b/go.mod index f7e50b61027..20498ed1ee3 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 require ( bazil.org/fuse v0.0.0-20180421153158-65cc252bf669 github.com/360EntSecGroup-Skylar/excelize v1.4.0 - github.com/LeeEirc/terminalparser v0.0.0-20220328021224-de16b7643ea4 + github.com/LeeEirc/terminalparser v0.0.0-20240205084113-fbf78c8480f2 github.com/aliyun/alibaba-cloud-sdk-go v1.61.684 github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778 github.com/anacrolix/torrent v0.0.0-20181129073333-cc531b8c4a80 diff --git a/go.sum b/go.sum index a5257f1f19e..22025dd0fbc 100644 --- a/go.sum +++ b/go.sum @@ -91,6 +91,8 @@ github.com/DataDog/zstd v1.3.4 h1:LAGHkXuvC6yky+C2CUG2tD7w8QlrUwpue8XwIh0X4AY= github.com/DataDog/zstd v1.3.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/LeeEirc/terminalparser v0.0.0-20220328021224-de16b7643ea4 h1:Gk7m4Nu2jqVqJAJqNlTYqkiq96WkANAtB4fVi+t7Xv8= github.com/LeeEirc/terminalparser v0.0.0-20220328021224-de16b7643ea4/go.mod h1:tiLv6VBLH4Z3KdBSe2qIKRwQDGCVQ9/F5fOKpQGvyoA= +github.com/LeeEirc/terminalparser v0.0.0-20240205084113-fbf78c8480f2 h1:XGB3B0651J1uKOE1KJa1gsrV/DO1kthhk2NTDUHATgs= +github.com/LeeEirc/terminalparser v0.0.0-20240205084113-fbf78c8480f2/go.mod h1:tiLv6VBLH4Z3KdBSe2qIKRwQDGCVQ9/F5fOKpQGvyoA= github.com/Microsoft/azure-vhd-utils v0.0.0-20181115010904-44cbada2ece3 h1:gImoAO1xAcC1oDlYmD/X7dggsodGf2DFJOVE5m0ssms= github.com/Microsoft/azure-vhd-utils v0.0.0-20181115010904-44cbada2ece3/go.mod h1:u0H9gMieFLxkUy8RS0X8VbFWyPs2815qQAaitRbj6x0= github.com/Microsoft/go-winio v0.4.15 h1:qkLXKzb1QoVatRyd/YlXZ/Kg0m5K3SPuoD82jjSOaBc= diff --git a/pkg/webconsole/server/ssh_server.go b/pkg/webconsole/server/ssh_server.go index ba3406f86bf..a1d5709c39e 100644 --- a/pkg/webconsole/server/ssh_server.go +++ b/pkg/webconsole/server/ssh_server.go @@ -18,10 +18,10 @@ import ( "encoding/base64" "fmt" "io" + "net" "net/http" "time" - "github.com/anacrolix/sync" "github.com/gorilla/websocket" "github.com/pkg/errors" "github.com/pkg/sftp" @@ -42,12 +42,16 @@ type WebsocketServer struct { Password string PrivateKey string - session *ssh.Session - StdinPipe io.WriteCloser - ws *websocket.Conn - conn *ssh.Client - sftp *sftp.Client - timer *time.Timer + session *ssh.Session + StdinPipe io.WriteCloser + StdoutPipe io.Reader + StderrPipe io.Reader + + ws *websocket.Conn + conn *ssh.Client + sshNetConn net.Conn + sftp *sftp.Client + timer *time.Timer } func NewSshServer(s *session.SSession) (*WebsocketServer, error) { @@ -63,22 +67,20 @@ func NewSshServer(s *session.SSession) (*WebsocketServer, error) { return server, nil } -type WebSocketBufferWriter struct { - s *session.SSession - ws *websocket.Conn - lock sync.Mutex -} - -func (w *WebSocketBufferWriter) Write(p []byte) (int, error) { - w.lock.Lock() - defer w.lock.Unlock() - - go w.s.GetRecorder().Write("", string(p)) - err := w.ws.WriteMessage(websocket.BinaryMessage, p) - if err != nil { - return 0, err +func writeToWebsocket(reader io.Reader, s *WebsocketServer) error { + var data = make([]byte, 1024) + for { + n, err := reader.Read(data) + if err != nil { + return errors.Wrap(err, "read data from reader") + } + out := data[:n] + go s.Session.GetRecorder().Write("", string(out)) + if err := s.ws.WriteMessage(websocket.BinaryMessage, out); err != nil { + return errors.Wrapf(err, "write data to websocket, out: %s", string(out)) + } } - return len(p), nil + return nil } func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error { @@ -86,7 +88,7 @@ func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error { privateKey := s.PrivateKey password := s.Password config := &ssh.ClientConfig{ - Timeout: time.Second, + Timeout: 5 * time.Second, User: username, HostKeyCallback: ssh.InsecureIgnoreHostKey(), Auth: []ssh.AuthMethod{ @@ -101,7 +103,7 @@ func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error { var err error addr := fmt.Sprintf("%s:%d", s.Host, s.Port) - s.conn, err = ssh.Dial("tcp", addr, config) + s.conn, s.sshNetConn, err = NewSshClient("tcp", addr, config) if err != nil { return errors.Wrapf(err, "dial %s", addr) } @@ -119,7 +121,15 @@ func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error { s.StdinPipe, err = s.session.StdinPipe() if err != nil { - return errors.Wrapf(err, "StdinPip") + return errors.Wrapf(err, "StdinPipe") + } + s.StdoutPipe, err = s.session.StdoutPipe() + if err != nil { + return errors.Wrapf(err, "StdoutPipe") + } + s.StderrPipe, err = s.session.StderrPipe() + if err != nil { + return errors.Wrapf(err, "StderrPipe") } var up = websocket.Upgrader{ @@ -135,14 +145,6 @@ func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error { return errors.Wrapf(err, "upgrade") } - wsWriter := WebSocketBufferWriter{ - s: s.Session, - ws: s.ws, - } - - s.session.Stdout = &wsWriter - s.session.Stderr = &wsWriter - modes := ssh.TerminalModes{ ssh.ECHO: 1, ssh.TTY_OP_ISPEED: 14400, @@ -161,15 +163,55 @@ func (s *WebsocketServer) initWs(w http.ResponseWriter, r *http.Request) error { return nil } +// ref: https://github.com/golang/go/issues/19338#issuecomment-539057790 +func NewSshClient(network, addr string, conf *ssh.ClientConfig) (*ssh.Client, net.Conn, error) { + conn, err := net.DialTimeout(network, addr, conf.Timeout) + if err != nil { + return nil, nil, errors.Wrapf(err, "dial %s %s", network, addr) + } + if conf.Timeout > 0 { + conn.SetDeadline(time.Now().Add(conf.Timeout)) + } + c, chans, reqs, err := ssh.NewClientConn(conn, addr, conf) + if err != nil { + return nil, nil, errors.Wrapf(err, "new client conn %s", addr) + } + if conf.Timeout > 0 { + conn.SetDeadline(time.Time{}) + } + return ssh.NewClient(c, chans, reqs), conn, nil +} + func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + logPrefix := fmt.Sprintf("ssh %s@%s:%d, session_id: %s", s.Username, s.Host, s.Port, s.Session.Id) + err := s.initWs(w, r) if err != nil { - log.Errorf("initWs error: %v", err) + log.Errorf("%s, initWs error: %v", logPrefix, err) return } done := make(chan bool, 3) - setDone := func() { done <- true } + keepAliveDone := make(chan struct{}) + + go func() { + if err := sshKeepAlive(s.conn, s.sshNetConn, keepAliveDone); err != nil { + log.Errorf("%s, keepalive error: %v", logPrefix, err) + } + }() + + setDone := func() { + done <- true + } + + for _, reader := range []io.Reader{s.StdoutPipe, s.StderrPipe} { + tmpReader := reader + go func() { + if err := writeToWebsocket(tmpReader, s); err != nil { + log.Warningf("%s, writeToWebsocket error: %v", logPrefix, err) + } + }() + } go func() { defer setDone() @@ -193,12 +235,12 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { }{} obj, err := jsonutils.Parse(p) if err != nil { - log.Errorf("parse %s error: %v", string(p), err) + log.Errorf("%s, parse %s error: %v", logPrefix, string(p), err) continue } err = obj.Unmarshal(&input) if err != nil { - log.Errorf("unmarshal %s error: %v", string(p), err) + log.Errorf("%s, unmarshal %s error: %v", logPrefix, string(p), err) continue } @@ -208,7 +250,7 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { case "resize": err = s.session.WindowChange(input.Data.Rows, input.Data.Cols) if err != nil { - log.Errorf("resize %dx%d error: %v", input.Data.Cols, input.Data.Rows, err) + log.Errorf("%s, resize %dx%d error: %v", logPrefix, input.Data.Cols, input.Data.Rows, err) } case "input": if input.Data.Base64 { @@ -218,13 +260,13 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { go s.Session.GetRecorder().Write(input.Data.Data, "") _, err = s.StdinPipe.Write([]byte(input.Data.Data)) if err != nil { - log.Errorf("write %s error: %v", input.Data.Data, err) + log.Errorf("%s, write %s error: %v", logPrefix, input.Data.Data, err) return } case "heartbeat": continue default: - log.Errorf("unknow msg type %s for %s:%d", input.Type, s.Host, s.Port) + log.Errorf("%s, unknow msg type %s", logPrefix, input.Type) } } }() @@ -237,6 +279,7 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.sftp.Close() s.conn.Close() s.Session.Close() + keepAliveDone <- struct{}{} }() stop := make(chan bool) @@ -266,12 +309,35 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { err = s.session.Wait() if err != nil { - log.Warningf("session %s wait error: %v", s.Session.Id, err) + log.Warningf("%s wait error: %v", logPrefix, err) s.StdinPipe.Write([]byte(err.Error())) } }() <-done stop <- true - log.Infof("ssh %s@%s:%d complete, session_id: %s", s.Username, s.Host, s.Port, s.Session.Id) +} + +func sshKeepAlive(cli *ssh.Client, conn net.Conn, done <-chan struct{}) error { + // ref: + // - https://github.com/golang/go/issues/21478 + // - https://github.com/scylladb/go-sshtools/blob/master/keepalive.go#L36 + const keepAliveInterval = 15 * time.Second + t := time.NewTicker(keepAliveInterval) + defer t.Stop() + for { + deadline := time.Now().Add(keepAliveInterval).Add(15 * time.Second) + if err := conn.SetDeadline(deadline); err != nil { + return errors.Wrap(err, "failed to set deadline") + } + select { + case <-t.C: + _, _, err := cli.SendRequest("keepalive@openssh.com", true, nil) + if err != nil { + return errors.Wrap(err, "failed to send keep alive") + } + case <-done: + return nil + } + } } diff --git a/vendor/github.com/LeeEirc/terminalparser/csi_func.go b/vendor/github.com/LeeEirc/terminalparser/csi_func.go index 30e624d0c2d..2efe9b4e52e 100644 --- a/vendor/github.com/LeeEirc/terminalparser/csi_func.go +++ b/vendor/github.com/LeeEirc/terminalparser/csi_func.go @@ -11,25 +11,15 @@ type screenCsiFunc func(screen *Screen, params []rune) var CSIFuncMap = map[rune]screenCsiFunc{ '@': func(s *Screen, params []rune) { currentRow := s.GetCursorRow() - switch len(params) { - case 1: - if ps, err := strconv.Atoi(string(params)); err == nil { - insertData := make([]rune, ps) - for i := 0; i < ps; i++ { - insertData[i] = Spaces[0] - } - currentRow.changeCursorToX(s.Cursor.X) - currentRow.insertCharacters(insertData) + if ps, err := strconv.Atoi(string(params)); err == nil { + insertData := make([]rune, ps) + for i := 0; i < ps; i++ { + insertData[i] = Spaces[0] } - case 2: - if params[len(params)-1] == Spaces[0] { - if _, err := strconv.Atoi(string(params[0])); err == nil { - log.Printf("Screen 不支持解析 CSI `%s` @\n", string(params)) - } - } - default: currentRow.changeCursorToX(s.Cursor.X) - currentRow.insertCharacters([]rune{Spaces[0]}) + currentRow.insertCharacters(insertData) + } else { + log.Printf("Screen 不支持解析 CSI `%s` @\n", string(params)) } }, 'A': func(s *Screen, params []rune) { diff --git a/vendor/github.com/LeeEirc/terminalparser/screen.go b/vendor/github.com/LeeEirc/terminalparser/screen.go index 62bb8d38308..39bd3b09575 100644 --- a/vendor/github.com/LeeEirc/terminalparser/screen.go +++ b/vendor/github.com/LeeEirc/terminalparser/screen.go @@ -101,7 +101,6 @@ func (s *Screen) parseC0Sequence(code rune) { \r */ s.Cursor.X = 0 - s.Cursor.Y++ if s.Cursor.Y > len(s.Rows) { s.Rows = append(s.Rows, &Row{ dataRune: make([]rune, 0, 1024), diff --git a/vendor/modules.txt b/vendor/modules.txt index 8e0622dde39..5100c19e28f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -85,7 +85,7 @@ github.com/DataDog/dd-trace-go/tracer/ext # github.com/DataDog/zstd v1.3.4 ## explicit github.com/DataDog/zstd -# github.com/LeeEirc/terminalparser v0.0.0-20220328021224-de16b7643ea4 +# github.com/LeeEirc/terminalparser v0.0.0-20240205084113-fbf78c8480f2 ## explicit; go 1.15 github.com/LeeEirc/terminalparser # github.com/Microsoft/azure-vhd-utils v0.0.0-20181115010904-44cbada2ece3