EnrichCatalogBatchConfiguration.java

package com.sintia.ffl.admin.optique.catalogue.batch.config.enrichcatalog;

import com.sintia.ffl.admin.optique.catalogue.batch.config.CatalogsDecider;
import com.sintia.ffl.admin.optique.catalogue.batch.config.ChunkCountListener;
import com.sintia.ffl.admin.optique.catalogue.batch.config.InfiniteSkipPolicy;
import com.sintia.ffl.admin.optique.catalogue.batch.config.InvalidCatalogParametersExtractor;
import com.sintia.ffl.admin.optique.catalogue.batch.config.ListCatalogsFilesStepExecutionListener;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.enrichcatalog.CheckCatalogItemProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.enrichcatalog.EnrichExtrasItemProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.enrichcatalog.EnrichGlassesItemProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogAssociationItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogAssociationItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogExtrasItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogExtrasItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogGlassesItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.enrichcatalog.EnrichCatalogGlassesItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.enrichcatalog.CopyAssociationsFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.enrichcatalog.EnrichmentArchiveFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.enrichcatalog.EnrichmentListCatalogsFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.enrichcatalog.EnrichmentProcessInvalidCatalog;
import com.sintia.ffl.admin.optique.catalogue.batch.writer.EnrichedExtrasItemWriter;
import com.sintia.ffl.admin.optique.catalogue.batch.writer.EnrichedGlassesItemWriter;
import com.sintia.ffl.admin.optique.catalogue.batch.writer.NoOpItemWriter;
import com.sintia.ffl.admin.optique.catalogue.models.AssociationCSV;
import com.sintia.ffl.admin.optique.catalogue.models.EnrichedExtrasCSV;
import com.sintia.ffl.admin.optique.catalogue.models.EnrichedGlassesCSV;
import com.sintia.ffl.admin.optique.catalogue.models.ExtrasCSV;
import com.sintia.ffl.admin.optique.catalogue.models.GlassesCSV;
import com.sintia.ffl.admin.optique.catalogue.util.Constants;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configure the bean to launch the enriching catalog job
 *
 * @author jumazet
 */
@Configuration
@EnableBatchProcessing
public class EnrichCatalogBatchConfiguration {

	@Autowired
	public ListCatalogsFilesStepExecutionListener enrichCatalogListFilesStepExecutionListener;

	@Autowired
	public EnrichCatalogJobCompletionNotificationListener listener;

	@Autowired
	public EnrichCatalogInvalidCatalogJobCompletionNotificationListener listenerInvalidCatalog;

	@Autowired
	public EnrichAllCatalogsJobCompletionNotificationListener loopListener;

	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	@Autowired
	public EnrichCatalogAssociationItemReader associationItemReader;

	@Autowired
	public EnrichCatalogGlassesItemReader glassesItemReader;

	@Autowired
	public EnrichCatalogExtrasItemReader extrasItemReader;

	@Autowired
	public CheckCatalogItemProcessor checkCatalogProcessor;

	@Autowired
	public EnrichGlassesItemProcessor enrichGlassesItemProcessor;

	@Autowired
	public EnrichExtrasItemProcessor enrichExtrasItemProcessor;

	@Autowired
	public NoOpItemWriter noOpWriter;

	@Autowired
	public EnrichedGlassesItemWriter enrichedGlassesItemWriter;

	@Autowired
	public EnrichedExtrasItemWriter enrichExtrasItemWriter;

	@Autowired
	public ChunkCountListener chunkCountListener;

	@Autowired
	public CopyAssociationsFiles copyAssociationsFiles;

	@Autowired
	public EnrichmentArchiveFiles enrichmentArchiveFiles;

	@Autowired
	public EnrichmentProcessInvalidCatalog enrichmentProcessInvalidCatalog;

	@Autowired
	public EnrichmentListCatalogsFiles listCatalogsFiles;

	@Autowired
	public EnrichCatalogParametersExtractor jobParametersExtractor;

	@Autowired
	public InvalidCatalogParametersExtractor invalidCatalogjobParametersExtractor;

	@Autowired
	public CatalogsDecider enrichAllCatalogsDecider;

	@Autowired
	public EnrichCatalogDecider enrichCatalogDecider;

	@Autowired
	public InfiniteSkipPolicy infiniteSkipPolicy;

	@Autowired
	public EnrichCatalogAssociationItemReaderListener enrichCatalogAssociationItemReaderListener;

	@Autowired
	public EnrichCatalogGlassesItemReaderListener enrichCatalogGlassesItemReaderListener;

	@Autowired
	public EnrichCatalogExtrasItemReaderListener enrichCatalogExtrasItemReaderListener;

	/**
	 * Main job that loop on all the catalogs available on the FTP, and launch for each one the enrichment job
	 *
	 * @return
	 */
	@Bean
	public Job enrichAllCatalogs() {
		FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("enrichAllCatalogsJob");
		Flow flow = flowBuilder
			.start(listCatalogsFilesToEnrichStep())
			.next(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_VALID).to(enrichCatalogStep())
			.from(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_INVALID).to(invalidCatalogsStep())
			.from(enrichAllCatalogsDecider).on(CatalogsDecider.END).stop()
			.next(enrichCatalogStep())
			.next(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_VALID).to(enrichCatalogStep())
			.from(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_INVALID).to(invalidCatalogsStep())
			.from(enrichAllCatalogsDecider).on(CatalogsDecider.END).stop()
			.next(invalidCatalogsStep())
			.next(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_VALID).to(enrichCatalogStep())
			.from(enrichAllCatalogsDecider).on(CatalogsDecider.NEXT_INVALID).to(invalidCatalogsStep())
			.from(enrichAllCatalogsDecider).on(CatalogsDecider.END).stop()
			.build();

		return jobBuilderFactory.get("enrichAllCatalogs")
			.incrementer(new RunIdIncrementer())
			.listener(loopListener)
			.start(flow)
			.end()
			.build();
	}

	/**
	 * List all the catalogs files to enrich available on the FTP
	 *
	 * @return
	 */
	@Bean
	public Step listCatalogsFilesToEnrichStep() {
		return stepBuilderFactory.get("list catalogs files to enrich")
			.tasklet(listCatalogsFiles)
			.listener(enrichCatalogListFilesStepExecutionListener)
			.build();
	}

	/**
	 * A step only used to launch a "sub-job" that process one specific invalid catalog
	 *
	 * @return
	 */
	@Bean
	public Step invalidCatalogsStep() {
		return stepBuilderFactory.get("invalid catalogs")
			.job(invalidCatalog())
			.parametersExtractor(invalidCatalogjobParametersExtractor)
			.build();
	}

	/**
	 * Job that enrich one specific catalog, which is given in the job parameters
	 *
	 * @return
	 */
	@Bean
	public Job invalidCatalog() {
		FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("invalidCatalog");

		Flow flow = flowBuilder
			.start(reportInvalidCatalog())
			.build();

		return jobBuilderFactory.get("invalidCatalog")
			.incrementer(new RunIdIncrementer())
			.listener(listenerInvalidCatalog)
			.start(flow)
			.end()
			.build();
	}

	/**
	 * Add to the reporter the invalid catalog
	 *
	 * @return
	 */
	@Bean
	public Step reportInvalidCatalog() {
		return stepBuilderFactory.get("report invalid catalog")
			.tasklet(enrichmentProcessInvalidCatalog)
			.build();
	}

	/**
	 * A step only used to launch a "sub-job" that enrich only one specific catalog
	 *
	 * @return
	 */
	@Bean
	public Step enrichCatalogStep() {
		return stepBuilderFactory.get("enrich one catalog")
			.job(enrichCatalog())
			.parametersExtractor(jobParametersExtractor)
			.build();
	}

	/**
	 * Job that enrich one specific catalog, which is given in the job parameters
	 *
	 * @return
	 */
	@Bean
	public Job enrichCatalog() {
		FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("enrichCatalog");

		Flow flow = flowBuilder
			.start(enrichCatalogDecider).on(Constants.NEXT).to(checkCatalog())
			.from(enrichCatalogDecider).on(Constants.INCOMPLETE_JOB_PARAMETERS).end(Constants.INCOMPLETE_JOB_PARAMETERS)
			.from(enrichCatalogDecider).on(Constants.INVALID_JOB_PARAMETERS).end(Constants.INVALID_JOB_PARAMETERS)
			.from(enrichCatalogDecider).on(Constants.BAD_MAKER_PROVIDER).end(Constants.BAD_MAKER_PROVIDER)
			.next(checkCatalog())
			.next(enrichGlassesCatalog())
			.next(enrichExtrasCatalog())
			.next(copyAssociationsFile())
			.next(archiveCatalogFiles())
			.build();

		return jobBuilderFactory.get("enrichCatalog")
			.incrementer(new RunIdIncrementer())
			.listener(listener)
			.start(flow)
			.end()
			.build();
	}

	/**
	 * Read the associations file and check that every glasses/extras referenced really exist in the 2 others files.
	 *
	 * @return
	 */
	@Bean
	public Step checkCatalog() {
		return stepBuilderFactory.get("check catalog consistency")
			.<AssociationCSV, AssociationCSV> chunk(1000)
			.faultTolerant()
			.skip(Exception.class)
			.skipPolicy(infiniteSkipPolicy)
			.reader(associationItemReader)
			.processor(checkCatalogProcessor)
			.writer(noOpWriter)
			.listener(enrichCatalogAssociationItemReaderListener)
			.build();
	}

	/**
	 * Enrich the glasses file from the catalog
	 *
	 * @return
	 */
	@Bean
	public Step enrichGlassesCatalog() {
		return stepBuilderFactory.get("enrich glasses file")
			.<GlassesCSV, EnrichedGlassesCSV> chunk(1000)
			.faultTolerant()
			.skip(Exception.class)
			.skipPolicy(infiniteSkipPolicy)
			.reader(glassesItemReader)
			.processor(enrichGlassesItemProcessor)
			.writer(enrichedGlassesItemWriter)
			.listener(enrichCatalogGlassesItemReaderListener)
			.listener(chunkCountListener)
			.build();
	}

	/**
	 * Enrich the extras file from the catalog
	 *
	 * @return
	 */
	@Bean
	public Step enrichExtrasCatalog() {
		return stepBuilderFactory.get("enrich extras file")
			.<ExtrasCSV, EnrichedExtrasCSV> chunk(1000)
			.faultTolerant()
			.skip(Exception.class)
			.skipPolicy(infiniteSkipPolicy)
			.reader(extrasItemReader)
			.processor(enrichExtrasItemProcessor)
			.writer(enrichExtrasItemWriter)
			.listener(chunkCountListener)
			.listener(enrichCatalogExtrasItemReaderListener)
			.build();
	}

	/**
	 * Move the associations file to the right (local) directory
	 *
	 * @return
	 */
	@Bean
	public Step copyAssociationsFile() {
		return stepBuilderFactory.get("copy associations file")
			.tasklet(copyAssociationsFiles)
			.build();
	}

	/**
	 * Archive all the files (in input and output) processed in the job
	 *
	 * @return
	 */
	@Bean
	public Step archiveCatalogFiles() {
		return stepBuilderFactory.get("archive catalog files")
			.tasklet(enrichmentArchiveFiles)
			.build();
	}

}