본문 바로가기

개발/02-1.Spring Batch

[SpringBatch, DEVOCEAN] Week5-JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기

배경 : 스프링배치 5주차 스터디 2~4주차 미흡한 과제 이행으로 인한 자발적 패널티 부여로 발표자로 지원하여 정리를 진행함.

 

참고 : 5주차 교재

(아래 글은 한국 스프링 사용자 모임(KSUG)에서 진행된 스프링 배치 스터디 내용을 정리한 게시글입니다.
DEVOCEAN에 연재 중인 KIDO님의 글을 참고하여 실습한 내용을 기록했습니다.)

 

[SpringBatch 연재 05] JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기

 

devocean.sk.com


내용

복습

지난 4주차때는 FlatFileItemReader 로 CSV 파일을 읽고, FlatFileItemWriter로 파일에 셋팅하는 것을

1~3주차때는 Chunk Model과 Tasklet Model 및 스프링배치 기본 아키텍처와 흐름, JobLauncher, Step 등의 개념들을 살펴 보았습니다.

 

제가 다시 리뷰 할겸 중요 하다고 생각하는 것들을 이 문서에 적어두고, 여러분들께 설명드리고 5주차 내용을 진행할 까 합니다

스프링 배치 흐름

스프링 배치 흐름

 

Chunk Model vs Tasklet Model

Chunk 모델 실행 flow

 

Chunk model

큰 데이터 분할 해서 처리시 위 프로세스처럼 ItemReader, ItemProcessor, ItemWriter 의 순으로 처리가 필요한 경우 

 

Tasklet model

위 케이스 처럼 데이터 처리가 맞지 않을 경우, 단순 경량데이터 처리시 T

 

FlatFileItemReader -> FlatFileItemWriter 

FlatFileItemReader 와 FlatFileITemWriter 사용 샘플

 

FlatFileItemReader 와 FlatFileItemWriter 에서 builder 패턴으로 사용하는 명령어들이 어떤 기능을 하는지..

알아 보았음.

 

이번주 내용은?

5주차 주요 내용

 

[SpringBatch 연재 05] JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기

 

빠른 습득을 위해 샘플코드를 따라하면서, 스프링배치의 기능을 익혀보자.

근데 샘플코드 프로젝트 구조 따라하려고 하는데 어떻게 만들지?

 

https://github.com/schooldevops/spring-batch-tutorials/tree/main

 

GitHub - schooldevops/spring-batch-tutorials

Contribute to schooldevops/spring-batch-tutorials development by creating an account on GitHub.

github.com

(기존에 기도님이 참고하라고 공유해주신 github 소스 링크가 있어서 들어가보았고, 실습 예제 폴더 관련해서 예시가 있었다. 앞으로 활용하면 좋을 것 같음)

 

JdbcPagingItemReader 

JdbcPagingItemReader는 Spring Batch에서 제공하는 ItemReader 로, 데이터베이스에서 데이터를 페이징 처리하며 읽어오는 역할을 합니다. 대량의 데이터를 메모리에 한꺼번에 로드하는 것이 비효율적이거나 메모리 부족 문제가 발생할 수 있는 상황에서 유용하게 사용됩니다.

특징

  • 페이징(Paging): 데이터를 한꺼번에 로드하지 않고, 설정된 페이지 크기만큼 데이터를 나누어 읽습니다. 이를 통해 데이터베이스로부터 필요한 만큼의 데이터를 효율적으로 가져올 수 있습니다.
  • JDBC 기반: JDBC를 사용하여 데이터베이스에 쿼리를 실행하고 데이터를 가져옵니다. 따라서, 데이터베이스에서 데이터를 읽어와야 하는 배치 작업에 적합합니다.
  • 상태 저장: JdbcPagingItemReader는 Spring Batch가 제공하는 리더로서, 배치 작업 중 재시작이 발생할 경우에도 이전에 읽은 데이터를 기억하고 다시 읽지 않도록 상태를 관리할 수 있습니다.

주요 구성요소

 

  • DataSource: 데이터베이스 연결정보 설정
  • SqlQuery: 데이터를 읽을 SQL 쿼리를 설정
  • RowMapper: 읽은 데이터를 도메인 객체로 변환 (SQL 쿼리 결과를 Item으로 변환)
  • PageSize: 한 번에 읽어올 페이지 크기 설정
  •  
  • QueryProvider: 페이징 SQL 쿼리 제공
  • SortKeys: 페이징 시 정렬 기준
  • FetchSize: JDBC 드라이버에서 한 번에 읽어올 데이터 수
  • SaveState: 배치 중 상태 저장 여부

 

 

 

Customer 클래스 생성 (models 패키지)

 

JdbcPagingReaderJobConfig  생성 (쿼리 Provider 생성 --> JdbcPagingItemReader)

(소스 보면서 해당 기능들 설명)

package com.ksko.spring_batch.batch_sample.jobs.jdbc;

import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class JdbcPagingReaderJobConfig {

    /**
     * CHUNK 크기를 지정한다.
     */
    public static final int CHUNK_SIZE = 2;
    public static final String ENCODING = "UTF-8";
    public static final String JDBC_PAGING_CHUNK_JOB = "JDBC_PAGING_CHUNK_JOB";

    @Autowired
    DataSource dataSource;

    @Bean
    public PagingQueryProvider queryProvider() throws Exception { // PagingQueryProvider : 데이터베이스에서 페이징 처리된 쿼리를 생성하는 역할
        //SqlPagingQueryProviderFactoryBean : 일반적인 팩토리 클래스로, 데이터베이스 타입에 따라 적절한 PagingQueryProvider 구현체를 생성해
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource); //데이터 소스 설정
        queryProvider.setSelectClause("id, name, age, gender"); //select 할 컬럼명 지정
        queryProvider.setFromClause("from customer"); //테이블 조회
        queryProvider.setWhereClause("where age >= :age"); //조건절

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.DESCENDING);

        queryProvider.setSortKeys(sortKeys);

        return queryProvider.getObject();
    }

    @Bean
    public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {
        Map<String, Object> parameterValue = new HashMap<>(); //파라미터로 사용할 값을 저장할 맵 생성
        parameterValue.put("age", 20); //age 컬럼에 20 셋팅

        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("jdbcPagingItemReader") //아이템리더 이름 설정
                .fetchSize(CHUNK_SIZE) //한번에 읽어올 데이터 사이즈 설정
                .dataSource(dataSource) // db와 연결 정보 셋팅
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class)) //DB에서 읽어온 ResultSet을 Customer 객케로 변환할때 사용할 맵퍼 설정
                .queryProvider(queryProvider()) //db별 적합한 페이징 쿼리 생성
                .parameterValues(parameterValue) //쿼리에서 사용할 파라미터 전달 (조건 age=20 적용)
                .build();
    }


    @Bean
    public FlatFileItemWriter<Customer> customerFlatFileItemWriter() {
        return new FlatFileItemWriterBuilder<Customer>()
                .name("customerFlatFileItemWriter")
                .resource(new FileSystemResource("./output/customer_new_v1.csv"))
                .encoding(ENCODING)
                .delimited().delimiter("\t")
                .names("Name", "Age", "Gender")
                .build();
    }


    @Bean
    public Step customerJdbcPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
        log.info("------------------ Init customerJdbcPagingStep -----------------");

        return new StepBuilder("customerJdbcPagingStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(jdbcPagingItemReader())
                .writer(customerFlatFileItemWriter())
                .build();
    }

    @Bean
    public Job customerJdbcPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
        log.info("------------------ Init customerJdbcPagingJob -----------------");
        return new JobBuilder(JDBC_PAGING_CHUNK_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(customerJdbcPagingStep)
                .build();
    }



}

 

 

 

결과

JDK 버전 차이 로 오류

 

 

JdbcBatchItemWriter

Spring Batch에서 제공하는 JDBC 기반의 ItemWriter 구현체로, 데이터를 일괄적으로 데이터베이스에 삽입하거나 업데이트하는 데 사용됩니다. 이 클래스는 ItemWriter 인터페이스를 구현하며, 대량의 데이터를 모아서 한 번에 처리함으로써 데이터베이스 연산의 성능을 최적화합니다.

 

주요 특징

  • JDBC 사용: JDBC를 사용해 데이터베이스에 직접적으로 접근하여 데이터를 처리합니다.
  • 일괄 처리 (Batch Processing): 한 번에 한 개의 레코드가 아닌, 일정량의 레코드를 모아 한 번에 삽입, 업데이트, 또는 삭제 작업을 수행하여 성능을 향상시킵니다.
  • SQL 쿼리 설정 가능: 직접 SQL 쿼리를 정의할 수 있어, INSERT, UPDATE, DELETE 등의 다양한 작업을 지원합니다.
  • PreparedStatement: PreparedStatement를 사용하여 데이터의 안전한 삽입을 도와주고, SQL 주입 공격을 방지할 수 있습니다.

JdbcBatchItemWriter 구성 요소

  • DataSource: JDBC 연결을 위해 DataSource를 설정해야 합니다. 이 데이터 소스를 통해 데이터베이스와 연결.
  • SqlStatementCreator: INSERT 쿼리를 생성하는 역할을 한다.
  • PreparedStatementSetter: INSERT 쿼리의 파라미터 값을 설정하는 역할을 한다.
  • ItemSqlParameterSourceProvider: Item 객체를 기반으로 PreparedStatementSetter에 전달할 파라미터 값을 생성하는 역할을 한다.

 

실습 예제

JdbcBatchItemJobConfig.java

package com.ksko.spring_batch.batch_sample.jobs.flatfilereader;

import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Slf4j
@Configuration
public class JdbcBatchItemJobConfig {

    /**
     * CHUNK 크기를 지정한다.
     */
    public static final int CHUNK_SIZE = 100;
    public static final String ENCODING = "UTF-8";
    public static final String JDBC_BATCH_WRITER_CHUNK_JOB = "JDBC_BATCH_WRITER_CHUNK_JOB";

    @Autowired
    DataSource dataSource;

    @Bean
    public FlatFileItemReader<Customer> flatFileItemReader() {
        return new FlatFileItemReaderBuilder<Customer>()
                .name("FlatFileItemReader")
                .resource(new ClassPathResource("./customer.csv")) //github 통한 파일 다운로드
                .encoding(ENCODING)
                .delimited().delimiter(",")
                .names("name", "age", "gender")
                .targetType(Customer.class)
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Customer> flatFileItemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .dataSource(dataSource) // // 1. 사용할 데이터소스를 설정 (DB 연결 정보)
                .sql("INSERT INTO customer (name, age, gender) VALUES (:name, :age, :gender)") // 2. 실행할 SQL 쿼리를 설정 (Customer 객체의 필드를 삽입하는 INSERT 쿼리)
                .itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider()) // 3. SQL 파라미터 소스를 제공하는 클래스를 설정 (Customer 객체의 속성을 SQL 파라미터로 변환)
                .build();
    }

    @Bean
    public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        log.info("------------------ Init flatFileStep -----------------");

        return new StepBuilder("flatFileStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(flatFileItemReader())
                .writer(flatFileItemWriter())
                .build();
    }

    @Bean
    public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
        log.info("------------------ Init flatFileJob -----------------");
        return new JobBuilder(JDBC_BATCH_WRITER_CHUNK_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(flatFileStep)
                .build();
    }

}

 

CustomerItemSqlParameterSourceProvider.java

package com.ksko.spring_batch.batch_sample.jobs.flatfilereader;

import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import org.springframework.batch.item.database.ItemSqlParameterSourceProvider;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;

// Customer 객체를 SQL 파라미터로 변환하는 ItemSqlParameterSourceProvider 구현 클래스
public class CustomerItemSqlParameterSourceProvider implements ItemSqlParameterSourceProvider<Customer> {
    // SQL 파라미터를 생성하는 메서드의 구현
    @Override
    public SqlParameterSource createSqlParameterSource(Customer item) {
        // 주어진 Customer 객체의 속성을 SQL 파라미터로 변환하기 위해 BeanPropertySqlParameterSource 사용
        return new BeanPropertySqlParameterSource(item);
    }
}

 

 

결과

JDK 버전 차이 로 오류

 

 

 

728x90