-
spring-batch start (Spring batch 5.1.0)spring-batch 2023. 11. 26. 11:21
배치 작업이란?
- 특정 주기마다 데이터를 처리하는 작업!
- 예시
ㆍ이메일 쿠폰 발송
ㆍ가맹점 정산
ㆍ거래 명세서 생성
ㆍ추천 시스템 데이터 작업
스프링 배치의 구조
- 스프링 배치로 작업이 필요한 이유
ㆍ풍부한 기능
ㆍ일관성된 코드
ㆍ기존 서비스가 스프링 프레임워크로 되어 있는 경우 호환가능
- JobRepository : 배치가 수행될 때 수행되는 메타 데이터를 관리하고 시작시간, 종료시간, job의 상태 등 배치 수행 관련 데이터들을 저장한다.
- JobLauncher : 잡을 실행시켜주는 역할
- Job : 하나의 배치 작업
Hello World 띄우기! (spring-batch 5.x 버전)
1. Deprecated
ㆍJobBuilderFactory
ㆍStepBuilderFactory
ㆍJobBuilder(String name)
ㆍStepBuilder(String name)
2. @EnableBatchProcessing 사용 X
ㆍspring-boot 3.0 이상은 @EnableBatchProcessing 을 사용하지 않는다!
ㆍ@EnableBatchProcessing 사용 시 spring batch 기본 설정이 백오프된다고 한다.
@Configuration public class HelloWorldJobConfig { @Bean public Job helloWorldJob(JobRepository jobRepository, Step helloWorldStep) { return new JobBuilder("helloWorldJob", jobRepository) .incrementer(new RunIdIncrementer()) .start(helloWorldStep) .build(); } @Bean @JobScope public Step helloWorldStep(JobRepository jobRepository, Tasklet helloWorldTasklet, PlatformTransactionManager transactionManager) { return new StepBuilder("helloWorldStep",jobRepository) .tasklet(helloWorldTasklet, transactionManager) .build(); } @Bean @StepScope public Tasklet helloWorldTasklet() { return (contribution, chunkContext) -> { System.out.println("Hello World Spring Batch"); return RepeatStatus.FINISHED; }; } }
- 2번째 실행한다면?!
Step already complete or not restartable, so no action to execute
ㆍ이런 메시지가 뜬다면?
- spring batch에서는 동일한 Job을 동일한 파라미터로 수행하면 위와 같은 메시지가 출력된다고한다.
* 참고 : https://devfunny.tistory.com/758
- 실행 확인
select * from spring_batch.BATCH_JOB_EXECUTION;
배치 실행 시 파일 이름 받기
@StepScope @Bean public Tasklet validatedParamTasklet(@Value("#{jobParameters['-fileName']}") String fileName) { return (contribution, chunkContext) -> { System.out.println(fileName); // test.csv System.out.println("validated Param Tasklet"); return RepeatStatus.FINISHED; }; }
validation check
- 1. JobParametersValidator를 구현한 클래스 만들기
public class FileParamValidator implements JobParametersValidator { @Override public void validate(JobParameters parameters) throws JobParametersInvalidException { String fileName = parameters.getString("-fileName"); System.out.println("validate : " + fileName); if(!StringUtils.endsWithIgnoreCase(fileName, "csv")) { throw new JobParametersInvalidException("This is not csv file"); } } }
- 2. JobBuilder에 추가하기
@Bean public Job validatedParamJob(JobRepository jobRepository, @Qualifier("validatedParamStep1") Step validatedParamStep) { return new JobBuilder("validatedParamJob", jobRepository) .incrementer(new RunIdIncrementer()) .validator(new FileParamValidator()) .start(validatedParamStep) .build(); }
- * 여러 validator를 추가하고 싶다면?! - CompositeJobParametersValidator 사용!
@Bean public Job validatedParamJob(JobRepository jobRepository, @Qualifier("validatedParamStep1") Step validatedParamStep) { return new JobBuilder("validatedParamJob", jobRepository) .incrementer(new RunIdIncrementer()) // .validator(new FileParamValidator()) .validator(multipleValidator()) .start(validatedParamStep) .build(); } private CompositeJobParametersValidator multipleValidator() { CompositeJobParametersValidator validator = new CompositeJobParametersValidator(); validator.setValidators(Arrays.asList(new FileParamValidator())); return validator; }
JobListener
- 1. JobExecutionListener가 구현된 클래스 만들기
@Slf4j public class JobLoggerListener implements JobExecutionListener { private final static String BEFORE_MESSAGE = "{} Job is Running"; private final static String AFTER_MESSAGE = "{} Job is Done. (Status: {})"; @Override public void beforeJob(JobExecution jobExecution) { log.info(BEFORE_MESSAGE, jobExecution.getJobInstance().getJobName()); } @Override public void afterJob(JobExecution jobExecution) { log.info(AFTER_MESSAGE, jobExecution.getJobInstance().getJobName(), jobExecution.getStatus()); if (jobExecution.getStatus() == BatchStatus.FAILED) { // email log.info("Job is Failed"); } } }
- 2. JobBuilder에 추가하기
@Bean public Job jobListenerJob(JobRepository jobRepository, @Qualifier("jobListenerStep") Step jobListenerStep) { return new JobBuilder("jobListenerJob", jobRepository) .incrementer(new RunIdIncrementer()) .listener(new JobListenerConfig()) .start(jobListenerStep) .build(); }
DB 데이터 이관하기
- Deprecated
ㆍ<read-entity, write-entity>chunk(transaction-count)
- example
@Bean @JobScope public Step trMigrationStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, @Qualifier("trOrdersReader") ItemReader itemReader, @Qualifier("trOrderProcessor") ItemProcessor itemProcessor, @Qualifier("trOrdersWriter") ItemWriter itemWriter) { return new StepBuilder("trMigrationStep",jobRepository) .chunk(5, transactionManager) // 몇개의 단위로 데이터를 처리할 것인가?, 5개의 트랜잭션 갯수 .reader(itemReader) .processor(itemProcessor) .writer(itemWriter) .build(); } // @StepScope // @Bean // public RepositoryItemWriter<Accounts> trOrdersWriter() { // return new RepositoryItemWriterBuilder<Accounts>() // .repository(accountsRepository) // .methodName("save") // .build(); // } @StepScope @Bean public ItemWriter<Accounts> trOrdersWriter() { return new ItemWriter<Accounts>() { @Override public void write(Chunk<? extends Accounts> chunk) throws Exception { chunk.forEach(item -> accountsRepository.save(item)); } }; } @StepScope @Bean public ItemProcessor<Orders, Accounts> trOrderProcessor() { return item -> new Accounts(item); } @StepScope @Bean public RepositoryItemReader<Orders> trOrdersReader() { return new RepositoryItemReaderBuilder<Orders>() .name("trOrdersReader") .repository(ordersRepository) .methodName("findAll") .pageSize(5) .arguments(Arrays.asList()) .sorts(Collections.singletonMap("id", Sort.Direction.ASC)) .build(); }
ㆍwriter로 RepositoryItemWriter와 ItemWriter 둘 다 사용 가능
- example : 여러 Step 사용 + 이전 Step의 데이터 사용하기
@Bean public Job multipleStepJob(JobRepository jobRepository, @Qualifier("multipleStep1") Step step1, @Qualifier("multipleStep2") Step step2, @Qualifier("multipleStep3") Step step3) { return new JobBuilder("multipleStepJob", jobRepository) .incrementer(new RunIdIncrementer()) .start(step1) .next(step2) .next(step3) .build(); } @JobScope @Bean public Step multipleStep1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("multipleStep1",jobRepository) .tasklet((contribution, chunkContext) -> { System.out.println("step1"); return RepeatStatus.FINISHED; }, transactionManager) .build(); } @JobScope @Bean public Step multipleStep2(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("multipleStep2",jobRepository) .tasklet((contribution, chunkContext) -> { System.out.println("step2"); ExecutionContext executionContext = chunkContext .getStepContext() .getStepExecution() .getJobExecution() .getExecutionContext(); executionContext.put("someKey", "hello!"); return RepeatStatus.FINISHED; }, transactionManager) .build(); } @JobScope @Bean public Step multipleStep3(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("multipleStep3",jobRepository) .tasklet((contribution, chunkContext) -> { System.out.println("step3"); ExecutionContext executionContext = chunkContext .getStepContext() .getStepExecution() .getJobExecution() .getExecutionContext(); System.out.println(executionContext.get("someKey")); return RepeatStatus.FINISHED; }, transactionManager) .build(); }
- example : Step 결과에 따른 분기 처리
@Bean public Job conditionalStepJob(JobRepository jobRepository, @Qualifier("conditionalStartStep") Step startStep, @Qualifier("conditionalAllStep") Step allStep, @Qualifier("conditionalFailStep") Step failStep, @Qualifier("conditionalCompleteStep") Step completeStep) { return new JobBuilder("multipleStepJob", jobRepository) .incrementer(new RunIdIncrementer()) .start(startStep) .on("FAILED").to(failStep) .from(startStep) .on("COMPLETED").to(completeStep) .from(startStep) .on("*").to(allStep) .end() .build(); }
Test 코드
1. 기본 세팅
@Configuration @EnableAutoConfiguration public class SpringBatchTestConfig { }
ㆍApplicationTest와 같은 패키지에 생성
#default spring: profiles: active: local --- spring: config: activate: on-profile: local batch: job: name: ${job.name:None} enabled: false jdbc: initialize-schema: always datasource: url: jdbc:mysql://127.0.0.1:3306/spring_batch driver-class-name: com.mysql.cj.jdbc.Driver username: root password: jpa: show-sql: true --- spring: config: activate: on-profile: test jpa: database: h2
ㆍ@ActiveProfiles 를 사용하기 위한 yml 파일 변경
2. example
@SpringBatchTest // jobLauncherTestUtils @SpringBootTest(classes = {HelloWorldJobConfig.class, SpringBatchTestConfig.class}) @RunWith(SpringRunner.class) @ActiveProfiles("test") class HelloWorldJobConfigTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test void success() throws Exception { // when JobExecution execution = jobLauncherTestUtils.launchJob(); // then Assertions.assertEquals(execution.getExitStatus(), ExitStatus.COMPLETED); } }
ㆍ1개의 테스트는 1개의 Job을 테스트할 수 있다.
배치 작업 실행하기 (스프링 스케줄링)
@Component public class SampleScheduler { private final Job helloWorldJob; private final JobLauncher jobLauncher; public SampleScheduler(@Qualifier("helloWorldJob") Job helloWorldJob, JobLauncher jobLauncher) { this.helloWorldJob = helloWorldJob; this.jobLauncher = jobLauncher; } @Scheduled(cron = "0 */1 * * * *") public void helloworldJobRun() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { JobParameters jobParameters = new JobParameters( Collections.singletonMap("requestTime", new JobParameter(System.currentTimeMillis(), Long.class)) ); jobLauncher.run(helloWorldJob, jobParameters); // jobParameters 값 없이 job을 실행시키면 동일한 파라미터로 실행되어 spring-batch에서는 똑같은 job을 실행하다고 생각하여 // job이 실행이 안된다. } }
ㆍjobParameters 값 설정을 안해준다면 ?!
- job을 실행시키면 동일한 파라미터로 실행되어 spring-batch에서는 똑같은 job을 실행한다고 생각하여 job 실행이 안된다!
후기
- spring-batch 5.0 버전에 와서 deprecated 된 부분이 많아서 하나씩 찾으면서 예제를 만들어보니 재밌었다.
- 그리고 빈등록은 기본적으로 메서드 이름으로 되고 같은 class내에 그 메서드 이름을 받으면 DI가 되는 것을 알고 있고 찾아도 그렇게 나오는데....내 작업할 때는 메서드 이름으로 된 빈을 제대로 못찾아서 @Qualifier 으로 다 명시해줬다. 나중에 왜 안되는지에 대한 이유를 찾아봐야 겠다!
- 내가 스케줄러를 만들 때는 @Schedule와 cron을 이용해서 그냥 Service class를 만들어 그곳에 모든 로직을 다 넣었는데, 확실히 spring-batch는 역할들이 정해져있다보니 협업 및 클린한 코드를 만들기 더 수월할 거 같다는 생각이 들었다.
번외
ㆍtest할 때 Bean을 못찾는다는 컴파일 에러가 너무 거슬려서 이 부분도 남겨본다.
- 일단 JobLauncherTestUtils 를 bean으로 등록하는 부분을 찾아보면...
ㆍ어노테이션으로 등록하는 것이 아니라 registry를 이용하는 것을 볼 수 있다.
- 이 부분을 ChatGpt 한테 물어보면?!
- 해결방안
ㆍ엄청나게 깔끔한 방법은 아닌 거 같지만 컴파일에러가 나는 것보다는 괜찮다고 생각한다.