我对asyncio还是很陌生,并且在如何处理循环中的循环方面有些挣扎:
import asyncio
import concurrent.futures
import logging
import sys
import time
sub_dict = {
1: ['one', 'commodore', 'apple', 'linux', 'windows'],
2: ['two', 'commodore', 'apple', 'linux', 'windows'],
3: ['three', 'commodore', 'apple', 'linux', 'windows'],
4: ['four', 'commodore', 'apple', 'linux', 'windows'],
5: ['five', 'commodore', 'apple', 'linux', 'windows'],
6: ['six', 'commodore', 'apple', 'linux', 'windows'],
7: ['seven', 'commodore', 'apple', 'linux', 'windows'],
8: ['eight', 'commodore', 'apple', 'linux', 'windows']
}
def blocks(key, value):
for v in value:
log = logging.getLogger('blocks({} {})'.format(key, v))
log.info('running')
log.info('done')
time.sleep(5)
return key, v
async def run_blocking_tasks(executor, sub_dict2):
log = logging.getLogger('run_blocking_tasks')
log.info('starting')
log.info('creating executor tasks')
loop = asyncio.get_event_loop()
blocking_tasks = [
loop.run_in_executor(executor, blocks, key, value)
for key, value in sub_dict2.items()
]
log.info('waiting for executor tasks')
completed, pending = await asyncio.wait(blocking_tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
def new_func():
logging.basicConfig(
level=logging.INFO,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=8,
)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(
run_blocking_tasks(executor, sub_dict)
)
event_loop.close()
new_func()
在这里您可以看到每个元素的所有值项都分配给同一线程。例如,元素“ 1”的所有值都在线程零上。
我足够了解这是因为我的for v in value
循环未正确插入到异步中。
我期望的输出是,如果我分配了五个工作程序,则元素'1'的每个值项都将位于其自己的线程上,编号为0-4,总共有五个线程。然后,将对元素2到8重复此过程。
我应该分配40个线程,8个字典元素*每个元素5个值项=每个字典项1个唯一线程。
希望如此...
我对asyncio还是很陌生,并且在如何处理循环中的循环方面有些挣扎:import asyncio importcurrent.futures import logging import sys import time sub_dict = {1:['...
关于SO的问题似乎总是在触发我的智商。答案是这样,如果有人感兴趣的话: