前言有些小伙伴在工作中可能遇到过数据库查询慢的种方问题,特别是种方模糊查询和复杂聚合查询,这时候引入ES(Elasticsearch)作为搜索引擎是种方个不错的选择。 今天我们来聊聊MySQL同步到ES(Elasticsearch)的种方5种常见方案。 希望对你会有所帮助。种方 一、种方为什么需要MySQL同步到ES?种方在我们深入讨论方案之前,先明确一下为什么需要将MySQL数据同步到ES: 全文搜索能力:ES提供强大的种方全文搜索功能,远超MySQL的种方LIKE查询。复杂聚合分析:ES支持复杂的种方聚合查询,适合大数据分析。种方高性能查询:ES的种方倒排索引设计使查询速度极快。水平扩展:ES天生支持分布式,种方易于水平扩展。种方        先来看一下整体的种方同步架构图:  图片
 接下来,我们详细分析每种方案的实现原理和优缺点。 二、方案一:双写方案双写方案是最直接的同步方式,即在业务代码中同时向MySQL和ES写入数据。 示例代码:复制@Service                        public class UserService {                        @Autowired                        private UserMapper userMapper;                        @Autowired                        private ElasticsearchTemplate elasticsearchTemplate;                        @Transactional                        public void addUser(User user) {                        // 写入MySQL                        userMapper.insert(user);                        // 写入Elasticsearch                        IndexQuery indexQuery = new IndexQueryBuilder()                        .withObject(user)                        .withId(user.getId().toString())                        .build();                        elasticsearchTemplate.index(indexQuery);                        }                        @Transactional                        public void updateUser(User user) {                        // 更新MySQL                        userMapper.updateById(user);                        // 更新Elasticsearch                        IndexRequest request = new IndexRequest("user_index")                        .id(user.getId().toString())                        .source(JSON.toJSONString(user), XContentType.JSON);                        elasticsearchTemplate.getClient().index(request, RequestOptions.DEFAULT);                        }                        }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.                                                                优缺点分析优点: 实现简单,不需要引入额外组件实时性高,网站模板数据立即同步        缺点: 数据一致性难保证,需要处理分布式事务问题代码侵入性强,业务逻辑复杂性能受影响,每次写操作都要等待ES响应        适用场景适合数据量不大,对实时性要求高,且能够接受一定数据不一致的业务场景。 三、方案二:定时任务方案定时任务方案通过定期扫描MySQL数据变化来同步到ES。 示例代码:复制@Component                        public class UserSyncTask {                        @Autowired                        private UserMapper userMapper;                        @Autowired                        private UserESRepository userESRepository;                        // 每5分钟执行一次                        @Scheduled(fixedRate = 5 * 60 * 1000)                        public void syncUserToES() {                        // 查询最近更新的数据                        Date lastSyncTime = getLastSyncTime();                        List<User> updatedUsers = userMapper.selectUpdatedAfter(lastSyncTime);                        // 同步到ES                        for (User user : updatedUsers) {                        userESRepository.save(user);                        }                        // 更新最后同步时间                        updateLastSyncTime(new Date());                        }                        // 获取最后同步时间                        private Date getLastSyncTime() {                        // 从数据库或Redis中获取                        // ...                        }                        }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.                                                                数据更新追踪策略为了提高同步效率,通常需要设计良好的数据变更追踪机制:  图片
 优缺点分析优点: 实现简单,不需要修改现有业务代码对数据库压力可控,可以调整同步频率        缺点: 实时性差,数据同步有延迟可能遗漏数据,如果系统崩溃会丢失部分数据扫描全表可能对数据库造成压力        适用场景适合对实时性要求不高,数据变更不频繁的场景。 四、方案三:Binlog同步方案Binlog是MySQL的二进制日志,记录了所有数据变更操作。 通过解析Binlog可以实现数据同步。 示例代码:复制public class BinlogSyncService {                        public void startSync() {                        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "username", "password");                        client.registerEventListener(new BinaryLogClient.EventListener() {                        @Override                        public void onEvent(Event event) {                        EventData eventData = event.getData();                        if (eventData instanceof WriteRowsEventData) {                        // 插入操作                        WriteRowsEventData writeData = (WriteRowsEventData) eventData;                        processInsertEvent(writeData);                        } elseif (eventData instanceof UpdateRowsEventData) {                        // 更新操作                        UpdateRowsEventData updateData = (UpdateRowsEventData) eventData;                        processUpdateEvent(updateData);                        } elseif (eventData instanceof DeleteRowsEventData) {                        // 删除操作                        DeleteRowsEventData deleteData = (DeleteRowsEventData) eventData;                        processDeleteEvent(deleteData);                        }                        }                        });                        client.connect();                        }                        private void processInsertEvent(WriteRowsEventData eventData) {                        // 处理插入事件,同步到ES                        for (Serializable[] row : eventData.getRows()) {                        User user = convertRowToUser(row);                        syncToElasticsearch(user, "insert");                        }                        }                        private void syncToElasticsearch(User user, String operation) {                        // 同步到ES的实现                        // ...                        }                        }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.                                                                优缺点分析优点: 实时性高,几乎实时同步对业务代码无侵入,不需要修改现有代码性能好,不影响数据库性能        缺点: 实现复杂,需要解析Binlog格式需要考虑Binlog格式变更的兼容性问题主从切换时可能需要重新同步        适用场景适合对实时性要求高,IT技术网数据量大的场景。 五、方案四:Canal方案Canal是阿里巴巴开源的MySQL Binlog增量订阅&消费组件,简化了Binlog同步的复杂性。 示例代码:复制# canal.properties 配置                        canal.instance.master.address=127.0.0.1:3306                        canal.instance.dbUsername=username                        canal.instance.dbPassword=password                        canal.instance.connectionCharset=UTF-8                        canal.instance.filter.regex=.*\\..*1.2.3.4.5.6.                                                                        复制public class CanalClientExample {                        public static void main(String[] args) {                        // 创建Canal连接                        CanalConnector connector = CanalConnectors.newSingleConnector(                        new InetSocketAddress("127.0.0.1", 11111), "example", "", "");                        try {                        connector.connect();                        connector.subscribe(".*\\..*");                        while (true) {                        Message message = connector.getWithoutAck(100);                        long batchId = message.getId();                        if (batchId != -1 && !message.getEntries().isEmpty()) {                        processEntries(message.getEntries());                        connector.ack(batchId); // 提交确认                        }                        Thread.sleep(1000);                        }                        } finally {                        connector.disconnect();                        }                        }                        private static void processEntries(List<CanalEntry.Entry> entries) {                        for (CanalEntry.Entry entry : entries) {                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {                        if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {                        processInsert(rowData);                        } elseif (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {                        processUpdate(rowData);                        } elseif (rowChange.getEventType() == CanalEntry.EventType.DELETE) {                        processDelete(rowData);                        }                        }                        }                        }                        }                        }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.                                                                架构设计Calan方案的架构如下:  图片
 优缺点分析优点: 实时性高,延迟低对业务系统无侵入阿里巴巴开源项目,社区活跃        缺点: 需要部署维护Canal服务器需要处理网络分区和故障恢复可能产生数据重复同步问题        适用场景适合大数据量、高实时性要求的场景,且有专门团队维护中间件。 六、方案五:MQ异步方案MQ异步方案通过消息队列解耦MySQL和ES的同步过程,提高系统的可靠性和扩展性。 示例代码:复制@Service                        public class UserService {                        @Autowired                        private UserMapper userMapper;                        @Autowired                        private RabbitTemplate rabbitTemplate;                        @Transactional                        public void addUser(User user) {                        // 写入MySQL                        userMapper.insert(user);                        // 发送消息到MQ                        rabbitTemplate.convertAndSend("user.exchange", "user.add", user);                        }                        @Transactional                        public void updateUser(User user) {                        // 更新MySQL                        userMapper.updateById(user);                        // 发送消息到MQ                        rabbitTemplate.convertAndSend("user.exchange", "user.update", user);                        }                        }                        @Component                        public class UserMQConsumer {                        @Autowired                        private UserESRepository userESRepository;                        @RabbitListener(queues = "user.queue")                        public void processUserAdd(User user) {                        userESRepository.save(user);                        }                        @RabbitListener(queues = "user.queue")                        public void processUserUpdate(User user) {                        userESRepository.save(user);                        }                        @RabbitListener(queues = "user.queue")                        public void processUserDelete(Long userId) {                        userESRepository.deleteById(userId);                        }                        }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.                                                                消息队列选型对比不同的消息队列产品有不同特点,下面是常见MQ的对比:  图片
 优缺点分析优点: 完全解耦,MySQL和ES同步过程相互独立高可用,MQ本身提供消息持久化和重试机制可扩展,可以方便地增加消费者处理消息        缺点: 系统复杂度增加,需要维护MQ集群可能产生消息顺序问题,需要处理消息顺序性数据一致性延迟,依赖于消息消费速度        适用场景适合大型分布式系统,对可靠性和扩展性要求高的场景。源码库 七、5种方案对比为了更直观地比较这5种方案,我们来看一个综合对比表格: 方案名称 实时性 数据一致性 系统复杂度 性能影响 适用场景 双写方案 高 难保证 低 高 小规模应用 定时任务 低 最终一致 低 中 非实时场景 Binlog方案 高 最终一致 高 低 大数据量高实时 Canal方案 高 最终一致 中 低 大数据量高实时 MQ异步方案 中 最终一致 高 低 分布式大型系统 选择建议有些小伙伴在工作中可能会纠结选择哪种方案,这里给出一些建议: 初创项目或小规模系统:可以选择双写方案或定时任务方案,实现简单。中大型系统:建议使用Canal方案或MQ异步方案,保证系统的可靠性和扩展性。大数据量高实时要求:Binlog方案或Canal方案是最佳选择。已有MQ基础设施:优先考虑MQ异步方案,充分利用现有资源。        注意事项无论选择哪种方案,都需要注意以下几点: 幂等性处理:同步过程需要保证幂等性,防止重复数据。监控告警:建立完善的监控体系,及时发现同步延迟或失败。数据校验:定期校验MySQL和ES中的数据一致性。容错机制:设计良好的故障恢复机制,避免数据丢失。        总结MySQL同步到ES(Elasticsearch)是现代应用开发中常见的需求,选择合适的同步方案对系统性能和可靠性至关重要。 本文介绍了5种常见方案,各有优缺点,适用于不同场景。 在实际项目中,可能需要根据具体需求组合使用多种方案,或者对某种方案进行定制化改造。 重要的是要理解每种方案的原理和特点,才能做出合理的技术选型。  |