LoadCatalogBatchConfiguration.java

package com.sintia.ffl.admin.optique.catalogue.batch.config.loadcatalog;

import com.sintia.ffl.admin.optique.catalogue.batch.config.CatalogsDecider;
import com.sintia.ffl.admin.optique.catalogue.batch.config.ChunkCountListener;
import com.sintia.ffl.admin.optique.catalogue.batch.config.InfiniteSkipPolicy;
import com.sintia.ffl.admin.optique.catalogue.batch.config.InvalidCatalogParametersExtractor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.loadcatalog.LoadCatalogAssociationProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.loadcatalog.LoadCatalogEnrichedExtrasProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.processor.loadcatalog.LoadCatalogEnrichedGlassesProcessor;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogAssociationItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogEnrichedExtrasItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogEnrichedExtrasItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogEnrichedGlassesItemReader;
import com.sintia.ffl.admin.optique.catalogue.batch.reader.loadcatalog.LoadCatalogEnrichedGlassesItemReaderListener;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogArchiveFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogCheckCatalogFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogCleanLocalDirectory;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogListCatalogsFiles;
import com.sintia.ffl.admin.optique.catalogue.batch.tasklet.loadcatalog.LoadCatalogProcessInvalidCatalog;
import com.sintia.ffl.admin.optique.catalogue.models.AssociationCSV;
import com.sintia.ffl.admin.optique.catalogue.models.EnrichedExtrasCSV;
import com.sintia.ffl.admin.optique.catalogue.models.EnrichedGlassesCSV;
import com.sintia.ffl.admin.optique.dal.entities.catalogue.ModeleVerreCatalogue;
import com.sintia.ffl.admin.optique.dal.entities.catalogue.SupplementVerreCatalogue;
import com.sintia.ffl.admin.optique.dal.entities.catalogue.VerreSupplementAssoCatalogue;
import com.sintia.ffl.admin.optique.services.dto.MakerProvider;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;
import java.util.List;

/**
 * Configure the bean to launch the catalog loading job
 *
 * @author jumazet
 */
@Configuration
@EnableBatchProcessing
public class LoadCatalogBatchConfiguration {

	@Autowired
	public EntityManagerFactory emf;

	@Autowired
	public LoadCatalogJobCompletionNotificationListener listener;

	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	@Autowired
	public CatalogParametersValidator catalogParametersValidator;

	@Autowired
	public LoadCatalogCheckCatalogFiles checkCatalogFilesTasklet;

	@Autowired
	public InfiniteSkipPolicy infiniteSkipPolicy;

	@Autowired
	public LoadCatalogEnrichedGlassesItemReader enrichedGlassesItemReader;
	@Autowired
	public LoadCatalogEnrichedExtrasItemReader enrichedExtrasItemReader;
	@Autowired
	public LoadCatalogAssociationItemReader associationItemReader;

	@Autowired
	public LoadCatalogEnrichedGlassesProcessor enrichedGlassesProcessor;
	@Autowired
	public LoadCatalogEnrichedExtrasProcessor loadCatalogEnrichedExtrasProcessor;
	@Autowired
	public LoadCatalogAssociationProcessor loadCatalogAssociationProcessor;

	@Autowired
	public LoadCatalogEnrichedGlassesItemReaderListener enrichedGlassesItemReaderListener;
	@Autowired
	public LoadCatalogEnrichedExtrasItemReaderListener enrichedExtrasItemReaderListener;
	@Autowired
	public LoadCatalogExecutionContextPromotionListener loadCatalogExecutionContextPromotionListener;
	@Autowired
	public LoadCatalogEnrichedGlassesStepExecutionListener loadCatalogEnrichedGlassesStepExecutionListener;
	@Autowired
	public LoadCatalogAssociationStepExecutionListener loadCatalogAssociationStepExecutionListener;

	@Autowired
	public LoadCatalogEnrichedGlasseSkipListener loadCatalogEnrichGlassesSkipListener;
	@Autowired
	public LoadCatalogEnrichedExtraSkipListener loadCatalogEnrichedExtraSkipListener;
	@Autowired
	public LoadCatalogEnrichedAssociationSkipListener loadCatalogEnrichedAssociationSkipListener;

	@Autowired
	public ChunkCountListener chunkCountListener;
	
	@Autowired
	public LoadCatalogCleanLocalDirectory loadCatalogCleanLocalDirectory;

	@Autowired
	public LoadCatalogArchiveFiles loadCatalogArchiveFiles;

	@Autowired
	public LoadCatalogParametersExtractor jobParametersExtractor;

	@Autowired
	public LoadCatalogListCatalogsFiles listCatalogsFiles;
	
	@Autowired
	public CatalogsDecider loadAllCatalogsDecider;
	
	@Autowired
	public LoadAllCatalogsJobCompletionNotificationListener loopListener;
	
	@Autowired
	public InvalidCatalogParametersExtractor invalidCatalogjobParametersExtractor;
	
	@Autowired
	public LoadCatalogInvalidCatalogJobCompletionNotificationListener listenerInvalidCatalog;
	
	@Autowired
	public LoadCatalogProcessInvalidCatalog loadCatalogProcessInvalidCatalog;
	
	public static List<MakerProvider> makersProfilers;
	
	
	@Bean
	public Job loadAllCatalogs() {
		FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("load all catalogs Job");
		Flow flow = flowBuilder
				.start(listCatalogsFilesToLoadStep())

				.next(loadAllCatalogsDecider)
				.on(CatalogsDecider.NEXT_VALID)
				.to(loadCatalogStep())

				.from(loadAllCatalogsDecider)
				.on(CatalogsDecider.NEXT_INVALID)
				.to(invalidCatalogsToLoadStep())

				.from(loadAllCatalogsDecider)
				.on(CatalogsDecider.END)
				.stop()

				.next(loadCatalogStep())

				.next(loadAllCatalogsDecider)
				.on(CatalogsDecider.NEXT_VALID)
				.to(loadCatalogStep())

				.from(loadAllCatalogsDecider)
				.on(CatalogsDecider.NEXT_INVALID)
				.to(invalidCatalogsToLoadStep())

				.from(loadAllCatalogsDecider)
				.on(CatalogsDecider.END)
				.stop()

				.next(invalidCatalogsToLoadStep())

				.next(loadAllCatalogsDecider)
				.on(CatalogsDecider.NEXT_VALID)
				.to(loadCatalogStep())

				.from(loadAllCatalogsDecider)
				.on(CatalogsDecider.NEXT_INVALID)
				.to(invalidCatalogsToLoadStep())

				.from(loadAllCatalogsDecider)
				.on(CatalogsDecider.END)
				.stop()

				.build();
		
		return jobBuilderFactory.get("loadAllCatalogs")
				.incrementer(new RunIdIncrementer())
				.listener(loopListener)
				.start(flow)
				.end()
				.build();
	}
	
	
	/**
	 * A step only used to launch a "sub-job" that process one specific invalid
	 * catalog
	 *
	 * @return
	 */
	@Bean
	public Step invalidCatalogsToLoadStep() {
		return stepBuilderFactory.get("invalid catalogs")
				.job(invalidCatalogToLoad())
				.parametersExtractor(invalidCatalogjobParametersExtractor)
				.build();
	}
	
	/**
	 * Job that enrich one specific catalog, which is given in the job parameters
	 *
	 * @return
	 */
	@Bean
	public Job invalidCatalogToLoad() {
		FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("invalidCatalog");
		
		Flow flow = flowBuilder.start(reportInvalidCatalogToLoad())
				.build();
		
		return jobBuilderFactory.get("invalidCatalog")
				.incrementer(new RunIdIncrementer())
				.listener(listenerInvalidCatalog)
				.start(flow)
				.end()
				.build();
	}
	
	/**
	 * Add to the reporter the invalid catalog
	 *
	 * @return
	 */
	@Bean
	public Step reportInvalidCatalogToLoad() {
		return stepBuilderFactory.get("report invalid catalog")
				.tasklet(loadCatalogProcessInvalidCatalog)
				.build();
	}
	
	/**
	 * A step only used to launch a "sub-job" that load only one specific catalog
	 *
	 * @return
	 */
	@Bean
	public Step loadCatalogStep() {
		return stepBuilderFactory.get("load one catalog")
				.job(loadCatalog())
				.parametersExtractor(jobParametersExtractor)
				.build();
	}
	
	
	
	
	/**
	 * Main job that loop on all the catalogs available on the FTP, and launch for
	 * each one the load job
	 *
	 * @return
	 */
	@Bean
	public Job loadCatalog() {
		FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("loadCatalogFlow");
		Flow flow = flowBuilder
				.start(listCatalogsFilesToLoadStep())
				.next(checkFiles())
				.next(loadGlasses())
				.next(loadExtras())
				.next(loadAssociations())
				.next(archiveLoadCatalogFiles())
				.build();
		
		return jobBuilderFactory.get("loadCatalog")
				.incrementer(new RunIdIncrementer())
				// To do something at the end of the job execution
				.listener(listener)
				// To check that the job parameters values are valid
				.validator(catalogParametersValidator)
				.start(flow)
				.end()
				.build();
	}

	/**
	 * List all the catalogs files to enrich available on the FTP
	 *
	 * @return
	 */
	@Bean
	public Step listCatalogsFilesToLoadStep() {
		return stepBuilderFactory.get("list catalogs files to load")
				.tasklet(listCatalogsFiles)
				.build();
	}
	

	/**
	 * Check that the catalog files are present in the right directory
	 *
	 * @return
	 */
	@Bean
	public Step checkFiles() {
		return stepBuilderFactory.get("checkFiles")
				.tasklet(checkCatalogFilesTasklet)
				.build();
	}

	/**
	 * Load (ie save in the database) the glasses from the glasses file
	 *
	 * @return
	 */
	@Bean
	public Step loadGlasses() {

		JpaItemWriter<ModeleVerreCatalogue> writer = new JpaItemWriter<>();
		writer.setEntityManagerFactory(emf);

		return stepBuilderFactory.get("loadGlasses")
				.<EnrichedGlassesCSV, ModeleVerreCatalogue>chunk(1000)
				.faultTolerant()
				.skip(Exception.class)
				.skipPolicy(infiniteSkipPolicy)
				.listener(loadCatalogEnrichGlassesSkipListener)
				.reader(enrichedGlassesItemReader)
				.processor(enrichedGlassesProcessor)
				.writer(writer)
				.listener(enrichedGlassesItemReaderListener)
				.listener(loadCatalogEnrichedGlassesStepExecutionListener)
				.listener(loadCatalogExecutionContextPromotionListener)
				.listener(chunkCountListener)
				.build();
	}

	/**
	 * Load (ie save in the database) the extras from the extras file
	 *
	 * @return
	 */
	@Bean
	public Step loadExtras() {

		JpaItemWriter<SupplementVerreCatalogue> writer = new JpaItemWriter<SupplementVerreCatalogue>();
		writer.setEntityManagerFactory(emf);

		return stepBuilderFactory.get("loadExtras")
				.<EnrichedExtrasCSV, SupplementVerreCatalogue>chunk(1000)
				.faultTolerant()
				.skip(Exception.class)
				.skipPolicy(infiniteSkipPolicy)
				.listener(loadCatalogEnrichedExtraSkipListener)
				.reader(enrichedExtrasItemReader)
				.processor(loadCatalogEnrichedExtrasProcessor)
				.writer(writer)
				.listener(enrichedExtrasItemReaderListener)
				.listener(chunkCountListener)
				.build();
	}

	/**
	 * Load (ie save in the database) the associations from the associations file
	 *
	 * @return
	 */
	@Bean
	public Step loadAssociations() {
		JpaItemWriter<VerreSupplementAssoCatalogue> writer = new JpaItemWriter<VerreSupplementAssoCatalogue>();
		writer.setEntityManagerFactory(emf);

		return stepBuilderFactory.get("loadAssociations")
				.<AssociationCSV, VerreSupplementAssoCatalogue>chunk(1000)
				.faultTolerant()
				.skip(Exception.class)
				.skipPolicy(infiniteSkipPolicy)
				.listener(loadCatalogEnrichedAssociationSkipListener)
				.reader(associationItemReader)
				.processor(loadCatalogAssociationProcessor)
				.writer(writer)
				.listener(loadCatalogAssociationStepExecutionListener)
				.listener(chunkCountListener)
				.build();
	}

	/**
	 * Empty all the local directories used by this job (ie input, output, archived
	 * input, archived output)
	 *
	 * @return
	 */
	@Bean
	public Step cleanLoadCatalogLocalDirectories() {
		return stepBuilderFactory.get("cleanLoadCatalogLocalDirectories")
				.tasklet(loadCatalogCleanLocalDirectory)
				.build();
	}

	/**
	 * Archive all the files (in input and output) processed in the job
	 *
	 * @return
	 */
	@Bean
	public Step archiveLoadCatalogFiles() {
		return stepBuilderFactory.get("archiveLoadCatalogFiles")
				.tasklet(loadCatalogArchiveFiles)
				.build();
	}
}