본문 바로가기

개발/02-1.Spring Batch

[SpringBatch, DEVOCEAN] Week8- (EZ하게) CompositeItemProcessor 으로 여러단계에 걸쳐 데이터 Transform하기

배경 : 스프링배치 8주차 과제 를 수행하기 위한  정리를 진행함.

 

참고 : 8주차 교재

(아래 글은 한국 스프링 사용자 모임(KSUG)에서 진행된 스프링 배치 스터디 내용을 정리한 게시글입니다.
DEVOCEAN에 연재 중인 KIDO님의 글을 참고하여 실습한 내용을 기록했습니다.)

 

https://devocean.sk.com/blog/techBoardDetail.do?ID=166950

 

[SpringBatch 연재 08] CompositeItemProcessor 으로 여러단계에 걸쳐 데이터 Transform하기

 

devocean.sk.com

 

 


항상 스터디 관련 시작 하기전에, 맨 처음 하는 것은, 그 주의 스터디에서 공부할 키워드를 검색해본다.

CompositeItemProcessor 로 검색을 하니, ItemProcessor 가 그 상위 개념으로 나와서.. 기억을 돌이켜보니..

 

Spring Batch 의 기본 아키텍쳐

 

2주차 스터디에서 언급된 스프링배치 기본 아키텍쳐에 ItemProcessor 에 대해 이번시간에는 알아보는 거구나...

공부 시작해보자.

 


ItemProcessor 

  • 데이터를 가공하거나 필터링 하는 역할. (필수 x)
  • ItemWriter 에서 구현 가능
  • 분리함으로써 비즈니스 코드 섞이는 것 방지 가능

역할

변환 : ItemReader 에서 읽은 데이터를 원하는 타입으로 변환하여, Writer에 넘겨줄 수 있음.

필터 : ItemReader에서 넘겨준 데이터를 Writer로 넘길 것인지 결정할 수 있음. null 을 받으면 Writer에 전달되지 않음.

 

구현체

  • ItemProcessorAdapter
  • ValidationItemProcessor
  • CompositeItemProcessor

3개중 우리는 CompositeItemProcessor에 대해 집중적으로 알아볼 것이다.  왜냐 .. 우리 메인교제 내용이니까..


CompositeItemProcessor

여러 개의 ItemProcessor를 하나의 Processor로 연결하여 여러 단계의 처리를 수행할 수 있도록 한다.

 

구성요소

  • Delegates : 처리를 수행할 ItemProcessor 목록
  • TransactionAttribute : 트랜잭션 속성을 설정

장점

  • 단계별 처리 : 여러 단계로 나누어 처리를 수행. 코드를 명확하고 이해하기 쉽게 만들 수 있음
  • 재사용 가능성 : 각 단계별 Processor를 재사용 하여 다른 Job에서도 활용할 수 있다.
  • 유연성 : 다양한 ItemProcessor를 조합하여 원하는 처리 과정을 구현

단점

  • 설정 복잡성 : 여러 개의 Processor를 설정하고 관리해야 하기 때문에 설정 복잡해질 수 있음.
  • 성능 저하 : 여러 단계 처리 과정을 거치므로 성능이 저하될 수 있음.

위 설명 내용이 실제 그런지 샘플 소스를 통해 체득해보자...


실습 코드

기존 7주차 코드 를 활용하여 진행

 

LowerCaseItemProcessor.java

package com.ksko.spring_batch.batch_sample.jobs.mybatis;

import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import org.springframework.batch.item.ItemProcessor;

/**
 * 이름, 성별을 소문자로 변경하는 ItemProcessor
 */
public class LowerCaseItemProcessor implements ItemProcessor<Customer, Customer> {


    @Override
    public Customer process(Customer item) throws Exception {
        item.setName(item.getName().toLowerCase());
        item.setGender(item.getGender().toLowerCase());
        return item;
    }
}

 

After20YearsItemProcessor.java

package com.ksko.spring_batch.batch_sample.jobs.mybatis;

import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import org.springframework.batch.item.ItemProcessor;

/**
 * 나이에 20년 더하는 ItemProcessor
 */
public class After20YearsItemProcessor implements ItemProcessor<Customer, Customer> {

    @Override
    public Customer process(Customer item) throws Exception {
        item.setAge(item.getAge() + 20);
        return item;
    }
}

 

MybatisReaderJobConfig.java

package com.ksko.spring_batch.batch_sample.jobs.mybatis;

import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.builder.CompositeItemProcessorBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.List;

@Slf4j
@Configuration
public class MyBatisReaderJobConfig {

    /**
     * CHUNK 크기를 지정한다.
     */

    public static final int CHUNK_SIZE = 2;
    public static final String ENCODING = "UTF-8";
    public static final String MYBATIS_CHUNK_JOB = "MYBATIS_CHUNK_JOB";

    @Autowired
    DataSource dataSource;

    @Autowired
    SqlSessionFactory sqlSessionFactory;

    @Bean
    public MyBatisPagingItemReader<Customer> myBatisItemReader() throws Exception {
        return new MyBatisPagingItemReaderBuilder<Customer>()
                .sqlSessionFactory(sqlSessionFactory) // MyBatis의 SqlSessionFactory를 설정, 데이터베이스 연결과 매퍼 쿼리 실행에 필요한 MyBatis 환경을 제공
                .pageSize(CHUNK_SIZE) // 한 번에 읽어올 데이터의 크기를 설정
                .queryId("com.ksko.spring_batch.batch_sample.jobs.selectCustomers")  // 실행할 MyBatis 매퍼 쿼리의 ID를 지정, `com.ksko.spring_batch.batch_sample.jobs` 패키지 아래에 있는 `selectCustomers` 쿼리를 실행
                .build(); // 설정이 완료된 MyBatisPagingItemReader 객체를 생성
    }
    @Bean
    public FlatFileItemWriter<Customer> customerCursorFlatFileItemWriter() {
        return new FlatFileItemWriterBuilder<Customer>()
                .name("customerCursorFlatFileItemWriter")
                .resource(new FileSystemResource("./output/customer_new_v4.csv"))
                .encoding(ENCODING)
                .delimited().delimiter("\t")
                .names("Name", "Age", "Gender")
                .build();
    }

    //CompositeItemProcessor 구현
    @Bean
    public CompositeItemProcessor<Customer, Customer> compositeItemProcessor() {
        return new CompositeItemProcessorBuilder<Customer, Customer>()
                .delegates(List.of( // delegate 역할 : 1.ItemProcessor 체이닝, 2. 순서대로 처리, 3.데이터 변환 및 필터링, 4.유연한 처리 로직, 5. 코드 재사용성
                        new LowerCaseItemProcessor(),
                        new After20YearsItemProcessor()
                ))
                .build();
    }

    @Bean
    public Step customerJdbcCursorStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
        log.info("------------------ Init customerJdbcCursorStep -----------------");

        return new StepBuilder("customerJdbcCursorStep", jobRepository)
                .<Customer, Customer> chunk(CHUNK_SIZE, transactionManager)
                .reader(myBatisItemReader())
                //.processor(new CustomerItemProcessor())
                .processor(compositeItemProcessor()) // compositeItemProcessor 셋팅, 기존 커스텀 아이템 프로세서 주석
                .writer(items -> items.forEach(System.out::println)) // 결과 로그 찍기위한 셋팅
                //.writer(customerCursorFlatFileItemWriter())
                .build();
    }

    @Bean
    public Job customerJdbcCursorPagingJob(Step customerJdbcCursorStep, JobRepository jobRepository) {
        log.info("------------------ Init customerJdbcCursorPagingJob -----------------");
        return new JobBuilder(MYBATIS_CHUNK_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(customerJdbcCursorStep)
                .build();
    }
}

 

DB 데이터

실행 결과

Customer(id=1, name=alice, age=66, gender=f, grade=null)
Customer(id=2, name=bob, age=71, gender=m, grade=null)
Customer(id=3, name=charlie, age=63, gender=m, grade=null)
Customer(id=4, name=diana, age=68, gender=f, grade=null)
Customer(id=5, name=eve, age=70, gender=f, grade=null)
Customer(id=6, name=frank, age=76, gender=m, grade=null)
Customer(id=7, name=grace, age=65, gender=f, grade=null)
Customer(id=8, name=hank, age=69, gender=m, grade=null)
Customer(id=9, name=ivy, age=67, gender=f, grade=null)
Customer(id=10, name=jack, age=72, gender=m, grade=null)
Customer(id=11, name=kate, age=70, gender=f, grade=null)
Customer(id=12, name=leo, age=80, gender=m, grade=null)
Customer(id=13, name=mia, age=68, gender=f, grade=null)
Customer(id=14, name=nate, age=76, gender=m, grade=null)
Customer(id=15, name=olivia, age=74, gender=f, grade=null)
Customer(id=16, name=paul, age=81, gender=m, grade=null)
Customer(id=17, name=quinn, age=72, gender=m, grade=null)
Customer(id=18, name=rachel, age=75, gender=f, grade=null)
Customer(id=19, name=sam, age=79, gender=m, grade=null)
Customer(id=20, name=tina, age=73, gender=f, grade=null)
Customer(id=27, name=unclebae, age=45, gender=m, grade=null)
Customer(id=28, name=ksko, age=62, gender=m, grade=null)
Customer(id=29, name=sja1008, age=43, gender=f, grade=null)

 

정리

CompositeItemProcessor 는 ItemProcessor를 여러개 생성 하고 이를 순서대로 적용할 수 있음

 

728x90