Multithreading/multiprocessing with a Python loop

Time: 2023-02-05 18:17:25

I have a script that loops through a range of URLs to pull item locations based on the returned JSON data. However, the script takes 60 minutes to run, and 55 minutes of that (per cProfile) is spent waiting for the JSON data to load.

I would like to multithread and run multiple POST requests at a time to speed this up, and I initially split the URL range into two halves to do this. Where I am getting stuck is how to implement multithreading or asyncio.

Slimmed down code:

import asyncio
import aiohttp

# using globals is not recommended
results = dict()
url = "https://www.website.com/store/ajax/search"
query = "store={}&size=18&query=17360031"

# this is the default URL opener, adapted from the aiohttp documentation
async def open_url(store, loop=None):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.post(url, data={'searchQuery': query.format(store)}) as resp:
            return await resp.json(), store

async def processing(loop=None):
    # the 'global' keyword is required to write to a global variable
    global results
    # one of the simplest ways to parallelize requests: schedule all the
    # coroutines, then store each result in the global dict as it completes
    tasks = [open_url(store, loop=loop) for store in range(0, 5)]
    for coro in asyncio.as_completed(tasks, loop=loop):
        try:
            data, store = await coro
            results[store] = data['searchResults']['results'][0]['location']['aisle']
        except (IndexError, KeyError):
            continue


if __name__ == '__main__':
    event_loop = asyncio.new_event_loop()
    event_loop.run_until_complete(processing(loop=event_loop))

# Print Results
for store, data in results.items():
    print(store, data)

JSON:

    {u'count': 1,
     u'results': [{u'department': {u'name': u'Home', u'storeDeptId': -1},
           u'location': {u'aisle': [A], u'detailed': [A.536]},
           u'score': u'0.507073'}],
     u'totalCount': 1}

2 Answers

#1

Even if you use multithreading or multiprocessing, each thread/process will still block until the JSON data is retrieved. This could speed things up a little, but it's still not your best choice.
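
(For reference, a minimal sketch of that thread-pool route, assuming the requests library and reusing the endpoint, payload, and result parsing from the question's code; each worker thread blocks on its own request while the others keep going.)

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

url = "https://www.website.com/store/ajax/search"
query = "store={}&size=18&query=17360031"

def fetch(store):
    # blocks this worker thread until the JSON response arrives
    resp = requests.post(url, data={'searchQuery': query.format(store)})
    return store, resp.json()

results = {}
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = [pool.submit(fetch, store) for store in range(0, 5)]
    for future in as_completed(futures):
        store, data = future.result()
        try:
            results[store] = data['searchResults']['results'][0]['location']['aisle']
        except (IndexError, KeyError):
            continue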

Since you're using requests, try grequests, which combines requests with gevent. It lets you define a series of HTTP requests that run asynchronously, so you get a huge speed boost. The usage is very simple: just create a list of requests (using grequests.get) and pass it to grequests.map.
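
(A rough sketch of that pattern, not from the original answer: since the question sends POST requests, grequests.post is used here instead of grequests.get; the endpoint, payload, and result parsing are copied from the question's code, and size caps the number of concurrent requests.)

import grequests

url = "https://www.website.com/store/ajax/search"
query = "store={}&size=18&query=17360031"

stores = range(0, 5)
# build the requests first; nothing is sent yet
reqs = [grequests.post(url, data={'searchQuery': query.format(store)}) for store in stores]

# send them concurrently; grequests.map returns responses in the same order
results = {}
for store, resp in zip(stores, grequests.map(reqs, size=10)):
    if resp is None:  # the request failed
        continue
    try:
        data = resp.json()
        results[store] = data['searchResults']['results'][0]['location']['aisle']
    except (IndexError, KeyError):
        continue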

Hope this helps!

#2

If you want to parallelize requests (I hope that's what you're asking for), this code snippet will help. There is a request opener, and 2000 POST requests are sent via aiohttp and asyncio. Python 3.5 is used.

import asyncio
import aiohttp

# using globals is not recommended
results = dict()
MAX_RETRIES = 5
MATCH_SLEEP_TIME = 3  # consider moving these constants to a separate file such as constants.py
url = "https://www.website.com/store/ajax/search"
query = "store={}&size=18&query=44159"

# this is the default URL opener, adapted from the aiohttp documentation
async def open_url(store, semaphore, loop=None):
    for _ in range(MAX_RETRIES):
        async with semaphore:
            try:
                async with aiohttp.ClientSession(loop=loop) as session:
                    async with session.post(url, data={'searchQuery': query.format(store)}) as resp:
                        return await resp.json(), store
            except ConnectionResetError:
                # more exceptions can be handled here; sleep before retrying
                await asyncio.sleep(MATCH_SLEEP_TIME, loop=loop)
                continue
    return None

async def processing(semaphore, loop=None):
    # the 'global' keyword is required to write to a global variable
    global results
    # one of the simplest ways to parallelize requests: schedule all the
    # coroutines, then store each result in the global dict as it completes
    tasks = [open_url(store, semaphore, loop=loop) for store in range(0, 2000)]
    for coro in asyncio.as_completed(tasks, loop=loop):
        try:
            response = await coro
            if response is None:
                continue
            data, store = response
            results[store] = data['searchResults']['results'][0]['location']['aisle']
        except (IndexError, KeyError):
            continue


if __name__ == '__main__':
    event_loop = asyncio.new_event_loop()
    semaphore = asyncio.Semaphore(50, loop=event_loop)  # count of concurrent requests
    event_loop.run_until_complete(processing(semaphore, loop=event_loop))
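
A possible refinement, not part of the original answer: instead of opening a new ClientSession for every request, one shared session can be reused for all of them (connection pooling), with the same semaphore capping concurrency. A rough sketch, reusing the url, query, and results names defined above, written in the same Python 3.5 loop= style as the answer (on newer Python the loop arguments would be dropped):

async def open_url_shared(store, session, semaphore):
    # the shared session reuses connections; the semaphore still limits concurrency
    async with semaphore:
        async with session.post(url, data={'searchQuery': query.format(store)}) as resp:
            return await resp.json(), store

async def processing_shared(semaphore, loop=None):
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [open_url_shared(store, session, semaphore) for store in range(0, 2000)]
        for coro in asyncio.as_completed(tasks, loop=loop):
            try:
                data, store = await coro
                results[store] = data['searchResults']['results'][0]['location']['aisle']
            except (IndexError, KeyError):
                continue

It can be driven exactly like processing() above, i.e. event_loop.run_until_complete(processing_shared(semaphore, loop=event_loop)).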
