carrot/screen_server.py
2025-03-31 13:56:26 +08:00

232 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import cv2
import numpy as np
from flask import Flask, Response
from flask_socketio import SocketIO
import threading
import time
import io
import os
import struct
import fcntl
import array
import ctypes
import subprocess
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")
# Framebuffer ioctl constants
FBIOGET_VSCREENINFO = 0x4600
FBIOGET_FSCREENINFO = 0x4602
class FrameBufferInfo(ctypes.Structure):
_fields_ = [
('xres', ctypes.c_uint32),
('yres', ctypes.c_uint32),
('xres_virtual', ctypes.c_uint32),
('yres_virtual', ctypes.c_uint32),
('xoffset', ctypes.c_uint32),
('yoffset', ctypes.c_uint32),
('bits_per_pixel', ctypes.c_uint32),
('grayscale', ctypes.c_uint32),
('red', ctypes.c_uint32 * 3),
('green', ctypes.c_uint32 * 3),
('blue', ctypes.c_uint32 * 3),
('transp', ctypes.c_uint32 * 3),
('nonstd', ctypes.c_uint32),
('activate', ctypes.c_uint32),
('height', ctypes.c_uint32),
('width', ctypes.c_uint32),
('accel_flags', ctypes.c_uint32),
('pixclock', ctypes.c_uint32),
('left_margin', ctypes.c_uint32),
('right_margin', ctypes.c_uint32),
('upper_margin', ctypes.c_uint32),
('lower_margin', ctypes.c_uint32),
('hsync_len', ctypes.c_uint32),
('vsync_len', ctypes.c_uint32),
('sync', ctypes.c_uint32),
('vmode', ctypes.c_uint32),
('rotate', ctypes.c_uint32),
('colorspace', ctypes.c_uint32),
('reserved', ctypes.c_uint32 * 4),
]
class FrameBufferCapture:
def __init__(self, device='/dev/fb0'):
self.device = device
self.fb_file = None
self.width = 0
self.height = 0
self.bpp = 0
self.initialized = False
try:
self.fb_file = open(device, 'rb')
fb_var = FrameBufferInfo()
fcntl.ioctl(self.fb_file, FBIOGET_VSCREENINFO, fb_var)
self.width = fb_var.xres
self.height = fb_var.yres
self.bpp = fb_var.bits_per_pixel
self.initialized = True
print(f"Framebuffer info: {self.width}x{self.height}, {self.bpp} bits per pixel")
except Exception as e:
print(f"Failed to initialize framebuffer: {str(e)}")
if self.fb_file:
self.fb_file.close()
self.fb_file = None
def capture(self):
if not self.initialized or not self.fb_file:
return None
try:
# Seek to beginning of framebuffer
self.fb_file.seek(0)
# Read the entire framebuffer
buffer_size = self.width * self.height * (self.bpp // 8)
frame_data = self.fb_file.read(buffer_size)
# Convert to numpy array based on bpp
if self.bpp == 32: # RGBA
frame = np.frombuffer(frame_data, dtype=np.uint8).reshape(self.height, self.width, 4)
return cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
elif self.bpp == 24: # RGB
frame = np.frombuffer(frame_data, dtype=np.uint8).reshape(self.height, self.width, 3)
return frame
elif self.bpp == 16: # RGB565
frame = np.frombuffer(frame_data, dtype=np.uint16).reshape(self.height, self.width)
# Convert RGB565 to RGB888
r = ((frame & 0xF800) >> 11) * 8
g = ((frame & 0x07E0) >> 5) * 4
b = (frame & 0x001F) * 8
frame_rgb = np.stack([r, g, b], axis=2).astype(np.uint8)
return frame_rgb
else:
print(f"Unsupported bpp: {self.bpp}")
return None
except Exception as e:
print(f"Error capturing framebuffer: {str(e)}")
return None
def close(self):
if self.fb_file:
self.fb_file.close()
self.fb_file = None
self.initialized = False
# Try to use MSS if available
try:
from mss import mss
has_mss = True
except ImportError:
has_mss = False
def capture_with_mss():
with mss() as sct:
monitor = sct.monitors[1]
while True:
try:
screenshot = sct.grab(monitor)
frame = np.array(screenshot)
frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
return frame
except Exception as e:
print(f"MSS截图错误: {str(e)}")
return None
def capture_screen():
# 尝试初始化framebuffer
fb_capture = FrameBufferCapture()
use_fb = fb_capture.initialized
# 如果framebuffer不可用尝试使用mss
use_mss = has_mss and not use_fb
while True:
try:
frame = None
if use_fb:
frame = fb_capture.capture()
elif use_mss:
frame = capture_with_mss()
if frame is None:
print("无法捕获屏幕,尝试其他方法...")
time.sleep(1)
continue
# 压缩图像
_, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
# 转换为字节
frame_bytes = buffer.tobytes()
# 通过WebSocket广播
socketio.emit('screen_frame', frame_bytes)
except Exception as e:
print(f"截图错误: {str(e)}")
time.sleep(0.033) # 约30fps
# 清理资源
if use_fb:
fb_capture.close()
@app.route('/')
def index():
return """
<html>
<head>
<title>OpenPilot Screen Stream</title>
<style>
body { margin: 0; background: #000; }
img { width: 100%; height: 100vh; object-fit: contain; }
</style>
</head>
<body>
<img id="screen">
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
<script>
var socket = io();
var img = document.getElementById('screen');
socket.on('screen_frame', function(frame) {
var blob = new Blob([frame], {type: 'image/jpeg'});
var url = URL.createObjectURL(blob);
img.src = url;
});
</script>
</body>
</html>
"""
if __name__ == '__main__':
# 检测环境
print("检测屏幕捕获方法...")
# 检查framebuffer
if os.path.exists('/dev/fb0'):
print("检测到framebuffer设备")
else:
print("未检测到framebuffer设备")
# 检查mss
if has_mss:
print("检测到MSS库")
else:
print("未检测到MSS库")
# 启动屏幕捕获线程
capture_thread = threading.Thread(target=capture_screen, daemon=True)
capture_thread.start()
# 启动Web服务器
socketio.run(app, host='0.0.0.0', port=5000)