From 37dd13859b3549b21b04f8daa133ef35bf0f5df9 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Thu, 3 Nov 2022 16:28:27 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=A6=20NEW:=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=AE=A2=E9=98=85=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- batcher.go | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 26 +++++--------- subscriber.go | 69 +++++++++++++++++++++++++++++------- 3 files changed, 161 insertions(+), 30 deletions(-) create mode 100644 batcher.go diff --git a/batcher.go b/batcher.go new file mode 100644 index 0000000..ea56dbc --- /dev/null +++ b/batcher.go @@ -0,0 +1,96 @@ +package webrtc + +import ( + "encoding/json" + + . "github.com/pion/webrtc/v3" + "go.uber.org/zap" +) + +type Signal struct { + Type string `json:"type"` + StreamList []string `json:"streamList"` + Offer string `json:"offer"` + Answer string `json:"answer"` +} + +type WebRTCBatcher struct { + WebRTCIO + PageSize int + PageNum int + subscribers []*WebRTCBatchSubscriber + signalChannel *DataChannel +} + +func (suber *WebRTCBatcher) Start() (err error) { + suber.OnICECandidate(func(ice *ICECandidate) { + if ice != nil { + WebRTCPlugin.Info(ice.ToJSON().Candidate) + } + }) + suber.OnDataChannel(func(d *DataChannel) { + WebRTCPlugin.Info("OnDataChannel:" + d.Label()) + suber.signalChannel = d + suber.signalChannel.OnMessage(suber.Signal) + }) + if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: suber.SDP}); err != nil { + return + } + suber.OnConnectionStateChange(func(pcs PeerConnectionState) { + WebRTCPlugin.Info("Connection State has changed:" + pcs.String()) + switch pcs { + case PeerConnectionStateConnected: + + case PeerConnectionStateDisconnected, PeerConnectionStateFailed: + for _, sub := range suber.subscribers { + go sub.Stop() + } + suber.PeerConnection.Close() + } + }) + return +} + +func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) { + var s Signal + // var offer SessionDescription + if err := json.Unmarshal(msg.Data, &s); err != nil { + WebRTCPlugin.Error("Signal", zap.Error(err)) + } else { + switch s.Type { + case "subscribe": + + // if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: s.Offer}); err != nil { + // WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err)) + // return + // } + // var answer string + // if answer, err = suber.GetAnswer(); err == nil { + // b, _ := json.Marshal(map[string]string{"type": "answer", "answer": answer}) + // err = suber.signalChannel.Send(b) + // } + // if err != nil { + // WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err)) + // return + // } + sub := &WebRTCBatchSubscriber{} + sub.WebRTCIO = suber.WebRTCIO + for _, streamPath := range s.StreamList { + if err = WebRTCPlugin.Subscribe(streamPath, sub); err == nil { + suber.subscribers = append(suber.subscribers, sub) + go sub.PlayRTP() + } + } + // if offer, err = suber.CreateOffer(nil); err == nil { + // b, _ := json.Marshal(offer) + // err = suber.signalChannel.SendText(string(b)) + // suber.SetLocalDescription(offer) + // } + case "answer": + if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeAnswer, SDP: s.Answer}); err != nil { + WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err)) + return + } + } + } +} diff --git a/main.go b/main.go index a8adacb..e627d98 100644 --- a/main.go +++ b/main.go @@ -39,10 +39,10 @@ import ( // "stun:stun01.sipphone.com", // }} -// type udpConn struct { -// conn *net.UDPConn -// port int -// } +// type udpConn struct { +// conn *net.UDPConn +// port int +// } var ( reg_level = regexp.MustCompile("profile-level-id=(4.+f)") ) @@ -121,7 +121,7 @@ func (conf *WebRTCConfig) Play_(w http.ResponseWriter, r *http.Request) { if err = WebRTCPlugin.Subscribe(streamPath, &suber); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return - } + } if sdp, err := suber.GetAnswer(); err == nil { w.Write([]byte(sdp)) } else { @@ -176,32 +176,22 @@ var webrtcConfig = &WebRTCConfig{ var WebRTCPlugin = engine.InstallPlugin(webrtcConfig) - func (conf *WebRTCConfig) Batch(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/sdp") bytes, err := ioutil.ReadAll(r.Body) - var suber WebRTCSubscriber + var suber WebRTCBatcher suber.SDP = string(bytes) if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - suber.OnICECandidate(func(ice *ICECandidate) { - if ice != nil { - suber.Info(ice.ToJSON().Candidate) - } - }) - if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: suber.SDP}); err != nil { + if err = suber.Start(); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - // if err = WebRTCPlugin.Subscribe(streamPath, &suber); err != nil { - // http.Error(w, err.Error(), http.StatusBadRequest) - // return - // } if sdp, err := suber.GetAnswer(); err == nil { w.Write([]byte(sdp)) } else { http.Error(w, err.Error(), http.StatusBadRequest) } -} \ No newline at end of file +} diff --git a/subscriber.go b/subscriber.go index c8687af..4ef4858 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,6 +2,7 @@ package webrtc import ( "fmt" + "net" "strings" "github.com/pion/rtcp" @@ -9,25 +10,37 @@ import ( . "m7s.live/engine/v4" "m7s.live/engine/v4/codec" "m7s.live/engine/v4/track" + "m7s.live/engine/v4/util" ) type WebRTCSubscriber struct { Subscriber WebRTCIO - videoTrack *TrackLocalStaticRTP - audioTrack *TrackLocalStaticRTP + videoTrack *TrackLocalStaticRTP + audioTrack *TrackLocalStaticRTP + DC *DataChannel + flvHeadCache []byte } func (suber *WebRTCSubscriber) OnEvent(event any) { switch v := event.(type) { case *track.Video: - if v.CodecID == codec.CodecID_H264 { + switch v.CodecID { + case codec.CodecID_H264: pli := "42001f" pli = fmt.Sprintf("%x", v.GetDecoderConfiguration().Raw[0][1:4]) if !strings.Contains(suber.SDP, pli) { pli = reg_level.FindAllStringSubmatch(suber.SDP, -1)[0][1] } - suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", "m7s") + suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", suber.Subscriber.Stream.Path) + case codec.CodecID_H265: + // suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH265, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", suber.Subscriber.Stream.Path) + default: + return + } + if suber.videoTrack == nil { + suber.DC, _ = suber.PeerConnection.CreateDataChannel(suber.Subscriber.Stream.Path, nil) + } else { rtpSender, _ := suber.PeerConnection.AddTrack(suber.videoTrack) go func() { rtcpBuf := make([]byte, 1500) @@ -47,20 +60,46 @@ func (suber *WebRTCSubscriber) OnEvent(event any) { } } }() - suber.Subscriber.AddTrack(v) //接受这个track } + suber.Subscriber.AddTrack(v) //接受这个track case *track.Audio: audioMimeType := MimeTypePCMA if v.CodecID == codec.CodecID_PCMU { audioMimeType = MimeTypePCMU } if v.CodecID == codec.CodecID_PCMA || v.CodecID == codec.CodecID_PCMU { - suber.audioTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, "audio", "m7s") + suber.audioTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, "audio", suber.Subscriber.Stream.Path) suber.PeerConnection.AddTrack(suber.audioTrack) suber.Subscriber.AddTrack(v) //接受这个track } + case VideoDeConf: + if suber.DC != nil { + if suber.flvHeadCache == nil { + suber.flvHeadCache = make([]byte, 15) + suber.flvHeadCache[0] = 9 + suber.DC.Send(codec.FLVHeader) + } + suber.DC.Send(util.ConcatBuffers(v.FLV)) + } case VideoRTP: - suber.videoTrack.WriteRTP(&v.Packet) + if suber.videoTrack != nil { + suber.videoTrack.WriteRTP(&v.Packet) + } else if suber.DC != nil { + frame := suber.Video.Frame + dataSize := uint32(util.SizeOfBuffers(frame.AVCC)) + result := net.Buffers{suber.flvHeadCache[:11]} + result = append(result, frame.AVCC...) + ts := frame.AbsTime - suber.SkipTS + util.PutBE(suber.flvHeadCache[1:4], dataSize) + util.PutBE(suber.flvHeadCache[4:7], ts) + suber.flvHeadCache[7] = byte(ts >> 24) + result = append(result, util.PutBE(suber.flvHeadCache[11:15], dataSize+11)) + for _, data := range util.SplitBuffers(result, 65535) { + for _, d := range data { + suber.DC.Send(d) + } + } + } case AudioRTP: suber.audioTrack.WriteRTP(&v.Packet) case ISubscriber: @@ -79,9 +118,15 @@ func (suber *WebRTCSubscriber) OnEvent(event any) { } } -// WebRTCBatcher 批量订阅者 -type WebRTCBatcher struct { - WebRTCIO - PageSize int - PageNum int +type WebRTCBatchSubscriber struct { + WebRTCSubscriber +} + +func (suber *WebRTCBatchSubscriber) OnEvent(event any) { + switch event.(type) { + case ISubscriber: + + default: + suber.WebRTCSubscriber.OnEvent(event) + } }