From fae71a8d77e44b6498f9904153caf98d88cd4a56 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Mon, 12 Dec 2022 21:22:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E6=89=B9=E9=87=8F=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E4=B8=AD=E4=B8=8A=E8=A1=8C=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- batcher.go | 92 +++++++++++++++++++++++++++++++++++++++++++++++---- subscriber.go | 18 +++++----- 2 files changed, 93 insertions(+), 17 deletions(-) diff --git a/batcher.go b/batcher.go index 4a6eb05..8f008aa 100644 --- a/batcher.go +++ b/batcher.go @@ -2,9 +2,14 @@ package webrtc import ( "encoding/json" + "fmt" + "time" + "github.com/pion/rtcp" . "github.com/pion/webrtc/v3" "go.uber.org/zap" + . "m7s.live/engine/v4" + . "m7s.live/engine/v4/track" ) type Signal struct { @@ -12,14 +17,19 @@ type Signal struct { StreamList []string `json:"streamList"` Offer string `json:"offer"` Answer string `json:"answer"` + StreamPath string `json:"streamPath"` +} +type BatchUplink struct { + Publisher + StreamPath string } - type WebRTCBatcher struct { WebRTCIO PageSize int PageNum int subscribers []*WebRTCBatchSubscriber signalChannel *DataChannel + BatchUplink } func (suber *WebRTCBatcher) Start() (err error) { @@ -40,7 +50,59 @@ func (suber *WebRTCBatcher) Start() (err error) { WebRTCPlugin.Info("Connection State has changed:" + pcs.String()) switch pcs { case PeerConnectionStateConnected: - + suber.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) { + if suber.Publisher.Stream == nil { + WebRTCPlugin.Publish(suber.StreamPath, &suber.BatchUplink) + } + if suber.Publisher.Stream == nil { + return + } + puber := &suber.Publisher + if codec := track.Codec(); track.Kind() == RTPCodecTypeAudio { + if puber.AudioTrack == nil { + switch codec.PayloadType { + case 8: + puber.AudioTrack = NewG711(puber.Stream, true) + case 0: + puber.AudioTrack = NewG711(puber.Stream, false) + default: + puber.AudioTrack = nil + return + } + } + for { + b := make([]byte, 1460) + if i, _, err := track.Read(b); err == nil { + puber.AudioTrack.WriteRTP(b[:i]) + } else { + return + } + } + } else { + go func() { + ticker := time.NewTicker(time.Millisecond * webrtcConfig.PLI) + for { + select { + case <-ticker.C: + if rtcpErr := suber.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil { + fmt.Println(rtcpErr) + } + case <-puber.Done(): + return + } + } + }() + puber.VideoTrack = NewH264(puber.Stream) + for { + b := make([]byte, 1460) + if i, _, err := track.Read(b); err == nil { + puber.VideoTrack.WriteRTP(b[:i]) + } else { + return + } + } + } + }) case PeerConnectionStateDisconnected, PeerConnectionStateFailed: for _, sub := range suber.subscribers { go sub.Stop() @@ -59,6 +121,8 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { WebRTCPlugin.Error("Signal", zap.Error(err)) } else { switch s.Type { + case "streamPath": + suber.StreamPath = s.StreamPath case "subscribe": if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: s.Offer}); err != nil { WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err)) @@ -99,11 +163,25 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err)) return } - // if offer, err = suber.CreateOffer(nil); err == nil { - // b, _ := json.Marshal(offer) - // err = suber.signalChannel.SendText(string(b)) - // suber.SetLocalDescription(offer) - // } + // if offer, err = suber.CreateOffer(nil); err == nil { + // b, _ := json.Marshal(offer) + // err = suber.signalChannel.SendText(string(b)) + // suber.SetLocalDescription(offer) + // } + case "publish": + if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: s.Offer}); err != nil { + WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err)) + return + } + var answer string + if answer, err = suber.GetAnswer(); err == nil { + b, _ := json.Marshal(map[string]string{"type": "answer", "sdp": answer}) + err = suber.signalChannel.SendText(string(b)) + } + if err != nil { + WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err)) + return + } case "answer": if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeAnswer, SDP: s.Answer}); err != nil { WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err)) diff --git a/subscriber.go b/subscriber.go index 5bada38..b83309e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -1,9 +1,7 @@ package webrtc import ( - "fmt" "net" - "strings" "github.com/pion/rtcp" . "github.com/pion/webrtc/v3" @@ -29,14 +27,14 @@ func (suber *WebRTCSubscriber) OnEvent(event any) { case *track.Video: switch v.CodecID { case codec.CodecID_H264: - pli := "42001f" - pli = fmt.Sprintf("%x", v.GetDecoderConfiguration().Raw[0][1:4]) - if !strings.Contains(suber.SDP, pli) { - list := reg_level.FindAllStringSubmatch(suber.SDP, -1) - if len(list) > 0 { - pli = list[0][1] - } - } + pli := "420028" + // pli = fmt.Sprintf("%x", v.GetDecoderConfiguration().Raw[0][1:4]) + // if !strings.Contains(suber.SDP, pli) { + // list := reg_level.FindAllStringSubmatch(suber.SDP, -1) + // if len(list) > 0 { + // pli = list[0][1] + // } + // } suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, v.Name, suber.Subscriber.Stream.Path) case codec.CodecID_H265: // suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH265, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", suber.Subscriber.Stream.Path)