第14天-程序员宅基地

技术标签: changgou  

第14章 秒杀

学习目标

  • 防止秒杀重复排队

    重复排队:一个人抢购商品,如果没有支付,不允许重复排队抢购
    
  • 并发超卖问题解决

    1个商品卖给多个人:1商品多订单
    
  • 秒杀订单支付

    秒杀支付:支付流程需要调整
    
  • 超时支付订单库存回滚

    1.RabbitMQ延时队列
    2.利用延时队列实现支付订单的监听,根据订单支付状况进行订单数据库回滚
    

1 防止秒杀重复排队

用户每次抢单的时候,一旦排队,我们设置一个自增值,让该值的初始值为1,每次进入抢单的时候,对它进行递增,如果值>1,则表明已经排队,不允许重复排队,如果重复排队,则对外抛出异常,并抛出异常信息100表示已经正在排队。

1.1 后台排队记录

修改SeckillOrderServiceImpl的add方法,新增递增值判断是否排队中,代码如下:

1558785112055

上图代码如下:

//递增,判断是否排队
Long userQueueCount = redisTemplate.boundHashOps("UserQueueCount").increment(username, 1);
if(userQueueCount>1){
    
    //100:表示有重复抢单
    throw new RuntimeException(String.valueOf(StatusCode.REPERROR));
}

2 并发超卖问题解决

超卖问题,这里是指多人抢购同一商品的时候,多人同时判断是否有库存,如果只剩一个,则都会判断有库存,此时会导致超卖现象产生,也就是一个商品下了多个订单的现象。

2.1 思路分析

1557080237953

解决超卖问题,可以利用Redis队列实现,给每件商品创建一个独立的商品个数队列,例如:A商品有2个,A商品的ID为1001,则可以创建一个队列,key=SeckillGoodsCountList_1001,往该队列中塞2次该商品ID。

每次给用户下单的时候,先从队列中取数据,如果能取到数据,则表明有库存,如果取不到,则表明没有库存,这样就可以防止超卖问题产生了。

在我们队Redis进行操作的时候,很多时候,都是先将数据查询出来,在内存中修改,然后存入到Redis,在并发场景,会出现数据错乱问题,为了控制数量准确,我们单独将商品数量整一个自增键,自增键是线程安全的,所以不担心并发场景的问题。

1557081924548

2.2 代码实现

每次将商品压入Redis缓存的时候,另外多创建一个商品的队列。

修改SeckillGoodsPushTask,添加一个pushIds方法,用于将指定商品ID放入到指定的数字中,代码如下:

/***
 * 将商品ID存入到数组中
 * @param len:长度
 * @param id :值
 * @return
 */
public Long[] pushIds(int len,Long id){
    
    Long[] ids = new Long[len];
    for (int i = 0; i <ids.length ; i++) {
    
        ids[i]=id;
    }
    return ids;
}

修改SeckillGoodsPushTask的loadGoodsPushRedis方法,添加队列操作,代码如下:

1557393449652

上图代码如下:

//商品数据队列存储,防止高并发超卖
Long[] ids = pushIds(seckillGood.getStockCount(), seckillGood.getId());
redisTemplate.boundListOps("SeckillGoodsCountList_"+seckillGood.getId()).leftPushAll(ids);
//自增计数器
redisTemplate.boundHashOps("SeckillGoodsCount").increment(seckillGood.getId(),seckillGood.getStockCount());

2.3 超卖控制

修改多线程下单方法,分别修改数量控制,以及售罄后用户抢单排队信息的清理,修改代码如下图:

1558788854992

上图代码如下:

/***
 * 多线程下单操作
 */
@Async
public void createOrder(){
    
    //从队列中获取排队信息
    SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundListOps("SeckillOrderQueue").rightPop();

    try {
    
        //从队列中获取一个商品
        Object sgood = redisTemplate.boundListOps("SeckillGoodsCountList_" + seckillStatus.getGoodsId()).rightPop();
        if(sgood==null){
    
            //清理当前用户的排队信息
            clearQueue(seckillStatus);
            return;
        }

        //时间区间
        String time = seckillStatus.getTime();
        //用户登录名
        String username=seckillStatus.getUsername();
        //用户抢购商品
        Long id = seckillStatus.getGoodsId();

        //获取商品数据
        SeckillGoods goods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_" + time).get(id);

        //如果有库存,则创建秒杀商品订单
        SeckillOrder seckillOrder = new SeckillOrder();
        seckillOrder.setId(idWorker.nextId());
        seckillOrder.setSeckillId(id);
        seckillOrder.setMoney(goods.getCostPrice());
        seckillOrder.setUserId(username);
        seckillOrder.setCreateTime(new Date());
        seckillOrder.setStatus("0");

        //将秒杀订单存入到Redis中
        redisTemplate.boundHashOps("SeckillOrder").put(username,seckillOrder);

        //商品库存-1
        Long surplusCount = redisTemplate.boundHashOps("SeckillGoodsCount").increment(id, -1);//商品数量递减
        goods.setStockCount(surplusCount.intValue());    //根据计数器统计

        //判断当前商品是否还有库存
        if(surplusCount<=0){
    
            //并且将商品数据同步到MySQL中
            seckillGoodsMapper.updateByPrimaryKeySelective(goods);
            //如果没有库存,则清空Redis缓存中该商品
            redisTemplate.boundHashOps("SeckillGoods_" + time).delete(id);
        }else{
    
            //如果有库存,则直数据重置到Reids中
            redisTemplate.boundHashOps("SeckillGoods_" + time).put(id,goods);
        }
        //抢单成功,更新抢单状态,排队->等待支付
        seckillStatus.setStatus(2);
        seckillStatus.setOrderId(seckillOrder.getId());
        seckillStatus.setMoney(seckillOrder.getMoney().floatValue());
        redisTemplate.boundHashOps("UserQueueStatus").put(username,seckillStatus);
    } catch (Exception e) {
    
        e.printStackTrace();
    }
}

/***
 * 清理用户排队信息
 * @param seckillStatus
 */
public void clearQueue(SeckillStatus seckillStatus){
    
    //清理排队标示
    redisTemplate.boundHashOps("UserQueueCount").delete(seckillStatus.getUsername());

    //清理抢单标示
    redisTemplate.boundHashOps("UserQueueStatus").delete(seckillStatus.getUsername());
}

3 订单支付

1557113836465

完成秒杀下订单后,进入支付页面,此时前端会每3秒中向后台发送一次请求用于判断当前用户订单是否完成支付,如果完成了支付,则需要清理掉排队信息,并且需要修改订单状态信息。

3.2 创建支付二维码

下单成功后,会跳转到支付选择页面,在支付选择页面要显示订单编号和订单金额,所以我们需要在下单的时候,将订单金额以及订单编号信息存储到用户查询对象中。

选择微信支付后,会跳转到微信支付页面,微信支付页面会根据用户名查看用户秒杀订单,并根据用户秒杀订单的ID创建预支付信息并获取二维码信息,展示给用户看,此时页面每3秒查询一次支付状态,如果支付成功,需要修改订单状态信息。

3.2.1 回显订单号、金额

下单后,进入支付选择页面,需要显示订单号和订单金额,所以需要在用户下单后将该数据传入到pay.html页面,所以查询订单状态的时候,需要将订单号和金额封装到查询的信息中,修改查询订单装的方法加入他们即可。

修改SeckillOrderController的queryStatus方法,代码如下:

1558789543285

上图代码如下:

return new Result(true,seckillStatus.getStatus(),"抢购状态",seckillStatus);

使用Postman测试,效果如下:

http://localhost:18084/seckill/order/query

1558847001814

3.2.2 创建二维码

用户创建二维码,可以先查询用户的秒杀订单抢单信息,然后再发送请求到支付微服务中创建二维码,将订单编号以及订单对应的金额传递到支付微服务:/weixin/pay/create/native

使用Postman测试效果如下:

http://localhost:9022/weixin/pay/create/native?outtradeno=1132510782836314112&money=1

1558847312481

3.3 支付流程分析

1558832314454

如上图,步骤分析如下:

1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单吃句话到MySQL,如果没完成,清理排队信息回滚库存
4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,微信系统会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。

3.4 支付回调更新

支付回调这一块代码已经实现了,但之前实现的是订单信息的回调数据发送给MQ,指定了对应的队列,不过现在需要实现的是秒杀信息发送给指定队列,所以之前的代码那块需要动态指定队列。

3.4.1 支付回调队列指定

关于指定队列如下:

1.创建支付二维码需要指定队列
2.回调地址回调的时候,获取支付二维码指定的队列,将支付信息发送到指定队列中

在微信支付统一下单API中,有一个附加参数,如下:

attach:附加数据,String(127),在查询API和支付通知中原样返回,可作为自定义参数使用。

我们可以在创建二维码的时候,指定该参数,该参数用于指定回调支付信息的对应队列,每次回调的时候,会获取该参数,然后将回调信息发送到该参数对应的队列去。

3.4.1.1 改造支付方法

修改支付微服务的WeixinPayController的createNative方法,代码如下:

1558839099792

修改支付微服务的WeixinPayService的createNative方法,代码如下:

1558839160020

修改支付微服务的WeixinPayServiceImpl的createNative方法,代码如下:

1558839263213

我们创建二维码的时候,需要将下面几个参数传递过去

username:用户名,可以根据用户名查询用户排队信息
outtradeno:商户订单号,下单必须
money:支付金额,支付必须
queue:队列名字,回调的时候,可以知道将支付信息发送到哪个队列

修改WeixinPayApplication,添加对应队列以及对应交换机绑定,代码如下:

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
public class WeixinPayApplication {
    

    public static void main(String[] args) {
    
        SpringApplication.run(WeixinPayApplication.class,args);
    }

    @Autowired
    private Environment env;


    /***
     * 创建DirectExchange交换机
     * @return
     */
    @Bean
    public DirectExchange basicExchange(){
    
        return new DirectExchange(env.getProperty("mq.pay.exchange.order"), true,false);
    }

    /***
     * 创建队列
     * @return
     */
    @Bean(name = "queueOrder")
    public Queue queueOrder(){
    
        return new Queue(env.getProperty("mq.pay.queue.order"), true);
    }

    /***
     * 创建秒杀队列
     * @return
     */
    @Bean(name = "queueSeckillOrder")
    public Queue queueSeckillOrder(){
    
        return new Queue(env.getProperty("mq.pay.queue.seckillorder"), true);
    }

    /****
     * 队列绑定到交换机上
     * @return
     */
    @Bean
    public Binding basicBindingOrder(){
    
        return BindingBuilder
                .bind(queueOrder())
                .to(basicExchange())
                .with(env.getProperty("mq.pay.routing.orderkey"));
    }

    /****
     * 队列绑定到交换机上
     * @return
     */
    @Bean
    public Binding basicBindingSeckillOrder(){
    
        return BindingBuilder
                .bind(queueSeckillOrder())
                .to(basicExchange())
                .with(env.getProperty("mq.pay.routing.seckillorderkey"));
    }
}

修改application.yml,添加如下配置

#位置支付交换机和队列
mq:
  pay:
    exchange:
      order: exchange.order
      seckillorder: exchange.seckillorder
    queue:
      order: queue.order
      seckillorder: queue.seckillorder
    routing:
      key: queue.order
      seckillkey: queue.seckillorder
3.4.1.2 测试

使用Postman创建二维码测试

http://localhost:18092/weixin/pay/create/native?username=szitheima&out_trade_no=1132510782836314121&total_fee=1&queue=queue.seckillorder&routingkey=queue.seckillorder&exchange=exchange.seckillorder

1558848297437

以后每次支付,都需要带上对应的参数,包括前面的订单支付。

3.4.1.3 改造支付回调方法

修改com.changgou.pay.controller.WeixinPayController的notifyUrl方法,获取自定义参数,并转成Map,获取queue地址,并将支付信息发送到绑定的queue中,代码如下:

1567738035562

3.4.2 支付状态监听

支付状态通过回调地址发送给MQ之后,我们需要在秒杀系统中监听支付信息,如果用户已支付,则修改用户订单状态,如果支付失败,则直接删除订单,回滚库存。

在秒杀工程中创建com.changgou.seckill.consumer.SeckillOrderPayMessageListener,实现监听消息,代码如下:

@Component
@RabbitListener(queues = "${mq.pay.queue.seckillorder}")
public class SeckillOrderPayMessageListener {
    


    /**
     * 监听消费消息
     * @param message
     */
    @RabbitHandler
    public void consumeMessage(@Payload String message){
    
        System.out.println(message);
        //将消息转换成Map对象
        Map<String,String> resultMap = JSON.parseObject(message,Map.class);
        System.out.println("监听到的消息:"+resultMap);
    }
}

修改SeckillApplication创建对应的队列以及绑定对应交换机。

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@MapperScan(basePackages = {
    "com.changgou.seckill.dao"})
@EnableScheduling
@EnableAsync
public class SeckillApplication {
    


    public static void main(String[] args) {
    
        SpringApplication.run(SeckillApplication.class,args);
    }

    @Bean
    public IdWorker idWorker(){
    
        return new IdWorker(1,1);
    }

    @Autowired
    private Environment env;


    /***
     * 创建DirectExchange交换机
     * @return
     */
    @Bean
    public DirectExchange basicExchange(){
    
        return new DirectExchange(env.getProperty("mq.pay.exchange.order"), true,false);
    }

    /***
     * 创建队列
     * @return
     */
    @Bean(name = "queueOrder")
    public Queue queueOrder(){
    
        return new Queue(env.getProperty("mq.pay.queue.order"), true);
    }

    /***
     * 创建秒杀队列
     * @return
     */
    @Bean(name = "queueSeckillOrder")
    public Queue queueSeckillOrder(){
    
        return new Queue(env.getProperty("mq.pay.queue.seckillorder"), true);
    }

    /****
     * 队列绑定到交换机上
     * @return
     */
    @Bean
    public Binding basicBindingOrder(){
    
        return BindingBuilder
                .bind(queueOrder())
                .to(basicExchange())
                .with(env.getProperty("mq.pay.routing.orderkey"));
    }


    /****
     * 队列绑定到交换机上
     * @return
     */
    @Bean
    public Binding basicBindingSeckillOrder(){
    
        return BindingBuilder
                .bind(queueSeckillOrder())
                .to(basicExchange())
                .with(env.getProperty("mq.pay.routing.seckillorderkey"));
    }
}

修改application.yml文件,添加如下配置:

#位置支付交换机和队列
mq:
  pay:
    exchange:
      order: exchange.order
      seckillorder: exchange.seckillorder
    queue:
      order: queue.order
      seckillorder: queue.seckillorder
    routing:
      key: queue.order
      seckillkey: queue.seckillorder
3.4.3 修改订单状态

监听到支付信息后,根据支付信息判断,如果用户支付成功,则修改订单信息,并将订单入库,删除用户排队信息,如果用户支付失败,则删除订单信息,回滚库存,删除用户排队信息。

3.4.3.1 业务层

修改SeckillOrderService,添加修改订单方法,代码如下

/***
 * 更新订单状态
 * @param out_trade_no
 * @param transaction_id
 * @param username
 */
void updatePayStatus(String out_trade_no, String transaction_id,String username);

修改SeckillOrderServiceImpl,添加修改订单方法实现,代码如下:

/***
 * 更新订单状态
 * @param out_trade_no
 * @param transaction_id
 * @param username
 */
@Override
public void updatePayStatus(String out_trade_no, String transaction_id,String username) {
    
    //订单数据从Redis数据库查询出来
    SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
    //修改状态
    seckillOrder.setStatus("1");

    //支付时间
    seckillOrder.setPayTime(new Date());
    //同步到MySQL中
    seckillOrderMapper.insertSelective(seckillOrder);

    //清空Redis缓存
    redisTemplate.boundHashOps("SeckillOrder").delete(username);

    //清空用户排队数据
    redisTemplate.boundHashOps("UserQueueCount").delete(username);

    //删除抢购状态信息
    redisTemplate.boundHashOps("UserQueueStatus").delete(username);
}
3.4.3.2 修改订单对接

修改微信支付状态监听的代码,当用户支付成功后,修改订单状态,也就是调用上面的方法,代码如下:

1558839807871

3.4.4 删除订单回滚库存

如果用户支付失败,我们需要删除用户订单数据,并回滚库存。

3.4.4.1 业务层实现

修改SeckillOrderService,创建一个关闭订单方法,代码如下:

/***
 * 关闭订单,回滚库存
 */
void closeOrder(String username);

修改SeckillOrderServiceImpl,创建一个关闭订单实现方法,代码如下:

/***
 * 关闭订单,回滚库存
 * @param username
 */
@Override
public void closeOrder(String username) {
    
    //将消息转换成SeckillStatus
    SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps("UserQueueStatus").get(username);
    //获取Redis中订单信息
    SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);

    //如果Redis中有订单信息,说明用户未支付
    if(seckillStatus!=null && seckillOrder!=null){
    
        //删除订单
        redisTemplate.boundHashOps("SeckillOrder").delete(username);
        //回滚库存
        //1)从Redis中获取该商品
        SeckillGoods seckillGoods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).get(seckillStatus.getGoodsId());

        //2)如果Redis中没有,则从数据库中加载
        if(seckillGoods==null){
    
            seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillStatus.getGoodsId());
        }

        //3)数量+1  (递增数量+1,队列数量+1)
        Long surplusCount = redisTemplate.boundHashOps("SeckillGoodsCount").increment(seckillStatus.getGoodsId(), 1);
        seckillGoods.setStockCount(surplusCount.intValue());
        redisTemplate.boundListOps("SeckillGoodsCountList_" + seckillStatus.getGoodsId()).leftPush(seckillStatus.getGoodsId());

        //4)数据同步到Redis中
        redisTemplate.boundHashOps("SeckillGoods_"+seckillStatus.getTime()).put(seckillStatus.getGoodsId(),seckillGoods);

        //清理排队标示
        redisTemplate.boundHashOps("UserQueueCount").delete(seckillStatus.getUsername());

        //清理抢单标示
        redisTemplate.boundHashOps("UserQueueStatus").delete(seckillStatus.getUsername());
    }
}
3.4.4.2 调用删除订单

修改SeckillOrderPayMessageListener,在用户支付失败后调用关闭订单方法,代码如下:

//支付失败,删除订单
seckillOrderService.closeOrder(attachMap.get("username"));
3.4.4.3 测试

使用Postman完整请求创建二维码下单测试一次。

商品ID:1131814854034853888

数量:49

1558851734898

下单:

http://localhost:18084/seckill/order/add?id=1131814854034853888&time=2019052614

下单后,Redis数据

1558851403264

下单查询:

http://localhost:18084/seckill/order/query

创建二维码:

http://localhost:9022/weixin/pay/create/native?username=szitheima&outtradeno=1132530879663575040&money=1&queue=queue.seckillorder

秒杀抢单后,商品数量变化:

1558851165865

支付微服务回调方法控制台:

{
    
	nonce_str=Mnv06RIaIwxzg3bA, 
    code_url=weixin://wxpay/bizpayurl?pr=iTidd5h, 
    appid=wx8397f8696b538317, 
    sign=1436E43FBA8A171D79A9B78B61F0A7AB, 
    trade_type=NATIVE, 
    return_msg=OK, 
    result_code=SUCCESS, 
    mch_id=1473426802, 
    return_code=SUCCESS, 
    prepay_id=wx2614182102123859e3869a853739004200
}
{
    money=1, queue=queue.seckillorder, username=szitheima, outtradeno=1132530879663575040}

订单微服务控制台输出

{
    
    transaction_id=4200000289201905268232990890,
    nonce_str=a1aefe00a9bc4e8bb66a892dba38eb42,
    bank_type=CMB_CREDIT,
    openid=oNpSGwUp-194-Svy3JnVlAxtdLkc,
    sign=56679BC02CC82204635434817C1FCA46,
    fee_type=CNY,
    mch_id=1473426802,
    cash_fee=1,
    out_trade_no=1132530879663575040,
    appid=wx8397f8696b538317,
    total_fee=1,
    trade_type=NATIVE,
    result_code=SUCCESS,
    attach={
    
    "username": "szitheima",
    "outtradeno": "1132530879663575040",
    "money": "1",
    "queue": "queue.seckillorder"
  }, time_end=20190526141849, is_subscribe=N, return_code=SUCCESS
}

附录:

支付微服务application.yml

server:
  port: 9022
spring:
  application:
    name: pay
  main:
    allow-bean-definition-overriding: true
  rabbitmq:
    host: 127.0.0.1 #mq的服务器地址
    username: guest #账号
    password: guest #密码
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:6868/eureka
  instance:
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
        #如果enabled设置为false,则请求超时交给ribbon控制
          enabled: true
        isolation:
          strategy: SEMAPHORE

#微信支付信息配置
weixin:
  appid: wx8397f8696b538317
  partner: 1473426802
  partnerkey: T6m9iK73b0kn9g5v426MKfHQH7X8rKwb
  notifyurl: http://2cw4969042.wicp.vip:36446/weixin/pay/notify/url

#位置支付交换机和队列
mq:
  pay:
    exchange:
      order: exchange.order
    queue:
      order: queue.order
      seckillorder: queue.seckillorder
    routing:
      orderkey: queue.order
      seckillorderkey: queue.seckillorder

秒杀微服务application.yml配置

server:
  port: 18084
spring:
  application:
    name: seckill
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/changgou_seckill?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: itcast
  rabbitmq:
    host: 127.0.0.1 #mq的服务器地址
    username: guest #账号
    password: guest #密码
  main:
    allow-bean-definition-overriding: true
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:6868/eureka
  instance:
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true
mybatis:
  configuration:
    map-underscore-to-camel-case: true
  mapper-locations: classpath:mapper/*Mapper.xml
  type-aliases-package: com.changgou.seckill.pojo

#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
        #如果enabled设置为false,则请求超时交给ribbon控制
          enabled: true
        isolation:
          thread:
            timeoutInMilliseconds: 10000
          strategy: SEMAPHORE
#位置支付交换机和队列
mq:
  pay:
    exchange:
      order: exchange.order
    queue:
      order: queue.order
      seckillorder: queue.seckillorder
    routing:
      orderkey: queue.order
      seckillorderkey: queue.seckillorder

4 RabbitMQ延时消息队列

4.1 延时队列介绍

延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景

网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)
系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会
系统中的业务失败之后,需要重试
这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。

Rabbitmq实现延时队列一般而言有两种形式:
第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]

第二种方式:利用rabbitmq中的插件x-delay-message

4.2 TTL DLX实现延时队列

4.2.1 TTL DLX介绍

TTL
RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange

x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

1557396863944

4.2.3 DLX延时队列实现
4.2.3.1 创建工程

创建springboot_rabbitmq_delay工程,并引入相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>changgou_parent</artifactId>
        <groupId>com.changgou</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>springboot_rabbitmq_delay</artifactId>

    <dependencies>
        <!--starter-web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--加入ampq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!--测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

application.yml配置

spring:
  application:
    name: springboot-demo
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    password: guest
    username: guest
4.2.3.2 队列创建

创建2个队列,用于接收消息的叫延时队列queue.message.delay,用于转发消息的队列叫queue.message,同时创建一个交换机,代码如下:

@Configuration
public class QueueConfig {
    

    /** 短信发送队列 */
    public static final String QUEUE_MESSAGE = "queue.message";

    /** 交换机 */
    public static final String DLX_EXCHANGE = "dlx.exchange";

    /** 短信发送队列 延迟缓冲(按消息) */
    public static final String QUEUE_MESSAGE_DELAY = "queue.message.delay";

    /**
     * 短信发送队列
     * @return
     */
    @Bean
    public Queue messageQueue() {
    
        return new Queue(QUEUE_MESSAGE, true);
    }

    /**
     * 短信发送队列
     * @return
     */
    @Bean
    public Queue delayMessageQueue() {
    
        return QueueBuilder.durable(QUEUE_MESSAGE_DELAY)
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)        // 消息超时进入死信队列,绑定死信队列交换机
                .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGE)   // 绑定指定的routing-key
                .build();
    }

    /***
     * 创建交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
    
        return new DirectExchange(DLX_EXCHANGE);
    }


    /***
     * 交换机与队列绑定
     * @param messageQueue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding basicBinding(Queue messageQueue, DirectExchange directExchange) {
    
        return BindingBuilder.bind(messageQueue)
                .to(directExchange)
                .with(QUEUE_MESSAGE);
    }
}
4.2.3.3 消息监听

创建MessageListener用于监听消息,代码如下:

@Component
@RabbitListener(queues = QueueConfig.QUEUE_MESSAGE)
public class MessageListener {
    


    /***
     * 监听消息
     * @param msg
     */
    @RabbitHandler
    public void msg(@Payload Object msg){
    
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("当前时间:"+dateFormat.format(new Date()));
        System.out.println("收到信息:"+msg);
    }

}
4.2.3.4 创建启动类
@SpringBootApplication
@EnableRabbit
public class SpringRabbitMQApplication {
    

    public static void main(String[] args) {
    
        SpringApplication.run(SpringRabbitMQApplication.class,args);
    }
}
4.2.3.5 测试
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitMQTest {
    

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /***
     * 发送消息
     */
    @Test
    public void sendMessage() throws InterruptedException, IOException {
    
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("发送当前时间:"+dateFormat.format(new Date()));
        Map<String,String> message = new HashMap<>();
        message.put("name","szitheima");
        rabbitTemplate.convertAndSend(QueueConfig.QUEUE_MESSAGE_DELAY, message, new MessagePostProcessor() {
    
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
    
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        });

        System.in.read();
    }
}

其中message.getMessageProperties().setExpiration(“10000”);设置消息超时时间,超时后,会将消息转入到另外一个队列。

测试效果如下:

1558853755945

5 库存回滚(作业)

5.1 秒杀流程回顾

1558832314454

如上图,步骤分析如下:

1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单吃句话到MySQL,如果没完成,清理排队信息回滚库存
4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,微信系统会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。

延时队列实现订单关闭回滚库存:

1.创建一个过期队列  Queue1
2.接收消息的队列    Queue2
3.中转交换机
4.监听Queue2
	1)SeckillStatus->检查Redis中是否有订单信息
	2)如果有订单信息,调用删除订单回滚库存->[需要先关闭微信支付]
	3)如果关闭订单时,用于已支付,修改订单状态即可
	4)如果关闭订单时,发生了别的错误,记录日志,人工处理

5.2 关闭支付

用户如果半个小时没有支付,我们会关闭支付订单,但在关闭之前,需要先关闭微信支付,防止中途用户支付。

修改支付微服务的WeixinPayService,添加关闭支付方法,代码如下:

/***
 * 关闭支付
 * @param orderId
 * @return
 */
Map<String,String> closePay(Long orderId) throws Exception;

修改WeixinPayServiceImpl,实现关闭微信支付方法,代码如下:

/***
 * 关闭微信支付
 * @param orderId
 * @return
 * @throws Exception
 */
@Override
public Map<String, String> closePay(Long orderId) throws Exception {
    
    //参数设置
    Map<String,String> paramMap = new HashMap<String,String>();
    paramMap.put("appid",appid); //应用ID
    paramMap.put("mch_id",partner);    //商户编号
    paramMap.put("nonce_str",WXPayUtil.generateNonceStr());//随机字符
    paramMap.put("out_trade_no",String.valueOf(orderId));   //商家的唯一编号

    //将Map数据转成XML字符
    String xmlParam = WXPayUtil.generateSignedXml(paramMap,partnerkey);

    //确定url
    String url = "https://api.mch.weixin.qq.com/pay/closeorder";

    //发送请求
    HttpClient httpClient = new HttpClient(url);
    //https
    httpClient.setHttps(true);
    //提交参数
    httpClient.setXmlParam(xmlParam);

    //提交
    httpClient.post();

    //获取返回数据
    String content = httpClient.getContent();

    //将返回数据解析成Map
    return  WXPayUtil.xmlToMap(content);
}

5.3 关闭订单回滚库存

5.3.1 配置延时队列

在application.yml文件中引入队列信息配置,如下:

#位置支付交换机和队列
mq:
  pay:
    exchange:
      order: exchange.order
    queue:
      order: queue.order
      seckillorder: queue.seckillorder
      seckillordertimer: queue.seckillordertimer
      seckillordertimerdelay: queue.seckillordertimerdelay
    routing:
      orderkey: queue.order
      seckillorderkey: queue.seckillorder

配置队列与交换机,在SeckillApplication中添加如下方法

/**
 * 到期数据队列
 * @return
 */
@Bean
public Queue seckillOrderTimerQueue() {
    
    return new Queue(env.getProperty("mq.pay.queue.seckillordertimer"), true);
}

/**
 * 超时数据队列
 * @return
 */
@Bean
public Queue delaySeckillOrderTimerQueue() {
    
    return QueueBuilder.durable(env.getProperty("mq.pay.queue.seckillordertimerdelay"))
            .withArgument("x-dead-letter-exchange", env.getProperty("mq.pay.exchange.order"))        // 消息超时进入死信队列,绑定死信队列交换机
            .withArgument("x-dead-letter-routing-key", env.getProperty("mq.pay.queue.seckillordertimer"))   // 绑定指定的routing-key
            .build();
}

/***
 * 交换机与队列绑定
 * @return
 */
@Bean
public Binding basicBinding() {
    
    return BindingBuilder.bind(seckillOrderTimerQueue())
            .to(basicExchange())
            .with(env.getProperty("mq.pay.queue.seckillordertimer"));
}
5.3.2 发送延时消息

修改MultiThreadingCreateOrder,添加如下方法:

/***
 * 发送延时消息到RabbitMQ中
 * @param seckillStatus
 */
public void sendTimerMessage(SeckillStatus seckillStatus){
    
    rabbitTemplate.convertAndSend(env.getProperty("mq.pay.queue.seckillordertimerdelay"), (Object) JSON.toJSONString(seckillStatus), new MessagePostProcessor() {
    
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
    
            message.getMessageProperties().setExpiration("10000");
            return message;
        }
    });
}

在createOrder方法中调用上面方法,如下代码:

//发送延时消息到MQ中
sendTimerMessage(seckillStatus);
5.3.3 库存回滚

创建SeckillOrderDelayMessageListener实现监听消息,并回滚库存,代码如下:

@Component
@RabbitListener(queues = "${mq.pay.queue.seckillordertimer}")
public class SeckillOrderDelayMessageListener {
    

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private SeckillOrderService seckillOrderService;

    @Autowired
    private WeixinPayFeign weixinPayFeign;

    /***
     * 读取消息
     * 判断Redis中是否存在对应的订单
     * 如果存在,则关闭支付,再关闭订单
     * @param message
     */
    @RabbitHandler
    public void consumeMessage(@Payload String message){
    
        //读取消息
        SeckillStatus seckillStatus = JSON.parseObject(message,SeckillStatus.class);

        //获取Redis中订单信息
        String username = seckillStatus.getUsername();
        SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);

        //如果Redis中有订单信息,说明用户未支付
        if(seckillOrder!=null){
    
            System.out.println("准备回滚---"+seckillStatus);
            //关闭支付
            Result closeResult = weixinPayFeign.closePay(seckillStatus.getOrderId());
            Map<String,String> closeMap = (Map<String, String>) closeResult.getData();

            if(closeMap!=null && closeMap.get("return_code").equalsIgnoreCase("success") &&
                    closeMap.get("result_code").equalsIgnoreCase("success") ){
    
                //关闭订单
                seckillOrderService.closeOrder(username);
            }
        }
    }
}

ent
@RabbitListener(queues = “${mq.pay.queue.seckillordertimer}”)
public class SeckillOrderDelayMessageListener {

@Autowired
private RedisTemplate redisTemplate;

@Autowired
private SeckillOrderService seckillOrderService;

@Autowired
private WeixinPayFeign weixinPayFeign;

/***
 * 读取消息
 * 判断Redis中是否存在对应的订单
 * 如果存在,则关闭支付,再关闭订单
 * @param message
 */
@RabbitHandler
public void consumeMessage(@Payload String message){
    //读取消息
    SeckillStatus seckillStatus = JSON.parseObject(message,SeckillStatus.class);

    //获取Redis中订单信息
    String username = seckillStatus.getUsername();
    SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);

    //如果Redis中有订单信息,说明用户未支付
    if(seckillOrder!=null){
        System.out.println("准备回滚---"+seckillStatus);
        //关闭支付
        Result closeResult = weixinPayFeign.closePay(seckillStatus.getOrderId());
        Map<String,String> closeMap = (Map<String, String>) closeResult.getData();

        if(closeMap!=null && closeMap.get("return_code").equalsIgnoreCase("success") &&
                closeMap.get("result_code").equalsIgnoreCase("success") ){
            //关闭订单
            seckillOrderService.closeOrder(username);
        }
    }
}

}


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_44762676/article/details/115921709

智能推荐

Docker 快速上手学习入门教程_docker菜鸟教程-程序员宅基地

文章浏览阅读2.5w次,点赞6次,收藏50次。官方解释是,docker 容器是机器上的沙盒进程,它与主机上的所有其他进程隔离。所以容器只是操作系统中被隔离开来的一个进程,所谓的容器化,其实也只是对操作系统进行欺骗的一种语法糖。_docker菜鸟教程

电脑技巧:Windows系统原版纯净软件必备的两个网站_msdn我告诉你-程序员宅基地

文章浏览阅读5.7k次,点赞3次,收藏14次。该如何避免的,今天小编给大家推荐两个下载Windows系统官方软件的资源网站,可以杜绝软件捆绑等行为。该站提供了丰富的Windows官方技术资源,比较重要的有MSDN技术资源文档库、官方工具和资源、应用程序、开发人员工具(Visual Studio 、SQLServer等等)、系统镜像、设计人员工具等。总的来说,这两个都是非常优秀的Windows系统镜像资源站,提供了丰富的Windows系统镜像资源,并且保证了资源的纯净和安全性,有需要的朋友可以去了解一下。这个非常实用的资源网站的创建者是国内的一个网友。_msdn我告诉你

vue2封装对话框el-dialog组件_<el-dialog 封装成组件 vue2-程序员宅基地

文章浏览阅读1.2k次。vue2封装对话框el-dialog组件_

MFC 文本框换行_c++ mfc同一框内输入二行怎么换行-程序员宅基地

文章浏览阅读4.7k次,点赞5次,收藏6次。MFC 文本框换行 标签: it mfc 文本框1.将Multiline属性设置为True2.换行是使用"\r\n" (宽字符串为L"\r\n")3.如果需要编辑并且按Enter键换行,还要将 Want Return 设置为 True4.如果需要垂直滚动条的话将Vertical Scroll属性设置为True,需要水平滚动条的话将Horizontal Scroll属性设_c++ mfc同一框内输入二行怎么换行

redis-desktop-manager无法连接redis-server的解决方法_redis-server doesn't support auth command or ismis-程序员宅基地

文章浏览阅读832次。检查Linux是否是否开启所需端口,默认为6379,若未打开,将其开启:以root用户执行iptables -I INPUT -p tcp --dport 6379 -j ACCEPT如果还是未能解决,修改redis.conf,修改主机地址:bind 192.168.85.**;然后使用该配置文件,重新启动Redis服务./redis-server redis.conf..._redis-server doesn't support auth command or ismisconfigured. try

实验四 数据选择器及其应用-程序员宅基地

文章浏览阅读4.9k次。济大数电实验报告_数据选择器及其应用

随便推点

灰色预测模型matlab_MATLAB实战|基于灰色预测河南省社会消费品零售总额预测-程序员宅基地

文章浏览阅读236次。1研究内容消费在生产中占据十分重要的地位,是生产的最终目的和动力,是保持省内经济稳定快速发展的核心要素。预测河南省社会消费品零售总额,是进行宏观经济调控和消费体制改变创新的基础,是河南省内人民对美好的全面和谐社会的追求的要求,保持河南省经济稳定和可持续发展具有重要意义。本文建立灰色预测模型,利用MATLAB软件,预测出2019年~2023年河南省社会消费品零售总额预测值分别为21881...._灰色预测模型用什么软件

log4qt-程序员宅基地

文章浏览阅读1.2k次。12.4-在Qt中使用Log4Qt输出Log文件,看这一篇就足够了一、为啥要使用第三方Log库,而不用平台自带的Log库二、Log4j系列库的功能介绍与基本概念三、Log4Qt库的基本介绍四、将Log4qt组装成为一个单独模块五、使用配置文件的方式配置Log4Qt六、使用代码的方式配置Log4Qt七、在Qt工程中引入Log4Qt库模块的方法八、获取示例中的源代码一、为啥要使用第三方Log库,而不用平台自带的Log库首先要说明的是,在平时开发和调试中开发平台自带的“打印输出”已经足够了。但_log4qt

100种思维模型之全局观思维模型-67_计算机中对于全局观的-程序员宅基地

文章浏览阅读786次。全局观思维模型,一个教我们由点到线,由线到面,再由面到体,不断的放大格局去思考问题的思维模型。_计算机中对于全局观的

线程间控制之CountDownLatch和CyclicBarrier使用介绍_countdownluach于cyclicbarrier的用法-程序员宅基地

文章浏览阅读330次。一、CountDownLatch介绍CountDownLatch采用减法计算;是一个同步辅助工具类和CyclicBarrier类功能类似,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。二、CountDownLatch俩种应用场景: 场景一:所有线程在等待开始信号(startSignal.await()),主流程发出开始信号通知,既执行startSignal.countDown()方法后;所有线程才开始执行;每个线程执行完发出做完信号,既执行do..._countdownluach于cyclicbarrier的用法

自动化监控系统Prometheus&Grafana_-自动化监控系统prometheus&grafana实战-程序员宅基地

文章浏览阅读508次。Prometheus 算是一个全能型选手,原生支持容器监控,当然监控传统应用也不是吃干饭的,所以就是容器和非容器他都支持,所有的监控系统都具备这个流程,_-自动化监控系统prometheus&grafana实战

React 组件封装之 Search 搜索_react search-程序员宅基地

文章浏览阅读4.7k次。输入关键字,可以通过键盘的搜索按钮完成搜索功能。_react search