ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • spring-batch start (Spring batch 5.1.0)
    spring-batch 2023. 11. 26. 11:21

    배치 작업이란?

    - 특정 주기마다 데이터를 처리하는 작업!

    - 예시

        ㆍ이메일 쿠폰 발송

        ㆍ가맹점 정산

        ㆍ거래 명세서 생성

        ㆍ추천 시스템 데이터 작업

     

    스프링 배치의 구조

    - 스프링 배치로 작업이 필요한 이유

        ㆍ풍부한 기능

        ㆍ일관성된 코드

        ㆍ기존 서비스가 스프링 프레임워크로 되어 있는 경우 호환가능

     

    - JobRepository : 배치가 수행될 때 수행되는 메타 데이터를 관리하고 시작시간, 종료시간, job의 상태 등 배치 수행 관련 데이터들을 저장한다.

    - JobLauncher : 잡을 실행시켜주는 역할

    - Job : 하나의 배치 작업

     


    Hello World 띄우기!  (spring-batch 5.x 버전)

    1. Deprecated 

        ㆍJobBuilderFactory

        ㆍStepBuilderFactory

        ㆍJobBuilder(String name)

        ㆍStepBuilder(String name)

     

    2. @EnableBatchProcessing 사용 X

        ㆍspring-boot 3.0 이상은 @EnableBatchProcessing 을 사용하지 않는다!

        ㆍ@EnableBatchProcessing 사용 시 spring batch 기본 설정이 백오프된다고 한다.

     

    * 참고 : https://velog.io/@calaf/Spring-Batch-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0-1.-%EC%8B%9C%EC%9E%91%ED%95%98%EA%B8%B0

     

    @Configuration
    public class HelloWorldJobConfig {
    
        @Bean
        public Job helloWorldJob(JobRepository jobRepository, Step helloWorldStep) {
            return new JobBuilder("helloWorldJob", jobRepository)
                    .incrementer(new RunIdIncrementer())
                    .start(helloWorldStep)
                    .build();
        }
    
        @Bean
        @JobScope
        public Step helloWorldStep(JobRepository jobRepository, Tasklet helloWorldTasklet, PlatformTransactionManager transactionManager) {
            return new StepBuilder("helloWorldStep",jobRepository)
                    .tasklet(helloWorldTasklet, transactionManager)
                    .build();
        }
    
        @Bean
        @StepScope
        public Tasklet helloWorldTasklet() {
            return (contribution, chunkContext) -> {
                System.out.println("Hello World Spring Batch");
                return RepeatStatus.FINISHED;
            };
        }
    
    }

     

    - 2번째 실행한다면?!

    Step already complete or not restartable, so no action to execute

        ㆍ이런 메시지가 뜬다면?

              - spring batch에서는 동일한 Job을 동일한 파라미터로 수행하면 위와 같은 메시지가 출력된다고한다.

    * 참고 : https://devfunny.tistory.com/758

     

    - 실행 확인

    select * from spring_batch.BATCH_JOB_EXECUTION;

     


    배치 실행 시 파일 이름 받기

    @StepScope
    @Bean
    public Tasklet validatedParamTasklet(@Value("#{jobParameters['-fileName']}") String fileName) {
        return (contribution, chunkContext) -> {
            System.out.println(fileName);   // test.csv
            System.out.println("validated Param Tasklet");
            return RepeatStatus.FINISHED;
        };
    }

     


    validation check

    - 1. JobParametersValidator를 구현한 클래스 만들기

    public class FileParamValidator implements JobParametersValidator {
        @Override
        public void validate(JobParameters parameters) throws JobParametersInvalidException {
            String fileName = parameters.getString("-fileName");
            System.out.println("validate : " + fileName);
            if(!StringUtils.endsWithIgnoreCase(fileName, "csv")) {
                throw new JobParametersInvalidException("This is not csv file");
            }
        }
    }

    - 2. JobBuilder에 추가하기

    @Bean
    public Job validatedParamJob(JobRepository jobRepository, @Qualifier("validatedParamStep1") Step validatedParamStep) {
        return new JobBuilder("validatedParamJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .validator(new FileParamValidator())
                .start(validatedParamStep)
                .build();
    }

     

    - * 여러 validator를 추가하고 싶다면?! - CompositeJobParametersValidator 사용!

    @Bean
    public Job validatedParamJob(JobRepository jobRepository, @Qualifier("validatedParamStep1") Step validatedParamStep) {
        return new JobBuilder("validatedParamJob", jobRepository)
                .incrementer(new RunIdIncrementer())
    //                .validator(new FileParamValidator())
                .validator(multipleValidator())
                .start(validatedParamStep)
                .build();
    }
    
    private CompositeJobParametersValidator multipleValidator() {
        CompositeJobParametersValidator validator = new CompositeJobParametersValidator();
        validator.setValidators(Arrays.asList(new FileParamValidator()));
    
        return validator;
    }

     


    JobListener

    - 1. JobExecutionListener가 구현된 클래스 만들기

    @Slf4j
    public class JobLoggerListener implements JobExecutionListener {
    
        private final static String BEFORE_MESSAGE = "{} Job is Running";
        private final static String AFTER_MESSAGE = "{} Job is Done. (Status: {})";
    
        @Override
        public void beforeJob(JobExecution jobExecution) {
            log.info(BEFORE_MESSAGE, jobExecution.getJobInstance().getJobName());
        }
    
        @Override
        public void afterJob(JobExecution jobExecution) {
            log.info(AFTER_MESSAGE, jobExecution.getJobInstance().getJobName(), jobExecution.getStatus());
            if (jobExecution.getStatus() == BatchStatus.FAILED) {
                // email
                log.info("Job is Failed");
            }
        }
    }

    - 2. JobBuilder에 추가하기

    @Bean
    public Job jobListenerJob(JobRepository jobRepository, @Qualifier("jobListenerStep") Step jobListenerStep) {
        return new JobBuilder("jobListenerJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(new JobListenerConfig())
                .start(jobListenerStep)
                .build();
    }

     


    DB 데이터 이관하기

    - Deprecated

        ㆍ<read-entity, write-entity>chunk(transaction-count)

    - example

    @Bean
    @JobScope
    public Step trMigrationStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                                @Qualifier("trOrdersReader") ItemReader itemReader,
                                @Qualifier("trOrderProcessor") ItemProcessor itemProcessor,
                                @Qualifier("trOrdersWriter") ItemWriter itemWriter) {
        return new StepBuilder("trMigrationStep",jobRepository)
                .chunk(5, transactionManager)   // 몇개의 단위로 데이터를 처리할 것인가?, 5개의 트랜잭션 갯수
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
    }
    
    //    @StepScope
    //    @Bean
    //    public RepositoryItemWriter<Accounts> trOrdersWriter() {
    //        return new RepositoryItemWriterBuilder<Accounts>()
    //                .repository(accountsRepository)
    //                .methodName("save")
    //                .build();
    //    }
    
    @StepScope
    @Bean
    public ItemWriter<Accounts> trOrdersWriter() {
        return new ItemWriter<Accounts>() {
            @Override
            public void write(Chunk<? extends Accounts> chunk) throws Exception {
                chunk.forEach(item -> accountsRepository.save(item));
            }
        };
    }
    
    @StepScope
    @Bean
    public ItemProcessor<Orders, Accounts> trOrderProcessor() {
        return item -> new Accounts(item);
    }
    
    @StepScope
    @Bean
    public RepositoryItemReader<Orders> trOrdersReader() {
        return new RepositoryItemReaderBuilder<Orders>()
                .name("trOrdersReader")
                .repository(ordersRepository)
                .methodName("findAll")
                .pageSize(5)
                .arguments(Arrays.asList())
                .sorts(Collections.singletonMap("id", Sort.Direction.ASC))
                .build();
    }

        ㆍwriter로 RepositoryItemWriter와 ItemWriter 둘 다 사용 가능

     

     

    - example : 여러 Step 사용 + 이전 Step의 데이터 사용하기

    @Bean
    public Job multipleStepJob(JobRepository jobRepository,
                               @Qualifier("multipleStep1") Step step1,
                               @Qualifier("multipleStep2") Step step2,
                               @Qualifier("multipleStep3") Step step3) {
        return new JobBuilder("multipleStepJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(step1)
                .next(step2)
                .next(step3)
                .build();
    }
    
    @JobScope
    @Bean
    public Step multipleStep1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("multipleStep1",jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step1");
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }
    @JobScope
    @Bean
    public Step multipleStep2(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("multipleStep2",jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2");
    
                    ExecutionContext executionContext = chunkContext
                            .getStepContext()
                            .getStepExecution()
                            .getJobExecution()
                            .getExecutionContext();
    
                    executionContext.put("someKey", "hello!");
    
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }
    @JobScope
    @Bean
    public Step multipleStep3(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("multipleStep3",jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step3");
    
                    ExecutionContext executionContext = chunkContext
                            .getStepContext()
                            .getStepExecution()
                            .getJobExecution()
                            .getExecutionContext();
    
                    System.out.println(executionContext.get("someKey"));
    
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }

     

    - example : Step 결과에 따른 분기 처리

    @Bean
    public Job conditionalStepJob(JobRepository jobRepository,
                                  @Qualifier("conditionalStartStep") Step startStep,
                                  @Qualifier("conditionalAllStep") Step allStep,
                                  @Qualifier("conditionalFailStep") Step failStep,
                                  @Qualifier("conditionalCompleteStep") Step completeStep) {
        return new JobBuilder("multipleStepJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(startStep)
                	.on("FAILED").to(failStep)
                .from(startStep)
                	.on("COMPLETED").to(completeStep)
                .from(startStep)
                	.on("*").to(allStep)
                .end()
                .build();
    }

     


    Test 코드

    1. 기본 세팅

    @Configuration
    @EnableAutoConfiguration
    public class SpringBatchTestConfig {
    }

        ㆍApplicationTest와 같은 패키지에 생성

    #default
    spring:
      profiles:
        active: local
    
    ---
    
    spring:
      config:
        activate:
          on-profile: local
      batch:
        job:
          name: ${job.name:None}
          enabled: false
        jdbc:
          initialize-schema: always
      datasource:
        url: jdbc:mysql://127.0.0.1:3306/spring_batch
        driver-class-name: com.mysql.cj.jdbc.Driver
        username: root
        password: 
      jpa:
        show-sql: true
    
    ---
    
    spring:
      config:
        activate:
          on-profile: test
      jpa:
        database: h2

        ㆍ@ActiveProfiles 를 사용하기 위한 yml 파일 변경

     

    2. example

    @SpringBatchTest    // jobLauncherTestUtils
    @SpringBootTest(classes = {HelloWorldJobConfig.class, SpringBatchTestConfig.class})
    @RunWith(SpringRunner.class)
    @ActiveProfiles("test")
    class HelloWorldJobConfigTest {
    
        @Autowired
        private JobLauncherTestUtils jobLauncherTestUtils;
    
        @Test
        void success() throws Exception {
            // when
            JobExecution execution = jobLauncherTestUtils.launchJob();
    
            // then
            Assertions.assertEquals(execution.getExitStatus(), ExitStatus.COMPLETED);
        }
    }

        ㆍ1개의 테스트는 1개의 Job을 테스트할 수 있다.


    배치 작업 실행하기 (스프링 스케줄링)

    @Component
    public class SampleScheduler {
        private final Job helloWorldJob;
        private final JobLauncher jobLauncher;
    
        public SampleScheduler(@Qualifier("helloWorldJob") Job helloWorldJob, JobLauncher jobLauncher) {
            this.helloWorldJob = helloWorldJob;
            this.jobLauncher = jobLauncher;
        }
    
        @Scheduled(cron = "0 */1 * * * *")
        public void helloworldJobRun() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
            JobParameters jobParameters = new JobParameters(
                    Collections.singletonMap("requestTime", new JobParameter(System.currentTimeMillis(), Long.class))
            );
    
            jobLauncher.run(helloWorldJob, jobParameters);
            // jobParameters 값 없이 job을 실행시키면 동일한 파라미터로 실행되어 spring-batch에서는 똑같은 job을 실행하다고 생각하여
            // job이 실행이 안된다.
        }
    
    
    }

        ㆍjobParameters 값 설정을 안해준다면 ?!

              - job을 실행시키면 동일한 파라미터로 실행되어 spring-batch에서는 똑같은 job을 실행한다고 생각하여 job 실행이 안된다!

     

     

     


    후기

    - spring-batch 5.0 버전에 와서 deprecated 된 부분이 많아서 하나씩 찾으면서 예제를 만들어보니 재밌었다.

    - 그리고 빈등록은 기본적으로 메서드 이름으로 되고 같은 class내에 그 메서드 이름을 받으면 DI가 되는 것을 알고 있고 찾아도 그렇게 나오는데....내 작업할 때는 메서드 이름으로 된 빈을 제대로 못찾아서 @Qualifier 으로 다 명시해줬다. 나중에 왜 안되는지에 대한 이유를 찾아봐야 겠다!

    - 내가 스케줄러를 만들 때는 @Schedule와 cron을 이용해서 그냥 Service class를 만들어 그곳에 모든 로직을 다 넣었는데, 확실히 spring-batch는 역할들이 정해져있다보니 협업 및 클린한 코드를 만들기 더 수월할 거 같다는 생각이 들었다.

     


    번외

        ㆍtest할 때 Bean을 못찾는다는 컴파일 에러가 너무 거슬려서 이 부분도 남겨본다.

     

    - 일단 JobLauncherTestUtils 를 bean으로 등록하는 부분을 찾아보면...

        ㆍ어노테이션으로 등록하는 것이 아니라 registry를 이용하는 것을 볼 수 있다.

     

    - 이 부분을 ChatGpt 한테 물어보면?!

     

    - 해결방안

        ㆍ엄청나게 깔끔한 방법은 아닌 거 같지만 컴파일에러가 나는 것보다는 괜찮다고 생각한다.

     

     

     

     

    github : https://github.com/minit97/spring-batch-start

    댓글

Designed by Tistory.