You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

351 lines
8.3 KiB

4 years ago
package webrtc
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
4 years ago
"sync"
4 years ago
"time"
. "github.com/Monibuca/engine/v2"
4 years ago
"github.com/Monibuca/engine/v2/avformat"
4 years ago
. "github.com/Monibuca/plugin-rtp"
4 years ago
"github.com/pion/rtcp"
. "github.com/pion/webrtc/v2"
4 years ago
"github.com/pion/webrtc/v2/pkg/media"
4 years ago
)
4 years ago
var config struct {
4 years ago
ICEServers []string
4 years ago
}
// }{[]string{
// "stun:stun.ekiga.net",
// "stun:stun.ideasip.com",
// "stun:stun.schlund.de",
// "stun:stun.stunprotocol.org:3478",
// "stun:stun.voiparound.com",
// "stun:stun.voipbuster.com",
// "stun:stun.voipstunt.com",
// "stun:stun.voxgratia.org",
// "stun:stun.services.mozilla.com",
// "stun:stun.xten.com",
// "stun:stun.softjoys.com",
// "stun:stunserver.org",
// "stun:stun.schlund.de",
// "stun:stun.rixtelecom.se",
// "stun:stun.iptel.org",
// "stun:stun.ideasip.com",
// "stun:stun.fwdnet.net",
// "stun:stun.ekiga.net",
// "stun:stun01.sipphone.com",
// }}
4 years ago
// type udpConn struct {
// conn *net.UDPConn
// port int
// }
var playWaitList WaitList
4 years ago
type WaitList struct {
m map[string]*WebRTC
l sync.Mutex
}
func (wl *WaitList) Set(k string, v *WebRTC) {
wl.l.Lock()
defer wl.l.Unlock()
if wl.m == nil {
wl.m = make(map[string]*WebRTC)
}
wl.m[k] = v
}
func (wl *WaitList) Get(k string) *WebRTC {
wl.l.Lock()
defer wl.l.Unlock()
defer delete(wl.m, k)
return wl.m[k]
}
4 years ago
func init() {
InstallPlugin(&PluginConfig{
4 years ago
Config: &config,
4 years ago
Name: "WebRTC",
Type: PLUGIN_PUBLISHER | PLUGIN_SUBSCRIBER,
Run: run,
})
}
type WebRTC struct {
4 years ago
RTP
*PeerConnection
4 years ago
RemoteAddr string
videoTrack *Track
4 years ago
m MediaEngine
api *API
payloader avformat.H264
// codecs.H264Packet
// *os.File
}
4 years ago
func (rtc *WebRTC) Play(streamPath string) bool {
4 years ago
var sub Subscriber
sub.ID = rtc.RemoteAddr
sub.Type = "WebRTC"
var lastTimeStamp uint32
sub.OnData = func(packet *avformat.SendPacket) error {
if packet.Type == avformat.FLV_TAG_TYPE_AUDIO {
return nil
}
if packet.IsSequence {
rtc.payloader.PPS = sub.PPS
rtc.payloader.SPS = sub.SPS
4 years ago
} else {
var s uint32
if lastTimeStamp > 0 {
s = packet.Timestamp - lastTimeStamp
}
lastTimeStamp = packet.Timestamp
rtc.videoTrack.WriteSample(media.Sample{
Data: packet.Payload,
Samples: s * 90,
})
// if packet.IsKeyFrame {
// rtc.videoTrack.WriteSample(media.Sample{
// Data: sub.SPS,
// Samples: 0,
// })
// rtc.videoTrack.WriteSample(media.Sample{
// Data: sub.PPS,
// Samples: 0,
// })
// }
// for payload := packet.Payload[5:]; len(payload) > 4; {
// var naulLen = int(util.BigEndian.Uint32(payload))
// payload = payload[4:]
// rtc.videoTrack.WriteSample(media.Sample{
// Data: payload[:naulLen],
// Samples: s * 90,
// })
// s = 0
// payload = payload[naulLen:]
// }
4 years ago
}
return nil
}
// go sub.Subscribe(streamPath)
rtc.OnICEConnectionStateChange(func(connectionState ICEConnectionState) {
Printf("%s Connection State has changed %s ", streamPath, connectionState.String())
switch connectionState {
case ICEConnectionStateDisconnected:
sub.Close()
case ICEConnectionStateConnected:
4 years ago
//rtc.videoTrack = rtc.GetSenders()[0].Track()
sub.Subscribe(streamPath)
4 years ago
}
})
4 years ago
return true
}
func (rtc *WebRTC) Publish(streamPath string) bool {
4 years ago
rtc.m.RegisterCodec(NewRTPCodec(RTPCodecTypeVideo,
H264,
90000,
0,
"level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f",
DefaultPayloadTypeH264,
new(avformat.H264)))
//m.RegisterCodec(NewRTPPCMUCodec(DefaultPayloadTypePCMU, 8000))
rtc.api = NewAPI(WithMediaEngine(rtc.m))
peerConnection, err := rtc.api.NewPeerConnection(Configuration{
4 years ago
ICEServers: []ICEServer{
{
URLs: config.ICEServers,
},
},
})
if _, err = peerConnection.AddTransceiverFromKind(RTPCodecTypeVideo); err != nil {
if err != nil {
Println(err)
return false
}
}
if err != nil {
return false
}
peerConnection.OnICEConnectionStateChange(func(connectionState ICEConnectionState) {
Printf("%s Connection State has changed %s ", streamPath, connectionState.String())
switch connectionState {
case ICEConnectionStateDisconnected, ICEConnectionStateFailed:
if rtc.Stream != nil {
rtc.Stream.Close()
}
4 years ago
}
})
rtc.PeerConnection = peerConnection
4 years ago
if rtc.RTP.Publish(streamPath) {
4 years ago
//f, _ := os.OpenFile("resource/live/rtc.h264", os.O_TRUNC|os.O_WRONLY, 0666)
rtc.Stream.Type = "WebRTC"
peerConnection.OnTrack(func(track *Track, receiver *RTPReceiver) {
4 years ago
defer rtc.Stream.Close()
go func() {
ticker := time.NewTicker(time.Second * 2)
select {
case <-ticker.C:
if rtcpErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: track.SSRC()}}); rtcpErr != nil {
fmt.Println(rtcpErr)
}
case <-rtc.Done():
return
}
}()
4 years ago
pack := &RTPPack{
Type: RTPType(track.Kind() - 1),
}
4 years ago
for b := make([]byte, 1460); ; rtc.PushPack(pack) {
i, err := track.Read(b)
if err != nil {
return
}
if err = pack.Unmarshal(b[:i]); err != nil {
return
}
// rtc.Unmarshal(pack.Payload)
4 years ago
// f.Write(bytes)
}
})
} else {
return false
}
return true
}
func (rtc *WebRTC) GetAnswer() ([]byte, error) {
4 years ago
// Sets the LocalDescription, and starts our UDP listeners
answer, err := rtc.CreateAnswer(nil)
if err != nil {
return nil, err
}
if err := rtc.SetLocalDescription(answer); err != nil {
4 years ago
Println(err)
return nil, err
}
if bytes, err := json.Marshal(answer); err != nil {
4 years ago
Println(err)
return bytes, err
} else {
return bytes, nil
}
}
func run() {
4 years ago
http.HandleFunc("/webrtc/play", func(w http.ResponseWriter, r *http.Request) {
streamPath := r.URL.Query().Get("streamPath")
4 years ago
var offer SessionDescription
var rtc WebRTC
bytes, err := ioutil.ReadAll(r.Body)
4 years ago
defer func() {
if err != nil {
Println(err)
fmt.Fprintf(w, `{"errmsg":"%s"}`, err)
4 years ago
return
}
rtc.Play(streamPath)
4 years ago
}()
if err != nil {
return
}
if err = json.Unmarshal(bytes, &offer); err != nil {
return
4 years ago
}
// pli := "42001f"
// if stream := FindStream(streamPath); stream != nil {
// pli = fmt.Sprintf("%x", stream.SPS[1:4])
// }
4 years ago
rtc.m.RegisterCodec(NewRTPCodec(RTPCodecTypeVideo,
H264,
90000,
0,
"level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f",
4 years ago
DefaultPayloadTypeH264,
&rtc.payloader))
4 years ago
//m.RegisterCodec(NewRTPPCMUCodec(DefaultPayloadTypePCMU, 8000))
rtc.api = NewAPI(WithMediaEngine(rtc.m))
peerConnection, err := rtc.api.NewPeerConnection(Configuration{
// ICEServers: []ICEServer{
// {
// URLs: config.ICEServers,
// },
// },
})
4 years ago
rtc.PeerConnection = peerConnection
rtc.OnICECandidate(func(ice *ICECandidate) {
if ice != nil {
Println(ice.ToJSON().Candidate)
4 years ago
}
})
// if r, err := peerConnection.AddTransceiverFromKind(RTPCodecTypeVideo); err == nil {
// rtc.videoTrack = r.Sender().Track()
// } else {
// Println(err)
// }
if err != nil {
return
}
rtc.RemoteAddr = r.RemoteAddr
if err = rtc.SetRemoteDescription(offer); err != nil {
return
}
rtc.m.PopulateFromSDP(offer)
// var vpayloadType uint8 = 0
// for _, videoCodec := range rtc.m.GetCodecsByKind(RTPCodecTypeVideo) {
// if videoCodec.Name == H264 {
// vpayloadType = videoCodec.PayloadType
// videoCodec.Payloader = &rtc.payloader
// Printf("H264 fmtp %v", videoCodec.SDPFmtpLine)
// break
// }
// }
if rtc.videoTrack, err = rtc.NewTrack(DefaultPayloadTypeH264, 8, "video", "monibuca"); err != nil {
return
}
if _, err = rtc.AddTrack(rtc.videoTrack); err != nil {
return
}
if bytes, err := rtc.GetAnswer(); err == nil {
w.Write(bytes)
4 years ago
} else {
return
4 years ago
}
})
4 years ago
http.HandleFunc("/webrtc/publish", func(w http.ResponseWriter, r *http.Request) {
streamPath := r.URL.Query().Get("streamPath")
offer := SessionDescription{}
bytes, err := ioutil.ReadAll(r.Body)
err = json.Unmarshal(bytes, &offer)
if err != nil {
Println(err)
return
}
rtc := new(WebRTC)
rtc.RemoteAddr = r.RemoteAddr
if rtc.Publish(streamPath) {
if err := rtc.SetRemoteDescription(offer); err != nil {
Println(err)
return
}
if bytes, err = rtc.GetAnswer(); err == nil {
4 years ago
w.Write(bytes)
} else {
Println(err)
4 years ago
w.Write([]byte(err.Error()))
return
}
4 years ago
} else {
w.Write([]byte("bad name"))
4 years ago
}
})
}