From 190a0161ba0674a79703ca7699b0e6137d69cea8 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Wed, 30 Nov 2022 13:17:57 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20FIX:=20=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- batcher.go | 19 +++++++++++++++++-- subscriber.go | 10 ++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/batcher.go b/batcher.go index 4acdfe4..4a6eb05 100644 --- a/batcher.go +++ b/batcher.go @@ -53,6 +53,7 @@ func (suber *WebRTCBatcher) Start() (err error) { func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { var s Signal + var removeMap = map[string]string{"type": "remove", "streamPath": ""} // var offer SessionDescription if err := json.Unmarshal(msg.Data, &s); err != nil { WebRTCPlugin.Error("Signal", zap.Error(err)) @@ -66,13 +67,27 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { for _, streamPath := range s.StreamList { sub := &WebRTCBatchSubscriber{} sub.WebRTCIO = suber.WebRTCIO - if err = WebRTCPlugin.Subscribe(streamPath, sub); err == nil { + if err = WebRTCPlugin.SubscribeExist(streamPath, sub); err == nil { suber.subscribers = append(suber.subscribers, sub) go func() { sub.PlayRTP() - b, _ := json.Marshal(map[string]string{"type": "remove", "streamPath": streamPath}) + if sub.audioSender != nil { + suber.RemoveTrack(sub.audioSender) + } + if sub.videoSender != nil { + suber.RemoveTrack(sub.videoSender) + } + if sub.DC != nil { + sub.DC.Close() + } + removeMap["streamPath"] = streamPath + b, _ := json.Marshal(removeMap) suber.signalChannel.SendText(string(b)) }() + } else { + removeMap["streamPath"] = streamPath + b, _ := json.Marshal(removeMap) + suber.signalChannel.SendText(string(b)) } } var answer string diff --git a/subscriber.go b/subscriber.go index 153e83b..5bada38 100644 --- a/subscriber.go +++ b/subscriber.go @@ -18,6 +18,8 @@ type WebRTCSubscriber struct { WebRTCIO videoTrack *TrackLocalStaticRTP audioTrack *TrackLocalStaticRTP + videoSender *RTPSender + audioSender *RTPSender DC *DataChannel flvHeadCache []byte } @@ -44,11 +46,11 @@ func (suber *WebRTCSubscriber) OnEvent(event any) { if suber.videoTrack == nil { suber.DC, _ = suber.PeerConnection.CreateDataChannel(suber.Subscriber.Stream.Path, nil) } else { - rtpSender, _ := suber.PeerConnection.AddTrack(suber.videoTrack) + suber.videoSender, _ = suber.PeerConnection.AddTrack(suber.videoTrack) go func() { rtcpBuf := make([]byte, 1500) for { - if n, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { + if n, _, rtcpErr := suber.videoSender.Read(rtcpBuf); rtcpErr != nil { return } else { @@ -72,7 +74,7 @@ func (suber *WebRTCSubscriber) OnEvent(event any) { } if v.CodecID == codec.CodecID_PCMA || v.CodecID == codec.CodecID_PCMU { suber.audioTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, v.Name, suber.Subscriber.Stream.Path) - suber.PeerConnection.AddTrack(suber.audioTrack) + suber.audioSender, _ = suber.PeerConnection.AddTrack(suber.audioTrack) suber.Subscriber.AddTrack(v) //接受这个track } case VideoDeConf: @@ -82,7 +84,7 @@ func (suber *WebRTCSubscriber) OnEvent(event any) { suber.flvHeadCache[0] = 9 suber.DC.Send(codec.FLVHeader) } - suber.DC.Send(util.ConcatBuffers(codec.VideoAVCC2FLV(v.AVCC,0))) + suber.DC.Send(util.ConcatBuffers(codec.VideoAVCC2FLV(v.AVCC, 0))) } case VideoRTP: if suber.videoTrack != nil {