第7章 交易优化技术之缓存库存

本章目标

  • 掌握高效交易验证方式
  • 掌握缓存库存模型(解决库存行锁的性能问题)

交易性能瓶颈

JMeter压测

屏幕快照 2019-07-02 下午7.05.46

交易验证完全依赖数据库

OrderServiceImpl.class:

    @Override
    @Transactional
    public OrderModel createOrder(Integer userId, Integer itemId, Integer promoId, Integer amount) throws BusinessException {
        //1.校验下单状态,下单的商品是否存在,用户是否合法,购买数量是否正确
        ItemModel itemModel = itemService.getItemById(itemId);
        if(itemModel == null){
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"商品信息不存在");
        }
        //校验用户
        UserModel userModel = userService.getUserById(userId);
        if(userModel == null){
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"用户信息不存在");
        }
        if(amount <= 0 || amount > 99){//不能不买,也不能一次买多个
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"数量信息不正确");
        }

        //校验活动信息-->是内存操作
        if(promoId != null){
            //(1)校验对应活动是否存在这个适用商品
            if(promoId.intValue() != itemModel.getPromoModel().getId()){
                throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活动信息不正确");
                //(2)校验活动是否正在进行中
            }else if(itemModel.getPromoModel().getStatus().intValue() != 2) {
                throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活动信息还未开始");
            }
        }

        //2.落单减库存
        boolean result = itemService.decreaseStock(itemId,amount);
        if(!result){
            throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH);
        }

        //3.订单入库
        OrderModel orderModel = new OrderModel();
        orderModel.setUserId(userId);
        orderModel.setItemId(itemId);
        orderModel.setAmount(amount);
        if(promoId != null){
            orderModel.setItemPrice(itemModel.getPromoModel().getPromoItemPrice());
        }else{
            orderModel.setItemPrice(itemModel.getPrice());
        }
        orderModel.setPromoId(promoId);
        orderModel.setOrderPrice(orderModel.getItemPrice().multiply(new BigDecimal(amount)));

        //3.1生成交易流水号,订单号
        orderModel.setId(generateOrderNo());
        OrderDO orderDO = convertFromOrderModel(orderModel);
        orderDOMapper.insertSelective(orderDO);

        //加上商品的销量
        itemService.increaseSales(itemId,amount);
        //4.返回前端
        return orderModel;
    }

分析:

  • 1.校验下单状态 中的itemService.getItemById(itemId);中,其实操作了3次数据库:

    • 根据商品id查询item表:itemDOMapper.selectByPrimaryKey(id);
    • 根据商品信息查询库存item_stock表:itemStockDOMapper.selectByItemId(itemDO.getId());
    • 根据商品id查询活动商品信息promo表:promoService.getPromoByItemId(itemModel.getId());
  • 校验用户

    • 查询user表:userDOMapper.selectByPrimaryKey(id);
    • 查询use_password表:userPasswordDOMapper.selectByUserId(userDO.getId());
  • 2.落单减库存 这将会是个热点操作

    最终在item_stock表中完成减库存操作itemStockDOMapper.decreaseStock(itemId,amount);

    对应sql语句:有一个item_id = #{itemId},比如说传入的itemId=6,就会在6这一行加入行锁

    <update id="decreaseStock">
        update item_stock
        set stock = stock - #{amount}
        where item_id = #{itemId} and stock >= #{amount}
      </update>
  • 3.订单入库

    • 3.1生成交易流水号 对order_info表:orderDOMapper.insertSelective(orderDO);
    • 加销量,对item表:itemService.increaseSales(itemId,amount);

可以看出这里有8次查询数据库的IO操作,而且减库存的操作有一个行锁等待。

库存行锁

落单减库存:

boolean result = itemService.decreaseStock(itemId,amount);

最终在item_stock表中完成减库存操作itemStockDOMapper.decreaseStock(itemId,amount);

对应sql语句:有一个item_id = #{itemId},比如说传入的itemId=6,就会在6这一行加入行锁

ItemStockDOMapper.xml:

<update id="decreaseStock">
    update item_stock
    set stock = stock - #{amount}
    where item_id = #{itemId} and stock >= #{amount}
  </update>

补充:

update from table set xx='aa' where yy='bb'

这里的yy字段不是主键,但值都是唯一的,这样的话,不加 rowlock时update是锁行还是锁表?

锁表。

若指定的yy为索引(主键是特殊的索引),只有一条记录,则锁行。
若不指定yy为索引,则锁表。

后置处理逻辑

未命名文件

交易链路优化

交易验证优化

  • 用户风控策略优化:策略缓存模型优化
  • 活动校验策略优化:引入活动发布流程,模型缓存化,紧急下限能力

UserModel和ItemModel缓存模型

  • 增加UserService.getUserByIdInCache(Integer id);方法

    UserServiceImpl.class:

    @Override
        public UserModel getUserByIdInCache(Integer id) {
            UserModel userModel= (UserModel) redisTemplate.opsForValue().get("user_validate_"+id);
            if (userModel==null){
                userModel=this.getUserById(id);
                redisTemplate.opsForValue().set("user_validate_"+id,userModel);
                redisTemplate.expire("user_validate_"+id,10, TimeUnit.MINUTES);
            }
    
            return userModel;
        }
  • 增加ItemService.getItemByIdInCache(Integer id);方法

    ItemServiceImpl.class:

    @Override
        public ItemModel getItemByIdInCache(Integer id) {
            ItemModel itemModel= (ItemModel) redisTemplate.opsForValue().get("item_validate_"+id);
            if (itemModel==null){
                itemModel=this.getItemById(id);
                redisTemplate.opsForValue().set("item_validate_"+id,itemModel);
                redisTemplate.expire("item_validate_"+id,10, TimeUnit.MINUTES);
            }
            return itemModel;
        }

库存行锁优化

再回顾下我们的减库存操作:

ItemStockDOMapper.xml:

<update id="decreaseStock">
    update item_stock
    set stock = stock - #{amount}
    where item_id = #{itemId} and stock >= #{amount}
  </update>

mysql将会在item_id = #{itemId}的地方加上一个行锁,前提是item_id在数据库里是必须是有索引的。如果没有索引是会锁表的。

默认item_stock表中的item_id字段是没有索引的,设置索引:

ALTER table item_stock add UNIQUE INDEX
item_id_index(item_id)

扣减库存缓存化

在内存中肯定比在磁盘中扣减库存快。

方案:

  • 活动发布同步库存进缓存
  • 下单交易减缓存中库存

动发布同步库存进缓存

PromoServiceImpl.class

@Override
    public void publishPromo(Integer promoId) {
        //通过活动id获取活动
        PromoDO promoDO=promoDOMapper.selectByItemId(promoId);
        if (promoDO.getItemId()==null||promoDO.getItemId().intValue()==0){//说明对应的操作不存在,这个活动没有适应的商品
            return;
        }
        ItemModel itemModel=itemService.getItemById(promoDO.getItemId());

        //将库存同步到redis内
        redisTemplate.opsForValue().set("promo_item_stock_"+itemModel.getId(),itemModel.getStock());

    }

在ItemController.class中完成发布promo的操作

@RequestMapping(value = "/publicpromo",method = {RequestMethod.GET})
    @ResponseBody
    public CommonReturnType publicpromo(@RequestParam(name = "id")Integer id){
        promoService.publishPromo(id);
        return CommonReturnType.create(null);
    }

下单交易减缓存中库存

ItemServiceImpl.class中

    @Override
    @Transactional
    public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
       // int affectedRow =  itemStockDOMapper.decreaseStock(itemId,amount);
        Long result=redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
        if(result >= 0){
            //更新库存成功
            return true;
        }else{
            //更新库存失败
            return false;
        }

    }

问题

  • 数据库记录不一致

    内存毕竟是不可靠的,如果宕机,库存信息就会全部消失

异步同步数据库

采用异步消息队列的方式,将对应的异步扣减的消息同步给消息的consumer端,由这个consumer端完成数据库的扣减操作。

方案(前两种步骤不变)

  • 活动发布同步库存进缓存
  • 下单交易减缓存中库存
  • 异步消息扣减数据库内库存

异步扣减数据库:既能保证用户的一个高效的购买体验,又可以保证数据库最终一致性。

异步消息队列中间件rocketmq

  • 高性能,高并发,分布式消息中间件
  • 典型应用场景:分布式事物,异步解耦
rocketmq概念模型

屏幕快照 2019-07-02 下午10.04.57

部署模型

屏幕快照 2019-07-02 下午10.07.58

分布式事物

屏幕快照 2019-07-02 下午10.26.37

CAP

C:一致性

A:可用性

P:分区容忍性

分布式事物中,p分区容忍性是必须的,所以必须在C一致性A可用性之间做一个选择

BASE

不追求瞬时状态的强一致,追求的是最终的一致性,也就是我的数据最终会达到一致。

B:基础可用

S:软状态

E:最终一致性

软状态:在我们的应用当中,会瞬时的存在有数据不一致的情况,比如说一部分数据成功,另外一部分数据还在处理中,那我们的业务认为这些数据是可以容忍的。

结合到我们缓存库存的模型当中,我们怎么解决这些问题?

在我们缓存库存当中,我们的redis的状态是正确的状态,比如redis中的库存从87减到86,redis中的86是正确的,但是我们数据库中库存状态由于异步消息队列的consumer端还没有被触发,因此在consumer没有消费完消息之前,数据库里的库存数错的,比如还是87。但是只要这个分布式的消息投递成功了,consumer端消费了这个消息,最终我们数据库中的状态会从87减到86,达到最终一致性。

这样的话,只要我们使用的消息中间件的高可用性达到99.99%,那至少有99.99%以上的概率,我们数据库里的状态可以和redis中的数据保持一致的。

数据库中库存数据和缓存中保持最终一致性

缓存库存接入异步化

ItemServiceImpl.java
@Override
    @Transactional
    public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
       // int affectedRow =  itemStockDOMapper.decreaseStock(itemId,amount);
        Long result=redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
        if(result >= 0){
            //更新库存成功
            boolean mqResult= mQproducer.ausncReduceStock(itemId,amount);
            if (!mqResult){
                redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
                return false;
            }
            return true;
        }else{
            redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
            //更新库存失败
            return false;
        }

    }

在redis中扣减完库存后,将扣减库存的消息发送给MQ。使用的是mQproducer.ausncReduceStock(itemId,amount)方法:

引入消息生产者MQproducer.java
//同步库存扣减消息
    public boolean ausncReduceStock(Integer itemId,Integer amount)  {
        Map<String,Object> bodyMap=new HashMap<>();
        bodyMap.put("itemId",itemId);
        bodyMap.put("amount",amount);
        Message message=new Message(topicName,"increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
        try {
            producer.send(message);
        } catch (MQClientException e) {
            return false;
        } catch (RemotingException e) {
            return false;
        } catch (MQBrokerException e) {
            return false;
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }
引入MQconsumer.java
@Component
public class MQconsumer {

    private DefaultMQPushConsumer consumer;

    @Value("${mq.nameserver.addr}")
    private String nameAddr;

    @Value("${mq.topicname}")
    private String topicName;

    @Autowired
    private ItemStockDOMapper itemStockDOMapper;

    @PostConstruct
    public void init() throws MQClientException {
        consumer=new DefaultMQPushConsumer("stock_consumer_group");
        consumer.setNamesrvAddr(nameAddr);
        consumer.subscribe(topicName,"*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
               //实现库存真正到数据库内扣减的逻辑
                Message message=msgs.get(0);
                String jsonstring=new String(message.getBody());
                Map<String,Object> map= JSON.parseObject(jsonstring, Map.class);
                Integer itemId= (Integer) map.get("itemId");
                Integer amount= (Integer) map.get("amount");

                itemStockDOMapper.decreaseStock(itemId,amount);

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

问题

  • 异步消息发送失败

    MQproducer.java中的producer.send(message)消息发送失败了,我们现在没有解决。

  • 扣减操作失败

    若库存扣减这个操作执行失败了,这条扣减消息应该怎么处理

  • 下单失败无法正确回补库存

    若用户取消订单了,我们怎么回滚库存呢


   转载规则


《第7章 交易优化技术之缓存库存》 徐兴华 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录
I I