-
Notifications
You must be signed in to change notification settings - Fork 205
/
publisher-ts.go
111 lines (103 loc) · 2.36 KB
/
publisher-ts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package engine
import (
"go.uber.org/zap"
"m7s.live/engine/v4/codec/mpegts"
"m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
)
// TSReader combines a TSPublisher with an embedded mpegts.MpegTsStream.
// NewTSReader sets up its PESChan and PESBuffer and starts ReadPES, which
// consumes PES packets from PESChan and writes them to the publisher's tracks.
type TSReader struct {
// Publisher side: owns the audio/video tracks and the byte pool.
*TSPublisher
// Demuxer side: presumably supplies PESChan, PESBuffer and PMT — confirm in package mpegts.
mpegts.MpegTsStream
}
// NewTSReader builds a TSReader around pub, allocates its PES buffer map and
// its PES channel (buffered, capacity 50), and starts ReadPES in a new
// goroutine. That goroutine keeps running until PESChan is closed.
func NewTSReader(pub *TSPublisher) (r *TSReader) {
	r = new(TSReader)
	r.TSPublisher = pub
	r.PESBuffer = make(map[uint16]*mpegts.MpegTsPESPacket)
	r.PESChan = make(chan *mpegts.MpegTsPESPacket, 50)
	// Consumer loop; it ends when the channel is closed.
	go r.ReadPES()
	return r
}
// TSPublisher is a Publisher that creates its audio/video tracks from
// MPEG-TS PMT stream entries (see OnPmtStream).
type TSPublisher struct {
Publisher
// pool is allocated in OnEvent when an IPublisher event arrives and is
// passed to every track constructed by OnPmtStream.
pool util.BytesPool
}
// OnEvent handles events delivered to the publisher.
//
// An IPublisher event allocates the byte pool and, when the event's publisher
// is not this instance's own embedded Publisher, adopts that publisher's audio
// and video tracks. It is not forwarded to the embedded Publisher.
//
// Every other event (including SEKick and SEclose) is passed through to the
// embedded Publisher unchanged.
func (t *TSPublisher) OnEvent(event any) {
	other, isPublisher := event.(IPublisher)
	if !isPublisher {
		// SEKick / SEclose land here too; PESChan is intentionally not closed here.
		t.Publisher.OnEvent(event)
		return
	}

	t.pool = make(util.BytesPool, 17)
	if other.GetPublisher() == &t.Publisher {
		// Same publisher: nothing to take over.
		return
	}
	t.AudioTrack = other.GetAudioTrack()
	t.VideoTrack = other.GetVideoTrack()
}
// OnPmtStream creates the track that matches one PMT stream entry.
//
// H264/H265 entries create the video track, and AAC/G711A/G711U entries create
// the audio track. A track is only created when the corresponding slot is
// still nil, so an existing track is never replaced. Any other stream type is
// reported with a warning.
func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream) {
	switch codec := s.StreamType; codec {
	case mpegts.STREAM_TYPE_H264, mpegts.STREAM_TYPE_H265:
		if t.VideoTrack != nil {
			return
		}
		if codec == mpegts.STREAM_TYPE_H264 {
			t.VideoTrack = track.NewH264(t, t.pool)
		} else {
			t.VideoTrack = track.NewH265(t, t.pool)
		}

	case mpegts.STREAM_TYPE_AAC, mpegts.STREAM_TYPE_G711A, mpegts.STREAM_TYPE_G711U:
		if t.AudioTrack != nil {
			return
		}
		if codec == mpegts.STREAM_TYPE_AAC {
			t.AudioTrack = track.NewAAC(t, t.pool)
		} else {
			// The flag is true for G711A and false for G711U.
			t.AudioTrack = track.NewG711(t, codec == mpegts.STREAM_TYPE_G711A, t.pool)
		}

	default:
		t.Warn("unsupport stream type:", zap.Uint8("type", codec))
	}
}
// Close closes PESChan, which lets the range loop in ReadPES finish once the
// remaining buffered packets have been consumed.
// NOTE(review): closing an already-closed channel panics, and so does sending
// on a closed one — confirm Close is called once and only after the producer
// has stopped.
func (t *TSReader) Close() {
close(t.PESChan)
}
// ReadPES consumes PES packets from PESChan until the channel is closed and
// writes each one to the matching track.
//
// Packets are still received but dropped once t.Err() is non-nil, so the
// channel keeps draining. A zero DTS is replaced by the PTS. Packets whose
// stream ID (masked with 0xF0) equals STREAM_ID_VIDEO go to the video track as
// Annex B data; every other packet is treated as audio and written as ADTS
// (AAC) or raw bytes (G711). If the required track does not exist yet, the PMT
// stream list is replayed through OnPmtStream first; the packet is skipped if
// the track is still missing afterwards.
func (t *TSReader) ReadPES() {
	// createTracks feeds every PMT stream entry to OnPmtStream, which creates
	// whichever tracks are still missing.
	createTracks := func() {
		for _, entry := range t.PMT.Stream {
			t.OnPmtStream(entry)
		}
	}

	for packet := range t.PESChan {
		if t.Err() != nil {
			continue
		}
		if packet.Header.Dts == 0 {
			packet.Header.Dts = packet.Header.Pts
		}

		if packet.Header.StreamID&0xF0 == mpegts.STREAM_ID_VIDEO {
			if t.VideoTrack == nil {
				createTracks()
			}
			if t.VideoTrack != nil {
				t.WriteAnnexB(uint32(packet.Header.Pts), uint32(packet.Header.Dts), packet.Payload)
			}
			continue
		}

		// Everything that is not video is handled as audio.
		if t.AudioTrack == nil {
			createTracks()
		}
		if t.AudioTrack == nil {
			continue
		}
		switch t.AudioTrack.(type) {
		case *track.AAC:
			t.AudioTrack.WriteADTS(uint32(packet.Header.Pts), packet.Payload)
		case *track.G711:
			t.AudioTrack.WriteRawBytes(uint32(packet.Header.Pts), packet.Payload)
		}
	}
}