
Recherche avancée
Autres articles (28)
-
Keeping control of your media in your hands
13 avril 2011 — The vocabulary used on this site and around MediaSPIP in general aims to avoid reference to Web 2.0 and the companies that profit from media-sharing.
While using MediaSPIP, you are invited to avoid using words like "Brand", "Cloud" and "Market".
MediaSPIP is designed to facilitate the sharing of creative media online, while allowing authors to retain complete control of their work.
MediaSPIP aims to be accessible to as many people as possible and development is based on expanding the (...) -
Modifier la date de publication
21 juin 2013 — Comment changer la date de publication d’un média ?
Il faut au préalable rajouter un champ "Date de publication" dans le masque de formulaire adéquat :
Administrer > Configuration des masques de formulaires > Sélectionner "Un média"
Dans la rubrique "Champs à ajouter", cocher "Date de publication"
Cliquer en bas de la page sur Enregistrer -
L’agrémenter visuellement
10 avril 2011 — MediaSPIP est basé sur un système de thèmes et de squelettes. Les squelettes définissent le placement des informations dans la page, définissant un usage spécifique de la plateforme, et les thèmes l’habillage graphique général.
Chacun peut proposer un nouveau thème graphique ou un squelette et le mettre à disposition de la communauté.
Sur d’autres sites (3237)
-
Issues with Publishing and Subscribing Rates for H.264 Video Streaming over RabbitMQ
7 octobre 2024, par Luis — I am working on a project to stream an H.264 video file using RabbitMQ (AMQP protocol) and display it in a web application. The setup involves capturing video frames, encoding them, sending them to RabbitMQ, and then consuming and decoding them on the web application side using Flask and Flask-SocketIO.


However, I am encountering performance issues with the publishing and subscribing rates in RabbitMQ. I cannot seem to achieve more than 10 messages per second. This is not sufficient for smooth video streaming.
I need help to diagnose and resolve these performance bottlenecks.


Here is my code :


- 

- Video capture and publishing script:




# RabbitMQ setup: broker host plus the exchange, routing key and queue names
# used by initialize_rabbitmq() and send_frame() in this script.
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
KEY = f'DRONE_{CAM_LOCATION}'  # routing key used for both binding and publishing
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

# Path to the H.264 video file opened by capture_video()
VIDEO_FILE_PATH = 'videos/FPV.h264'

# Configure logging at INFO level so the per-frame timing messages are emitted
logging.basicConfig(level=logging.INFO)

@contextmanager
def rabbitmq_channel(host):
    """Context manager to handle RabbitMQ channel setup and teardown.

    Opens a blocking connection to ``host``, yields a channel created on it,
    and closes the connection when the ``with`` block exits, whether it exits
    normally or through an exception.

    Args:
        host: Host name or address passed to ``pika.ConnectionParameters``.

    Yields:
        The channel returned by ``connection.channel()``.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    try:
        # Create the channel inside the try block so the connection is still
        # closed if channel creation itself fails.
        yield connection.channel()
    finally:
        # Closing an already-closed connection raises in pika, which would
        # mask the exception that made the connection drop in the first place.
        if connection.is_open:
            connection.close()

def initialize_rabbitmq(channel):
    """Declare the exchange and the queue on ``channel`` and bind them.

    ``EXCHANGE`` is declared as a direct exchange, ``QUEUE_NAME`` is declared
    as a queue, and the queue is bound to the exchange under routing key
    ``KEY``.
    """
    exchange_name, queue_name = EXCHANGE, QUEUE_NAME
    channel.exchange_declare(exchange=exchange_name, exchange_type='direct')
    channel.queue_declare(queue=queue_name)
    channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=KEY)

def send_frame(channel, frame, ffmpeg_path='ffmpeg/bin/ffmpeg.exe'):
    """Encode the video frame using FFmpeg and send it to RabbitMQ.

    The raw frame bytes are piped through an ``ffmpeg`` process that encodes
    them with libx264. The encoded output is prefixed with the publish time
    packed by ``struct.pack('d', ...)`` and published to ``EXCHANGE`` with
    routing key ``KEY``.

    Args:
        channel: Channel whose ``basic_publish`` is used to send the message.
        frame: Array exposing ``shape`` (``shape[0]`` is passed to ffmpeg as
            the height, ``shape[1]`` as the width) and ``tobytes()``; the
            bytes are fed to ffmpeg as ``rgb24`` pixels.
        ffmpeg_path: Path of the ffmpeg executable. Defaults to the location
            that was previously hard-coded.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero return code.
    """
    cmd = [
        ffmpeg_path,
        '-f', 'rawvideo',
        '-pix_fmt', 'rgb24',
        '-s', f'{frame.shape[1]}x{frame.shape[0]}',
        '-i', 'pipe:0',
        '-f', 'h264',
        '-vcodec', 'libx264',
        '-pix_fmt', 'yuv420p',
        '-preset', 'ultrafast',
        'pipe:1',
    ]

    # NOTE(review): a new ffmpeg process is started for every frame; process
    # start-up is likely a large share of the per-frame cost. Confirm by
    # comparing the logged encoding time with the target frame interval.
    # perf_counter is monotonic, so the duration cannot be skewed by clock
    # adjustments the way a time.time() difference can.
    start_time = time.perf_counter()
    process = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out, err = process.communicate(input=frame.tobytes())
    encoding_time = time.perf_counter() - start_time

    if process.returncode != 0:
        # errors='replace' keeps undecodable stderr bytes from raising a
        # UnicodeDecodeError that would hide the actual ffmpeg failure.
        logging.error("ffmpeg error: %s", err.decode(errors='replace'))
        raise RuntimeError("ffmpeg error")

    logging.info("Sending frame with shape: %s, size: %d bytes", frame.shape, len(out))

    # Wall-clock publish time; it travels with the frame as the message prefix.
    timestamp = time.time()
    formatted_timestamp = datetime.fromtimestamp(timestamp).strftime('%H:%M:%S.%f')
    logging.info("Timestamp: %s", timestamp)
    logging.info("Formatted Timestamp: %s", formatted_timestamp[:-3])

    message_body = struct.pack('d', timestamp) + out
    channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=message_body)
    logging.info("Encoding time: %.4f seconds", encoding_time)

def capture_video(channel):
    """Read ``VIDEO_FILE_PATH`` frame by frame, publish each frame and preview it.

    Each frame read from the file is converted from BGR to RGB, passed to
    ``send_frame`` together with ``channel``, and shown in an OpenCV window
    named ``Video``. The loop ends when no more frames can be read or when
    the ``q`` key is pressed. An error is logged and the function returns
    early if the file does not exist or cannot be opened.
    """
    if not os.path.exists(VIDEO_FILE_PATH):
        logging.error("Error: Video file does not exist.")
        return

    capture = cv2.VideoCapture(VIDEO_FILE_PATH)
    if not capture.isOpened():
        logging.error("Error: Could not open video file.")
        return

    try:
        while True:
            read_started = time.time()
            ok, bgr_frame = capture.read()
            read_time = time.time() - read_started
            if not ok:
                break

            # send_frame feeds the bytes to ffmpeg as rgb24, so convert from
            # OpenCV's BGR order and make sure the buffer is contiguous.
            rgb_frame = np.ascontiguousarray(
                cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
            )
            send_frame(channel, rgb_frame)

            # The preview window shows the untouched BGR frame.
            cv2.imshow('Video', bgr_frame)
            quit_requested = (cv2.waitKey(1) & 0xFF) == ord('q')
            if quit_requested:
                break

            logging.info(f"Read time: {read_time:.4f} seconds")
    finally:
        capture.release()
        cv2.destroyAllWindows()



- 

- The backend (Flask):




# Flask application with CORS applied, and the Socket.IO server attached to it.
app = Flask(__name__)
CORS(app)
# NOTE(review): cors_allowed_origins="*" accepts Socket.IO clients from any
# origin — confirm this is intended outside local development.
socketio = SocketIO(app, cors_allowed_origins="*")

# RabbitMQ settings used by initialize_rabbitmq() and rabbitmq_consumer().
# Presumably these must match the publisher script's values — verify.
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

def initialize_rabbitmq():
    """Open a RabbitMQ connection and declare and bind the video queue.

    Connects to ``RABBITMQ_HOST``, declares ``EXCHANGE`` as a direct exchange,
    declares ``QUEUE_NAME`` and binds it to the exchange under the routing key
    ``DRONE_<CAM_LOCATION>``.

    Returns:
        tuple: ``(connection, channel)``; the caller is responsible for
        closing the connection.

    Raises:
        Exception: Whatever the channel setup raises is propagated after the
            connection has been closed.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
    try:
        channel = connection.channel()
        channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
        channel.queue_declare(queue=QUEUE_NAME)
        channel.queue_bind(
            exchange=EXCHANGE,
            queue=QUEUE_NAME,
            routing_key=f'DRONE_{CAM_LOCATION}',
        )
    except Exception:
        # Do not leak the open connection when setup fails part-way through.
        if connection.is_open:
            connection.close()
        raise
    return connection, channel

def decode_frame(frame_data, frame_shape=(960, 1280, 3), ffmpeg_path='ffmpeg/bin/ffmpeg.exe'):
    """Decode H.264 data into one raw ``bgr24`` frame using FFmpeg.

    Args:
        frame_data: Encoded H.264 bytes piped to ffmpeg's standard input.
        frame_shape: Shape the decoded bytes are reshaped to. Defaults to
            ``(960, 1280, 3)``, the frame dimensions expected by the frontend.
        ffmpeg_path: Path of the ffmpeg executable. Defaults to the location
            that was previously hard-coded.

    Returns:
        A ``numpy.uint8`` array of shape ``frame_shape``, or ``None`` when
        ffmpeg exits with an error or its output does not contain exactly one
        frame of that shape.
    """
    # FFmpeg command to decode H.264 frame data
    cmd = [
        ffmpeg_path,
        '-f', 'h264',
        '-i', 'pipe:0',
        '-pix_fmt', 'bgr24',
        '-vcodec', 'rawvideo',
        '-an', '-sn',
        '-f', 'rawvideo',
        'pipe:1',
    ]
    process = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    start_time = time.time()  # Start timing the decoding process
    out, err = process.communicate(input=frame_data)
    decoding_time = time.time() - start_time  # Calculate decoding time

    if process.returncode != 0:
        # errors='replace' keeps undecodable stderr bytes from raising.
        print("ffmpeg error: ", err.decode(errors='replace'))
        return None

    # reshape() raises ValueError on a size mismatch, which would terminate
    # the consumer thread; report the problem and drop the frame instead.
    expected_size = int(np.prod(frame_shape))
    if len(out) != expected_size:
        print(f"Decoded output is {len(out)} bytes, expected {expected_size}; dropping frame")
        return None

    frame = np.frombuffer(out, np.uint8).reshape(frame_shape)
    print(f"Decoding time: {decoding_time:.4f} seconds")
    return frame

def format_timestamp(ts):
    """Format a POSIX timestamp as local time in the form ``HH:MM:SS.mmm``.

    Args:
        ts: Seconds since the epoch, as accepted by
            ``datetime.fromtimestamp``.

    Returns:
        The time-of-day string with millisecond precision.
    """
    local_time = datetime.fromtimestamp(ts).time()
    # isoformat truncates (does not round) the microseconds to milliseconds.
    return local_time.isoformat(timespec='milliseconds')

def rabbitmq_consumer():
    """Consume frames from RabbitMQ, decode them and emit them over Socket.IO.

    Each message body is expected to start with the publish timestamp packed
    with ``struct`` format ``'d'``, followed by the encoded frame data. The
    frame is decoded with ``decode_frame``, re-encoded as JPEG and emitted as
    a ``video_frame`` event together with the publish timestamp. Every
    message is acknowledged, including those that are too short or fail to
    decode. The RabbitMQ connection is closed when the loop ends for any
    reason.
    """
    header_size = struct.calcsize('d')
    connection, channel = initialize_rabbitmq()
    try:
        for method_frame, _properties, body in channel.consume(QUEUE_NAME):
            message_receive_time = time.time()  # Time when the message is received

            if len(body) < header_size:
                # Too short to hold the timestamp prefix: unpacking would
                # raise struct.error and end the consumer. Ack and skip.
                print(f"Skipping malformed message of {len(body)} bytes")
                channel.basic_ack(method_frame.delivery_tag)
                continue

            # Extract the timestamp from the message body
            publish_timestamp = struct.unpack('d', body[:header_size])[0]
            frame_data = body[header_size:]

            print(f"Message Receive Time: {message_receive_time:.4f} ({format_timestamp(message_receive_time)})")
            print(f"Publish Time: {publish_timestamp:.4f} ({format_timestamp(publish_timestamp)})")

            frame = decode_frame(frame_data)

            if frame is not None:
                _, buffer = cv2.imencode('.jpg', frame)
                frame_data = buffer.tobytes()
                socketio.emit('video_frame', {'frame': frame_data, 'timestamp': publish_timestamp}, namespace='/')
                emit_time = time.time()  # Time after emitting the frame

                # Log the time taken to emit the frame and its size
                rtt = emit_time - publish_timestamp  # Elapsed time from publish to emit
                print(f"Current Time: {emit_time:.4f} ({format_timestamp(emit_time)})")
                print(f"RTT: {rtt:.4f} seconds")
                print(f"Emit time: {emit_time - message_receive_time:.4f} seconds, Frame size: {len(frame_data)} bytes")

            # Acknowledge even when decoding failed so the message does not
            # stay unacknowledged on the broker.
            channel.basic_ack(method_frame.delivery_tag)
    finally:
        # Closing an already-closed connection raises in pika, so check first.
        if connection.is_open:
            connection.close()

@app.route('/')
def index():
    """Serve the page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page

@socketio.on('connect')
def handle_connect():
    """Print a message when a Socket.IO client connects."""
    message = 'Client connected'
    print(message)

@socketio.on('disconnect')
def handle_disconnect():
    """Print a message when a Socket.IO client disconnects."""
    message = 'Client disconnected'
    print(message)

if __name__ == '__main__':
    # Run the RabbitMQ consumer in a daemon thread so it does not keep the
    # process alive once the Socket.IO server stops.
    consumer_thread = threading.Thread(target=rabbitmq_consumer, daemon=True)
    consumer_thread.start()

    # Serve on all interfaces, port 5000.
    socketio.run(app, host='0.0.0.0', port=5000)




How can I optimize the publishing and subscribing rates to handle a higher number of messages per second?


Any help or suggestions would be greatly appreciated !


I attempted to use threading and multiprocessing to handle multiple frames concurrently, and I tried to optimize the frame decoding function to make it faster, but with no success.


-
doc/filters : add geq diagonal split screen example
12 février 2016, par Lou Logan -
avcodec/h264 : Execute error concealment before marking the frame as done.
19 février 2016, par Michael Niedermayer