Spring Batch Listener
- 리스너는 배치 처리의 주요 순간들을 관찰하고, 각 시점에 필요한 동작을 정의할 수 있는 도구
- 배치 처리 중 발생하는 특정 이벤트를 감지하고 원하는 로직을 실행 할 수 있게 해줌
- Job 시작 전후, Step 실행 전후는 물론, 청크 단위 또는 아이템 단위 처리 시점까지 모든 과정에 개입할 수 있음
- 이를 통해 로깅, 모니터링, 에러 처리등 우리가 원하는 로직을 자유롭게 추가 가능
JobExecutionListener
- Job 실행의 시작과 종료 시점에 호출되는 리스너 인터페이스
public interface JobExecutionListener {
default void beforeJob(JobExecution jobExecution) { }
default void afterJob(JobExecution jobExecution) { }
}
- beforeJob()
- Job 실행 직전 호출
- 필요한 리소스 준비등의 초기화 작업을 수행할 수 있음
- afterJob()
- Job 실행 후에 호출
- Job 실행 결과를 이메일로 전송, 리소스를 정리하는 등의 후처리 작업을 수행할 수 있음
- Job의 성공/실패 여부와 관계없이 무조건 호출
StepExecutionListener
- Step의 실행 시작과 종료 시점에 호출되는 리스너 인터페이스
- Step의 시작 시간, 종료 시간, 처리된 데이터 수를 로그로 기록하는 등의 사용자 정의 작업 추가 가능
public interface StepExecutionListener extends StepListener {
default void beforeStep(StepExecution stepExecution) {}
@Nullable
default ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}
- beforeStep()
- Step의 시작 시점, 정확히는 StepScope가 활성화된 직후에 호출
- afterStep()
– Step의 종료 시점에 호출
- JobExecutionListener의 afterJob()과 마찬가지로 Step의 성공/실패 여부와 관계없이 무조건 호출
ChunkListener
- 청크 단위 처리가 시작되기 전, 완료된 후, 실행 도중 에러가 발생했을 때 호출되는 리스너 인터페이스
public interface ChunkListener extends StepListener {
default void beforeChunk(ChunkContext context) { }
default void afterChunk(ChunkContext context) { }
default void afterChunkError(ChunkContext context) { }
}
- afterChunk()
- afterChunkError()
Tasklet 지향 Step에서도 동작한다.
- 이름과 달리 ChunkListener는 청크 지향 Step뿐만 아니라 Tasklet 지향 Step에서도 동작
- Tasklet의 execute() 호출 전후, 그리고 execute() 실행 중 오류 발생 시에도 실행 됨
- Tasklet 지향 Step에서도 ChunkListener를 활용한 모니터링이 가능
- execute()에서
RepeatStatus.CONTINUABLE을 반환하면 execute() 실행이 반복되므로, 해당 범위로 동작
Item[Read|Process|Write]Listener
- 아이템 읽기, 처리, 쓰기 작업이 수행되는 시점에 호출되는 리스너 인터페이스
public interface ItemReadListener<T> extends StepListener {
default void beforeRead() { }
default void afterRead() { }
default void onReadError(Exception ex) { }
}
public interface ItemProcessListener<T, S> extends StepListener {
default void beforeProcess(T item) { }
default void afterProcess(T item, @Nullable S result) { }
default void onProcessError(T item, Exception e) { }
}
public interface ItemWriteListener<S> extends StepListener {
default void beforeWrite(Chunk<? extends S> items) { }
default void afterWrite(Chunk<? extends S> items) { }
default void onWriteError(Exception exception, Chunk<? extends S> items) { }
}
- 눈여겨봐야 할 포인트
ItemReadListener.afterRead()는 ItemReader.read() 호출 후에 호출되지만, ItemReader.read() 메서드가 더 이상 읽을 데이터가 없어 null을 반환할 떄는 호출되지 않음
ItemProcessListener.afterProcess는 ItemProcessor.process() 메서드가 null을 반환하더라도 호출된다.
- 참고로 ItemProcessor에서
null을 반환하는 것은 해당 데이터를 필터링하겠다는 의미
ItemWriteListener.afterWrite()는 트랜잭션이 커밋되기 전, 그리고 ChunkListener.afterChunk()가 호출되기 전에 호출된다
여기까지의 흐름도
전체 Job / Step 실행 흐름
+----------------+
| JobLauncher |
+----------------+
|
| beforeJob()
v
+-------------+
| Job |
+-------------+
|
| beforeStep()
v
+-------------+
| Step |
+-------------+
|
| beforeChunk()
v
==================================
|| Chunk #N ||
==================================
|
v
[ Item Processing ]
|
^
| afterChunk()
+-------------+
| Step |
+-------------+
|
| afterStep()
v
+-------------+
| Job |
+-------------+
Chunk 내부 상세 흐름 (Reader -> Processor -> Writer)
+--------------------------------------------------+
| Chunk #N |
| |
| beforeRead() |
| | |
| v |
| read() --- 반복 --- |
| | |
| afterRead() |
| |
| beforeProcess() |
| | |
| v |
| process() --- 반복 --- |
| | |
| afterProcess() |
| |
| beforeWrite() |
| | |
| v |
| write() (List 단위) |
| | |
| afterWrite() |
| |
+--------------------------------------------------+
리스너 활용
- 단계별 모니터링과 추적
- Job과 Step 실행 전후에 로그를 남길 수 있음
- Step이 언제 시작하고, 언제 끝났는지, 몇 개의 데이터를 처리했는지를 기록하고 추적 가능
- 실행 결과에 따른 후속 처리
- 예를 들어, Job의 종료 상태를 확인하고 종료 상태에 따른 후속 조치를 취할 수 있음
- 데이터 가공과 전달
- 예를 들어, Step 간에 데이터를 전달하거나, 다음 처리에 필요한 정보를 미리 준비할 수 있음
- 부가 기능 분리
- 예를 들어, 오류가 발생한 경우 메서드에서 관리자에게 알림 메일 보내는 등의 부가적인 일을 분리할 수 있음
왜 JobParameters가 아닌 ExecutionContext를 사용할까?
- 한 번 생성된 JobParameters는 변경할 수 없기 때문
- Spring Batch의 핵심 철학 중 하나는 배치 작업의 **재현 가능성(Repeatability)과 일관성(Consistency)**을 보장하는 것
- 이를 위해 JobParameters는 불변(immutable)하게 설계되었음
- 재현 가능성 : 동일한 JobParameters로 실행한 Job은 항상 돌일한 결과를 생성해야 함
- 실행 중간에 JobParameters가 변경되면 이를 보장할 수 없음
- 추적 가능성 : 배치 작업의 실행 기록(JobInstance, JobExecution)과 JobParamteres는 메타데이터 저장소에 저장
- JobParamteres가 변경 가능하다면 기록과 실제 작업의 불일치가 발생할 수 있음
- 따라서 Job 실행 중에 동적으로 생성되거나 변경되어야 하는 데이터는 ExecutionContext를 통해 관리하는 것이 좋음
- 주의할점은 JobParameters만으로 충분히 처리할 수 있는 경우에도 JobExecution으로 처리하는 경우가 있는데 이렇게는 하지 말자
- 예를 들어, 타겟 날짜를 JobExecution으로 관리해버리면, 재 처리가 불가능하다.
- 하드 코딩된 방식은 배치의 유연성을 떨어뜨리고, 필요한 순간에 원하는 데이터를 처리할 수 없게 만듬
ExecutionContextPromotionListener를 활용한 Step 간 데이터 공유
- ExecutionContextPromotionListener는 Step 수준 ExecutionContext의 데이터를 Job 수준 ExecutionContext로 등록시켜주는 StepExecutionListener의 구현체
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[]{"data"});
return listener;
}
- 불가피한 경우가 아니라면 Step 간 데이터 의존성은 최소화하는 것이 좋다
Listener와 @JobScope, @StepScope 통합
- 리스너와 Spring Batch Scope를 통합하면 리스너에서 잡 파라미터를 매우 쉽게 다룰 수 있음
- 실행 시점에 결정되는 값들을 리스너내에서도 활용할 수 있다
Listener 마지막 훈련 : 성능과 모범 사례
어떤 리스너를 선택해야 하는가?
- JobExecutionListener : 전체 Job의 시작과 종료를 통제
- StepExecutionListener : 각 Step의 단계 실행을 감시
- ChunkListener : 시스템을 청크단위로 처리할 때, 반복의 시작과 종료 시점을 통제
- Item[Read|Process|Write]Listener : 개별 아이템 식별 통제
예외 처리는 신중하게
- JobExecutionListener의 beforeJob()과 StepExecutionListener의 beforeStep()에서 예외가 발생하면 Job과 Ste이 실패한 것으로 판단 됨
단일 책임 원칙 준수
- 리스너는 감시와 통제만 담당
- 실제 비즈니스 로직은 분리하자
- 리스너가 너무 많은 로직을 담당하면 유지보수가 어려워짐