本章目标
- 掌握高效交易验证方式
- 掌握缓存库存模型(解决库存行锁的性能问题)
交易性能瓶颈
JMeter压测
交易验证完全依赖数据库
OrderServiceImpl.class:
@Override
@Transactional
public OrderModel createOrder(Integer userId, Integer itemId, Integer promoId, Integer amount) throws BusinessException {
//1.校验下单状态,下单的商品是否存在,用户是否合法,购买数量是否正确
ItemModel itemModel = itemService.getItemById(itemId);
if(itemModel == null){
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"商品信息不存在");
}
//校验用户
UserModel userModel = userService.getUserById(userId);
if(userModel == null){
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"用户信息不存在");
}
if(amount <= 0 || amount > 99){//不能不买,也不能一次买多个
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"数量信息不正确");
}
//校验活动信息-->是内存操作
if(promoId != null){
//(1)校验对应活动是否存在这个适用商品
if(promoId.intValue() != itemModel.getPromoModel().getId()){
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活动信息不正确");
//(2)校验活动是否正在进行中
}else if(itemModel.getPromoModel().getStatus().intValue() != 2) {
throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"活动信息还未开始");
}
}
//2.落单减库存
boolean result = itemService.decreaseStock(itemId,amount);
if(!result){
throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH);
}
//3.订单入库
OrderModel orderModel = new OrderModel();
orderModel.setUserId(userId);
orderModel.setItemId(itemId);
orderModel.setAmount(amount);
if(promoId != null){
orderModel.setItemPrice(itemModel.getPromoModel().getPromoItemPrice());
}else{
orderModel.setItemPrice(itemModel.getPrice());
}
orderModel.setPromoId(promoId);
orderModel.setOrderPrice(orderModel.getItemPrice().multiply(new BigDecimal(amount)));
//3.1生成交易流水号,订单号
orderModel.setId(generateOrderNo());
OrderDO orderDO = convertFromOrderModel(orderModel);
orderDOMapper.insertSelective(orderDO);
//加上商品的销量
itemService.increaseSales(itemId,amount);
//4.返回前端
return orderModel;
}
分析:
1.校验下单状态 中的
itemService.getItemById(itemId);
中,其实操作了3次数据库:- 根据商品id查询
item
表:itemDOMapper.selectByPrimaryKey(id);
- 根据商品信息查询库存
item_stock
表:itemStockDOMapper.selectByItemId(itemDO.getId());
- 根据商品id查询活动商品信息
promo
表:promoService.getPromoByItemId(itemModel.getId());
- 根据商品id查询
校验用户
- 查询user表:
userDOMapper.selectByPrimaryKey(id);
- 查询use_password表:
userPasswordDOMapper.selectByUserId(userDO.getId());
- 查询user表:
2.落单减库存 这将会是个热点操作
最终在
item_stock
表中完成减库存操作itemStockDOMapper.decreaseStock(itemId,amount);
对应sql语句:有一个
item_id = #{itemId}
,比如说传入的itemId=6,就会在6这一行加入行锁<update id="decreaseStock"> update item_stock set stock = stock - #{amount} where item_id = #{itemId} and stock >= #{amount} </update>
3.订单入库
- 3.1生成交易流水号 对
order_info
表:orderDOMapper.insertSelective(orderDO);
- 加销量,对
item
表:itemService.increaseSales(itemId,amount);
- 3.1生成交易流水号 对
可以看出这里有8次查询数据库的IO操作,而且减库存的操作有一个行锁等待。
库存行锁
落单减库存:
boolean result = itemService.decreaseStock(itemId,amount);
最终在item_stock
表中完成减库存操作itemStockDOMapper.decreaseStock(itemId,amount);
对应sql语句:有一个item_id = #{itemId}
,比如说传入的itemId=6,就会在6这一行加入行锁
ItemStockDOMapper.xml:
<update id="decreaseStock">
update item_stock
set stock = stock - #{amount}
where item_id = #{itemId} and stock >= #{amount}
</update>
补充:
update from table set xx='aa' where yy='bb'
这里的yy字段不是主键,但值都是唯一的,这样的话,不加 rowlock时update是锁行还是锁表?
锁表。
若指定的yy为索引(主键是特殊的索引),只有一条记录,则锁行。
若不指定yy为索引,则锁表。
后置处理逻辑
交易链路优化
交易验证优化
- 用户风控策略优化:策略缓存模型优化
- 活动校验策略优化:引入活动发布流程,模型缓存化,紧急下限能力
UserModel和ItemModel缓存模型
增加
UserService.getUserByIdInCache(Integer id);
方法UserServiceImpl.class:
@Override public UserModel getUserByIdInCache(Integer id) { UserModel userModel= (UserModel) redisTemplate.opsForValue().get("user_validate_"+id); if (userModel==null){ userModel=this.getUserById(id); redisTemplate.opsForValue().set("user_validate_"+id,userModel); redisTemplate.expire("user_validate_"+id,10, TimeUnit.MINUTES); } return userModel; }
增加
ItemService.getItemByIdInCache(Integer id);
方法ItemServiceImpl.class:
@Override public ItemModel getItemByIdInCache(Integer id) { ItemModel itemModel= (ItemModel) redisTemplate.opsForValue().get("item_validate_"+id); if (itemModel==null){ itemModel=this.getItemById(id); redisTemplate.opsForValue().set("item_validate_"+id,itemModel); redisTemplate.expire("item_validate_"+id,10, TimeUnit.MINUTES); } return itemModel; }
库存行锁优化
再回顾下我们的减库存操作:
ItemStockDOMapper.xml:
<update id="decreaseStock">
update item_stock
set stock = stock - #{amount}
where item_id = #{itemId} and stock >= #{amount}
</update>
mysql将会在item_id = #{itemId}
的地方加上一个行锁,前提是item_id
在数据库里是必须是有索引的。如果没有索引是会锁表的。
默认item_stock
表中的item_id字段是没有索引的,设置索引:
ALTER table item_stock add UNIQUE INDEX
item_id_index(item_id)
扣减库存缓存化
在内存中肯定比在磁盘中扣减库存快。
方案:
- 活动发布同步库存进缓存
- 下单交易减缓存中库存
动发布同步库存进缓存
PromoServiceImpl.class
@Override
public void publishPromo(Integer promoId) {
//通过活动id获取活动
PromoDO promoDO=promoDOMapper.selectByItemId(promoId);
if (promoDO.getItemId()==null||promoDO.getItemId().intValue()==0){//说明对应的操作不存在,这个活动没有适应的商品
return;
}
ItemModel itemModel=itemService.getItemById(promoDO.getItemId());
//将库存同步到redis内
redisTemplate.opsForValue().set("promo_item_stock_"+itemModel.getId(),itemModel.getStock());
}
在ItemController.class中完成发布promo的操作
@RequestMapping(value = "/publicpromo",method = {RequestMethod.GET})
@ResponseBody
public CommonReturnType publicpromo(@RequestParam(name = "id")Integer id){
promoService.publishPromo(id);
return CommonReturnType.create(null);
}
下单交易减缓存中库存
ItemServiceImpl.class中
@Override
@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
// int affectedRow = itemStockDOMapper.decreaseStock(itemId,amount);
Long result=redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
if(result >= 0){
//更新库存成功
return true;
}else{
//更新库存失败
return false;
}
}
问题
数据库记录不一致
内存毕竟是不可靠的,如果宕机,库存信息就会全部消失
异步同步数据库
采用异步消息队列的方式,将对应的异步扣减的消息同步给消息的consumer
端,由这个consumer
端完成数据库的扣减操作。
方案(前两种步骤不变)
- 活动发布同步库存进缓存
- 下单交易减缓存中库存
- 异步消息扣减数据库内库存
异步扣减数据库:既能保证用户的一个高效的购买体验,又可以保证数据库最终一致性。
异步消息队列中间件rocketmq
- 高性能,高并发,分布式消息中间件
- 典型应用场景:分布式事物,异步解耦
rocketmq概念模型
部署模型
分布式事物
CAP
C:一致性
A:可用性
P:分区容忍性
分布式事物中,p分区容忍性是必须的,所以必须在C一致性
和A可用性
之间做一个选择
BASE
不追求瞬时状态的强一致,追求的是最终的一致性,也就是我的数据最终会达到一致。
B:基础可用
S:软状态
E:最终一致性
软状态:在我们的应用当中,会瞬时的存在有数据不一致的情况,比如说一部分数据成功,另外一部分数据还在处理中,那我们的业务认为这些数据是可以容忍的。
结合到我们缓存库存的模型当中,我们怎么解决这些问题?
在我们缓存库存当中,我们的redis的状态是正确的状态,比如redis中的库存从87减到86,redis中的86是正确的,但是我们数据库中库存状态由于异步消息队列的consumer端还没有被触发,因此在consumer没有消费完消息之前,数据库里的库存数错的,比如还是87。但是只要这个分布式的消息投递成功了,consumer端消费了这个消息,最终我们数据库中的状态会从87减到86,达到最终一致性。
这样的话,只要我们使用的消息中间件的高可用性达到99.99%,那至少有99.99%以上的概率,我们数据库里的状态可以和redis中的数据保持一致的。
数据库中库存数据和缓存中保持最终一致性
缓存库存接入异步化
ItemServiceImpl.java
@Override
@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
// int affectedRow = itemStockDOMapper.decreaseStock(itemId,amount);
Long result=redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
if(result >= 0){
//更新库存成功
boolean mqResult= mQproducer.ausncReduceStock(itemId,amount);
if (!mqResult){
redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
return false;
}
return true;
}else{
redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
//更新库存失败
return false;
}
}
在redis中扣减完库存后,将扣减库存的消息发送给MQ。使用的是mQproducer.ausncReduceStock(itemId,amount)
方法:
引入消息生产者MQproducer.java
//同步库存扣减消息
public boolean ausncReduceStock(Integer itemId,Integer amount) {
Map<String,Object> bodyMap=new HashMap<>();
bodyMap.put("itemId",itemId);
bodyMap.put("amount",amount);
Message message=new Message(topicName,"increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
try {
producer.send(message);
} catch (MQClientException e) {
return false;
} catch (RemotingException e) {
return false;
} catch (MQBrokerException e) {
return false;
} catch (InterruptedException e) {
return false;
}
return true;
}
引入MQconsumer.java
@Component
public class MQconsumer {
private DefaultMQPushConsumer consumer;
@Value("${mq.nameserver.addr}")
private String nameAddr;
@Value("${mq.topicname}")
private String topicName;
@Autowired
private ItemStockDOMapper itemStockDOMapper;
@PostConstruct
public void init() throws MQClientException {
consumer=new DefaultMQPushConsumer("stock_consumer_group");
consumer.setNamesrvAddr(nameAddr);
consumer.subscribe(topicName,"*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//实现库存真正到数据库内扣减的逻辑
Message message=msgs.get(0);
String jsonstring=new String(message.getBody());
Map<String,Object> map= JSON.parseObject(jsonstring, Map.class);
Integer itemId= (Integer) map.get("itemId");
Integer amount= (Integer) map.get("amount");
itemStockDOMapper.decreaseStock(itemId,amount);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
问题
异步消息发送失败
MQproducer.java
中的producer.send(message)
消息发送失败了,我们现在没有解决。扣减操作失败
若库存扣减这个操作执行失败了,这条扣减消息应该怎么处理
下单失败无法正确回补库存
若用户取消订单了,我们怎么回滚库存呢