前言
这段时间写毕设、自己玩具项目的时候,想用Elasticsearch替代一些SQL的方案了,比如帖子搜索等功能,如果坚持用MySQL做全文检索的话性能直接拉胯。
但在整合Elasticsearch时,遇到了不少麻烦,其中之一就是如何保证将MySQL中帖子信息的表数据增量同步到Elasticsearch,并且一定程度上保证数据的一致性。
一开始本来是想直接用canal这种中间件的,但是它太重量级了,抛开除同步Elasticsearch的功能不说,一个canal正常情况下都要启动两个项目,个人电脑费内存,还要提前创建ElasticSearch的索引文档,如果你的应用仅仅只是将数据同步Elasticsearch,那使用体验其实还是蛮糟糕的,后来我就直接放弃了使用中间件,改成自己在应用中开一个线程实现类似的功能。
后来再github上找到了这个项目:https://github.com/shyiko/mysql-binlog-connector-java ,在这个项目的基础上顺利地完成了一部分实现,原理和canal类似,也是伪装slave节点获取binlog同步。
目前实现结果:
- 内嵌于项目某个模块之中,应用启动时能自动创建索引文档
- 同步时异常宕机,可根据上一次大概的binlog偏移量重新进行同步,避免数据不一致的情况(指服务应用宕机,非es宕机)
- 同步线程启动时,会注册到nacos上,在项目启动多个的情况下也只会有一个同步线程,一定程度上避免重复操作
- 支持同步mysql的text类型数据(算是一个小坑)
Q:为什么内嵌于某个模块中?而不是单独开一个模块?
A:垃圾电脑内存不够了,平时开发环境跑起来就是4个java应用+mysql+redis+es+nacos+sentinel+nginx+微信小程序开发工具+IDEA+谷歌浏览器+postman,16G内存都被干掉了。。还有jenkins这种之后再集成的都丢师兄送的云上了。
本文核心代码可见https://gitee.com/wenjie2018/UT-APP 的:
- 一些工具类或者实体类就一笔带过了,卡在这一块的就自己找代码看看。
环境准备
- 首先是mysql的配置,和canal要求的类似,我们需要开启binlog和更改binlog的格式:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要重复
-
同时连接mysql的帐号必须要有REPLICATION SLAVE权限,自己做实验嫌麻烦直接用root用户即可。
-
基础环境是SpringBoot,同步代码、es相关依赖,mysql等其它依赖就不贴了,可随喜好更改:
<!-- binlog listener -->
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>${mysql-binlog-connector-java.version}</version>
</dependency>
<!--elasticsearch client-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
- es的版本是7.9.2。
核心代码
实体类
为了能自动创建文档,就直接借用spring data的自动创建功能了,写一个实体类再加上@Document(indexName = "对应数据库表名称")和@Component即可,这个两注解随后还会用于扫描对象。
package run.ut.app.model.elasticsearch;
import lombok.*;
import lombok.experimental.Accessors;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.stereotype.Component;
import run.ut.app.model.domain.BaseEntity;
import run.ut.app.model.dto.base.InputConverter;
import run.ut.app.model.param.PostParam;
/**
* <p>
* Posts
* </p>
*
* @author wenjie
* @since 2020-05-12
*/
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Document(indexName = "posts")
@Component···
public class ESPosts extends BaseEntity implements InputConverter<PostParam> {
@Id
private Long id;
private Long uid;
private Long forumId;
@Field(analyzer = "ik_max_word",type = FieldType.Text)
private String title;
@Field(analyzer = "ik_max_word",type = FieldType.Text)
private String content;
private Long likes;
}
扫描并记录es文档名称
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Scan @Document annotation and save
*
* @author wenjie
*/
@Component
@Slf4j
@Getter
public class DocumentScan implements BeanDefinitionRegistryPostProcessor, PriorityOrdered {
private List<String> indexNames = new ArrayList<>();
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
log.debug("【run.ut.app.scan.DocumentScan.postProcessBeanDefinitionRegistry】");
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
Map<String, Object> beansWithAnnotation = configurableListableBeanFactory.getBeansWithAnnotation(Document.class);
log.info("scan @Document and register");
for (Map.Entry<String, Object> stringObjectEntry : beansWithAnnotation.entrySet()) {
indexNames.add(stringObjectEntry.getValue().getClass().getAnnotation(Document.class).indexName());
}
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}
获取表和列的映射
前面说es文档名要和数据库表名一致是有原因的,因为监听binlog获取的数据是没有列名的,只有按列顺序排列的数据值list,所以当我们要将其保存为es文档时,需要自己设置对应列名。
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import run.ut.app.mapper.ExMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* TableTemplate
*
* @author wenjie
*/
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TableTemplate implements ApplicationListener<ContextRefreshedEvent> {
private final DocumentScan documentScan;
private final ExMapper exMapper;
@Value("${spring.datasource.database}")
private String database;
/**
* table -> columns
*/
@Getter
private Map<String, List<String>> columnMap = new HashMap<>();
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
// Because of some special circumstances, the release of "" is required
columnMap.put("", new ArrayList<>());
initColumnMap();
}
/**
* init columnMap
*/
private void initColumnMap() {
List<String> indexNames = documentScan.getIndexNames();
for (String indexName : indexNames) {
List<String> columns = exMapper.selectColumnNameByDatabaseAndTable(database, indexName);
columnMap.put(indexName, columns);
}
}
}
查询某表列名的sql语句:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
public interface ExMapper extends BaseMapper {
@Select("select COLUMN_NAME from information_schema.COLUMNS where table_name = #{table} and table_schema = #{database}")
List<String> selectColumnNameByDatabaseAndTable(@Param("database") String database, @Param("table") String table);
}
不同sql操作类型的Handler
- 先写个接口
/**
* @author wenjie
*/
public interface EsSyncHandler {
/**
* Processing eventData
*
* @param eventData binlog eventData
*/
public void sync(EventData eventData) throws IOException;
}
- 抽象实现,便于做一些公共扩展
import com.github.shyiko.mysql.binlog.event.EventData;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import run.ut.app.service.OptionsService;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author wenjie
*/
@Data
@Component
@Slf4j
public class AbstractEsSyncHandler implements EsSyncHandler {
@Autowired
private OptionsService optionsService;
private AtomicInteger count = new AtomicInteger(0);
private int mod = 100;
// 每发生100次insert、update、delete操作就持久化一次binlog的偏移量,用于下次重启时找到该偏移量重新同步。
public static ConcurrentHashMap<String, Object> binLogPropertiesMap = new ConcurrentHashMap<>();
/**
* Every {@link AbstractEsSyncHandler#mod} times invoke, save to Mysql
*/
protected void syncBinLogProperties() {
int i = count.incrementAndGet();
if (i / mod == 0) {
optionsService.save(binLogPropertiesMap);
}
}
@Override
public void sync(EventData eventData) throws IOException {
}
}
- mysql insert操作同步handler
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import run.ut.app.elasticsearch.TableTemplate;
import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author wenjie
*/
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class InsertEsSyncHandler extends AbstractEsSyncHandler implements EsSyncHandler {
private final TableTemplate tableTemplate;
private final RestHighLevelClient restHighLevelClient;
@Value("${spring.datasource.database}")
private String database;
@Override
public void sync(EventData eventData) throws IOException {
if (!this.database.equals(EsSyncHandlers.database)) {
return;
}
if (eventData instanceof WriteRowsEventData) {
List<Serializable[]> rows = ((WriteRowsEventData) eventData).getRows();
List<String> columns = tableTemplate.getColumnMap().get(EsSyncHandlers.table);
for (Serializable[] serializables : rows) {
// sql's column -> value
Map<String, String> map = new HashMap<>(1 << 4);
for (int i = 0; i < serializables.length; i++) {
if (serializables[i] instanceof byte[]) {
// Text type corresponding to MySQL
map.put(columns.get(i), new String((byte[]) serializables[i]));
} else {
map.put(columns.get(i), serializables[i].toString());
}
}
// sync document
IndexRequest indexRequest = new IndexRequest(EsSyncHandlers.table);
indexRequest.id(map.get("id"));
map.remove("id");
String createTime = map.get("create_time");
String updateTime = map.get("update_time");
map.put("create_time", createTime.substring(0, createTime.length() - 2));
map.put("update_time", updateTime.substring(0, updateTime.length() - 2));
indexRequest.source(map, XContentType.JSON);
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
syncBinLogProperties();
}
}
}
}
- mysql update操作同步handler
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import run.ut.app.elasticsearch.TableTemplate;
import run.ut.app.utils.JsonUtils;
import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author wenjie
*/
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class UpdateEsSyncHandler extends AbstractEsSyncHandler implements EsSyncHandler {
private final TableTemplate tableTemplate;
private final RestHighLevelClient restHighLevelClient;
@Value("${spring.datasource.database}")
private String database;
@Override
public void sync(EventData eventData) throws IOException {
if (!this.database.equals(EsSyncHandlers.database)) {
return;
}
if (eventData instanceof UpdateRowsEventData) {
List<Serializable[]> rows = ((UpdateRowsEventData) eventData).getRows().stream()
// key is before,value is after
.map(Map.Entry::getValue)
.collect(Collectors.toList());
List<String> columns = tableTemplate.getColumnMap().get(EsSyncHandlers.table);
for (Serializable[] serializables : rows) {
// sql's column -> value
Map<String, String> map = new HashMap<>(1 << 4);
for (int i = 0; i < serializables.length; i++) {
// Text type corresponding to MySQL
if (serializables[i] instanceof byte[]) {
map.put(columns.get(i), new String((byte[]) serializables[i]));
} else {
map.put(columns.get(i), serializables[i].toString());
}
}
if ("1".equals(map.get("deleted"))) {
DeleteRequest deleteRequest = new DeleteRequest(EsSyncHandlers.table, map.get("id"));
restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
syncBinLogProperties();
}
UpdateRequest updateRequest = new UpdateRequest(EsSyncHandlers.table, map.get("id"));
map.remove("id");
String createTime = map.get("create_time");
String updateTime = map.get("update_time");
map.put("create_time", createTime.substring(0, createTime.length() - 2));
map.put("update_time", updateTime.substring(0, updateTime.length() - 2));
updateRequest.doc(JsonUtils.objectToJson(map), XContentType.JSON);
restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
syncBinLogProperties();
}
}
}
}
- mysql delete操作同步handler
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import run.ut.app.elasticsearch.TableTemplate;
import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author wenjie
*/
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class InsertEsSyncHandler extends AbstractEsSyncHandler implements EsSyncHandler {
private final TableTemplate tableTemplate;
private final RestHighLevelClient restHighLevelClient;
@Value("${spring.datasource.database}")
private String database;
@Override
public void sync(EventData eventData) throws IOException {
if (!this.database.equals(EsSyncHandlers.database)) {
return;
}
if (eventData instanceof WriteRowsEventData) {
List<Serializable[]> rows = ((WriteRowsEventData) eventData).getRows();
List<String> columns = tableTemplate.getColumnMap().get(EsSyncHandlers.table);
for (Serializable[] serializables : rows) {
// sql's column -> value
Map<String, String> map = new HashMap<>(1 << 4);
for (int i = 0; i < serializables.length; i++) {
if (serializables[i] instanceof byte[]) {
// Text type corresponding to MySQL
map.put(columns.get(i), new String((byte[]) serializables[i]));
} else {
map.put(columns.get(i), serializables[i].toString());
}
}
// sync document
IndexRequest indexRequest = new IndexRequest(EsSyncHandlers.table);
indexRequest.id(map.get("id"));
map.remove("id");
String createTime = map.get("create_time");
String updateTime = map.get("update_time");
map.put("create_time", createTime.substring(0, createTime.length() - 2));
map.put("update_time", updateTime.substring(0, updateTime.length() - 2));
indexRequest.source(map, XContentType.JSON);
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
syncBinLogProperties();
}
}
}
}
- mysql刷binlog事件handler,这个是保证一致性的关键
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import run.ut.app.model.properties.BinLogProperties;
/**
* @author wenjie
*/
@Component
@Slf4j
public class RotateEventHandler extends AbstractEsSyncHandler implements EsSyncHandler {
/**
* 这里先将偏移量保存到缓存,等适合时机再持久化到mysql。
*/
@Override
public void sync(EventData eventData) {
if (eventData instanceof RotateEventData) {
RotateEventData rotateEventData = (RotateEventData) eventData;
String binlogFilename = rotateEventData.getBinlogFilename();
long binlogPosition = rotateEventData.getBinlogPosition();
AbstractEsSyncHandler.binLogPropertiesMap.put(BinLogProperties.BINLOG_FILENAME.getValue(), binlogFilename);
AbstractEsSyncHandler.binLogPropertiesMap.put(BinLogProperties.BINLOG_POSITION.getValue(), binlogPosition);
}
}
}
轮询调用handler
这大概也算是职责链模式的一种体现吧其实就是for循环
import com.github.shyiko.mysql.binlog.event.EventData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
/**
* @author wenjie
*/
@Slf4j
@Service
public class EsSyncHandlers {
private Collection<EsSyncHandler> esSyncHandlers = new LinkedList<>();
public static String table = "";
public static String database = "";
public EsSyncHandlers(ApplicationContext applicationContext) {
this.esSyncHandlers = applicationContext.getBeansOfType(EsSyncHandler.class).values();
}
public void sync(EventData eventData) throws IOException {
for (EsSyncHandler esSyncHandler : esSyncHandlers) {
esSyncHandler.sync(eventData);
}
};
}
client启动设置代码
注册操作的代码可选择性无视:
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery;
import com.alibaba.cloud.nacos.registry.NacosRegistration;
import com.alibaba.cloud.nacos.registry.NacosServiceRegistry;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import run.ut.app.elasticsearch.handler.EsSyncHandlers;
import run.ut.app.model.properties.BinLogProperties;
import run.ut.app.service.OptionsService;
import run.ut.app.utils.SpringUtils;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
/**
*
* @author wenjie
*/
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Service
public class BinLogListener implements BinaryLogClient.EventListener, ApplicationListener<ContextRefreshedEvent> {
private final EsSyncHandlers esSyncHandlers;
private final TableTemplate tableTemplate;
private final DataSourceProperties dataSourceProperties;
private final NacosServiceDiscovery nacosServiceDiscovery;
private final NacosServiceRegistry nacosServiceRegistry;
private final NacosDiscoveryProperties nacosDiscoveryProperties;
private final ApplicationContext context;
private BinaryLogClient client;
@Value("${spring.datasource.host}")
private String host;
@Value("${spring.datasource.port}")
private Integer port;
@SneakyThrows
@Override
public void onEvent(Event event) {
if (event != null && event.getData() != null) {
log.debug(event.getData().toString());
}
EventType eventType = event.getHeader().getEventType();
if (eventType == EventType.TABLE_MAP){
TableMapEventData tableMapEventData = event.getData();
EsSyncHandlers.database = tableMapEventData.getDatabase();
EsSyncHandlers.table = tableMapEventData.getTable();
return;
}
if (!tableTemplate.getColumnMap().containsKey(EsSyncHandlers.table)) {
return;
}
EventData data = event.getData();
esSyncHandlers.sync(data);
}
@SneakyThrows
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
// 注册,防止分布式环境下重复的同步线程
String serviceName = this.getClass().getSimpleName();
List<ServiceInstance> instances = nacosServiceDiscovery.getInstances(serviceName);
if(CollectionUtils.isEmpty(instances)) {
register(serviceName);
} else {
return;
}
Thread thread = new Thread(() -> {
try {
binlogClientBoot();
} catch (Exception e) {
closeBinlogClient();
e.printStackTrace();
}
});
thread.setDaemon(false);
thread.start();
}
/**
* 启动监听binlog的线程
*/
public void binlogClientBoot() throws IOException {
BinLogListener binLogListener = SpringUtils.getBean(BinLogListener.class);
OptionsService optionsService = SpringUtils.getBean(OptionsService.class);
// 从mysql的配置表中取出上次应用关闭时的binlog偏移量
Optional<Object> binlogFileName = optionsService.getByKey(BinLogProperties.BINLOG_FILENAME.getValue());
Optional<Object> binlogPosition = optionsService.getByKey(BinLogProperties.BINLOG_POSITION.getValue());
client = new BinaryLogClient(
host,
port,
dataSourceProperties.getUsername(),
dataSourceProperties.getPassword()
);
client.registerEventListener(binLogListener);
// 设置了偏移量就可以从偏移量的位置开始读取
binlogFileName.ifPresent(o -> client.setBinlogFilename(String.valueOf(o)));
binlogPosition.ifPresent(o -> client.setBinlogPosition(Long.parseLong(String.valueOf(o))));
client.connect();
}
public void closeBinlogClient() {
deregister(this.getClass().getSimpleName());
try {
client.disconnect();
} catch (IOException e) {
e.printStackTrace();
}
}
private void register(String serviceName) {
nacosServiceRegistry.register(buildNacosRegistration(serviceName));
}
private void deregister(String serviceName) {
nacosServiceRegistry.deregister(buildNacosRegistration(serviceName));
}
private NacosRegistration buildNacosRegistration(String serviceName) {
NacosDiscoveryProperties newNacosDiscoveryProperties = new NacosDiscoveryProperties();
BeanUtils.copyProperties(nacosDiscoveryProperties, newNacosDiscoveryProperties);
newNacosDiscoveryProperties.setService(serviceName);
return new NacosRegistration(newNacosDiscoveryProperties, context);
}
}
试试效果
就直接拿我的项目试试,发个帖子:
你也可以试下在同步es之前抛出异常看看,之后再重启应用or同步线程就会自动按照上次记录前的binlog偏移量恢复数据了。
just do it ✔
常见疑问
-
Q:为什么binlog的偏移量是一定次数后才持久化而不是定时任务持久化呢?
-
A:使用定时任务可能会会出现这样一种情况:一个事务刚好提交了,并且在提交之后~es同步完成期间,触发了定时任务将偏移量持久化,假如在这次持久化完成之后,es没有同步完成,那么这条记录就永远不会被同步到es了,最终造成数据不一致。
-
欢迎补充