You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

372 lines
8.7 KiB

5 years ago
package webrtc
import (
4 years ago
"bytes"
5 years ago
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
4 years ago
"sync"
5 years ago
"time"
. "github.com/Monibuca/engine/v2"
4 years ago
"github.com/Monibuca/engine/v2/avformat"
"github.com/Monibuca/engine/v2/util"
5 years ago
. "github.com/Monibuca/plugin-rtp"
5 years ago
"github.com/pion/rtcp"
4 years ago
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
5 years ago
. "github.com/pion/webrtc/v2"
)
4 years ago
// config holds the plugin settings. A pointer to it is handed to the engine
// through PluginConfig.Config in init (presumably so the engine can fill it
// from the configuration file — confirm against the engine).
var config struct {
	// ICEServers lists the STUN/TURN URLs passed to every new peer connection.
	ICEServers []string
}
// }{[]string{
// "stun:stun.ekiga.net",
// "stun:stun.ideasip.com",
// "stun:stun.schlund.de",
// "stun:stun.stunprotocol.org:3478",
// "stun:stun.voiparound.com",
// "stun:stun.voipbuster.com",
// "stun:stun.voipstunt.com",
// "stun:stun.voxgratia.org",
// "stun:stun.services.mozilla.com",
// "stun:stun.xten.com",
// "stun:stun.softjoys.com",
// "stun:stunserver.org",
// "stun:stun.schlund.de",
// "stun:stun.rixtelecom.se",
// "stun:stun.iptel.org",
// "stun:stun.ideasip.com",
// "stun:stun.fwdnet.net",
// "stun:stun.ekiga.net",
// "stun:stun01.sipphone.com",
// }}
5 years ago
// type udpConn struct {
// conn *net.UDPConn
// port int
// }
5 years ago
var m MediaEngine
var api *API
4 years ago
var SSRC uint32
var SSRCMap = make(map[string]uint32)
var ssrcLock sync.Mutex
5 years ago
func init() {
m.RegisterCodec(NewRTPH264Codec(DefaultPayloadTypeH264, 90000))
//m.RegisterCodec(NewRTPPCMUCodec(DefaultPayloadTypePCMU, 8000))
api = NewAPI(WithMediaEngine(m))
5 years ago
InstallPlugin(&PluginConfig{
4 years ago
Config: &config,
5 years ago
Name: "WebRTC",
Type: PLUGIN_PUBLISHER | PLUGIN_SUBSCRIBER,
Run: run,
})
}
type WebRTC struct {
5 years ago
RTP
*PeerConnection
4 years ago
RemoteAddr string
}
4 years ago
func (rtc *WebRTC) Play(streamPath string) bool {
5 years ago
peerConnection, err := api.NewPeerConnection(Configuration{
ICEServers: []ICEServer{
{
URLs: config.ICEServers,
},
},
})
if _, err = peerConnection.AddTransceiverFromKind(RTPCodecTypeVideo); err != nil {
5 years ago
if err != nil {
Println(err)
return false
5 years ago
}
}
if err != nil {
return false
5 years ago
}
4 years ago
rtc.PeerConnection = peerConnection
// Create a video track, using the same SSRC as the incoming RTP Packet
ssrcLock.Lock()
ssrc, ok := SSRCMap[streamPath]
if !ok {
SSRC++
ssrc = SSRC
SSRCMap[streamPath] = SSRC
}
ssrcLock.Unlock()
videoTrack, err := peerConnection.NewTrack(DefaultPayloadTypeH264, ssrc, "video", "monibuca")
if err != nil {
Println(err)
return false
}
if _, err = peerConnection.AddTrack(videoTrack); err != nil {
Println(err)
return false
}
var sequence uint16
var sub Subscriber
var sps []byte
var pps []byte
sub.ID = rtc.RemoteAddr
sub.Type = "WebRTC"
nextHeader := func(ts uint32, marker bool) rtp.Header {
sequence++
return rtp.Header{
Version: 2,
SSRC: ssrc,
PayloadType: DefaultPayloadTypeH264,
SequenceNumber: sequence,
Timestamp: ts,
Marker: marker,
}
}
stapA := func(naul ...[]byte) []byte {
var buffer bytes.Buffer
buffer.WriteByte(24)
for _, n := range naul {
l := len(n)
buffer.WriteByte(byte(l >> 8))
buffer.WriteByte(byte(l))
buffer.Write(n)
}
return buffer.Bytes()
}
4 years ago
// aud := []byte{0x09, 0x30}
sub.OnData = func(packet *avformat.SendPacket) error {
if packet.Type == avformat.FLV_TAG_TYPE_AUDIO {
return nil
}
if packet.IsSequence {
payload := packet.Payload[11:]
spsLen := int(payload[0])<<8 + int(payload[1])
sps = payload[2:spsLen]
payload = payload[3+spsLen:]
ppsLen := int(payload[0])<<8 + int(payload[1])
pps = payload[2:ppsLen]
} else {
if packet.IsKeyFrame {
if err := videoTrack.WriteRTP(&rtp.Packet{
Header: nextHeader(0, true),
Payload: stapA(sps, pps),
}); err != nil {
return err
4 years ago
}
}
payload := packet.Payload[5:]
for {
var naulLen = int(util.BigEndian.Uint32(payload))
payload = payload[4:]
_payload := payload[:naulLen]
if naulLen > 1000 {
part := _payload[:1000]
indicator := ((part[0] >> 5) << 5) | 28
nalutype := part[0] & 31
header := 128 | nalutype
part = part[1:]
marker := false
4 years ago
for {
if err := videoTrack.WriteRTP(&rtp.Packet{
Header: nextHeader(packet.Timestamp*90, marker),
Payload: append([]byte{indicator, header}, part...),
}); err != nil {
return err
4 years ago
}
if _payload == nil {
4 years ago
break
}
if len(_payload[1000:]) <= 1000 {
header = 64 | nalutype
part = _payload[1000:]
_payload = nil
marker = true
} else {
header = nalutype
part = _payload[1000:]
_payload = part
}
}
} else {
if err := videoTrack.WriteRTP(&rtp.Packet{
Header: nextHeader(packet.Timestamp*90, true),
Payload: _payload,
}); err != nil {
return err
4 years ago
}
}
if len(payload) < naulLen+4 {
break
}
payload = payload[naulLen:]
4 years ago
}
// if err := videoTrack.WriteRTP(&rtp.Packet{
// Header: nextHeader(packet.Timestamp * 90),
// Payload: aud,
// }); err != nil {
// return err
// }
4 years ago
}
return nil
}
go sub.Subscribe(streamPath)
// peerConnection.OnICEConnectionStateChange(func(connectionState ICEConnectionState) {
// Printf("%s Connection State has changed %s ", streamPath, connectionState.String())
// switch connectionState {
// case ICEConnectionStateDisconnected:
// if rtc.Stream != nil {
// rtc.Stream.Close()
// }
// case ICEConnectionStateConnected:
// }
// })
4 years ago
return true
}
func (rtc *WebRTC) Publish(streamPath string) bool {
peerConnection, err := api.NewPeerConnection(Configuration{
ICEServers: []ICEServer{
{
URLs: config.ICEServers,
},
},
})
if _, err = peerConnection.AddTransceiverFromKind(RTPCodecTypeVideo); err != nil {
if err != nil {
Println(err)
return false
}
}
if err != nil {
return false
}
peerConnection.OnICEConnectionStateChange(func(connectionState ICEConnectionState) {
Printf("%s Connection State has changed %s ", streamPath, connectionState.String())
switch connectionState {
case ICEConnectionStateDisconnected, ICEConnectionStateFailed:
if rtc.Stream != nil {
rtc.Stream.Close()
}
5 years ago
}
})
rtc.PeerConnection = peerConnection
5 years ago
if rtc.RTP.Publish(streamPath) {
4 years ago
//f, _ := os.OpenFile("resource/live/rtc.h264", os.O_TRUNC|os.O_WRONLY, 0666)
var h264 codecs.H264Packet
rtc.Stream.Type = "WebRTC"
peerConnection.OnTrack(func(track *Track, receiver *RTPReceiver) {
5 years ago
defer rtc.Stream.Close()
go func() {
ticker := time.NewTicker(time.Second * 2)
select {
case <-ticker.C:
if rtcpErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: track.SSRC()}}); rtcpErr != nil {
fmt.Println(rtcpErr)
}
case <-rtc.Done():
return
}
}()
5 years ago
pack := &RTPPack{
Type: RTPType(track.Kind() - 1),
}
5 years ago
for b := make([]byte, 1460); ; rtc.PushPack(pack) {
i, err := track.Read(b)
if err != nil {
return
}
if err = pack.Unmarshal(b[:i]); err != nil {
return
}
4 years ago
h264.Unmarshal(pack.Payload)
// f.Write(bytes)
}
})
} else {
return false
}
return true
}
func (rtc *WebRTC) GetAnswer(localSdp SessionDescription) ([]byte, error) {
4 years ago
// Sets the LocalDescription, and starts our UDP listeners
if err := rtc.SetLocalDescription(localSdp); err != nil {
4 years ago
Println(err)
return nil, err
}
if bytes, err := json.Marshal(localSdp); err != nil {
4 years ago
Println(err)
return bytes, err
} else {
return bytes, nil
}
}
func run() {
4 years ago
http.HandleFunc("/webrtc/play", func(w http.ResponseWriter, r *http.Request) {
streamPath := r.URL.Query().Get("streamPath")
// offer := SessionDescription{}
// bytes, err := ioutil.ReadAll(r.Body)
// err = json.Unmarshal(bytes, &offer)
// if err != nil {
// Println(err)
// return
// }
rtc := new(WebRTC)
4 years ago
rtc.RemoteAddr = r.RemoteAddr
if rtc.Play(streamPath) {
offer, err := rtc.CreateOffer(nil)
if err != nil {
Println(err)
return
}
if bytes, err := rtc.GetAnswer(offer); err == nil {
4 years ago
w.Write(bytes)
} else {
Println(err)
4 years ago
w.Write([]byte(err.Error()))
return
}
4 years ago
} else {
w.Write([]byte(`{"errmsg":"bad name"}`))
}
})
http.HandleFunc("/webrtc/publish", func(w http.ResponseWriter, r *http.Request) {
streamPath := r.URL.Query().Get("streamPath")
offer := SessionDescription{}
bytes, err := ioutil.ReadAll(r.Body)
err = json.Unmarshal(bytes, &offer)
if err != nil {
Println(err)
return
}
rtc := new(WebRTC)
rtc.RemoteAddr = r.RemoteAddr
if rtc.Publish(streamPath) {
if err := rtc.SetRemoteDescription(offer); err != nil {
Println(err)
return
}
answer, err := rtc.CreateAnswer(nil)
if err != nil {
Println(err)
return
}
if bytes, err = rtc.GetAnswer(answer); err == nil {
4 years ago
w.Write(bytes)
} else {
Println(err)
4 years ago
w.Write([]byte(err.Error()))
return
}
5 years ago
} else {
w.Write([]byte(`{"errmsg":"bad name"}`))
5 years ago
}
})
}