
Recherche avancée
Médias (91)
-
999,999
26 septembre 2011, par
Mis à jour : Septembre 2011
Langue : English
Type : Audio
-
The Slip - Artworks
26 septembre 2011, par
Mis à jour : Septembre 2011
Langue : English
Type : Texte
-
Demon seed (wav version)
26 septembre 2011, par
Mis à jour : Avril 2013
Langue : English
Type : Audio
-
The four of us are dying (wav version)
26 septembre 2011, par
Mis à jour : Avril 2013
Langue : English
Type : Audio
-
Corona radiata (wav version)
26 septembre 2011, par
Mis à jour : Avril 2013
Langue : English
Type : Audio
-
Lights in the sky (wav version)
26 septembre 2011, par
Mis à jour : Avril 2013
Langue : English
Type : Audio
Autres articles (78)
-
Le profil des utilisateurs
12 avril 2011, parChaque utilisateur dispose d’une page de profil lui permettant de modifier ses informations personnelle. Dans le menu de haut de page par défaut, un élément de menu est automatiquement créé à l’initialisation de MediaSPIP, visible uniquement si le visiteur est identifié sur le site.
L’utilisateur a accès à la modification de profil depuis sa page auteur, un lien dans la navigation "Modifier votre profil" est (...) -
Configurer la prise en compte des langues
15 novembre 2010, parAccéder à la configuration et ajouter des langues prises en compte
Afin de configurer la prise en compte de nouvelles langues, il est nécessaire de se rendre dans la partie "Administrer" du site.
De là, dans le menu de navigation, vous pouvez accéder à une partie "Gestion des langues" permettant d’activer la prise en compte de nouvelles langues.
Chaque nouvelle langue ajoutée reste désactivable tant qu’aucun objet n’est créé dans cette langue. Dans ce cas, elle devient grisée dans la configuration et (...) -
Les tâches Cron régulières de la ferme
1er décembre 2010, parLa gestion de la ferme passe par l’exécution à intervalle régulier de plusieurs tâches répétitives dites Cron.
Le super Cron (gestion_mutu_super_cron)
Cette tâche, planifiée chaque minute, a pour simple effet d’appeler le Cron de l’ensemble des instances de la mutualisation régulièrement. Couplée avec un Cron système sur le site central de la mutualisation, cela permet de simplement générer des visites régulières sur les différents sites et éviter que les tâches des sites peu visités soient trop (...)
Sur d’autres sites (10789)
-
Issues with Publishing and Subscribing Rates for H.264 Video Streaming over RabbitMQ
7 octobre 2024, par LuisI am working on a project to stream an H.264 video file using RabbitMQ (AMQP protocol) and display it in a web application. The setup involves capturing video frames, encoding them, sending them to RabbitMQ, and then consuming and decoding them on the web application side using Flask and Flask-SocketIO.


However, I am encountering performance issues with the publishing and subscribing rates in RabbitMQ. I cannot seem to achieve more than 10 messages per second. This is not sufficient for smooth video streaming.
I need help to diagnose and resolve these performance bottlenecks.


Here is my code :


- 

- Video Capture and Publishing Script :




# RabbitMQ setup
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
KEY = f'DRONE_{CAM_LOCATION}'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

# Path to the H.264 video file
VIDEO_FILE_PATH = 'videos/FPV.h264'

# Configure logging
logging.basicConfig(level=logging.INFO)

@contextmanager
def rabbitmq_channel(host):
 """Context manager to handle RabbitMQ channel setup and teardown."""
 connection = pika.BlockingConnection(pika.ConnectionParameters(host))
 channel = connection.channel()
 try:
 yield channel
 finally:
 connection.close()

def initialize_rabbitmq(channel):
 """Initialize RabbitMQ exchange and queue, and bind them together."""
 channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
 channel.queue_declare(queue=QUEUE_NAME)
 channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=KEY)

def send_frame(channel, frame):
 """Encode the video frame using FFmpeg and send it to RabbitMQ."""
 ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
 cmd = [
 ffmpeg_path,
 '-f', 'rawvideo',
 '-pix_fmt', 'rgb24',
 '-s', '{}x{}'.format(frame.shape[1], frame.shape[0]),
 '-i', 'pipe:0',
 '-f', 'h264',
 '-vcodec', 'libx264',
 '-pix_fmt', 'yuv420p',
 '-preset', 'ultrafast',
 'pipe:1'
 ]
 
 start_time = time.time()
 process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 out, err = process.communicate(input=frame.tobytes())
 encoding_time = time.time() - start_time
 
 if process.returncode != 0:
 logging.error("ffmpeg error: %s", err.decode())
 raise RuntimeError("ffmpeg error")
 
 frame_size = len(out)
 logging.info("Sending frame with shape: %s, size: %d bytes", frame.shape, frame_size)
 timestamp = time.time()
 formatted_timestamp = datetime.fromtimestamp(timestamp).strftime('%H:%M:%S.%f')
 logging.info(f"Timestamp: {timestamp}") 
 logging.info(f"Formatted Timestamp: {formatted_timestamp[:-3]}")
 timestamp_bytes = struct.pack('d', timestamp)
 message_body = timestamp_bytes + out
 channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=message_body)
 logging.info(f"Encoding time: {encoding_time:.4f} seconds")

def capture_video(channel):
 """Read video from the file, encode frames, and send them to RabbitMQ."""
 if not os.path.exists(VIDEO_FILE_PATH):
 logging.error("Error: Video file does not exist.")
 return
 cap = cv2.VideoCapture(VIDEO_FILE_PATH)
 if not cap.isOpened():
 logging.error("Error: Could not open video file.")
 return
 try:
 while True:
 start_time = time.time()
 ret, frame = cap.read()
 read_time = time.time() - start_time
 if not ret:
 break
 frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
 frame_rgb = np.ascontiguousarray(frame_rgb) # Ensure the frame is contiguous
 send_frame(channel, frame_rgb)
 cv2.imshow('Video', frame)
 if cv2.waitKey(1) & 0xFF == ord('q'):
 break
 logging.info(f"Read time: {read_time:.4f} seconds")
 finally:
 cap.release()
 cv2.destroyAllWindows()



- 

- the backend (flask) :




app = Flask(__name__)
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*")

RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

def initialize_rabbitmq():
 connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
 channel = connection.channel()
 channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
 channel.queue_declare(queue=QUEUE_NAME)
 channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=f'DRONE_{CAM_LOCATION}')
 return connection, channel

def decode_frame(frame_data):
 # FFmpeg command to decode H.264 frame data
 ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
 cmd = [
 ffmpeg_path,
 '-f', 'h264',
 '-i', 'pipe:0',
 '-pix_fmt', 'bgr24',
 '-vcodec', 'rawvideo',
 '-an', '-sn',
 '-f', 'rawvideo',
 'pipe:1'
 ]
 process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 start_time = time.time() # Start timing the decoding process
 out, err = process.communicate(input=frame_data)
 decoding_time = time.time() - start_time # Calculate decoding time
 
 if process.returncode != 0:
 print("ffmpeg error: ", err.decode())
 return None
 frame_size = (960, 1280, 3) # frame dimensions expected by the frontend
 frame = np.frombuffer(out, np.uint8).reshape(frame_size)
 print(f"Decoding time: {decoding_time:.4f} seconds")
 return frame

def format_timestamp(ts):
 dt = datetime.fromtimestamp(ts)
 return dt.strftime('%H:%M:%S.%f')[:-3]

def rabbitmq_consumer():
 connection, channel = initialize_rabbitmq()
 for method_frame, properties, body in channel.consume(QUEUE_NAME):
 message_receive_time = time.time() # Time when the message is received

 # Extract the timestamp from the message body
 timestamp_bytes = body[:8]
 frame_data = body[8:]
 publish_timestamp = struct.unpack('d', timestamp_bytes)[0]

 print(f"Message Receive Time: {message_receive_time:.4f} ({format_timestamp(message_receive_time)})")
 print(f"Publish Time: {publish_timestamp:.4f} ({format_timestamp(publish_timestamp)})")

 frame = decode_frame(frame_data)
 decode_time = time.time() - message_receive_time # Calculate decode time

 if frame is not None:
 _, buffer = cv2.imencode('.jpg', frame)
 frame_data = buffer.tobytes()
 socketio.emit('video_frame', {'frame': frame_data, 'timestamp': publish_timestamp}, namespace='/')
 emit_time = time.time() # Time after emitting the frame

 # Log the time taken to emit the frame and its size
 rtt = emit_time - publish_timestamp # Calculate RTT from publish to emit
 print(f"Current Time: {emit_time:.4f} ({format_timestamp(emit_time)})")
 print(f"RTT: {rtt:.4f} seconds")
 print(f"Emit time: {emit_time - message_receive_time:.4f} seconds, Frame size: {len(frame_data)} bytes")
 channel.basic_ack(method_frame.delivery_tag)

@app.route('/')
def index():
 return render_template('index.html')

@socketio.on('connect')
def handle_connect():
 print('Client connected')

@socketio.on('disconnect')
def handle_disconnect():
 print('Client disconnected')

if __name__ == '__main__':
 consumer_thread = threading.Thread(target=rabbitmq_consumer)
 consumer_thread.daemon = True
 consumer_thread.start()
 socketio.run(app, host='0.0.0.0', port=5000)




How can I optimize the publishing and subscribing rates to handle a higher number of messages per second ?


Any help or suggestions would be greatly appreciated !


I attempted to use threading and multiprocessing to handle multiple frames concurrently and I tried to optimize the frame decoding function to make it faster but with no success.


-
Revision 122868 : Toujours sur ces vues de saisies fieldset. Champ extra se base sur le ...
7 mars 2020, par Maïeul Rouquette — LogToujours sur ces vues de saisies fieldset.
Champ extra se base sur le classe vide pour ne pas afficher les saisies
sans réponse. Le masquage est en css, contrairement à ce que je pensais
(suis-je bête !)
On ajoute donc la classe vide à la vie d’une saisie si l’ensemble de ses
sous saisies sont vide.
Permet d’éviter avec champ extra d’avoir un titre de fieldset qui se
balade sans aucune réponse en dessous. -
Evolution #2040 (Nouveau) : Accueil inistial sur un site nouvellement installé
1er mai 2011, par Suske -Première arrivée dans le privé : contenu/accueil.html est vide => il y a mieux que le vide :-)
un lien vers la config du site
une phrase "Créez maintenant une rubrique puis publiez-y un premier article... bla."
... Un page "premiers pas en quelques sortes (...)