
[SpringBatch, DEVOCEAN] Week 9 (the EZ way): Implementing a Custom ItemReader/ItemWriter for Batch Processing Tailored to Your Needs

Background: notes written while working through the Week 9 assignment of the Spring Batch study.

 

Reference: Week 9 course material

(This post summarizes a Spring Batch study run by the Korea Spring User Group (KSUG).
It records hands-on practice based on the series KIDO is publishing on DEVOCEAN.)

 

https://devocean.sk.com/blog/techBoardDetail.do?ID=167030

 

[SpringBatch Series 09] Implementing a Custom ItemReader/ItemWriter for Batch Processing Tailored to Your Needs

 


 

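Contents

This week covers CustomItemReader / CustomItemWriter, which are used when a batch job has to handle a special case or follow specific business logic.

 

This week's samples

- QuerydslPagingItemReader: reads database rows page by page using Querydsl

- CustomItemWriter: calls another service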

 

Reference GitHub example (the project structure here follows this sample)

https://github.com/schooldevops/spring-batch-tutorials/tree/10.01.FlowControl/06.04.QuerydslPagingItemReaderSample

 


 

 


QuerydslPagingItemReader

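QuerydslPagingItemReader extends AbstractPagingItemReader, which makes it possible to build an ItemReader on top of Querydsl.

Querydsl: a framework for writing SQL-like queries as type-safe Java code

 - Abstraction over JPA entities: queries are expressed through generated Q-classes instead of hand-written query strings, which makes the code easier to maintain.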

 

Creating QuerydslPagingItemReader

Before writing the reader, the build has to be configured for Querydsl, JPA, and Q-class generation.

build.gradle

// https://g-db.tistory.com/entry/Spring-Boot-Spring-Boot%EC%97%90%EC%84%9C-JPA-QueryDSL-%EC%A0%81%EC%9A%A9-%EB%B0%A9%EB%B2%95#Gradle%EC%97%90%20QueryDSL%20%EC%9D%98%EC%A1%B4%EC%84%B1%20%EC%B6%94%EA%B0%80-1
// helped by sajacaros
// Querydsl build options
def generated = 'src/main/generated'

// Directory where the generated Querydsl Q-classes are written
tasks.withType(JavaCompile) {
    options.getGeneratedSourceOutputDirectory().set(file(generated))
}

// Add the Q-class directory to the Java source set
sourceSets {
    main.java.srcDirs += [ generated ]
}

// Delete the Q-class directory on gradle clean
clean {
    delete file(generated)
}

 

JPA setup (adding the entity, etc.)

 

Customer.java 

package com.ksko.spring_batch.batch_sample.jobs.models;

import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Entity
@Table(name = "customer")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Customer {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;
    
    private String name;
    private int age;
    private String gender;
    private String grade; // added to match the DB schema of a fellow study member (Junsu)
}

 

QuerydslPagingItemReader.java

package com.ksko.spring_batch.batch_sample.jobs.jpa.reader;

import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.batch.item.database.AbstractPagingItemReader;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

public class QuerydslPagingItemReader<T> extends AbstractPagingItemReader<T> {
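    // The numbered comments (1-4) record the order in which the pieces were written; the order was not obvious at first

    // AbstractPagingItemReader -> adapter pattern (a structural pattern that lets objects with incompatible interfaces work together)
    // 4 (requires the Spring Data JPA dependency to use EntityManager)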
    private EntityManager em;
    private final Function<JPAQueryFactory, JPAQuery<T>> querySupplier;

    private final Boolean alwaysReadFromZero;

    public QuerydslPagingItemReader(EntityManagerFactory entityManagerFactory, Function<JPAQueryFactory, JPAQuery<T>> querySupplier, int chunkSize) {
        this(ClassUtils.getShortName(QuerydslPagingItemReader.class), entityManagerFactory, querySupplier, chunkSize, false);
    }
    
    public QuerydslPagingItemReader(String name, EntityManagerFactory entityManagerFactory, Function<JPAQueryFactory, JPAQuery<T>> querySupplier, int chunkSize, Boolean alwaysReadFromZero) {
        super.setPageSize(chunkSize); // page size used for paging
        setName(name); // name that identifies this ItemReader
        this.querySupplier = querySupplier;
        this.em = entityManagerFactory.createEntityManager();
        this.alwaysReadFromZero = alwaysReadFromZero; // whether every page read starts from offset 0

    }

    @Override
    protected void doClose() throws Exception {
        if(em != null) {
            em.close();
        }
        super.doClose();
    }

    @Override
    protected void doReadPage() {
        //1
        initQueryResult();

        //3 (requires the Querydsl dependency)
        JPAQueryFactory jpaQueryFactory = new JPAQueryFactory(em);
        long offset = 0;
        // Advance the offset page by page unless every read must start from row 0
        if (!alwaysReadFromZero) {
            offset = (long) getPage() * getPageSize();
        }

        JPAQuery<T> query = querySupplier.apply(jpaQueryFactory).offset(offset).limit(getPageSize());

        List<T> queryResult = query.fetch();
        for (T entity: queryResult) {
            em.detach(entity);
            results.add(entity);
        }

    }

    private void initQueryResult() {
        //2
        if (CollectionUtils.isEmpty(results)) {
            results = new CopyOnWriteArrayList<>();
        } else {
            results.clear();
        }
    }

}
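
To make the paging flow concrete, here is a minimal plain-Java sketch of how a paging reader hands out items one at a time while loading pages on demand. SimplePagingReader and ListPagingReader are hypothetical stand-ins written for this illustration, not Spring Batch classes; the real AbstractPagingItemReader also deals with restart state and thread safety, which the sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

public class PagingReaderSketch {

    /** Simplified, hypothetical stand-in for a paging reader: read() pulls pages on demand. */
    abstract static class SimplePagingReader<T> {
        protected final int pageSize;
        protected List<T> results;   // items of the page currently being served
        private int page = 0;        // index of the next page to load
        private int current = 0;     // position inside the current page

        SimplePagingReader(int pageSize) {
            this.pageSize = pageSize;
        }

        protected int getPage() {
            return page;
        }

        /** Hook: subclasses load exactly one page into 'results'. */
        protected abstract void doReadPage();

        /** Returns the next item, or null once the data is exhausted. */
        T read() {
            if (results == null || current >= pageSize) {
                doReadPage();
                page++;
                current = 0;
            }
            int next = current++;
            return next < results.size() ? results.get(next) : null;
        }
    }

    /** Reads from an in-memory list using offset = page * pageSize. */
    static class ListPagingReader extends SimplePagingReader<Integer> {
        private final List<Integer> source;

        ListPagingReader(List<Integer> source, int pageSize) {
            super(pageSize);
            this.source = source;
        }

        @Override
        protected void doReadPage() {
            int offset = Math.min(getPage() * pageSize, source.size());
            int end = Math.min(offset + pageSize, source.size());
            results = new ArrayList<>(source.subList(offset, end));
        }
    }

    /** Drains the reader and returns the items in the order they were read. */
    static List<Integer> readAll(List<Integer> source, int pageSize) {
        ListPagingReader reader = new ListPagingReader(source, pageSize);
        List<Integer> out = new ArrayList<>();
        for (Integer item = reader.read(); item != null; item = reader.read()) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll(List.of(1, 2, 3, 4, 5), 2)); // prints [1, 2, 3, 4, 5]
    }
}
```

If doReadPage() always used offset 0 while the underlying data never changed, read() would keep returning 1 and 2, which is the endless loop to watch for when the offset calculation is wrong.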

 

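Note

The course material describes AbstractPagingItemReader as an adapter pattern. (Because subclasses only override the doReadPage() hook, it can also be read as a template method.)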

 

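[Note] What is the adapter pattern?

The adapter pattern is a structural design pattern that lets objects with incompatible interfaces collaborate. It converts the interface of an existing class into the interface the client expects, so the existing class can be reused without modifying it.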
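
The definition above can be sketched in a few lines of plain Java. Every name here (TextSource, LegacyCsvRow, CsvRowAdapter) is hypothetical and exists only for this illustration; none of them comes from Spring Batch or from the course material.

```java
public class AdapterSketch {

    /** The interface the client code expects. */
    interface TextSource {
        String nextLine();
    }

    /** An existing class with an incompatible interface; it is left unmodified. */
    static class LegacyCsvRow {
        private final String[] fields;

        LegacyCsvRow(String... fields) {
            this.fields = fields;
        }

        String[] getFields() {
            return fields;
        }
    }

    /** The adapter: wraps the legacy object and exposes it through TextSource. */
    static class CsvRowAdapter implements TextSource {
        private final LegacyCsvRow row;

        CsvRowAdapter(LegacyCsvRow row) {
            this.row = row;
        }

        @Override
        public String nextLine() {
            return String.join(",", row.getFields());
        }
    }

    public static void main(String[] args) {
        TextSource source = new CsvRowAdapter(new LegacyCsvRow("Alice", "30", "F"));
        System.out.println(source.nextLine()); // prints Alice,30,F
    }
}
```

The client only ever sees TextSource, so LegacyCsvRow can be reused without touching its code.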

 

A blog post that explains it in detail: https://inpa.tistory.com/entry/GOF-%F0%9F%92%A0-%EC%96%B4%EB%8C%91%ED%84%B0Adaptor-%ED%8C%A8%ED%84%B4-%EC%A0%9C%EB%8C%80%EB%A1%9C-%EB%B0%B0%EC%9B%8C%EB%B3%B4%EC%9E%90

 


 

 

QuerydslPagingItemReaderBuilder.java - a builder, added because the reader's constructor takes many arguments

package com.ksko.spring_batch.batch_sample.jobs.jpa.reader;

import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.util.ClassUtils;

import java.util.function.Function;

public class QuerydslPagingItemReaderBuilder<T> {

    private EntityManagerFactory entityManagerFactory;
    private Function<JPAQueryFactory, JPAQuery<T>> querySupplier;

    private int chunkSize = 10;

    private String name;

    private Boolean alwaysReadFromZero;

    public QuerydslPagingItemReaderBuilder<T> entityManagerFactory(EntityManagerFactory entityManagerFactory) {
        this.entityManagerFactory = entityManagerFactory;
        return this;
    }

    public QuerydslPagingItemReaderBuilder<T> querySupplier(Function<JPAQueryFactory, JPAQuery<T>> querySupplier) {
        this.querySupplier = querySupplier;
        return this;
    }

    public QuerydslPagingItemReaderBuilder<T> chunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
        return this;
    }

    public QuerydslPagingItemReaderBuilder<T> name(String name) {
        this.name = name;
        return this;
    }

    public QuerydslPagingItemReaderBuilder<T> alwaysReadFromZero(Boolean alwaysReadFromZero) {
        this.alwaysReadFromZero = alwaysReadFromZero;
        return this;
    }

    public QuerydslPagingItemReader<T> build() {
        if (name == null) {
            this.name = ClassUtils.getShortName(QuerydslPagingItemReader.class);
        }
        if (this.entityManagerFactory == null) {
            throw new IllegalArgumentException("EntityManagerFactory can not be null.!");
        }
        if (this.querySupplier == null) {
            throw new IllegalArgumentException("Function<JPAQueryFactory, JPAQuery<T>> can not be null.!");
        }
        if (this.alwaysReadFromZero == null) {
            alwaysReadFromZero = false;
        }
        return new QuerydslPagingItemReader<>(this.name, entityManagerFactory, querySupplier, chunkSize, alwaysReadFromZero);
    }
}

 

 

QueryDSLPagingReaderJobConfig.java - job configuration used to run the reader

package com.ksko.spring_batch.batch_sample.jobs.jpa;

import com.ksko.spring_batch.batch_sample.jobs.jpa.reader.QuerydslPagingItemReader;
import com.ksko.spring_batch.batch_sample.jobs.jpa.reader.QuerydslPagingItemReaderBuilder;
import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import com.ksko.spring_batch.batch_sample.jobs.models.QCustomer;
import com.ksko.spring_batch.batch_sample.jobs.mybatis.CustomerItemProcessor;
import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Slf4j
@Configuration
public class QueryDSLPagingReaderJobConfig {

    /**
     * Chunk size for the step.
     */
    public static final int CHUNK_SIZE = 2;
    public static final String ENCODING = "UTF-8";
    public static final String QUERYDSL_PAGING_CHUNK_JOB = "QUERYDSL_PAGING_CHUNK_JOB";

    @Autowired
    DataSource dataSource;

    @Autowired
    EntityManagerFactory entityManagerFactory;

//    @Bean
//    public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() throws Exception {
//
//        Function<JPAQueryFactory, JPAQuery<Customer>> query = jpaQueryFactory -> jpaQueryFactory.select(QCustomer.customer).from(QCustomer.customer);
//
//        return new QuerydslPagingItemReader<>("customerQuerydslPagingItemReader", entityManagerFactory, query, CHUNK_SIZE, false);
//    }

    @Bean
    public QuerydslPagingItemReader<Customer> customerQuerydslPagingItemReader() {
        return new QuerydslPagingItemReaderBuilder<Customer>()
                .name("customerQuerydslPagingItemReader")
                .entityManagerFactory(entityManagerFactory)
                .chunkSize(2)
                .querySupplier(jpaQueryFactory -> jpaQueryFactory.select(QCustomer.customer).from(QCustomer.customer).where(QCustomer.customer.age.gt(20)))
                .build();
    }

    @Bean
    public FlatFileItemWriter<Customer> customerQuerydslFlatFileItemWriter() {

        return new FlatFileItemWriterBuilder<Customer>()
                .name("customerQuerydslFlatFileItemWriter")
                .resource(new FileSystemResource("./output/customer_new_v2.csv"))
                .encoding(ENCODING)
                .delimited().delimiter("\t")
                .names("Name", "Age", "Gender")
                .build();
    }


    @Bean
    public Step customerQuerydslPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
        log.info("------------------ Init customerQuerydslPagingStep -----------------");

        return new StepBuilder("customerJpaPagingStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(customerQuerydslPagingItemReader())
                .processor(new CustomerItemProcessor())
                .writer(customerQuerydslFlatFileItemWriter())
                .build();
    }
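    // When several Step beans exist, Spring picks the one whose bean name matches the parameter name,
    // so the parameter is named after the step bean defined above
    @Bean
    public Job customerJpaPagingJob(Step customerQuerydslPagingStep, JobRepository jobRepository) {
        log.info("------------------ Init customerJpaPagingJob -----------------");
        return new JobBuilder(QUERYDSL_PAGING_CHUNK_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(customerQuerydslPagingStep)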
                .build();
    }
}

 

 

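Execution result

The output looped over customers 1 and 2 forever, which is not the intended behavior. The most likely cause is that the query offset stays at 0, so the reader keeps returning the first page. In doReadPage() the offset has to advance whenever alwaysReadFromZero is false, that is, the condition should read if (!alwaysReadFromZero).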

 

 


CustomItemWriter

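Used to implement specific behavior that the built-in ItemWriter classes do not provide.

 

Components

  • Implement write(), the method of the ItemWriter interface, and put the data-handling logic inside it
  • Any required libraries and objects can be declared and used in the implementation

Advantages

  • Can implement behavior the built-in ItemWriter classes do not provide (flexibility)
  • Data handling can be extended in many ways (extensibility)
  • Full control over how the data is processed (control)

Disadvantages

  • More complex to develop than using a built-in ItemWriter (development complexity)
  • Tests can be harder to write
  • Debugging can be harder when something goes wrong

 

The concept is worked through with a custom writer example.

(The source in the DEVOCEAN blog post alone was hard to follow.)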

https://github.com/schooldevops/spring-batch-tutorials/tree/10.01.FlowControl/08.05.CustomerItemWriterSample

 

CustomService.java

package com.ksko.spring_batch.batch_sample.jobs.mybatis;

import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Map;

@Slf4j
@Service
public class CustomService {

    public Map<String, String> processToOtherService(Customer item) {
        log.info("Call API to OtherService....");
        
        return Map.of("code", "200", "message", "OK");
    }
}

 

CustomItemWriter.java

package com.ksko.spring_batch.batch_sample.jobs.mybatis;

import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class CustomItemWriter implements ItemWriter<Customer> {

    private final CustomService customService;

    public CustomItemWriter(CustomService customService) {
        this.customService = customService;
    }


    @Override
    public void write(Chunk<? extends Customer> chunk) throws Exception {
        for (Customer customer : chunk) {
            log.info("Call Porcess in CustomItemWriter...");
            customService.processToOtherService(customer);
        }
    }
}
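
The writer above ignores the Map returned by processToOtherService. One possible refinement, sketched here in plain Java under the assumption that any code other than "200" means failure, is to throw an exception so the failure is not silently lost. writeAll and the Function parameter are hypothetical stand-ins for the writer's write() method and CustomService; they are not part of the sample project.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ResponseCheckingWriterSketch {

    /**
     * Hypothetical stand-in for write(): calls the service once per item
     * and fails fast when the response code is not "200".
     * Returns the number of items that were sent successfully.
     */
    static int writeAll(List<String> items, Function<String, Map<String, String>> service) {
        int sent = 0;
        for (String item : items) {
            Map<String, String> response = service.apply(item);
            if (!"200".equals(response.get("code"))) {
                throw new IllegalStateException(
                        "Other service rejected item " + item + ": " + response);
            }
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        // Stub that always answers like CustomService in the sample
        Function<String, Map<String, String>> ok =
                item -> Map.of("code", "200", "message", "OK");
        System.out.println(writeAll(List.of("a", "b", "c"), ok)); // prints 3
    }
}
```

In a real Spring Batch step, an exception thrown from write() normally fails the chunk and rolls back its transaction unless skip or retry handling is configured, so whether to throw or merely log is a design decision that depends on how the other service should be treated.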

 

To run the writer above, the previously implemented MyBatis writer job configuration is reused, with this writer plugged in.

 

MybatisItemWriterJobConfig.java (execution log)

2024-12-16T16:42:15.435+09:00  INFO 39012 --- [           main] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2024-12-16T16:42:15.445+09:00  WARN 39012 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'entityManagerFactory' of type [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying). Is this bean getting eagerly injected into a currently created BeanPostProcessor [jobRegistryBeanPostProcessor]? Check the corresponding BeanPostProcessor declaration and its dependencies.
2024-12-16T16:42:15.446+09:00  WARN 39012 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'entityManagerFactory' of type [jdk.proxy2.$Proxy91] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying). Is this bean getting eagerly injected into a currently created BeanPostProcessor [jobRegistryBeanPostProcessor]? Check the corresponding BeanPostProcessor declaration and its dependencies.
2024-12-16T16:42:15.451+09:00  WARN 39012 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'transactionManager' of type [org.springframework.orm.jpa.JpaTransactionManager] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying). Is this bean getting eagerly injected into a currently created BeanPostProcessor [jobRegistryBeanPostProcessor]? Check the corresponding BeanPostProcessor declaration and its dependencies.
2024-12-16T16:42:15.469+09:00  WARN 39012 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration$SpringBootBatchConfiguration' of type [org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration$SpringBootBatchConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying). The currently created BeanPostProcessor [jobRegistryBeanPostProcessor] is declared through a non-static factory method on that class; consider declaring it as static instead.
2024-12-16T16:42:15.742+09:00  INFO 39012 --- [           main] c.k.s.b.j.m.MybatisItemWriterJobConfig   : ------------------ Init flatFileStep -----------------
2024-12-16T16:42:15.789+09:00  INFO 39012 --- [           main] c.k.s.b.j.m.MybatisItemWriterJobConfig   : ------------------ Init flatFileJob -----------------
2024-12-16T16:42:15.977+09:00  INFO 39012 --- [           main] c.k.s.b.BatchSampleApplication           : Started BatchSampleApplication in 3.837 seconds (process running for 4.528)
2024-12-16T16:42:15.980+09:00  INFO 39012 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2024-12-16T16:42:16.264+09:00  INFO 39012 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=MY_BATIS_ITEM_WRITER]] launched with the following parameters: [{'run.id':'{value=1, type=class java.lang.Long, identifying=true}'}]
2024-12-16T16:42:16.456+09:00  INFO 39012 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [flatFileStep]
2024-12-16T16:42:16.544+09:00  INFO 39012 --- [           main] c.k.s.b.jobs.mybatis.CustomItemWriter    : Call Porcess in CustomItemWriter...
2024-12-16T16:42:16.544+09:00  INFO 39012 --- [           main] c.k.s.b.jobs.mybatis.CustomService       : Call API to OtherService....
2024-12-16T16:42:16.544+09:00  INFO 39012 --- [           main] c.k.s.b.jobs.mybatis.CustomItemWriter    : Call Porcess in CustomItemWriter...
2024-12-16T16:42:16.544+09:00  INFO 39012 --- [           main] c.k.s.b.jobs.mybatis.CustomService       : Call API to OtherService....
2024-12-16T16:42:16.544+09:00  INFO 39012 --- [           main] c.k.s.b.jobs.mybatis.CustomItemWriter    : Call Porcess in CustomItemWriter...
2024-12-16T16:42:16.544+09:00  INFO 39012 --- [           main] c.k.s.b.jobs.mybatis.CustomService       : Call API to OtherService....
2024-12-16T16:42:16.599+09:00  INFO 39012 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [flatFileStep] executed in 142ms
2024-12-16T16:42:16.705+09:00  INFO 39012 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=MY_BATIS_ITEM_WRITER]] completed with the following parameters: [{'run.id':'{value=1, type=class java.lang.Long, identifying=true}'}] and the following status: [COMPLETED] in 390ms
2024-12-16T16:42:16.710+09:00  INFO 39012 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2024-12-16T16:42:16.711+09:00  INFO 39012 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2024-12-16T16:42:18.523+09:00  INFO 39012 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

 

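It appears to have run: the log shows CustomItemWriter and CustomService being called once for each of three items, and the job finished with status COMPLETED.

Worth revisiting later.

That wraps up Week 9, if a bit late.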
