asyncio와 다중 처리를 결합하면 어떤 종류의 문제 (있는 경우)가 있습니까?
거의 모든 사람들이 Python에서 스레딩을 처음 볼 때 알고 있듯이 실제로 병렬 처리를 원하는 사람들에게 삶을 비참하게 만드는 GIL이 있습니다. 또는 적어도 기회를 제공합니다.
현재 Reactor 패턴과 같은 것을 구현하려고합니다. 효과적으로 나는 하나의 스레드에서 들어오는 소켓 연결을 듣고 싶습니다. 누군가가 연결을 시도 할 때 그 연결을 받아들이고 처리를 위해 다른 스레드와 같이 전달합니다.
나는 (아직) 내가 어떤 종류의 부하를 겪고 있는지 확실하지 않습니다. 현재 수신 메시지에 2MB 제한이 설정되어 있음을 알고 있습니다. 이론적으로 우리는 초당 수천을 얻을 수 있습니다 (실제로 우리가 그런 것을 본 적이 있는지는 모르겠습니다). 메시지를 처리하는 시간의 양이 아닌 정말 중요하지만 분명히 더 빨리 더 좋을 것이다.
나는 Reactor 패턴을 조사하고 있었고 multiprocessing
(적어도 테스트에서) 잘 작동 하는 라이브러리를 사용하여 작은 예제를 개발했습니다 . 그러나 지금 / 곧 우리는 나를 위해 이벤트 루프를 처리 할 asyncio 라이브러리를 사용할 수있게 될 것입니다.
asyncio
과 결합하여 나를 물릴 수있는 것이 multiprocessing
있습니까?
안전하게 결합 할 수 있어야 asyncio
하고 multiprocessing
사용하지 않아야하지만, 너무 많은 문제없이 multiprocessing
직접. asyncio
(및 기타 이벤트 루프 기반 비동기 프레임 워크) 의 주요 죄악은 이벤트 루프를 차단하고 있습니다. multiprocessing
직접 사용하려고 하면 자식 프로세스를 기다리기 위해 차단할 때마다 이벤트 루프가 차단됩니다. 분명히 이것은 나쁘다.
이를 방지하는 가장 간단한 방법 BaseEventLoop.run_in_executor
은 concurrent.futures.ProcessPoolExecutor
. ProcessPoolExecutor
는를 사용하여 구현 된 프로세스 풀이 multiprocessing.Process
지만 asyncio
이벤트 루프를 차단하지 않고 함수를 실행하기위한 기본 제공 지원이 있습니다. 다음은 간단한 예입니다.
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
def blocking_func(x):
time.sleep(x) # Pretend this is expensive calculations
return x * 5
@asyncio.coroutine
def main():
#pool = multiprocessing.Pool()
#out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
executor = ProcessPoolExecutor()
out = yield from loop.run_in_executor(executor, blocking_func, 10) # This does not
print(out)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
대부분의 경우 이것은 기능만으로 충분합니다. 당신이에서 다른 구조를 필요로하는 자신을 발견 할 경우 multiprocessing
, 같은 Queue
, Event
, Manager
, 등이라고하는 타사 라이브러리가 aioprocessing
제공 : (내가 쓴 전체 공개) asyncio
모두의 호환 버전의 multiprocessing
데이터 구조를. 다음은이를 시연하는 예입니다.
import time
import asyncio
import aioprocessing
import multiprocessing
def func(queue, event, lock, items):
with lock:
event.set()
for item in items:
time.sleep(3)
queue.put(item+5)
queue.close()
@asyncio.coroutine
def example(queue, event, lock):
l = [1,2,3,4,5]
p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
p.start()
while True:
result = yield from queue.coro_get()
if result is None:
break
print("Got result {}".format(result))
yield from p.coro_join()
@asyncio.coroutine
def example2(queue, event, lock):
yield from event.coro_wait()
with (yield from lock):
yield from queue.coro_put(78)
yield from queue.coro_put(None) # Shut down the worker
if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue = aioprocessing.AioQueue()
lock = aioprocessing.AioLock()
event = aioprocessing.AioEvent()
tasks = [
asyncio.async(example(queue, event, lock)),
asyncio.async(example2(queue, event, lock)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
예, 당신을 물릴 수도 있고 물지 않을 수도있는 비트가 꽤 있습니다.
asyncio
하나의 스레드 또는 프로세스에서 실행될 것으로 예상 되는 것과 같은 것을 실행할 때 . 이것은 병렬 처리와 함께 작동하지 않습니다. IO 작업 (특히 소켓에있는 작업)을 단일 스레드 / 프로세스에 남겨두고 어떻게 든 작업을 분산해야합니다.- While your idea to hand off individual connections to a different handler process is nice, it is hard to implement. The first obstacle is that you need a way to pull the connection out of
asyncio
without closing it. The next obstacle is that you cannot simply send a file descriptor to a different process unless you use platform-specific (probably Linux) code from a C-extension. - Note that the
multiprocessing
module is known to create a number of threads for communication. Most of the time when you use communication structures (such asQueue
s), a thread is spawned. Unfortunately those threads are not completely invisible. For instance they can fail to tear down cleanly (when you intend to terminate your program), but depending on their number the resource usage may be noticeable on its own.
If you really intend to handle individual connections in individual processes, I suggest to examine different approaches. For instance you can put a socket into listen mode and then simultaneously accept connections from multiple worker processes in parallel. Once a worker is finished processing a request, it can go accept the next connection, so you still use less resources than forking a process for each connection. Spamassassin and Apache (mpm prefork) can use this worker model for instance. It might end up easier and more robust depending on your use case. Specifically you can make your workers die after serving a configured number of requests and be respawned by a master process thereby eliminating much of the negative effects of memory leaks.
See PEP 3156, in particular the section on Thread interaction:
http://www.python.org/dev/peps/pep-3156/#thread-interaction
This documents clearly the new asyncio methods you might use, including run_in_executor(). Note that the Executor is defined in concurrent.futures, I suggest you also have a look there.
ReferenceURL : https://stackoverflow.com/questions/21159103/what-kind-of-problems-if-any-would-there-be-combining-asyncio-with-multiproces
'programing' 카테고리의 다른 글
Maven 저장소에서 오래된 종속성을 정리하는 방법은 무엇입니까? (0) | 2021.01.15 |
---|---|
부트 스트랩 3-점보트론 배경 이미지 효과 (0) | 2021.01.15 |
NaN과 달리 부동 소수점 무한대가 동일한 이유는 무엇입니까? (0) | 2021.01.14 |
RecyclerView가 항목 배치를 완료 한시기를 아는 방법은 무엇입니까? (0) | 2021.01.14 |
내 Git bash에 "MINGW64"가 나타나는 이유는 무엇입니까? (0) | 2021.01.14 |