Browse Source

加入批量连接中上行逻辑

v4
dexter 2 years ago
parent
commit
fae71a8d77
  1. 92
      batcher.go
  2. 18
      subscriber.go

92
batcher.go

@ -2,9 +2,14 @@ package webrtc
import ( import (
"encoding/json" "encoding/json"
"fmt"
"time"
"github.com/pion/rtcp"
. "github.com/pion/webrtc/v3" . "github.com/pion/webrtc/v3"
"go.uber.org/zap" "go.uber.org/zap"
. "m7s.live/engine/v4"
. "m7s.live/engine/v4/track"
) )
type Signal struct { type Signal struct {
@ -12,14 +17,19 @@ type Signal struct {
StreamList []string `json:"streamList"` StreamList []string `json:"streamList"`
Offer string `json:"offer"` Offer string `json:"offer"`
Answer string `json:"answer"` Answer string `json:"answer"`
StreamPath string `json:"streamPath"`
}
type BatchUplink struct {
Publisher
StreamPath string
} }
type WebRTCBatcher struct { type WebRTCBatcher struct {
WebRTCIO WebRTCIO
PageSize int PageSize int
PageNum int PageNum int
subscribers []*WebRTCBatchSubscriber subscribers []*WebRTCBatchSubscriber
signalChannel *DataChannel signalChannel *DataChannel
BatchUplink
} }
func (suber *WebRTCBatcher) Start() (err error) { func (suber *WebRTCBatcher) Start() (err error) {
@ -40,7 +50,59 @@ func (suber *WebRTCBatcher) Start() (err error) {
WebRTCPlugin.Info("Connection State has changed:" + pcs.String()) WebRTCPlugin.Info("Connection State has changed:" + pcs.String())
switch pcs { switch pcs {
case PeerConnectionStateConnected: case PeerConnectionStateConnected:
suber.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
if suber.Publisher.Stream == nil {
WebRTCPlugin.Publish(suber.StreamPath, &suber.BatchUplink)
}
if suber.Publisher.Stream == nil {
return
}
puber := &suber.Publisher
if codec := track.Codec(); track.Kind() == RTPCodecTypeAudio {
if puber.AudioTrack == nil {
switch codec.PayloadType {
case 8:
puber.AudioTrack = NewG711(puber.Stream, true)
case 0:
puber.AudioTrack = NewG711(puber.Stream, false)
default:
puber.AudioTrack = nil
return
}
}
for {
b := make([]byte, 1460)
if i, _, err := track.Read(b); err == nil {
puber.AudioTrack.WriteRTP(b[:i])
} else {
return
}
}
} else {
go func() {
ticker := time.NewTicker(time.Millisecond * webrtcConfig.PLI)
for {
select {
case <-ticker.C:
if rtcpErr := suber.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil {
fmt.Println(rtcpErr)
}
case <-puber.Done():
return
}
}
}()
puber.VideoTrack = NewH264(puber.Stream)
for {
b := make([]byte, 1460)
if i, _, err := track.Read(b); err == nil {
puber.VideoTrack.WriteRTP(b[:i])
} else {
return
}
}
}
})
case PeerConnectionStateDisconnected, PeerConnectionStateFailed: case PeerConnectionStateDisconnected, PeerConnectionStateFailed:
for _, sub := range suber.subscribers { for _, sub := range suber.subscribers {
go sub.Stop() go sub.Stop()
@ -59,6 +121,8 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) {
WebRTCPlugin.Error("Signal", zap.Error(err)) WebRTCPlugin.Error("Signal", zap.Error(err))
} else { } else {
switch s.Type { switch s.Type {
case "streamPath":
suber.StreamPath = s.StreamPath
case "subscribe": case "subscribe":
if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: s.Offer}); err != nil { if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: s.Offer}); err != nil {
WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err)) WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err))
@ -99,11 +163,25 @@ func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) {
WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err)) WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err))
return return
} }
// if offer, err = suber.CreateOffer(nil); err == nil { // if offer, err = suber.CreateOffer(nil); err == nil {
// b, _ := json.Marshal(offer) // b, _ := json.Marshal(offer)
// err = suber.signalChannel.SendText(string(b)) // err = suber.signalChannel.SendText(string(b))
// suber.SetLocalDescription(offer) // suber.SetLocalDescription(offer)
// } // }
case "publish":
if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: s.Offer}); err != nil {
WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err))
return
}
var answer string
if answer, err = suber.GetAnswer(); err == nil {
b, _ := json.Marshal(map[string]string{"type": "answer", "sdp": answer})
err = suber.signalChannel.SendText(string(b))
}
if err != nil {
WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err))
return
}
case "answer": case "answer":
if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeAnswer, SDP: s.Answer}); err != nil { if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeAnswer, SDP: s.Answer}); err != nil {
WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err)) WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err))

18
subscriber.go

@ -1,9 +1,7 @@
package webrtc package webrtc
import ( import (
"fmt"
"net" "net"
"strings"
"github.com/pion/rtcp" "github.com/pion/rtcp"
. "github.com/pion/webrtc/v3" . "github.com/pion/webrtc/v3"
@ -29,14 +27,14 @@ func (suber *WebRTCSubscriber) OnEvent(event any) {
case *track.Video: case *track.Video:
switch v.CodecID { switch v.CodecID {
case codec.CodecID_H264: case codec.CodecID_H264:
pli := "42001f" pli := "420028"
pli = fmt.Sprintf("%x", v.GetDecoderConfiguration().Raw[0][1:4]) // pli = fmt.Sprintf("%x", v.GetDecoderConfiguration().Raw[0][1:4])
if !strings.Contains(suber.SDP, pli) { // if !strings.Contains(suber.SDP, pli) {
list := reg_level.FindAllStringSubmatch(suber.SDP, -1) // list := reg_level.FindAllStringSubmatch(suber.SDP, -1)
if len(list) > 0 { // if len(list) > 0 {
pli = list[0][1] // pli = list[0][1]
} // }
} // }
suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, v.Name, suber.Subscriber.Stream.Path) suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, v.Name, suber.Subscriber.Stream.Path)
case codec.CodecID_H265: case codec.CodecID_H265:
// suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH265, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", suber.Subscriber.Stream.Path) // suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH265, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", suber.Subscriber.Stream.Path)

Loading…
Cancel
Save