// LoadCatalogBatchConfiguration.java
package com.sintia.ffl.admin.optique.catalogue.batch.config.loadcatalog;
import com.sintia.ffl.admin.optique.catalogue.batch.config.CatalogsDecider;
import com.sintia.ffl.admin.optique.catalogue.batch.config.ChunkCountListener;
import com.sintia.ffl.admin.optique.catalogue.batch.config.InfiniteSkipPolicy;
import com.sintia.ffl.admin.optique.catalogue.batch.config.InvalidCatalogParametersExtractor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.loadcatalog.LoadCatalogAssociationProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.loadcatalog.LoadCatalogEnrichedExtrasProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.loadcatalog.LoadCatalogEnrichedGlassesProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogAssociationItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogEnrichedExtrasItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogEnrichedExtrasItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogEnrichedGlassesItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogEnrichedGlassesItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogArchiveFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogCheckCatalogFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogCleanLocalDirectory;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogListCatalogsFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogProcessInvalidCatalog;
import com.sintia.ffl.admin.optique.catalogue.models.AssociationCSV;
import com.sintia.ffl.admin.optique.catalogue.models.EnrichedExtrasCSV;
import com.sintia.ffl.admin.optique.catalogue.models.EnrichedGlassesCSV;
import com.sintia.ffl.admin.optique.dal.entities.catalogue.ModeleVerreCatalogue;
import com.sintia.ffl.admin.optique.dal.entities.catalogue.SupplementVerreCatalogue;
import com.sintia.ffl.admin.optique.dal.entities.catalogue.VerreSupplementAssoCatalogue;
import com.sintia.ffl.admin.optique.services.dto.MakerProvider;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.persistence.EntityManagerFactory;
import java.util.List;
/**
 * Spring Batch configuration declaring the jobs and steps used to load catalogs.
 * <p>
 * Three jobs are declared here:
 * <ul>
 * <li>{@link #loadAllCatalogs()}: lists the catalog files, then loops through the
 * {@link CatalogsDecider}, which routes to one of the two sub-jobs below until it
 * answers {@code CatalogsDecider.END};</li>
 * <li>{@link #loadCatalog()}: loads one catalog (glasses, extras, associations) and
 * archives its files;</li>
 * <li>{@link #invalidCatalogToLoad()}: runs the tasklet that reports one invalid
 * catalog.</li>
 * </ul>
 *
 * @author jumazet
 */
@Configuration
@EnableBatchProcessing
public class LoadCatalogBatchConfiguration {

    /** Chunk size (number of items per chunk) shared by the three chunk-oriented steps. */
    private static final int CHUNK_SIZE = 1000;

    // ---- Infrastructure -------------------------------------------------------------

    /** Entity manager factory handed to every {@link JpaItemWriter} built by this class. */
    @Autowired
    public EntityManagerFactory emf;

    /** Job listener registered on the {@link #loadCatalog()} job. */
    @Autowired
    public LoadCatalogJobCompletionNotificationListener listener;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    /** Job parameters validator registered on the {@link #loadCatalog()} job. */
    @Autowired
    public CatalogParametersValidator catalogParametersValidator;

    /** Tasklet executed by the {@link #checkFiles()} step. */
    @Autowired
    public LoadCatalogCheckCatalogFiles checkCatalogFilesTasklet;

    /** Skip policy registered on the three chunk-oriented steps. */
    @Autowired
    public InfiniteSkipPolicy infiniteSkipPolicy;

    // ---- Readers --------------------------------------------------------------------

    @Autowired
    public LoadCatalogEnrichedGlassesItemReader enrichedGlassesItemReader;

    @Autowired
    public LoadCatalogEnrichedExtrasItemReader enrichedExtrasItemReader;

    @Autowired
    public LoadCatalogAssociationItemReader associationItemReader;

    // ---- Processors -----------------------------------------------------------------

    @Autowired
    public LoadCatalogEnrichedGlassesProcessor enrichedGlassesProcessor;

    @Autowired
    public LoadCatalogEnrichedExtrasProcessor loadCatalogEnrichedExtrasProcessor;

    @Autowired
    public LoadCatalogAssociationProcessor loadCatalogAssociationProcessor;

    // ---- Step-level listeners -------------------------------------------------------

    @Autowired
    public LoadCatalogEnrichedGlassesItemReaderListener enrichedGlassesItemReaderListener;

    @Autowired
    public LoadCatalogEnrichedExtrasItemReaderListener enrichedExtrasItemReaderListener;

    @Autowired
    public LoadCatalogExecutionContextPromotionListener loadCatalogExecutionContextPromotionListener;

    @Autowired
    public LoadCatalogEnrichedGlassesStepExecutionListener loadCatalogEnrichedGlassesStepExecutionListener;

    @Autowired
    public LoadCatalogAssociationStepExecutionListener loadCatalogAssociationStepExecutionListener;

    @Autowired
    public LoadCatalogEnrichedGlasseSkipListener loadCatalogEnrichGlassesSkipListener;

    @Autowired
    public LoadCatalogEnrichedExtraSkipListener loadCatalogEnrichedExtraSkipListener;

    @Autowired
    public LoadCatalogEnrichedAssociationSkipListener loadCatalogEnrichedAssociationSkipListener;

    /** Listener registered on each of the three chunk-oriented steps. */
    @Autowired
    public ChunkCountListener chunkCountListener;

    // ---- Tasklets and job plumbing --------------------------------------------------

    /** Tasklet executed by the {@link #cleanLoadCatalogLocalDirectories()} step. */
    @Autowired
    public LoadCatalogCleanLocalDirectory loadCatalogCleanLocalDirectory;

    /** Tasklet executed by the {@link #archiveLoadCatalogFiles()} step. */
    @Autowired
    public LoadCatalogArchiveFiles loadCatalogArchiveFiles;

    /** Extracts the parameters passed to the {@link #loadCatalog()} sub-job. */
    @Autowired
    public LoadCatalogParametersExtractor jobParametersExtractor;

    /** Tasklet executed by the {@link #listCatalogsFilesToLoadStep()} step. */
    @Autowired
    public LoadCatalogListCatalogsFiles listCatalogsFiles;

    /** Decider driving the loop of the {@link #loadAllCatalogs()} job. */
    @Autowired
    public CatalogsDecider loadAllCatalogsDecider;

    /** Job listener registered on the {@link #loadAllCatalogs()} job. */
    @Autowired
    public LoadAllCatalogsJobCompletionNotificationListener loopListener;

    /** Extracts the parameters passed to the {@link #invalidCatalogToLoad()} sub-job. */
    @Autowired
    public InvalidCatalogParametersExtractor invalidCatalogjobParametersExtractor;

    /** Job listener registered on the {@link #invalidCatalogToLoad()} job. */
    @Autowired
    public LoadCatalogInvalidCatalogJobCompletionNotificationListener listenerInvalidCatalog;

    /** Tasklet executed by the {@link #reportInvalidCatalogToLoad()} step. */
    @Autowired
    public LoadCatalogProcessInvalidCatalog loadCatalogProcessInvalidCatalog;

    /**
     * Shared static list; it is never read or assigned inside this class.
     * NOTE(review): public mutable static state - confirm who populates it and whether
     * concurrent access is possible.
     */
    public static List<MakerProvider> makersProfilers;

    /**
     * Top-level job: lists the catalog files, then repeatedly asks the
     * {@link CatalogsDecider} what to do next. A valid catalog is sent to
     * {@link #loadCatalogStep()}, an invalid one to {@link #invalidCatalogsToLoadStep()},
     * and {@code CatalogsDecider.END} stops the flow. After either step the flow goes
     * back to the decider, which is what produces the loop.
     *
     * @return the "loadAllCatalogs" job
     */
    @Bean
    public Job loadAllCatalogs() {
        // The decider routing is declared once after the listing step and once after each
        // of the two looping steps, so that each of them leads back to the decider.
        FlowBuilder<Flow> afterListing = new FlowBuilder<Flow>("load all catalogs Job")
            .start(listCatalogsFilesToLoadStep());
        FlowBuilder<Flow> afterLoad = routeThroughDecider(afterListing)
            .next(loadCatalogStep());
        FlowBuilder<Flow> afterInvalid = routeThroughDecider(afterLoad)
            .next(invalidCatalogsToLoadStep());
        Flow flow = routeThroughDecider(afterInvalid).build();

        return jobBuilderFactory.get("loadAllCatalogs")
            .incrementer(new RunIdIncrementer())
            .listener(loopListener)
            .start(flow)
            .end()
            .build();
    }

    /**
     * Appends the decider to the flow and declares its three outgoing transitions
     * (next valid catalog, next invalid catalog, end).
     *
     * @param builder the flow builder positioned on the state that must lead to the decider
     * @return the builder returned by the last transition, ready for further chaining
     */
    private FlowBuilder<Flow> routeThroughDecider(FlowBuilder<Flow> builder) {
        return builder
            .next(loadAllCatalogsDecider)
            .on(CatalogsDecider.NEXT_VALID)
            .to(loadCatalogStep())
            .from(loadAllCatalogsDecider)
            .on(CatalogsDecider.NEXT_INVALID)
            .to(invalidCatalogsToLoadStep())
            .from(loadAllCatalogsDecider)
            .on(CatalogsDecider.END)
            .stop();
    }

    /**
     * A step only used to launch the "sub-job" that processes one specific invalid
     * catalog.
     *
     * @return the step wrapping the {@link #invalidCatalogToLoad()} job
     */
    @Bean
    public Step invalidCatalogsToLoadStep() {
        return stepBuilderFactory.get("invalid catalogs")
            .job(invalidCatalogToLoad())
            .parametersExtractor(invalidCatalogjobParametersExtractor)
            .build();
    }

    /**
     * Job handling one specific invalid catalog: its flow contains the single
     * {@link #reportInvalidCatalogToLoad()} step.
     *
     * @return the "invalidCatalog" job
     */
    @Bean
    public Job invalidCatalogToLoad() {
        Flow flow = new FlowBuilder<Flow>("invalidCatalog")
            .start(reportInvalidCatalogToLoad())
            .build();

        return jobBuilderFactory.get("invalidCatalog")
            .incrementer(new RunIdIncrementer())
            .listener(listenerInvalidCatalog)
            .start(flow)
            .end()
            .build();
    }

    /**
     * Add the invalid catalog to the reporter.
     *
     * @return the tasklet step running {@code loadCatalogProcessInvalidCatalog}
     */
    @Bean
    public Step reportInvalidCatalogToLoad() {
        return stepBuilderFactory.get("report invalid catalog")
            .tasklet(loadCatalogProcessInvalidCatalog)
            .build();
    }

    /**
     * A step only used to launch the "sub-job" that loads one specific catalog.
     *
     * @return the step wrapping the {@link #loadCatalog()} job
     */
    @Bean
    public Step loadCatalogStep() {
        return stepBuilderFactory.get("load one catalog")
            .job(loadCatalog())
            .parametersExtractor(jobParametersExtractor)
            .build();
    }

    /**
     * Job that loads one catalog: lists the catalog files, checks them, loads the
     * glasses, the extras and the associations, then archives the processed files.
     * <p>
     * NOTE(review): this per-catalog flow starts with the same listing step as
     * {@link #loadAllCatalogs()}, while {@link #cleanLoadCatalogLocalDirectories()} is
     * declared but not referenced anywhere in this class - confirm that the first step
     * is the intended one.
     *
     * @return the "loadCatalog" job
     */
    @Bean
    public Job loadCatalog() {
        Flow flow = new FlowBuilder<Flow>("loadCatalogFlow")
            .start(listCatalogsFilesToLoadStep())
            .next(checkFiles())
            .next(loadGlasses())
            .next(loadExtras())
            .next(loadAssociations())
            .next(archiveLoadCatalogFiles())
            .build();

        return jobBuilderFactory.get("loadCatalog")
            .incrementer(new RunIdIncrementer())
            // Notified around the job execution
            .listener(listener)
            // Checks that the job parameters values are valid
            .validator(catalogParametersValidator)
            .start(flow)
            .end()
            .build();
    }

    /**
     * List the catalog files to load. The original documentation states that they are
     * the files available on the FTP; the tasklet doing the work is defined elsewhere.
     *
     * @return the tasklet step running {@code listCatalogsFiles}
     */
    @Bean
    public Step listCatalogsFilesToLoadStep() {
        return stepBuilderFactory.get("list catalogs files to load")
            .tasklet(listCatalogsFiles)
            .build();
    }

    /**
     * Check that the catalog files are present in the right directory.
     *
     * @return the tasklet step running {@code checkCatalogFilesTasklet}
     */
    @Bean
    public Step checkFiles() {
        return stepBuilderFactory.get("checkFiles")
            .tasklet(checkCatalogFilesTasklet)
            .build();
    }

    /**
     * Load (ie save in the database) the glasses from the glasses file.
     * <p>
     * The step is fault tolerant: {@link Exception} is declared skippable,
     * {@code infiniteSkipPolicy} is registered as skip policy and the skip listener is
     * registered alongside the reader, step-execution, context-promotion and chunk
     * listeners.
     *
     * @return the chunk-oriented "loadGlasses" step
     */
    @Bean
    public Step loadGlasses() {
        JpaItemWriter<ModeleVerreCatalogue> writer = newJpaWriter();
        return stepBuilderFactory.get("loadGlasses")
            .<EnrichedGlassesCSV, ModeleVerreCatalogue>chunk(CHUNK_SIZE)
            .faultTolerant()
            .skip(Exception.class)
            .skipPolicy(infiniteSkipPolicy)
            .listener(loadCatalogEnrichGlassesSkipListener)
            .reader(enrichedGlassesItemReader)
            .processor(enrichedGlassesProcessor)
            .writer(writer)
            .listener(enrichedGlassesItemReaderListener)
            .listener(loadCatalogEnrichedGlassesStepExecutionListener)
            .listener(loadCatalogExecutionContextPromotionListener)
            .listener(chunkCountListener)
            .build();
    }

    /**
     * Load (ie save in the database) the extras from the extras file.
     * <p>
     * Same fault-tolerance settings as {@link #loadGlasses()}, with the extras-specific
     * skip and reader listeners.
     *
     * @return the chunk-oriented "loadExtras" step
     */
    @Bean
    public Step loadExtras() {
        JpaItemWriter<SupplementVerreCatalogue> writer = newJpaWriter();
        return stepBuilderFactory.get("loadExtras")
            .<EnrichedExtrasCSV, SupplementVerreCatalogue>chunk(CHUNK_SIZE)
            .faultTolerant()
            .skip(Exception.class)
            .skipPolicy(infiniteSkipPolicy)
            .listener(loadCatalogEnrichedExtraSkipListener)
            .reader(enrichedExtrasItemReader)
            .processor(loadCatalogEnrichedExtrasProcessor)
            .writer(writer)
            .listener(enrichedExtrasItemReaderListener)
            .listener(chunkCountListener)
            .build();
    }

    /**
     * Load (ie save in the database) the associations from the associations file.
     * <p>
     * Same fault-tolerance settings as {@link #loadGlasses()}, with the
     * association-specific skip and step-execution listeners.
     *
     * @return the chunk-oriented "loadAssociations" step
     */
    @Bean
    public Step loadAssociations() {
        JpaItemWriter<VerreSupplementAssoCatalogue> writer = newJpaWriter();
        return stepBuilderFactory.get("loadAssociations")
            .<AssociationCSV, VerreSupplementAssoCatalogue>chunk(CHUNK_SIZE)
            .faultTolerant()
            .skip(Exception.class)
            .skipPolicy(infiniteSkipPolicy)
            .listener(loadCatalogEnrichedAssociationSkipListener)
            .reader(associationItemReader)
            .processor(loadCatalogAssociationProcessor)
            .writer(writer)
            .listener(loadCatalogAssociationStepExecutionListener)
            .listener(chunkCountListener)
            .build();
    }

    /**
     * Empty all the local directories used by this job (ie input, output, archived
     * input, archived output).
     * <p>
     * NOTE(review): this step bean is not part of any flow declared in this class.
     *
     * @return the tasklet step running {@code loadCatalogCleanLocalDirectory}
     */
    @Bean
    public Step cleanLoadCatalogLocalDirectories() {
        return stepBuilderFactory.get("cleanLoadCatalogLocalDirectories")
            .tasklet(loadCatalogCleanLocalDirectory)
            .build();
    }

    /**
     * Archive all the files (in input and output) processed in the job.
     *
     * @return the tasklet step running {@code loadCatalogArchiveFiles}
     */
    @Bean
    public Step archiveLoadCatalogFiles() {
        return stepBuilderFactory.get("archiveLoadCatalogFiles")
            .tasklet(loadCatalogArchiveFiles)
            .build();
    }

    /**
     * Builds a new JPA writer bound to the injected entity manager factory. A fresh
     * instance is created on every call, as each step previously did inline.
     *
     * @param <T> the entity type written by the step
     * @return a writer configured with {@link #emf}
     */
    private <T> JpaItemWriter<T> newJpaWriter() {
        JpaItemWriter<T> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(emf);
        return writer;
    }
}