Browse Source

update serivce

master
fajiao 2 years ago
parent
commit
22f70e53dd
  1. 35
      internal/app/ffmpeg/baseService.go
  2. 4
      internal/app/ffmpeg/router.go
  3. 5
      internal/app/ffmpeg/toHlsService.go
  4. 11
      internal/pkg/global/global.go
  5. 15
      internal/pkg/result/result.go
  6. 25
      internal/pkg/server/ffmpegServer/baseCmd.go
  7. 17
      internal/pkg/server/ffmpegServer/baseCmd_test.go
  8. 6
      internal/pkg/server/systemServer/config.go
  9. 15
      internal/pkg/server/systemServer/payload_test.go

35
internal/app/ffmpeg/baseService.go

@ -0,0 +1,35 @@
package ffmpeg
import (
"go.uber.org/zap"
"ycmediakit/internal/pkg/global"
"ycmediakit/internal/pkg/result"
"ycmediakit/internal/pkg/server/ffmpegServer"
"github.com/gin-gonic/gin"
)
var (
cfg *ffmpegServer.FfmpegConfig
log *zap.Logger
)
func init() {
cfg = &global.AppConfig.Ffmpeg
log = zap.L()
}
func ProbeStream(c *gin.Context) {
target := c.Query("target")
target, ok := ffmpegServer.PrepareUrl(target)
if !ok {
result.InvalidParams.WithVoidData().Failure(c)
return
}
jsonStr, err := ffmpegServer.ProbeStreamsWithTimeout(target, cfg.Timeout)
if err != nil {
result.Wrong.WithVoidData().WithMsg(err.Error()).Error(c)
return
}
result.Ok.WithData(jsonStr).Success(c)
}

4
internal/app/ffmpeg/router.go

@ -3,6 +3,10 @@ package ffmpeg
import "github.com/gin-gonic/gin" import "github.com/gin-gonic/gin"
func BindRouters(group *gin.RouterGroup) { func BindRouters(group *gin.RouterGroup) {
// BaseService
group.GET("/probeStream", ProbeStream)
// ToHlsService
group.GET("/startToHls", StartToHls) group.GET("/startToHls", StartToHls)
group.GET("/existsToHls", ExistsToHls) group.GET("/existsToHls", ExistsToHls)
group.GET("/closeToHls", CloseToHls) group.GET("/closeToHls", CloseToHls)

5
internal/app/ffmpeg/service.go → internal/app/ffmpeg/toHlsService.go

@ -6,18 +6,13 @@ import (
"ycmediakit/internal/pkg/server/ffmpegServer" "ycmediakit/internal/pkg/server/ffmpegServer"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"go.uber.org/zap"
) )
var ( var (
cfg *ffmpegServer.FfmpegConfig
log *zap.Logger
toHlsServer *ffmpegServer.ToHlsServer toHlsServer *ffmpegServer.ToHlsServer
) )
func init() { func init() {
cfg = &global.AppConfig.Ffmpeg
log = zap.L()
toHlsServer = ffmpegServer.NewToHlsServer(&cfg.ToHls) toHlsServer = ffmpegServer.NewToHlsServer(&cfg.ToHls)
global.ToHlsServer = toHlsServer global.ToHlsServer = toHlsServer
} }

11
internal/pkg/global/global.go

@ -1,6 +1,8 @@
package global package global
import ( import (
"fmt"
"github.com/go-playground/validator/v10"
"ycmediakit/internal/pkg/config" "ycmediakit/internal/pkg/config"
"ycmediakit/internal/pkg/logger" "ycmediakit/internal/pkg/logger"
"ycmediakit/internal/pkg/server/ffmpegServer" "ycmediakit/internal/pkg/server/ffmpegServer"
@ -29,6 +31,15 @@ func init() {
func initConfig() { func initConfig() {
rCfg := config.NewRunConfig() rCfg := config.NewRunConfig()
aCfg := config.NewAppConfig(rCfg) aCfg := config.NewAppConfig(rCfg)
validate := validator.New()
err := validate.Struct(rCfg)
if err != nil {
fmt.Println(err)
}
err = validate.Struct(aCfg)
if err != nil {
fmt.Println(err)
}
RunConfig = rCfg RunConfig = rCfg
AppConfig = aCfg AppConfig = aCfg
} }

15
internal/pkg/result/result.go

@ -34,36 +34,49 @@ func NewResponse(options ...ResponseOption) *Response {
return resp return resp
} }
// WithCode code:code
func (r *Response) WithCode(code int) *Response { func (r *Response) WithCode(code int) *Response {
r.Code = code r.Code = code
return r return r
} }
// WithMsg msg:msg
func (r *Response) WithMsg(msg string) *Response { func (r *Response) WithMsg(msg string) *Response {
r.Msg = msg r.Msg = msg
return r return r
} }
// WithData data:data
func (r *Response) WithData(data interface{}) *Response { func (r *Response) WithData(data interface{}) *Response {
r.Data = data r.Data = data
return r return r
} }
// WithVoidData data:nil
func (r *Response) WithVoidData() *Response {
r.Data = nil
return r
}
// WithFlag data:{"flag":flag}
func (r *Response) WithFlag(flag bool) *Response { func (r *Response) WithFlag(flag bool) *Response {
r.Data = gin.H{"flag": flag} r.Data = gin.H{"flag": flag}
return r return r
} }
func (r *Response) WithFlagValue(flag bool, value interface{}) *Response { // WithFV data:{"flag":flag, "value":value}
func (r *Response) WithFV(flag bool, value interface{}) *Response {
r.Data = gin.H{"flag": flag, "value": value} r.Data = gin.H{"flag": flag, "value": value}
return r return r
} }
// WithVoidFV data:{"flag":false, "value":nil}
func (r *Response) WithVoidFV() *Response { func (r *Response) WithVoidFV() *Response {
r.Data = gin.H{"flag": false, "value": nil} r.Data = gin.H{"flag": false, "value": nil}
return r return r
} }
// WithFullFV data:{"flag":true, "value":value}
func (r *Response) WithFullFV(value interface{}) *Response { func (r *Response) WithFullFV(value interface{}) *Response {
r.Data = gin.H{"flag": true, "value": value} r.Data = gin.H{"flag": true, "value": value}
return r return r

25
internal/pkg/server/ffmpegServer/baseCmd.go

@ -20,6 +20,7 @@ func PrepareUrl(url string) (string, bool) {
url = urlArr[0] url = urlArr[0]
return url, true return url, true
} }
func PrepareStreamPath(streamPath string) (string, bool) { func PrepareStreamPath(streamPath string) (string, bool) {
streamArr := strings.Split(streamPath, "/") streamArr := strings.Split(streamPath, "/")
if len(streamArr) != 2 { if len(streamArr) != 2 {
@ -31,10 +32,10 @@ func PrepareStreamPath(streamPath string) (string, bool) {
func ProbeWithTimeout(url string, timeoutMs int) error { func ProbeWithTimeout(url string, timeoutMs int) error {
args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{ args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{
"i": url,
"stimeout": timeoutMs * 1000, "stimeout": timeoutMs * 1000,
"loglevel": "error", "loglevel": "error",
}) })
args = append(args, url)
ctx := context.Background() ctx := context.Background()
cmd := exec.CommandContext(ctx, "ffprobe", args...) cmd := exec.CommandContext(ctx, "ffprobe", args...)
errBuf := new(strings.Builder) errBuf := new(strings.Builder)
@ -46,3 +47,25 @@ func ProbeWithTimeout(url string, timeoutMs int) error {
} }
return nil return nil
} }
func ProbeStreamsWithTimeout(url string, timeoutMs int) (string, error) {
args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{
"i": url,
"show_streams": "",
"stimeout": timeoutMs * 1000,
"of": "json",
"loglevel": "error",
})
ctx := context.Background()
cmd := exec.CommandContext(ctx, "ffprobe", args...)
infoBuf, errBuf := new(strings.Builder), new(strings.Builder)
cmd.Stdout = infoBuf
cmd.Stderr = errBuf
err := cmd.Run()
if err != nil {
errStr := strings.TrimSpace(errBuf.String())
return "", errors.New(errStr)
}
infoStr := strings.TrimSpace(infoBuf.String())
return infoStr, nil
}

17
internal/pkg/server/ffmpegServer/baseCmd_test.go

@ -0,0 +1,17 @@
package ffmpegServer
import (
"fmt"
"testing"
)
func TestProbeStreamsWithTimeout(t *testing.T) {
url := "rtsp://admin:hk123456@192.168.1.65:554/Streaming/Channels/101"
timeout := 5000
str, err := ProbeStreamsWithTimeout(url, timeout)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(str)
}
}

6
internal/pkg/server/systemServer/config.go

@ -5,7 +5,7 @@ type SystemConfig struct {
} }
type PayloadConfig struct { type PayloadConfig struct {
MaxRetryTime int MaxRetryTime int `validate:"min=1,max=5"`
MaxCpuUsage float32 MaxCpuUsage float32 `validate:"min=60,max=100"`
MaxMemoryUsage float32 MaxMemoryUsage float32 `validate:"min=60,max=100"`
} }

15
internal/pkg/server/systemServer/payload_test.go

@ -3,6 +3,7 @@ package systemServer
import ( import (
"fmt" "fmt"
"testing" "testing"
"time"
) )
func TestPayloadServer(t *testing.T) { func TestPayloadServer(t *testing.T) {
@ -13,3 +14,17 @@ func TestPayloadServer(t *testing.T) {
fmt.Println(p) fmt.Println(p)
} }
} }
func TestPayloadServerV2(t *testing.T) {
ps := NewPayloadServer(nil)
var p Payload
for i := 0; i < 1000; i++ {
if i%2 == 0 {
time.Sleep(time.Second)
} else {
<-ps.Wait()
}
p = ps.GetPayload()
fmt.Println(p)
}
}

Loading…
Cancel
Save