-
Notifications
You must be signed in to change notification settings - Fork 0
/
proxy.go
128 lines (113 loc) · 2.3 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package sipt
import (
"fmt"
"github.com/golang/glog"
"io"
"net"
"sync"
)
//A proxy represents a pair of connections and their state
type Proxy struct {
sentBytes uint64
receivedBytes uint64
laddr, raddr *net.TCPAddr
lconn, rconn *net.TCPConn
Connid uint64
nagles bool
erred bool
errsig chan bool
prefix string
rules map[string]*Rule
mutex *sync.Mutex
}
//func (p *Proxy) log(s string, args ...interface{}) {
// if *verbose {
// glog.Info(p.prefix+s, args...)
// }
//}
func (p *Proxy) err(s string, err error) {
if p.erred {
return
}
if err != io.EOF {
glog.Warningf(p.prefix+s, err)
}
p.errsig <- true
p.erred = true
}
func (p *Proxy) Start() {
defer p.lconn.Close()
//connect to remote
rconn, err := net.DialTCP("tcp", nil, p.raddr)
if err != nil {
p.err("Remote connection failed: %s", err)
return
}
p.rconn = rconn
defer p.rconn.Close()
if p.nagles {
p.lconn.SetNoDelay(true)
p.rconn.SetNoDelay(true)
}
//display both ends
glog.Infof("Opened %s >>> %s", p.lconn.RemoteAddr().String(), p.rconn.RemoteAddr().String())
//bidirectional copy
go p.pipe(p.lconn, p.rconn)
go p.pipe(p.rconn, p.lconn)
//wait for close...
<-p.errsig
glog.Infof("Closed (%d bytes sent, %d bytes recieved)", p.sentBytes, p.receivedBytes)
}
func (p *Proxy) Stop() {
p.errsig <- true
p.erred = true
}
func (p *Proxy) pipe(src, dst *net.TCPConn) {
//data direction
var f, h string
islocal := src == p.lconn
if islocal {
f = ">>> %d bytes sent%s\n"
} else {
f = "<<< %d bytes recieved%s\n"
}
h = "%s"
//directional copy (64k buffer)
buff := make([]byte, 0xffff)
for {
n, err := src.Read(buff)
if err != nil {
p.err("Read failed '%s'\n", err)
return
}
b := buff[:n]
p.mutex.Lock()
for _, r := range p.rules {
b = r.Do(b)
}
p.mutex.Unlock()
//show output
glog.Infof(f, n, "\n"+fmt.Sprintf(h, b))
//write out result
n, err = dst.Write(b)
if err != nil {
p.err("Write failed '%s'\n", err)
return
}
if islocal {
p.sentBytes += uint64(n)
} else {
p.receivedBytes += uint64(n)
}
}
}
func (p *Proxy) AddRule(key string, r *Rule) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.rules[key] = r
}
func (p *Proxy) RmRule(key string) {
p.mutex.Lock()
defer p.mutex.Unlock()
delete(p.rules, key)
}