(Spring Batch) 병렬 처리 방법 모음
(Spring Batch) 병렬 처리 방법 모음
Spring Batch에서 지원하는 배치 병렬 처리 방식
- AsyncItemProcessor / AsyncItemWriter → 한 step 내에서 processor만 병렬 수행해야 할 때
- Multi-threaded Step → 한 step 내에서 reader, processor, writer를 chunk 단위로 병렬 수행해야 할 때
- Parallel Steps → 여러 step들을 병렬로 수행해야 할 때
- Externalizing Batch Process Execution → 외부 remote 서버에서 병렬 수행 필요할 때. (master-worker 모델)
- Remote Chunking of Step→ 스텝 내의 Processor, Writer가 무거운 작업이라 외부 remote 서버들에서 병렬로 돌리고 싶을 때
- Remote Partitioning (Partitioning a Step) → Reader Query 부터가 무거운 작업이라 Query를 포함한 Step 전체를 병렬로 돌리고 싶을 때. (외부 remote worker 서버에 작업 분할도 가능하고, 꼭 외부가 아니라 local thread로도 가능)
https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html
AsyncItemProcessor / AsyncItemWriter
- https://docs.spring.io/spring-batch/docs/4.2.x/reference/html/index-single.html#asynchronous-processors
- https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/integration/async/AsyncItemProcessor.html
- reader, writer는 단일 thread로 동작하고 processor 부분만 taskExecutor가 붙어 병렬 처리하게 된다.
- processor 내에서 전체를 감싸 병렬처리 하는 것과 비슷한 효과인데, AsyncItemProcessor/AsyncItemWriter가 기존 processor/writer에 delegate 하는 방식이기 때문에 기존 로직 변경 없이 쉽게 사용 가능 하다.
- reader가 병렬이 아니라서 중단 지점 부터 재시작 가능하다.
- 보통 배치에서 오래 걸리는 부분은 processor이기 때문에, processor만 병렬로 처리해도 충분한 경우가 많다.
- 그리고 Multi-threaded Step과 달리
@JobScope빈에 대한 접근 해결이 가능하다. - 그래서 Multi-threaded Step 보다 선호하는 방법.
[!tip] VT 환경에서 최대 동시성 제어는 Semaphore 써야 한다.
Multi-threaded Step
- https://jojoldu.tistory.com/493
- https://stackoverflow.com/questions/36645648/spring-batch-corepoolsize-vs-throttle-limit
- Spring Batch Multi-threaded Step 사용 시 chunk 구성에 대한 오해
- reader query는 병렬로 실행되지 않는다. (synchronized)
- 하지만 query 결과로 조회된 item list를 reader들이 공유해서 당겨가 사용하기 때문에, 중단 지점 부터 재시작이 불가능하다.
- thread safe한 Reader, Writer를 사용해야 한다.
AbstractPagingItemReader::doRead에서 synchronized block을 사용하기 때문에 이를 상속한 PagingItemReader는 기본적으로 thread-safe다.- FlatFileItemReader는? sync wrapping이 필요하다.참조
- thread는 일단 CorePoolSize 만큼 뜨고 각자 Chunk를 만드려 시도하기 때문에 ThreadSafe 해야한다. 배타적으로 접근해야 paging이 틀어지는 것을 막을 수 있다.
- 동시 접근하는 변수(e.g., count)가 있는 경우 주의한다. (atomic 필요)
- 너무 많은 thread가 돌게 되면 db connection pool이 고갈 될 수도 있는 등 주의가 필요하다.
[!tip] VT 환경에서 최대 동시성 제어는 throttleLimit으로 가능하다.
StepConfig에서 DI 받은 JobParam(@JobScope)을 reader, processor, writer에서 접근하면 에러가 발생할 때
Multi-threaded Step을 구성할 때, Spring Batch는 스레드 풀에 작업을 분배하기 위해 사전 준비를 함- 이 준비 단계에서
@StepScope로 선언된ItemReaderBean의 실제 인스턴스(scopedTarget)를 생성하려고 시도 - 그런데 이
readerBean은@JobScope인JobParamBean을 필요로 함 - 문제는, 이 ‘사전 준비’가 일어나는 시점이 아직 ‘Job 스코프가 완전히 활성화되기 전’ 이라는 점
- 즉, 작업자들이 일할 준비를 하러 왔는데, 아직 현장 사무소(Job Scope) 문이 열리지 않아서 필요한 자재(
aegisMappingJobParam)를 꺼낼 수 없는 상황 - sol 1 )
@Value를 통해 직접 주입받으면 해결됨. - sol 2 )
AsyncItem*는setDelegate바깥에서 로컬 변수로 빼내서 클로저로 만들면 문제가 생기지 않으므로, Multi-threaded step 대신AsyncItem*사용하면 해결됨. - sol 3 ) 람다 바깥에 변수를 정의하면? « 안해봄
TaskExecutor의 종류
- https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/core/task/TaskExecutor.html
- All Known Implementing Classes 부분 참조
- https://www.baeldung.com/java-threadpooltaskexecutor-core-vs-max-poolsize
Parallel Steps
- [(step1,step2)와 step3]을 병렬 처리 하고 싶을 때.
FlowBuilder::split / SimpleJobBuilder::split을 이용- https://godekdls.github.io/Spring%20Batch/scalingandparallelprocessing/#72-parallel-steps
- https://godekdls.github.io/Spring%20Batch/configuringastep/#535-split-flows
Remote Chunking
- https://godekdls.github.io/Spring%20Batch/scalingandparallelprocessing/#73-remote-chunking
- https://godekdls.github.io/Spring%20Batch/springbatchintegration/#remote-chunking
Remote Partitioning
- https://godekdls.github.io/Spring%20Batch/scalingandparallelprocessing/#74-partitioning
- https://godekdls.github.io/Spring%20Batch/springbatchintegration/#remote-partitioning
[!warning] Partitioning은 ItemReader 인스턴스가 thread 별로 따로 생성된다.
reader query도 병렬로 실행하기 때문이다. (원래 목적이 remote server에서 돌리기 위함이므로)
그래서 단일 서버에서 multi thread로 partitioning 하는 경우, 파티션키가 명확하게 구분되어 있어야 한다.
ItemReader 인스턴스를 partition 마다 따로 가지기 때문에 static 변수에 주의해야 한다.
[!warning] Partitioning으로 reader query를 병렬 실행하면 DB 부하가 급증 할 수 있다.
reader query가 paging인데, 이게 느려서 이걸 병렬 실행한다면 느린 쿼리가 DB에 지속 유입되어 오히려 전체적인 수행 시간이 더 느려질 수도 있다.
reader가 느리다면, 올바른 해결책은 seek method로 변경하는 것이다.
Cursor-based RepositoryItemReader 구현과 case 정리
참고
- https://backtony.github.io/spring/2022-01-29-spring-batch-11/
- https://godekdls.github.io/Spring%20Batch/scalingandparallelprocessing/
- https://docs.spring.io/spring-batch/docs/4.2.x/reference/html/index-single.html#scalability
kafka를 중간 저장소로 사용하는 방식
- 하나의 table에서 데이터를 읽어서, type에 따라 서로 다른 3개의 배치 작업을 수행해야 하는 상황
- 데이터가 너무 많아서 병렬 작업으로 처리해야 함. (reader, processor, writer 모두)
- 이런 경우 Multi-threaded Step 방법으로 접근하게 되는데
- type에 index를 걸고, 조건에 넣어서 read 함에도 불구하고, 대상 테이블의 크기가 너무 크고, 병렬 처리로 인해 해당 테이블에 너무 많은 read가 발생해 성능이 안나오는 경우라면 ( 또는 connection pool 고갈 )
- 디스크 헤드가 이리저리 왔다갔다 하면서 읽어야 하니까… 한 테이블에 너무 많은 배치 read를 동시에 넣는 것은 응답 성능이 좋지 않을 수 있다.
- 대상 테이블에 각각의 reader thread가 동시에, 반복 접근해서 read 하는 것 보다, 하나의 thread가 read 대상 데이터를 처음부터 끝까지 한 번만 쭉 읽는 것이 효과적일 수 있다.
- 읽은 데이터를 어딘가 임시 저장소에 담아서 처리해야 하는데, 보통 임시 테이블을 사용하지만, kafka를 사용 할 수도 있다.
This post is licensed under CC BY 4.0 by the author.
