threading ( multiprocess )

  1. 직접 `` threading.Thread`` 클래스의 객체를 생성하는 방법
  2. `` threading.Thread``를 상속받은 다음 이 클래스의 객체를 생성하는 방법

(한 가지 방법이 더 있지만 어차피 안쓴다)

 

#1

import threading

def say(msg):
    for i in range(2):
        time.sleep(1)
        print(msg)

for msg in ['THREAD1', 'THREAD2', 'THREAD3']:
    t = threading.Thread(target=say, args=(msg, ))
    # recv() 같은 blocking 함수를 호출하는 thread는 flag를 False로 만든다고 해도
    # 애초에 recv()에서 넘어가질 않기 때문에 종료되지 않는다. 이런 경우 daemon으로 만들어준다.
    t.daemon = True
    t.start()

# thread에서 while flag로 도는 경우 flag를 False로 만들어 종료될 수 있도록 한다.(단, blocking함수를 사용하지 않는 경우.)
flag = False

args 마지막에 꼭 ,를 써줘야 한다.

 

#2

class UserThread(threading.Thread):
    def __init__(self, msg):
        threading.Thread.__init__(self)
        self.msg=msg

    def run(self):
        while True:
            time.sleep(1)
            print(self.msg)

t = UserThread(msg)
t.start()

반드시 super class인 Thread의 생성자에 self를 넘겨줘야 한다

start()를 호출하면 내부의 run()이 자동으로 호출되어 실행된다.

스레드를 외부에서 종료하기 위해서는, 강제 종료하는건 좋지 않기 때문에 스레드가 `` stop_flag``를 가지고 있게 하고 이를 반복적으로 체크하도록 구성해야 한다. 메인 스레드에서는 플래그 set하고 join하는 방식으로...

근데 이렇게 스레드가 멤버를 가지도록 하려면 #2처럼 객체로 구성해야 한다.

 

thread 객체는 한 번만 start()할 수 있다.

import threading
import time

class TestThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        print(1)


a = TestThread()
a.start()
time.sleep(1)
print(a.isAlive())
a.start()
1
False
Traceback (most recent call last):
  File "test.py", line 16, in <module>
    a.start()
  File "D:\Python\Anaconda3\lib\threading.py", line 842, in start
    raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
 
그래서 짧은 작업을 수행하고 종료되는 스레드 같은 경우 객체를 새로 만들어야 함.
 

threading.Timer로 일정 시간마다 특정 작업을 수행할 수 있다.

조건에 따라 스레드 실행 wait 관리

notify(), wait()를 호출해서 스레드 간 실행 흐름을 제어할 수 있다.
 

daemon

``python daemon=False``이면 메인 스레드가 이 스레드가 종료될 때 까지 대기한다.

더 이상 non-daemon threads가 존재하지 않고 daemon threads만 남았을 때 전체 프로그램을 종료하기 때문에

``python daemon=True``인 threads들은 더 이상 non-daemon threads가 없으면 바로 종료된다.

 

특별한 상황이 아니면 `` join()``이랑 같이 쓰면 안된다.

`` join()``을 호출한다는건 (메인) 스레드에서 워커 스레드의 종료를 기다리겠다는 것이다.

근데 daemon=True라는건 메인 스레드가 종료될 때 같이 종료되겠다는걸 의미한다.(주로 recv같은 blocking 함수.)

이게 좀 앞뒤가 안맞는다. 워커 스레드는 메인 스레드가 종료될 때 까지 일하다가 메인 스레드가 종료되면서 같이 꺼질건데, 메인 스레드가 워커 스레드의 종료를 기다리고 있다.

즉 서로가 종료되기를 기다리는 이런 상황에서, 워커 스레드에 blocking이 걸려 있는 상태라면 join이 끝나지를 않아 메인 스레드가 종료되지 않고, 이는 워커 스레드도 같이 종료되지 않는다는걸 의미한다.

그래서 스레드라고 무조건 종료 기다려야 하니까 join 불러주는게 아니라, 상황에 맞게 써야 한다.

 

직관적으로 daemon이면 메인 스레드가 종료되어도 백그라운드에서 실행되어야 하니까 ``python daemon=True``면 메인 스레드 종료되어도 계속 돌아가야 하는거 아닌가 싶지만

daemon thread와 daemon process는 역할이 다르다.

daemon process는 상기한 대로 백그라운드에서 계속 실행되며 특정 작업을 수행하는 역할을 하지만

daemon thread는 non-daemon thread(일반 스레드)의 작업을 돕는 역할을 한다.

그래서 non-daemon thread 더 이상 남아있지 않으면 데몬 스레드도 종료되어야 하는 것이 맞고, 실제로 특정 스레드가 종료되었는데 그를 보조하는 스레드가 계속 작업을 수행하는 것을 막기 위해 사용한다.

 

lock

thread를 사용하는 경우 반드시 lock을 사용한다! 그리고 lock을 쓰는 경우 반드시 finally 쓴다. (finally 안쓰면 Exception 시 release가 안된다. 그니까 웬만하면 with 쓰자.)
from threading/multiprocessing import Lock

mutex = Lock()
### 1
with mutex:
    # routines

### 2
try:
    mutex.acquire()
    # routines
finally:
    mutex.release()

파이썬에서 mutex는 ``python Lock()`` 객체다. ``python acquire(), release()`` 두가지 함수만 지원한다. 

critical section에 진입하기 전 `` acquire()``, 빠져나오면서 `` release()`` 해주면 끝이다.

 

``python with``문을 사용할 때 각 객체의 컨텍스트 관리자는 ``python with`` 블록에 들어가기 전에 알아서 `` acquire()``를 호출하고, 빠져나오면서 `` release()``를 호출하게 된다.

 

* `` Lock``을비롯한 `` threading`` 모듈 객체에는 모두 컨텍스트 관리자가 있다. 

 

파이썬에서도 멀티스레드 쓰면 critical section이 생긴다.

critical section은 스레드들이 동시 접근해서는 안되는, 공유 자원에 접근하는 코드 영역을 말한다. 즉 변수나 I/O 모듈을 의미하는게 아니라 코드의 일부분을 의미한다. (windows에서는 이러한 critical section의 상호 배제를 위해 제공되는 자료구조 그 자체를 의미하기도 한다.)

Race condition이 발생해 자원이 이상한 값을 가지지 않도록 critical section에서는 연속적인 실행이 보장되어야 한다.

 
파이썬의 멀티스레드 모델도 critical section이 존재할 수 있다.
아니, GIL때문에 한 순간에 하나의 스레드만 실행되니까 자원 접근도 한 순간에 하나의 스레드만 하게 되는 것 아닌가라고 착각할 수 있겠지만, 자원 접근 수정같은 오퍼레이션은 여러 instruction으로 구성되기 때문에 atomic operation이 아니면 그냥 실행하다가 컨텍스트 스위칭 되면서 동시성 문제가 발생할 수 있다. 단일 코어에서도 동시성 문제가 발생할 수 있는 맥락이랑 같다고 보면 된다.
 

GIL ( Global Interpreter Lock )

파이썬 코드는 파이썬 가상머신( 인터프리터의 메인 루프 ) 위에서 돌아간다.

문제는, 인터프리터에서 한번에 하나의 스레드만 수행되도록 설계되었다는 점이다.

그래서 어떤 한 순간에, 코어가 여러 개 있어도 단 하나의 스레드만 돌아가게 된다.

이는 인터프리터에서 파이썬 가상머신에 엑세스할 때 전역 인터프리터 락(GIL)을 사용하기 때문이다.

인터프리터는 다음과 같은 행위를 반복하게 되므로, Lock이 걸려 스레드가 여러개의 코어에서 돌아갈 수가 없다.

  1. GIL을 설정하고,
  2. 어떤 스레드 실행
  3. 스레드 슬립
  4. GIL 해제
그래서 연산을 병렬처리해야 하는 경우에는 손해가 크다.(이 경우 스레드는 그냥 라운드 로빈 방식으로 처리되므로 컨텍스트 스위칭 오버헤드만 생긴다.)
GIL 방식은 I/O에 사용하는 경우 작은 단위로 Lock을 하는 방식보다 더 빠를 수 있다는 장점이 있다. 또한 C와 바인딩 하는 경우도 더 빠르다고 한다.
하지만 IO 작업도, thread 수를 늘려나가다가 코어를 100% 쓰는 경우 부터는 thread 수를 늘려도 IO가 더 빨라지지 않는다. (오히려 context switching 때문에 느려질 수 있다.) 이 때 부터는 process를 분리하여 멀티코어 쓸 수 있도록 해야 한다. 아무리 IO 작업이어도 단일 코어로는 한계가 있다.

GIL 때문에 python process 1개가 점유 할 수 있는 최대 CPU load는, 1개 코어분 만큼이다. 이 시스템은 8코어로, 100 / 8 = 12.5 최대 약 12.5% 점유 가능하다.

 

IO bound 작업이라도 여기서 thread를 더 늘린다고 더 빨라지지 않는다. 더 빨리 수행하려면 multiprocess 사용해야 한다.

(1%정도 오차가 있지만) 각각의 python process가 최대 12.5% 까지 점유 할 수 있다.


아무튼, 이러한 단점때문에 파이썬은 `` threading`` module과 동일한 인터페이스를 가진 `` multiprocess`` module을 제공한다.
IPC에서 약간 손해를 보겠지만, 이를 지원하는 각종 메커니즘도 존재하며 thread와 똑같이 사용하면 된다.

 

ThreadPoolExecutor

  • 어차피 thread를 쓸거라면, ``py concurrent.futures.ThreadPoolExecutor``를 사용해 스레드 풀을 구성하는 편이 좋다.
  • thread를 한 번에 ``c 0x100``개 정도 만들면 `` thread.error: can't start new thread``가 발생한다. 이는 thread 수가 너무 많아서 발생하는 문제. ``c 0x80``까지는 괜찮은 듯. (환경에 따라 다르겠지만)
 
with ThreadPoolExecutor(max_workers=20) as e:
    futures = []
    for i, hash_digest in enumerate(hash_list):
        t = QueryThread(file_list[i], hash_digest)
        e.submit(t.run)   
        # ()없음에 유의. 함수 포인터 넘기면 t.run() 호출해준다. 인자는 뒤 파라미터로 추가적으로 넘길 수 있음.

        # 최신 버전은 future를 활용해서 아래처럼 쓴다. 직접 Thread 안쓰고.
        future = executor.submit(pow, 323, 1235)
        futures.append(future)
    
    for future in as_completed(futures):
        print(future.exception())

모든 future가 작업이 완료 될 때 까지 대기? => with 구문이 자동으로 해준다. 아래 링크 참조.

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.Executor.shutdown

 

with Lock()을 쓸지? future로 반환 받을지?

왠지 Lock을 쓰면 pending이 있을 것 같지만,

실제 테스트 시(100만 submit, max_workers=100) 25s 정도 소모되는 것으로 봐서는 성능 차이는 미미함.
상황에 맞게 사용하면 된다.

 

ThreadPoolExecutor 쓸 때 memory load율 치솟는 문제 : 반드시 알아야 하는 부분

  • https://developpaper.com/memory-overflow-after-threadpoolexecutor-is-used-i/
  • 일단 `` print(executor._work_queue.qsize())`` 찍어보면 알 수 있는데, 지금 작업 가능한 thread가 있든 없든 일단 queue에 다 넣어버린다.
  • 즉, thread가 준비가 될 때 다음 loop를 도는게 아니다! 일단 non-blocking으로 모든 loop를 마치고 queue에 있는 작업이 끝날 때 까지 with 구문 끝에서 대기하는거다.
    • 그래서 실제로 thread가 함수를 몇 번 실행 했는지와, loop가 몇 번 실행되었는지를 측정하고 싶다면 둘을 별개의 카운터 변수로 두어야 한다.
    • queue에 작업을 모두 올려놓은 다음 부터는, 기다리면 queue에 있는 작업이 줄어들면서 다시 메모리 사용률이 떨어지는 모습을 볼 수 있다.
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def main(i):
    sleep(0.5)
    print("thread-{}".format(i))

if __name__ == '__main__':

    with ThreadPoolExecutor(max_workers=3) as executor:
        for i in range(30):
            sleep(0.1)
            executor.submit(main, i)
            print("loop-{}".format(i))
        print("end of loop")
...
loop-26
thread-14
loop-27
loop-28
loop-29
end of loop
thread-15
thread-16
thread-17
...

 

worker thread가 많고 머신 스펙이 좋으면 worker가 가져가는 속도가 빨라서 메모리가 잘 늘어나지 않을 수도 있음.

즉, 메인 스레드에서는 계속 작업을 work_queue에 적재하고 / worker thread에서는 작업을 가져가서 해소하고

이런 과정이 동시에 일어나게 되는데, worker 수 조절해가면서 테스트해보면 아래 사항을 알 수 있다.

 

  • worker 수가 적거나 머신이 느리면, queue에 적재하는 속도가 훨씬 빨라서 점점 queue의 사이즈가 늘어나면서 => 메모리가 증가한다.
  • 반면 worker가 충분히 많고, 작업을 해소하는데 시간이 오래 걸리지 않는다면 queue에서 가져가는 속도가 더 빨라서 queue 사이즈가 그다지 늘어나지 않는다 => 메모리를 거의 안쓰는 수준.

 

실제 사례)

파일 크기 170MB csv 파일 읽어 한 라인 씩 submit 할 때, (약 200만 라인, 200만 submit)

max_workers=5 : queue에 쌓이는 속도가 훨씬 빨라 일단 다 쌓는 수준. mem=5G 까지 증가한다.

max_workers=200 : worker가 빠르게 가져가주어서 mem=300MB 수준으로 유지

 

물론 routine 속도에 따라 200 worker여도 queue에 쌓이는 속도가 더 빠를 수 있다. 절대적인 수치는 아니니 참고만.
200만 라인 처리 시간도 routine 속도에 따라 크게 달라지므로 의미가 없을 수도 있겠지만, 참고용으로 적어둔다.
1process, max_workers=200로 1.5시간 소모.

 

메모리가 모자르다면 work_queue 사이즈를 제한하는 방식으로 한 번에 모두 submit 하지 않게끔 하여 해결 할 수 있다.

https://stackoverflow.com/questions/48263704/threadpoolexecutor-how-to-limit-the-queue-maxsize/66984088#66984088

queue size를 넉넉하게 10만 정도로 주면 속도도 거의 느려지지 않는다. 꼭 사용하는 것을 추천.

class ThreadPoolExecutorWithQueueSizeLimit(ThreadPoolExecutor):
    def __init__(self, max_size=100000, *args, **kwargs):
        super(ThreadPoolExecutorWithQueueSizeLimit, self).__init__(*args, **kwargs)
        self._work_queue = queue.Queue(maxsize=max_size)

 

ProcessPoolExecutor와 multiprocessing.pool

* 주의. ``py if __name__``에 써야함. 전역 스코프에서 쓰면 에러난다. recursive하게 프로세스가 뜨면서 에러가 발생한다.

 

from concurrent.futures import ProcessPoolExecutor

def f(x):
    return x*x

result = []
def main():
    with ProcessPoolExecutor(max_workers=3) as e:
        for square in e.map(f, range(10)):
            result.append(square)
    return result

if __name__ == "__main__":
    print(main())
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        a = p.map(f, range(10))
        print(a)
  • 둘 다 되는데, ProcessPoolExecutor가 좀 더 wrapping되어 편리한 부분이 있고, ThreadPoolExecutor와 동일한 인터페이스로 제공되기 때문에 이걸 쓰는게 좋아보인다.
  • 근데 process 병렬 실행 자체는, sh를 하나 두고 그걸로 python 커맨드를 병렬 실행해도 되고, 젠킨스로 여러개 트리거해서 실행해도 되는거라서 꼭 python에서 처리할 필요는 없다.
    • python에서 처리하는 경우 ProcessPool을 구성 하면서, 항상 설정한 만큼의 리소스를 풀로 쓸 수 있다는게 장점.
    • 반면 sh에서 &로 연결해서 4개 돌리게 되면, 이 4개의 작업이 모두 끝나기 전 까지는 다음 작업을 실행하지 않게 된다. 작업 3개가 끝나서 리소스가 남아도, 나머지 1개의 작업이 끝날 때 까지는 대기하게 되므로 ProcessPool에 비해 손해다.
    • 그리고 날짜 등 파라미터 처리나 반복이 sh에서는 좀 더 까다로울 수 있다는 점. 가능은 하다.

 

작업 종료?

`` kill -9 {실행된_프로세스_중_아무_pid}`` 하는 경우 ProcessPool 전체가 종료된다.

콘솔에서 실행하고 KeyboardInterrupt 넘기는 케이스와는 동작이 다르므로 유의.

 

executor.submit에 최상위 함수 넘기면 동작하는데 class method 넘기면 동작 안하는 현상

버그인 듯?

# 이건 동작안한다. 에러도 없음. 그냥 executor.submit 부분이 아무 효과가 없이 지나간다.

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        for target_fpath in target_fpaths:
        	requester = Requester(target_fpath)
            executor.submit(requester.main)
# 이건 동작한다.
def request(target_fpath):
    requester = Requester(target_fpath)
    requester.main()

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        for target_fpath in target_fpaths:
            executor.submit(request, target_fpath)

 

예제 코드

https://github.com/umbum/Python-snippet/commit/d8e50a2c2bbe58e39cc0f574309622e051e7c1d8