Search

Spring Batch 란

Tags

1. 개요

회사에서 Spring Batch 관련된 여러 업무를 맡게 되었습니다. 이와 관련하여 학습했던 내용들을 깊게 정리해보려고 합니다.

2. Batch 란?

batch 는 한국어로 일괄 이라는 뜻을 가지고 있습니다. Spring Batch와 Batch Application도 의미하는 바가 같습니다.
개발자가 정의한 작업을 일괄 처리하는 어플리케이션

2-1. 어떤 상황에서 사용될 수 있을까요?

방대한 데이터를 처리해야 한다고 가정해봅시다. 실시간으로 데이터를 처리한다면 해당 작업이 실행되는 동안 다른 작업이 일을 못할 수 있습니다. (CPU와 I/O 자원을 급격하게 소모하기 때문에) 하지만 우리는 이런 의문을 가질 수 있습니다.
 데이터를 굳이 지금 처리해야하나?
맞습니다. 트래픽이 모이지 않는 새벽에 모든 데이터를 수집해 놓은 다음 사용자가 요청할 때 전달하면 그만입니다.
실제 사용 사례도 다음과 같습니다.
매출 데이터를 통한 일 매출 집계
대용량 기반 데이터를 활용한 크롤링
정해진 시간에 회원들에게 일괄적으로 메일을 전송
따라서 (1) 일정 주기로 실행 해야 할 때, (2)실시간 처리가 어려운 대용량 트래픽을 처리할 때 하나의 어플리케이션이 아닌 배치 어플리케이션을 통해 구현합니다.

2-2. Batch Application의 조건

대용량 데이터 : 대량의 데이터를 가져오거나, 전달하거나, 계산하는 등의 처리를 할 수 있어야 합니다.
자동화 : 심각한 문제 해결을 제외하고는 사용자 개입 없이 실행되어야 합니다.
견고성 : 잘못된 데이터를 충돌/중단 없이 처리할 수 있어야 합니다.
신뢰성 : 무엇이 잘못되었는지를 추적할 수 있어야 합니다.
성능 : 지정한 시간 안에 처리를 완료하거나 동시에 실행되는 다른 어플리케이션을 방해하지 않도록 수행되어야합니다.

2-3. Spring Batch 용어

아래의 그림은 Spring Batch의 Process입니다. 용어들의 핵심 키워드를 살펴보도록 하겠습니다.
우선 Batch Process는 여러 개의 Step으로 구성된 Job으로 감싸져있습니다. Step 또한 ItemReader, ItemProcessor, ItemWriter로 구성되어 있습니다.
JobJobLauncher에 의해 실행되며, 모든 메타 데이터는 JobRepository에 저장됩니다.
각각의 Job은 여러 개의 JobInstance로 생성될 수 있으며 이는 JobParameter로 구분됩니다.
JobExecutionJob을 실행하는 단일 시도이며, 하나의 JobInstance에서 여러 JobExecution이 생성될 수 있습니다.
관계를 표현하면 다음과 같습니다.
Job 1 * JobInstance 1 * JobExecution
+ 참고로 Spring Batch를 사용하면 자동으로 삽입되는 테이블이 존재합니다. 이 중 batch_job_execution 이라는 테이블에는 JobExecution의 기록들, 예를 들어 작업이 잘 수행될 수 있는지 확인하는 Status, 실패 했다면 그 원인을 파악할 수 있는 ExitCode나 ExitMessage 들이 컬럼으로 존재합니다.
StepJob의 배치 처리 과정을 정의하며 독립적으로 구성됩니다. Job은 하나 이상의 Step으로 이루어져 있습니다. JobExecution과 동일하게 Step 또한 StepExecution이 존재합니다.
ExecutionContext는 StepExecution이나 JobExecution의 범위 내에 있는 정보들을 키-값 쌍으로 가지고 있는 형태입니다. Spring Batch는 ExecutionContext를 영속성 관리하며 배치 작업을 다시 시작하고 싶을 때 유용하게 사용할 수 있습니다.
JobRepository는 이 모든걸 가능하게 해줍니다. 모든 인스턴스를 영속화 하여 관리하며 JobRepository를 통해 execution 정보들을 CRUD 할 수 있습니다.
마지막으로 Item, ItemReader, ItemWriter, ItemProcessor는 다음과 같이 표현할 수 있습니다.
Item은 작업에 사용하는 데이터며 데이터베이스에 있는 하나의 행하나의 아이템
ItemReader는 Step에서 Item을 하나씩 읽어오며 모든 항목 소진 시 null을 반환
ItemProcessor는 비즈니스 처리를 담당하며 아이템이 유효하지 않다면 null을 반환
ItemWriter는 처리된 데이터를 데이터베이스에 저장

2-4. Chunk-Oriented Processing

Chunk-Oriented ProcessingItemReader, ItemProcessor에서 작업을 Chunk 단위로 하는 것입니다.
🥹 Chunk란?
각 커밋 사이에 처리될 행 (item) 의 수이며 성공 시 chunk 만큼 커밋하며 실패 시 chunk 만큼 롤백됩니다.
따라서 ItemWriter에서는 ReaderProcessor가 처리한 ItemchunkSize 만큼 쌓였을 때 한 번에 write 하는 과정을 거칩니다.
데이터를 가져오는 방법에는 Cursor 와 Page 두 종류로 나뉘게 됩니다.
우선 Paging 방식을 사용할 경우 chunk size와 page size의 개수를 맞추는 것이 좋습니다. 만약, chunk size가 page size 보다 크다면 하나의 트랜섹션 처리를 위해 여러 번 조회 해야 합니다.

3. Spring Batch Framework 시작하기

Spring Batch에 필요한 의존성은 다음과 같습니다.
implementation 'org.springframework.boot:spring-boot-starter-batch' testImplementation 'org.springframework.batch:spring-batch-test'
Java
복사
그리고 Application 단에 다음 어노테이션을 추가하면 됩니다.
@EnableBatchProcessing @SpringBootApplication public class BatchApplication { ... }
Java
복사
@EnableBatchProcessing을 통해 Spring Batch의 여러 기능이 사용가능하며, Batch 작업에 필요한 기본 작업들이 설정됩니다.
application.yml
추가적인 예외 상황

4. Batch Application 운영

반드시 테스트 코드를 작성해야 합니다.
먼저 배치 어플리케이션의 경우 QA 하기가 매우 어렵습니다. 따라서 테스트 코드가 필요합니다. 또한, 복잡한 쿼리를 실행한 결과를 다시 처리하고 데이터베이스에 작업하는 작업이기 때문에 통합 테스트를 실행합니다.
단위 테스트를 이용해서 내부 작업을 검사하고 전체 테스트 코드를 다시 작성하는 것을 목표로 합니다.
예제 코드는 다음과 같습니다.
// when JobExecution jobExecution = myTextJobLanucher.launchJob(); List<StepExeption> stepExceptions = new ArrayList<>(jobExecution.getStepExecution()); Step Execution stepExecution = stepExecutions.get(0); // then assertThat(jobExecution.getExitStatus().getExitCode()).isEqualTo("FAILED"); assertThat(stepExecution.getWriteCount()).isEqualTo(2); assertThat(stepExecution.getRollbackCount()).isEqualTo(1);
Java
복사

4-1. 관리 도구

Cron : 리눅스 작업 스케쥴러
Spring MVC + API Call
Spring Batch Admin : Deprecated
Quartz + Admin : 스케쥴러 프레임워크 + 관리자 페이지
CI Tool(Jenkins)

4-2. 실전 예제

@Slf4j @RequiredArgsConstructor @Configuration public class NotificationTriggerConfig { private final DataSource dataSource; @Bean public Job notificationTriggerJob(final JobRepository jobRepository, final Step notificationTriggerStep) { log.info("Start notificationTriggerJob()"); return new JobBuilder("notificationTriggerJob") .repository(jobRepository) .start(notificationTriggerStep) .build(); } @Bean @JobScope public Step notificationTriggerStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, @Value("#{jobParameters['notificationId']}") Long notificationId) { log.info("Start notificationTriggerStep()"); return new StepBuilder("notificationTriggerStep") // <T1, T2> 에서 T1은 Reader에서 반환하는 타입, T2는 프로세서에서 처리한 후 Writer 에게 전달하는 타입 // 추가적으로 chunk size는 page size와 동일하게 가져가는 것이 좋다 .<Notification, Notification>chunk(10) .reader(notificationTriggerReader(notificationId)) .processor(notificationTriggerProcessor()) .writer(notificationTriggerWriter()) .repository(jobRepository) .transactionManager(transactionManager) .build(); } @Bean @StepScope public JdbcCursorItemReader<Notification> notificationTriggerReader( @Value("#{jobParameters['notificationId']}") Long notificationId) { log.info("Start notificationTriggerReader()"); return new JdbcCursorItemReaderBuilder<Notification>() .name("notificationTriggerReader") .dataSource(dataSource) .sql( "SELECT id, staff_id, message, target FROM tb_notification_3 WHERE id = ?") .preparedStatementSetter(ps -> ps.setLong(1, notificationId)) .rowMapper(new RowMapper<Notification>() { @Override public Notification mapRow(ResultSet rs, int rowNum) throws SQLException { Notification notification = new Notification(); notification.setId(rs.getLong("id")); notification.setStaffId(rs.getLong("staff_id")); notification.setMessage(rs.getString("message")); notification.setTarget(rs.getString("target")); return notification; } }) .build(); } @Bean @StepScope public ItemProcessor<Notification, Notification> notificationTriggerProcessor() { log.info("Start ItemProcessor()"); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); return notification -> { log.info("Start update modDate"); jdbcTemplate.update( "UPDATE TB_NOTIFICATION_3 SET MOD_DATE = NOW() WHERE ID = ?", notification.getId()); return notification; }; } // ItemReader에 의해 조회된 Notification 객체들이 Writer로 전달된다. @Bean public ItemWriter<Notification> notificationTriggerWriter() { log.info("Start ItemWriter()"); JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); return notifications -> { for (Notification notification : notifications) { jdbcTemplate.update( "INSERT INTO tb_notification_4 (notification_id, staff_id, message, target, is_checked) VALUES (?, ?, ?, ?, ?)", notification.getId(), notification.getStaffId(), notification.getMessage(), notification.getTarget(), false); } }; } }
Java
복사

4-3. 실전 오류

“JobRepository is mandatory”
→ Step에 JobRepository를 추가합니다.
@Bean @JobScope public Step notificationTriggerStep(JobRepository jobRepository, @Value("#{jobParameters['notificationId']}") Long notificationId) { log.info("Start notificationTriggerStep()"); return new StepBuilder("notificationTriggerStep") // <T1, T2> 에서 T1은 Reader에서 반환하는 타입, T2는 프로세서에서 처리한 후 Writer 에게 전달하는 타입 // 추가적으로 chunk size는 page size와 동일하게 가져가는 것이 좋다 .<Notification, Notification>chunk(10) .reader(notificationTriggerReader(notificationId)) .processor(notificationTriggerProcessor()) .writer(notificationTriggerWriter()) .repository(jobRepository) .build(); }
Java
복사
“A transaction manager must be provided”
→ PlatformTransactionManager를 추가합니다.
public Step notificationTriggerStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, ... .transactionManager(transactionManager) .build() }
Java
복사
“두 개 이상의 JobParameters를 넘겨주고 싶을 때”
JobParameters jobParameters = new JobParametersBuilder() .addLong("notificationId", notification.getId()) .addLong("staffId", notification.getStaffId()) .toJobParameters();
Java
복사
“A job instance already exists and is complete for parameters={notificationId=1, staffId=1}. If you want to run this job again, change the parameters.”
→ 매 실행 시 다른 파라미터 값을 넘겨줄 수 있도록 시간 값을 넣어주었습니다.
JobParameters jobParameters = new JobParametersBuilder() .addLong("notificationId", notification.getId()) .addLong("staffId", notification.getStaffId()) .addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(notificationDispatcherJob, jobParameters);
Java
복사

5. Reference