Browse Source

[feat] 增加定时检测空闲 ffmpeg 命令功能

master
fajiao 2 years ago
parent
commit
8adb839b5a
  1. 28
      cmd/run.go
  2. 11
      config/config.dev.yaml
  3. 14
      internal/app/ffmpeg/baseService.go
  4. 21
      internal/app/ffmpeg/index.go
  5. 12
      internal/app/ffmpeg/toHlsService.go
  6. 17
      internal/pkg/config/config.go
  7. 4
      internal/pkg/global/global.go
  8. 8
      internal/pkg/server/ffmpegServer/baseCmd.go
  9. 3
      internal/pkg/server/ffmpegServer/config.go
  10. 120
      internal/pkg/server/ffmpegServer/toHlsCmd.go
  11. 1
      internal/pkg/server/systemServer/config.go
  12. 6
      internal/pkg/server/systemServer/payload.go
  13. 7
      internal/pkg/unit/keepalive/config.go
  14. 91
      internal/pkg/unit/keepalive/keepalive.go
  15. 2
      internal/pkg/unit/logger/config.go
  16. 45
      internal/pkg/unit/logger/logger.go
  17. 0
      internal/pkg/unit/result/result.go
  18. 106
      internal/pkg/util/RwMap.go
  19. 13
      middleware/interceptor.go

28
cmd/run.go

@ -8,6 +8,7 @@ import (
"ycmediakit/internal/app/ffmpeg" "ycmediakit/internal/app/ffmpeg"
"ycmediakit/internal/pkg/config" "ycmediakit/internal/pkg/config"
"ycmediakit/internal/pkg/global" "ycmediakit/internal/pkg/global"
"ycmediakit/internal/pkg/server/ffmpegServer"
"ycmediakit/internal/pkg/server/systemServer" "ycmediakit/internal/pkg/server/systemServer"
"ycmediakit/internal/pkg/util" "ycmediakit/internal/pkg/util"
"ycmediakit/middleware" "ycmediakit/middleware"
@ -26,6 +27,7 @@ var (
var ( var (
payloadServer *systemServer.PayloadServer payloadServer *systemServer.PayloadServer
toHlsServer *ffmpegServer.ToHlsServer
) )
func init() { func init() {
@ -42,6 +44,7 @@ func initAttr() {
func initServer() { func initServer() {
payloadServer = global.PayloadServer payloadServer = global.PayloadServer
toHlsServer = global.ToHlsServer
} }
func initEngine() { func initEngine() {
@ -52,21 +55,26 @@ func initEngine() {
engine.Use(middleware.Cors()) engine.Use(middleware.Cors())
engine.Use(middleware.GinLogger(log), middleware.GinRecovery(log, true)) engine.Use(middleware.GinLogger(log), middleware.GinRecovery(log, true))
engine.Use(middleware.HttpInterceptor()) engine.Use(middleware.HttpInterceptor())
engineBindFS() initFfmpegServices()
engineBindRouters()
} }
func engineBindFS() { func initFfmpegServices() {
fCfg := &aCfg.Ffmpeg fCfg := &aCfg.Ffmpeg
hlsPath := filepath.Join(util.GetWorkPath(), fCfg.ToHls.HlsRoot) ffmpegGroup := engine.Group("/ffmpeg")
_, _ = util.CheckDir(hlsPath) if payloadServer.Cfg.Enable {
engine.StaticFS("/"+fCfg.ToHls.HlsRoot, http.Dir(hlsPath)) ffmpegGroup.Use(middleware.ProbePayloadInterceptor(payloadServer))
} }
ffmpeg.BindRouters(ffmpegGroup)
func engineBindRouters() { toHlsCfg := &fCfg.ToHls
group := engine.Group("/ffmpeg") hlsPath := filepath.Join(util.GetWorkPath(), toHlsCfg.HlsRoot)
group.Use(middleware.ProbePayloadInterceptor(payloadServer)) _, _ = util.CheckDir(hlsPath)
ffmpeg.BindRouters(group) hlsGroup := engine.Group("/" + toHlsCfg.HlsRoot)
if toHlsServer.Cfg.Keepalive.Enable {
hlsGroup.Use(middleware.KeepAliveToHlsInterceptor(toHlsServer.Supervisor))
ffmpegGroup.Use(middleware.ProbePayloadInterceptor(payloadServer))
}
hlsGroup.StaticFS("/", http.Dir(hlsPath))
} }
func Run(ctx_ context.Context) { func Run(ctx_ context.Context) {

11
config/config.dev.yaml

@ -2,11 +2,12 @@ server:
port: 8888 port: 8888
system: system:
payload: payload:
enable: true
maxRetryTime: 3 maxRetryTime: 3
maxCpuUsage: 95.0 maxCpuUsage: 95.0
maxMemoryUsage: 95.0 maxMemoryUsage: 95.0
ffmpeg: ffmpeg:
timeout: 5000 timeout: 5000 # millisecond
toHls: toHls:
base: base:
encodeCodec: h264 encodeCodec: h264
@ -16,8 +17,12 @@ ffmpeg:
bitrate: 1M bitrate: 1M
vfScale: -1:640 vfScale: -1:640
gopSize: 3 gopSize: 3
HlsRoot: hls keepalive:
HlsName: trans enable: true
pollInterval: 5 # minute
checkInInterval: 1 # minute
hlsRoot: hls
hlsName: trans
hlsTime: 3.0 hlsTime: 3.0
hlsListSize: 3 hlsListSize: 3
hlsWarp: 15 hlsWarp: 15

14
internal/app/ffmpeg/baseService.go

@ -1,24 +1,12 @@
package ffmpeg package ffmpeg
import ( import (
"go.uber.org/zap"
"ycmediakit/internal/pkg/global"
"ycmediakit/internal/pkg/result"
"ycmediakit/internal/pkg/server/ffmpegServer" "ycmediakit/internal/pkg/server/ffmpegServer"
"ycmediakit/internal/pkg/unit/result"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
var (
cfg *ffmpegServer.FfmpegConfig
log *zap.Logger
)
func init() {
cfg = &global.AppConfig.Ffmpeg
log = zap.L()
}
func ProbeStream(c *gin.Context) { func ProbeStream(c *gin.Context) {
target := c.Query("target") target := c.Query("target")
target, ok := ffmpegServer.PrepareUrl(target) target, ok := ffmpegServer.PrepareUrl(target)

21
internal/app/ffmpeg/index.go

@ -0,0 +1,21 @@
package ffmpeg
import (
"go.uber.org/zap"
"ycmediakit/internal/pkg/global"
"ycmediakit/internal/pkg/server/ffmpegServer"
)
var (
cfg *ffmpegServer.FfmpegConfig
log *zap.Logger
toHlsServer *ffmpegServer.ToHlsServer
)
func init() {
cfg = &global.AppConfig.Ffmpeg
log = zap.L()
toHlsServer = ffmpegServer.NewToHlsServer(&cfg.ToHls)
global.ToHlsServer = toHlsServer
}

12
internal/app/ffmpeg/toHlsService.go

@ -1,22 +1,12 @@
package ffmpeg package ffmpeg
import ( import (
"ycmediakit/internal/pkg/global"
"ycmediakit/internal/pkg/result"
"ycmediakit/internal/pkg/server/ffmpegServer" "ycmediakit/internal/pkg/server/ffmpegServer"
"ycmediakit/internal/pkg/unit/result"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
var (
toHlsServer *ffmpegServer.ToHlsServer
)
func init() {
toHlsServer = ffmpegServer.NewToHlsServer(&cfg.ToHls)
global.ToHlsServer = toHlsServer
}
func StartToHls(c *gin.Context) { func StartToHls(c *gin.Context) {
target := c.Query("target") target := c.Query("target")
streamPath := c.Query("streamPath") streamPath := c.Query("streamPath")

17
internal/pkg/config/config.go

@ -1,20 +1,21 @@
package config package config
import ( import (
"github.com/spf13/viper"
"log" "log"
"path/filepath" "path/filepath"
"strings" "strings"
"ycmediakit/internal/pkg/logger"
"ycmediakit/internal/pkg/server/ffmpegServer" "ycmediakit/internal/pkg/server/ffmpegServer"
"ycmediakit/internal/pkg/server/systemServer" "ycmediakit/internal/pkg/server/systemServer"
"ycmediakit/internal/pkg/unit/logger"
"ycmediakit/internal/pkg/util" "ycmediakit/internal/pkg/util"
"github.com/spf13/viper"
) )
type RunConfig struct { type RunConfig struct {
Debug bool `` Debug bool ``
Type string `` Type string ``
Log logger.LogConfig Log logger.Config
} }
type AppConfig struct { type AppConfig struct {
@ -42,17 +43,17 @@ func NewRunConfig() *RunConfig {
return &cfg return &cfg
} }
func NewAppConfig(runConfig *RunConfig) *AppConfig { func NewAppConfig(cfg *RunConfig) *AppConfig {
v := viper.New() v := viper.New()
v.AddConfigPath(filepath.Join(util.GetWorkPath(), "config")) v.AddConfigPath(filepath.Join(util.GetWorkPath(), "config"))
v.SetConfigName(strings.Join([]string{"config", ".", runConfig.Type}, "")) v.SetConfigName(strings.Join([]string{"config", ".", cfg.Type}, ""))
v.SetConfigType("yaml") v.SetConfigType("yaml")
if err := v.ReadInConfig(); err != nil { if err := v.ReadInConfig(); err != nil {
log.Fatalf("read config failed: %v", err) log.Fatalf("read config failed: %v", err)
} }
var cfg AppConfig var appCfg AppConfig
if err := v.Unmarshal(&cfg); err != nil { if err := v.Unmarshal(&appCfg); err != nil {
log.Fatalf("read config failed: %v", err) log.Fatalf("read config failed: %v", err)
} }
return &cfg return &appCfg
} }

4
internal/pkg/global/global.go

@ -2,12 +2,12 @@ package global
import ( import (
"fmt" "fmt"
"github.com/go-playground/validator/v10"
"ycmediakit/internal/pkg/config" "ycmediakit/internal/pkg/config"
"ycmediakit/internal/pkg/logger"
"ycmediakit/internal/pkg/server/ffmpegServer" "ycmediakit/internal/pkg/server/ffmpegServer"
"ycmediakit/internal/pkg/server/systemServer" "ycmediakit/internal/pkg/server/systemServer"
"ycmediakit/internal/pkg/unit/logger"
"github.com/go-playground/validator/v10"
"go.uber.org/zap" "go.uber.org/zap"
) )

8
internal/pkg/server/ffmpegServer/baseCmd.go

@ -5,6 +5,7 @@ import (
"errors" "errors"
ffmpegCmd "github.com/u2takey/ffmpeg-go" ffmpegCmd "github.com/u2takey/ffmpeg-go"
"os/exec" "os/exec"
"path"
"path/filepath" "path/filepath"
"strings" "strings"
) )
@ -26,10 +27,15 @@ func PrepareStreamPath(streamPath string) (string, bool) {
if len(streamArr) != 2 { if len(streamArr) != 2 {
return "", false return "", false
} }
streamPath = filepath.Join(streamArr...) streamPath = path.Join(streamArr...)
return streamPath, true return streamPath, true
} }
func BuildStreamFilePath(streamPath string) string {
streamArr := strings.Split(streamPath, "/")
return filepath.Join(streamArr...)
}
func ProbeWithTimeout(url string, timeoutMs int) error { func ProbeWithTimeout(url string, timeoutMs int) error {
args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{ args := ffmpegCmd.ConvertKwargsToCmdLineArgs(ffmpegCmd.KwArgs{
"i": url, "i": url,

3
internal/pkg/server/ffmpegServer/config.go

@ -1,5 +1,7 @@
package ffmpegServer package ffmpegServer
import "ycmediakit/internal/pkg/unit/keepalive"
type FfmpegConfig struct { type FfmpegConfig struct {
Timeout int Timeout int
ToHls ToHlsConfig ToHls ToHlsConfig
@ -18,6 +20,7 @@ type CmdBaseConfig struct {
type ToHlsConfig struct { type ToHlsConfig struct {
CmdBaseConfig `mapstructure:"base"` CmdBaseConfig `mapstructure:"base"`
Keepalive keepalive.Config
HlsRoot string HlsRoot string
HlsName string HlsName string
HlsTime string HlsTime string

120
internal/pkg/server/ffmpegServer/toHlsCmd.go

@ -4,10 +4,12 @@ import (
"errors" "errors"
"fmt" "fmt"
"os/exec" "os/exec"
"path"
"path/filepath" "path/filepath"
"sort" "sort"
"strings" "strings"
"sync" "sync"
"ycmediakit/internal/pkg/unit/keepalive"
"ycmediakit/internal/pkg/util" "ycmediakit/internal/pkg/util"
ffmpegCmd "github.com/u2takey/ffmpeg-go" ffmpegCmd "github.com/u2takey/ffmpeg-go"
@ -23,9 +25,10 @@ func init() {
} }
type ToHlsServer struct { type ToHlsServer struct {
cfg *ToHlsConfig Cfg *ToHlsConfig
m sync.Mutex m sync.Mutex
cmdMap map[string]*ToHlsInfo cmdMap map[string]*ToHlsInfo
Supervisor *keepalive.Supervisor
} }
type ToHlsInfo struct { type ToHlsInfo struct {
@ -35,92 +38,98 @@ type ToHlsInfo struct {
cmd *exec.Cmd cmd *exec.Cmd
} }
func NewToHlsServer(hlsCfg *ToHlsConfig) *ToHlsServer { func NewToHlsServer(cfg *ToHlsConfig) *ToHlsServer {
return &ToHlsServer{ s := &ToHlsServer{
cfg: hlsCfg, Cfg: cfg,
cmdMap: make(map[string]*ToHlsInfo), cmdMap: make(map[string]*ToHlsInfo),
} }
if s.Cfg.Keepalive.Enable {
s.Supervisor = keepalive.NewSupervisor(&s.Cfg.Keepalive)
handlePath = "/" + cfg.HlsRoot
s.Supervisor.Register(handlePath, s)
}
return s
} }
func (server *ToHlsServer) Add(url string, streamPath string, timeoutMs int, curHlsCfg *ToHlsConfig) bool { func (s *ToHlsServer) Add(url string, streamPath string, timeoutMs int, curHlsCfg *ToHlsConfig) bool {
server.m.Lock() s.m.Lock()
defer server.m.Unlock() defer s.m.Unlock()
_, ok := server.cmdMap[streamPath] _, ok := s.cmdMap[streamPath]
if ok { if ok {
return true return true
} }
// merge curHlsCfg, cfg // merge curHlsCfg, Cfg
cmd := server.BuildToHlsCmd(url, streamPath, timeoutMs, curHlsCfg) cmd := s.BuildToHlsCmd(url, streamPath, timeoutMs, curHlsCfg)
info := &ToHlsInfo{ info := &ToHlsInfo{
ToHlsConfig: *curHlsCfg, ToHlsConfig: *curHlsCfg,
url: url, url: url,
streamPath: streamPath, streamPath: streamPath,
cmd: cmd, cmd: cmd,
} }
server.cmdMap[streamPath] = info s.cmdMap[streamPath] = info
return true return true
} }
func (server *ToHlsServer) Exists(streamPath string) bool { func (s *ToHlsServer) Exists(streamPath string) bool {
server.m.Lock() s.m.Lock()
defer server.m.Unlock() defer s.m.Unlock()
_, ok := server.cmdMap[streamPath] _, ok := s.cmdMap[streamPath]
return ok return ok
} }
func (server *ToHlsServer) Delete(streamPath string) bool { func (s *ToHlsServer) Delete(streamPath string) bool {
server.m.Lock() s.m.Lock()
defer server.m.Unlock() defer s.m.Unlock()
delete(server.cmdMap, streamPath) delete(s.cmdMap, streamPath)
path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath, "..") dir := filepath.Join(util.GetWorkPath(), s.Cfg.HlsRoot, BuildStreamFilePath(streamPath), "..")
err := util.RemoveDir(path) err := util.RemoveDir(dir)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
return true return true
} }
func (server *ToHlsServer) GetList() []string { func (s *ToHlsServer) GetList() []string {
server.m.Lock() s.m.Lock()
defer server.m.Unlock() defer s.m.Unlock()
var slice []string var slice []string
for _, elm := range server.cmdMap { for _, elm := range s.cmdMap {
slice = append(slice, elm.streamPath) slice = append(slice, elm.streamPath)
} }
sort.Strings(slice) sort.Strings(slice)
return slice return slice
} }
func (server *ToHlsServer) DeleteAndStop(streamPath string) error { func (s *ToHlsServer) DeleteAndStop(streamPath string) error {
server.m.Lock() s.m.Lock()
defer server.m.Unlock() defer s.m.Unlock()
delete(server.cmdMap, streamPath) delete(s.cmdMap, streamPath)
_, err := server.StopToHlsCmd(streamPath) _, err := s.StopToHlsCmd(streamPath)
if err != nil { if err != nil {
return err return err
} }
path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath) dir := filepath.Join(util.GetWorkPath(), s.Cfg.HlsRoot, BuildStreamFilePath(streamPath))
err = util.RemoveDir(path) err = util.RemoveDir(dir)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
return nil return nil
} }
func (server *ToHlsServer) BuildToHlsCmd(url string, streamPath string, timeoutMs int, hlsCfg *ToHlsConfig) *exec.Cmd { func (s *ToHlsServer) BuildToHlsCmd(url string, streamPath string, timeoutMs int, hlsCfg *ToHlsConfig) *exec.Cmd {
path := filepath.Join(util.GetWorkPath(), server.cfg.HlsRoot, streamPath) dir := filepath.Join(util.GetWorkPath(), s.Cfg.HlsRoot, BuildStreamFilePath(streamPath))
_, err := util.CheckDir(path) _, err := util.CheckDir(dir)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
} }
path = filepath.Join(path, hlsCfg.HlsName+".m3u8") p := filepath.Join(dir, hlsCfg.HlsName+".m3u8")
buffer := new(strings.Builder) buffer := new(strings.Builder)
cmd := ffmpegCmd. cmd := ffmpegCmd.
Input(url, ffmpegCmd.KwArgs{ Input(url, ffmpegCmd.KwArgs{
"stimeout": timeoutMs * 1000, "stimeout": timeoutMs * 1000,
"loglevel": "error", "loglevel": "error",
}). }).
Output(path, ffmpegCmd.KwArgs{ Output(p, ffmpegCmd.KwArgs{
"c:v": hlsCfg.EncodeCodec, "c:v": hlsCfg.EncodeCodec,
"profile:v": hlsCfg.Profile, "profile:v": hlsCfg.Profile,
"vf": util.Merge("scale=", hlsCfg.VfScale), "vf": util.Merge("scale=", hlsCfg.VfScale),
@ -134,11 +143,13 @@ func (server *ToHlsServer) BuildToHlsCmd(url string, streamPath string, timeoutM
return cmd return cmd
} }
func (server *ToHlsServer) RunToHlsCmd(streamPath string) (bool, error) { func (s *ToHlsServer) RunToHlsCmd(streamPath string) (bool, error) {
info, ok := server.cmdMap[streamPath] info, ok := s.cmdMap[streamPath]
if !ok { if !ok {
return false, nil return false, nil
} }
s.Supervisor.Activate(streamPath)
defer s.Supervisor.DeActivate(streamPath)
err := info.cmd.Run() err := info.cmd.Run()
if err != nil { if err != nil {
errStr := strings.TrimSpace(fmt.Sprint(info.cmd.Stderr)) errStr := strings.TrimSpace(fmt.Sprint(info.cmd.Stderr))
@ -147,8 +158,8 @@ func (server *ToHlsServer) RunToHlsCmd(streamPath string) (bool, error) {
return true, nil return true, nil
} }
func (server *ToHlsServer) StopToHlsCmd(streamPath string) (bool, error) { func (s *ToHlsServer) StopToHlsCmd(streamPath string) (bool, error) {
info, ok := server.cmdMap[streamPath] info, ok := s.cmdMap[streamPath]
if !ok { if !ok {
return true, nil return true, nil
} }
@ -159,6 +170,29 @@ func (server *ToHlsServer) StopToHlsCmd(streamPath string) (bool, error) {
return true, nil return true, nil
} }
func (server *ToHlsServer) BuildHlsPath(streamPath string) string { func (s *ToHlsServer) BuildHlsPath(streamPath string) string {
return filepath.Join(server.cfg.HlsRoot, streamPath, server.cfg.HlsName+".m3u8") return path.Join("/"+s.Cfg.HlsRoot, streamPath, s.Cfg.HlsName+".m3u8")
}
var (
handlePath string
)
func (s *ToHlsServer) CreateKey(oriUrl string) (string, bool) {
url := oriUrl
paramArr := strings.Split(url, "/")
if len(paramArr) != 5 || paramArr[1] != s.Cfg.HlsRoot {
return "", false
}
streamPath := paramArr[2] + "/" + paramArr[3]
return streamPath, true
}
func (s *ToHlsServer) Expire(key string) bool {
_, ok := s.cmdMap[key]
if !ok {
return false
}
_ = s.DeleteAndStop(key)
return true
} }

1
internal/pkg/server/systemServer/config.go

@ -5,6 +5,7 @@ type SystemConfig struct {
} }
type PayloadConfig struct { type PayloadConfig struct {
Enable bool
MaxRetryTime int `validate:"min=1,max=5"` MaxRetryTime int `validate:"min=1,max=5"`
MaxCpuUsage float32 `validate:"min=60,max=100"` MaxCpuUsage float32 `validate:"min=60,max=100"`
MaxMemoryUsage float32 `validate:"min=60,max=100"` MaxMemoryUsage float32 `validate:"min=60,max=100"`

6
internal/pkg/server/systemServer/payload.go

@ -26,12 +26,14 @@ type Payload struct {
} }
} }
func NewPayloadServer(plCfg *PayloadConfig) *PayloadServer { func NewPayloadServer(cfg *PayloadConfig) *PayloadServer {
s := &PayloadServer{ s := &PayloadServer{
Cfg: plCfg, Cfg: cfg,
ref: util.NewChannel(), ref: util.NewChannel(),
} }
if s.Cfg.Enable {
go s.start() go s.start()
}
return s return s
} }

7
internal/pkg/unit/keepalive/config.go

@ -0,0 +1,7 @@
package keepalive
type Config struct {
Enable bool
PollInterval int `validate:"min=1,max=60"`
CheckInInterval int `validate:"min=1,max=30"`
}

91
internal/pkg/unit/keepalive/keepalive.go

@ -0,0 +1,91 @@
package keepalive
import (
"log"
"time"
"ycmediakit/internal/pkg/util"
)
type Supervisor struct {
Cfg *Config
HandlePath string
guarder Guarder
fastMap *util.RwMap[string, time.Time]
}
type Guarder interface {
CreateKey(string) (string, bool)
Expire(string) bool
}
func NewSupervisor(cfg *Config) *Supervisor {
s := &Supervisor{
Cfg: cfg,
fastMap: util.NewRwMap[string, time.Time](),
}
if s.Cfg.Enable {
go s.start()
}
return s
}
func (s *Supervisor) start() {
i := time.Duration(s.Cfg.PollInterval) * time.Minute
for range time.Tick(i) {
go s.inspect()
}
}
func (s *Supervisor) inspect() {
g := s.guarder
if g == nil {
return
}
nowTime := time.Now()
pollTime := time.Duration(s.Cfg.PollInterval) * time.Minute
s.fastMap.ModifyRange(func(key string, value time.Time) {
if nowTime.Sub(value) >= pollTime {
g.Expire(key)
delete(s.fastMap.Map, key)
}
})
}
func (s *Supervisor) Register(handlePath string, i interface{}) {
g, ok := i.(Guarder)
if !ok {
log.Panicf("%v is not implements Guarder\n", i)
}
if s.guarder == nil {
s.HandlePath = handlePath
s.guarder = g
}
}
func (s *Supervisor) CheckIn(oriUrl string) {
g := s.guarder
if g == nil {
return
}
key, ok := g.CreateKey(oriUrl)
if !ok {
return
}
oldTime, ok := s.fastMap.Get(key)
newTime := time.Now()
if ok && newTime.Sub(oldTime) < time.Duration(s.Cfg.CheckInInterval)*time.Minute {
return
}
s.Activate(key)
}
func (s *Supervisor) Activate(key string) {
s.fastMap.Set(key, time.Now())
}
func (s *Supervisor) DeActivate(key string) {
if !s.fastMap.Has(key) {
return
}
s.fastMap.Delete(key)
}

2
internal/pkg/logger/config.go → internal/pkg/unit/logger/config.go

@ -1,6 +1,6 @@
package logger package logger
type LogConfig struct { type Config struct {
Level string // Level 最低日志等级,DEBUG<INFO<WARN<ERROR<FATAL Level string // Level 最低日志等级,DEBUG<INFO<WARN<ERROR<FATAL
FilePath string // FilePath 日志文件位置 FilePath string // FilePath 日志文件位置
Separate bool // Separate 日志是否分隔 Separate bool // Separate 日志是否分隔

45
internal/pkg/logger/logger.go → internal/pkg/unit/logger/logger.go

@ -1,14 +1,15 @@
package logger package logger
import ( import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"time" "time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
) )
// GetEncoder 负责设置 encoding 的日志格式 // GetEncoder 负责设置 encoding 的日志格式
@ -30,39 +31,39 @@ func GetEncoder(color bool) zapcore.Encoder {
return zapcore.NewConsoleEncoder(encodeConfig) return zapcore.NewConsoleEncoder(encodeConfig)
} }
func NewCore(lCfg *LogConfig) zapcore.Core { func NewCore(cfg *Config) zapcore.Core {
encoder := GetEncoder(false) encoder := GetEncoder(false)
now := strconv.FormatInt(time.Now().Unix(), 10) now := strconv.FormatInt(time.Now().Unix(), 10)
fileName := filepath.Join(lCfg.FilePath, now+".log") fileName := filepath.Join(cfg.FilePath, now+".log")
ljLogger := &lumberjack.Logger{ ljLogger := &lumberjack.Logger{
Filename: fileName, // 文件位置 Filename: fileName, // 文件位置
MaxSize: lCfg.MaxSize, // 进行切割之前,日志文件的最大大小(MB为单位) MaxSize: cfg.MaxSize, // 进行切割之前,日志文件的最大大小(MB为单位)
MaxAge: lCfg.MaxAge, // 保留旧文件的最大天数 MaxAge: cfg.MaxAge, // 保留旧文件的最大天数
MaxBackups: lCfg.MaxBackups, // 保留旧文件的最大个数 MaxBackups: cfg.MaxBackups, // 保留旧文件的最大个数
Compress: lCfg.Compress, // 是否压缩/归档旧文件 Compress: cfg.Compress, // 是否压缩/归档旧文件
} }
level, err := zapcore.ParseLevel(lCfg.Level) level, err := zapcore.ParseLevel(cfg.Level)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
return zapcore.NewCore(encoder, zapcore.AddSync(ljLogger), level) return zapcore.NewCore(encoder, zapcore.AddSync(ljLogger), level)
} }
func NewMultiCore(lCfg *LogConfig) zapcore.Core { func NewMultiCore(cfg *Config) zapcore.Core {
now := strconv.FormatInt(time.Now().Unix(), 10) now := strconv.FormatInt(time.Now().Unix(), 10)
level, err := zapcore.ParseLevel(lCfg.Level) level, err := zapcore.ParseLevel(cfg.Level)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
infoEncoder := GetEncoder(false) infoEncoder := GetEncoder(false)
infoFileName := filepath.Join(lCfg.FilePath, now+".log") infoFileName := filepath.Join(cfg.FilePath, now+".log")
infoLogger := &lumberjack.Logger{ infoLogger := &lumberjack.Logger{
Filename: infoFileName, // 文件位置 Filename: infoFileName, // 文件位置
MaxSize: lCfg.MaxSize, // 进行切割之前,日志文件的最大大小(MB为单位) MaxSize: cfg.MaxSize, // 进行切割之前,日志文件的最大大小(MB为单位)
MaxAge: lCfg.MaxAge, // 保留旧文件的最大天数 MaxAge: cfg.MaxAge, // 保留旧文件的最大天数
MaxBackups: lCfg.MaxBackups, // 保留旧文件的最大个数 MaxBackups: cfg.MaxBackups, // 保留旧文件的最大个数
Compress: lCfg.Compress, // 是否压缩/归档旧文件 Compress: cfg.Compress, // 是否压缩/归档旧文件
} }
infoPrior := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool { infoPrior := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= level && lvl < zap.ErrorLevel && lvl >= zap.DebugLevel return lvl >= level && lvl < zap.ErrorLevel && lvl >= zap.DebugLevel
@ -70,13 +71,13 @@ func NewMultiCore(lCfg *LogConfig) zapcore.Core {
infoCore := zapcore.NewCore(infoEncoder, zapcore.AddSync(infoLogger), infoPrior) infoCore := zapcore.NewCore(infoEncoder, zapcore.AddSync(infoLogger), infoPrior)
errEncoder := GetEncoder(true) errEncoder := GetEncoder(true)
errFileName := filepath.Join(lCfg.FilePath, now+"-error.log") errFileName := filepath.Join(cfg.FilePath, now+"-error.log")
errLogger := &lumberjack.Logger{ errLogger := &lumberjack.Logger{
Filename: errFileName, // 文件位置 Filename: errFileName, // 文件位置
MaxSize: lCfg.MaxSize, // 进行切割之前,日志文件的最大大小(MB为单位) MaxSize: cfg.MaxSize, // 进行切割之前,日志文件的最大大小(MB为单位)
MaxAge: lCfg.MaxAge, // 保留旧文件的最大天数 MaxAge: cfg.MaxAge, // 保留旧文件的最大天数
MaxBackups: lCfg.MaxBackups, // 保留旧文件的最大个数 MaxBackups: cfg.MaxBackups, // 保留旧文件的最大个数
Compress: lCfg.Compress, // 是否压缩/归档旧文件 Compress: cfg.Compress, // 是否压缩/归档旧文件
} }
errPrior := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool { errPrior := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= level && lvl >= zap.ErrorLevel return lvl >= level && lvl >= zap.ErrorLevel

0
internal/pkg/result/result.go → internal/pkg/unit/result/result.go

106
internal/pkg/util/RwMap.go

@ -0,0 +1,106 @@
package util
import (
"sync"
"time"
)
type RwMap[K comparable, V any] struct {
sync.RWMutex
Map map[K]V
}
func NewRwMap[K comparable, V any]() *RwMap[K, V] {
return &RwMap[K, V]{
Map: make(map[K]V),
}
}
func (m *RwMap[K, V]) Add(k K, v V) bool {
m.Lock()
defer m.Unlock()
if _, ok := m.Map[k]; ok {
return false
}
m.Map[k] = v
return true
}
func (m *RwMap[K, V]) Set(k K, v V) {
m.Lock()
m.Map[k] = v
m.Unlock()
}
func (m *RwMap[K, V]) Has(k K) (ok bool) {
m.RLock()
defer m.RUnlock()
_, ok = m.Map[k]
return
}
func (m *RwMap[K, V]) Len() int {
return len(m.Map)
}
func (m *RwMap[K, V]) Get(k K) (V, bool) {
m.RLock()
defer m.RUnlock()
v, ok := m.Map[k]
return v, ok
}
func (m *RwMap[K, V]) Delete(k K) (v V, ok bool) {
m.RLock()
v, ok = m.Map[k]
m.RUnlock()
if ok {
m.Lock()
delete(m.Map, k)
m.Unlock()
}
return
}
func (m *RwMap[K, V]) ToList() (r []V) {
m.RLock()
defer m.RUnlock()
for _, s := range m.Map {
r = append(r, s)
}
return
}
func MapList[K comparable, V any, R any](m *RwMap[K, V], f func(K, V) R) (r []R) {
m.RLock()
defer m.RUnlock()
for k, v := range m.Map {
r = append(r, f(k, v))
}
return
}
func (m *RwMap[K, V]) Range(f func(K, V)) {
m.RLock()
defer m.RUnlock()
for k, v := range m.Map {
f(k, v)
}
}
func (m *RwMap[K, V]) RangeInterval(f func(K, V), d time.Duration) {
m.RLock()
defer m.RUnlock()
for k, v := range m.Map {
f(k, v)
time.Sleep(d)
}
}
func (m *RwMap[K, V]) ModifyRange(f func(K, V)) {
m.Lock()
defer m.Unlock()
for k, v := range m.Map {
f(k, v)
}
}

13
middleware/interceptor.go

@ -4,20 +4,27 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"strconv" "strconv"
"strings" "strings"
"ycmediakit/internal/pkg/result"
"ycmediakit/internal/pkg/server/systemServer" "ycmediakit/internal/pkg/server/systemServer"
"ycmediakit/internal/pkg/unit/keepalive"
"ycmediakit/internal/pkg/unit/result"
) )
// HttpInterceptor 可自定义鉴权等操作 // HttpInterceptor 可自定义鉴权等操作
// TODO
func HttpInterceptor() gin.HandlerFunc { func HttpInterceptor() gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
c.Next() c.Next()
} }
} }
// KeepAliveToHlsInterceptor 保活 toHls 服务
func KeepAliveToHlsInterceptor(kas *keepalive.Supervisor) gin.HandlerFunc {
return func(c *gin.Context) {
kas.CheckIn(c.Request.RequestURI)
c.Next()
}
}
// ProbePayloadInterceptor 探测 ffmpeg 命令服务器是否有资源得以运行 // ProbePayloadInterceptor 探测 ffmpeg 命令服务器是否有资源得以运行
// TODO
func ProbePayloadInterceptor(ps *systemServer.PayloadServer) gin.HandlerFunc { func ProbePayloadInterceptor(ps *systemServer.PayloadServer) gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
cfg := ps.Cfg cfg := ps.Cfg

Loading…
Cancel
Save