引入依赖
<!-- Spring Cloud Task starter (version pinned explicitly) -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-task</artifactId>
<version>2.3.4</version>
</dependency>
<!-- Spring Batch and Spring Data JPA starters -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- MySQL JDBC driver; the sample application.yml uses com.mysql.cj.jdbc.Driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Test support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Bean Validation, used by the @Validated configuration-properties classes -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- NOTE(review): the sample configuration only references the MySQL and SQL Server drivers; confirm the MariaDB driver is still needed -->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.7.4</version>
</dependency>
<!-- NOTE(review): the test class shown later imports org.junit.jupiter.api.Test (JUnit 5); confirm JUnit 4 is still required -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<!-- sql server -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<scope>runtime</scope>
</dependency>
配置yml文件
# NOTE(review): the indentation of this listing has been lost; restore the YAML nesting before using it.
# NOTE(review): the credentials below are sample values; do not commit real passwords.
app:
json-file-root: D:\2021\
etl:
# MySQL data source
datasource:
url: jdbc:mysql://localhost:3306/odsxxx?allowPublicKeyRetrieval=true&useSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: root
# SQL Server data source
slaveDatasource:
url: jdbc:sqlserver://localhost:1433;DatabaseName=DataInterface
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
username: sa
password: 1qazXSW@
spring:
main:
allow-bean-definition-overriding: true
application:
name: mingdao-batch-app
# Data source that Spring Batch uses to record job metadata
datasource:
url: jdbc:mysql://localhost:3306/dataflow?allowPublicKeyRetrieval=true&useSSL=false
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: root
sql:
init:
mode: always
jpa:
hibernate:
# NOTE(review): the lone '#' lines in this section look like comment markers that were separated
# from the line that follows them; confirm which keys were meant to be commented out.
#
dialect: org.hibernate.dialect.SQLServer2017Dialect
#ddl-auto: update
#
default_schema: dbo
#
naming:
implicit-strategy: org.hibernate.boot.model.naming.ImplicitNamingStrategyLegacyJpaImpl
physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
#
strategy: org.hibernate.cfg.ImprovedNamingStrategy
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
show-sql: false
show-sql: false
# NOTE(review): this class name is in package com.ford.configuration, while the dialect class shown
# later in this article is com.qydata.bacth.lsym.config.SqlServerDialect; confirm the intended value.
#
database-platform: com.ford.configuration.SqlServerDialect
batch:
job:
enabled: false
jdbc:
initialize-schema: always
cloud:
task:
name: 零食有鸣 BATCH TASK
batch:
fail-on-job-failure: true
initialize-enabled: true
output:
ansi:
enabled: detect
profiles:
active: default
编写配置文件映射的实体类
package com.qydata.bacth.lsym.properties;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
/**
 * Application-level settings bound from the {@code app.*} configuration keys.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 18:12
 */
@Component
@ConfigurationProperties(prefix = "app")
@Validated
@Data
@ToString
@Slf4j
public class AppProperties {

    /**
     * Root directory in which the intermediate JSON data file is stored
     * (bound from {@code app.json-file-root}); must not be blank.
     *
     * <p>The step configurations append the file name directly to this value,
     * so it needs to end with a path separator.
     */
    @NotBlank
    @Valid
    String jsonFileRoot;
}
把yml配置数据源信息映射到实体类
package com.qydata.bacth.lsym.properties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import javax.annotation.PostConstruct;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import java.io.IOException;
import java.io.StringWriter;
/**
* ETL 使用的 Properties
*
* @author outengfei
* @email t15294977908@163.com
* @date 2022-01-10 18:12
*/
@Slf4j
@Data
@Validated
@ToString
@ConfigurationProperties(prefix = "app.etl")
@Component
public class EtlProperties {
private Datasource datasource;
private SlaveDatasource slaveDatasource;
@Data
public static class Datasource {
/**
* ETL 目标数据库连线 Url (默认:本地测试数据库 url)
*/
@Valid
@NotBlank
String url;
String driverClassName;
/**
* ETL 目标数据库连线用户名 (默认:本地测试数据库用户)
*/
@Valid
@NotBlank
String username;
/**
* ETL 目标数据库连线密码
*/
@Valid
@NotBlank
String password;
}
@Data
public static class SlaveDatasource{
/**
* ETL 目标数据库连线 Url (默认:本地测试数据库 url)
*/
@Valid
@NotBlank
String url;
String driverClassName;
/**
* ETL 目标数据库连线用户名 (默认:本地测试数据库用户)
*/
@Valid
@NotBlank
String username;
/**
* ETL 目标数据库连线密码
*/
@Valid
@NotBlank
String password;
}
@PostConstruct
public void postConstruct() throws IOException {
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
StringWriter sw = new StringWriter();
mapper.writeValue(sw, this);
log.info("\n\n" +
"================= ETL - Properties =================\n" +
sw + "\n" +
"================================================================" +
"\n\n");
}
}
公共属性
package com.qydata.bacth.lsym.config;
/**
 * Constants shared across the batch application.
 *
 * <p>This class only holds constants and cannot be instantiated or extended.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-11 13:46
 */
public final class AppConstants {

    /** Name of the intermediate JSON file exchanged between the two job steps. */
    public static final String JSON_FILE_NAME = "lsym-dimCategory-batch.json";

    /** Name of the parameter that carries the job name. */
    public static final String JOB_NAME_PARAM_NAME = "jobName";

    private AppConstants() {
        // constants holder, no instances
    }
}
注入数据源 (一共三个数据源)
package com.qydata.bacth.lsym.config;
import com.qydata.bacth.lsym.properties.EtlProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.batch.BatchDataSource;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.cloud.task.configuration.DefaultTaskConfigurer;
import org.springframework.cloud.task.configuration.TaskConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import javax.sql.DataSource;
/**
 * Declares the three data sources of the application: the one used for the
 * Spring Batch job repository and Spring Cloud Task, and the two ETL data sources.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-05 9:50
 */
@Configuration
@RequiredArgsConstructor
public class DataSourceConfig {

    private final Environment env;
    private final EtlProperties etlProperties;

    /**
     * Data source for the batch job repository (marked with {@code @BatchDataSource}),
     * built from the {@code spring.datasource.*} properties.
     *
     * @return the job repository data source
     */
    @Bean(name = "springBatchDs")
    @BatchDataSource
    public DataSource springBatchDs() {
        return buildDataSource(
                env.getProperty("spring.datasource.driver-class-name"),
                env.getProperty("spring.datasource.url"),
                env.getProperty("spring.datasource.username"),
                env.getProperty("spring.datasource.password"));
    }

    /**
     * Task configurer bound to the batch data source, needed because several
     * data sources are defined.
     *
     * @param dataSource the {@code springBatchDs} data source
     * @return the task configurer
     */
    @Bean
    public TaskConfigurer taskConfigurer(@Qualifier("springBatchDs") DataSource dataSource) {
        return new DefaultTaskConfigurer(dataSource);
    }

    /**
     * Primary ETL data source, built from {@code app.etl.datasource}.
     *
     * @return the primary data source
     */
    @Primary
    @Bean(name = "etlDataSource")
    public DataSource primaryDatasource() {
        EtlProperties.Datasource settings = this.etlProperties.getDatasource();
        return buildDataSource(
                settings.getDriverClassName(),
                settings.getUrl(),
                settings.getUsername(),
                settings.getPassword());
    }

    /**
     * Secondary ETL data source, built from {@code app.etl.slaveDatasource}.
     *
     * @return the secondary data source
     */
    @Bean(name = "secondaryDataSource")
    public DataSource secondaryDataSource() {
        EtlProperties.SlaveDatasource settings = this.etlProperties.getSlaveDatasource();
        return buildDataSource(
                settings.getDriverClassName(),
                settings.getUrl(),
                settings.getUsername(),
                settings.getPassword());
    }

    /** Builds a data source from the four basic connection settings. */
    private static DataSource buildDataSource(
            String driverClassName, String url, String username, String password) {
        return DataSourceBuilder.create()
                .driverClassName(driverClassName)
                .url(url)
                .username(username)
                .password(password)
                .build();
    }
}
指定数据源要操控的repository
设定mysql 数据源要操作的repository
package com.qydata.bacth.lsym.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
/**
 * JPA wiring for the primary ({@code etlDataSource}) persistence unit.
 *
 * <p>Repositories under {@code com.qydata.bacth.lsym.repository} use the entity
 * manager factory and transaction manager declared here.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 18:57
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        basePackages = {"com.qydata.bacth.lsym.repository"},
        entityManagerFactoryRef = "entityManagerFactoryPrimary",
        transactionManagerRef = "transactionManagerPrimary")
public class PrimaryConfig {

    @Autowired
    @Qualifier("etlDataSource")
    private DataSource etlDataSource;

    @Autowired
    private JpaProperties jpaProperties;

    @Autowired
    private HibernateProperties hibernateProperties;

    /** Resolves the Hibernate properties from the shared {@code spring.jpa.*} configuration. */
    private Map<String, Object> getVendorProperties() {
        Map<String, String> configured = jpaProperties.getProperties();
        HibernateSettings settings = new HibernateSettings();
        return hibernateProperties.determineHibernateProperties(configured, settings);
    }

    /**
     * Entity manager created from the primary entity manager factory.
     *
     * @param builder the entity manager factory builder
     * @return an entity manager for the primary persistence unit
     */
    @Primary
    @Bean(name = "entityManagerPrimary")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactoryPrimary(builder);
        return factoryBean.getObject().createEntityManager();
    }

    /**
     * Entity manager factory for the primary persistence unit.
     *
     * @param builder the entity manager factory builder
     * @return the factory bean for {@code primaryPersistenceUnit}
     */
    @Primary
    @Bean(name = "entityManagerFactoryPrimary")
    public LocalContainerEntityManagerFactoryBean entityManagerFactoryPrimary(EntityManagerFactoryBuilder builder) {
        Map<String, Object> vendorProperties = getVendorProperties();
        return builder
                .dataSource(etlDataSource)
                // package that contains the entity classes of this unit
                .packages("com.qydata.bacth.lsym.domain")
                .persistenceUnit("primaryPersistenceUnit")
                .properties(vendorProperties)
                .build();
    }

    /**
     * Transaction manager bound to the primary entity manager factory.
     *
     * @param builder the entity manager factory builder
     * @return the JPA transaction manager for the primary persistence unit
     */
    @Primary
    @Bean(name = "transactionManagerPrimary")
    public PlatformTransactionManager transactionManagerPrimary(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactoryPrimary(builder);
        return new JpaTransactionManager(factoryBean.getObject());
    }
}
设定sqlserver 要操作的repository
package com.qydata.bacth.lsym.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
/**
 * JPA wiring for the secondary ({@code secondaryDataSource}, SQL Server) persistence unit.
 *
 * <p>Repositories under {@code com.qydata.bacth.lsym.repo} use the entity manager
 * factory and transaction manager declared here.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 18:48
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef = "entityManagerFactorySecondary",
        transactionManagerRef = "transactionManagerSecondary",
        basePackages = {"com.qydata.bacth.lsym.repo"})
public class SecondaryConfig {

    @Autowired
    @Qualifier("secondaryDataSource")
    private DataSource secondaryDataSource;

    @Autowired
    private JpaProperties jpaProperties;

    @Autowired
    private HibernateProperties hibernateProperties;

    /**
     * Resolves the Hibernate properties for this persistence unit.
     *
     * <p>The shared {@code spring.jpa.properties} are used by both persistence units, so a
     * dialect configured there for the other database would also be applied here. The
     * dialect is therefore always replaced with the SQL Server dialect of this project.
     */
    private Map<String, Object> getVendorProperties() {
        Map<String, Object> vendorProperties = new java.util.HashMap<>(
                hibernateProperties.determineHibernateProperties(
                        jpaProperties.getProperties(), new HibernateSettings()));
        vendorProperties.put("hibernate.dialect", SqlServerDialect.class.getName());
        return vendorProperties;
    }

    /**
     * Entity manager created from the secondary entity manager factory.
     *
     * @param builder the entity manager factory builder
     * @return an entity manager for the secondary persistence unit
     */
    @Bean(name = "entityManagerSecondary")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        return entityManagerFactorySecondary(builder).getObject().createEntityManager();
    }

    /**
     * Entity manager factory for the secondary persistence unit.
     *
     * @param builder the entity manager factory builder
     * @return the factory bean for {@code secondaryPersistenceUnit}
     */
    @Bean(name = "entityManagerFactorySecondary")
    public LocalContainerEntityManagerFactoryBean entityManagerFactorySecondary(EntityManagerFactoryBuilder builder) {
        return builder
                .dataSource(secondaryDataSource)
                // package that contains the entity classes of this unit
                .packages("com.qydata.bacth.lsym.entity")
                .persistenceUnit("secondaryPersistenceUnit")
                .properties(getVendorProperties())
                .build();
    }

    /**
     * Transaction manager bound to the secondary entity manager factory.
     *
     * @param builder the entity manager factory builder
     * @return the JPA transaction manager for the secondary persistence unit
     */
    @Bean(name = "transactionManagerSecondary")
    PlatformTransactionManager transactionManagerSecondary(EntityManagerFactoryBuilder builder) {
        return new JpaTransactionManager(entityManagerFactorySecondary(builder).getObject());
    }
}
配置sqlserver 表名转大写
因为本例 SQL Server 库中的表名是大写的，所以需要把表名转换成大写（下面的命名策略只转换表名，不转换字段名）
package com.qydata.bacth.lsym.config;
import org.hibernate.boot.model.naming.Identifier;
import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl;
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
import org.springframework.stereotype.Component;
/**
 * Physical naming strategy that converts table names to upper case.
 * All other identifiers are handled by {@link PhysicalNamingStrategyStandardImpl}.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-11 9:43
 */
@Component
public class UpperTableStrategy extends PhysicalNamingStrategyStandardImpl {
    private static final long serialVersionUID = 1383021413247872469L;

    /**
     * Returns the table name in upper case.
     *
     * @param name    the logical table name, may be {@code null}
     * @param context the JDBC environment (unused)
     * @return the upper-cased identifier with the original quoting preserved,
     *         or {@code null} when {@code name} is {@code null}
     */
    @Override
    public Identifier toPhysicalTableName(Identifier name, JdbcEnvironment context) {
        if (name == null) {
            return null;
        }
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (for example the Turkish dotted/dotless "i").
        String tableName = name.getText().toUpperCase(java.util.Locale.ROOT);
        // Carry the quoted flag over so explicitly quoted table names stay quoted.
        return Identifier.toIdentifier(tableName, name.isQuoted());
    }
}
设置sqlserver 方言类
package com.qydata.bacth.lsym.config;
import org.hibernate.dialect.SQLServer2012Dialect;
import org.hibernate.type.StandardBasicTypes;
import org.springframework.stereotype.Component;
import java.sql.Types;
/**
 * Custom SQL Server dialect that registers Hibernate type mappings for the
 * national character JDBC types, for use with native SQL queries.
 *
 * <p>Extends {@link SQLServer2012Dialect}; for another SQL Server version extend
 * the dialect of that version instead (for example {@code SQLServer2008Dialect}).
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-11 9:43
 */
@Component
public class SqlServerDialect extends SQLServer2012Dialect {

    /**
     * Registers the JDBC type to Hibernate type mappings.
     */
    public SqlServerDialect() {
        final String characterType = StandardBasicTypes.CHARACTER.getName();
        final String stringType = StandardBasicTypes.STRING.getName();
        final String textType = StandardBasicTypes.TEXT.getName();
        final String clobType = StandardBasicTypes.CLOB.getName();

        // NCHAR: one default mapping plus two mappings registered with a capacity
        registerHibernateType(Types.NCHAR, characterType);
        registerHibernateType(Types.NCHAR, 1, characterType);
        registerHibernateType(Types.NCHAR, 255, stringType);

        registerHibernateType(Types.NVARCHAR, stringType);
        registerHibernateType(Types.LONGNVARCHAR, textType);
        registerHibernateType(Types.NCLOB, clobType);
    }
}
至此 框架方面已经搭建完成 下面开始实际的业务代码
编写Mysql数据库中映射实体类
package com.qydata.bacth.lsym.domain;
import lombok.Data;
import lombok.ToString;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
/**
 * JPA entity mapped to the {@code dim_category} table.
 */
@Entity
@Table(name = "dim_category")
@Data
@ToString
public class DimCategory implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Primary key, column {@code cate_id}. */
    @Id
    @Column(name = "cate_id")
    private String cateId;

    /** Column {@code parent_id}. */
    @Column(name = "parent_id")
    private String parentId;

    /** Column {@code level_id}. */
    @Column(name = "level_id")
    private Integer levelId;

    /** Column {@code cate_name}. */
    @Column(name = "cate_name")
    private String cateName;

    /** Column {@code is_sale}. */
    @Column(name = "is_sale")
    private Integer isSale;
}
编写Sql Server实体类
package com.qydata.bacth.lsym.entity;
import lombok.Data;
import lombok.ToString;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.Date;
/**
 * JPA entity mapped to the {@code SH3_Catcategory} table
 * (entity name {@code Sh3_Catcategory}).
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 14:21
 */
@Entity(name = "Sh3_Catcategory")
@Table(name = "SH3_Catcategory")
@Data
@ToString
public class Sh3Catcategory implements Serializable {

    /** Primary key, column {@code Level}. */
    @Id
    @Column(name = "Level")
    private String Level;

    /** Column {@code FLevel}. */
    @Column(name = "FLevel")
    private String FLevel;

    /** Column {@code LevelFlag}. */
    @Column(name = "LevelFlag")
    private Integer LevelFlag;

    /** Column {@code ClsName}. */
    @Column(name = "ClsName")
    private String ClsName;

    /** Column {@code StateFlag}. */
    @Column(name = "StateFlag")
    private Integer StateFlag;

    /** Column {@code UpdateFlg}. */
    @Column(name = "UpdateFlg")
    private Integer UpdateFlg;

    /** Column {@code UpdateTime}. */
    @Column(name = "UpdateTime")
    private Date UpdateTime;
}
编写Mysql Repository(采用jpa 的方式)
package com.qydata.bacth.lsym.repository;
import com.qydata.bacth.lsym.domain.DimCategory;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.stereotype.Repository;
/**
 * Spring Data repository for {@link DimCategory} entities, keyed by a {@code String} id.
 * Declares no methods of its own; everything comes from {@link PagingAndSortingRepository}.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 12:03
 */
@Repository
public interface DimCategoryRepo extends PagingAndSortingRepository<DimCategory, String> {
}
编写Sql Server Repository
package com.qydata.bacth.lsym.repo;
import com.qydata.bacth.lsym.entity.Sh3Catcategory;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.stereotype.Repository;
/**
 * Spring Data repository for {@link Sh3Catcategory} entities, keyed by a {@code String} id.
 * Declares no methods of its own; everything comes from {@link PagingAndSortingRepository}.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 18:12
 */
@Repository
public interface Sh3CatcategoryRepo extends PagingAndSortingRepository<Sh3Catcategory, String> {
}
实体类和 Repository已经完成了 开始写具体的job
配置Spring batch 配置类
package com.qydata.bacth.lsym.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
/**
 * Spring Batch configuration: defines the job and the job launcher.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 12:25
 */
@Configuration
@EnableBatchProcessing
public class BatchJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final Step mysqlToJsonStep;
    private final Step jsonToSqlServerStep;

    /**
     * Creates the configuration with lazily resolved dependencies.
     *
     * <p>The qualifiers are placed on the constructor parameters because that is where
     * injection happens; on the fields they have no effect, and the choice between the
     * two {@link Step} beans would then depend on parameter names being available at runtime.
     *
     * @param jobBuilderFactory   factory used to build the job
     * @param mysqlToJsonStep     first step of the job
     * @param jsonToSqlServerStep second step of the job
     */
    @Lazy
    public BatchJobConfig(
            JobBuilderFactory jobBuilderFactory,
            @Qualifier("mysqlToJsonStep") Step mysqlToJsonStep,
            @Qualifier("jsonToSqlServerStep") Step jsonToSqlServerStep
    ) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.mysqlToJsonStep = mysqlToJsonStep;
        this.jsonToSqlServerStep = jsonToSqlServerStep;
    }

    /**
     * The {@code OdsDimCategoryToSqlServer} job, made of two steps:
     * <ol>
     *   <li>read the data from MySQL and save it as JSON;</li>
     *   <li>read the JSON data, convert it to the SQL Server entity and save it to SQL Server.</li>
     * </ol>
     *
     * @return the job
     */
    @Bean(name = "OdsDimCategoryToSqlServer")
    public Job OdsDimCategoryToSqlServer() {
        return jobBuilderFactory.get("OdsDimCategoryToSqlServer")
                .start(mysqlToJsonStep)
                .next(jsonToSqlServerStep)
                .build();
    }

    /**
     * Job launcher backed by the given job repository.
     *
     * @param jobRepository the job repository
     * @return the job launcher
     */
    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }
}
具体的步骤 mysqlToJsonStep
package com.qydata.bacth.lsym.step;
import com.qydata.bacth.lsym.config.AppConstants;
import com.qydata.bacth.lsym.domain.DimCategory;
import com.qydata.bacth.lsym.properties.AppProperties;
import com.qydata.bacth.lsym.repository.DimCategoryRepo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonFileItemWriter;
import org.springframework.batch.item.json.builder.JsonFileItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.data.domain.Sort;
import java.io.File;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
 * Defines the {@code mysqlToJsonStep} step: reads {@link DimCategory} rows through
 * the repository and writes them to a JSON file.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 12:29
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MysqlToJsonStepConfig {

    /** Number of items per chunk; also used as the reader page size. */
    private static final int CHUNK_SIZE = 200;

    private final StepBuilderFactory stepBuilderFactory;
    private final AppProperties appProperties;
    private final DimCategoryRepo dimCategoryRepo;

    /**
     * Builds the step that exports the categories to the JSON file.
     *
     * @return the step
     */
    @Bean
    public Step mysqlToJsonStep() {
        return stepBuilderFactory
                .get("mysqlToJsonStep")
                .<DimCategory, DimCategory>chunk(CHUNK_SIZE)
                .reader(itemReader())
                .writer(jsonFileItemWriter())
                .build();
    }

    /**
     * Creates the writer for the JSON file.
     *
     * <p>The file name is appended directly to the configured root, so the root has to
     * end with a path separator.
     *
     * @return the JSON file writer
     */
    private JsonFileItemWriter<DimCategory> jsonFileItemWriter() {
        URI source = new File(appProperties.getJsonFileRoot() + AppConstants.JSON_FILE_NAME).toURI();
        return new JsonFileItemWriterBuilder<DimCategory>()
                .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                .resource(new PathResource(source))
                .name("restJsonFileItemWriter")
                .build();
    }

    /**
     * Creates the reader that pages through the repository with {@code findAll},
     * sorted by {@code cateId} in ascending order.
     *
     * @return the repository reader
     */
    private RepositoryItemReader<DimCategory> itemReader() {
        Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("cateId", Sort.Direction.ASC);
        RepositoryItemReader<DimCategory> reader = new RepositoryItemReader<>();
        reader.setRepository(dimCategoryRepo);
        reader.setMethodName("findAll");
        reader.setSort(sorts);
        // One page per chunk, so a chunk does not need several smaller page queries.
        reader.setPageSize(CHUNK_SIZE);
        return reader;
    }
}
具体的 jsonToSqlServerStep 读取json数据转化并保存
package com.qydata.bacth.lsym.step;
import com.qydata.bacth.lsym.config.AppConstants;
import com.qydata.bacth.lsym.domain.DimCategory;
import com.qydata.bacth.lsym.entity.Sh3Catcategory;
import com.qydata.bacth.lsym.listener.DimCategoryListener;
import com.qydata.bacth.lsym.processor.JsonItemProcessor;
import com.qydata.bacth.lsym.properties.AppProperties;
import com.qydata.bacth.lsym.repo.Sh3CatcategoryRepo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import java.io.File;
import java.net.URI;
/**
 * Defines the {@code jsonToSqlServerStep} step: reads {@link DimCategory} items from
 * the JSON file, converts them to {@link Sh3Catcategory} and saves them through the
 * repository.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 14:18
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class JsonToSqlServerConfig {

    private final StepBuilderFactory stepBuilderFactory;
    private final AppProperties appProperties;
    private final JsonItemProcessor jsonItemProcessor;
    private final Sh3CatcategoryRepo sh3CatcategoryRepo;

    /**
     * Builds the step that loads the JSON file into the target repository.
     *
     * @return the step
     */
    @Bean
    public Step jsonToSqlServerStep() {
        JsonItemReader<DimCategory> reader = itemReader();
        RepositoryItemWriter<Sh3Catcategory> writer = repositoryItemWriter();
        return stepBuilderFactory
                .get("jsonToSqlServerStep")
                .<DimCategory, Sh3Catcategory>chunk(200)
                .reader(reader)
                .listener(new DimCategoryListener())
                .processor(jsonItemProcessor)
                .writer(writer)
                .build();
    }

    /**
     * Creates the reader for the JSON file.
     *
     * <p>The file name is appended directly to the configured root, so the root has to
     * end with a path separator.
     *
     * @return the JSON reader
     */
    private JsonItemReader<DimCategory> itemReader() {
        String location = appProperties.getJsonFileRoot() + AppConstants.JSON_FILE_NAME;
        File jsonFile = new File(location);
        PathResource resource = new PathResource(jsonFile.toURI());
        return new JsonItemReaderBuilder<DimCategory>()
                .jsonObjectReader(new JacksonJsonObjectReader<>(DimCategory.class))
                .resource(resource)
                .name("jsonItemReader-DimCategoryRaw")
                .build();
    }

    /**
     * Creates the writer that calls {@code save} on the target repository.
     *
     * @return the repository writer
     */
    public RepositoryItemWriter<Sh3Catcategory> repositoryItemWriter() {
        RepositoryItemWriter<Sh3Catcategory> writer = new RepositoryItemWriter<>();
        writer.setRepository(sh3CatcategoryRepo);
        writer.setMethodName("save");
        return writer;
    }
}
读取json 数据配置监听器
package com.qydata.bacth.lsym.listener;
import com.qydata.bacth.lsym.domain.DimCategory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
/**
* @author outengfei
* @email t15294977908@163.com
* @date 2022-01-10 14:27
*/
@Slf4j
public class DimCategoryListener implements ItemReadListener<DimCategory> {
@Override
public void beforeRead() {
}
@Override
public void afterRead(DimCategory dimCategory) {
log.info("监听读取");
log.info(dimCategory.toString());
}
@Override
public void onReadError(Exception e) {
log.error(e.getMessage(), e);
}
}
把读取的json 数据转换成Sql server 实体类 Sh3Catcategory
package com.qydata.bacth.lsym.processor;
import com.qydata.bacth.lsym.domain.DimCategory;
import com.qydata.bacth.lsym.entity.Sh3Catcategory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
 * Converts a {@link DimCategory} item into the {@link Sh3Catcategory} entity.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 15:33
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class JsonItemProcessor implements ItemProcessor<DimCategory, Sh3Catcategory> {

    /**
     * Maps the category fields onto a new target entity, sets the update flag to
     * {@code 0} and the update time to the current time.
     *
     * @param dimCategory the item read from the JSON file
     * @return the entity to be written
     */
    @Override
    public Sh3Catcategory process(DimCategory dimCategory) {
        Sh3Catcategory sh3Catcategory = new Sh3Catcategory();
        sh3Catcategory.setLevel(dimCategory.getCateId());
        sh3Catcategory.setFLevel(dimCategory.getParentId());
        sh3Catcategory.setLevelFlag(dimCategory.getLevelId());
        sh3Catcategory.setClsName(dimCategory.getCateName());
        sh3Catcategory.setStateFlag(dimCategory.getIsSale());
        sh3Catcategory.setUpdateFlg(0);
        sh3Catcategory.setUpdateTime(new Date());
        // Use the class logger instead of System.out so the per-item output
        // follows the logging configuration.
        log.debug("{}", sh3Catcategory);
        return sh3Catcategory;
    }
}
到此同步工作就结束了
测试我们写的任务有没有正确的执行
package com.qydata.bacth.lsym;
import com.qydata.bacth.lsym.domain.DimCategory;
import com.qydata.bacth.lsym.entity.Sh3Catcategory;
import com.qydata.bacth.lsym.repo.Sh3CatcategoryRepo;
import com.qydata.bacth.lsym.repository.DimCategoryRepo;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
import java.util.*;
@SpringBootTest
@Slf4j
class LsymApplicationTests {
@Autowired
DimCategoryRepo dimCategoryRepo;
@Autowired
Sh3CatcategoryRepo sh3CatcategoryRepo;
@Autowired
SimpleJobLauncher jobLauncher;
@Autowired
ApplicationContext context;
//先初始化mysql 数据库数据
@Test
void addTest(){
int count = 10000;
List<DimCategory> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
DimCategory dimCategory = new DimCategory();
dimCategory.setCateId(i + "id");
dimCategory.setCateName( i + "分类");
dimCategory.setLevelId(1);
dimCategory.setParentId(i + "1");
dimCategory.setIsSale(0);
list.add(dimCategory);
}
dimCategoryRepo.saveAll(list);
}
//测试任务
@Test
void test() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
/**
* 任务参数
* 如果一个任务名字 在参数相同的情况下是不允许执行第二次的 除非任务没有正确执行完成
* 在这里我用uuid来做参数 确保不会因为参数相同而被禁止执行任务
*/
JobParameters jobParameters = new JobParametersBuilder()
.addString("uuid", UUID.randomUUID().toString()).toJobParameters();
Job targetJob = context.getBean(Objects.requireNonNull("OdsDimCategoryToSqlServer"), Job.class);
jobLauncher.run(targetJob, jobParameters);
}
}
评论区