### python代码 ``` import cv2 import time import threading # 优化配置 BUFFER_SIZE = 1 # 最小缓冲区 FPS_TARGET = 60 # 目标帧率 FRAME_TIME = 1.0 / FPS_TARGET # 每帧时间 class VideoStreamReceiver: def __init__(self, url): self.url = url self.cap = None self.latest_frame = None self.frame_lock = threading.Lock() self.running = False def connect(self): # 使用性能优化参数 self.cap = cv2.VideoCapture(self.url) if not self.cap.isOpened(): print("无法连接到视频流") return False # 设置视频捕获参数以降低延迟 self.cap.set(cv2.CAP_PROP_BUFFERSIZE, BUFFER_SIZE) self.cap.set(cv2.CAP_PROP_FPS, FPS_TARGET) print("连接成功!正在接收视频流...") return True def read_frames(self): """后台线程持续读取帧""" while self.running: ret, frame = self.cap.read() if ret: with self.frame_lock: self.latest_frame = frame else: print("读取失败,重试中...") time.sleep(0.1) def start(self): if not self.connect(): return False self.running = True self.read_thread = threading.Thread(target=self.read_frames, daemon=True) self.read_thread.start() return True def get_latest_frame(self): """获取最新帧""" with self.frame_lock: return self.latest_frame.copy() if self.latest_frame is not None else None def stop(self): self.running = False if self.cap: self.cap.release() def main(): url = 'udp://127.0.0.1:12345' receiver = VideoStreamReceiver(url) if not receiver.start(): return last_frame_time = time.time() frame_count = 0 try: while True: # 计算帧时间 current_time = time.time() elapsed = current_time - last_frame_time # 帧率控制 - 只在需要时显示新帧 if elapsed >= FRAME_TIME: frame = receiver.get_latest_frame() if frame is not None: cv2.imshow("MJPEG Stream", frame) frame_count += 1 # 显示实际FPS(每秒更新一次) if frame_count % 60 == 0: actual_fps = 60 / (current_time - last_frame_time + FRAME_TIME * 59) print(f"实际FPS: {actual_fps:.1f}") last_frame_time = current_time # 检查退出键 if cv2.waitKey(1) & 0xFF == 27: # ESC退出 break else: # 短暂休眠以减少CPU使用 time.sleep(0.001) except KeyboardInterrupt: print("程序被中断") finally: receiver.stop() cv2.destroyAllWindows() if __name__ == "__main__": main() ``` ### obs设置  Loading... ### python代码 ``` import cv2 import time import threading # 优化配置 BUFFER_SIZE = 1 # 最小缓冲区 FPS_TARGET = 60 # 目标帧率 FRAME_TIME = 1.0 / FPS_TARGET # 每帧时间 class VideoStreamReceiver: def __init__(self, url): self.url = url self.cap = None self.latest_frame = None self.frame_lock = threading.Lock() self.running = False def connect(self): # 使用性能优化参数 self.cap = cv2.VideoCapture(self.url) if not self.cap.isOpened(): print("无法连接到视频流") return False # 设置视频捕获参数以降低延迟 self.cap.set(cv2.CAP_PROP_BUFFERSIZE, BUFFER_SIZE) self.cap.set(cv2.CAP_PROP_FPS, FPS_TARGET) print("连接成功!正在接收视频流...") return True def read_frames(self): """后台线程持续读取帧""" while self.running: ret, frame = self.cap.read() if ret: with self.frame_lock: self.latest_frame = frame else: print("读取失败,重试中...") time.sleep(0.1) def start(self): if not self.connect(): return False self.running = True self.read_thread = threading.Thread(target=self.read_frames, daemon=True) self.read_thread.start() return True def get_latest_frame(self): """获取最新帧""" with self.frame_lock: return self.latest_frame.copy() if self.latest_frame is not None else None def stop(self): self.running = False if self.cap: self.cap.release() def main(): url = 'udp://127.0.0.1:12345' receiver = VideoStreamReceiver(url) if not receiver.start(): return last_frame_time = time.time() frame_count = 0 try: while True: # 计算帧时间 current_time = time.time() elapsed = current_time - last_frame_time # 帧率控制 - 只在需要时显示新帧 if elapsed >= FRAME_TIME: frame = receiver.get_latest_frame() if frame is not None: cv2.imshow("MJPEG Stream", frame) frame_count += 1 # 显示实际FPS(每秒更新一次) if frame_count % 60 == 0: actual_fps = 60 / (current_time - last_frame_time + FRAME_TIME * 59) print(f"实际FPS: {actual_fps:.1f}") last_frame_time = current_time # 检查退出键 if cv2.waitKey(1) & 0xFF == 27: # ESC退出 break else: # 短暂休眠以减少CPU使用 time.sleep(0.001) except KeyboardInterrupt: print("程序被中断") finally: receiver.stop() cv2.destroyAllWindows() if __name__ == "__main__": main() ``` ### obs设置  最后修改:2025 年 09 月 22 日 © 允许规范转载 打赏 赞赏作者 赞 如果觉得我的文章对你有用,请随意赞赏