배경 : 스프링배치 5주차 스터디 2~4주차 미흡한 과제 이행으로 인한 자발적 패널티 부여로 발표자로 지원하여 정리를 진행함.
참고 : 5주차 교재
(아래 글은 한국 스프링 사용자 모임(KSUG)에서 진행된 스프링 배치 스터디 내용을 정리한 게시글입니다.
DEVOCEAN에 연재 중인 KIDO님의 글을 참고하여 실습한 내용을 기록했습니다.)
[SpringBatch 연재 05] JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기
devocean.sk.com
내용
복습
지난 4주차때는 FlatFileItemReader 로 CSV 파일을 읽고, FlatFileItemWriter로 파일에 셋팅하는 것을
1~3주차때는 Chunk Model과 Tasklet Model 및 스프링배치 기본 아키텍처와 흐름, JobLauncher, Step 등의 개념들을 살펴 보았습니다.
제가 다시 리뷰 할겸 중요 하다고 생각하는 것들을 이 문서에 적어두고, 여러분들께 설명드리고 5주차 내용을 진행할 까 합니다
스프링 배치 흐름
Chunk Model vs Tasklet Model
Chunk model
큰 데이터 분할 해서 처리시 위 프로세스처럼 ItemReader, ItemProcessor, ItemWriter 의 순으로 처리가 필요한 경우
Tasklet model
위 케이스 처럼 데이터 처리가 맞지 않을 경우, 단순 경량데이터 처리시 T
FlatFileItemReader -> FlatFileItemWriter
FlatFileItemReader 와 FlatFileItemWriter 에서 builder 패턴으로 사용하는 명령어들이 어떤 기능을 하는지..
알아 보았음.
이번주 내용은?
[SpringBatch 연재 05] JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기
빠른 습득을 위해 샘플코드를 따라하면서, 스프링배치의 기능을 익혀보자.
근데 샘플코드 프로젝트 구조 따라하려고 하는데 어떻게 만들지?
https://github.com/schooldevops/spring-batch-tutorials/tree/main
GitHub - schooldevops/spring-batch-tutorials
Contribute to schooldevops/spring-batch-tutorials development by creating an account on GitHub.
github.com
(기존에 기도님이 참고하라고 공유해주신 github 소스 링크가 있어서 들어가보았고, 실습 예제 폴더 관련해서 예시가 있었다. 앞으로 활용하면 좋을 것 같음)
JdbcPagingItemReader
JdbcPagingItemReader는 Spring Batch에서 제공하는 ItemReader 로, 데이터베이스에서 데이터를 페이징 처리하며 읽어오는 역할을 합니다. 대량의 데이터를 메모리에 한꺼번에 로드하는 것이 비효율적이거나 메모리 부족 문제가 발생할 수 있는 상황에서 유용하게 사용됩니다.
특징
- 페이징(Paging): 데이터를 한꺼번에 로드하지 않고, 설정된 페이지 크기만큼 데이터를 나누어 읽습니다. 이를 통해 데이터베이스로부터 필요한 만큼의 데이터를 효율적으로 가져올 수 있습니다.
- JDBC 기반: JDBC를 사용하여 데이터베이스에 쿼리를 실행하고 데이터를 가져옵니다. 따라서, 데이터베이스에서 데이터를 읽어와야 하는 배치 작업에 적합합니다.
- 상태 저장: JdbcPagingItemReader는 Spring Batch가 제공하는 리더로서, 배치 작업 중 재시작이 발생할 경우에도 이전에 읽은 데이터를 기억하고 다시 읽지 않도록 상태를 관리할 수 있습니다.
주요 구성요소
- DataSource: 데이터베이스 연결정보 설정
- SqlQuery: 데이터를 읽을 SQL 쿼리를 설정
- RowMapper: 읽은 데이터를 도메인 객체로 변환 (SQL 쿼리 결과를 Item으로 변환)
- PageSize: 한 번에 읽어올 페이지 크기 설정
- QueryProvider: 페이징 SQL 쿼리 제공
- SortKeys: 페이징 시 정렬 기준
- FetchSize: JDBC 드라이버에서 한 번에 읽어올 데이터 수
- SaveState: 배치 중 상태 저장 여부
Customer 클래스 생성 (models 패키지)
JdbcPagingReaderJobConfig 생성 (쿼리 Provider 생성 --> JdbcPagingItemReader)
(소스 보면서 해당 기능들 설명)
package com.ksko.spring_batch.batch_sample.jobs.jdbc;
import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class JdbcPagingReaderJobConfig {
/**
* CHUNK 크기를 지정한다.
*/
public static final int CHUNK_SIZE = 2;
public static final String ENCODING = "UTF-8";
public static final String JDBC_PAGING_CHUNK_JOB = "JDBC_PAGING_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Bean
public PagingQueryProvider queryProvider() throws Exception { // PagingQueryProvider : 데이터베이스에서 페이징 처리된 쿼리를 생성하는 역할
//SqlPagingQueryProviderFactoryBean : 일반적인 팩토리 클래스로, 데이터베이스 타입에 따라 적절한 PagingQueryProvider 구현체를 생성해
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); //데이터 소스 설정
queryProvider.setSelectClause("id, name, age, gender"); //select 할 컬럼명 지정
queryProvider.setFromClause("from customer"); //테이블 조회
queryProvider.setWhereClause("where age >= :age"); //조건절
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.DESCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValue = new HashMap<>(); //파라미터로 사용할 값을 저장할 맵 생성
parameterValue.put("age", 20); //age 컬럼에 20 셋팅
return new JdbcPagingItemReaderBuilder<Customer>()
.name("jdbcPagingItemReader") //아이템리더 이름 설정
.fetchSize(CHUNK_SIZE) //한번에 읽어올 데이터 사이즈 설정
.dataSource(dataSource) // db와 연결 정보 셋팅
.rowMapper(new BeanPropertyRowMapper<>(Customer.class)) //DB에서 읽어온 ResultSet을 Customer 객케로 변환할때 사용할 맵퍼 설정
.queryProvider(queryProvider()) //db별 적합한 페이징 쿼리 생성
.parameterValues(parameterValue) //쿼리에서 사용할 파라미터 전달 (조건 age=20 적용)
.build();
}
@Bean
public FlatFileItemWriter<Customer> customerFlatFileItemWriter() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerFlatFileItemWriter")
.resource(new FileSystemResource("./output/customer_new_v1.csv"))
.encoding(ENCODING)
.delimited().delimiter("\t")
.names("Name", "Age", "Gender")
.build();
}
@Bean
public Step customerJdbcPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
log.info("------------------ Init customerJdbcPagingStep -----------------");
return new StepBuilder("customerJdbcPagingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(jdbcPagingItemReader())
.writer(customerFlatFileItemWriter())
.build();
}
@Bean
public Job customerJdbcPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
log.info("------------------ Init customerJdbcPagingJob -----------------");
return new JobBuilder(JDBC_PAGING_CHUNK_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerJdbcPagingStep)
.build();
}
}
결과
JdbcBatchItemWriter
Spring Batch에서 제공하는 JDBC 기반의 ItemWriter 구현체로, 데이터를 일괄적으로 데이터베이스에 삽입하거나 업데이트하는 데 사용됩니다. 이 클래스는 ItemWriter 인터페이스를 구현하며, 대량의 데이터를 모아서 한 번에 처리함으로써 데이터베이스 연산의 성능을 최적화합니다.
주요 특징
- JDBC 사용: JDBC를 사용해 데이터베이스에 직접적으로 접근하여 데이터를 처리합니다.
- 일괄 처리 (Batch Processing): 한 번에 한 개의 레코드가 아닌, 일정량의 레코드를 모아 한 번에 삽입, 업데이트, 또는 삭제 작업을 수행하여 성능을 향상시킵니다.
- SQL 쿼리 설정 가능: 직접 SQL 쿼리를 정의할 수 있어, INSERT, UPDATE, DELETE 등의 다양한 작업을 지원합니다.
- PreparedStatement: PreparedStatement를 사용하여 데이터의 안전한 삽입을 도와주고, SQL 주입 공격을 방지할 수 있습니다.
JdbcBatchItemWriter 구성 요소
- DataSource: JDBC 연결을 위해 DataSource를 설정해야 합니다. 이 데이터 소스를 통해 데이터베이스와 연결.
- SqlStatementCreator: INSERT 쿼리를 생성하는 역할을 한다.
- PreparedStatementSetter: INSERT 쿼리의 파라미터 값을 설정하는 역할을 한다.
- ItemSqlParameterSourceProvider: Item 객체를 기반으로 PreparedStatementSetter에 전달할 파라미터 값을 생성하는 역할을 한다.
실습 예제
JdbcBatchItemJobConfig.java
package com.ksko.spring_batch.batch_sample.jobs.flatfilereader;
import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Slf4j
@Configuration
public class JdbcBatchItemJobConfig {
/**
* CHUNK 크기를 지정한다.
*/
public static final int CHUNK_SIZE = 100;
public static final String ENCODING = "UTF-8";
public static final String JDBC_BATCH_WRITER_CHUNK_JOB = "JDBC_BATCH_WRITER_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Bean
public FlatFileItemReader<Customer> flatFileItemReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("FlatFileItemReader")
.resource(new ClassPathResource("./customer.csv")) //github 통한 파일 다운로드
.encoding(ENCODING)
.delimited().delimiter(",")
.names("name", "age", "gender")
.targetType(Customer.class)
.build();
}
@Bean
public JdbcBatchItemWriter<Customer> flatFileItemWriter() {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource) // // 1. 사용할 데이터소스를 설정 (DB 연결 정보)
.sql("INSERT INTO customer (name, age, gender) VALUES (:name, :age, :gender)") // 2. 실행할 SQL 쿼리를 설정 (Customer 객체의 필드를 삽입하는 INSERT 쿼리)
.itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider()) // 3. SQL 파라미터 소스를 제공하는 클래스를 설정 (Customer 객체의 속성을 SQL 파라미터로 변환)
.build();
}
@Bean
public Step flatFileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
log.info("------------------ Init flatFileStep -----------------");
return new StepBuilder("flatFileStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(flatFileItemReader())
.writer(flatFileItemWriter())
.build();
}
@Bean
public Job flatFileJob(Step flatFileStep, JobRepository jobRepository) {
log.info("------------------ Init flatFileJob -----------------");
return new JobBuilder(JDBC_BATCH_WRITER_CHUNK_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(flatFileStep)
.build();
}
}
CustomerItemSqlParameterSourceProvider.java
package com.ksko.spring_batch.batch_sample.jobs.flatfilereader;
import com.ksko.spring_batch.batch_sample.jobs.models.Customer;
import org.springframework.batch.item.database.ItemSqlParameterSourceProvider;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
// Customer 객체를 SQL 파라미터로 변환하는 ItemSqlParameterSourceProvider 구현 클래스
public class CustomerItemSqlParameterSourceProvider implements ItemSqlParameterSourceProvider<Customer> {
// SQL 파라미터를 생성하는 메서드의 구현
@Override
public SqlParameterSource createSqlParameterSource(Customer item) {
// 주어진 Customer 객체의 속성을 SQL 파라미터로 변환하기 위해 BeanPropertySqlParameterSource 사용
return new BeanPropertySqlParameterSource(item);
}
}
결과