日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python ffmpeg直播_python+ffmpeg视频并发直播压力测试

發布時間:2024/8/1 python 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python ffmpeg直播_python+ffmpeg视频并发直播压力测试 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

通過python與ffmpeg結合使用,可生成進行視頻點播、直播的壓力測試腳本。可支持不同類型的視頻流,比如rtmp或者hls形式。

通過如下方式執行腳本:python multiRealPlay.py [rtmp|http] [thread counts] [interval Time]

[rtmp | http]:視頻播放的不同形式

[thread counts]:并發線程數

[interval Time]:啟動每個線程的間隔時間

代碼:

#!/usr/bin/python

# -*- coding: utf-8 -*-

'''

Created on 2015年7月22日

@author: LiBiao

'''

import datetime,time

import threading

import subprocess

import os, base64

import sys

import Queue

queue = Queue.Queue()

#需要手動配置的參數

#啟動序號

SLEEP_TIME = 0

#直播地址

FULL_ADDR = {}

#需要手動配置的參數

RTMP_ADDR = 'rtmp://192.168.1.208:1935/live/'

HTTP_ADDR = 'http://192.168.1.208:80/live'

liveID = '100002750' #來自于萬視無憂中創建的直播

urlKey = 'a1e5c680f7bfc85851de8ab2e63b0a33' #來自于萬視無憂安全設置模塊

liveResCode = '71ac6c06d3' #直播源碼

#生成MD5值

def getMD5_Value(inputdata):

try:

import hashlib

hash = hashlib.md5(inputdata.encode('utf-8'))

except ImportError:

#for python << 2.5

import md5

hash = md5.new()

return hash.hexdigest()

#直播地址組裝

def build_live_addr():

t = time.strftime('%Y%m%d%H%M%S',time.localtime())[2:]

data = '%s#%s#%s' %(liveID, t, urlKey)

secret = getMD5_Value(data)

rtmp_addr = '%s%s?liveID=%s&time=%s&secret=%s' %(RTMP_ADDR, liveResCode, liveID, t, secret)

http_addr = '%s/%s/playlist.m3u8?liveID=%s&time=%s&secret=%s' %(HTTP_ADDR, liveResCode, liveID, t, secret)

FULL_ADDR['rtmp'] = rtmp_addr

FULL_ADDR['http'] = http_addr

return FULL_ADDR

#獲取本機ip地址,用來產生區別于其他機器的數據

def get_local_ip():

try:

ip = os.popen("ifconfig | grep 'inet addr' | awk '{print $2}'").read()

ip = ip[ip.find(':') + 1:ip.find('\n')]

except Exception,e:

print e

return ip

class Video_To_Live(threading.Thread):

def __init__(self,queue):

threading.Thread.__init__(self)

self.queue = queue

def run(self):

liveAddr = self.queue.get()

#print liveAddr

try:

print liveAddr

subprocess.call('./ffmpeg -i \"%s\" -c:v copy -c:a copy -bsf:a aac_adtstoasc -y -f flv -timeout 4000 /dev/null 2>/dev/null' %liveAddr,stdout=subprocess.PIPE,shell=True)

except Exception as e:

wiriteLog('ERROR',str(e))

self.queue.task_done()

if __name__ == "__main__":

time.sleep(SLEEP_TIME)

parser = argparse.ArgumentParser(description = "Live Play")

parser.add_argument('--liveType',action = "store",dest = "liveType",required = False)

parser.add_argument('--pnum',action = "store",dest = "pnum",type = int,required = False)

parser.add_argument('--itime',action = "store",dest = "itime",required = False)

given_args = parser.parse_args()

liveType = given_args.liveType

threadNum = given_args.pnum

intervalTime = given_args.itime

print "%d 個 %s 進程開始運行........" %(threadNum, Video_To_Live)

for i in xrange(threadNum):

videotolive = Video_To_Live(queue)

videotolive.setDaemon(True)

videotolive.start()

for i in xrange(threadNum):

if liveType in ["http","rtmp"]:

addr = build_live_addr()

liveaddr = addr[liveType]

queue.put(liveaddr)

time.sleep(intervalTime)

queue.join()

print "進程退出"

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持找一找教程網。

總結

以上是生活随笔為你收集整理的python ffmpeg直播_python+ffmpeg视频并发直播压力测试的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。