用Python做大批量请求发送

时间:2023-03-09 15:56:10
用Python做大批量请求发送

原创. 禁转.

大批量请求发送需要考虑的几个因素:

1. 服务器承载能力(网络带宽/硬件配置);

2. 客户端IO情况, 客户端带宽, 硬件配置;

方案:

1. 方案都是相对的;

2. 因为这里我的情况是客户机只有一台,所以不能考虑使用分布式了, 服务器承载能力也非常有限(经过不断调试得知);

3. 这里没有使用Jmeter, 虽然jmeter也是可以做到的.

注: 如无特殊说明以下代码基于windows7 64位/Centos 6.5 64位, Python3.6+

Python里面支持发送大批量的方案有很多, 这里只介绍我所用过的几种:

1. 使用grequests:

grequests可以一次性发送超大批量的请求, 但是底层听说修改了socket通信, 可能不稳定或者不安全? 而且如果你需要校验对比每个请求的发送信息与返回信息, 比较不方便, 因为它是批量发送,然后批量收回, 示例代码 :

import grequests
import time
from collections import OrderedDict
import hashlib
import os
import xlrd
import json
import datetime class voiceSearchInterface:
@classmethod
def formatUrlAndHeader(self, des, singer, songName):
#生成url和header的逻辑
return url, h @classmethod
def exeRequests(self):
errorLog = open("errorLog.txt","w+")
startTime = datetime.datetime.now()
rightCount = 0
errCount = 0
descr = ["播放", "搜索", "搜", "听", "我要听", "我想听", "来一首", "来一个", "来一段", "来一曲", "来首", "来个", "来段", "来曲"]
orgPath = os.path.join(os.path.dirname(os.getcwd()), "test","SongsAndSingers","singersAndSongs3.txt")
f = open(orgPath,"rb")
i = 0
urlsAndHs = []
for line in f.readlines():
temp = line.decode().split("\t")
orgSinger = temp[0]
orgSong = temp[1].replace("\n","")
for k in descr:
urlAndH = self.formatUrlAndHeader(k, orgSinger,orgSong)
urlsAndHs.append(urlAndH)
f.close()
rs = (grequests.get(u[0], headers = u[1], stream = False) for u in urlsAndHs)
rsText = grequests.imap(rs, size=20)
for r in rsText:
executingLog = open("Log.txt","w+")
i+=1
try:
searchResult = json.loads(r.text)
searchItem = searchResult["data"]["searchitem"]
tt = searchItem.split("Split")
searchSinger = tt[1]
searchSong = tt[-1]
resultSinger = searchResult["data"]["sounds"][0]["singer"]
resultSong = searchResult["data"]["sounds"][0]["title"]
if(searchSinger==resultSinger and searchSong==resultSong): rightCount += 1
else: errCount += 1
print(searchSinger, "\t",resultSinger, "\t",searchSong,"\t", resultSong)
except Exception:
errCount += 1
errorLog.write((r.text+"\n").encode('latin-1').decode('unicode_escape'))
print(i)
executingLog.write(str(int(i/14)))
errorLog.close()
executingLog.close()
endTime = datetime.datetime.now()
print("耗时: %d秒, 正确数: %d, 异常数: %d, 总数: %d, 通过率: %.2f%%" % ((endTime-startTime).seconds, rightCount, errCount, i, (rightCount)/i*100)) voiceSearchInterface.exeRequests()

注意: 使用grequests可能有坑, 因为它修改了底层socket通信, 可能会造成系统有问题,我目前虽然还没遇到,但还是在这里友情提醒下.

2. 使用多进程+requests库:

Python里面的多进程库multiprocessing和requests库都是神器, 下面直接上代码:

#_*_coding=utf-8_*_
import multiprocessing
import time
from collections import OrderedDict
import hashlib
import linecache
import os
import requests
import json def formatUrlAndHeader(des, singer, songName):
#生成url和header的逻辑
return url, h #每个进程都去读各自的文件,然后以写文件的方式保存当前的执行记录,为了预防断电或者其他程序异常终止情况
def worker(fileName):
descr = ["播放", "搜索", "搜", "听", "我要听", "我想听", "来一首", "来一个", "来一段", "来一曲", "来首", "来个", "来段", "来曲"]
Logprefix = os.path.split(fileName)[1].replace(".txt", "")
resultLogPath = os.path.join(os.getcwd(), "log", Logprefix+".log")
logbreakPoint = os.path.join(os.getcwd(), "log", Logprefix+".txt")
with open(logbreakPoint, "r") as b:
startLine = int(b.read())
b.close()
with open(resultLogPath, "a+", encoding="utf-8") as logF:
with open(fileName, "r", encoding="utf-8") as f:
lines = f.readlines()
f.close()
LineNum = startLine
for j in range(len(lines)-startLine+1):
LineContent = linecache.getline(fileName, LineNum)
for i in descr:
line = LineContent.split("\t")
singer = line[0]
song = line[1].replace("\n","")
uAndH = formatUrlAndHeader(i, singer, song)
try:
r = requests.get(url=uAndH[0], headers = uAndH[1])
with open(logbreakPoint, "w") as w:
w.write(str(LineNum))
print("searching:%s, line: %d\n" % (fileName, LineNum))
result = json.loads(r.text)
resultSinger = result["data"]["sounds"][0]["singer"]
resultSong = result["data"]["sounds"][0]["title"]
if not (resultSinger==singer and resultSong==song):
logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n" %(i,singer,song, r.text.encode('latin-1').decode('unicode_escape')))
except Exception as e:
logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n" %(i,singer,song,str(e).encode('latin-1').decode('unicode_escape')))
LineNum += 1
logF.close() if __name__=='__main__':
orgPath = os.path.join(os.getcwd(), "data")
files = os.listdir(orgPath)
for i in files:
f =os.path.join(orgPath,i)
if os.path.isfile(f):
p = multiprocessing.Process(target=worker, args=(f,))
p.start()

程序会根据数据源文件数量, 生成相应的进程数. 每个进程各自读各自的数据源文件, 然后调用formatUrlAndHeader方法获取url和heade, 挨个发送请求并保存当前执行记录到指定文件. 这种方式的好处在于针对每个请求, 都能对比发送前的参数和收回的请求相应数据.

3. 使用异步asyncio, aiohttp

asyncio是python3.4+才进入的新东西, 是Python3.4+以上的标准库, 是推荐采用的方式, 而aiohttp需要单独安装, 代码如下:

#_*_coding=utf-8_*_
import aiohttp
import time
from collections import OrderedDict
import hashlib
import asyncio
import os
import linecache
import threading def formatUrlAndHeader(des, singer, songName):
#生成url和header的逻辑
return url, h async def fetch_async(uandh):
u, h = uandh[0],uandh[1]
with aiohttp.Timeout(301):
async with aiohttp.request('GET', url=u, headers=h) as r:
data = await r.text()
return data loop = asyncio.get_event_loop()
descr = ["播放", "搜索", "搜", "听", "我要听", "我想听", "来一首", "来一个", "来一段", "来一曲", "来首", "来个", "来段", "来曲"]
orgPath = os.path.join(os.path.dirname(os.getcwd()), "test","SongsAndSingers","singersAndSongs3.txt") def runRequests(startNum):
start = time.time()
urlsAndHs = []
for i in range(20):
line = linecache.getline(orgPath, startNum+i).split("\t")
orgSinger = line[0]
orgSong = line[1].replace("\n","")
for k in descr:
urlAndH = formatUrlAndHeader(k, orgSinger,orgSong)
urlsAndHs.append(urlAndH)
linecache.clearcache()
tasks = [fetch_async(uandh) for uandh in urlsAndHs]
done, pending = loop.run_until_complete(asyncio.wait(tasks))
for i in done:
print(i.result().encode('latin-1').decode('unicode_escape'))
end = time.time()
print(end-start) for i in range(1,50,20):
t = threading.Thread(target=runRequests, args=(i,))
t.start()
t.join()

一个源数据文件, 多线程. 每个线程根据传入的起始行号连续读取文件的20行, 然后批量发送20个请求, 下一个线程必须等待上一个线程结束才开始. 这种方式也是批量发, 批量收回,不能单独对比每个请求的请求前参数, 请求后相应.

以上3种方式, 任何一种都能满足我的测试要求. 实际过程中发现:

1. PHP接口对于单个请求, 参数pagesize对相应速度影响甚大, 具体原因未知;

2. 服务器对IO密集型的操作, 非常消耗CPU. 以上3种方式, 基本上都是每次只发20个请求左右, 而服务器的CPU(8核)已经满载了!