programing

간단한 Python 루프를 병렬화하려면 어떻게 해야 하나요?

procenter 2023. 2. 2. 21:54
반응형

간단한 Python 루프를 병렬화하려면 어떻게 해야 하나요?

이것은 아마 사소한 질문일 것입니다만, 파이썬에서 다음 루프를 병렬화하려면 어떻게 해야 합니까?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Python에서 단일 스레드를 시작하는 방법은 알지만 결과를 수집하는 방법은 모릅니다.

이 경우 가장 쉬운 프로세스라면 여러 프로세스도 상관없습니다.현재 Linux를 사용하고 있지만 Windows와 Mac에서도 코드가 실행되어야 합니다.

이 코드를 병렬화하는 가장 쉬운 방법은 무엇입니까?

CPython에서 여러 스레드를 사용해도 글로벌 인터프리터 잠금(GIL)으로 인해 순수 Python 코드의 성능이 향상되지 않습니다.대신 모듈을 사용할 것을 권장합니다.

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

인터랙티브 인터프리터에서는 동작하지 않습니다.

GIL 주변의 일반적인 FUD를 피하기 위해: 이 예에서는 스레드를 사용해도 아무런 이점이 없습니다.여기서는 스레드가 아닌 프로세스를 사용해야 합니다. 왜냐하면 스레드는 많은 문제를 회피하기 때문입니다.

from joblib import Parallel, delayed
def process(i):
    return i * i
    
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

은, 제 인스톨 되어 만, 의((((((( (Ubuntu, libliblib joblib)를 통해서 할 수.pip install joblib를 참조해 주세요.

https://blog.dominodatalab.com/simple-parallelization/에서 가져온 정보


31일 : 2021년 3월 31일 편집: Onjoblib,multiprocessing,threading ★★★★★★★★★★★★★★★★★」asyncio

  • joblib에서는 ""를 사용합니다.import multiprocessing (GIL에 ) (GIL에 ) (CPU에 의해)
  • 할 수 있다joblib이( "사용"을 사용),import threading직접)는 스레드가 I/O에 상당한 시간을 소비하는 경우에만 유용합니다(예: 디스크에 읽기/쓰기, HTTP 요청 전송).작업의 I/O 실행을 .
  • .7 , Python 3.7의 threadingasyncio와 작업을 병행할 수 있지만 다음과 같은 조언이 적용됩니다.import threading에서는 1개의 스레드만 사용됩니다. 플러스 측면에서는asyncio에는 비동기 프로그래밍에 도움이 되는 기능이 많이 있습니다.
  • 여러 프로세스를 사용하면 오버헤드가 발생합니다.생각해 보세요.일반적으로 각 프로세스는 계산을 실행하는 데 필요한 모든 것을 초기화/로드해야 합니다.위 코드 조각이 당신의 월 타임을 향상시키는지 스스로 점검할 필요가 있습니다. 또 하나 , , 기기 here here here here here here here here here here here here here here here here here here here here here here here here here 。joblib하다
import time
from joblib import Parallel, delayed

def countdown(n):
    while n>0:
        n -= 1
    return n


t = time.time()
for _ in range(20):
    print(countdown(10**7), end=" ")
print(time.time() - t)  
# takes ~10.5 seconds on medium sized Macbook Pro


t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro

단순한 for 루프를 병렬화하기 위해 joblib는 멀티프로세싱의 원시 사용에 많은 가치를 가져옵니다.짧은 구문뿐만 아니라 반복이 매우 빠를 때(오버헤드를 제거하기 위해) 투과적인 번들링이나 자 프로세스의 트레이스백 캡처 등 오류 보고 기능을 향상시킬 수 있습니다.

면책사항:나는 잡립의 원저자다.

이 코드를 병렬화하는 가장 쉬운 방법은 무엇입니까?

Pool Executor의 Pool 합니다.concurrent.futures이치노하려면 먼저 '아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 아, 네.executor.map:

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...

또는 각 콜을 개별적으로 송신하는 것으로 분류됩니다.

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))

    for future in futures:
        out1, out2, out3 = future.result() # this will block
        ...

컨텍스트를 벗어나면 실행자에게 리소스를 해방하라는 신호입니다.

스레드 또는 프로세스를 사용하여 동일한 인터페이스를 사용할 수 있습니다.

작업 예

다음은 의 값을 나타내는 동작 예시 코드입니다.

이 파일을 future test 파일에 저장합니다.py:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection

def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result

def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result

def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
    print(f'wall time to execute: {finish-start}')
    print(f'total of timings for each call: {sum(timings)}')
    print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
    print(dict(zip(inputs, results)), end = '\n\n')

def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)

if __name__ == '__main__':
    main()

여기 한 의 실행 있습니다.python -m futuretest:

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

프로세서를 많이 사용하는 분석

Python에서가 높은 할 때는 Python을 .ProcessPoolExecutorThreadPoolExecutor.

Global Interpreter Lock(GIL; 글로벌 인터프리터 잠금)으로 인해 스레드는 여러 프로세서를 사용할 수 없으므로 각 계산의 시간과 벽면 시간(실시간 경과)이 길어질 것으로 예상됩니다.

IO 바인딩 분석

바인드 조작을 「」, 「IO」를 상정해 .ThreadPoolExecutor이 좋다ProcessPoolExecutor.

Python의 스레드는 실제 OS 스레드입니다.운영체제에 의해 sleeve 상태로 전환되고 정보가 도착하면 재기동할 수 있습니다.

마지막 생각

Windows는 포킹을 지원하지 않기 때문에 새로운 프로세스를 시작할 때마다 시간이 걸리기 때문에 Windows에서는 멀티프로세싱이 느려질 것으로 생각됩니다.

여러 프로세스 내에 여러 스레드를 중첩할 수 있지만 여러 프로세스를 스핀오프하기 위해 여러 스레드를 사용하지 않는 것이 좋습니다.

Python에서 처리량이 많은 문제에 직면했을 경우, 추가 프로세스로 확장할 수 있지만 스레드화에서는 확장할 수 없습니다.

이게 가장 쉬운 방법이에요!

asyncio 를 사용할 수 있습니다.(매뉴얼은 이쪽에서 확인하실 수 있습니다.고성능 네트워크 및 웹 서버, 데이터베이스 연결 라이브러리, 분산 작업 대기열 등을 제공하는 여러 Python 비동기 프레임워크의 기반으로 사용됩니다.또한 고급 API와 하위 API를 모두 갖추고 있어 모든 문제에 대응할 수 있습니다.

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

이제 이 기능은 메인 프로그램이 대기 상태가 되지 않고 호출될 때마다 병렬로 실행됩니다.루프의 병렬화에도 사용할 수 있습니다.for 루프를 호출하면 루프는 순차적이지만 인터프리터가 도착하자마자 모든 반복이 메인 프로그램과 병렬로 실행됩니다.

1. 대기하지 않고 주 나사산에 병렬로 점화 루프

여기에 이미지 설명 입력

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

이것에 의해, 다음의 출력이 생성됩니다.

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

갱신일 : 2022년 5월

이것이 원래의 질문에 대한 답변이지만 업베이트된 코멘트의 요구에 따라 루프가 완료될 때까지 기다릴 수 있는 방법이 있습니다.그래서 여기에 그것들을 추가합니다.의 열쇠는 다음과 같습니다.asyncio.gather()&run_until_complete() 이치노

import asyncio
import time

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument, other_argument): # Added another argument
    time.sleep(5)
    print(f"function finished for {argument=} and {other_argument=}")

def code_to_run_before():
    print('This runs Before Loop!')

def code_to_run_after():
    print('This runs After Loop!')

2. 병렬로 실행하되 완료를 기다립니다.

여기에 이미지 설명 입력

code_to_run_before()                                                         # Anything you want to run before, run here!

loop = asyncio.get_event_loop()                                              # Have a new event loop

looper = asyncio.gather(*[your_function(i, 1) for i in range(1, 5)])         # Run the loop
                               
results = loop.run_until_complete(looper)                                    # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

이것에 의해, 다음의 출력이 생성됩니다.

This runs Before Loop!
function finished for argument=2 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=1 and other_argument=1
function finished for argument=4 and other_argument=1
This runs After Loop!

3. 여러 루프를 병렬로 실행하고 완료를 기다립니다.

여기에 이미지 설명 입력

code_to_run_before()                                                         # Anything you want to run before, run here!   

loop = asyncio.get_event_loop()                                              # Have a new event loop

group1 = asyncio.gather(*[your_function(i, 1) for i in range(1, 2)])         # Run all the loops you want
group2 = asyncio.gather(*[your_function(i, 2) for i in range(3, 5)])         # Run all the loops you want
group3 = asyncio.gather(*[your_function(i, 3) for i in range(6, 9)])         # Run all the loops you want

all_groups = asyncio.gather(group1, group2, group3)                          # Gather them all                                    
results = loop.run_until_complete(all_groups)                                # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

이것에 의해, 다음의 출력이 생성됩니다.

This runs Before Loop!
function finished for argument=3 and other_argument=2
function finished for argument=1 and other_argument=1
function finished for argument=6 and other_argument=3
function finished for argument=4 and other_argument=2
function finished for argument=7 and other_argument=3
function finished for argument=8 and other_argument=3
This runs After Loop!

(4) 순차적으로 동작하는 루프, 그러나 병렬로 동작하는 각 루프의 반복

여기에 이미지 설명 입력

code_to_run_before()                                                               # Anything you want to run before, run here!

for loop_number in range(3):

    loop = asyncio.get_event_loop()                                                # Have a new event loop

    looper = asyncio.gather(*[your_function(i, loop_number) for i in range(1, 5)]) # Run the loop
                             
    results = loop.run_until_complete(looper)                                      # Wait until finish

    print(f"finished for {loop_number=}")       

code_to_run_after()                                                                # Anything you want to run after, run here!

이것에 의해, 다음의 출력이 생성됩니다.

This runs Before Loop!
function finished for argument=3 and other_argument=0
function finished for argument=4 and other_argument=0
function finished for argument=1 and other_argument=0
function finished for argument=2 and other_argument=0
finished for loop_number=0
function finished for argument=4 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=2 and other_argument=1
function finished for argument=1 and other_argument=1
finished for loop_number=1
function finished for argument=1 and other_argument=2
function finished for argument=4 and other_argument=2
function finished for argument=3 and other_argument=2
function finished for argument=2 and other_argument=2
finished for loop_number=2
This runs After Loop!

업데이트: 2022년 6월

현재 형태로는 일부 버전의 주피터 노트북에서는 실행되지 않을 수 있습니다.이벤트 루프를 이용한 주피터 노트북이 원인입니다. , 「」를 참조해 주세요.nest_asyncio(이름은 이벤트루프를 내포하고 있습니다)가 그 방법입니다. 셀 맨 됩니다

import nest_asyncio
nest_asyncio.apply()

또한 위에서 설명한 모든 기능은 노트북 환경에서도 이용할 수 있어야 합니다.

Ray를 사용하면 다음과 같은 많은 이점이 있습니다.

  • (같은 코드를 사용하여) 여러 코어와 더불어 여러 시스템에 걸쳐 병렬화할 수 있습니다.
  • 공유 메모리(및 제로 카피시리얼라이제이션)를 통해 수치 데이터를 효율적으로 처리합니다.
  • 분산 스케줄링으로 높은 작업 처리량.
  • 폴트 톨러런스

사용자의 경우 Ray를 시작하고 원격 기능을 정의할 수 있습니다.

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

그리고 동시에 호출합니다.

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

클러스터에서 동일한 예를 실행할 경우 변경되는 유일한 행은 ray.init()에 대한 호출입니다.관련 문서는 여기에서 찾을 수 있습니다.

저는 Ray의 개발을 돕고 있습니다.

joblib매우 도움이 됩니다.이치노

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_param=-1: 사용 가능한 모든 코어 사용

Dask 선물들; 나는 아직 아무도 그것에 대해 언급하지 않은 것이 놀랍다.

from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

스레드와 하나의 뮤텍스를 사용하여 하나의 글로벌 목록을 보호하면 어떨까요?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

명심해라, 너는 가장 느린 실처럼 빨라질 것이다.

감사합니다 @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

비동기 함수가 있다고 칩시다.

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

대규모 어레이에서 실행해야 합니다.일부 속성은 프로그램에 전달되고 일부는 배열의 사전 요소 속성에서 사용됩니다.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))

tqdm 라이브러리에 의한 동시 래퍼는 장기 실행 코드를 병렬화하는 좋은 방법입니다.tqdm은 스마트 진행률 미터를 통해 현재 진행 상황과 남은 시간에 대한 피드백을 제공합니다.이것은 긴 계산에 매우 유용하다고 생각합니다.

는 ''로 쓸 수 .thread_map, 「」에의 한 콜을 개입시켜 캐스트로서 할 수 있습니다process_map:

from tqdm.contrib.concurrent import thread_map, process_map


def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier


if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)

이것은 Python에서 다중 처리 및 병렬/분산 컴퓨팅을 구현할 때 유용할 수 있습니다.

techila 패키지 사용에 대한 YouTube 튜토리얼

Techila는 분산 컴퓨팅 미들웨어로, Techila 패키지를 사용하여 Python과 직접 통합됩니다.패키지의 피치 함수는 루프 구조의 병렬화에 도움이 됩니다.(다음 코드 스니펫은 Techila 커뮤니티 포럼에서 참조).

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

이것 좀 보세요.

http://docs.python.org/library/queue.html

이게 올바른 방법이 아닐 수도 있지만, 저는 다음과 같은 일을 하고 싶습니다.

실제 코드

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

도움이 됐으면 좋겠다.

병렬 처리의 매우 간단한 예는 다음과 같습니다.

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()

언급URL : https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop

반응형