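Preface

While working on my graduation project and my own toy projects recently, I wanted to replace some SQL-based solutions with Elasticsearch, for features such as post search. Sticking with MySQL for full-text search simply performs too badly.

Integrating Elasticsearch brought quite a few problems of its own, though. One of them was how to incrementally sync the post table from MySQL to Elasticsearch while keeping the data reasonably consistent.

My first idea was to use middleware such as canal, but it is too heavyweight. Leaving aside everything it offers beyond syncing to Elasticsearch, a canal setup normally needs two separate projects running, which eats memory on a personal machine, and the Elasticsearch index has to be created in advance. If all your application needs is to sync data into Elasticsearch, the experience is rather poor. So I gave up on middleware and instead run a thread inside the application that does something similar.

Later I found this project on GitHub: https://github.com/shyiko/mysql-binlog-connector-java , and built part of the implementation on top of it without much trouble. It works on the same principle as canal: it poses as a slave node to receive the binlog.

What the implementation currently does:

  • It is embedded in one module of the project, and the index is created automatically when the application starts
  • If the application crashes during a sync, it resumes from the approximate binlog offset recorded last time, avoiding inconsistent data (this refers to the application crashing, not es)
  • When the sync thread starts it registers itself with nacos, so even when several instances of the project are running there is only one sync thread, which to some extent avoids duplicate work
  • It supports syncing mysql text columns (a small pitfall)

Q: Why embed it in an existing module instead of giving it a module of its own?
A: My poor computer is out of memory. A normal development session runs 4 java applications + mysql + redis + es + nacos + sentinel + nginx + the WeChat mini program developer tools + IDEA + Chrome + postman, and that uses up all 16 GB. Things like jenkins that were integrated later all went onto a cloud server that a senior labmate gave me.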

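The core code for this article can be found in https://gitee.com/wenjie2018/UT-APP .

  • Utility classes and entity classes are only mentioned in passing; if you get stuck on them, look them up in the repository.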

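Environment setup

  • First the mysql configuration. As with canal, binlog must be enabled and its format changed:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not duplicate another server's id

  • The account used to connect to mysql must also have the REPLICATION SLAVE privilege. For a quick experiment you can simply use the root user.

  • The base environment is Spring Boot. Below are the dependencies for the sync code and es; mysql and the other dependencies are omitted and can be changed to taste: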

        <!-- binlog listener -->
        <dependency>
            <groupId>com.github.shyiko</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>${mysql-binlog-connector-java.version}</version>
        </dependency>

        <!--elasticsearch client-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>

  • The es version is 7.9.2.

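Core code

Entity class

To have the index created automatically, I simply borrow Spring Data's auto-creation feature: write an entity class and annotate it with @Document(indexName = "name of the corresponding database table") and @Component. These two annotations are also used later when scanning for the classes.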


package run.ut.app.model.elasticsearch;

import lombok.*;
import lombok.experimental.Accessors;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.stereotype.Component;
import run.ut.app.model.domain.BaseEntity;
import run.ut.app.model.dto.base.InputConverter;
import run.ut.app.model.param.PostParam;

/**
 * <p>
 * Posts
 * </p>
 *
 * @author wenjie
 * @since 2020-05-12
 */
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Document(indexName = "posts")
@Component
public class ESPosts extends BaseEntity implements InputConverter<PostParam> {

    @Id
    private Long id;

    private Long uid;

    private Long forumId;

    @Field(analyzer = "ik_max_word",type = FieldType.Text)
    private String title;

    @Field(analyzer = "ik_max_word",type = FieldType.Text)
    private String content;

    private Long likes;
}

Scanning and recording the es index names


import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Scan @Document annotation and save
 *
 * @author wenjie
 */
@Component
@Slf4j
@Getter
public class DocumentScan implements BeanDefinitionRegistryPostProcessor, PriorityOrdered {

    private List<String> indexNames = new ArrayList<>();

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        log.debug("【run.ut.app.scan.DocumentScan.postProcessBeanDefinitionRegistry】");
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
        Map<String, Object> beansWithAnnotation = configurableListableBeanFactory.getBeansWithAnnotation(Document.class);
        log.info("scan @Document and register");
        for (Map.Entry<String, Object> stringObjectEntry : beansWithAnnotation.entrySet()) {
            indexNames.add(stringObjectEntry.getValue().getClass().getAnnotation(Document.class).indexName());
        }
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}

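Getting the table-to-column mapping

There is a reason the es index name has to match the database table name. The data obtained by listening to the binlog carries no column names, only a list of values in column order, so to save a row as an es document we have to supply the column names ourselves. The code below looks those names up by table, using the index name as the table name.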


import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import run.ut.app.mapper.ExMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * TableTemplate
 *
 * @author wenjie
 */
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TableTemplate implements ApplicationListener<ContextRefreshedEvent> {

    private final DocumentScan documentScan;
    private final ExMapper exMapper;

    @Value("${spring.datasource.database}")
    private String database;

    /**
     * table -> columns
     */
    @Getter
    private Map<String, List<String>> columnMap = new HashMap<>();


    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
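        // EsSyncHandlers.table is still "" until the first TABLE_MAP event arrives, so "" is registered
        // here to let events seen before that point (such as the rotate event) pass the listener's filter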
        columnMap.put("", new ArrayList<>());
        initColumnMap();
    }

    /**
     * init columnMap
     */
    private void initColumnMap() {
        List<String> indexNames = documentScan.getIndexNames();
        for (String indexName : indexNames) {
            List<String> columns = exMapper.selectColumnNameByDatabaseAndTable(database, indexName);
            columnMap.put(indexName, columns);
        }
    }
}

The sql statement that queries a table's column names:


import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.util.List;

public interface ExMapper extends BaseMapper {

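    // Order by ORDINAL_POSITION so the names line up with the positional values in a binlog row
    @Select("select COLUMN_NAME from information_schema.COLUMNS where table_name = #{table} and table_schema = #{database} order by ORDINAL_POSITION")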
    List<String> selectColumnNameByDatabaseAndTable(@Param("database") String database, @Param("table") String table);
}
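
The pairing described in this section, matching the positional values of a binlog row with the column names read from information_schema, can be sketched on its own. This is only an illustration: RowMapper and toDocument are hypothetical names that do not exist in the project, and decoding byte[] values as UTF-8 is an assumption about the table's character set.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: pairs positional binlog row values with column names. */
final class RowMapper {

    private RowMapper() {
    }

    static Map<String, String> toDocument(List<String> columns, Serializable[] row) {
        if (columns.size() != row.length) {
            throw new IllegalArgumentException(
                "column count " + columns.size() + " does not match row length " + row.length);
        }
        // LinkedHashMap keeps the fields in column order
        Map<String, String> document = new LinkedHashMap<>();
        for (int i = 0; i < row.length; i++) {
            Serializable value = row[i];
            if (value == null) {
                // SQL NULL: keep the key with a null value instead of calling toString()
                document.put(columns.get(i), null);
            } else if (value instanceof byte[]) {
                // text columns arrive as raw bytes; UTF-8 is assumed here
                document.put(columns.get(i), new String((byte[]) value, StandardCharsets.UTF_8));
            } else {
                document.put(columns.get(i), value.toString());
            }
        }
        return document;
    }
}
```

Checking the lengths up front turns a stale column list (for example after an ALTER TABLE) into an explicit error rather than an index-out-of-bounds failure or silently shifted fields.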


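Handlers for the different sql operation types

  • Start with an interface

import com.github.shyiko.mysql.binlog.event.EventData;

import java.io.IOException;

/**
 * @author wenjie
 */
public interface EsSyncHandler {

    /**
     * Processing eventData
     *
     * @param eventData    binlog eventData
     */
    public void sync(EventData eventData) throws IOException;
}

  • An abstract implementation, which makes shared extensions easier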

import com.github.shyiko.mysql.binlog.event.EventData;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import run.ut.app.service.OptionsService;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author wenjie
 */
@Data
@Component
@Slf4j
public class AbstractEsSyncHandler implements EsSyncHandler {

    @Autowired
    private OptionsService optionsService;

    private AtomicInteger count = new AtomicInteger(0);
    private int mod = 100;

    // The binlog offset is persisted once every 100 insert/update/delete operations, so that after a restart syncing can resume from that offset.
    public static ConcurrentHashMap<String, Object> binLogPropertiesMap = new ConcurrentHashMap<>();

    /**
     * Every {@link AbstractEsSyncHandler#mod} times invoke, save to Mysql
     */
    protected void syncBinLogProperties() {
        int i = count.incrementAndGet();
        // Use the remainder, not the quotient: true on every mod-th call
        if (i % mod == 0) {
            optionsService.save(binLogPropertiesMap);
        }
    }

    @Override
    public void sync(EventData eventData) throws IOException {

    }
}
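
The "persist once every N operations" rule can be looked at in isolation. The following is a minimal sketch, not the project's code: CheckpointCounter is a hypothetical name, and the Runnable stands in for whatever call persists the offset (optionsService.save(...) in the handler above).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: runs a persist action on every interval-th recorded operation. */
final class CheckpointCounter {

    private final AtomicInteger count = new AtomicInteger(0);
    private final int interval;
    private final Runnable persistAction;

    CheckpointCounter(int interval, Runnable persistAction) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
        this.persistAction = persistAction;
    }

    /** Call once per synced row; returns true if this call triggered a persist. */
    boolean recordOperation() {
        int n = count.incrementAndGet();
        // n % interval is zero exactly on calls interval, 2 * interval, 3 * interval, ...
        if (n % interval == 0) {
            persistAction.run();
            return true;
        }
        return false;
    }
}
```

With an interval of 100, calls 100 and 200 persist and every other call does not. A quotient test (n / interval == 0) would instead be true for calls 1 to 99 and never again.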

  • Handler for mysql insert operations

import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import run.ut.app.elasticsearch.TableTemplate;

import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author wenjie
 */
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class InsertEsSyncHandler extends AbstractEsSyncHandler implements EsSyncHandler {

    private final TableTemplate tableTemplate;
    private final RestHighLevelClient restHighLevelClient;


    @Value("${spring.datasource.database}")
    private String database;

    @Override
    public void sync(EventData eventData) throws IOException {
        if (!this.database.equals(EsSyncHandlers.database)) {
            return;
        }
        if (eventData instanceof WriteRowsEventData) {
            List<Serializable[]> rows = ((WriteRowsEventData) eventData).getRows();
            List<String> columns = tableTemplate.getColumnMap().get(EsSyncHandlers.table);
            for (Serializable[] serializables : rows) {
                // sql's column -> value
                Map<String, String> map = new HashMap<>(1 << 4);
                for (int i = 0; i < serializables.length; i++) {
                    if (serializables[i] instanceof byte[]) {
                        // Text type corresponding to MySQL
                        map.put(columns.get(i), new String((byte[]) serializables[i]));
                    } else {
                        map.put(columns.get(i), serializables[i].toString());
                    }
                }
                // sync document
                IndexRequest indexRequest = new IndexRequest(EsSyncHandlers.table);

                indexRequest.id(map.get("id"));
                map.remove("id");
                String createTime = map.get("create_time");
                String updateTime = map.get("update_time");
                map.put("create_time", createTime.substring(0, createTime.length() - 2));
                map.put("update_time", updateTime.substring(0, updateTime.length() - 2));
                indexRequest.source(map, XContentType.JSON);
                restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
                syncBinLogProperties();
            }
        }
    }
}
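
The two substring(0, length - 2) calls above presumably strip the trailing ".0" that java.sql.Timestamp.toString() appends when a value has no fractional seconds. That reading is an assumption drawn from the code, as is the premise that these columns arrive as java.sql.Timestamp. Under those assumptions, here is a minimal sketch of a formatter that does not depend on how long the fractional part is (TimestampText is a hypothetical helper, not part of the project):

```java
import java.sql.Timestamp;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: renders a Timestamp truncated to whole seconds. */
final class TimestampText {

    private static final DateTimeFormatter SECONDS =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private TimestampText() {
    }

    static String toSeconds(Timestamp timestamp) {
        // Formats the date and time fields directly, dropping any fractional seconds
        return timestamp.toLocalDateTime().format(SECONDS);
    }
}
```

For a value such as 2020-05-12 10:00:00.123 the substring approach would leave 2020-05-12 10:00:00.1, whereas the formatter always stops at the seconds.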

  • Handler for mysql update operations

import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import run.ut.app.elasticsearch.TableTemplate;
import run.ut.app.utils.JsonUtils;

import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @author wenjie
 */
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class UpdateEsSyncHandler extends AbstractEsSyncHandler implements EsSyncHandler {

    private final TableTemplate tableTemplate;
    private final RestHighLevelClient restHighLevelClient;


    @Value("${spring.datasource.database}")
    private String database;

    @Override
    public void sync(EventData eventData) throws IOException {
        if (!this.database.equals(EsSyncHandlers.database)) {
            return;
        }
        if (eventData instanceof UpdateRowsEventData) {
            List<Serializable[]> rows = ((UpdateRowsEventData) eventData).getRows().stream()
                // key is before,value is after
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
            List<String> columns = tableTemplate.getColumnMap().get(EsSyncHandlers.table);
            for (Serializable[] serializables : rows) {
                // sql's column -> value
                Map<String, String> map = new HashMap<>(1 << 4);
                for (int i = 0; i < serializables.length; i++) {
                    // Text type corresponding to MySQL
                    if (serializables[i] instanceof byte[]) {
                        map.put(columns.get(i), new String((byte[]) serializables[i]));
                    } else {
                        map.put(columns.get(i), serializables[i].toString());
                    }
                }
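                if ("1".equals(map.get("deleted"))) {
                    // Logical delete: remove the document and skip the update below,
                    // which would otherwise target a document that no longer exists
                    DeleteRequest deleteRequest = new DeleteRequest(EsSyncHandlers.table, map.get("id"));
                    restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
                    syncBinLogProperties();
                    continue;
                }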
                UpdateRequest updateRequest = new UpdateRequest(EsSyncHandlers.table, map.get("id"));
                map.remove("id");
                String createTime = map.get("create_time");
                String updateTime = map.get("update_time");
                map.put("create_time", createTime.substring(0, createTime.length() - 2));
                map.put("update_time", updateTime.substring(0, updateTime.length() - 2));
                updateRequest.doc(JsonUtils.objectToJson(map), XContentType.JSON);

                restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
                syncBinLogProperties();
            }
        }
    }
}

  • Handler for mysql delete operations (a sketch that mirrors the insert handler, reading DeleteRowsEventData and issuing a DeleteRequest)

import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import run.ut.app.elasticsearch.TableTemplate;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

/**
 * Sketch: removes the es document for each physically deleted row.
 */
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class DeleteEsSyncHandler extends AbstractEsSyncHandler implements EsSyncHandler {

    private final TableTemplate tableTemplate;
    private final RestHighLevelClient restHighLevelClient;


    @Value("${spring.datasource.database}")
    private String database;

    @Override
    public void sync(EventData eventData) throws IOException {
        if (!this.database.equals(EsSyncHandlers.database)) {
            return;
        }
        if (eventData instanceof DeleteRowsEventData) {
            List<Serializable[]> rows = ((DeleteRowsEventData) eventData).getRows();
            List<String> columns = tableTemplate.getColumnMap().get(EsSyncHandlers.table);
            // position of the primary key within each row
            int idIndex = columns.indexOf("id");
            for (Serializable[] serializables : rows) {
                // delete document
                DeleteRequest deleteRequest = new DeleteRequest(EsSyncHandlers.table, serializables[idIndex].toString());
                restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
                syncBinLogProperties();
            }
        }
    }
}

  • Handler for the mysql binlog rotate event; this is the key to keeping the data consistent

import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import run.ut.app.model.properties.BinLogProperties;

/**
 * @author wenjie
 */
@Component
@Slf4j
public class RotateEventHandler extends AbstractEsSyncHandler implements EsSyncHandler {
    
    /**
     * The offset is first kept in an in-memory cache and persisted to mysql at a suitable moment.
     */
    @Override
    public void sync(EventData eventData) {
        if (eventData instanceof RotateEventData) {
            RotateEventData rotateEventData = (RotateEventData) eventData;
            String binlogFilename = rotateEventData.getBinlogFilename();
            long binlogPosition = rotateEventData.getBinlogPosition();

            AbstractEsSyncHandler.binLogPropertiesMap.put(BinLogProperties.BINLOG_FILENAME.getValue(), binlogFilename);
            AbstractEsSyncHandler.binLogPropertiesMap.put(BinLogProperties.BINLOG_POSITION.getValue(), binlogPosition);
        }
    }
}


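Calling the handlers in turn

This probably counts as a form of the chain-of-responsibility pattern, although it is really just a for loop.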




import com.github.shyiko.mysql.binlog.event.EventData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;

/**
 * @author wenjie
 */
@Slf4j
@Service
public class EsSyncHandlers {

    private Collection<EsSyncHandler> esSyncHandlers = new LinkedList<>();

    public static String table = "";
    public static String database = "";

    public EsSyncHandlers(ApplicationContext applicationContext) {
        this.esSyncHandlers = applicationContext.getBeansOfType(EsSyncHandler.class).values();
    }

    public void sync(EventData eventData) throws IOException {
        for (EsSyncHandler esSyncHandler : esSyncHandlers) {
            esSyncHandler.sync(eventData);
        }
    }

}

Client startup and configuration code

The registration-related code can be ignored if you don't need it:


import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery;
import com.alibaba.cloud.nacos.registry.NacosRegistration;
import com.alibaba.cloud.nacos.registry.NacosServiceRegistry;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import run.ut.app.elasticsearch.handler.EsSyncHandlers;
import run.ut.app.model.properties.BinLogProperties;
import run.ut.app.service.OptionsService;
import run.ut.app.utils.SpringUtils;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

/**
 *
 * @author wenjie
 */
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Service
public class BinLogListener implements BinaryLogClient.EventListener, ApplicationListener<ContextRefreshedEvent> {

    private final EsSyncHandlers esSyncHandlers;
    private final TableTemplate tableTemplate;
    private final DataSourceProperties dataSourceProperties;
    private final NacosServiceDiscovery nacosServiceDiscovery;
    private final NacosServiceRegistry nacosServiceRegistry;
    private final NacosDiscoveryProperties nacosDiscoveryProperties;
    private final ApplicationContext context;
    private BinaryLogClient client;

    @Value("${spring.datasource.host}")
    private String host;
    @Value("${spring.datasource.port}")
    private Integer port;

    @SneakyThrows
    @Override
    public void onEvent(Event event) {
        if (event != null && event.getData() != null) {
            log.debug(event.getData().toString());
        }
        EventType eventType = event.getHeader().getEventType();
        if (eventType == EventType.TABLE_MAP){
            TableMapEventData tableMapEventData = event.getData();
            EsSyncHandlers.database = tableMapEventData.getDatabase();
            EsSyncHandlers.table = tableMapEventData.getTable();
            return;
        }
        if (!tableTemplate.getColumnMap().containsKey(EsSyncHandlers.table)) {
            return;
        }
        EventData data = event.getData();
        esSyncHandlers.sync(data);
    }


    @SneakyThrows
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        // Register, so that a distributed deployment does not start duplicate sync threads
        String serviceName = this.getClass().getSimpleName();
        List<ServiceInstance> instances = nacosServiceDiscovery.getInstances(serviceName);
        if(CollectionUtils.isEmpty(instances)) {
            register(serviceName);
        } else {
            return;
        }
        Thread thread = new Thread(() -> {
            try {
                binlogClientBoot();
            } catch (Exception e) {
                closeBinlogClient();
                e.printStackTrace();
            }
        });
        thread.setDaemon(false);
        thread.start();
    }

    /**
     * Start the client that listens to the binlog (runs on the sync thread)
     */
    public void binlogClientBoot() throws IOException {
        BinLogListener binLogListener = SpringUtils.getBean(BinLogListener.class);
        OptionsService optionsService = SpringUtils.getBean(OptionsService.class);
        // Read the binlog offset saved before the application last shut down from the mysql options table
        Optional<Object> binlogFileName = optionsService.getByKey(BinLogProperties.BINLOG_FILENAME.getValue());
        Optional<Object> binlogPosition = optionsService.getByKey(BinLogProperties.BINLOG_POSITION.getValue());
        client = new BinaryLogClient(
            host,
            port,
            dataSourceProperties.getUsername(),
            dataSourceProperties.getPassword()
        );
        client.registerEventListener(binLogListener);
        // If an offset was saved, reading starts from that position
        binlogFileName.ifPresent(o -> client.setBinlogFilename(String.valueOf(o)));
        binlogPosition.ifPresent(o -> client.setBinlogPosition(Long.parseLong(String.valueOf(o))));

        client.connect();
    }

    public void closeBinlogClient() {
        deregister(this.getClass().getSimpleName());
        try {
            client.disconnect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void register(String serviceName) {
        nacosServiceRegistry.register(buildNacosRegistration(serviceName));
    }

    private void deregister(String serviceName) {
        nacosServiceRegistry.deregister(buildNacosRegistration(serviceName));
    }

    private NacosRegistration buildNacosRegistration(String serviceName) {
        NacosDiscoveryProperties newNacosDiscoveryProperties = new NacosDiscoveryProperties();
        BeanUtils.copyProperties(nacosDiscoveryProperties, newNacosDiscoveryProperties);
        newNacosDiscoveryProperties.setService(serviceName);
        return new NacosRegistration(newNacosDiscoveryProperties, context);
    }
}


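Trying it out

Let's try it directly with my project by publishing a post.

You can also try throwing an exception just before the es sync. After restarting the application or the sync thread, the data is recovered automatically from the binlog offset recorded before the failure.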

just do it ✔


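FAQ

  • Q: Why is the binlog offset persisted after a certain number of operations rather than by a scheduled task?

  • A: With a scheduled task the following can happen: a transaction has just committed, and in the window between the commit and the completion of the es sync, the scheduled task fires and persists the offset. If the es sync then fails to complete after that persist, the record will never be synced to es, and the data ends up inconsistent.

  • Additions welcome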
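
The ordering argument in the answer above can be illustrated with a small in-memory simulation. Nothing here touches mysql or es: ReplaySimulation, its event list and the map standing in for the index are all hypothetical. The offset is advanced only after a write has been applied, so a crash can cause events to be replayed but never skipped, and because writes are keyed by id the replay is harmless.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation: checkpoint after apply, then replay after a crash. */
final class ReplaySimulation {

    /** Stand-in for the search index: document id -> content. */
    final Map<Integer, String> index = new HashMap<>();

    /** Last persisted offset: every event before this position has been applied. */
    int checkpoint = 0;

    /**
     * Applies events starting at the checkpoint. If crashAt is non-negative, an
     * exception is thrown just before the event at that position is applied.
     * Each event is a pair {id, content}.
     */
    void run(List<String[]> events, int interval, int crashAt) {
        int appliedSinceStart = 0;
        for (int pos = checkpoint; pos < events.size(); pos++) {
            if (pos == crashAt) {
                throw new IllegalStateException("simulated crash at event " + pos);
            }
            String[] event = events.get(pos);
            // Upsert keyed by id: applying the same event twice gives the same result
            index.put(Integer.parseInt(event[0]), event[1]);
            appliedSinceStart++;
            if (appliedSinceStart % interval == 0) {
                // The offset moves forward only after the write above succeeded
                checkpoint = pos + 1;
            }
        }
    }
}
```

Running five events with an interval of 2 and a crash before the fourth leaves the checkpoint at 2. A second run then replays the third event (already applied once) and continues, and the index ends up holding the latest content for every id.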

Q.E.D.