前言

前段时间在做课程设计的时候,有一个需求是【用户支付后系统自动分配配送员,且保证订单能平均配分】,而我使用了RocketMQ + Redis实现了这个功能,所以本篇就来简单聊一聊我为什么这么做?这么做的好/坏处在哪?

PS:"保证订单能平均分配"中的保证,我理解成了强保证,即整个过程要保证原子性

关于软保证,我会在文末提一下,但事先说明下软保证比强保证要简单。


2020年5月2号补充:Redis 新版6.x开始正式支持多线程了。

分析需求

其实这需求咋一看,实现起来非常简单,比如可能有人想到如下方案:

  • 可以在数据库表中存储配送员的 id 和 权重,每次支付完后,取权重最高的配送员id + 减少权重,完成分配。
  • 每次支付完后,查询所有配送员当前正在配送的任务数量,取最低任务数量的配送员id,完成分配。(妈见打系列)
  • 应用启动时加载配送员数据到ConcurrentSkipListSet/TreeSet(注意线程安全),每次支付后,从中取权重最高的配送员id + 减少权重,完成分配。
  • 使用Redis的zset存储配送员的权重和id,每次支付后,从中取出权重最高的配送员id + 减少权重,完成分配。

对于第一种方式,可以实现,但性能不高,因为要扫全表。
对于第二种方式,可以实现,但是性能妈见打。
对于第三种方式,可以实现,性能不错,但编码工作量高(如需持久化得自己编写),出现问题也不容易排查。
对于第四种方式,可以实现,性能不错,编码量少,出现问题也相对容易排查。

在最后,显而易见,我选择了第四种方式。

那选择第四种方式之后,是不是就可以开始撸代码了呢?
答:可以,但是还能继续优化。

进一步优化

不足之处:

如果说我们的目的只是为了取到当前任务量最少的配送员id,那么上面的想法大可及格。

但很遗憾,在取出配送员id前后,我们的系统还有一系列操作,比如用户支付后,我们首先要将订单的状态更改为【已支付】,然后再取出任务量最少的配送员,之后再将配送员id更新到表中。

假如现在表的数据量比较大,而且请求也比较多,而这三个步骤都放在用户一次请求中同步执行,那系统的并发量就会受到很大的限制。



想法改进:

既然现在知道了不足的地方,那么应该怎么优化呢?

首先,我们要回到现实中重新理一理思路,给的需求是支付后分配配送员,但并没有强调支付后马上分配配送员

然后,根据这一点,我脑海里出现了一个词:最终一致性

试想一下自己平时用的快递APP,下单后虽然是马上分配配送员了,但是配送员一般会在两小时后(或更久)才联系你。简而言之,我们没有必要在支付后马上给用户分配配送员,而是在一定时间内(用户可接受)给用户分配到配送员就可以了。 也就是说我们可以把分配配送员的一系列操作滞后且异步化,但必须保证最终一定要分配到配送员。

小贴士:"马上"一词在这里指的是同步操作。至于是不是真的"马上",我也不太确定,也许只是因为人家用了另一种异步的解决方案,而且这个方案性能也非常棒。



实现选型:

我们已经推导出在这个业务场景下,可以通过最终一致性的理论解决大量同步操作所带来的困扰。那么我们应该通过什么手段去实践这个理论呢?

我的想法是通过消息队列去解决这个问题,已知目前比较流行的消息队列有3种:Kafka,RabbitMQ,RocketMQ。于是我稍微上网查了下资料,得出了比较简单的比较结果:

  • Kafka:性能&吞吐量高,但没有消息重试机制,需要自己实现消息重试才能用于实现分布式事务(最终一致性)。
  • RabbitMQ:可以通过消息重试+手动ack实现,但要保证消息的高可靠性需要付出昂贵的代价。
  • RocketMQ:有官方实现的事务消息机制,同Kafka能在牺牲比较小的情况下保证消息高可靠性,但社区、文档实在不敢恭维(不信可以对比下官网)。

最终,由于我课设里面还计划用到延时消息,所以最终选了RocketMQ。RabbitMQ实现延时消息需要各种死信处理机制,实在不香。而Kafka要自己手动实现消息重试,编码量可能比较大,排错可能也比较难,时间有限,同样不香。

下面再附上一张RocketMQ事务消息的流程图(出自阿里官网)

rocketmq分布式事务.png

这里忍不住再吐糟一下RocketMQ的文档,信息量真的非常少,比较关键的地方甚至没有例图,整个文档几乎全是英文简介+一些代码,就连它事务消息的流程图我都不是从Apache官网找的,而是从阿里的文档里面找到的(开源版本贡献给Apache了)。


编写Lua脚本

Redis比较关键的流程可分为以下三点:

  1. zset 获取权重最高的配送员
  2. 空隙
  3. zset 减少上面获取到的配送员的权重

由于这里的设计是强保证,所以必须要保证当第一个请求进入【1】获取了一次权重最高的配送员后,如果此时有第二个请求试图进入【1】(此时第一个请求处于【2】),那么第二个请求必须等待第一个请求执行完【3】才能进入【1】,以免重复获取。为了保证这个过程的原子性,我采用了lua脚本。


Lua脚本如下:

--- 学校所属id
local school_id = KEYS[1]
--- 订单id
local order_id = KEYS[2]
--- 有效时间
local expire = ARGV[1]

--- 获取分数最高配送员
local res_courier_id = redis.call("ZREVRANGE", school_id, 0, 0)
--- 不为空再执行
if res_courier_id  then
    --- 扣除该配送员10分
    if redis.call("ZINCRBY", school_id , "-10", res_courier_id[1]) then
        --- 记录日志,记录成功再设置过期时间
        if redis.call("set", order_id, res_courier_id[1]) then
            --- 需要类型转换才能比较
            if tonumber(expire) > 0 then
                redis.call("expire", order_id, expire)
            end
        end
    end
end
return res_courier_id

小贴士:Lua脚本是必须经过反复测试,确认没有死锁等致命问题的之后才使用的,但由于测试过程比较繁琐,我在这里就不一一赘述的,你只需要直到上面的脚本是经过一定测试的就可以了。


编写Java代码

先理一理DAO的代码思路:

// 假设配送员初始分数为10000
数据库业务操作函数(参数){
    记录 = mysql在配送任务表中查询orderId的纪律
    if(记录不存在){
        String cid = redis/es 根据订单id查日志(日志的key/vlues形式为 订单id=配送员id)
        if(cid不为空){
            分配配送员id为cid的配送员配送此订单
            // 删除日志(废除)
            // 假设在此处删除日志: todo: 假如系统在这里宕机了,配送员9990分,那mysql事务回滚,mq回滚,因为日志删除,会进入下面的逻辑,配送员会被多扣分的
            // 假设不在此处删除日志: todo 系统宕机,配送员9990分,mysql和mq都回滚,但因为日志依然存在,所以还是进入到此处逻辑,不影响分数一致性
        }else{
            // 使用lua实现确保一致性
            redis取分数最高配送员,并减掉该配送员分数10,记录日志
            mysql配送员配送记录
            // 如果在在之后redis宕机了,代码还是会正常走下去,数据恢复只能靠aof了,默认是每秒保存,不管有没有持久化,都不影响正常业务主线
            // 如果在此处是系统宕机了(redis没死),那么mysql事务也会回滚,mq也会重新发送,但因为有日志的存在,会进入上面的逻辑
            // 如果系统和redis都死了,而redis持久化了,那么下次mq重投就会进入上面,如果没持久化,就重新进入此处,无论哪种情况,
            //     都保证了分数的一致性(一般不会发生redis宕机,因为redis通常是第三方如阿里提供的远程服务)
        }
    }
}


再理一理MQ消费函数中文思路:

消息队列处理函数(参数){
    数据库业务操作函数(orderId);
    // 如果在此处宕机,mysql已提交,mq重投,但由于mysql记录已存在,相当于不执行,直接跳出来
    删除日志;
    // 如果在此处系统宕机,依旧是mysql已提交,mq重投,虽然删除日志的操作重复了,但不影响一致性
    // 如果redis挂了没持久化,也是日志没被删除而已,ttl时间过后自动删除,不会影响正常业务
    // 如果系统和redis都挂了,mysql已提交,mq重投,无论redis有没有持久化,也只是影响到日志而已,跟上一个如果相似,不影响正常业务
}

最终变成如下两个函数:

    /**
     * 考虑到MQ的其它特性,再加了点代码。
     * @param orderId 订单id
     * @param type 执行类型 first-支付之后分配 re-手动重新分配
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public void distributionCourier(String orderId, String type) {
        // 查询对应订单id且配送员id为""的订单记录
        OrderInfo orderInfo = orderInfoMapper.selectOne(
                new QueryWrapper<OrderInfo>().
                        eq("id", orderId).
                        eq("courier_id", ""));
        orderInfo.setUpdateDate(null);
        // 一般情况下,如果是因网络波动重复发送的消息,就会直接跳过下面的if了,因为重复发送一般是很久没有收到回答信息
        int updateCount = 0;
        if (!ObjectUtils.isEmpty(orderInfo)){
            // 查询 redis 日志
            String logKey = RedisConfig.ORDER_COURIER_DATA + "::" + orderId;
            String courierId = redisService.get(logKey);
            if (!StringUtils.isEmpty(courierId)){
                orderInfo.setCourierId(courierId);
                // 只有支付后分配才将状态改为等待揽收
                if ("first".equals(type)){
                    orderInfo.setOrderStatus(OrderStatusEnum.WAIT_PICK_UP);
                }
                updateCount = orderInfoMapper.update(orderInfo, new QueryWrapper<OrderInfo>()
                        .eq("id", orderId).eq("courier_id", ""));
            }else {
                // 查询下单者的学校id(远程调用)
                SysUser user = ucenterFeignClient.getById(orderInfo.getUserId());
                // redis 选取该学校分数最高的配送员,并减掉该配送员分数10,记录日志
                String key = RedisConfig.COURIER_WEIGHT_DATA + "::" + user.getSchoolId();
                List<String> keys = Arrays.asList(key, logKey);
                courierId = stringRedisTemplate.execute(redisScript, keys, String.valueOf(RedisConfig.DISTRIBUTION_LOG_TIME_OUT));
                if (StringUtils.isEmpty(courierId)){
                    ExceptionCast.cast(CommonCode.COURIER_NOT_EXIST);
                }
                orderInfo.setCourierId(courierId);
                if ("first".equals(type)){
                    orderInfo.setOrderStatus(OrderStatusEnum.WAIT_PICK_UP);
                }
                updateCount = orderInfoMapper.update(orderInfo, new QueryWrapper<OrderInfo>()
                        .eq("id", orderId).eq("courier_id", ""));
            }
        }
        if (updateCount == 0){
            log.warn("【分配配送员业务】可能由于网络波动,消息被重复消费,订单号:{}", orderId);
        }
    }
}

/**
 * @author cwj
 * 延时消息取消订单单和关闭支付
 */
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@RocketMQMessageListener(consumerGroup = RocketmqConfig.DISTRIBUTION_COURIER_GROUP, topic = RocketmqConfig.DISTRIBUTION_COURIER_TOPIC)
@Slf4j
public class DistributionCourierConsumer implements RocketMQListener<String> {

    private final DistributionCourierService distributionCourierService;
    private final RedisService redisService;

    @Override
    public void onMessage(String orderIdAndType) {
        String[] values = orderIdAndType.split("@@");
        String orderId = values[0];
        String type = values[1];
        // 分配配送员业务
        distributionCourierService.distributionCourier(orderId, type);

        // 删除redis日志 (可以不删,但要完善权重自动/手动校准机制)
        String logKey = RedisConfig.ORDER_COURIER_DATA + "::" + orderId;
        redisService.remove(logKey);
    }
}

再补上Lua初始化:

@Configuration
public class LuaConfiguration {
    @Bean
    public DefaultRedisScript<String> redisScript() {
        DefaultRedisScript<String> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/distribution.lua")));
        redisScript.setResultType(String.class);
        return redisScript;
    }
}

小结/扩展

Lua脚本注意点:

Lua脚本虽然能保证原子性,但因为redis是单线程io多路复用模型, 所以在执行Lua的时候,会"阻塞"住当前执行Lua脚本的redis节点的所有其它读写请求,直到当前运行的Lua脚本执行完成。

注意!!!
上面的的"阻塞"是指一个正在执行的Lua线程处于阻塞状态,并不是指当一个Lua正在运行时再运行其它命令会阻塞,如果上一个Lua还在运行,那么再执行其它任何读写命令都会出现(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.

如果是主从模式,主节点正在运行一个Lua脚本,从节点依然可读但不可写。

这也就意味着,lua脚本必须不允许出现死锁、且不能是执行时间长的脚本(如果是执行时间比较长的脚本,应该限流+完善容错重试机制)。

还有就是redis的lua脚本特性所衍生出来的长连接/任务超时问题:
如果lua脚本因某种原因导致长时间阻塞,那么就有可能导致分配配送员消息消费失败重试/长连接导致连接丢失,然后重试的消息不断重试还是失败,最终消息积压爆表,可能导致服务雪崩/长连接过多导致新用户无法再连接。

对于长连接的解决我自然是用RocketMQ解决,毕竟假设在海量数据的情况下,完整的插入事务流程肯定比发送一条事务消息更耗时。

至于失败消息积压方面解决的方案就有很多,包括sentinel限流、rocketmq限制消费的线程池等。


软保证

上面的强保证,主要体现在我们使用了Lua脚本保证了操作原子性,也就是保证了在一定并发下,绝对不会出现重复获取同一权重且同一配送员的情况。

但是这种强保证,无疑给我们带来了很大限制(主要指Lua脚本的限制)。

我们可以再仔细想一想,配送员的公平性是不是一定要强保证呢?因为系统是长久运行的,就算在某一段时间出现了高并发,并且由于网络波动这几个并发请求都获取到了同一个配送员,但从结果来看,这位被重复获取的配送员,该扣的权重还是被扣了。

那么在之后的一段时间内,这位被重复获取的配送员【1】,权重应该是垫底的,且其它配送员就算再扣几次权重,排名也应该还是比配送员【1】高。

也就是我们从系统会长期运行的角度看,就算没有强保证,我们依然能保证配送员的分配是接近于平均的,这种观点有点类似于"最终一致性",我把它称为"软保证"。

使用软保证虽然会破坏短期内的平衡性,但它能保证长期内平衡不会破坏的很离谱,且不再受Lua的限制,提高系统并发。当然,前提是业务能接受短期的不平衡。




参考资料: