programing

asyncio와 다중 처리를 결합하면 어떤 종류의 문제 (있는 경우)가 있습니까?

goodcopy 2021. 1. 15. 19:15
반응형

asyncio와 다중 처리를 결합하면 어떤 종류의 문제 (있는 경우)가 있습니까?


거의 모든 사람들이 Python에서 스레딩을 처음 볼 때 알고 있듯이 실제로 병렬 처리를 원하는 사람들에게 삶을 비참하게 만드는 GIL이 있습니다. 또는 적어도 기회를 제공합니다.

현재 Reactor 패턴과 같은 것을 구현하려고합니다. 효과적으로 나는 하나의 스레드에서 들어오는 소켓 연결을 듣고 싶습니다. 누군가가 연결을 시도 할 때 그 연결을 받아들이고 처리를 위해 다른 스레드와 같이 전달합니다.

나는 (아직) 내가 어떤 종류의 부하를 겪고 있는지 확실하지 않습니다. 현재 수신 메시지에 2MB 제한이 설정되어 있음을 알고 있습니다. 이론적으로 우리는 초당 수천을 얻을 수 있습니다 (실제로 우리가 그런 것을 본 적이 있는지는 모르겠습니다). 메시지를 처리하는 시간의 양이 아닌 정말 중요하지만 분명히 더 빨리 더 좋을 것이다.

나는 Reactor 패턴을 조사하고 있었고 multiprocessing(적어도 테스트에서) 잘 작동 하는 라이브러리를 사용하여 작은 예제를 개발했습니다 . 그러나 지금 / 곧 우리는 나를 위해 이벤트 루프를 처리 할 asyncio 라이브러리를 사용할 수있게 될 것입니다.

asyncio결합하여 나를 물릴 수있는 것이 multiprocessing있습니까?


안전하게 결합 할 수 있어야 asyncio하고 multiprocessing사용하지 않아야하지만, 너무 많은 문제없이 multiprocessing직접. asyncio(및 기타 이벤트 루프 기반 비동기 프레임 워크) 의 주요 죄악은 이벤트 루프를 차단하고 있습니다. multiprocessing직접 사용하려고 하면 자식 프로세스를 기다리기 위해 차단할 때마다 이벤트 루프가 차단됩니다. 분명히 이것은 나쁘다.

이를 방지하는 가장 간단한 방법 BaseEventLoop.run_in_executorconcurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor는를 사용하여 구현 된 프로세스 풀이 multiprocessing.Process지만 asyncio이벤트 루프를 차단하지 않고 함수를 실행하기위한 기본 제공 지원이 있습니다. 다음은 간단한 예입니다.

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Pretend this is expensive calculations
   return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

대부분의 경우 이것은 기능만으로 충분합니다. 당신이에서 다른 구조를 필요로하는 자신을 발견 할 경우 multiprocessing, 같은 Queue, Event, Manager, 등이라고하는 타사 라이브러리가 aioprocessing제공 : (내가 쓴 전체 공개) asyncio모두의 호환 버전의 multiprocessing데이터 구조를. 다음은이를 시연하는 예입니다.

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [ 
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]   
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

예, 당신을 물릴 수도 있고 물지 않을 수도있는 비트가 꽤 있습니다.

  • asyncio하나의 스레드 또는 프로세스에서 실행될 것으로 예상 되는 것과 같은 것을 실행할 때 . 이것은 병렬 처리와 함께 작동하지 않습니다. IO 작업 (특히 소켓에있는 작업)을 단일 스레드 / 프로세스에 남겨두고 어떻게 든 작업을 분산해야합니다.
  • While your idea to hand off individual connections to a different handler process is nice, it is hard to implement. The first obstacle is that you need a way to pull the connection out of asyncio without closing it. The next obstacle is that you cannot simply send a file descriptor to a different process unless you use platform-specific (probably Linux) code from a C-extension.
  • Note that the multiprocessing module is known to create a number of threads for communication. Most of the time when you use communication structures (such as Queues), a thread is spawned. Unfortunately those threads are not completely invisible. For instance they can fail to tear down cleanly (when you intend to terminate your program), but depending on their number the resource usage may be noticeable on its own.

If you really intend to handle individual connections in individual processes, I suggest to examine different approaches. For instance you can put a socket into listen mode and then simultaneously accept connections from multiple worker processes in parallel. Once a worker is finished processing a request, it can go accept the next connection, so you still use less resources than forking a process for each connection. Spamassassin and Apache (mpm prefork) can use this worker model for instance. It might end up easier and more robust depending on your use case. Specifically you can make your workers die after serving a configured number of requests and be respawned by a master process thereby eliminating much of the negative effects of memory leaks.


See PEP 3156, in particular the section on Thread interaction:

http://www.python.org/dev/peps/pep-3156/#thread-interaction

This documents clearly the new asyncio methods you might use, including run_in_executor(). Note that the Executor is defined in concurrent.futures, I suggest you also have a look there.

ReferenceURL : https://stackoverflow.com/questions/21159103/what-kind-of-problems-if-any-would-there-be-combining-asyncio-with-multiproces

반응형