📚FOS Study
홈카테고리
홈카테고리
📚FOS Study

개발 학습 기록을 정리하는 블로그입니다.

바로가기

  • 홈
  • 카테고리

소셜

  • GitHub
  • Source Repository

© 2025 FOS Study. Built with Next.js & Tailwind CSS

목록으로 돌아가기
☕java/ spring-framework

드디어 정체를 드러낸 ItemStream

약 7분
GitHub에서 보기

드디어 정체를 드러낸 ItemStream

지난 장에서 우리는 ItemStream 이라는 인터페이스가 반복적으로 등장하는 것을 확인했다.
실제로 ItemStream은 Spring Batch의 핵심 인터페이스로 Spring Batch의 대부분의 ItemReader와 일부 ItemWriter 구현체에서 이 ItemStream이라는 인터페이스를 공통적으로 구현하고 있다.

public interface ItemStream {
    default void open(ExecutionContext executionContext) throws ItemStreamException {}
    default void update (ExecutionContext executionContext) throws ItemStreamExeception {}
    default void close() throws ItemStreamExeception {}
}

이제 이 ItemStream의 역할을 자세히 살펴보자.
Spring Batch에서 ItemStream은 다음의 두 가지 역할을 담당한다.

  • 자원 초기화 및 해제
  • 메타데이터 관리 및 상태 추적

자원 초기화 및 해제

우리는 지금까지 데이터를 읽고 쓰는 것에 집중했찌, 읽고 쓸 파일을 열고 생성하는 것에는 관심을 두지 않았다.
커서 기반 ItemReader를 다루면서 ItemReader의 초기화 시점에 커서를 생성한다는 말은 했지만, 정확히 그 역할이 어디서 수행되는지는 살펴보지 않았고 해당 커서를 닫는 것에는 관심을 두지 않았다.

왜 자원 관리가 중요한가?

배치 작업은 다양한 시스템 자원을 다루게 된다.

  • 파일 핸들
  • 데이터베이스 커넥션
  • 메모리 버퍼

이런 자원들은 사용 전 적절한 준비가 필요하고, 사용 후에는 반드시 정리되어야 한다.

  • 적절한 초기화:
    • 파일을 읽기 전에는 파일을 열어야 한다.
    • 커서 기반 ItemReader에서는 데이터베이스와의 커넥션을 맺어야 한다.
    • 데이터 처리를 위한 메모리 버퍼를 할당해야 한다.
  • 적절한 해제
    • 파일 핸들 누수: 파일을 열었지만 닫지 않으면 시스템의 파일 핸들이 고갈된다.
    • 데이터베이스 커넥션 누수: DB 커넥션을 해제하지 않으면 커넥션 풀이 고갈되어 다른 작업들도 마비된다.
    • 메모리 누수: 사용하지 않는 자원이 가용 메모리를 갉아먹어 결국 애플리케이션을 죽음으로 이끈다.

Spring Batch에서는 이런 자원을 초기화하고 해제하는 작업이 바로 ItemStream 인터페이스의 open() / close() 메서드의 역할이다.

ItemReader와 ItemWriter 구현체들이 실제로 ItemStream의 open() / close() 메서드를 어떻게 구현했는지 살펴보자.

open() / close() 사례를, FlatFileItemReader로 부터 살펴보기

  • doOpen() 메서드에서는 파일 존재 여부를 검사하고, 해당 리소스에서 데이터를 읽어들일 수 있도록 BufferedReader를 초기화하는 것을 코드로 볼 수 있다.
if (!resource.exists()) {
   if (strict) {
      throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
   }
   logger.warn("Input resource does not exist " + resource.getDescription());
   return;
}
...
reader = bufferedReaderFactory.create(resource, encoding);

  • doClose() 메서드에서는 doOpen() 메서드에서 생성했던 BufferedReader를 다음과 같이 닫고 있다.
@Override
protected void doClose() throws Exception {
    ...
    reader.close();
    ...
}

파일 기반 ItemReader를 봤으니 이제 커서 기반 ItemReader의 자원 관리는 어떻게 이뤄지는지 살펴보자.

JdbcCursorItemReader로 살펴보기

  • JdbcCursorItemReader의 부모 클래스인 AbstractCursorItemReader의 doOpen() 메서드를 살펴보자.
@Override
protected void doOpen() throws Exception {
    initializeConnection(); // DB 연결 초기화 this.con = dataSource.getConnection();
    openCursor(con); // 커서를 열어 데이터를 읽을 준비
}
  • 다음으로 doClose() 메서드를 살펴보자.
    • 이 메서드에서는 다음과 같이 ResultSet, 커서, 그리고 DB 커넥션을 순차적으로 종료한다.
protected void doClose() throws Exception {
    ...
    JdbcUtils.closeResultSet(this.rs);
    rs = null;
    cleanupOnClose(con);
    ...
    JdbcUtils.closeConnection(this.con);
}

이처럼 자원의 초기화와 해제는 ItemStream의 open(), close() 메서드를 통해 처리된다.
따라서 커스텀 ItemReader 또는 ItemWriter를 구현할 떄 자원 초기화 및 해제가 필요하다면 반드시 ItemStream을 구현해야 한다.

이 메서드들은 Spring Batch 스텝이 호출하며, 스텝은 시작 직후에 ItemStream.open()을 호출하고, 실행을 완료하면 ItemStream.close()를 호출한다.

그렇다면 ItemStream의 또 다른 메서드인 update()의 역할은 무엇이며, 언제 호출되는 것일까?
다음에 다룰 ItemStream의 또 다른 역할을 알면 그 답을 얻을 수 있을 것이다.

메타 데이터 관리 및 상태 추적

ItemStream은 Spring Batch 스텝의 실행 정보를 관리(저장 및 복구)하는 역할도 맡고 있다.

왜 메타데이터 관리가 필요한가?

운영환경에서 배치 작업은 언제든 실패할 수 있다. 네트워크 장애가 발생할 수 도 있고, 서버가 예기치 않게 종료될 수도 있다. 이런 상황에서 우리에게는 두 가지 선택지가 있다.

  • 처음 부터 모든 데이터를 다시 처리한다
  • 실패한 지점부터 정확하게 재시작한다 (이것이 우리가 추구해야할 방향이다)

Spring Batch는 두 번째 방식을 채택했다. 이를 위해 매 트랜잭션마다 현재 실행 상태를 메타데이터 저장소에 기록한다. 이것이 바로 ItemStream이 담당하는 메타데이터 관리의 핵심이다.

ItemStream의 메타데이터 관리는 크게 두 가지 작업으로 나뉜다.

open() 메서드 (저장된 실행 정보를 복원한다)

open() 메서드는 ExecutionContext를 입력으로 받는다. 이 ExecutionContext에는 이전 스텝 실행의 정보가 담겨있으며, 이 정보를 사용해 자신의 이전 상태를 복원한다. (처음 실행되는 것이라면 빈 상태로 전달된다)

어떻게 동작하는지 실제 코드를 통해 살펴보자.

FlatFileItemReader로 살펴보기

부모 클래스인 AbstractItemCountingItemStreamItemReader의 open 메서드를 보자.

public void open(ExecutionContext executionContext) throws ItemStreamException {
    ...
   if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) {
       maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX));
   }

   int itemCount = 0;
   if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) {
       itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
   }
   ...
   if (itemCount > 0 && itemCount < maxItemCount) {
       try {
           jumpToItem(itemCount);
       }
       catch (Exception e) {
           throw new ItemStreamException("Could not move to stored position on restart", e);
       }
   }

   currentItemCount = itemCount;
}

이 메서드는 ExecutionContext로 부터 두 가지 핵심 정보를 복원한다.

  • maxItemCount: 최대 몇 개의 아이템을 읽을 것인지에 대한 설정 값
    • 이 값은 FlatFileItemReaderBuilder를 통해 설정할 수 있다.
  • itemCount: 이전 실행에서 몇 개의 아이템을 읽었는지에 대한 정보

그리고 이전에 읽었던 itemCount만큼 jumpToItem() 메서드를 호출하여 파일의 현재 읽기 위치를 이동시킨다.

JdbcCursorItemReader로 살펴보기

흥미로운 점은 JdbcCursorItemReader 또한 방금 살펴본 AbstractCountingItemStreamItemReader를 상속하고 있다는 것이다. 따라서 JdbcCursorItemReade.open()의 동작도 동일하게 적용된다.

그렇다면 jumpToItem() 구현은 어떨까? 부모 클래스인 AbstractCursorItemReader의 코드를 살펴보자.

@Override
protected void jumpToItem(int itemIndex) throws Exception {
   if (driverSupportsAbsolute) {
      ...
      rs.absolute(itemIndex); // ResultSet.absolute()
      ...
   }
   else {
      moveCursorToRow(itemIndex);
   }
}
  • driverSupportsAbsolute 값이 true인 경우, ResultSet.absolute() 메서드를 호출하여 즉시 원하는 위치로 커서를 이동시킨다.
    • 이 속성은 JdbcCursorItemReaderBuilder를 통해 설정할 수 있다. 만약 사용 중인 JDBC 드라이버가 지원한다면, true로 설정하여 재시작 시 커서 이동 성능을 크게 향상 시킬 수 있다.
    • 그러나 JdbcCursorItemReader는 기본적으로 커서의 순방향 이동만 지원하는 ResultSet.TYPE_FORWARD_ONLY를 사용하기 때문에 드라이버 지원 여부와 관계없이 ResultSet.absolute()를 사용할 수 없다.
    • 따라서 false(기본값)로 유지하는 것이 안전하다.
  • 반면 driverSupportsAbsolute가 false인 경우에는 moveCursorToRow() 메서드를 통해 itemIndex만큼 ResultSet.next()를 순차적으로 호출한다.
    • 이는 마치 파일을 처음부터 한 줄씩 읽어나가듯이, 데이터베이스 커서를 한 row씩 이동시켜 원하는 위치까지 도달시킨다.

update() 메서드(상태 저장)

현재 작업이 어디까지 진행되었는지를 저장하는 역할은 update() 메서드가 맡고 있다. 이렇게 저장된 정보는 작업이 실패했을 떄 정확한 재시작 지점을 파악하는데 사용된다.

default void update(ExecutionContext executionContext) throws ItemStreamException {
}
  • update() 메서드 역시 ExecutionContext를 파라미터로 받는다. ItemStream 구현체들은 update() 메서드에서 자신의 실행 정보를 ExecutionContext에 저장한다.
  • update() 메서드는 매 트랜잭션의 커밋 직전에 호출된다.
    • 처리 도중 예외가 발생하여 트랜잭션이 롤백되는 경우에는 호출되지 않는다.

update() 메서드를 통한 상태 저장이 실제로 어떻게 동작하는지 코드를 통해 살펴보자.

AbstractItemCountingItemStreamItemReader의 update() 메서드

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    ...

    executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);

    if (maxItemCount < Integer.MAX_VALUE) {
        executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
    }

    ...
}

이 메서드는 ExecutionContext에 두 가지 핵심 정보를 저장한다.

  • currentItemCount: 현재까지 읽은 데이터의 개수
  • maxItemCount: 읽어들일 수 있는 최대 데이터의 개수

재시작 불가 사례: RedisItemReader

3장에서 설명했듯이, RedisItemReader는 SCAN 명령의 순서 불일치떄문에 재시작을 지원하지 않는다. 따라서 RedisItemReader 코드를 보면 update() 메서드를 구현하지 않는 것을 알 수 있다.

이처럼 ItemStream을 통한 메타데이터 관리가 있기에 우리는 대용량 배치 처리에서도 안정성을 보장할 수 있다. 실패가 발생하더라도 이미 처리한 데이터를 다시 처리할 필요가 없으며, 정확히 실패 지점부터 작업을 재개할 수 있다.

ItemStream의 위임 구조

3장의 마지막 작전에서 살펴본 CompositeItemReader는 스스로 데이터를 읽지 않고, 위임 대상 ItemReader에게 그 작업을 위임했다.

그렇다면 위임 대상 ItemReader의 open(), update(), close() 메서드는 어떻게 호출될까? 답은 CompositeItemReader의 코드에 있다.

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
    for (ItemStreamReader<? extends T> delegate : delegates) {
       delegate.open(executionContext);
    }
}

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    if (this.currentDelegate != null) {
        this.currentDelegate.update(executionContext);
    }
}

@Override
public void close() throws ItemStreamException {
    for (ItemStreamReader<? extends T> delegate : delegates) {
        delegate.close();
    }
}

read() 메서드가 위임 대상의 read() 메서드를 호출하는 것과 동일 패턴으로, open(), update(), close() 메서드를 호출한다.

이런 위임이 가능한 이유는 CompositeItemReader가 ItemReader뿐만 아니라 ItemStream도 구현했기 떄문이다.

다른 위임 패턴들의 ItemStream 구현

이러한 위임구조는 CompositeItemWRiter, MultiResourceItemReader, MultiResourceItemWriter 에도 적용되어 open(), update(), close() 호출을 자신의 위임 대상을에게 전달(bypass)한다.


다음 작전에서는 청크 지향 처리의 또 다른 핵심 구성요소인 ItemProcessor를 살펴보도록 하자.

java 카테고리의 다른 글 보기수정 제안하기
목차
  • 드디어 정체를 드러낸 ItemStream
  • 자원 초기화 및 해제
  • 왜 자원 관리가 중요한가?
  • open() / close() 사례를, FlatFileItemReader로 부터 살펴보기
  • JdbcCursorItemReader로 살펴보기
  • 메타 데이터 관리 및 상태 추적
  • 왜 메타데이터 관리가 필요한가?
  • open() 메서드 (저장된 실행 정보를 복원한다)
  • FlatFileItemReader로 살펴보기
  • JdbcCursorItemReader로 살펴보기
  • update() 메서드(상태 저장)
  • AbstractItemCountingItemStreamItemReader의 update() 메서드
  • 재시작 불가 사례: RedisItemReader
  • ItemStream의 위임 구조
  • 다른 위임 패턴들의 ItemStream 구현