Browse Source

update scaffold and serivce

master
fajiao 2 years ago
parent
commit
2cb81675b0
  1. 82
      cmd/run.go
  2. 7
      config/config.dev.yaml
  3. 7
      go.mod
  4. 22
      go.sum
  5. 71
      internal/app/ffmpeg/service.go
  6. 7
      internal/pkg/config/config.go
  7. 61
      internal/pkg/global/global.go
  8. 41
      internal/pkg/result/result.go
  9. 48
      internal/pkg/server/ffmpegServer/baseCmd.go
  10. 98
      internal/pkg/server/ffmpegServer/toHlsCmd.go
  11. 11
      internal/pkg/server/systemServer/config.go
  12. 80
      internal/pkg/server/systemServer/payload.go
  13. 15
      internal/pkg/server/systemServer/payload_test.go
  14. 47
      internal/pkg/util/channel.go
  15. 4
      main.go
  16. 21
      middleware/interceptor.go

82
cmd/run.go

@ -0,0 +1,82 @@
package cmd
import (
"context"
"fmt"
"net/http"
"path/filepath"
"ycmediakit/internal/app/ffmpeg"
"ycmediakit/internal/pkg/config"
"ycmediakit/internal/pkg/global"
"ycmediakit/internal/pkg/server/systemServer"
"ycmediakit/internal/pkg/util"
"ycmediakit/middleware"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
var (
rCfg *config.RunConfig
aCfg *config.AppConfig
log *zap.Logger
ctx context.Context
engine *gin.Engine
)
var (
payloadServer *systemServer.PayloadServer
)
func init() {
initAttr()
initServer()
initEngine()
}
func initAttr() {
rCfg = global.RunConfig
aCfg = global.AppConfig
log = global.Logger
}
func initServer() {
payloadServer = global.PayloadServer
}
func initEngine() {
if !rCfg.Debug {
gin.SetMode(gin.ReleaseMode)
}
engine = gin.New()
engine.Use(middleware.Cors())
engine.Use(middleware.GinLogger(log), middleware.GinRecovery(log, true))
engine.Use(middleware.HttpInterceptor())
engineBindFS()
engineBindRouters()
}
func engineBindFS() {
fCfg := &aCfg.Ffmpeg
hlsPath := filepath.Join(util.GetWorkPath(), fCfg.ToHls.HlsRoot)
_, _ = util.CheckDir(hlsPath)
engine.StaticFS("/"+fCfg.ToHls.HlsRoot, http.Dir(hlsPath))
}
func engineBindRouters() {
group := engine.Group("/ffmpeg")
group.Use(middleware.ProbePayloadInterceptor(payloadServer))
ffmpeg.BindRouters(group)
}
func Run(ctx_ context.Context) {
ctx = ctx_
port := aCfg.Server.Port
runMsg := "Listening and serving HTTP on :" + port
fmt.Printf(runMsg + "\n")
log.Info(runMsg)
if err := engine.Run(":" + port); err != nil {
log.Error(err.Error())
return
}
}

7
config/config.dev.yaml

@ -1,8 +1,13 @@
server:
port: 8888
system:
payload:
maxRetryTime: 3
maxCpuUsage: 95.0
maxMemoryUsage: 95.0
ffmpeg:
timeout: 5000
ToHls:
toHls:
base:
encodeCodec: h264
profile: baseline

7
go.mod

@ -4,6 +4,7 @@ go 1.18
require (
github.com/gin-gonic/gin v1.9.0
github.com/shirou/gopsutil/v3 v3.23.2
github.com/spf13/viper v1.15.0
github.com/u2takey/ffmpeg-go v0.4.1
go.uber.org/zap v1.24.0
@ -16,6 +17,7 @@ require (
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.11.2 // indirect
@ -25,20 +27,25 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/u2takey/go-utils v0.3.1 // indirect
github.com/ugorji/go/codec v1.2.9 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect

22
go.sum

@ -77,6 +77,8 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
@ -125,7 +127,9 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@ -174,6 +178,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
@ -196,9 +202,13 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
github.com/shirou/gopsutil/v3 v3.23.2 h1:PAWSuiAszn7IhPMBtXsbSCafej7PqUOvY6YywlQUExU=
github.com/shirou/gopsutil/v3 v3.23.2/go.mod h1:gv0aQw33GLo3pG8SiWKiQrbDzbRY1K80RyZJ7V4Th1M=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk=
github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
@ -221,10 +231,15 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/u2takey/ffmpeg-go v0.4.1 h1:l5ClIwL3N2LaH1zF3xivb3kP2HW95eyG5xhHE1JdZ9Y=
@ -237,6 +252,8 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
@ -359,6 +376,7 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -380,6 +398,7 @@ golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -387,6 +406,7 @@ golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

71
internal/app/ffmpeg/service.go

@ -1,92 +1,95 @@
package ffmpeg
import (
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"ycmediakit/internal/pkg/global"
"ycmediakit/internal/pkg/result"
"ycmediakit/internal/pkg/server/ffmpegServer"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
var (
cfg *ffmpegServer.FfmpegConfig
logger *zap.Logger
server *ffmpegServer.ToHlsServer
log *zap.Logger
toHlsServer *ffmpegServer.ToHlsServer
)
func Init(cfg_ *ffmpegServer.FfmpegConfig) {
cfg = cfg_
logger = zap.L()
server = ffmpegServer.NewToHlsServer(&cfg.ToHls)
func init() {
cfg = &global.AppConfig.Ffmpeg
log = zap.L()
toHlsServer = ffmpegServer.NewToHlsServer(&cfg.ToHls)
global.ToHlsServer = toHlsServer
}
func StartToHls(c *gin.Context) {
target := c.Query("target")
streamPath := c.Query("streamPath")
target, ok := server.PrepareUrl(target)
target, ok := ffmpegServer.PrepareUrl(target)
if !ok {
result.InvalidParams.WithFlag(false).Failure(c)
result.InvalidParams.WithVoidFV().Failure(c)
return
}
streamPath, ok = server.PrepareStreamPath(streamPath)
streamPath, ok = ffmpegServer.PrepareStreamPath(streamPath)
if !ok {
result.InvalidParams.WithFlag(false).Failure(c)
result.InvalidParams.WithVoidFV().Failure(c)
return
}
ok = server.Exists(streamPath)
ok = toHlsServer.Exists(streamPath)
if ok {
result.Created.WithFlagValue(true, server.BuildHlsPath(streamPath)).Success(c)
result.Created.WithFullFV(toHlsServer.BuildHlsPath(streamPath)).Success(c)
return
}
err := server.ProbeWithTimeout(target, cfg.Timeout)
err := ffmpegServer.ProbeWithTimeout(target, cfg.Timeout)
if err != nil {
logger.Error(err.Error())
result.Wrong.WithMsg(err.Error()).Error(c)
log.Error(err.Error())
result.Wrong.WithVoidFV().WithMsg(err.Error()).Error(c)
return
}
ok = server.Add(target, streamPath, cfg.Timeout, &cfg.ToHls)
ok = toHlsServer.Add(target, streamPath, cfg.Timeout, &cfg.ToHls)
if !ok {
result.Ok.WithFlag(false).Failure(c)
result.Ok.WithVoidFV().Failure(c)
return
}
go func() {
_, err = server.RunToHlsCmd(streamPath)
_, err = toHlsServer.RunToHlsCmd(streamPath)
if err != nil {
logger.Error(err.Error())
err = server.DeleteAndStop(streamPath)
log.Error(err.Error())
err = toHlsServer.DeleteAndStop(streamPath)
if err != nil {
logger.Error(err.Error())
log.Error(err.Error())
}
}
}()
result.Ok.WithFlagValue(true, server.BuildHlsPath(streamPath)).Success(c)
result.Ok.WithFullFV(toHlsServer.BuildHlsPath(streamPath)).Success(c)
}
func ExistsToHls(c *gin.Context) {
streamPath := c.Query("streamPath")
streamPath, ok := server.PrepareStreamPath(streamPath)
streamPath, ok := ffmpegServer.PrepareStreamPath(streamPath)
if !ok {
result.InvalidParams.WithFlag(false).Failure(c)
result.InvalidParams.Failure(c)
return
}
ok = server.Exists(streamPath)
ok = toHlsServer.Exists(streamPath)
result.Ok.WithFlag(ok).Success(c)
}
func CloseToHls(c *gin.Context) {
streamPath := c.Query("streamPath")
streamPath, ok := server.PrepareStreamPath(streamPath)
streamPath, ok := ffmpegServer.PrepareStreamPath(streamPath)
if !ok {
result.InvalidParams.WithFlag(false).Failure(c)
result.InvalidParams.Failure(c)
return
}
ok = server.Exists(streamPath)
ok = toHlsServer.Exists(streamPath)
if !ok {
result.NotFound.WithFlag(false).Failure(c)
result.NotFound.Failure(c)
return
}
err := server.DeleteAndStop(streamPath)
err := toHlsServer.DeleteAndStop(streamPath)
if err != nil {
logger.Error(err.Error())
log.Error(err.Error())
result.Wrong.WithMsg(err.Error()).Error(c)
return
}
@ -94,5 +97,5 @@ func CloseToHls(c *gin.Context) {
}
func GetToHlsList(c *gin.Context) {
result.Ok.WithFlagValue(true, server.GetList()).Success(c)
result.Ok.WithData(toHlsServer.GetList()).Success(c)
}

7
internal/pkg/config/config.go

@ -7,6 +7,7 @@ import (
"strings"
"ycmediakit/internal/pkg/logger"
"ycmediakit/internal/pkg/server/ffmpegServer"
"ycmediakit/internal/pkg/server/systemServer"
"ycmediakit/internal/pkg/util"
)
@ -18,6 +19,7 @@ type RunConfig struct {
type AppConfig struct {
Server ServerConfig ``
System systemServer.SystemConfig ``
Ffmpeg ffmpegServer.FfmpegConfig ``
}
@ -25,9 +27,6 @@ type ServerConfig struct {
Port string
}
func init() {
}
func NewRunConfig() *RunConfig {
v := viper.New()
v.AddConfigPath(filepath.Join(util.GetWorkPath(), "config"))
@ -38,6 +37,7 @@ func NewRunConfig() *RunConfig {
}
var cfg RunConfig
if err := v.Unmarshal(&cfg); err != nil {
log.Fatalf("read config failed: %v", err)
}
return &cfg
}
@ -52,6 +52,7 @@ func NewAppConfig(runConfig *RunConfig) *AppConfig {
}
var cfg AppConfig
if err := v.Unmarshal(&cfg); err != nil {
log.Fatalf("read config failed: %v", err)
}
return &cfg
}

61
internal/pkg/global/global.go

@ -1,31 +1,29 @@
package global
import (
"context"
"fmt"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"net/http"
"path/filepath"
"ycmediakit/internal/app/ffmpeg"
"ycmediakit/internal/pkg/config"
"ycmediakit/internal/pkg/logger"
"ycmediakit/internal/pkg/util"
"ycmediakit/middleware"
"ycmediakit/internal/pkg/server/ffmpegServer"
"ycmediakit/internal/pkg/server/systemServer"
"go.uber.org/zap"
)
var (
RunConfig *config.RunConfig
AppConfig *config.AppConfig
Logger *zap.Logger
Ctx context.Context
engine *gin.Engine
)
var (
PayloadServer *systemServer.PayloadServer
ToHlsServer *ffmpegServer.ToHlsServer
)
func init() {
initConfig()
initLogger()
initEngine()
initServer()
}
func initConfig() {
@ -45,41 +43,6 @@ func initLogger() {
zap.ReplaceGlobals(Logger) // 替换 zap 库中全局变量,可以直接通过 zap.L() 访问
}
func initEngine() {
if !RunConfig.Debug {
gin.SetMode(gin.ReleaseMode)
}
engine = gin.New()
engine.Use(middleware.Cors())
engine.Use(middleware.GinLogger(Logger), middleware.GinRecovery(Logger, true))
engine.Use(middleware.HttpInterceptor())
engineBindFS()
engineBindRouters()
}
func engineBindFS() {
fCfg := &AppConfig.Ffmpeg
hlsPath := filepath.Join(util.GetWorkPath(), fCfg.ToHls.HlsRoot)
_, _ = util.CheckDir(hlsPath)
engine.StaticFS("/"+fCfg.ToHls.HlsRoot, http.Dir(hlsPath))
}
func engineBindRouters() {
fCfg := &AppConfig.Ffmpeg
group := engine.Group("/ffmpeg")
group.Use(middleware.ProbePayloadInterceptor())
ffmpeg.Init(fCfg)
ffmpeg.BindRouters(group)
}
func Run(ctx context.Context) {
Ctx = ctx
port := AppConfig.Server.Port
runMsg := "Listening and serving HTTP on :" + port
fmt.Printf(runMsg + "\n")
Logger.Info(runMsg)
if err := engine.Run(":" + port); err != nil {
Logger.Error(err.Error())
return
}
func initServer() {
PayloadServer = systemServer.NewPayloadServer(&AppConfig.System.Payload)
}

41
internal/pkg/result/result.go

@ -9,13 +9,13 @@ import (
var (
Ok = NewResponse(WithCode(200), WithMsg("success"))
Created = NewResponse(WithCode(201), WithMsg("created"))
InvalidParams = NewResponse(WithCode(400), WithMsg("invalid params"))
Unauthorized = NewResponse(WithCode(401), WithMsg("unauthorized"))
Forbidden = NewResponse(WithCode(403), WithMsg("forbidden"))
NotFound = NewResponse(WithCode(404), WithMsg("not found"))
TooManyRequests = NewResponse(WithCode(429), WithMsg("too many requests"))
Wrong = NewResponse(WithCode(500), WithMsg("something wrong"))
Created = NewResponse(WithCode(201), WithFlag(true), WithMsg("created"))
InvalidParams = NewResponse(WithCode(400), WithFlag(false), WithMsg("invalid params"))
Unauthorized = NewResponse(WithCode(401), WithFlag(false), WithMsg("unauthorized"))
Forbidden = NewResponse(WithCode(403), WithFlag(false), WithMsg("forbidden"))
NotFound = NewResponse(WithCode(404), WithFlag(false), WithMsg("not found"))
TooManyRequests = NewResponse(WithCode(429), WithFlag(false), WithMsg("too many requests"))
Wrong = NewResponse(WithCode(500), WithFlag(false), WithMsg("something wrong"))
)
type Response struct {
@ -49,18 +49,23 @@ func (r *Response) WithData(data interface{}) *Response {
return r
}
func (r *Response) WithFlag(flag interface{}) *Response {
func (r *Response) WithFlag(flag bool) *Response {
r.Data = gin.H{"flag": flag}
return r
}
func (r *Response) WithValue(value interface{}) *Response {
r.Data = gin.H{"value": value}
func (r *Response) WithFlagValue(flag bool, value interface{}) *Response {
r.Data = gin.H{"flag": flag, "value": value}
return r
}
func (r *Response) WithFlagValue(flag interface{}, value interface{}) *Response {
r.Data = gin.H{"flag": flag, "value": value}
func (r *Response) WithVoidFV() *Response {
r.Data = gin.H{"flag": false, "value": nil}
return r
}
func (r *Response) WithFullFV(value interface{}) *Response {
r.Data = gin.H{"flag": true, "value": value}
return r
}
@ -82,6 +87,18 @@ func WithData(data interface{}) ResponseOption {
}
}
func WithFlag(flag bool) ResponseOption {
return func(resp *Response) {
resp.Data = gin.H{"flag": flag}
}
}
func WithFlagValue(flag bool, value interface{}) ResponseOption {
return func(resp *Response) {
resp.Data = gin.H{"flag": flag, "value": value}
}
}
func (r *Response) Response(c *gin.Context, httpCode int) {
c.JSON(httpCode, &r)
}

48
internal/pkg/server/ffmpegServer/baseCmd.go

@ -0,0 +1,48 @@
package ffmpegServer
import (
"context"
"errors"
ffmpegCmd "github.com/u2takey/ffmpeg-go"
"os/exec"
"path/filepath"
"strings"
)
func PrepareUrl(url string) (string, bool) {
if url == "" {
return "", false
}
urlArr := strings.Split(url, "?")
if len(urlArr) > 2 {
return "", false
}
url = urlArr[0]
return url, true
}
func PrepareStreamPath(streamPath string) (string, bool) {
streamArr := strings.Split(streamPath, "/")
if len(streamArr) != 2 {
return "", false
}
streamPath = filepath.Join(streamArr...)
return streamPath, true
}
func ProbeWithTimeout(url string, timeoutMs int) error {
args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{
"stimeout": timeoutMs * 1000,
"loglevel": "error",
})
args = append(args, url)
ctx := context.Background()
cmd := exec.CommandContext(ctx, "ffprobe", args...)
errBuf := new(strings.Builder)
cmd.Stderr = errBuf
err := cmd.Run()
if err != nil {
errStr := strings.TrimSpace(errBuf.String())
return errors.New(errStr)
}
return nil
}

98
internal/pkg/server/ffmpegServer/ffmpegCmd.go → internal/pkg/server/ffmpegServer/toHlsCmd.go

@ -1,31 +1,30 @@
package ffmpegServer
import (
"context"
"errors"
"fmt"
ffmpegCmd "github.com/u2takey/ffmpeg-go"
"go.uber.org/zap"
"os/exec"
"path/filepath"
"sort"
"strings"
"sync"
"ycmediakit/internal/pkg/util"
ffmpegCmd "github.com/u2takey/ffmpeg-go"
"go.uber.org/zap"
)
type FfmpegCmdServer struct {
m sync.Mutex
cmdMap map[string]*CmdBaseInfo
}
var (
log *zap.Logger
)
type CmdBaseInfo struct {
CmdBaseConfig
func init() {
log = zap.L()
}
type ToHlsServer struct {
hlsCfg *ToHlsConfig
lock sync.Mutex
cfg *ToHlsConfig
m sync.Mutex
cmdMap map[string]*ToHlsInfo
}
@ -38,19 +37,19 @@ type ToHlsInfo struct {
func NewToHlsServer(hlsCfg *ToHlsConfig) *ToHlsServer {
return &ToHlsServer{
hlsCfg: hlsCfg,
cfg: hlsCfg,
cmdMap: make(map[string]*ToHlsInfo),
}
}
func (server *ToHlsServer) Add(url string, streamPath string, timeoutMs int, curHlsCfg *ToHlsConfig) bool {
server.lock.Lock()
defer server.lock.Unlock()
server.m.Lock()
defer server.m.Unlock()
_, ok := server.cmdMap[streamPath]
if ok {
return true
}
// merge curHlsCfg, hlsCfg
// merge curHlsCfg, cfg
cmd := server.BuildToHlsCmd(url, streamPath, timeoutMs, curHlsCfg)
info := &ToHlsInfo{
ToHlsConfig: *curHlsCfg,
@ -63,27 +62,27 @@ func (server *ToHlsServer) Add(url string, streamPath string, timeoutMs int, cur
}
func (server *ToHlsServer) Exists(streamPath string) bool {
server.lock.Lock()
defer server.lock.Unlock()
server.m.Lock()
defer server.m.Unlock()
_, ok := server.cmdMap[streamPath]
return ok
}
func (server *ToHlsServer) Delete(streamPath string) bool {
server.lock.Lock()
defer server.lock.Unlock()
server.m.Lock()
defer server.m.Unlock()
delete(server.cmdMap, streamPath)
path := filepath.Join(util.GetWorkPath(), server.hlsCfg.HlsRoot, streamPath, "..")
path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath, "..")
err := util.RemoveDir(path)
if err != nil {
zap.L().Error(err.Error())
log.Error(err.Error())
}
return true
}
func (server *ToHlsServer) GetList() []string {
server.lock.Lock()
defer server.lock.Unlock()
server.m.Lock()
defer server.m.Unlock()
var slice []string
for _, elm := range server.cmdMap {
slice = append(slice, elm.streamPath)
@ -93,65 +92,26 @@ func (server *ToHlsServer) GetList() []string {
}
func (server *ToHlsServer) DeleteAndStop(streamPath string) error {
server.lock.Lock()
defer server.lock.Unlock()
server.m.Lock()
defer server.m.Unlock()
delete(server.cmdMap, streamPath)
_, err := server.StopToHlsCmd(streamPath)
if err != nil {
return err
}
path := filepath.Join(util.GetWorkPath(), server.hlsCfg.HlsRoot, streamPath)
path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath)
err = util.RemoveDir(path)
if err != nil {
zap.L().Error(err.Error())
}
return nil
}
func (server *ToHlsServer) PrepareUrl(url string) (string, bool) {
if url == "" {
return "", false
}
urlArr := strings.Split(url, "?")
if len(urlArr) > 2 {
return "", false
}
url = urlArr[0]
return url, true
}
func (server *ToHlsServer) PrepareStreamPath(streamPath string) (string, bool) {
streamArr := strings.Split(streamPath, "/")
if len(streamArr) != 2 {
return "", false
}
streamPath = filepath.Join(streamArr...)
return streamPath, true
}
func (server *ToHlsServer) ProbeWithTimeout(url string, timeoutMs int) error {
args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{
"stimeout": timeoutMs * 1000,
"loglevel": "error",
})
args = append(args, url)
ctx := context.Background()
cmd := exec.CommandContext(ctx, "ffprobe", args...)
errBuf := new(strings.Builder)
cmd.Stderr = errBuf
err := cmd.Run()
if err != nil {
errStr := strings.TrimSpace(errBuf.String())
return errors.New(errStr)
log.Error(err.Error())
}
return nil
}
func (server *ToHlsServer) BuildToHlsCmd(url string, streamPath string, timeoutMs int, hlsCfg *ToHlsConfig) *exec.Cmd {
path := filepath.Join(util.GetWorkPath(), server.hlsCfg.HlsRoot, streamPath)
path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath)
_, err := util.CheckDir(path)
if err != nil {
zap.L().Error(err.Error())
log.Error(err.Error())
}
path = filepath.Join(path, hlsCfg.HlsName+".m3u8")
buffer := new(strings.Builder)
@ -200,5 +160,5 @@ func (server *ToHlsServer) StopToHlsCmd(streamPath string) (bool, error) {
}
func (server *ToHlsServer) BuildHlsPath(streamPath string) string {
return filepath.Join(server.hlsCfg.HlsRoot, streamPath, server.hlsCfg.HlsName+".m3u8")
return filepath.Join(server.cfg.HlsRoot, streamPath, server.cfg.HlsName+".m3u8")
}

11
internal/pkg/server/systemServer/config.go

@ -0,0 +1,11 @@
package systemServer
type SystemConfig struct {
Payload PayloadConfig
}
type PayloadConfig struct {
MaxRetryTime int
MaxCpuUsage float32
MaxMemoryUsage float32
}

80
internal/pkg/server/systemServer/payload.go

@ -0,0 +1,80 @@
package systemServer
import (
"time"
"ycmediakit/internal/pkg/util"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/mem"
)
type PayloadServer struct {
Cfg *PayloadConfig
ref *util.Channel
p Payload
}
type Payload struct {
Memory struct {
Total uint64
Free uint64
Used uint64
Usage float32
}
Cpu struct {
Usage float32
}
}
func NewPayloadServer(plCfg *PayloadConfig) *PayloadServer {
s := &PayloadServer{
Cfg: plCfg,
ref: util.NewChannel(),
}
go s.start()
return s
}
func (ps *PayloadServer) start() {
for range time.Tick(time.Second) {
if ps.running() {
ps.collect()
ps.done()
}
}
}
// running 是否正在采集数据
func (ps *PayloadServer) running() bool {
return !ps.ref.IsClosed()
}
// Wait 等待
func (ps *PayloadServer) Wait() chan interface{} {
ps.ref.Open()
return ps.ref.C
}
// done 结束
func (ps *PayloadServer) done() {
ps.ref.Close()
}
// collect 采集数据
func (ps *PayloadServer) collect() {
var p Payload
v, _ := mem.VirtualMemory()
p.Memory.Total = v.Total >> 20
p.Memory.Free = v.Available >> 20
p.Memory.Used = v.Used >> 20
p.Memory.Usage = float32(v.UsedPercent)
if cc, _ := cpu.Percent(time.Second, false); len(cc) > 0 {
p.Cpu.Usage = float32(cc[0])
}
ps.p = p
}
// GetPayload 获取采集数据
func (ps *PayloadServer) GetPayload() Payload {
return ps.p
}

15
internal/pkg/server/systemServer/payload_test.go

@ -0,0 +1,15 @@
package systemServer
import (
"fmt"
"testing"
)
func TestPayloadServer(t *testing.T) {
ps := NewPayloadServer(nil)
for {
<-ps.Wait()
p := ps.GetPayload()
fmt.Println(p)
}
}

47
internal/pkg/util/channel.go

@ -0,0 +1,47 @@
package util
import (
"sync"
)
type Channel struct {
m sync.Mutex
closed bool
C chan interface{}
}
func NewChannel() *Channel {
return &Channel{
C: nil,
closed: true,
}
}
func (c *Channel) Open() {
c.m.Lock()
defer c.m.Unlock()
if c.isClosed() {
c.C = make(chan interface{})
c.closed = false
}
}
func (c *Channel) Close() {
c.m.Lock()
defer c.m.Unlock()
if !c.isClosed() {
close(c.C)
c.C = nil
c.closed = true
}
}
func (c *Channel) IsClosed() bool {
c.m.Lock()
defer c.m.Unlock()
return c.isClosed()
}
func (c *Channel) isClosed() bool {
return c.C == nil && c.closed
}

4
main.go

@ -18,12 +18,12 @@ The ffmpeg server for Go
import (
"context"
"ycmediakit/internal/pkg/global"
"ycmediakit/cmd"
"ycmediakit/internal/pkg/util"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
go util.WaitTerminal(cancel)
global.Run(ctx)
cmd.Run(ctx)
}

21
middleware/interceptor.go

@ -2,6 +2,10 @@ package middleware
import (
"github.com/gin-gonic/gin"
"strconv"
"strings"
"ycmediakit/internal/pkg/result"
"ycmediakit/internal/pkg/server/systemServer"
)
// HttpInterceptor 可自定义鉴权等操作
@ -14,8 +18,23 @@ func HttpInterceptor() gin.HandlerFunc {
// ProbePayloadInterceptor 探测 ffmpeg 命令服务器是否有资源得以运行
// TODO
func ProbePayloadInterceptor() gin.HandlerFunc {
func ProbePayloadInterceptor(ps *systemServer.PayloadServer) gin.HandlerFunc {
return func(c *gin.Context) {
cfg := ps.Cfg
if strings.Contains(c.FullPath(), "/start") {
for i := 0; i < cfg.MaxRetryTime; i++ {
<-ps.Wait()
p := ps.GetPayload()
if p.Cpu.Usage <= cfg.MaxCpuUsage && p.Memory.Usage <= cfg.MaxMemoryUsage {
c.Next()
return
}
}
errMsg := "cpu or memory overload, retry time " + strconv.Itoa(cfg.MaxRetryTime)
result.Forbidden.WithMsg(errMsg).Failure(c)
c.Abort()
return
}
c.Next()
}
}

Loading…
Cancel
Save