Browse Source

📦 NEW: 增加批量订阅功能

v4
dexter 2 years ago
parent
commit
37dd13859b
  1. 96
      batcher.go
  2. 26
      main.go
  3. 69
      subscriber.go

96
batcher.go

@ -0,0 +1,96 @@
package webrtc
import (
"encoding/json"
. "github.com/pion/webrtc/v3"
"go.uber.org/zap"
)
type Signal struct {
Type string `json:"type"`
StreamList []string `json:"streamList"`
Offer string `json:"offer"`
Answer string `json:"answer"`
}
type WebRTCBatcher struct {
WebRTCIO
PageSize int
PageNum int
subscribers []*WebRTCBatchSubscriber
signalChannel *DataChannel
}
func (suber *WebRTCBatcher) Start() (err error) {
suber.OnICECandidate(func(ice *ICECandidate) {
if ice != nil {
WebRTCPlugin.Info(ice.ToJSON().Candidate)
}
})
suber.OnDataChannel(func(d *DataChannel) {
WebRTCPlugin.Info("OnDataChannel:" + d.Label())
suber.signalChannel = d
suber.signalChannel.OnMessage(suber.Signal)
})
if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: suber.SDP}); err != nil {
return
}
suber.OnConnectionStateChange(func(pcs PeerConnectionState) {
WebRTCPlugin.Info("Connection State has changed:" + pcs.String())
switch pcs {
case PeerConnectionStateConnected:
case PeerConnectionStateDisconnected, PeerConnectionStateFailed:
for _, sub := range suber.subscribers {
go sub.Stop()
}
suber.PeerConnection.Close()
}
})
return
}
func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) {
var s Signal
// var offer SessionDescription
if err := json.Unmarshal(msg.Data, &s); err != nil {
WebRTCPlugin.Error("Signal", zap.Error(err))
} else {
switch s.Type {
case "subscribe":
// if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: s.Offer}); err != nil {
// WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err))
// return
// }
// var answer string
// if answer, err = suber.GetAnswer(); err == nil {
// b, _ := json.Marshal(map[string]string{"type": "answer", "answer": answer})
// err = suber.signalChannel.Send(b)
// }
// if err != nil {
// WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err))
// return
// }
sub := &WebRTCBatchSubscriber{}
sub.WebRTCIO = suber.WebRTCIO
for _, streamPath := range s.StreamList {
if err = WebRTCPlugin.Subscribe(streamPath, sub); err == nil {
suber.subscribers = append(suber.subscribers, sub)
go sub.PlayRTP()
}
}
// if offer, err = suber.CreateOffer(nil); err == nil {
// b, _ := json.Marshal(offer)
// err = suber.signalChannel.SendText(string(b))
// suber.SetLocalDescription(offer)
// }
case "answer":
if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeAnswer, SDP: s.Answer}); err != nil {
WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err))
return
}
}
}
}

26
main.go

@ -39,10 +39,10 @@ import (
// "stun:stun01.sipphone.com", // "stun:stun01.sipphone.com",
// }} // }}
// type udpConn struct { // type udpConn struct {
// conn *net.UDPConn // conn *net.UDPConn
// port int // port int
// } // }
var ( var (
reg_level = regexp.MustCompile("profile-level-id=(4.+f)") reg_level = regexp.MustCompile("profile-level-id=(4.+f)")
) )
@ -121,7 +121,7 @@ func (conf *WebRTCConfig) Play_(w http.ResponseWriter, r *http.Request) {
if err = WebRTCPlugin.Subscribe(streamPath, &suber); err != nil { if err = WebRTCPlugin.Subscribe(streamPath, &suber); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
if sdp, err := suber.GetAnswer(); err == nil { if sdp, err := suber.GetAnswer(); err == nil {
w.Write([]byte(sdp)) w.Write([]byte(sdp))
} else { } else {
@ -176,32 +176,22 @@ var webrtcConfig = &WebRTCConfig{
var WebRTCPlugin = engine.InstallPlugin(webrtcConfig) var WebRTCPlugin = engine.InstallPlugin(webrtcConfig)
func (conf *WebRTCConfig) Batch(w http.ResponseWriter, r *http.Request) { func (conf *WebRTCConfig) Batch(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/sdp") w.Header().Set("Content-Type", "application/sdp")
bytes, err := ioutil.ReadAll(r.Body) bytes, err := ioutil.ReadAll(r.Body)
var suber WebRTCSubscriber var suber WebRTCBatcher
suber.SDP = string(bytes) suber.SDP = string(bytes)
if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil { if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
suber.OnICECandidate(func(ice *ICECandidate) { if err = suber.Start(); err != nil {
if ice != nil {
suber.Info(ice.ToJSON().Candidate)
}
})
if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: suber.SDP}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
// if err = WebRTCPlugin.Subscribe(streamPath, &suber); err != nil {
// http.Error(w, err.Error(), http.StatusBadRequest)
// return
// }
if sdp, err := suber.GetAnswer(); err == nil { if sdp, err := suber.GetAnswer(); err == nil {
w.Write([]byte(sdp)) w.Write([]byte(sdp))
} else { } else {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
} }
} }

69
subscriber.go

@ -2,6 +2,7 @@ package webrtc
import ( import (
"fmt" "fmt"
"net"
"strings" "strings"
"github.com/pion/rtcp" "github.com/pion/rtcp"
@ -9,25 +10,37 @@ import (
. "m7s.live/engine/v4" . "m7s.live/engine/v4"
"m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec"
"m7s.live/engine/v4/track" "m7s.live/engine/v4/track"
"m7s.live/engine/v4/util"
) )
type WebRTCSubscriber struct { type WebRTCSubscriber struct {
Subscriber Subscriber
WebRTCIO WebRTCIO
videoTrack *TrackLocalStaticRTP videoTrack *TrackLocalStaticRTP
audioTrack *TrackLocalStaticRTP audioTrack *TrackLocalStaticRTP
DC *DataChannel
flvHeadCache []byte
} }
func (suber *WebRTCSubscriber) OnEvent(event any) { func (suber *WebRTCSubscriber) OnEvent(event any) {
switch v := event.(type) { switch v := event.(type) {
case *track.Video: case *track.Video:
if v.CodecID == codec.CodecID_H264 { switch v.CodecID {
case codec.CodecID_H264:
pli := "42001f" pli := "42001f"
pli = fmt.Sprintf("%x", v.GetDecoderConfiguration().Raw[0][1:4]) pli = fmt.Sprintf("%x", v.GetDecoderConfiguration().Raw[0][1:4])
if !strings.Contains(suber.SDP, pli) { if !strings.Contains(suber.SDP, pli) {
pli = reg_level.FindAllStringSubmatch(suber.SDP, -1)[0][1] pli = reg_level.FindAllStringSubmatch(suber.SDP, -1)[0][1]
} }
suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", "m7s") suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", suber.Subscriber.Stream.Path)
case codec.CodecID_H265:
// suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH265, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", suber.Subscriber.Stream.Path)
default:
return
}
if suber.videoTrack == nil {
suber.DC, _ = suber.PeerConnection.CreateDataChannel(suber.Subscriber.Stream.Path, nil)
} else {
rtpSender, _ := suber.PeerConnection.AddTrack(suber.videoTrack) rtpSender, _ := suber.PeerConnection.AddTrack(suber.videoTrack)
go func() { go func() {
rtcpBuf := make([]byte, 1500) rtcpBuf := make([]byte, 1500)
@ -47,20 +60,46 @@ func (suber *WebRTCSubscriber) OnEvent(event any) {
} }
} }
}() }()
suber.Subscriber.AddTrack(v) //接受这个track
} }
suber.Subscriber.AddTrack(v) //接受这个track
case *track.Audio: case *track.Audio:
audioMimeType := MimeTypePCMA audioMimeType := MimeTypePCMA
if v.CodecID == codec.CodecID_PCMU { if v.CodecID == codec.CodecID_PCMU {
audioMimeType = MimeTypePCMU audioMimeType = MimeTypePCMU
} }
if v.CodecID == codec.CodecID_PCMA || v.CodecID == codec.CodecID_PCMU { if v.CodecID == codec.CodecID_PCMA || v.CodecID == codec.CodecID_PCMU {
suber.audioTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, "audio", "m7s") suber.audioTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, "audio", suber.Subscriber.Stream.Path)
suber.PeerConnection.AddTrack(suber.audioTrack) suber.PeerConnection.AddTrack(suber.audioTrack)
suber.Subscriber.AddTrack(v) //接受这个track suber.Subscriber.AddTrack(v) //接受这个track
} }
case VideoDeConf:
if suber.DC != nil {
if suber.flvHeadCache == nil {
suber.flvHeadCache = make([]byte, 15)
suber.flvHeadCache[0] = 9
suber.DC.Send(codec.FLVHeader)
}
suber.DC.Send(util.ConcatBuffers(v.FLV))
}
case VideoRTP: case VideoRTP:
suber.videoTrack.WriteRTP(&v.Packet) if suber.videoTrack != nil {
suber.videoTrack.WriteRTP(&v.Packet)
} else if suber.DC != nil {
frame := suber.Video.Frame
dataSize := uint32(util.SizeOfBuffers(frame.AVCC))
result := net.Buffers{suber.flvHeadCache[:11]}
result = append(result, frame.AVCC...)
ts := frame.AbsTime - suber.SkipTS
util.PutBE(suber.flvHeadCache[1:4], dataSize)
util.PutBE(suber.flvHeadCache[4:7], ts)
suber.flvHeadCache[7] = byte(ts >> 24)
result = append(result, util.PutBE(suber.flvHeadCache[11:15], dataSize+11))
for _, data := range util.SplitBuffers(result, 65535) {
for _, d := range data {
suber.DC.Send(d)
}
}
}
case AudioRTP: case AudioRTP:
suber.audioTrack.WriteRTP(&v.Packet) suber.audioTrack.WriteRTP(&v.Packet)
case ISubscriber: case ISubscriber:
@ -79,9 +118,15 @@ func (suber *WebRTCSubscriber) OnEvent(event any) {
} }
} }
// WebRTCBatcher 批量订阅者 type WebRTCBatchSubscriber struct {
type WebRTCBatcher struct { WebRTCSubscriber
WebRTCIO }
PageSize int
PageNum int func (suber *WebRTCBatchSubscriber) OnEvent(event any) {
switch event.(type) {
case ISubscriber:
default:
suber.WebRTCSubscriber.OnEvent(event)
}
} }

Loading…
Cancel
Save