qiyun996
3 years ago
4 changed files with 230 additions and 1 deletions
@ -0,0 +1,73 @@ |
|||
import cv2 |
|||
import time |
|||
import subprocess as sp |
|||
import multiprocessing |
|||
|
|||
|
|||
class stream_pusher(object): |
|||
def __init__(self, rtmp_url=None, raw_frame_q=None): # 类实例化的时候传入rtmp地址和帧传入队列 |
|||
self.rtmp_url = rtmp_url |
|||
self.raw_frame_q = raw_frame_q |
|||
|
|||
fps = 20 # 设置帧速率 |
|||
# 设置分辨率 |
|||
width = 1920 # 宽 |
|||
height = 1080 # 高 |
|||
|
|||
# 设置FFmpeg命令文本 |
|||
self.command = ['ffmpeg', |
|||
'-y', |
|||
'-f', 'rawvideo', |
|||
'-vcodec', 'rawvideo', |
|||
'-pix_fmt', 'bgr24', |
|||
'-s', "{}x{}".format(width, height), |
|||
'-r', str(fps), |
|||
'-i', '-', |
|||
'-c:v', 'libx264', |
|||
'-pix_fmt', 'yuv420p', |
|||
'-preset', 'ultrafast', |
|||
'-f', 'flv', |
|||
self.rtmp_url] |
|||
|
|||
# 向服务器推送 |
|||
def push_frame(self): |
|||
|
|||
# 配置向os传递命令的管道 |
|||
p = sp.Popen(self.command, stdin=sp.PIPE) |
|||
|
|||
while True: |
|||
if not self.raw_frame_q.empty(): # 如果输入管道不为空 |
|||
# 把帧和相关信息从输入队列中取出 |
|||
frame = self.raw_frame_q.get() |
|||
|
|||
# 把内容放入管道,放入后有os自己去执行 |
|||
p.stdin.write(frame.tostring()) |
|||
else: |
|||
time.sleep(0.01) |
|||
|
|||
# 启动运行 |
|||
def run(self): |
|||
# 定义一个子进程 |
|||
push_frame_p = multiprocessing.Process(target=self.push_frame, args=()) |
|||
push_frame_p.daemon = True # 把子进程设置为daemon方式 |
|||
push_frame_p.start() # 运行子进程 |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
|
|||
cap = cv2.VideoCapture("rtsp://admin:hk123456@192.168.1.65:554") |
|||
|
|||
rtmpUrl = "rtmp://127.0.0.1:8554/video" # 用vcl等直播软件播放时,也用这个地址 |
|||
raw_q = multiprocessing.Queue() # 定义一个向推流对象传入帧及其他信息的队列 |
|||
|
|||
my_pusher = stream_pusher(rtmp_url=rtmpUrl, raw_frame_q=raw_q) # 实例化一个对象 |
|||
my_pusher.run() # 让这个对象在后台推送视频流 |
|||
while True: |
|||
_, raw_frame = cap.read() |
|||
|
|||
if not raw_q.full(): # 如果队列没满 |
|||
raw_q.put(raw_frame) # 送入队列 |
|||
if cv2.waitKey(1) == ord('q'): # q to quit |
|||
raise StopIteration |
|||
cap.release() |
|||
print('finish') |
@ -0,0 +1,45 @@ |
|||
# -*- coding:utf-8 -*- |
|||
# @Time : 2021/11/4 10:17 |
|||
# @Author : JulyLi |
|||
# @File : rtsp2.py |
|||
# @Software: PyCharm |
|||
|
|||
import cv2 |
|||
|
|||
import sys |
|||
import json |
|||
import subprocess as sp |
|||
import signal |
|||
import numpy as npbool |
|||
#此处换为你自己的地址 |
|||
rtsp_url = 'rtsp://192.168.1.182:8554/video' |
|||
cap = cv2.VideoCapture("rtsp://admin:hk123456@192.168.1.65:554") |
|||
# Get video information |
|||
fps = int(cap.get(cv2.CAP_PROP_FPS)) |
|||
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|||
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|||
command = ['ffmpeg', |
|||
'-y', |
|||
'-f', 'rawvideo', |
|||
'-vcodec', 'rawvideo', |
|||
'-pix_fmt', 'bgr24', |
|||
'-s', "{}x{}".format(width, height), |
|||
'-r', str(fps), |
|||
'-i', '-', |
|||
'-c:v', 'libx265', |
|||
'-pix_fmt', 'yuv420p', |
|||
'-preset', 'ultrafast', |
|||
'-f', 'rtsp', |
|||
rtsp_url] |
|||
p = sp.Popen(command, stdin=sp.PIPE) |
|||
|
|||
while (cap.isOpened()): |
|||
ret, frame = cap.read() |
|||
if not ret: |
|||
print("Opening camera is failed") |
|||
break |
|||
# frame = 你的图像处理的函数(frame) |
|||
p.stdin.write(frame.tostring()) |
|||
cv2.imshow('img',frame) |
|||
if cv2.waitKey(1) == ord('q'): # q to quit |
|||
raise StopIteration |
@ -0,0 +1,109 @@ |
|||
# -*- coding:utf-8 -*- |
|||
# @Time : 2021/11/4 10:17 |
|||
# @Author : JulyLi |
|||
# @File : rtsp2.py |
|||
# @Software: PyCharm |
|||
|
|||
import cv2 |
|||
import gi |
|||
import sys |
|||
import json |
|||
import time |
|||
import signal |
|||
import numpy as np |
|||
|
|||
gi.require_version('Gst', '1.0') |
|||
gi.require_version('GstRtspServer', '1.0') |
|||
from gi.repository import Gst, GstRtspServer, GObject |
|||
|
|||
#cv2.namedWindow('video_realtime_face', cv2.WINDOW_NORMAL) |
|||
|
|||
def to_node(type, message): |
|||
# convert to json and print (node helper will read from stdout) |
|||
try: |
|||
print(json.dumps({type: message})) |
|||
except Exception: |
|||
pass |
|||
# stdout has to be flushed manually to prevent delays in the node helper communication |
|||
sys.stdout.flush() |
|||
|
|||
to_node("status", "Facerecognition started...") |
|||
|
|||
def shutdown(self, signum): |
|||
to_node("status", 'Shutdown: Cleaning up camera...') |
|||
quit() |
|||
|
|||
signal.signal(signal.SIGINT, shutdown) |
|||
|
|||
|
|||
class SensorFactory(GstRtspServer.RTSPMediaFactory): |
|||
def __init__(self, **properties): |
|||
super(SensorFactory, self).__init__(**properties) |
|||
self.cap = cv2.VideoCapture("rtsp://admin:admin123@192.168.2.190:554/sub") |
|||
# self.cap = cv2.VideoCapture("shmsrc socket-path=/tmp/foo2 ! video/x-raw, format=BGR ,width=1920,height=1080,framerate=30/1 ! videoconvert ! video/x-raw, format=BGR ! appsink") |
|||
#self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) |
|||
#self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) |
|||
self.number_frames = 0 |
|||
self.fps = 30.0 |
|||
self.duration = 1 / self.fps * Gst.SECOND # duration of a frame in nanoseconds |
|||
self.launch_string = 'appsrc name=source is-live=true block=true format=GST_FORMAT_TIME ' \ |
|||
'caps=video/x-raw,format=BGR,width=1920,height=1080,framerate=30/1 ' \ |
|||
'! videoconvert ! video/x-raw,format=I420 ' \ |
|||
'! x264enc speed-preset=ultrafast tune=zerolatency threads=4 ' \ |
|||
'! rtph264pay config-interval=1 name=pay0 pt=96' |
|||
|
|||
def on_need_data(self, src, lenght): |
|||
if self.cap.isOpened(): |
|||
ret, frame = self.cap.read() |
|||
if ret: |
|||
#cv2.imshow("video_realtime_face", frame) |
|||
#if cv2.waitKey(1) & 0xFF == ord('q'): |
|||
# return |
|||
data = frame.tostring() |
|||
buf = Gst.Buffer.new_allocate(None, len(data), None) |
|||
buf.fill(0, data) |
|||
buf.duration = self.duration |
|||
timestamp = self.number_frames * self.duration |
|||
buf.pts = buf.dts = int(timestamp) |
|||
buf.offset = timestamp |
|||
self.number_frames += 1 |
|||
retval = src.emit('push-buffer', buf) |
|||
print('pushed buffer, frame {}, duration {} ns, durations {} s'.format(self.number_frames, |
|||
self.duration, |
|||
self.duration / Gst.SECOND)) |
|||
if retval != Gst.FlowReturn.OK: |
|||
print(retval) |
|||
|
|||
def do_create_element(self, url): |
|||
return Gst.parse_launch(self.launch_string) |
|||
|
|||
def do_configure(self, rtsp_media): |
|||
self.number_frames = 0 |
|||
appsrc = rtsp_media.get_element().get_child_by_name('source') |
|||
appsrc.connect('need-data', self.on_need_data) |
|||
|
|||
|
|||
class GstServer(GstRtspServer.RTSPServer): |
|||
def __init__(self, **properties): |
|||
super(GstServer, self).__init__(**properties) |
|||
self.factory = SensorFactory() |
|||
self.factory.set_shared(True) |
|||
self.get_mount_points().add_factory("/test", self.factory) |
|||
self.attach(None) |
|||
|
|||
|
|||
def run(): |
|||
GObject.threads_init() |
|||
Gst.init(None) |
|||
|
|||
server = GstServer() |
|||
rtsp_port_num = 8554 |
|||
print("\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:%d/test ***\n\n" % rtsp_port_num) |
|||
loop = GObject.MainLoop() |
|||
loop.run() |
|||
|
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
run() |
|||
|
Loading…
Reference in new issue