Эта запись блога открывает технический цикл статей по программированию. Не так давно столкнулся с задачей переноса данных из базы данных Oracle в Elastisearch, быть может кому-нибудь будет полезен данный опыт.
Перенос выполнялся с использованием Spring Batch и для 4,5 миллионов записей он занял порядка 16 минут.
Сам процесс не сложен, поэтому подробно описывать его не буду, а лишь приведу фрагменты кода:
Конфигурируем Step:
import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.database.JdbcCursorItemReader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource; @Configuration @EnableBatchProcessing public class StepConfiguration { private StepBuilderFactory stepBuilderFactory; private DataSource dataSource; private EsDocumentRepository esRepository; private final int FETCH_SIZE = 100000; @Autowired public StepConfiguration(StepBuilderFactory stepBuilderFactory, DataSource dataSource, EsDocumentRepository esRepository) { this.stepBuilderFactory = stepBuilderFactory; this.dataSource = dataSource; this.esRepository = esRepository; } @Bean public ItemReader<EdiDocument> reader() { JdbcCursorItemReader<EdiDocument> databaseReader = new JdbcCursorItemReader<>(); databaseReader.setDataSource(dataSource); databaseReader.setSql(LoadResource.loadResourceToString("sql/initLoadFromDocs.sql")); databaseReader.setRowMapper(new EdiDocumentRowMapper()); databaseReader.setFetchSize(FETCH_SIZE); return databaseReader; } @Bean public DocProcessor processor() { return new DocProcessor(); } @Bean public ElasticsearchItemWriter writer() { return new ElasticsearchItemWriter(esRepository); } @Bean public Step step() { return stepBuilderFactory.get("step") .<EdiDocument, EdiDocument>chunk(100000) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } }
Классы используемые в выше указанном фрагменте, листинг которых, я думаю нет смысла приводить:
— класс EdiDocument — класс описывающий структуру документа Elasticsearch (@Document);
— класс EdiDocumentRowMapper — класс имплементирующий RowMapper;
— интерфейс EsDocumentRepository наследуемый от ElasticsearchCrudRepository.
Утилитный класс LoadResource, используется для чтения SQL-скрипта из внешнего файла:
import java.io.IOException; import java.io.InputStream; public class LoadResource { public static String loadResourceToString(final String path) { final InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(path); try { return IOUtils.toString(stream, "UTF-8"); } catch (final IOException e) { throw new IllegalStateException(e); } finally { IOUtils.closeQuietly(stream); } } }
В нашем случае step Spring Batch состоит из трёх основных элементов: reader, processor, writer.
Reader — используется JdbcCursorItemReader.
Processor, в нашем случае каких-либо преобразований не производит, элемент из базы данных Oracle напрямую передаётся в Elasticsearch:
import org.springframework.batch.item.ItemProcessor; @Component public class DocProcessor implements ItemProcessor<EdiDocument, EdiDocument> { @Override public EdiDocument process(final EdiDocument item) throws Exception { return item; } }
Writer — класс ElasticsearchItemWriter имплементирующий ItemWriter:
import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import java.util.List; @Component public class ElasticsearchItemWriter implements ItemWriter<EdiDocument>, InitializingBean { private EsDocumentRepository repository; public ElasticsearchItemWriter(EsDocumentRepository repository) { this.repository = repository; } @Override public void write(final List<? extends EdiDocument> items) throws Exception { repository.save(items); } @Override public void afterPropertiesSet() throws Exception { } }
И наконец, класс инициализирующий и запускающий Job:
import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class JobsLauncher { private JobLauncher jobLauncher; private JobBuilderFactory jobBuilderFactory; private Step createStep; @Autowired public JobsLauncher(final JobLauncher jobLauncher, final JobBuilderFactory jobBuilderFactory, final Step createStep) { this.jobLauncher = jobLauncher; this.jobBuilderFactory = jobBuilderFactory; this.createStep = createStep; } private Job databaseToElasticsearchJob() { return jobBuilderFactory.get("databaseToElasticsearch") .incrementer(new RunIdIncrementer()) .flow(createStep) .end() .build(); } public void launch() { try { JobParameters jobParameters = new JobParametersBuilder() .addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(databaseToElasticsearchJob(), jobParameters); } catch (Exception e) { e.printStackTrace(); } } }
Если для кого-то информации об используемых технологиях окажется недостаточно, вы можете обратиться к основным источникам: