지난 장에서 우리는 ItemStream 이라는 인터페이스가 반복적으로 등장하는 것을 확인했다.
실제로 ItemStream은 Spring Batch의 핵심 인터페이스로 Spring Batch의 대부분의 ItemReader와 일부 ItemWriter 구현체에서 이 ItemStream이라는 인터페이스를 공통적으로 구현하고 있다.
public interface ItemStream {
default void open(ExecutionContext executionContext) throws ItemStreamException {}
default void update (ExecutionContext executionContext) throws ItemStreamExeception {}
default void close() throws ItemStreamExeception {}
}
이제 이 ItemStream의 역할을 자세히 살펴보자.
Spring Batch에서 ItemStream은 다음의 두 가지 역할을 담당한다.
우리는 지금까지 데이터를 읽고 쓰는 것에 집중했찌, 읽고 쓸 파일을 열고 생성하는 것에는 관심을 두지 않았다.
커서 기반 ItemReader를 다루면서 ItemReader의 초기화 시점에 커서를 생성한다는 말은 했지만, 정확히 그 역할이 어디서 수행되는지는 살펴보지 않았고 해당 커서를 닫는 것에는 관심을 두지 않았다.
배치 작업은 다양한 시스템 자원을 다루게 된다.
이런 자원들은 사용 전 적절한 준비가 필요하고, 사용 후에는 반드시 정리되어야 한다.
Spring Batch에서는 이런 자원을 초기화하고 해제하는 작업이 바로 ItemStream 인터페이스의 open() / close() 메서드의 역할이다.
ItemReader와 ItemWriter 구현체들이 실제로 ItemStream의 open() / close() 메서드를 어떻게 구현했는지 살펴보자.
if (!resource.exists()) {
if (strict) {
throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
}
logger.warn("Input resource does not exist " + resource.getDescription());
return;
}
...
reader = bufferedReaderFactory.create(resource, encoding);
@Override
protected void doClose() throws Exception {
...
reader.close();
...
}
파일 기반 ItemReader를 봤으니 이제 커서 기반 ItemReader의 자원 관리는 어떻게 이뤄지는지 살펴보자.
@Override
protected void doOpen() throws Exception {
initializeConnection(); // DB 연결 초기화 this.con = dataSource.getConnection();
openCursor(con); // 커서를 열어 데이터를 읽을 준비
}
protected void doClose() throws Exception {
...
JdbcUtils.closeResultSet(this.rs);
rs = null;
cleanupOnClose(con);
...
JdbcUtils.closeConnection(this.con);
}
이처럼 자원의 초기화와 해제는 ItemStream의 open(), close() 메서드를 통해 처리된다.
따라서 커스텀 ItemReader 또는 ItemWriter를 구현할 떄 자원 초기화 및 해제가 필요하다면 반드시 ItemStream을 구현해야 한다.
이 메서드들은 Spring Batch 스텝이 호출하며, 스텝은 시작 직후에 ItemStream.open()을 호출하고, 실행을 완료하면 ItemStream.close()를 호출한다.
그렇다면 ItemStream의 또 다른 메서드인 update()의 역할은 무엇이며, 언제 호출되는 것일까?
다음에 다룰 ItemStream의 또 다른 역할을 알면 그 답을 얻을 수 있을 것이다.
ItemStream은 Spring Batch 스텝의 실행 정보를 관리(저장 및 복구)하는 역할도 맡고 있다.
운영환경에서 배치 작업은 언제든 실패할 수 있다. 네트워크 장애가 발생할 수 도 있고, 서버가 예기치 않게 종료될 수도 있다. 이런 상황에서 우리에게는 두 가지 선택지가 있다.
Spring Batch는 두 번째 방식을 채택했다. 이를 위해 매 트랜잭션마다 현재 실행 상태를 메타데이터 저장소에 기록한다. 이것이 바로 ItemStream이 담당하는 메타데이터 관리의 핵심이다.
ItemStream의 메타데이터 관리는 크게 두 가지 작업으로 나뉜다.
open() 메서드는 ExecutionContext를 입력으로 받는다. 이 ExecutionContext에는 이전 스텝 실행의 정보가 담겨있으며, 이 정보를 사용해 자신의 이전 상태를 복원한다. (처음 실행되는 것이라면 빈 상태로 전달된다)
어떻게 동작하는지 실제 코드를 통해 살펴보자.
부모 클래스인 AbstractItemCountingItemStreamItemReader의 open 메서드를 보자.
public void open(ExecutionContext executionContext) throws ItemStreamException {
...
if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) {
maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX));
}
int itemCount = 0;
if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) {
itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
}
...
if (itemCount > 0 && itemCount < maxItemCount) {
try {
jumpToItem(itemCount);
}
catch (Exception e) {
throw new ItemStreamException("Could not move to stored position on restart", e);
}
}
currentItemCount = itemCount;
}
이 메서드는 ExecutionContext로 부터 두 가지 핵심 정보를 복원한다.
그리고 이전에 읽었던 itemCount만큼 jumpToItem() 메서드를 호출하여 파일의 현재 읽기 위치를 이동시킨다.
흥미로운 점은 JdbcCursorItemReader 또한 방금 살펴본 AbstractCountingItemStreamItemReader를 상속하고 있다는 것이다.
따라서 JdbcCursorItemReade.open()의 동작도 동일하게 적용된다.
그렇다면 jumpToItem() 구현은 어떨까? 부모 클래스인 AbstractCursorItemReader의 코드를 살펴보자.
@Override
protected void jumpToItem(int itemIndex) throws Exception {
if (driverSupportsAbsolute) {
...
rs.absolute(itemIndex); // ResultSet.absolute()
...
}
else {
moveCursorToRow(itemIndex);
}
}
driverSupportsAbsolute 값이 true인 경우, ResultSet.absolute() 메서드를 호출하여 즉시 원하는 위치로 커서를 이동시킨다.
true로 설정하여 재시작 시 커서 이동 성능을 크게 향상 시킬 수 있다.ResultSet.TYPE_FORWARD_ONLY를 사용하기 때문에 드라이버 지원 여부와 관계없이 ResultSet.absolute()를 사용할 수 없다.false(기본값)로 유지하는 것이 안전하다.driverSupportsAbsolute가 false인 경우에는 moveCursorToRow() 메서드를 통해 itemIndex만큼 ResultSet.next()를 순차적으로 호출한다.
현재 작업이 어디까지 진행되었는지를 저장하는 역할은 update() 메서드가 맡고 있다. 이렇게 저장된 정보는 작업이 실패했을 떄 정확한 재시작 지점을 파악하는데 사용된다.
default void update(ExecutionContext executionContext) throws ItemStreamException {
}
update() 메서드를 통한 상태 저장이 실제로 어떻게 동작하는지 코드를 통해 살펴보자.
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
...
executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
if (maxItemCount < Integer.MAX_VALUE) {
executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
}
...
}
이 메서드는 ExecutionContext에 두 가지 핵심 정보를 저장한다.
3장에서 설명했듯이, RedisItemReader는 SCAN 명령의 순서 불일치떄문에 재시작을 지원하지 않는다. 따라서 RedisItemReader 코드를 보면 update() 메서드를 구현하지 않는 것을 알 수 있다.
이처럼 ItemStream을 통한 메타데이터 관리가 있기에 우리는 대용량 배치 처리에서도 안정성을 보장할 수 있다. 실패가 발생하더라도 이미 처리한 데이터를 다시 처리할 필요가 없으며, 정확히 실패 지점부터 작업을 재개할 수 있다.
3장의 마지막 작전에서 살펴본 CompositeItemReader는 스스로 데이터를 읽지 않고, 위임 대상 ItemReader에게 그 작업을 위임했다.
그렇다면 위임 대상 ItemReader의 open(), update(), close() 메서드는 어떻게 호출될까? 답은 CompositeItemReader의 코드에 있다.
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
for (ItemStreamReader<? extends T> delegate : delegates) {
delegate.open(executionContext);
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
if (this.currentDelegate != null) {
this.currentDelegate.update(executionContext);
}
}
@Override
public void close() throws ItemStreamException {
for (ItemStreamReader<? extends T> delegate : delegates) {
delegate.close();
}
}
read() 메서드가 위임 대상의 read() 메서드를 호출하는 것과 동일 패턴으로, open(), update(), close() 메서드를 호출한다.
이런 위임이 가능한 이유는 CompositeItemReader가 ItemReader뿐만 아니라 ItemStream도 구현했기 떄문이다.
이러한 위임구조는 CompositeItemWRiter, MultiResourceItemReader, MultiResourceItemWriter 에도 적용되어 open(), update(), close() 호출을 자신의 위임 대상을에게 전달(bypass)한다.
다음 작전에서는 청크 지향 처리의 또 다른 핵심 구성요소인 ItemProcessor를 살펴보도록 하자.