-
Notifications
You must be signed in to change notification settings - Fork 6
/
main.go
120 lines (109 loc) · 2.73 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package webtransport
import (
"net/http"
"strconv"
"github.com/quic-go/quic-go"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/config"
)
type WebTransportConfig struct {
ListenAddr string `default:":4433" desc:"监听地址端口(IP:PORT)IP可省略"`
CertFile string `desc:"证书文件"`
KeyFile string `desc:"密钥文件"`
}
func (c *WebTransportConfig) OnEvent(event any) {
switch event.(type) {
case FirstConfig:
// if c.CertFile != "" {
// _, err := os.Stat(c.CertFile)
// if err != nil {
// plugin.Error("need certfile", zap.Error(err))
// plugin.Disabled = true
// return
// }
// }
// if c.KeyFile != "" {
// _, err := os.Stat(c.KeyFile)
// if err != nil {
// plugin.Error("need keyfile", zap.Error(err))
// plugin.Disabled = true
// return
// }
// }
mux := http.NewServeMux()
mux.HandleFunc("/play/", func(w http.ResponseWriter, r *http.Request) {
streamPath := r.URL.Path[len("/play/"):]
session := r.Body.(*Session)
session.AcceptSession()
defer session.CloseSession()
// TODO: 多路
s, err := session.AcceptStream()
if err != nil {
return
}
// buf := make([]byte, 1024)
// n, err := s.Read(buf)
// if err != nil {
// return
// }
sub := &WebTransportSubscriber{}
sub.RemoteAddr = r.RemoteAddr
sub.SetIO(s)
sub.ID = strconv.FormatInt(int64(s.StreamID()), 10)
plugin.SubscribeBlock(streamPath, sub, SUBTYPE_FLV)
})
mux.HandleFunc("/push/", func(w http.ResponseWriter, r *http.Request) {
streamPath := r.URL.Path[len("/push/"):]
session := r.Body.(*Session)
session.AcceptSession()
defer session.CloseSession()
// TODO: 多路
s, err := session.AcceptStream()
if err != nil {
return
}
// buf := make([]byte, 1024)
// n, err := s.Read(buf)
// if err != nil {
// return
// }
pub := &WebTransportPublisher{}
pub.SetIO(s)
pub.ID = strconv.FormatInt(int64(s.StreamID()), 10)
if plugin.Publish(streamPath, pub) == nil {
}
})
c.Run(mux)
}
}
func (c *WebTransportConfig) Run(mux http.Handler) {
s := &Server{
Handler: mux,
ListenAddr: c.ListenAddr,
TLSCert: CertFile{Path: c.CertFile, Data: config.LocalCert},
TLSKey: CertFile{Path: c.KeyFile, Data: config.LocalKey},
}
if s.QuicConfig == nil {
s.QuicConfig = &QuicConfig{}
}
s.QuicConfig.EnableDatagrams = true
listener, err := quic.ListenAddr(c.ListenAddr, s.generateTLSConfig(), (*quic.Config)(s.QuicConfig))
if err != nil {
plugin.Disabled = true
return
}
go func() {
<-plugin.Done()
listener.Close()
}()
go func() {
for {
sess, err := listener.Accept(plugin)
if err != nil {
return
}
go s.handleSession(plugin, sess)
}
}()
}
var plugin = InstallPlugin(&WebTransportConfig{})