日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

接口测试——并行上传文件

發(fā)布時(shí)間:2023/12/18 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 接口测试——并行上传文件 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

任務(wù)目標(biāo):用requests構(gòu)造一個(gè)上傳文件的接口測試,文件從一個(gè)文件夾里面取,可以并發(fā)上傳多個(gè)文件

一、分解任務(wù):

①使用requests構(gòu)造一個(gè)上傳文件的接口測試。

②實(shí)現(xiàn)遍歷上傳測試文件,作為并發(fā)上傳的效率對(duì)比。

③實(shí)現(xiàn)多線程上傳測試文件。

④從文件夾獲取該文件夾中全部文件的文件名,并以列表的方式存儲(chǔ)。

⑤對(duì)大數(shù)量的文件,實(shí)現(xiàn)指定線程數(shù)量的多線程上傳。

⑥生成測試報(bào)告。

二、使用requests構(gòu)造一個(gè)上傳文件的接口測試。

①在cmd中輸入pip install requests,安裝測試所需的依賴

②調(diào)用requests中post方法,獲取接口返回?cái)?shù)據(jù)。

三、實(shí)現(xiàn)遍歷上傳測試文件,作為并發(fā)上傳的效率對(duì)比。

創(chuàng)建testList,對(duì)testList中元素進(jìn)行遍歷。

url = 'http://xxx'

params = {'fileName':'fileName','fileExt':'fileExt'}

testList1=['fileName1','fileName2','fileName3','fileName4','fileName5']??

#遍歷上傳:

print(ctime())

for i in range(len(testList1)):

??? files = {'file':open(testList1[i],'rb')}

??? r = requests.post(url,params,files=files)

??? print(i+1)

??? print("")

??? print(r.text)

print(ctime())

?四、實(shí)現(xiàn)多線程上傳測試文件。

①注意開頭要聲明 ?# -*- coding: UTF-8 -*- ? ,否則可能出現(xiàn)亂碼

②創(chuàng)建線程容器

③遍歷上傳上傳任務(wù)到線程容器

④啟動(dòng)進(jìn)程

#封裝send方法

def send(file):?

??? files = {'file':open(file,'rb')}???

??? r = requests.post(url,params,files=files)

??? print(r.text)

#多線程:

print(ctime())

threads = []

for file in testList2:

??? print(file+':')

??? t = threading.Thread(send(file))

??? threads.append(t)

?

if __name__ == '__main__':

??? #啟動(dòng)進(jìn)程

??? for i in range(len(testList)):

??????? threads[i].start()

??? for i in range(len(testList)):

??????? threads[i].join()?

print(ctime())

此時(shí),已經(jīng)能夠?qū)Ρ闅v和多線程方式的效率進(jìn)行對(duì)比了。但由于數(shù)量太少,對(duì)比結(jié)果并不明顯,從我的結(jié)果來看,大致速度提升了2S。?我們需要大量的測試文件來對(duì)比。

五、從文件夾獲取該文件夾中全部文件的文件名,并以列表的方式存儲(chǔ)

①獲取文件名,以列表存儲(chǔ)

#獲取文件名
def file_name(path,testList): ??
? ? for root, dirs, files in os.walk(path): ?
? ? ? ? for file in files: ?
? ? ? ? ? ? if os.path.splitext(file)[1] == '.xlsx': ?
? ? ? ? ? ? ? ? testList.append(os.path.join(root, file))

?②將大列表拆分成小列表

thread_num = [i for i in range(len(testList2))]
n = 5 ? ? ?#定義小列表長度
list_temp = [testList2[i:i+n] for i in range(0,len(testList2),n)]

?六、對(duì)大數(shù)量的文件,實(shí)現(xiàn)指定線程數(shù)量的多線程上傳

上面定義了小列表長度為5。這里使用了兩個(gè)for循環(huán),內(nèi)層將小列表中的5個(gè)元素分配任務(wù)并裝入線程容器,外層則對(duì)小列表進(jìn)行遍歷。(這里一定要注意啟動(dòng)進(jìn)程后,一定要?jiǎng)?chuàng)建新的線程容器,否則會(huì)報(bào)錯(cuò):RuntimeError: threads can only be started once)

threads = []
for temp in list_temp:
? ? #填充線程容器
? ? for file in temp:
? ? ? ? print(file+':')
? ? ? ? t = threading.Thread(target = send ,args = (file,))
? ? ? ? threads.append(t)

? ? if __name__ == '__main__':
? ? ? ? #啟動(dòng)進(jìn)程
? ? ? ? for i in range(len(temp)):
? ? ? ? ? ? threads[i].start()
? ? ? ? for i in range(len(temp)):
? ? ? ? ? ? threads[i].join()
? ? print('-------------------')
? ? print(ctime())

? ? #新線程容器
? ? threads = [] ????? ??? ? ? ? ? ? ? ? ? ? ? ? ?
print(ctime())

通過對(duì)500個(gè)測試文檔的轉(zhuǎn)換效率的測試,得出多線程的速度大致是單線程的n(n為線程數(shù))倍?,理論測試數(shù)據(jù)量越大,越接近n值。n值的最優(yōu)取值取決于計(jì)算機(jī)CPU單核的性能。

但針對(duì)于計(jì)算密集的CPU運(yùn)算,則不適合采用多線程。假設(shè)單線程需要120S完成密集的運(yùn)算工作,則多線程可能需要130S才能完成,原因是因?yàn)镃PU的線程機(jī)制,具體什么機(jī)制百度有很詳細(xì)的解釋。而需要等待網(wǎng)絡(luò)響應(yīng)的操作則特別適合多線程,因?yàn)橛?jì)算機(jī)的響應(yīng)速度,導(dǎo)致單線程會(huì)浪費(fèi)很多的CPU資源,這時(shí)候使用多線程則能非常合理的利用到浪費(fèi)的CPU資源,從而極大的提升運(yùn)行效率。

?七、整合。

# -*- coding: UTF-8 -*-
import requests

from time import ctime,sleep
import threading
import os


url = 'http://xxx'
params = {'fileName':'xxx','fileExt':'xxx'}

#文件路徑
fileHomes1 = r"E:\\test\\excel1"
fileHomes2 = r"E:\\test\\excel2"

#獲取文件名
def file_name(path,testList): ??
? ? for root, dirs, files in os.walk(path): ?
? ? ? ? for file in files: ?
? ? ? ? ? ? if os.path.splitext(file)[1] == '.xlsx': ?
? ? ? ? ? ? ? ? testList.append(os.path.join(root, file))

#上傳:
def send(file):
? ? ? ? print("")
? ? ? ? files = {'file':open(file,'rb')} ? ?
? ? ? ? r = requests.post(url,params,files=files)
? ? ? ? print(r.text)


#定義testList
testList1=[] ??
testList2=[]
#賦值
file_name(fileHomes1,testList1)
file_name(fileHomes2,testList2)
#拆分列表
thread_num = [i for i in range(len(testList2))]
n = 5 ? ? ?#定義小列表長度
list_temp = [testList2[i:i+n] for i in range(0,len(testList2),n)]

print("遍歷上傳:")
print("----------------------------") ? ?
print(ctime())
print("----------------------------") ? ?
for i in range(len(testList1)): ? ? ?
? ? print(i+1)
? ? print("")
? ? send(testList1[i])
? ? print("") ? ?
print("----------------------------") ? ?
print(ctime())
print("----------------------------") ? ?


print('\n\n\n')
#多線程上傳
print("----------------------------") ? ?
print("多線程上傳:")
print("----------------------------")
print(ctime())
print("----------------------------")
#線程容器
threads = []
for temp in list_temp:
? ? #填充線程容器
? ? for file in temp:
? ? ? ? print(file+':')
? ? ? ? t = threading.Thread(target = send ,args = (file,))
? ? ? ? threads.append(t)

? ? if __name__ == '__main__':
? ? ? ? #啟動(dòng)進(jìn)程
? ? ? ? for i in range(len(temp)):
? ? ? ? ? ? threads[i].start()
? ? ? ? for i in range(len(temp)):
? ? ? ? ? ? threads[i].join()
? ? print('-------------------')
? ? print(ctime())
? ? #新容器
? ? threads = [] ??
? ? print('-------------------')
? ??? ? ? ? ? ? ? ? ? ? ? ? ?
print(ctime())
print("----------------------------")?

八、測試報(bào)告的生成

?

總結(jié)

以上是生活随笔為你收集整理的接口测试——并行上传文件的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。