
Recherche avancée
Autres articles (12)
-
Les autorisations surchargées par les plugins
27 avril 2010, par Mediaspip core
autoriser_auteur_modifier() afin que les visiteurs soient capables de modifier leurs informations sur la page d’auteurs -
Gestion de la ferme
2 mars 2010, par
La ferme est gérée dans son ensemble par des "super admins".
Certains réglages peuvent être faits afin de réguler les besoins des différents canaux.
Dans un premier temps il utilise le plugin "Gestion de mutualisation" -
Les tâches Cron régulières de la ferme
1er décembre 2010, par
La gestion de la ferme passe par l’exécution à intervalle régulier de plusieurs tâches répétitives dites Cron.
Le super Cron (gestion_mutu_super_cron)
Cette tâche, planifiée chaque minute, a pour simple effet d’appeler le Cron de l’ensemble des instances de la mutualisation régulièrement. Couplée avec un Cron système sur le site central de la mutualisation, cela permet de simplement générer des visites régulières sur les différents sites et éviter que les tâches des sites peu visités soient trop (...)
Sur d’autres sites (2604)
-
Java uses FFmpegRecoder to encode frames into H264 streams
5 septembre 2024, par zhang1973
I want to obtain the Frame from the video stream, process it, use FFmpegRecoder to encode it into an H264 stream, and transmit it to the front-end. But I found that the AVPacket obtained directly using grabber.grabAVPacket can be converted into H264 stream and played normally. The H264 stream encoded using FFmpegRecoder cannot be played.


Here is my Code :


private FFmpegFrameRecorder recorder;
 // Fixed: removed the stray empty statement (";;") after the initializer.
 private ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

 /**
  * Creates and starts an FFmpeg recorder that encodes frames into a raw
  * H.264 elementary stream written into {@code outputStream}.
  *
  * @param frame the first grabbed frame, used only for its width/height
  * @return true when the recorder started successfully (mirrored into
  *         {@code recorderStatus}), false otherwise
  */
 private boolean createRecoder(Frame frame){
 recorder = new FFmpegFrameRecorder(outputStream, frame.imageWidth, frame.imageHeight);
 recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
 recorder.setFormat("h264"); // raw H.264 elementary stream, no container
 recorder.setFrameRate(30);
 recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
 recorder.setVideoBitrate(4000 * 1000); // 4000 kbps
 recorder.setVideoOption("preset", "ultrafast"); // fastest encode; "veryslow" would give best quality
 recorder.setAudioChannels(0); // video only

 try {
 recorder.start();
 return recorderStatus = true;
 } catch (org.bytedeco.javacv.FrameRecorder.Exception e1) {
 // log.info(..., e1) already records the stack trace; the extra
 // printStackTrace() call was redundant and has been removed.
 log.info("启动转码录制器失败", e1);
 MediaService.cameras.remove(cameraDto.getMediaKey());
 }

 return recorderStatus = false;
 }

 private boolean slow = false;

 /**
  * Main transfer loop: grabs decoded frames from the source stream,
  * re-encodes them into a raw H.264 elementary stream, scans the encoded
  * bytes once for SPS/PPS NAL units (Annex-B start codes), and pushes the
  * data to connected clients until stopped.
  *
  * Review fixes applied (behavior otherwise unchanged):
  * - Catch blocks reordered most-specific-first. The original ordering
  *   (catch Exception before FFmpegFrameRecorder.Exception, and before
  *   IOException) made the later catches unreachable and did not compile.
  *   The per-iteration FFmpegFrameRecorder.Exception catch was unreachable
  *   dead code behind catch(Exception) and has been removed — the generic
  *   handler already covered it.
  * - Removed a dead videoTS computation that ran before startTime was
  *   initialized; the surviving computation after the init is kept.
  * - Added bounds guards to the start-code scan so a start code at the very
  *   end of the buffer cannot read past the array.
  *
  * @throws FFmpegFrameGrabber.Exception if the grabber fails fatally
  */
 protected void transferStream2H264() throws FFmpegFrameGrabber.Exception {

     // Initialise the frame grabber (pull side).
     log.info(" create grabber ");
     if (!createGrabber()) {
         log.error(" == > ");
         return;
     }
     transferFlag = true;

     // Initialise the H.264 recorder; the first grabbed frame supplies the size.
     if (!createRecoder(grabber.grab())) {
         return;
     }

     // Drop any stale frames buffered inside the grabber.
     try {
         grabber.flush();
     } catch (Exception e) {
         log.info("清空拉流器缓存失败", e);
         e.printStackTrace();
     }

     // Capture the stream header bytes exactly once.
     if (header == null) {
         header = bos.toByteArray();
         slow = true;
         bos.reset();
     } else {
         System.out.println("Header2");
         System.out.println(header);
     }

     running = true;

     // Keep the client-connection bookkeeping up to date.
     listenClient();

     long startTime = 0;
     long videoTS = 0;

     for (; running && grabberStatus; ) {
         try {
             if (transferFlag) {
                 long startGrab = System.currentTimeMillis();
                 // Grab a decoded frame and re-encode it into outputStream.
                 Frame frame = grabber.grab();
                 recorder.record(frame);
                 byte[] videoData = outputStream.toByteArray();
                 // Treat a >5 s grab+encode as a dead network stream.
                 if ((System.currentTimeMillis() - startGrab) > 5000) {
                     log.info("\r\n{}\r\n视频流网络异常>>>", cameraDto.getUrl());
                     closeMedia();
                     break;
                 }

                 if (startTime == 0) {
                     startTime = System.currentTimeMillis();
                 }
                 videoTS = 1000 * (System.currentTimeMillis() - startTime);

                 byte[] rbuffer = videoData;
                 readSize = videoData.length;

                 // Scan Annex-B start codes until both SPS and PPS are cached.
                 if (spsdata == null || ppsdata == null) {
                     movePos = 0;
                     lastPos = 0;
                     isNewPack = true;
                     while (movePos < readSize) {
                         // Bounds guards added: a start code at the tail of the
                         // buffer must not index past rbuffer.length.
                         if (movePos + 3 < readSize
                                 && rbuffer[movePos] == 0 && rbuffer[movePos + 1] == 0 && rbuffer[movePos + 2] == 1) {
                             findCode = true;
                             skipLen = 3;
                             mCurFrameFirstByte = (int) (0xff & rbuffer[movePos + skipLen]);
                         } else if (movePos + 4 < readSize
                                 && rbuffer[movePos] == 0 && rbuffer[movePos + 1] == 0 && rbuffer[movePos + 2] == 0 && rbuffer[movePos + 3] == 1) {
                             findCode = true;
                             skipLen = 4;
                             mCurFrameFirstByte = (int) (0xff & rbuffer[movePos + skipLen]);
                         } else {
                             skipLen = 1;
                         }

                         // First NAL of a new packet: bail out early on
                         // non-parameter-set frames (nothing to cache here).
                         if (!isFirstFind && isNewPack && findCode) {
                             mFrameFirstByte = mCurFrameFirstByte;
                             findCode = false;
                             isNewPack = false;
                             mNaluType = mFrameFirstByte & 0x1f;
                             if (mNaluType != MediaConstant.NALU_TYPE_SEI &&
                                     mNaluType != MediaConstant.NALU_TYPE_SPS &&
                                     mNaluType != MediaConstant.NALU_TYPE_PPS &&
                                     mNaluType != MediaConstant.NALU_TYPE_IDR) {
                                 startCounter++;
                                 break;
                             }
                         }

                         if (isFirstFind) {
                             isFirstFind = false;
                             findCode = false;
                             mFrameFirstByte = mCurFrameFirstByte;
                         }

                         if (findCode) {
                             startCounter++;
                             mNaluType = mFrameFirstByte & 0x1f;

                             findCode = false;
                             // Length of the NAL that just ended.
                             mFrameLen = (movePos - lastPos);
                             if (mNaluType == MediaConstant.NALU_TYPE_IDR) {
                                 mFrameLen = readSize - movePos;
                             }

                             if (mNaluType != MediaConstant.NALU_TYPE_SEI &&
                                     mNaluType != MediaConstant.NALU_TYPE_SPS &&
                                     mNaluType != MediaConstant.NALU_TYPE_PPS &&
                                     mNaluType != MediaConstant.NALU_TYPE_IDR) {
                                 System.out.println(" one packe many frames ---> type: " + mNaluType + " jump out ");
                                 break;
                             }
                             // Cache SPS/PPS once; later packets reuse them.
                             if (mNaluType == MediaConstant.NALU_TYPE_SPS) {
                                 if (null == spsdata) {
                                     spsdata = new byte[mFrameLen];
                                     System.arraycopy(rbuffer, lastPos, spsdata, 0, mFrameLen);
                                 }
                             }
                             if (mNaluType == MediaConstant.NALU_TYPE_PPS) {
                                 if (null == ppsdata) {
                                     ppsdata = new byte[mFrameLen];
                                     System.arraycopy(rbuffer, lastPos, ppsdata, 0, mFrameLen);
                                 }
                             }

                             lastPos = movePos;
                             mFrameFirstByte = mCurFrameFirstByte;
                             mNaluType = mFrameFirstByte & 0x1f;
                             if (mNaluType == MediaConstant.NALU_TYPE_IDR) {
                                 mFrameLen = readSize - movePos;
                                 startCounter++;
                                 break;
                             }
                         }

                         movePos += skipLen;
                         isNewPack = false;
                     }
                 }

                 sendFrameData(rbuffer);
             }
         } catch (Exception e) {
             // Any failure (grab, record, parse) stops this camera's loop and
             // deregisters it; the unreachable FFmpegFrameRecorder.Exception
             // catch that followed in the original was removed.
             grabberStatus = false;
             MediaService.cameras.remove(cameraDto.getMediaKey());
         }
     }

     try {
         grabber.close();
         bos.close();
     } catch (org.bytedeco.javacv.FrameRecorder.Exception e) {
         e.printStackTrace();
     } catch (IOException e) {
         e.printStackTrace();
     } catch (Exception e) {
         e.printStackTrace();
     } finally {
         closeMedia();
     }
     log.info("关闭媒体流-javacv,{} ", cameraDto.getUrl());
 }



-
Issues with Publishing and Subscribing Rates for H.264 Video Streaming over RabbitMQ
7 octobre 2024, par Luis
I am working on a project to stream an H.264 video file using RabbitMQ (AMQP protocol) and display it in a web application. The setup involves capturing video frames, encoding them, sending them to RabbitMQ, and then consuming and decoding them on the web application side using Flask and Flask-SocketIO.


However, I am encountering performance issues with the publishing and subscribing rates in RabbitMQ. I cannot seem to achieve more than 10 messages per second. This is not sufficient for smooth video streaming.
I need help to diagnose and resolve these performance bottlenecks.


Here is my code :


- 

- Video Capture and Publishing Script :




# RabbitMQ setup
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
# Routing key used for publishing; must match the consumer's queue binding.
KEY = f'DRONE_{CAM_LOCATION}'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

# Path to the H.264 video file
VIDEO_FILE_PATH = 'videos/FPV.h264'

# Configure logging
logging.basicConfig(level=logging.INFO)

@contextmanager
def rabbitmq_channel(host):
    """Yield a RabbitMQ channel on *host*; the connection is closed on exit."""
    conn = pika.BlockingConnection(pika.ConnectionParameters(host))
    chan = conn.channel()
    try:
        yield chan
    finally:
        conn.close()

def initialize_rabbitmq(channel):
    """Initialize RabbitMQ exchange and queue, and bind them together."""
    channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
    channel.queue_declare(queue=QUEUE_NAME)
    # Bind so messages published with KEY are routed into QUEUE_NAME.
    channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=KEY)

def send_frame(channel, frame):
    """Encode one RGB frame to H.264 with FFmpeg and publish it to RabbitMQ.

    Message layout: 8 bytes of struct-packed double (publish timestamp)
    followed by the encoded H.264 bytes.

    NOTE(review): spawning a fresh ffmpeg process per frame is almost
    certainly what caps the publish rate at ~10 msg/s — a persistent encoder
    process (or an in-process encoder) would remove that bottleneck.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
    cmd = [
        ffmpeg_path,
        '-f', 'rawvideo',
        '-pix_fmt', 'rgb24',
        '-s', '{}x{}'.format(frame.shape[1], frame.shape[0]),
        '-i', 'pipe:0',
        '-f', 'h264',
        '-vcodec', 'libx264',
        '-pix_fmt', 'yuv420p',
        '-preset', 'ultrafast',
        'pipe:1'
    ]

    start_time = time.time()
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate(input=frame.tobytes())
    encoding_time = time.time() - start_time

    if process.returncode != 0:
        logging.error("ffmpeg error: %s", err.decode())
        # Fixed: carry the ffmpeg diagnostics in the exception instead of a
        # bare, detail-free message.
        raise RuntimeError("ffmpeg error: " + err.decode(errors='replace'))

    frame_size = len(out)
    logging.info("Sending frame with shape: %s, size: %d bytes", frame.shape, frame_size)
    timestamp = time.time()
    formatted_timestamp = datetime.fromtimestamp(timestamp).strftime('%H:%M:%S.%f')
    logging.info(f"Timestamp: {timestamp}")
    logging.info(f"Formatted Timestamp: {formatted_timestamp[:-3]}")
    timestamp_bytes = struct.pack('d', timestamp)
    message_body = timestamp_bytes + out
    channel.basic_publish(exchange=EXCHANGE, routing_key=KEY, body=message_body)
    logging.info(f"Encoding time: {encoding_time:.4f} seconds")

def capture_video(channel):
    """Read video from the file, encode each frame, and send it to RabbitMQ.

    Shows a preview window; pressing 'q' stops the loop early. The capture
    handle and windows are always released on exit.
    """
    if not os.path.exists(VIDEO_FILE_PATH):
        logging.error("Error: Video file does not exist.")
        return
    cap = cv2.VideoCapture(VIDEO_FILE_PATH)
    if not cap.isOpened():
        logging.error("Error: Could not open video file.")
        return
    try:
        while True:
            t0 = time.time()
            ok, bgr_frame = cap.read()
            elapsed = time.time() - t0
            if not ok:
                break
            # OpenCV delivers BGR; the encoder expects contiguous RGB bytes.
            rgb_frame = np.ascontiguousarray(cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB))
            send_frame(channel, rgb_frame)
            cv2.imshow('Video', bgr_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            logging.info(f"Read time: {elapsed:.4f} seconds")
    finally:
        cap.release()
        cv2.destroyAllWindows()



- 

- the backend (flask) :




# Flask + Socket.IO app; CORS is wide open for development.
app = Flask(__name__)
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*")

# Must match the publisher's exchange/queue topology.
RABBITMQ_HOST = 'localhost'
EXCHANGE = 'DRONE'
CAM_LOCATION = 'Out_Front'
QUEUE_NAME = f'DRONE_{CAM_LOCATION}_video_queue'

def initialize_rabbitmq():
    """Connect to RabbitMQ and (re)declare the exchange/queue binding.

    Returns:
        (connection, channel) — the caller owns both and must close them.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
    channel = connection.channel()
    channel.exchange_declare(exchange=EXCHANGE, exchange_type='direct')
    channel.queue_declare(queue=QUEUE_NAME)
    # Routing key must equal the publisher's KEY (f'DRONE_{CAM_LOCATION}').
    channel.queue_bind(exchange=EXCHANGE, queue=QUEUE_NAME, routing_key=f'DRONE_{CAM_LOCATION}')
    return connection, channel

def decode_frame(frame_data):
    """Decode H.264 bytes into a raw BGR frame with an FFmpeg subprocess.

    Returns:
        A (960, 1280, 3) uint8 ndarray, or None when decoding fails for any
        reason (missing ffmpeg binary, decode error, truncated output).
    """
    # FFmpeg command to decode H.264 frame data
    ffmpeg_path = 'ffmpeg/bin/ffmpeg.exe'
    cmd = [
        ffmpeg_path,
        '-f', 'h264',
        '-i', 'pipe:0',
        '-pix_fmt', 'bgr24',
        '-vcodec', 'rawvideo',
        '-an', '-sn',
        '-f', 'rawvideo',
        'pipe:1'
    ]
    try:
        process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as e:
        # Fixed: a missing/unlaunchable ffmpeg binary previously raised an
        # unhandled exception; fail soft like other decode errors.
        print("ffmpeg launch error: ", e)
        return None
    start_time = time.time()  # Start timing the decoding process
    out, err = process.communicate(input=frame_data)
    decoding_time = time.time() - start_time  # Calculate decoding time

    if process.returncode != 0:
        print("ffmpeg error: ", err.decode())
        return None
    frame_size = (960, 1280, 3)  # frame dimensions expected by the frontend
    expected_bytes = frame_size[0] * frame_size[1] * frame_size[2]
    if len(out) != expected_bytes:
        # Fixed: a partial decode used to make reshape() raise; treat it as
        # a failed frame instead.
        print(f"unexpected decoded size: {len(out)} != {expected_bytes}")
        return None
    frame = np.frombuffer(out, np.uint8).reshape(frame_size)
    print(f"Decoding time: {decoding_time:.4f} seconds")
    return frame

def format_timestamp(ts):
    """Render a POSIX timestamp as local-time 'HH:MM:SS.mmm'."""
    return datetime.fromtimestamp(ts).strftime('%H:%M:%S.%f')[:-3]

def rabbitmq_consumer():
    """Blocking consumer loop: decode each message and emit it via Socket.IO.

    Message layout (must mirror the publisher): first 8 bytes are a
    struct-packed double publish timestamp, the remainder is H.264 data.
    Runs forever; intended to live in a daemon thread.
    """
    connection, channel = initialize_rabbitmq()
    for method_frame, properties, body in channel.consume(QUEUE_NAME):
        message_receive_time = time.time()  # Time when the message is received

        # Extract the timestamp from the message body
        timestamp_bytes = body[:8]
        frame_data = body[8:]
        publish_timestamp = struct.unpack('d', timestamp_bytes)[0]

        print(f"Message Receive Time: {message_receive_time:.4f} ({format_timestamp(message_receive_time)})")
        print(f"Publish Time: {publish_timestamp:.4f} ({format_timestamp(publish_timestamp)})")

        frame = decode_frame(frame_data)
        decode_time = time.time() - message_receive_time  # Calculate decode time

        if frame is not None:
            # Re-encode as JPEG for the browser and push over the websocket.
            _, buffer = cv2.imencode('.jpg', frame)
            frame_data = buffer.tobytes()
            socketio.emit('video_frame', {'frame': frame_data, 'timestamp': publish_timestamp}, namespace='/')
            emit_time = time.time()  # Time after emitting the frame

            # Log the time taken to emit the frame and its size
            rtt = emit_time - publish_timestamp  # Calculate RTT from publish to emit
            print(f"Current Time: {emit_time:.4f} ({format_timestamp(emit_time)})")
            print(f"RTT: {rtt:.4f} seconds")
            print(f"Emit time: {emit_time - message_receive_time:.4f} seconds, Frame size: {len(frame_data)} bytes")
        # NOTE(review): indentation was flattened in this paste — the ack is
        # reconstructed at loop level (every message acked even when decoding
        # fails); confirm against the original source.
        channel.basic_ack(method_frame.delivery_tag)

@app.route('/')
def index():
    """Serve the single-page frontend."""
    return render_template('index.html')

@socketio.on('connect')
def handle_connect():
    """Log each new Socket.IO client connection."""
    print('Client connected')

@socketio.on('disconnect')
def handle_disconnect():
    """Log each Socket.IO client disconnection."""
    print('Client disconnected')

if __name__ == '__main__':
    # Run the consumer in a daemon thread so it dies with the server process.
    consumer_thread = threading.Thread(target=rabbitmq_consumer)
    consumer_thread.daemon = True
    consumer_thread.start()
    # Bind on all interfaces so the frontend can reach the server.
    socketio.run(app, host='0.0.0.0', port=5000)




How can I optimize the publishing and subscribing rates to handle a higher number of messages per second ?


Any help or suggestions would be greatly appreciated !


I attempted to use threading and multiprocessing to handle multiple frames concurrently and I tried to optimize the frame decoding function to make it faster but with no success.


-
dockerized python application takes a long time to trim a video with ffmpeg
15 avril 2024, par Ukpa Uchechi
The project trims YouTube videos.


When I ran the ffmpeg command on the terminal, it didn't take too long to respond. The code below returns the trimmed video to the front end but it takes too long to respond. A 10 mins trim length takes about 5mins to respond. I am missing something, but I can't pinpoint the issue.


backend


main.py


import os
import subprocess

from flask import Flask, request, send_file
from flask_cors import CORS, cross_origin


app = Flask(__name__)
cors = CORS(app)  # allow the React frontend (different origin) to call us

# Output layout: the trimmed result always lands in ./youtube_videos/video.mp4.
# NOTE(review): a single shared output path means concurrent requests would
# overwrite each other's files — consider per-request names.
current_directory = os.getcwd()
folder_name = "youtube_videos"
save_path = os.path.join(current_directory, folder_name)
output_file_path = os.path.join(save_path, 'video.mp4')

os.makedirs(save_path, exist_ok=True)

def convert_time_seconds(time_str):
    """Convert an 'HH:MM:SS' string into a total number of seconds."""
    h, m, s = (int(part) for part in time_str.split(':'))
    return h * 3600 + m * 60 + s
def convert_seconds_time(total_seconds):
    """Convert a number of seconds into an 'HH:MM:SS' string."""
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f'{hours:02}:{minutes:02}:{seconds:02}'
def add_seconds_to_time(time_str, seconds_to_add):
    """Shift an 'HH:MM:SS' string BACKWARDS by *seconds_to_add* seconds.

    Despite the name, this subtracts: callers use it to rewind the ffmpeg
    seek point (e.g. by 30 s) so the stream can then be fine-seeked forward.
    """
    shifted = convert_time_seconds(time_str) - seconds_to_add
    return convert_seconds_time(shifted)

def get_length(start_time, end_time):
    """Return the duration between two 'HH:MM:SS' timestamps as 'HH:MM:SS'."""
    duration = convert_time_seconds(end_time) - convert_time_seconds(start_time)
    return convert_seconds_time(duration)
 
def download_url(url):
    """Resolve a YouTube URL into direct (video, audio) stream URLs via yt-dlp.

    Returns:
        (video_url, audio_url) on success, (None, None) on any failure.
        Fixed: the original returned a bare None on failure, which made the
        caller's tuple unpacking raise TypeError before its error check ran.
    """
    command = [
        "yt-dlp",
        "-g",
        url
    ]

    try:
        links = subprocess.run(command, capture_output=True, text=True, check=True)

        video, audio = links.stdout.strip().split("\n")

        return video, audio

    except subprocess.CalledProcessError as e:
        print(f"Command failed with return code {e.returncode}.")
        print(f"Error output: {e.stderr}")
        return None, None
    except ValueError:
        print("Error: Could not parse video and audio links.")
        return None, None
    except OSError as e:
        # Fixed: a missing yt-dlp binary previously crashed with an
        # unhandled FileNotFoundError.
        print(f"Error: could not run yt-dlp: {e}")
        return None, None
 


def download_trimmed_video(video_link, audio_link, start_time, end_time):
    """Trim and mux the video/audio streams into output_file_path via ffmpeg.

    Uses a two-stage seek: rewind the input seek point by 30 s (coarse,
    keyframe-accurate) then skip those 30 s again on the output side (fine).

    Returns:
        A status string; always returns a value now (the original fell off
        the end and returned None on a CalledProcessError).
    """
    new_start_time = add_seconds_to_time(start_time, 30)  # rewind 30 s for accurate seeking
    new_end_time = get_length(start_time, end_time)       # clip duration

    if os.path.exists(output_file_path):
        os.remove(output_file_path)  # ffmpeg refuses to overwrite without -y

    command = [
        'ffmpeg',
        '-ss', new_start_time + '.00',  # coarse input seek (video)
        '-i', video_link,
        '-ss', new_start_time + '.00',  # coarse input seek (audio)
        '-i', audio_link,
        '-map', '0:v',
        '-map', '1:a',
        '-ss', '30',                    # fine seek: skip the 30 s we rewound
        '-t', new_end_time + '.00',
        '-c:v', 'libx264',
        '-c:a', 'aac',
        output_file_path
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)

        if result.returncode == 0:
            return "Trimmed video downloaded successfully!"
        return "Error occurred while downloading trimmed video"
    except subprocess.CalledProcessError as e:
        print(f"Command failed with return code {e.returncode}.")
        print(f"Error output: {e.stderr}")
        return "Error occurred while downloading trimmed video"
    except OSError as e:
        # Fixed: a missing ffmpeg binary previously crashed unhandled.
        print(f"Error: could not run ffmpeg: {e}")
        return "Error occurred while downloading trimmed video"


# NOTE(review): removed a duplicate `app = Flask(__name__)` that rebound the
# module-level `app`, silently discarding the CORS configuration applied to
# the instance created at the top of the file.

@app.route('/trimvideo', methods =["POST"])
@cross_origin()
def trim_video():
    """POST {url, start_time, end_time} -> trimmed video.mp4 attachment.

    Returns the trimmed file on success, or a plain-text error with status
    400 (bad/unresolvable URL) or 500 (trim produced no output).
    """
    print("here")
    data = request.get_json()
    # Fixed: download_url could return None, and the original unpacked it
    # unconditionally (TypeError). Normalise before unpacking.
    result = download_url(data["url"])
    video_link, audio_link = result if result else (None, None)
    if video_link and audio_link:
        print("Downloading trimmed video...")
        download_trimmed_video(video_link, audio_link, data["start_time"], data["end_time"])
        # Fixed: guard against serving a missing/stale file when ffmpeg failed.
        if not os.path.exists(output_file_path):
            return "Error trimming video", 500
        response = send_file(output_file_path, as_attachment=True, download_name='video.mp4')

        response.status_code = 200

        return response
    else:
        return "Error downloading video", 400

 




if __name__ == '__main__':
    # Bind on all interfaces so the dockerised app is reachable from the
    # host; debug=True is for development only.
    app.run(debug=True, port=5000, host='0.0.0.0')



dockerfile


# Base image provides apt access to ffmpeg and Python 3.
FROM ubuntu:latest

# Update the package list and install wget and ffmpeg
RUN apt-get update \
 && apt-get install -y wget ffmpeg python3 python3-pip \
 && rm -rf /var/lib/apt/lists/*

# Download the latest version of yt-dlp and install it
RUN wget https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp -O /usr/local/bin/yt-dlp \
 && chmod a+rx /usr/local/bin/yt-dlp

WORKDIR /app

COPY main.py /app/
COPY requirements.txt /app/

# Install the Flask/Flask-Cors dependencies listed in requirements.txt.
RUN pip install --no-cache-dir -r requirements.txt

# Set the default command
# NOTE(review): the app listens on 5000; no EXPOSE is declared, so publish
# the port at run time (docker run -p 5000:5000).
CMD ["python3", "main.py"]



requirements.txt


blinker==1.7.0
click==8.1.7
colorama==0.4.6
Flask==3.0.3
Flask-Cors==4.0.0
itsdangerous==2.1.2
Jinja2==3.1.3
MarkupSafe==2.1.5
Werkzeug==3.0.2



frontend


App.js



import React, { useState } from 'react';
import './App.css';
import axios from 'axios';
/**
 * Submit the trim request to the backend and expose the resulting MP4 blob
 * as an object URL via setVideoUrl.
 *
 * Fix: on request failure the submitted flag is reset, so the UI no longer
 * shows the loading indicator forever after an error.
 */
async function handleSubmit(event, url, start_time, end_time, setVideoUrl, setIsSubmitted){
  event.preventDefault();

  if (url && start_time && end_time) {
    try {
      setIsSubmitted(true);
      const response = await axios.post('http://127.0.0.1:5000/trimvideo', {
        url: url,
        start_time: start_time,
        end_time: end_time
      },
      {
        responseType: 'blob',
        headers: {'Content-Type': 'application/json'}
      });
      const blob = new Blob([response.data], { type: 'video/mp4' });
      const newurl = URL.createObjectURL(blob);

      setVideoUrl(newurl);
    } catch (error) {
      console.error('Error trimming video:', error);
      // Allow the user to retry instead of being stuck on the spinner.
      setIsSubmitted(false);
    }
  } else {
    alert('Please fill all the fields');
  }
}


function App() {
 const [url, setUrl] = useState('');
 const [startTime, setStartTime] = useState('');
 const [endTime, setEndTime] = useState('');
 const [videoUrl, setVideoUrl] = useState('');
 const [isSubmitted, setIsSubmitted] = useState(false);
 return (
 <div classname="App">
 <div classname="app-header">TRIM AND DOWNLOAD YOUR YOUTUBE VIDEO HERE</div>
 <input classname="input-url" placeholder="'Enter" value="{url}" />setUrl(e.target.value)}/>
 <div classname="input-container">
 <input classname="start-time-url" placeholder="start time" value="{startTime}" />setStartTime(e.target.value)}/>
 <input classname="end-time-url" placeholder="end time" value="{endTime}" />setEndTime(e.target.value)}/>
 
 </div>
 {
 !isSubmitted && <button>> handleSubmit(event, url, startTime, endTime, setVideoUrl, setIsSubmitted)} className='trim-button'>Trim</button>
 }

 {
 ( isSubmitted && !videoUrl) && <div classname="dot-pulse"></div>
 }


 {
 videoUrl && <video controls="controls" autoplay="autoplay" width="500" height="360">
 <source src="{videoUrl}" type="'video/mp4'"></source>
 </video>
 }

 
 </div>
 );
}

export default App;