EnrichCatalogBatchConfiguration.java
package com.sintia.ffl.admin.optique.catalogue.batch.config.enrichcatalog;
import com.sintia.ffl.admin.optique.catalogue.batch.config.CatalogsDecider;
import com.sintia.ffl.admin.optique.catalogue.batch.config.ChunkCountListener;
import com.sintia.ffl.admin.optique.catalogue.batch.config.InfiniteSkipPolicy;
import com.sintia.ffl.admin.optique.catalogue.batch.config.InvalidCatalogParametersExtractor;
import com.sintia.ffl.admin.optique.catalogue.batch.config.ListCatalogsFilesStepExecutionListener;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.enrichcatalog.CheckCatalogItemProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.enrichcatalog.EnrichExtrasItemProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.enrichcatalog.EnrichGlassesItemProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogAssociationItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogAssociationItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogExtrasItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogExtrasItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogGlassesItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogGlassesItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.enrichcatalog.CopyAssociationsFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.enrichcatalog.EnrichmentArchiveFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.enrichcatalog.EnrichmentListCatalogsFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.enrichcatalog.EnrichmentProcessInvalidCatalog;
import com.sintia.ffl.admin.optique.catalogue.batch.writer.EnrichedExtrasItemWriter;
import com.sintia.ffl.admin.optique.catalogue.batch.writer.EnrichedGlassesItemWriter;
import com.sintia.ffl.admin.optique.catalogue.batch.writer.NoOpItemWriter;
import com.sintia.ffl.admin.optique.catalogue.models.AssociationCSV;
import com.sintia.ffl.admin.optique.catalogue.models.EnrichedExtrasCSV;
import com.sintia.ffl.admin.optique.catalogue.models.EnrichedGlassesCSV;
import com.sintia.ffl.admin.optique.catalogue.models.ExtrasCSV;
import com.sintia.ffl.admin.optique.catalogue.models.GlassesCSV;
import com.sintia.ffl.admin.optique.catalogue.util.Constants;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Configure the bean to launch the enriching catalog job
*
* @author jumazet
*/
@Configuration
@EnableBatchProcessing
public class EnrichCatalogBatchConfiguration {
@Autowired
public ListCatalogsFilesStepExecutionListener enrichCatalogListFilesStepExecutionListener;
@Autowired
public EnrichCatalogJobCompletionNotificationListener listener;
@Autowired
public EnrichCatalogInvalidCatalogJobCompletionNotificationListener listenerInvalidCatalog;
@Autowired
public EnrichAllCatalogsJobCompletionNotificationListener loopListener;
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public EnrichCatalogAssociationItemReader associationItemReader;
@Autowired
public EnrichCatalogGlassesItemReader glassesItemReader;
@Autowired
public EnrichCatalogExtrasItemReader extrasItemReader;
@Autowired
public CheckCatalogItemProcessor checkCatalogProcessor;
@Autowired
public EnrichGlassesItemProcessor enrichGlassesItemProcessor;
@Autowired
public EnrichExtrasItemProcessor enrichExtrasItemProcessor;
@Autowired
public NoOpItemWriter noOpWriter;
@Autowired
public EnrichedGlassesItemWriter enrichedGlassesItemWriter;
@Autowired
public EnrichedExtrasItemWriter enrichExtrasItemWriter;
@Autowired
public ChunkCountListener chunkCountListener;
@Autowired
public CopyAssociationsFiles copyAssociationsFiles;
@Autowired
public EnrichmentArchiveFiles enrichmentArchiveFiles;
@Autowired
public EnrichmentProcessInvalidCatalog enrichmentProcessInvalidCatalog;
@Autowired
public EnrichmentListCatalogsFiles listCatalogsFiles;
@Autowired
public EnrichCatalogParametersExtractor jobParametersExtractor;
@Autowired
public InvalidCatalogParametersExtractor invalidCatalogjobParametersExtractor;
@Autowired
public CatalogsDecider enrichAllCatalogsDecider;
@Autowired
public EnrichCatalogDecider enrichCatalogDecider;
@Autowired
public InfiniteSkipPolicy infiniteSkipPolicy;
@Autowired
public EnrichCatalogAssociationItemReaderListener enrichCatalogAssociationItemReaderListener;
@Autowired
public EnrichCatalogGlassesItemReaderListener enrichCatalogGlassesItemReaderListener;
@Autowired
public EnrichCatalogExtrasItemReaderListener enrichCatalogExtrasItemReaderListener;
/**
* Main job that loop on all the catalogs available on the FTP, and launch for each one the enrichment job
*
* @return
*/
@Bean
public Job enrichAllCatalogs() {
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("enrichAllCatalogsJob");
Flow flow = flowBuilder
.start(listCatalogsFilesToEnrichStep())
.next(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_VALID).to(enrichCatalogStep())
.from(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_INVALID).to(invalidCatalogsStep())
.from(enrichAllCatalogsDecider).on(CatalogsDecider.END).stop()
.next(enrichCatalogStep())
.next(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_VALID).to(enrichCatalogStep())
.from(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_INVALID).to(invalidCatalogsStep())
.from(enrichAllCatalogsDecider).on(CatalogsDecider.END).stop()
.next(invalidCatalogsStep())
.next(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_VALID).to(enrichCatalogStep())
.from(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_INVALID).to(invalidCatalogsStep())
.from(enrichAllCatalogsDecider).on(CatalogsDecider.END).stop()
.build();
return jobBuilderFactory.get("enrichAllCatalogs")
.incrementer(new RunIdIncrementer())
.listener(loopListener)
.start(flow)
.end()
.build();
}
/**
* List all the catalogs files to enrich available on the FTP
*
* @return
*/
@Bean
public Step listCatalogsFilesToEnrichStep() {
return stepBuilderFactory.get("list catalogs files to enrich")
.tasklet(listCatalogsFiles)
.listener(enrichCatalogListFilesStepExecutionListener)
.build();
}
/**
* A step only used to launch a "sub-job" that process one specific invalid catalog
*
* @return
*/
@Bean
public Step invalidCatalogsStep() {
return stepBuilderFactory.get("invalid catalogs")
.job(invalidCatalog())
.parametersExtractor(invalidCatalogjobParametersExtractor)
.build();
}
/**
* Job that enrich one specific catalog, which is given in the job parameters
*
* @return
*/
@Bean
public Job invalidCatalog() {
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("invalidCatalog");
Flow flow = flowBuilder
.start(reportInvalidCatalog())
.build();
return jobBuilderFactory.get("invalidCatalog")
.incrementer(new RunIdIncrementer())
.listener(listenerInvalidCatalog)
.start(flow)
.end()
.build();
}
/**
* Add to the reporter the invalid catalog
*
* @return
*/
@Bean
public Step reportInvalidCatalog() {
return stepBuilderFactory.get("report invalid catalog")
.tasklet(enrichmentProcessInvalidCatalog)
.build();
}
/**
* A step only used to launch a "sub-job" that enrich only one specific catalog
*
* @return
*/
@Bean
public Step enrichCatalogStep() {
return stepBuilderFactory.get("enrich one catalog")
.job(enrichCatalog())
.parametersExtractor(jobParametersExtractor)
.build();
}
/**
* Job that enrich one specific catalog, which is given in the job parameters
*
* @return
*/
@Bean
public Job enrichCatalog() {
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("enrichCatalog");
Flow flow = flowBuilder
.start(enrichCatalogDecider).on(Constants.NEXT).to(checkCatalog())
.from(enrichCatalogDecider).on(Constants.INCOMPLETE_JOB_PARAMETERS).end(Constants.INCOMPLETE_JOB_PARAMETERS)
.from(enrichCatalogDecider).on(Constants.INVALID_JOB_PARAMETERS).end(Constants.INVALID_JOB_PARAMETERS)
.from(enrichCatalogDecider).on(Constants.BAD_MAKER_PROVIDER).end(Constants.BAD_MAKER_PROVIDER)
.next(checkCatalog())
.next(enrichGlassesCatalog())
.next(enrichExtrasCatalog())
.next(copyAssociationsFile())
.next(archiveCatalogFiles())
.build();
return jobBuilderFactory.get("enrichCatalog")
.incrementer(new RunIdIncrementer())
.listener(listener)
.start(flow)
.end()
.build();
}
/**
* Read the associations file and check that every glasses/extras referenced really exist in the 2 others files.
*
* @return
*/
@Bean
public Step checkCatalog() {
return stepBuilderFactory.get("check catalog consistency")
.<AssociationCSV, AssociationCSV> chunk(1000)
.faultTolerant()
.skip(Exception.class)
.skipPolicy(infiniteSkipPolicy)
.reader(associationItemReader)
.processor(checkCatalogProcessor)
.writer(noOpWriter)
.listener(enrichCatalogAssociationItemReaderListener)
.build();
}
/**
* Enrich the glasses file from the catalog
*
* @return
*/
@Bean
public Step enrichGlassesCatalog() {
return stepBuilderFactory.get("enrich glasses file")
.<GlassesCSV, EnrichedGlassesCSV> chunk(1000)
.faultTolerant()
.skip(Exception.class)
.skipPolicy(infiniteSkipPolicy)
.reader(glassesItemReader)
.processor(enrichGlassesItemProcessor)
.writer(enrichedGlassesItemWriter)
.listener(enrichCatalogGlassesItemReaderListener)
.listener(chunkCountListener)
.build();
}
/**
* Enrich the extras file from the catalog
*
* @return
*/
@Bean
public Step enrichExtrasCatalog() {
return stepBuilderFactory.get("enrich extras file")
.<ExtrasCSV, EnrichedExtrasCSV> chunk(1000)
.faultTolerant()
.skip(Exception.class)
.skipPolicy(infiniteSkipPolicy)
.reader(extrasItemReader)
.processor(enrichExtrasItemProcessor)
.writer(enrichExtrasItemWriter)
.listener(chunkCountListener)
.listener(enrichCatalogExtrasItemReaderListener)
.build();
}
/**
* Move the associations file to the right (local) directory
*
* @return
*/
@Bean
public Step copyAssociationsFile() {
return stepBuilderFactory.get("copy associations file")
.tasklet(copyAssociationsFiles)
.build();
}
/**
* Archive all the files (in input and output) processed in the job
*
* @return
*/
@Bean
public Step archiveCatalogFiles() {
return stepBuilderFactory.get("archive catalog files")
.tasklet(enrichmentArchiveFiles)
.build();
}
}