Browse Source

🐛 FIX: 批量订阅

v4
dexter 2 years ago
parent
commit
190a0161ba
  1. 19
      batcher.go
  2. 10
      subscriber.go

19
batcher.go

@ -53,6 +53,7 @@ func (suber *WebRTCBatcher) Start() (err error) {
func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) {
var s Signal var s Signal
var removeMap = map[string]string{"type": "remove", "streamPath": ""}
// var offer SessionDescription // var offer SessionDescription
if err := json.Unmarshal(msg.Data, &s); err != nil { if err := json.Unmarshal(msg.Data, &s); err != nil {
WebRTCPlugin.Error("Signal", zap.Error(err)) WebRTCPlugin.Error("Signal", zap.Error(err))
@ -66,13 +67,27 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) {
for _, streamPath := range s.StreamList { for _, streamPath := range s.StreamList {
sub := &WebRTCBatchSubscriber{} sub := &WebRTCBatchSubscriber{}
sub.WebRTCIO = suber.WebRTCIO sub.WebRTCIO = suber.WebRTCIO
if err = WebRTCPlugin.Subscribe(streamPath, sub); err == nil { if err = WebRTCPlugin.SubscribeExist(streamPath, sub); err == nil {
suber.subscribers = append(suber.subscribers, sub) suber.subscribers = append(suber.subscribers, sub)
go func() { go func() {
sub.PlayRTP() sub.PlayRTP()
b, _ := json.Marshal(map[string]string{"type": "remove", "streamPath": streamPath}) if sub.audioSender != nil {
suber.RemoveTrack(sub.audioSender)
}
if sub.videoSender != nil {
suber.RemoveTrack(sub.videoSender)
}
if sub.DC != nil {
sub.DC.Close()
}
removeMap["streamPath"] = streamPath
b, _ := json.Marshal(removeMap)
suber.signalChannel.SendText(string(b)) suber.signalChannel.SendText(string(b))
}() }()
} else {
removeMap["streamPath"] = streamPath
b, _ := json.Marshal(removeMap)
suber.signalChannel.SendText(string(b))
} }
} }
var answer string var answer string

10
subscriber.go

@ -18,6 +18,8 @@ type WebRTCSubscriber struct {
WebRTCIO WebRTCIO
videoTrack *TrackLocalStaticRTP videoTrack *TrackLocalStaticRTP
audioTrack *TrackLocalStaticRTP audioTrack *TrackLocalStaticRTP
videoSender *RTPSender
audioSender *RTPSender
DC *DataChannel DC *DataChannel
flvHeadCache []byte flvHeadCache []byte
} }
@ -44,11 +46,11 @@ func (suber *WebRTCSubscriber) OnEvent(event any) {
if suber.videoTrack == nil { if suber.videoTrack == nil {
suber.DC, _ = suber.PeerConnection.CreateDataChannel(suber.Subscriber.Stream.Path, nil) suber.DC, _ = suber.PeerConnection.CreateDataChannel(suber.Subscriber.Stream.Path, nil)
} else { } else {
rtpSender, _ := suber.PeerConnection.AddTrack(suber.videoTrack) suber.videoSender, _ = suber.PeerConnection.AddTrack(suber.videoTrack)
go func() { go func() {
rtcpBuf := make([]byte, 1500) rtcpBuf := make([]byte, 1500)
for { for {
if n, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { if n, _, rtcpErr := suber.videoSender.Read(rtcpBuf); rtcpErr != nil {
return return
} else { } else {
@ -72,7 +74,7 @@ func (suber *WebRTCSubscriber) OnEvent(event any) {
} }
if v.CodecID == codec.CodecID_PCMA || v.CodecID == codec.CodecID_PCMU { if v.CodecID == codec.CodecID_PCMA || v.CodecID == codec.CodecID_PCMU {
suber.audioTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, v.Name, suber.Subscriber.Stream.Path) suber.audioTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, v.Name, suber.Subscriber.Stream.Path)
suber.PeerConnection.AddTrack(suber.audioTrack) suber.audioSender, _ = suber.PeerConnection.AddTrack(suber.audioTrack)
suber.Subscriber.AddTrack(v) //接受这个track suber.Subscriber.AddTrack(v) //接受这个track
} }
case VideoDeConf: case VideoDeConf:
@ -82,7 +84,7 @@ func (suber *WebRTCSubscriber) OnEvent(event any) {
suber.flvHeadCache[0] = 9 suber.flvHeadCache[0] = 9
suber.DC.Send(codec.FLVHeader) suber.DC.Send(codec.FLVHeader)
} }
suber.DC.Send(util.ConcatBuffers(codec.VideoAVCC2FLV(v.AVCC,0))) suber.DC.Send(util.ConcatBuffers(codec.VideoAVCC2FLV(v.AVCC, 0)))
} }
case VideoRTP: case VideoRTP:
if suber.videoTrack != nil { if suber.videoTrack != nil {

Loading…
Cancel
Save