侧边栏壁纸
博主头像
小城雨巷 博主等级

行动起来,活在当下

  • 累计撰写 20 篇文章
  • 累计创建 6 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

etl把数据从mysql同步到sqlserver

Administrator
2023-10-18 / 0 评论 / 0 点赞 / 13 阅读 / 0 字

引入依赖

	<dependency>
            <!-- Spring Cloud Task starter (version pinned explicitly) -->
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-task</artifactId>
            <version>2.3.4</version>
        </dependency>
        <dependency>
            <!-- Spring Batch starter -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <!-- Spring Data JPA starter -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <!-- MySQL JDBC driver, needed at runtime only -->
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <!-- Spring Boot test support (test scope) -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <!-- Spring Batch test utilities (test scope) -->
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <!-- Bean Validation starter; the properties classes use @Validated / @NotBlank -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        <dependency>
            <!-- Annotation processor for @ConfigurationProperties classes (optional, build time) -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <!-- Lombok (@Data, @Slf4j, @RequiredArgsConstructor are used throughout) -->
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <!-- MariaDB JDBC driver. NOTE(review): the configuration in this article only
                 names the MySQL and SQL Server drivers; confirm this one is still needed. -->
            <groupId>org.mariadb.jdbc</groupId>
            <artifactId>mariadb-java-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <!-- JUnit 4 (test scope). NOTE(review): the test class in this article imports
                 org.junit.jupiter (JUnit 5); confirm this artifact is still needed. -->
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>test</scope>
        </dependency>

        <!-- Microsoft SQL Server JDBC driver, needed at runtime only -->
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <scope>runtime</scope>
        </dependency>

配置yml文件

app:
  # Root directory of the intermediate JSON file. The step configurations append the
  # file name directly to this value, so it must end with a path separator.
  json-file-root: D:\2021\
  etl:
    # MySQL data source (bound to EtlProperties.Datasource)
    datasource:
      url: jdbc:mysql://localhost:3306/odsxxx?allowPublicKeyRetrieval=true&useSSL=false
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: root
      password: root
    # SQL Server data source (bound to EtlProperties.SlaveDatasource).
    # This comment line used to be indented with a TAB, which YAML does not allow
    # for indentation; it is now indented with spaces.
    slaveDatasource:
      url: jdbc:sqlserver://localhost:1433;DatabaseName=DataInterface
      driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
      username: sa
      password: 1qazXSW@
spring:
  main:
    allow-bean-definition-overriding: true
  application:
    name: mingdao-batch-app
# Data source used by Spring Batch to record job metadata (read by DataSourceConfig#springBatchDs)
  datasource:
    url: jdbc:mysql://localhost:3306/dataflow?allowPublicKeyRetrieval=true&useSSL=false
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: root
  sql:
    init:
      mode: always
  jpa:
    hibernate:
      # NOTE(review): "dialect" does not look like a bound spring.jpa.hibernate.* key and is probably ignored - confirm
      dialect: org.hibernate.dialect.SQLServer2017Dialect
      #ddl-auto: update
      # NOTE(review): "default_schema" under spring.jpa.hibernate is probably ignored as well - confirm
      default_schema: dbo
      # Hibernate naming strategies
      naming:
        implicit-strategy: org.hibernate.boot.model.naming.ImplicitNamingStrategyLegacyJpaImpl
        physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
        # NOTE(review): legacy Hibernate 4 style key; presumably unused by current Spring Boot - confirm
        strategy: org.hibernate.cfg.ImprovedNamingStrategy

    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect
        show-sql: false
    show-sql: false
    # NOTE(review): this class is in com.ford.configuration, while the dialect defined in this
    # project is com.qydata.bacth.lsym.config.SqlServerDialect - confirm which one is intended
    database-platform: com.ford.configuration.SqlServerDialect
  batch:
    job:
      enabled: false
    jdbc:
      initialize-schema: always
  cloud:
    task:
      name: 零食有鸣 BATCH TASK
      batch:
        fail-on-job-failure: true
      initialize-enabled: true
  output:
    ansi:
      enabled: detect
  profiles:
    active: default

编写配置文件映射的实体类

package com.qydata.bacth.lsym.properties;

import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

import javax.validation.Valid;
import javax.validation.constraints.NotBlank;

/**
 * Application-level settings bound from the {@code app.*} configuration keys.
 *
 * <p>{@code @Data} already generates {@code toString()}, so no separate {@code @ToString}
 * is declared, and the field is private so that it is only reachable through the
 * generated accessors.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 18:12
 */
@Slf4j
@Data
@Validated
@ConfigurationProperties(prefix = "app")
@Component
public class AppProperties {

    /**
     * Root directory in which the intermediate JSON data file is kept.
     * The step configurations append the file name directly to this value, so it is
     * expected to end with a path separator. Must not be blank.
     */
    @NotBlank
    private String jsonFileRoot;
}

把yml配置数据源信息映射到实体类

package com.qydata.bacth.lsym.properties;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import javax.annotation.PostConstruct;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import java.io.IOException;
import java.io.StringWriter;

/**
 * ETL 使用的 Properties
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 18:12
 */
@Slf4j
@Data
@Validated
@ToString
@ConfigurationProperties(prefix = "app.etl")
@Component
public class EtlProperties {

    private Datasource datasource;

    private SlaveDatasource slaveDatasource;

    @Data
    public static class Datasource {
        /**
         * ETL 目标数据库连线 Url (默认:本地测试数据库 url)
         */
        @Valid
        @NotBlank
        String url;

        String driverClassName;

        /**
         * ETL 目标数据库连线用户名 (默认:本地测试数据库用户)
         */
        @Valid
        @NotBlank
        String username;

        /**
         * ETL 目标数据库连线密码
         */
        @Valid
        @NotBlank
        String password;
    }


    @Data
    public static class SlaveDatasource{
        /**
         * ETL 目标数据库连线 Url (默认:本地测试数据库 url)
         */
        @Valid
        @NotBlank
        String url;

        String driverClassName;

        /**
         * ETL 目标数据库连线用户名 (默认:本地测试数据库用户)
         */
        @Valid
        @NotBlank
        String username;

        /**
         * ETL 目标数据库连线密码
         */
        @Valid
        @NotBlank
        String password;
    }


    @PostConstruct
    public void postConstruct() throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        mapper.enable(SerializationFeature.INDENT_OUTPUT);
        mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        StringWriter sw = new StringWriter();
        mapper.writeValue(sw, this);
        log.info("\n\n" +
                "================= ETL - Properties =================\n" +
                sw + "\n" +
                "================================================================" +
                "\n\n");
    }
}

公共属性

package com.qydata.bacth.lsym.config;

/**
 * Constants shared by the batch configuration classes.
 *
 * <p>This is a pure constants holder: it is final and cannot be instantiated.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-11 13:46
 */
public final class AppConstants {

    /** Name of the intermediate JSON file written by the first step and read by the second. */
    public static final String JSON_FILE_NAME = "lsym-dimCategory-batch.json";

    /** Name of the job parameter that carries the job name. */
    public static final String JOB_NAME_PARAM_NAME = "jobName";

    private AppConstants() {
        // constants holder, not meant to be instantiated
    }
}

注入数据源 (一共三个数据源)

package com.qydata.bacth.lsym.config;



import com.qydata.bacth.lsym.properties.EtlProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.batch.BatchDataSource;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.cloud.task.configuration.DefaultTaskConfigurer;
import org.springframework.cloud.task.configuration.TaskConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;

import javax.sql.DataSource;

/**
 * Declares the three data sources of the application: one for the Spring Batch /
 * Spring Cloud Task metadata and two for the ETL databases.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-05 9:50
 */
@Configuration
@RequiredArgsConstructor
public class DataSourceConfig {

    private final Environment env;
    private final EtlProperties etlProperties;

    /**
     * Data source of the batch job repository, built from the {@code spring.datasource.*}
     * keys and marked with {@code @BatchDataSource}.
     *
     * @return the job-repository data source
     */
    @Bean(name = "springBatchDs")
    @BatchDataSource
    public DataSource springBatchDs() {
        return buildDataSource(
                env.getProperty("spring.datasource.driver-class-name"),
                env.getProperty("spring.datasource.url"),
                env.getProperty("spring.datasource.username"),
                env.getProperty("spring.datasource.password"));
    }

    /**
     * Points Spring Cloud Task at the batch data source, since several data sources exist.
     *
     * @param dataSource the {@code springBatchDs} data source
     * @return the task configurer backed by that data source
     */
    @Bean
    public TaskConfigurer taskConfigurer(@Qualifier("springBatchDs") DataSource dataSource) {
        return new DefaultTaskConfigurer(dataSource);
    }

    /**
     * Primary ETL data source, built from the {@code app.etl.datasource} settings.
     *
     * @return the data source registered as {@code etlDataSource}
     */
    @Primary
    @Bean(name = "etlDataSource")
    public DataSource primaryDatasource() {
        EtlProperties.Datasource settings = this.etlProperties.getDatasource();
        return buildDataSource(
                settings.getDriverClassName(),
                settings.getUrl(),
                settings.getUsername(),
                settings.getPassword());
    }

    /**
     * Secondary ETL data source, built from the {@code app.etl.slaveDatasource} settings.
     *
     * @return the data source registered as {@code secondaryDataSource}
     */
    @Bean(name = "secondaryDataSource")
    public DataSource secondaryDataSource() {
        EtlProperties.SlaveDatasource settings = this.etlProperties.getSlaveDatasource();
        return buildDataSource(
                settings.getDriverClassName(),
                settings.getUrl(),
                settings.getUsername(),
                settings.getPassword());
    }

    /** Builds a data source from the four connection settings. */
    private static DataSource buildDataSource(String driverClassName, String url,
                                              String username, String password) {
        return DataSourceBuilder.create()
                .driverClassName(driverClassName)
                .url(url)
                .username(username)
                .password(password)
                .build();
    }

}

指定数据源要操控的repository

设定mysql 数据源要操作的repository

package com.qydata.bacth.lsym.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;

/**
 * JPA wiring for the {@code etlDataSource}: repositories in
 * {@code com.qydata.bacth.lsym.repository} use the entity manager factory and the
 * transaction manager declared here.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 18:57
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef = "entityManagerFactoryPrimary",
        transactionManagerRef = "transactionManagerPrimary",
        basePackages = {"com.qydata.bacth.lsym.repository"})
public class PrimaryConfig {

    @Autowired
    @Qualifier("etlDataSource")
    private DataSource etlDataSource;

    @Autowired
    private JpaProperties jpaProperties;

    @Autowired
    private HibernateProperties hibernateProperties;

    /** Resolves the Hibernate settings from the {@code spring.jpa.*} configuration. */
    private Map<String, Object> getVendorProperties() {
        Map<String, String> configured = jpaProperties.getProperties();
        HibernateSettings settings = new HibernateSettings();
        return hibernateProperties.determineHibernateProperties(configured, settings);
    }

    /**
     * Entity manager created from the primary entity manager factory.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return a new entity manager of the primary persistence unit
     */
    @Primary
    @Bean(name = "entityManagerPrimary")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactoryPrimary(builder);
        return factoryBean.getObject().createEntityManager();
    }

    /**
     * Entity manager factory of the primary persistence unit.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return the factory bean bound to {@code etlDataSource}
     */
    @Primary
    @Bean(name = "entityManagerFactoryPrimary")
    public LocalContainerEntityManagerFactoryBean entityManagerFactoryPrimary(EntityManagerFactoryBuilder builder) {
        Map<String, Object> vendorProperties = getVendorProperties();
        return builder
                .dataSource(etlDataSource)
                // package that holds the entity classes of this persistence unit
                .packages("com.qydata.bacth.lsym.domain")
                .persistenceUnit("primaryPersistenceUnit")
                .properties(vendorProperties)
                .build();
    }

    /**
     * Transaction manager of the primary persistence unit.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return a JPA transaction manager on top of the primary factory
     */
    @Primary
    @Bean(name = "transactionManagerPrimary")
    public PlatformTransactionManager transactionManagerPrimary(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactoryPrimary(builder);
        return new JpaTransactionManager(factoryBean.getObject());
    }

}

设定sqlserver 要操作的repository

package com.qydata.bacth.lsym.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;

/**
 * JPA wiring for the {@code secondaryDataSource}: repositories in
 * {@code com.qydata.bacth.lsym.repo} use the entity manager factory and the
 * transaction manager declared here.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 18:48
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef="entityManagerFactorySecondary",
        transactionManagerRef="transactionManagerSecondary",
        basePackages= { "com.qydata.bacth.lsym.repo" })
public class SecondaryConfig {

    @Autowired
    @Qualifier("secondaryDataSource")
    private DataSource secondaryDataSource;

    @Autowired
    private JpaProperties jpaProperties;

    @Autowired
    private HibernateProperties hibernateProperties;

    /**
     * Resolves the Hibernate settings for the secondary persistence unit.
     *
     * <p>The {@code spring.jpa.properties.*} map is shared by both persistence units, so a
     * dialect configured there for the primary database would also be applied to this
     * one. The dialect is therefore overridden with the SQL Server dialect of this package.
     */
    private Map<String, Object> getVendorProperties() {
        Map<String, Object> vendorProperties = new java.util.LinkedHashMap<>(
                hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings()));
        vendorProperties.put("hibernate.dialect", SqlServerDialect.class.getName());
        return vendorProperties;
    }

    /**
     * Entity manager created from the secondary entity manager factory.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return a new entity manager of the secondary persistence unit
     */
    @Bean(name = "entityManagerSecondary")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        return entityManagerFactorySecondary(builder).getObject().createEntityManager();
    }

    /**
     * Entity manager factory of the secondary persistence unit.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return the factory bean bound to {@code secondaryDataSource}
     */
    @Bean(name = "entityManagerFactorySecondary")
    public LocalContainerEntityManagerFactoryBean entityManagerFactorySecondary (EntityManagerFactoryBuilder builder) {
        return builder
                .dataSource(secondaryDataSource)
                // package that holds the entity classes of this persistence unit
                .packages("com.qydata.bacth.lsym.entity")
                .persistenceUnit("secondaryPersistenceUnit")
                .properties(getVendorProperties())
                .build();
    }

    /**
     * Transaction manager of the secondary persistence unit.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return a JPA transaction manager on top of the secondary factory
     */
    @Bean(name = "transactionManagerSecondary")
    PlatformTransactionManager transactionManagerSecondary(EntityManagerFactoryBuilder builder) {
        return new JpaTransactionManager(entityManagerFactorySecondary(builder).getObject());
    }
}

配置sqlserver 表名转大写

因为这里的 SQL Server 库中表名是大写的,所以需要把实体映射出的表名转换成大写(下面的策略只转换表名,字段名仍以实体类中 @Column 声明的名称为准)

package com.qydata.bacth.lsym.config;



import org.hibernate.boot.model.naming.Identifier;
import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl;
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
import org.springframework.stereotype.Component;

/**
 * Physical naming strategy that upper-cases table names; all other identifiers keep
 * the behaviour of {@link PhysicalNamingStrategyStandardImpl}.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-11 9:43
 */
@Component
public class UpperTableStrategy extends PhysicalNamingStrategyStandardImpl {
    private static final long serialVersionUID = 1383021413247872469L;

    /**
     * Converts the logical table name to upper case.
     *
     * @param name    the logical table name; {@code null} is passed through unchanged
     * @param context the JDBC environment (not used)
     * @return the upper-cased identifier, keeping the quoting flag of {@code name}
     */
    @Override
    public Identifier toPhysicalTableName(Identifier name, JdbcEnvironment context) {
        if (name == null) {
            return null;
        }
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (e.g. "i" would not become a plain "I" under a Turkish locale).
        String tableName = name.getText().toUpperCase(java.util.Locale.ROOT);
        return Identifier.toIdentifier(tableName, name.isQuoted());
    }
}

设置sqlserver 方言类

package com.qydata.bacth.lsym.config;



import org.hibernate.dialect.SQLServer2012Dialect;
import org.hibernate.type.StandardBasicTypes;
import org.springframework.stereotype.Component;

import java.sql.Types;

/**
 * Custom SQL Server dialect that registers Hibernate types for the national character
 * JDBC types, for use with native SQL queries.
 *
 * <p>It extends {@link SQLServer2012Dialect}; for another SQL Server version, extend
 * the dialect class of that version instead (e.g. {@code SQLServer2008Dialect}).
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-11 9:43
 */
@Component
public class SqlServerDialect extends SQLServer2012Dialect {

    /**
     * Registers the JDBC-type to Hibernate-type mappings on top of the parent dialect.
     */
    public SqlServerDialect() {
        final String characterType = StandardBasicTypes.CHARACTER.getName();
        final String stringType = StandardBasicTypes.STRING.getName();

        // NCHAR: default and capacity 1 map to a character, capacity 255 to a string
        registerHibernateType(Types.NCHAR, characterType);
        registerHibernateType(Types.NCHAR, 1, characterType);
        registerHibernateType(Types.NCHAR, 255, stringType);

        registerHibernateType(Types.NVARCHAR, stringType);
        registerHibernateType(Types.LONGNVARCHAR, StandardBasicTypes.TEXT.getName());
        registerHibernateType(Types.NCLOB, StandardBasicTypes.CLOB.getName());
    }
}

至此,框架方面已经搭建完成,下面开始编写实际的业务代码。

编写Mysql数据库中映射实体类

package com.qydata.bacth.lsym.domain;

import lombok.Data;
import lombok.ToString;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;

/**
 * JPA entity mapped to the {@code dim_category} table.
 */
@Data
@Entity
@ToString
@Table(name = "dim_category")
public class DimCategory implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Primary key, stored in column {@code cate_id}. */
    @Id
    @Column(name = "cate_id")
    private String cateId;

    /** Value of column {@code parent_id}. */
    @Column(name = "parent_id")
    private String parentId;

    /** Value of column {@code level_id}. */
    @Column(name = "level_id")
    private Integer levelId;

    /** Value of column {@code cate_name}. */
    @Column(name = "cate_name")
    private String cateName;

    /** Value of column {@code is_sale}. */
    @Column(name = "is_sale")
    private Integer isSale;
}

编写Sql Server实体类

package com.qydata.bacth.lsym.entity;

import lombok.Data;
import lombok.ToString;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.Date;

/**
 * JPA entity mapped to the {@code SH3_Catcategory} table.
 *
 * <p>The field names start with an upper-case letter, mirroring the column names. They
 * are kept as they are because the generated accessors, the JPA attribute names and
 * the {@code toString()} output all derive from them.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 14:21
 */
@Data
@Entity(name = "Sh3_Catcategory")
@ToString
@Table(name = "SH3_Catcategory")
public class Sh3Catcategory implements Serializable {

    /** Explicit serial version, consistent with the other Serializable entity (DimCategory). */
    private static final long serialVersionUID = 1L;

    /** Primary key, stored in column {@code Level}. */
    @Id
    @Column(name = "Level")
    private String Level;

    /** Value of column {@code FLevel}. */
    @Column(name = "FLevel")
    private String FLevel;

    /** Value of column {@code LevelFlag}. */
    @Column(name = "LevelFlag")
    private Integer LevelFlag;

    /** Value of column {@code ClsName}. */
    @Column(name = "ClsName")
    private String ClsName;

    /** Value of column {@code StateFlag}. */
    @Column(name = "StateFlag")
    private Integer StateFlag;

    /** Value of column {@code UpdateFlg}. */
    @Column(name = "UpdateFlg")
    private Integer UpdateFlg;

    /** Value of column {@code UpdateTime}. */
    @Column(name = "UpdateTime")
    private Date UpdateTime;

}

编写Mysql Repository(采用jpa 的方式)

package com.qydata.bacth.lsym.repository;

import com.qydata.bacth.lsym.domain.DimCategory;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.stereotype.Repository;

/**
 * Spring Data repository for {@link DimCategory} entities with a {@code String} id.
 * It declares no methods of its own; paging, sorting and CRUD operations are inherited
 * from {@link PagingAndSortingRepository}.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 12:03
 */
@Repository
public interface DimCategoryRepo extends PagingAndSortingRepository<DimCategory, String> {

}


编写Sql Server Repository

package com.qydata.bacth.lsym.repo;

import com.qydata.bacth.lsym.entity.Sh3Catcategory;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.stereotype.Repository;

/**
 * Spring Data repository for {@link Sh3Catcategory} entities with a {@code String} id.
 * It declares no methods of its own; paging, sorting and CRUD operations are inherited
 * from {@link PagingAndSortingRepository}.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 18:12
 */
@Repository
public interface Sh3CatcategoryRepo extends PagingAndSortingRepository<Sh3Catcategory, String> {
}

实体类和 Repository已经完成了 开始写具体的job

配置Spring batch 配置类

package com.qydata.bacth.lsym.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

/**
 * Declares the batch job and the job launcher.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 12:25
 */
@Configuration
@EnableBatchProcessing
public class BatchJobConfig {

    private final JobBuilderFactory jobBuilderFactory;

    private final Step mysqlToJsonStep;

    private final Step jsonToSqlServerStep;

    /**
     * Creates the configuration with its two steps.
     *
     * <p>The qualifiers are declared on the constructor parameters: a {@code @Qualifier}
     * placed on a final field that is assigned in the constructor is not consulted
     * during constructor injection, so the two {@link Step} beans would only be told
     * apart by parameter name.
     *
     * @param jobBuilderFactory   factory used to build the job
     * @param mysqlToJsonStep     the step bean named {@code mysqlToJsonStep}
     * @param jsonToSqlServerStep the step bean named {@code jsonToSqlServerStep}
     */
    // NOTE(review): @Lazy is presumably here to avoid an initialisation-order problem
    // between this class and the step configurations - confirm before removing it.
    @Lazy
    public BatchJobConfig(
            JobBuilderFactory jobBuilderFactory,
            @Qualifier("mysqlToJsonStep") Step mysqlToJsonStep,
            @Qualifier("jsonToSqlServerStep") Step jsonToSqlServerStep
    ){
        this.jobBuilderFactory = jobBuilderFactory;
        this.mysqlToJsonStep = mysqlToJsonStep;
        this.jsonToSqlServerStep = jsonToSqlServerStep;
    }

    /**
     * The job, made of two steps run in sequence:
     * <ol>
     *   <li>{@code mysqlToJsonStep}: query the data and save it as a JSON file;</li>
     *   <li>{@code jsonToSqlServerStep}: read that JSON file, convert each item to the
     *       target entity and save it.</li>
     * </ol>
     *
     * @return the job registered as {@code OdsDimCategoryToSqlServer}
     */
    @Bean(name = "OdsDimCategoryToSqlServer")
    public Job OdsDimCategoryToSqlServer(){
        return jobBuilderFactory.get("OdsDimCategoryToSqlServer")
                .start(mysqlToJsonStep)
                .next(jsonToSqlServerStep)
                .build();
    }

    /**
     * Job launcher bound to the given job repository.
     *
     * @param jobRepository the repository in which job executions are recorded
     * @return the launcher
     */
    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }
}

具体的步骤 mysqlToJsonStep

package com.qydata.bacth.lsym.step;

import com.qydata.bacth.lsym.config.AppConstants;
import com.qydata.bacth.lsym.domain.DimCategory;
import com.qydata.bacth.lsym.properties.AppProperties;
import com.qydata.bacth.lsym.repository.DimCategoryRepo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonFileItemWriter;
import org.springframework.batch.item.json.builder.JsonFileItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.data.domain.Sort;

import java.io.File;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * Declares the first step of the job: read {@link DimCategory} rows through the
 * repository and write them to a JSON file.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 12:29
 */

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MysqlToJsonStepConfig {

    /** Number of items per chunk; also used as the page size of the reader. */
    private static final int CHUNK_SIZE = 200;

    private final StepBuilderFactory stepBuilderFactory;

    private final AppProperties appProperties;

    private final DimCategoryRepo dimCategoryRepo;

    /**
     * Builds the {@code mysqlToJsonStep} step.
     *
     * @return the chunk-oriented step
     */
    @Bean
    public Step mysqlToJsonStep() {
        return stepBuilderFactory
                .get("mysqlToJsonStep")
                .<DimCategory, DimCategory>chunk(CHUNK_SIZE)
                .reader(itemReader())
                .writer(jsonFileItemWriter())
                .build();
    }

    /**
     * Writer that stores the items in the JSON file located at
     * {@code <jsonFileRoot><JSON_FILE_NAME>}.
     *
     * @return the JSON file writer
     */
    private JsonFileItemWriter<DimCategory> jsonFileItemWriter() {
        URI source = new File(appProperties.getJsonFileRoot() + AppConstants.JSON_FILE_NAME).toURI();
        return new JsonFileItemWriterBuilder<DimCategory>()
                .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                .resource(new PathResource(source))
                .name("restJsonFileItemWriter")
                .build();
    }

    /**
     * Reader that pages through the repository with {@code findAll}, sorted by
     * {@code cateId} in ascending order.
     *
     * @return the repository reader
     */
    private RepositoryItemReader<DimCategory> itemReader() {
        Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("cateId", Sort.Direction.ASC);
        RepositoryItemReader<DimCategory> reader = new RepositoryItemReader<>();
        reader.setRepository(dimCategoryRepo);
        reader.setMethodName("findAll");
        reader.setSort(sorts);
        // One page per chunk: without this the reader keeps its small default page size
        // and needs many queries to fill a single chunk.
        reader.setPageSize(CHUNK_SIZE);
        return reader;
    }
}


具体的 jsonToSqlServerStep 读取json数据转化并保存

package com.qydata.bacth.lsym.step;

import com.qydata.bacth.lsym.config.AppConstants;
import com.qydata.bacth.lsym.domain.DimCategory;
import com.qydata.bacth.lsym.entity.Sh3Catcategory;
import com.qydata.bacth.lsym.listener.DimCategoryListener;
import com.qydata.bacth.lsym.processor.JsonItemProcessor;
import com.qydata.bacth.lsym.properties.AppProperties;
import com.qydata.bacth.lsym.repo.Sh3CatcategoryRepo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;

import java.io.File;
import java.net.URI;

/**
 * Declares the second step of the job: read {@link DimCategory} items from the JSON
 * file, convert them with the {@link JsonItemProcessor} and save the resulting
 * {@link Sh3Catcategory} entities through the repository.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 14:18
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class JsonToSqlServerConfig {

    private final StepBuilderFactory stepBuilderFactory;

    private final AppProperties appProperties;

    private final JsonItemProcessor jsonItemProcessor;

    private final Sh3CatcategoryRepo sh3CatcategoryRepo;

    /**
     * Builds the {@code jsonToSqlServerStep} step.
     *
     * @return the chunk-oriented step
     */
    @Bean
    public Step jsonToSqlServerStep() {
        JsonItemReader<DimCategory> jsonReader = itemReader();
        RepositoryItemWriter<Sh3Catcategory> entityWriter = repositoryItemWriter();
        return stepBuilderFactory
                .get("jsonToSqlServerStep")
                .<DimCategory, Sh3Catcategory>chunk(200)
                .reader(jsonReader)
                .listener(new DimCategoryListener())
                .processor(jsonItemProcessor)
                .writer(entityWriter)
                .build();
    }

    /**
     * Reader for the JSON file located at {@code <jsonFileRoot><JSON_FILE_NAME>}.
     *
     * @return the JSON item reader
     */
    private JsonItemReader<DimCategory> itemReader() {
        String location = appProperties.getJsonFileRoot() + AppConstants.JSON_FILE_NAME;
        URI jsonFile = new File(location).toURI();
        JsonItemReaderBuilder<DimCategory> readerBuilder = new JsonItemReaderBuilder<DimCategory>()
                .jsonObjectReader(new JacksonJsonObjectReader<>(DimCategory.class))
                .resource(new PathResource(jsonFile))
                .name("jsonItemReader-DimCategoryRaw");
        return readerBuilder.build();
    }

    /**
     * Writer that hands every item to the repository's {@code save} method.
     *
     * @return the repository writer
     */
    public RepositoryItemWriter<Sh3Catcategory> repositoryItemWriter() {
        RepositoryItemWriter<Sh3Catcategory> writer = new RepositoryItemWriter<>();
        writer.setRepository(sh3CatcategoryRepo);
        writer.setMethodName("save");
        return writer;
    }

}


读取json 数据配置监听器

package com.qydata.bacth.lsym.listener;

import com.qydata.bacth.lsym.domain.DimCategory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;

/**
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 14:27
 */
@Slf4j
public class DimCategoryListener implements ItemReadListener<DimCategory> {

    @Override
    public void beforeRead() {

    }

    @Override
    public void afterRead(DimCategory dimCategory) {
        log.info("监听读取");
        log.info(dimCategory.toString());
    }

    @Override
    public void onReadError(Exception e) {
        log.error(e.getMessage(), e);
    }
}

把读取的json 数据转换成Sql server 实体类 Sh3Catcategory

package com.qydata.bacth.lsym.processor;

import com.qydata.bacth.lsym.domain.DimCategory;
import com.qydata.bacth.lsym.entity.Sh3Catcategory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Converts a {@link DimCategory} item into the {@link Sh3Catcategory} entity that is
 * written by the second step.
 *
 * @author outengfei
 * @email t15294977908@163.com
 * @date 2022-01-10 15:33
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class JsonItemProcessor implements ItemProcessor<DimCategory, Sh3Catcategory> {

    /**
     * Copies the fields of the source item into a new target entity. The update flag
     * is set to {@code 0} and the update time to the current time.
     *
     * @param dimCategory the item read from the JSON file
     * @return the converted entity
     */
    @Override
    public Sh3Catcategory process(DimCategory dimCategory) {
        Sh3Catcategory sh3Catcategory = new Sh3Catcategory();
        sh3Catcategory.setLevel(dimCategory.getCateId());
        sh3Catcategory.setFLevel(dimCategory.getParentId());
        sh3Catcategory.setLevelFlag(dimCategory.getLevelId());
        sh3Catcategory.setStateFlag(dimCategory.getIsSale());
        sh3Catcategory.setUpdateFlg(0);
        sh3Catcategory.setUpdateTime(new Date());
        sh3Catcategory.setClsName(dimCategory.getCateName());
        // Use the class logger instead of System.out so that the output follows the
        // logging configuration; debug level keeps large runs quiet by default.
        log.debug("{}", sh3Catcategory);
        return sh3Catcategory;
    }
}

到此同步工作就结束了

测试我们写的任务有没有正确的执行

package com.qydata.bacth.lsym;

import com.qydata.bacth.lsym.domain.DimCategory;
import com.qydata.bacth.lsym.entity.Sh3Catcategory;
import com.qydata.bacth.lsym.repo.Sh3CatcategoryRepo;
import com.qydata.bacth.lsym.repository.DimCategoryRepo;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;

import java.util.*;

@SpringBootTest
@Slf4j
class LsymApplicationTests {

    @Autowired
    DimCategoryRepo dimCategoryRepo;

    @Autowired
    Sh3CatcategoryRepo sh3CatcategoryRepo;
    @Autowired
    SimpleJobLauncher jobLauncher;

    @Autowired
    ApplicationContext context;
//先初始化mysql 数据库数据
    @Test
    void addTest(){
        int count = 10000;
        List<DimCategory> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            DimCategory dimCategory = new DimCategory();
            dimCategory.setCateId(i + "id");
            dimCategory.setCateName( i + "分类");
            dimCategory.setLevelId(1);
            dimCategory.setParentId(i + "1");
            dimCategory.setIsSale(0);
            list.add(dimCategory);
        }
        dimCategoryRepo.saveAll(list);
    }

//测试任务
    @Test
    void test() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
	/**
 	* 任务参数
 	* 如果一个任务名字 在参数相同的情况下是不允许执行第二次的 除非任务没有正确执行完成 
 	* 在这里我用uuid来做参数 确保不会因为参数相同而被禁止执行任务
 	*/
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("uuid", UUID.randomUUID().toString()).toJobParameters();
        Job targetJob = context.getBean(Objects.requireNonNull("OdsDimCategoryToSqlServer"), Job.class);
        jobLauncher.run(targetJob, jobParameters);
    }

}


0

评论区