Эта запись блога открывает технический цикл статей по программированию. Не так давно столкнулся с задачей переноса данных из базы данных Oracle в Elastisearch, быть может кому-нибудь будет полезен данный опыт.
Перенос выполнялся с использованием Spring Batch и для 4,5 миллионов записей он занял порядка 16 минут.

Сам процесс не сложен, поэтому подробно описывать его не буду, а лишь приведу фрагменты кода:
Конфигурируем Step:
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
@EnableBatchProcessing
public class StepConfiguration {
private StepBuilderFactory stepBuilderFactory;
private DataSource dataSource;
private EsDocumentRepository esRepository;
private final int FETCH_SIZE = 100000;
@Autowired
public StepConfiguration(StepBuilderFactory stepBuilderFactory, DataSource dataSource, EsDocumentRepository esRepository) {
this.stepBuilderFactory = stepBuilderFactory;
this.dataSource = dataSource;
this.esRepository = esRepository;
}
@Bean
public ItemReader<EdiDocument> reader() {
JdbcCursorItemReader<EdiDocument> databaseReader = new JdbcCursorItemReader<>();
databaseReader.setDataSource(dataSource);
databaseReader.setSql(LoadResource.loadResourceToString("sql/initLoadFromDocs.sql"));
databaseReader.setRowMapper(new EdiDocumentRowMapper());
databaseReader.setFetchSize(FETCH_SIZE);
return databaseReader;
}
@Bean
public DocProcessor processor() {
return new DocProcessor();
}
@Bean
public ElasticsearchItemWriter writer() {
return new ElasticsearchItemWriter(esRepository);
}
@Bean
public Step step() {
return stepBuilderFactory.get("step")
.<EdiDocument, EdiDocument>chunk(100000)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
}
Классы используемые в выше указанном фрагменте, листинг которых, я думаю нет смысла приводить:
— класс EdiDocument — класс описывающий структуру документа Elasticsearch (@Document);
— класс EdiDocumentRowMapper — класс имплементирующий RowMapper;
— интерфейс EsDocumentRepository наследуемый от ElasticsearchCrudRepository.
Утилитный класс LoadResource, используется для чтения SQL-скрипта из внешнего файла:
import java.io.IOException;
import java.io.InputStream;
public class LoadResource {
public static String loadResourceToString(final String path) {
final InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(path);
try {
return IOUtils.toString(stream, "UTF-8");
} catch (final IOException e) {
throw new IllegalStateException(e);
} finally {
IOUtils.closeQuietly(stream);
}
}
}

В нашем случае step Spring Batch состоит из трёх основных элементов: reader, processor, writer.
Reader — используется JdbcCursorItemReader.
Processor, в нашем случае каких-либо преобразований не производит, элемент из базы данных Oracle напрямую передаётся в Elasticsearch:
import org.springframework.batch.item.ItemProcessor;
@Component
public class DocProcessor implements ItemProcessor<EdiDocument, EdiDocument> {
@Override
public EdiDocument process(final EdiDocument item) throws Exception {
return item;
}
}
Writer — класс ElasticsearchItemWriter имплементирующий ItemWriter:
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class ElasticsearchItemWriter implements ItemWriter<EdiDocument>, InitializingBean {
private EsDocumentRepository repository;
public ElasticsearchItemWriter(EsDocumentRepository repository) {
this.repository = repository;
}
@Override
public void write(final List<? extends EdiDocument> items) throws Exception {
repository.save(items);
}
@Override
public void afterPropertiesSet() throws Exception {
}
}
И наконец, класс инициализирующий и запускающий Job:
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class JobsLauncher {
private JobLauncher jobLauncher;
private JobBuilderFactory jobBuilderFactory;
private Step createStep;
@Autowired
public JobsLauncher(final JobLauncher jobLauncher, final JobBuilderFactory jobBuilderFactory, final Step createStep) {
this.jobLauncher = jobLauncher;
this.jobBuilderFactory = jobBuilderFactory;
this.createStep = createStep;
}
private Job databaseToElasticsearchJob() {
return jobBuilderFactory.get("databaseToElasticsearch")
.incrementer(new RunIdIncrementer())
.flow(createStep)
.end()
.build();
}
public void launch() {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(databaseToElasticsearchJob(), jobParameters);
} catch (Exception e) {
e.printStackTrace();
}
}
}

Если для кого-то информации об используемых технологиях окажется недостаточно, вы можете обратиться к основным источникам: