Redis支付交易系统应用实践

https://mp.weixin.qq.com/s/jAc9T0R5x5r4cQaWtzoafA

前言

在服务端业务中,当我们想要以准实时的速度访问快速变动的数据流时,Redis这样的键值数据库是非常好选择。通过字符串、列表等多种数据类型,Redis对键值对模式进行了扩展,它既提供了极其快速的内存数据集操作,又可以在运行时轻松地将这些数据持久化到磁盘上面,本文主要介绍了一些在网易支付的业务场景中Redis的使用。

1.Redis介绍

1.1 Redis简介

Redis 是完全开源的,遵守 BSD 协议,是一个高性能的 key-value 数据库。

Redis与其他 key – value 缓存产品相比,具有高性能、持久化、数据结构、原子操作、多语言支持、主从复制等特性,详细的说明可以参考官方文档:https://redis.io/docs/

Redis数据结构

图片

Redis的数据结构大致有以上几种:

String(字符串):字符串,最简单的 key-value 类型,可以保存字符串、整数和浮点数

List(链表):一个链表结构,每一个节都是一个字符串,可以从链表的两侧插入数据或者进行遍历。

Set(集合):基于Hash实现的一个Set,原理与Java中的set类似。set中的元素是无序的,在它里而每一个元素都是一个字符串,而且是独一无二,各不相同的。

HASH(哈希散列表) :类似于Java中的Map,是一个键值对应的无序列表

ZSet(有序集合):在Set的基础上,给value中的每个字符串关联了一个score属性,即得分。Zset通过计算得分,将字符串进行从小到大的排序。

1.2 选择Redis的理由

1.速度快,Redis 将所有数据集存储在内存中,根据官方文档的说法,可以在入门级 Linux 机器中每秒写(SET)11 万次,读(GET)8.1 万次。

2.数据结构丰富,相比Memcached所有的值均是简单的字符串,redis支持更为丰富的数据类型,方便实现各种不同的功能。

3.数据持久化的能力,可以使用灵活的策略将更改异步保存在磁盘上。

4.可靠性好,提供哨兵、Cluster等机制保障Redis集群的高可用性。

5.社区生态完善,有比较多的客户端和扩展,方便实现各种需求。

1.3 Redis常见客户端简介

客户端 简介 优点 缺点
Jedis Redis的Java实现客户端,提供了比较全面的Redis命令的支持 支持全面的 Redis 操作特性 使用阻塞的 I/O,不支持异步;不是线程安全的
Redisson 架设在Redis基础上,为使用者提供了一系列具有分布式特性的常用工具类,具有协调分布式多机多线程并发系统的能力,同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。 提供很多分布式相关操作服务是线程安全的 对字符串的操作支持比较差
Lettuce 高级Redis客户端,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器。 支持同步异步通信模式是线程安全的 对pipeline行为不友好

1.4 Redis高可用

对于线上业务的依赖来说,除了满足业务的需求,可用性也是必须要考虑的因素,Redis的可靠性以来以下两种方式。

  • 主从复制数据。
  • 哨兵监控数据节点的运行情况,当主节点出现问题可以支持由从节点顶上继续进行服务。

1.4.1 主从复制

redis 2.8版本以上使用psync命令完成同步,支持全量复制与部分复制

图片

psync命令流程

  • 如果从服务器之前没有复制过任何主服务器,或者之前执行过slaveof no one命令,那么从服务器就会向主服务器发送psync ? -1命令,请求主服务器进行数据的全量同步。
  • 否则,如果前面从服务器已经同步过部分数据,那么从服务器向主服务器发送psync 命令,其中runid是上一次主服务器的运行id,offset是当前从服务器的复制偏移量。

前面两种情况主服务器收到psync命令之后,会出现以下三种可能:

  • 主服务器返回+fullresync 回复,表示主服务器要求与从服务器进行完整的数据全量同步操作。其中,runid是当前主服务器运行id,而offset是当前主服务器的复制偏移量。
  • 如果主服务器应答+continue,那么表示主服务器与从服务器进行部分数据同步操作,将从服务器缺失的数据同步过来即可。
  • 如果主服务器应答-err,那么表示主服务器版本低于2.8,识别不了psync命令,此时从服务器将向主服务器发送sync命令,执行完整的全量数据同步。

1.4.2 哨兵Sentinel

Redis Sentinel是一个分布式架构,包含若干个Sentinel节点和Redis数据节点,每个Sentinel节点会对数据节点和其余Sentinel节点进行监控,当发现节点不可达时,会对节点做下线标识。如果被标识的是主节点,他还会选择和其他Sentinel节点进行“协商”,当大多数的Sentinel节点都认为主节点不可达时,他们会选举出一个Sentinel节点来完成自动故障转移工作,同时将这个变化通知给Redis应用方。

1.4.3 支付交易中的Redis高可用

网易支付中的Redis架构部署如下图所示,采用Redis数据节点采用双机房主从部署,yq作为主机房部署一主一丛,机房bj部署一丛,主Redis通过psync命令同步数据到从Redis异地。

哨兵集群负责监控Redis集群,哨兵集群节点间,以及哨兵节点到Redis数据节点通过ping命令做心跳探测进行监控。

图片

2.Redis在支付交易业务中的应用

2.1 App客户端消息推送

2.1.1 需求背景

网易支付的一些业务执行完成后,会有一些业务相关的提醒内容,想要在用户打开app时通知到客户,比如:支付成功后的支付成功消息、提现成功后的提现成功消息、退款成功后的退款成功消息等等,除了业务完成后推送消息,还有一种业务需求是业务未完成推送消息,例如,用户想通过人脸找回密码,但是人脸失败了,对于这样的情景,我们需要能进行识别,并把快速找回密码的入口推送到用户的app上,方便用户进行密码找回。除此之外还有运营推送的相关消息,如优惠活动消息等。

2.1.2 需求分析

基于以上的场景,可以归纳为:

1.通过一个合适的数据结构,实现业务消息的快速存入,并要支持过期机制,防止消息数量无限膨胀。

2.当app侧进行拉取时,支持用户维度的快速拉取消息。

3.针对并发的场景,需要支持幂等控制,防止消息重复写入

4.针对业务未完成的推送消息,需要额外考虑数据结构做处理。

2.1.3 方案概述

经过分析,设计得到对应的存储数据结构如下:

1.以账号为维度zset,在该ZSET中,保存的以消息Id作为key,以消息创建时间作为value的数据记录,所有数据按创建时间排列。App拉取消息时,只要拉取当前账号对应的队列中的消息id即可。

2.全局set,以消息Id为key,以消息实际内容为value的set,当从前面账号维度的zset拉取到消息id后,就可以直接从当前zset中获取消息内容,返回app端。

3.全局set,以当前业务类型bizType + 用于幂等的外部id为key ,以消息id为value的set,该set的作用主要是幂等控制,防止消息的重复写入。

图片

2.1.3.1 消息写入的流程

当用户ACCOUNT1有新的支付成功消息产生,消息的创建时间为10020,消息内容为:

{ “title”: “支付”, “contentTemplate”: “¥xxx 支付成功”, “url”: “xxx://xxx.com/detail?orderId=xxx”, “logoUrl”: “https://xxx.com/message_111.png”, “isNeedPush”: false }

则此时bizType为“pay”,外部业务id为“outerBizId6”,那么写入流程为:

1.判断set2中是否已存在key为”pay_outerBizId6″的记录,如果存在,则说明当前业务已经写入过,不再重复写入

2.在set1中写入key为“msgId1”,value为对应消息内容的json串。

3.若当前业务没有写入过,则生成新的msgId,假设为msgId6,在set2中写入key为“pay_outerBizId6”,value为“msgId6”的记录

4.在名为“msgCenter_ACCOUNT1“的zset”中写入key为msgId6,value为10020的记录。

写入成功后的数据如下:

图片

2.1.3.2 App端拉取消息的流程

仍以用户ACCOUNT1为例,假设当前用户App上最后一条消息为消息MsgId1,那么当App拉取最近的消息时,执行过程如下:

1.向服务端请求最新的消息,传入参数为当前账号,以及最后一条接收到的消息MsgId1,若不传,则代表需要全量拉取。

2.服务端接收到请求后,从当前账号的消息zset:msgCenter_ACCOUNT1执行zrange,得到MsgId1在队列中的index,再把当前index+1,若找不到则index=0。

3.再对当前zset执行zrange方法,起始位置为index,结束值为-1,代表取从index开始的所有数据,拿到需要拉取的所有消息Id

4.从存储消息内容的全局set中,根据消息id,依次拉取消息内容,返回客户端。

图片

2.1.3.3 未完成业务场景的处理

前面介绍的主要是业务完成后消息的处理方案,在实际的业务需求中,还有一种针对业务未完成的消息提醒,比如,用户忘记了密码,在pc端进行了一些找回密码的操作,比如短信密码找回、银行卡找回密码等,但是可能因为种种原因,如手机号注销了、银行卡找不到了等导致找回失败了,或者仅仅是因为用户嫌麻烦放弃了找回。对这种情况,我们需要能进行识别并发送app消息,消息内容是人脸识别快速找回的路径,帮助用户在app上快速找回密码。

对这类业务,采用的方案如下:

1.设计了一个全局的zset,key为一系列关键信息的组合,包括业务类型、用户信息等,value为用户开始执行当前业务的起始时间,以找回密码为例,这里开始的时间为用户点击找回密码功能的时间。

2.若用户完成了找回密码,则在相关业务执行完成后,服务端会执行zrem操作,从zset中删除之前写入的记录。

3.若用户没有完成,则zset中的记录得以保留。

4.我们假设如果一项操作在n分钟以后仍然没有完成,就代表用户的这项操作失败了。会有一个调度任务,轮询当前的zset,取n分钟以前创建的记录,封装成消息以后进行发送。这里的n可以根据配置设定。

5.zset中的key是基于用户、操作维度控制的,因此重复一项操作只能写入一个记录,不会造成重复发送多条消息的情况。

2.2 支付、下单业务分布式锁

2.2.1 需求背景

2.2.1.1 主次单模型

网易支付中的订单模型存在一种主次单模型,主单是商户下单时生成的订单,次单是基于主单,以及商户的支付相关策略生成的用于在网易支付收银台支付的支付订单,两者通过一张关联表进行关联。

图片

在这种订单模式下,当生成次订单时,我们就需要通过分布式锁锁定主单,防止次单的并发重复生成。

2.2.1.2 支付模型

在网易支付中的订单,用户发起支付时,都需要先创建支付方案。支付方案创建时,都有明确的支付指令集合,组合支付时有多条不同的支付指令。支付指令是支付能力的抽象,每一条支付指令即表示用户将使用某种支付方式,如快捷、余额、网银等。

任意支付指令可以进行组合支付,其支付顺序可自定义,如 快捷 +余额 + 积分,可按以下顺序进行支付 余额 -> 积分 ->快捷,也可逆序完成。

图片

不难发现,在这个支付模型中,同一个支付指令存在很多被并发调用的场景。比如一笔订单,同一个支付方案的并发请求,前一个支付指令完成后并发触发了后一个指令的支付等,为了防止对同一支付方案的并发支付请求,我们使用基于分布式锁进行控制。

2.2.2 分布式锁方案选型

分布式锁目前比较流行的实现方式有三种:

1.基于数据库实现分布式锁,利用select … where … for update 排他锁来实现

2.基于缓存(Redis等)实现分布式锁

3.基于Zookeeper实现分布式锁

三种方案的比较如下:

方式 优点 缺点
数据库 直接使用数据库,方便简单 db操作性能较差,并且有锁表的风险非阻塞操作失败后,需要轮询,占用cpu资源;
Redis 基于Redis单线程处理,性能高 锁删除失败 过期时间不好控制非阻塞,操作失败后,需要轮询,占用cpu资源;
Zookeeper 可靠性高不存在redis的超时,数据同步 性能不如redis

由于我们下单、支付业务对于性能的要求比较高,并且本身业务逻辑、数据库表设计上也有幂等防并发的控制,并不一定需要特别高的可靠性,因此综合考虑下来,我们选择Redis分布式锁的方案来实现,Redisson的分布式锁是基于Redis的现成实现方案,因此直接引入Redisson分布式锁来实现业务需求。

2.2.3 Redisson分布式锁原理

Redisson的分布式锁依赖redis,本质上是通过执行一段原子性的lua脚本,由CommandAsyncExecutor执行,这个锁最终持久化到redis,使用的是hset的key,field,value。通过lua脚本的结果,判断是否获得锁。核心类为RedissonLock,该类实现了java.util.concurrent.locks.Lock接口,从而符合了Lock的标准。RedissonLock是支持重入的锁,对于同一个线程,支持重复获得当前的锁。

2.2.3.1 加锁原理

假设当前要获取的key名为pay_lock_123,则在redisson的加锁流程如下:

1.当有加锁请求时,其会获取当前的线程号threadId,首先通过exist检查当前获取的key对应的hash:pay_lock_123是否存在,如果不存在,那么说明当前线程可以获取当前的key,直接hset设置一个pay_lock_123的hash,设置一个field:threadId,value为1,代表当前这个锁已经占用,由threadId占用,并只有一次重入。

2.若获得了hash:pay_lock_123,则说明锁已经占用了,再通过hexist检查hash中是否存在threadId(value是否匹配),如果存在,则说明是当前线程重入的场景,将对应的计数再+1

3.若value不匹配,说明当前的锁不是当前线程获取的,那么就返回key的剩余时间,代表获取锁失败。

图片

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +  //情况1,检查当前锁是否已经存在,若不存在
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +  //则执行hset,获取当前锁,设置重入计数为1
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +  //设置过期时间
                  "return nil; " +                               //nil,表示获得锁
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //情况2,当前锁已存在
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +        //将计数+1
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +           //设置过期时间
                  "return nil; " +                                        //nil,表示获得锁
              "end; " +
              "return redis.call('pttl', KEYS[1]);",                       //情况3,获取不到分布式锁,则返回锁的过期时间
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

2.2.3.2 解锁原理

假设当前要获取的key名为pay_lock_123,则在redisson的解锁流程如下:

1.当有解锁锁请求时,其会获取当前的线程号threadId,首先通过exist检查当前获取的key对应的hash:pay_lock_123是否存在,如果不存在,则直接广播锁释放信息,结果流程。

2.若获得了hash:pay_lock_123,则说明锁已经占用了,再通过hexist检查hash中是否存在threadId(value是否匹配),如果匹配,则说明是当前线程重入的场景,将对应的计数再-1。当重入为0时,代表锁可以释放,此时删除pay_lock_123的hash,并广播锁释放信息,代表锁已经释放。

3.持有计数仍旧>0,重新设置过期时间。

图片

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +  //锁不存在
                "redis.call('publish', KEYS[2], ARGV[1]); " +  //直接广播锁释放消息
                "return 1; " +                                 //返回成功
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +  //分布式锁未被当前线程持有
                "return nil;" +                                          //表示解锁失败。
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //持有锁数量减1
            "if (counter > 0) then " +                                        //持有计数大于1的情况
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +                //重设过期时间
                "return 0; " +
            "else " +                                                       //删除当前锁,锁释放成功
                "redis.call('del', KEYS[1]); " +                            
                "redis.call('publish', KEYS[2], ARGV[1]); " +               //发布锁释放广播
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

2.2.3.3 等待锁原理

Resisson分布式锁的等待是通过 Redis 的 channel 订阅来实现的,如果等待的过程中一直未等到锁的释放事件通知,当超过最大等待时间则获取锁失败,如果等到了锁的释放事件的通知,则会开始进入一个不断重试获取锁的循环。

循环中每次都先试着获取锁,并得到已存在的锁的剩余存活时间。如果在重试中拿到了锁,则直接返回。如果锁当前还是被占用的,那么等待释放锁的消息,具体实现使用了JDK的信号量 Semaphore 来阻塞线程,当锁释放并发布释放锁的消息后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。

这里有一个分布式锁的关键点:当锁正在被占用时,等待获取锁的进程并不是通过一个 while(true) 死循环去获取锁,而是利用了 Redis 的发布订阅机制,通过 await 方法阻塞等待锁的进程,有效的解决了无效的锁申请浪费资源的问题。

//订阅锁释放时间,并通过await的方式阻塞当前线程,等待锁释放的结果
//基于信息量等待
//当await返回false,代表等待超过了最大等待时间,取消订阅,返回锁获取失败
//当await返回true,则代表锁获取成功,进入循环获取锁的逻辑
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
//await方法内部通过CountDownLatch实现,获取subscribe的异步执行结果
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
    if (!subscribeFuture.cancel(false)) {
        subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
            @Override
            public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                if (subscribeFuture.isSuccess()) {
                    unsubscribe(subscribeFuture, threadId);
                }
            }
        });
    }
    acquireFailed(threadId);
    return false;
}

try {
    time -= (System.currentTimeMillis() - current);
    if (time <= 0) {
        acquireFailed(threadId);
        return false;
    }

    while (true) {
        long currentTime = System.currentTimeMillis();
        //再次尝试获取锁
        ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }

        time -= (System.currentTimeMillis() - currentTime);
        //超过最大等待时间,结束
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        // waiting for message
        //通过信号量阻塞,等待解锁消息
        currentTime = System.currentTimeMillis();
        if (ttl >= 0 && ttl < time) {
            getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } else {
            getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
        }

        time -= (System.currentTimeMillis() - currentTime);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
    }
} finally {
    //无论是否获得锁,都要取消订阅
    unsubscribe(subscribeFuture, threadId);
}

2.2.4 业务实践

1 预热连接,在服务启动时,会有一个初始化方法,执行一次加锁,通过这样的方式,快速获得Redis资源,保证服务启动后立刻能够进入状态,正常快速地执行加锁、解锁流程。

2.超时时间设置配置化,由于不同的业务,并发性,执行时间等各方面都有不同,因此最好将执行加锁的超时时间参数配置化,根据业务设置不同的参数,以达到最佳的平衡点。

3.业务降级的考虑,分布式锁为业务实现带来了很大的便利和可靠性,但是不能完全依赖分布式锁,需要考虑Redis失效时的降级处理。

2.3 关单提醒业务

2.3.1 需求背景

经回访了解,沉默用户未完成交易离开原因,其中一部分用户反馈自己因为下单后有别的事情打扰离开了,最后忘记支付。因此希望在网易支付侧增加关单提醒以增加对用户的触达转化。对于特定商户,在订单在失效前20分钟提醒,app推送:【待支付提醒】您有一笔xxx商品订单将于20分钟后关闭,请尽快完成支付,以防花落别家。

2.3.2 需求分析

针对需求背景,可以得到如下结论:

1.数据量较大,目前网易支付侧每日的订单量在百万量级。

2.订单下单存在短时间峰值,某一时间段可能需要处理大量订单,对该功能的性能提出了较高的要求

3.目前业务中没有相关的数据,可以直接处理超时提醒的问题

2.3.3 方案选型

方案一 新增业务字段+调度扫表

• 对当前订单表新增一个超时时间timestamp字段,在订单下单时根据下单时间+设置的订单超时分钟数计算得 到,下单时写入数据库。

• 通过调度扫表,每隔一段时间拉取达到超时时间的订单执行通知。

方案二 Redisson延时队列方案

• 对延时队列是一个支持延时消费消息的队列,可以指定队列中的消息何时被消费,Redisson延时队列是实现该功能的一个队列。

• 订单下单时,计算好要提醒的时间,写入到延时队列,到时后延时队列自动消费进行通知

•  订单支付成功或被关单后,则直接将数据从延时队列移除即可

方案 优点 缺点
新增业务字段+调度扫表 1.实现简单,直接新加字段即可2.通过现有调度工具即可实现 1.对当前已经很庞大的订单表增加冗余字段,除了关单提醒功能外没有其他用途2.订单表目前没法支持基于分布式调度进行扫表,只能通过单线程调度,数据量大时性能不一定能满足。3.定时调度,只能每隔一段时间处理,实时性相对较差
Redisson延时队列 1.不需要冗余数据2.实时推送,实效性高3.分布式处理,能应对数据量较大的情况4.可以做成比较通用的延时处理框架,供其他功能使用 1.基于Redisson,可靠性依赖redis。2.流量极大时可能存在性能问题,需要优化

综合考虑后,选择了方案二,原因是方案二不冗余字段,对主业务无影响,且有更高的实时性,并且更通用。

2.3.4 方案概述

1.延时队列是一个支持延时消费消息的队列,可以指定队列中的消息何时被消费,Redisson延时队列是实现该功能的一个队列。

2.订单下单时,计算好要提醒的时间,写入到延时队列,到时后延时队列自动消费进行通知

3.订单支付成功或被关单后,则直接将数据从延时队列移除即可

图片

如图所示,mcc是kafka消息消费中心,订单创建、支付成功、订单关闭的消息都会在mcc进行消费。左侧的三台epay服务订阅了当前的延时队列信息。当接收到订单创建消息时,mcc消费该消息,将当前订单写入延迟队列。后续如果有支付成功或关单的消息,则将该订单数据移出延时队列。

若订单一直在队列中,当订单到达设定好的时间(关单前20分钟),延时队列将数据转入阻塞队列BlockQueue中epay会持续从阻塞队列拉取数据,epay收到数据后,就组装订单过期提醒消息发送给用户。

2.3.5 实际执行中碰到的问题及解决方式

2.3.5.1 问题描述

利用原生的redisson提供的延迟队列在线上跑了一段时间后,项目一开始运行的比较顺利,但是运行一段时间后,部分共用redis的服务经常会出现Could not get a resource from the pool异常。

此外哨兵的redis监控也爆出慢查询,慢查询的语句为延迟队列删除元素的方法。

2.3.5.2 Redisson延迟队列的原理

图片

延迟队列的4个数据结构:

1.阻塞队列block_queue,所有到期的数据会被放到这里

2.redisson_delay_queue_channel 用于publish 数据timeout事件的channel,接收到事件的客户端会把到期的数据放到阻塞队列中

3.redisson_delay_queue 延迟队列中的list,放置元素,主要作用是轮询查找元素

4.redisson_delay_queue_timeout 延迟队列中的zset,放置元素,以延迟时间作为score。

延迟队列的核心方法是:

添加元素的offer,移除元素的remove,还有它自身的构造函数。

首先看添加元素的offer方法

public void offer(V e, long delay, TimeUnit timeUnit) {
        get(offerAsync(e, delay, timeUnit));
    }

    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;

        long randomId = PlatformDependent.threadLocalRandom().nextLong();
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime 
              // to all scheduler workers 
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;"
                 ,
              Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
              timeout, randomId, encode(e));
    }

1.对元素的操作,是把当前的元素encode后,再附带一个随机randomId,使用lua脚本的struct.pack方法按照一定格式打成一个value。这个value作为zset(redisson_delay_queue_timeout )中的member,以延迟时间为score,保存到zset中。同时会把value,rpush到redisson_delay_queue 这个list中。因此,如果往延迟队列中放入同一个元素,由于randomId的不同,也会被打包成为不同的元素,因此该队列不存在去重的能力。

2.执行zrange+publish取出排序好的第一个数据,也就是最临近要触发的数据,然后发送通知 (之前订阅了的客户端,可能是微服务就有多个客户端),通知的内容为为当前这个元素的延迟时间,内容为将要触发的时间。客户端收到通知后,就在自己进程里面开启延时任务(HashedWheelTimer),到时间后就可以从redis取数据发送到到期的BlockQueue。

再来看删除元素的remove方法

@Override
    public RFuture<Boolean> removeAsync(Object o) {
        return removeAsync(o, 1);
    }

    protected RFuture<Boolean> removeAsync(Object o, int count) {
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
                "local s = redis.call('llen', KEYS[1]);" +
                "for i = 0, s-1, 1 do "
                    + "local v = redis.call('lindex', KEYS[1], i);"
                    + "local randomId, value = struct.unpack('dLc0', v);"
                    + "if ARGV[1] == value then "
                        + "redis.call('zrem', KEYS[2], v);"
                        + "redis.call('lrem', KEYS[1], 1, v);"
                        + "return 1;"
                    + "end; "
               + "end;" +
               "return 0;",
        Arrays.<Object>asList(queueName, timeoutSetName), encode(o));
    }

由于延迟队列元素的组装,我们已经无法得知需要删除的元素对应的value到底是什么样的了,因此,redisson延迟队列采用的是,依次从redisson_delay_queue 把所有value取出来,用struct.unpack解开获取实际元素,进行比对,发现相等的就删除zset和list中的数据。这样一来,删除元素的操作时间复杂度就变成了O(n)

最后是延迟队列的构造方法

protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        channelName = prefixName("redisson_delay_queue_channel", getName());
        queueName = prefixName("redisson_delay_queue", getName());
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());

        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {

            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
                      System.currentTimeMillis(), 100);
            }

            @Override
            protected RTopic getTopic() {
                return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };

        queueTransferService.schedule(queueName, task);

        this.queueTransferService = queueTransferService;
    }

延迟队列会创建一个zset的数据结构(redisson_delay_queue_timeout),一个list的数据机构(redisson_delay_queue),一个channel(redisson_delay_queue_channel)。

延迟队列启动时,首先zrangebyscore取出前100条过了当前时间的数据。如果取的是0的话就执行下面的zrange, 这里程序刚启动肯定是0(除非是之前的队列数据没有取完)。之所以在刚启动时 这样取数据就是为了把上次进程宕机后没发完的数据发完。

之后会从zset(redisson_delay_queue_timeout)中取最早到期的一条数据,拿到最早到期的时间,用于启动QueueTransferTask。

QueueTransferTask是一个基于时间轮的定时调度任务,该task订阅了一个channel,即redisson_delay_queue_channel。当该channel有发布后,任务获取到了发布的内容(即最早到期元素的延迟时间,这里假设为t),之后就会启动调度,等待t时间后,执行pushTaskAsync的操作,将元素放入到已到期的元素集合block_queue中,等待服务获取。

2.3.5.3 优化和改进

瓶井分析:

1.延迟队列中,添加到zset和list中的value不是一开始纯粹传入的值e,而是一个随机生成的randomId+ 经过encode处理的值e。这里的主要作用是当传入的值e是重复的情况下,延迟队列仍然认为是两个不同的元素,延迟通知也是两次。

2.使用了一个list(redisson_delay_queue),由于zset和list中存储的都是(randomId+encode(e))的组合,这样的话删除时因为无法知道randomId是多少,只能从list中一个一个遍历当前元素去找目标值,然后分别在zset和list中执行删除,因此时间复杂度达到了O(n),在队列元素个数10万的场景下,单次删除耗时约200ms,当元素过多时,就有可能造成慢查询。

图片

redisson_delay_queue的作用

优化方案

基于上面的分析,如果使用场景中不要求进行遍历(延迟队列本身的实现是可以不依赖遍历的),同时需要幂等控制元素写入的情况下,其实可以做到移除这个list(redisson_delay_queue),从而减少删除元素,查询元素的时间复杂度。因此考虑在原RedisDelayedQueue的实现基础上进行调整,移除redisson_delay_queue。这样,延迟队列的相关操作逻辑如下:

1.插入元素

移除randomId随机值,将元素e,打包成固定的二进制串作为member,延迟时间为score,存入zset:redisson_delay_queue_timeout中。不再rpush到队列redisson_delay_queue。当该元素是当前zset中score最小的元素,就执行publish到redisson_delay_queue_channel,值为过期时间。

2.删除元素

直接根据元素e打包成的固定二进制串作为member从zset中删除,无需操作redisson_delay_queue

3.发布订阅机制下的数据放入阻塞队列

延迟队列添加元素后会根据条件push到channel:redisson_delay_queue_channel中,值为最近一个元素的延迟时间。redisDelayedQueue通过一个定时任务,延迟时间到后执行拉取元素放入阻塞队列的操作。这样一来,就可以把删除元素的时间复杂度降为O(log(n)),同时添加元素和发布订阅机制下数据放入阻塞队列的操作也可以变得更简单。

优化后架构如下:

图片

经过验证原本10w数据情况下删除元素耗时从200ms降到了1ms以下。调整版本发布至今,项目运行良好,没有再出现慢查询和无法获取redis连接的问题。

3.总结

1.介绍了Redis的基本情况和数据结构、常见客户端

2.介绍了网易支付交易侧redis部署架构以及高可用,使用采用双机房主从部署,通过主从复制及哨兵Sentinel机制来保证高可用

3.介绍了Redis在支付交易业务中的使用

  • 第一个案例是App消息推送,在这个案例中,Redis主要作为一个快速存取数据的内存数据库,通过合理的数据结构设计,保证App消息的快速存取。
  • 第二个案例是支付业务分布式锁,基于Redisson的分布式锁,主要目的是在交易的下单与支付系统中,保证业务的执行顺序及保证一致性、并发控制,并讨论其内部实现原理和一些业务实践中的思考。
  • 第三个案例是基于Redis延迟队列的关单提醒业务,对实现方案做了说明,并对实际运行中的问题,通过对源码的分析和调整进行了介绍。

当然,实际的使用场景还有很多,这里就不再一一介绍了。


发表评论

电子邮件地址不会被公开。 必填项已用*标注