diff --git a/go.mod b/go.mod index e488db0..d69a780 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module github.com/Monibuca/plugin-webrtc/v3 go 1.13 require ( - github.com/Monibuca/engine/v3 v3.0.0-alpha8 - github.com/Monibuca/utils/v3 v3.0.0-alpha5 + github.com/Monibuca/engine/v3 v3.0.0-beta5 + github.com/Monibuca/utils/v3 v3.0.0-beta github.com/pion/rtcp v1.2.6 github.com/pion/rtp v1.6.5 - github.com/pion/webrtc/v3 v3.0.27 + github.com/pion/webrtc/v3 v3.0.29 ) diff --git a/go.sum b/go.sum index 94f45a0..6871abc 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,16 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Monibuca/engine/v3 v3.0.0-alpha8 h1:ywA64SJOIEawAvS44YNdE7reBrt75P59Ci5LtkjnVU0= github.com/Monibuca/engine/v3 v3.0.0-alpha8/go.mod h1:eonu3UFn3W7NpHzSrACipxdAyOBCUwzlFUe1R7JjttE= +github.com/Monibuca/engine/v3 v3.0.0-beta2 h1:hQguptIQsUDX7IJodQfB0vqbNmEOHFYjA6nfgIZXWj4= +github.com/Monibuca/engine/v3 v3.0.0-beta2/go.mod h1:eonu3UFn3W7NpHzSrACipxdAyOBCUwzlFUe1R7JjttE= +github.com/Monibuca/engine/v3 v3.0.0-beta3 h1:/co+L2qCRZUq55S0LtYpY9xzOJiUUG3VGytYGFf1RD4= +github.com/Monibuca/engine/v3 v3.0.0-beta3/go.mod h1:SMgnlwih4pBA/HkTLjKXZFYkv3ukRzFjv65CARRLVIk= +github.com/Monibuca/engine/v3 v3.0.0-beta5 h1:b27ZQDfvf5dBMZbCSIUXItUwVIFs95fpkAV4xjN7BNE= +github.com/Monibuca/engine/v3 v3.0.0-beta5/go.mod h1:SMgnlwih4pBA/HkTLjKXZFYkv3ukRzFjv65CARRLVIk= github.com/Monibuca/utils/v3 v3.0.0-alpha5 h1:IOyW/KJSRdRg+TPcgwkHLBynqfNQOV6p3iP7LgXEMFc= github.com/Monibuca/utils/v3 v3.0.0-alpha5/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= +github.com/Monibuca/utils/v3 v3.0.0-beta h1:z4p/BSH5J9Ja/gwoDmj1RyN+b0q28Nmn/fqXiwq2hGY= +github.com/Monibuca/utils/v3 v3.0.0-beta/go.mod h1:mQYP/OMox1tkWP6Qut7pBfARr1TXSRkK662dexQl6kI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -85,10 +93,14 @@ github.com/pion/udp v0.1.1 h1:8UAPvyqmsxK8oOjloDk4wUt63TzFe9WEJkg5lChlj7o= github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M= github.com/pion/webrtc/v3 v3.0.27 h1:cPQEFNFrRSMT11j9c9aTmXzL3ikKAFPE2kR0ZrQcviw= github.com/pion/webrtc/v3 v3.0.27/go.mod h1:QpLDmsU5a/a05n230gRtxZRvfHhFzn9ukGUL2x4G5ic= +github.com/pion/webrtc/v3 v3.0.29 h1:pVs6mYjbbYvC8pMsztayEz35DnUEFLPswsicGXaQjxo= +github.com/pion/webrtc/v3 v3.0.29/go.mod h1:XFQeLYBf++bWWA0sJqh6zF1ouWluosxwTOMOoTZGaD0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY= +github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= diff --git a/main.go b/main.go index 4c936bd..e06838e 100644 --- a/main.go +++ b/main.go @@ -12,9 +12,7 @@ import ( "github.com/Monibuca/engine/v3" "github.com/Monibuca/utils/v3" - "github.com/Monibuca/utils/v3/codec" "github.com/pion/rtcp" - "github.com/pion/rtp" . "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/media" ) @@ -86,7 +84,6 @@ func init() { } type WebRTC struct { - engine.Publisher *PeerConnection } @@ -116,6 +113,7 @@ func (rtc *WebRTC) Publish(streamPath string) bool { // }, // }, }) + rtc.PeerConnection = peerConnection if err != nil { utils.Println(err) return false @@ -129,21 +127,21 @@ func (rtc *WebRTC) Publish(streamPath string) bool { if err != nil { return false } - peerConnection.OnICEConnectionStateChange(func(connectionState ICEConnectionState) { - utils.Printf("%s Connection State has changed %s ", streamPath, connectionState.String()) - switch connectionState { - case ICEConnectionStateDisconnected, ICEConnectionStateFailed: - if rtc.Stream != nil { - rtc.Stream.Close() + stream := &engine.Stream{ + Type: "WebRTC", + StreamPath: streamPath, + } + if stream.Publish() { + peerConnection.OnICEConnectionStateChange(func(connectionState ICEConnectionState) { + utils.Printf("%s Connection State has changed %s ", streamPath, connectionState.String()) + switch connectionState { + case ICEConnectionStateDisconnected, ICEConnectionStateFailed: + stream.Close() } - } - }) - rtc.PeerConnection = peerConnection - if rtc.Publish(streamPath) { + }) //f, _ := os.OpenFile("resource/live/rtc.h264", os.O_TRUNC|os.O_WRONLY, 0666) - rtc.Stream.Type = "WebRTC" peerConnection.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) { - defer rtc.Stream.Close() + defer stream.Close() go func() { ticker := time.NewTicker(time.Second * 2) select { @@ -151,48 +149,34 @@ func (rtc *WebRTC) Publish(streamPath string) bool { if rtcpErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil { fmt.Println(rtcpErr) } - case <-rtc.Done(): + case <-stream.Done(): return } }() - var etrack engine.Track - codec := track.Codec() - if track.Kind() == RTPCodecTypeAudio { + if codec := track.Codec(); track.Kind() == RTPCodecTypeAudio { + var at *engine.RTPAudio switch codec.MimeType { case MimeTypePCMA: - at := engine.NewAudioTrack() - at.SoundFormat = 7 + at = stream.NewRTPAudio(7) at.Channels = byte(codec.Channels) - rtc.SetOriginAT(at) - etrack = at case MimeTypePCMU: - at := engine.NewAudioTrack() - at.SoundFormat = 8 + at = stream.NewRTPAudio(8) at.Channels = byte(codec.Channels) - rtc.SetOriginAT(at) - etrack = at default: return } - - } else { - vt := engine.NewVideoTrack() - vt.CodecID = 7 - rtc.SetOriginVT(vt) - etrack = vt - } - var pack rtp.Packet - b := make([]byte, 1460) - for { - i, _, err := track.Read(b) - if err != nil { - return + b := make([]byte, 1460) + for i, _, err := track.Read(b); err == nil; i, _, err = track.Read(b) { + at.Push(b[:i]) } - if err = pack.Unmarshal(b[:i]); err != nil { - return + } else { + vt := stream.NewRTPVideo(7) + b := make([]byte, 1460) + for i, _, err := track.Read(b); err == nil; i, _, err = track.Read(b) { + vt.Push(b[:i]) } - etrack.PushRTP(pack) } + }) } else { return false @@ -271,9 +255,11 @@ func run() { return } vt := sub.WaitVideoTrack("h264") + at := sub.WaitAudioTrack("pcma", "pcmu") + var rtpSender *RTPSender if vt != nil { pli := "42001f" - pli = fmt.Sprintf("%x", vt.SPS[1:4]) + pli = fmt.Sprintf("%x", vt.ExtraData.NALUs[0][1:4]) if !strings.Contains(offer.SDP, pli) { pli = reg_level.FindAllStringSubmatch(offer.SDP, -1)[0][1] } @@ -281,35 +267,55 @@ func run() { if videoTrack, err = NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", "m7s"); err != nil { return } - if _, err = rtc.AddTrack(videoTrack); err != nil { + rtpSender, err = rtc.AddTrack(videoTrack) + if err != nil { return } var lastTimeStampV uint32 sub.OnVideo = func(pack engine.VideoPack) { - var s uint32 + var s uint32 = 40 if lastTimeStampV > 0 { s = pack.Timestamp - lastTimeStampV } lastTimeStampV = pack.Timestamp - if pack.NalType == codec.NALU_IDR_Picture { - videoTrack.WriteSample(media.Sample{ - Data: vt.SPS, + if pack.IDR { + err = videoTrack.WriteSample(media.Sample{ + Data: vt.ExtraData.NALUs[0], }) - videoTrack.WriteSample(media.Sample{ - Data: vt.PPS, + err = videoTrack.WriteSample(media.Sample{ + Data: vt.ExtraData.NALUs[1], + }) + } + for _, nalu := range pack.NALUs { + err = videoTrack.WriteSample(media.Sample{ + Data: nalu, + Duration: time.Millisecond * time.Duration(s), }) } - videoTrack.WriteSample(media.Sample{ - Data: pack.Payload, - Duration: time.Millisecond * time.Duration(s), - }) } + go func() { + rtcpBuf := make([]byte, 1500) + for { + if n, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { + + return + } else { + if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil { + for _, pp := range p { + switch pp.(type) { + case *rtcp.PictureLossIndication: + fmt.Println("PictureLossIndication") + } + } + } + } + } + }() } - at := sub.GetAudioTrack("pcma", "pcmu") if at != nil { var audioTrack *TrackLocalStaticSample audioMimeType := MimeTypePCMA - if at.SoundFormat == 8 { + if at.CodecID == 8 { audioMimeType = MimeTypePCMU } if audioTrack, err = NewTrackLocalStaticSample(RTPCodecCapability{audioMimeType, 8000, 0, "", nil}, "audio", "m7s"); err != nil { @@ -326,28 +332,29 @@ func run() { } lastTimeStampA = pack.Timestamp audioTrack.WriteSample(media.Sample{ - Data: pack.Payload, Duration: time.Millisecond * time.Duration(s), + Data: pack.Raw, Duration: time.Millisecond * time.Duration(s), }) } } if bytes, err := rtc.GetAnswer(); err == nil { w.Write(bytes) - rtc.OnICEConnectionStateChange(func(connectionState ICEConnectionState) { - utils.Printf("%s Connection State has changed %s ", streamPath, connectionState.String()) - switch connectionState { - case ICEConnectionStateDisconnected, ICEConnectionStateFailed: - sub.Close() - rtc.PeerConnection.Close() - case ICEConnectionStateConnected: + rtc.OnConnectionStateChange(func(pcs PeerConnectionState) { + utils.Printf("%s Connection State has changed %s ", streamPath, pcs.String()) + switch pcs { + case PeerConnectionStateConnected: if at != nil { go sub.PlayAudio(at) } if vt != nil { go sub.PlayVideo(vt) } + case PeerConnectionStateDisconnected, PeerConnectionStateFailed: + sub.Close() + rtc.PeerConnection.Close() } }) + } else { return }