From Database to Elasticsearch with Spring Batch | Усачёвы - Блог

From Database to Elasticsearch with Spring Batch

Must read

Эта запись блога открывает технический цикл статей по программированию. Не так давно столкнулся с задачей переноса данных из базы данных Oracle в Elastisearch, быть может кому-нибудь будет полезен данный опыт.

Перенос выполнялся с использованием Spring Batch и для 4,5 миллионов записей он занял порядка 16 минут.

Must read
Must read

Сам процесс не сложен, поэтому подробно описывать его не буду, а лишь приведу фрагменты кода:

Конфигурируем Step:

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
public class StepConfiguration {

    private StepBuilderFactory stepBuilderFactory;
    private DataSource dataSource;
    private EsDocumentRepository esRepository;
    private final int FETCH_SIZE = 100000;

    @Autowired
    public StepConfiguration(StepBuilderFactory stepBuilderFactory, DataSource dataSource, EsDocumentRepository esRepository) {
        this.stepBuilderFactory = stepBuilderFactory;
        this.dataSource = dataSource;
        this.esRepository = esRepository;
    }

    @Bean
    public ItemReader<EdiDocument> reader() {
        JdbcCursorItemReader<EdiDocument> databaseReader = new JdbcCursorItemReader<>();
        databaseReader.setDataSource(dataSource);
        databaseReader.setSql(LoadResource.loadResourceToString("sql/initLoadFromDocs.sql"));
        databaseReader.setRowMapper(new EdiDocumentRowMapper());
        databaseReader.setFetchSize(FETCH_SIZE);
        return databaseReader;
    }

    @Bean
    public DocProcessor processor() {
        return new DocProcessor();
    }

    @Bean
    public ElasticsearchItemWriter writer() {
        return new ElasticsearchItemWriter(esRepository);
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("step")
                .<EdiDocument, EdiDocument>chunk(100000)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
}

Классы используемые в выше указанном фрагменте, листинг которых, я думаю нет смысла приводить:
— класс EdiDocument — класс описывающий структуру документа Elasticsearch (@Document);
— класс EdiDocumentRowMapper — класс имплементирующий RowMapper;
— интерфейс EsDocumentRepository наследуемый от ElasticsearchCrudRepository.

Утилитный класс LoadResource, используется для чтения SQL-скрипта из внешнего файла:

import java.io.IOException;
import java.io.InputStream;

public class LoadResource {
    public static String loadResourceToString(final String path) {
        final InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(path);
        try {
            return IOUtils.toString(stream, "UTF-8");
        } catch (final IOException e) {
            throw new IllegalStateException(e);
        } finally {
            IOUtils.closeQuietly(stream);
        }
    }
}
Электронное чтиво
Электронное чтиво

В нашем случае step Spring Batch состоит из трёх основных элементов: reader, processor, writer.

Reader — используется JdbcCursorItemReader.

Processor, в нашем случае каких-либо преобразований не производит, элемент из базы данных Oracle напрямую передаётся в Elasticsearch:

import org.springframework.batch.item.ItemProcessor;

@Component
public class DocProcessor implements ItemProcessor<EdiDocument, EdiDocument> {
    @Override
    public EdiDocument process(final EdiDocument item) throws Exception {
        return item;
    }
}

Writer — класс ElasticsearchItemWriter имплементирующий ItemWriter:

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class ElasticsearchItemWriter implements ItemWriter<EdiDocument>, InitializingBean {

    private EsDocumentRepository repository;

    public ElasticsearchItemWriter(EsDocumentRepository repository) {
        this.repository = repository;
    }

    @Override
    public void write(final List<? extends EdiDocument> items) throws Exception {
        repository.save(items);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }
}

И наконец, класс инициализирующий и запускающий Job:

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class JobsLauncher {

    private JobLauncher jobLauncher;

    private JobBuilderFactory jobBuilderFactory;

    private Step createStep;

    @Autowired
    public JobsLauncher(final JobLauncher jobLauncher, final JobBuilderFactory jobBuilderFactory, final Step createStep) {
        this.jobLauncher = jobLauncher;
        this.jobBuilderFactory = jobBuilderFactory;
        this.createStep = createStep;
    }

    private Job databaseToElasticsearchJob() {
        return jobBuilderFactory.get("databaseToElasticsearch")
                .incrementer(new RunIdIncrementer())
                .flow(createStep)
                .end()
                .build();
    }

    public void launch() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(databaseToElasticsearchJob(), jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
С музыкой по жизни
С музыкой по жизни

Если для кого-то информации об используемых технологиях окажется недостаточно, вы можете обратиться к основным источникам:

 

Опубликовано 28 марта 2017 г.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Лимит времени истёк. Пожалуйста, перезагрузите CAPTCHA.