RingBuffer 在 Puma 中的应用

1091 查看

什么是 RingBuffer

环形缓冲区:https://zh.wikipedia.org/wiki/環形緩衝區

维基百科的解释是:它是一种用于表示一个固定尺寸、头尾相连的缓冲区的数据结构,适合缓存数据流。

底层数据结构非常简单,一个固定长度的数组加一个写指针和一个读指针。

只要像这张图一样,把这个数组辦弯,它就成了一个 RingBuffer。

那它到底有什么精妙的地方呢?

我最近做的项目正好要用到类似的设计思路,所以翻出了以前在点评写的 Puma 系统,看了看以前自己写的代码。顺便写个文章总结一下。

Puma 是什么,为什么要用 RingBuffer

Puma 简介

Puma 是一个 MySQL 数据库 Binlog 订阅消费系统。类似于阿里的 Canel

Puma 会伪装成一个 MySQL Slave,然后消费 Binlog 数据,并缓存在本地。当有客户端连接上来的时候,就会从本地读取数据给客户端消费。

如果用很一般的设计,一个单独的线程会从 MySQL 消费数据,存到本地文件中。然后每个客户端的连接都会有一个线程,从本地文件中读取数据。

没错,第一版就是这么简单粗暴,当然,它是有效的。

利用缓存优化性能

不用做性能测试就知道,系统压力大了以后这里必定会是一个瓶颈。读写数据会有一点延时,如果多个客户端同时读取同一份数据又会造成很多的浪费。然后数据的编码解码还会有不少损耗。

所以这里当然要加一个缓存了。

一个线程写数据到磁盘,然后它可以同时把数据传递给各个客户端。

听起来像发布者订阅者模式?也有点像生产者消费者模式?

但是,它并不仅仅是发布者订阅者模式,因为这里的“发布者”和“订阅者”是完全异步的,而且每个“订阅者”的消费速度是不一样的。

它也不仅仅是生产者消费者模式,因为这里的“消费者”是同时消费所有数据,而不是把数据分发给各个“消费者”。

它们的消费速度不一样,还会出现缓存内的数据过新,“消费者”不得不去磁盘读取。

感觉这里的需求是两种设计模式的结合。

不仅如此,这是一套高并发的系统,怎么保证性能,怎么保证数据一致性?

所以,这个缓存看上去简单,其实它不简单。

常规解决思路

目标明确后,看看 Java 的并发集合中有什么能满足需求的吧。

第一个想到的就是BlockingQueue,为每一个客户端创建一个BlockingQueueBlockingQueue内部是通过加锁来实现的,虽然锁冲突不会很多,但高并发的情况下,最好还是能做到无锁。

Disruptor

当时正好看到了一系列介绍 Disruptor 的文章:传送门

所以就在想能不能把 RingBuffer 用来解决我们的问题呢?

对 RingBuffer 进行改进

看完了 RingBuffer 的基本原理后,就要开始用它来适应我们的系统了。这里遇到了几个问题:

  1. 如何支持多个消费者

  2. 如何判断当前有无新数据,如何判断当前数据是否已经被新数据覆盖

  3. 如何保证数据一致性

第一个问题

这个问题简单,原始的 RingBuffer 只有一个写指针和一个读指针。

要支持多个消费者的话,只要为每个消费者创建一个读指针即可。

第二个问题

RingBuffer 的一个精髓就是,写指针和读指针的大小是会超过数组长度的,写入和读取数据的时候,是采用writeIndex % CACHED_SIZE这样的形式来读取的。

为什么要这么做?这就是为了解决判断有无新数据和数据是否已被覆盖的问题。

假设我内部2个指针,分别叫nextWriteIndexnextReadIndex

那么判断有无新数据的逻辑就是if (nextReadIndex >= nextWriteIndex),返回true的话就是没有新数据了。

而判断数据是否被覆盖的逻辑就是if (nextReadIndex < nextWriteIndex - CACHED_SIZE),返回true的话就是数据已经被覆盖了。

拿实际数据举个例子:

一个长度为10的 RingBuffer,内部是一个长度为10的数组。

此时nextWriteIndex=12,意味着它下一次写入的数据会在 12%10=2 上。此时,可读的有效范围是 2~11,对应的数组内的索引就是 2, 3, 4, 5, 6, 7, 8, 9, 0, 1。

所以,当nextReadIndex=12 的时候,会读到最老的数据2,这是老数据,不是新数据,此时表示没有行数据了。

nextReadIndex=1 的时候,是新数据,而不是想要的老数据,老数据已经被覆盖掉了,此时它没办法从缓存里读数据了。

第三个问题

最棘手的第三个问题来了,这个系统是要支持高并发的,如果是同步的操作,上面的代码没有任何问题。或者说,如果是同步的代码,干嘛还要用 RingBuffer 呢?

上面写入和读取,都有两步操作,更改数据和更改索引,按照逻辑上来讲,它们应该是强一致性的。只能加锁了?如果要加锁,为何不直接用BlockingQueue

所以,是否可以通过什么方法,高并发和最终一致性呢?

直接贴代码吧,根据代码一步步分析:

public class CachedDataStorage {

    private static final int CACHED_SIZE = 5000;

    private final ChangedEventWithSequence[] data = new ChangedEventWithSequence[CACHED_SIZE];

    private volatile long nextWriteIndex = 0;

    public void append(Object dataValue) {
        data[(int) (nextWriteIndex % CACHED_SIZE)] = dataValue;
        nextWriteIndex++;
    }

    public Reader createReader() {
        return new Reader();
    }

    public class Reader {

        private Reader() {
        }

        private volatile long nextReadIndex = 0;

        public Object next() throws IOException {
            if (nextReadIndex >= nextWriteIndex) {
                return null;
            }

            if (nextReadIndex <= nextWriteIndex - CACHED_SIZE) {
                throw new IOException("data outdated");
            }

            Object dataValue = data[(int) (nextReadIndex % CACHED_SIZE)];

            if (nextReadIndex <= nextWriteIndex - CACHED_SIZE) {
                throw new IOException("data outdated");
            } else {
                nextReadIndex++;
                return dataValue;
            }
        }
    }
}

我们来一步步分析,先看内部的ReadercreateReader()方法,每来一个客户端就会创建一个Reader,每个Reader会维护一个nextReadIndex

然后看append()方法,可以说没有任何逻辑,直接写入数据,修改索引就结束了。但是,别小看了这两个步骤的操作顺序。

好了,到了最复杂的next()方法了,这里可就大有讲究了。

一进来立刻执行if (nextReadIndex >= nextWriteIndex),用来判断当前是否还有更新的数据。

因为写入的时候是先写数据再改索引,所以可能会出现明明有数据,但是这里认为没数据的情况。

但是并没有关系,我们更关注最终一致性,因为我们要的是确保这里一定不会读错数据,而不一定要确保这里有新数据就要立刻处理。就算这一轮没读到,下一轮也一定会读取到了。

下一步是这一行if (nextReadIndex <= nextWriteIndex - CACHED_SIZE),判断想要读取的数据有没有被新数据覆盖。等一下,这里为什么和上面介绍的不一样?

上面写的是<,而这里却是<=。上面提到,同步操作的情况下,用<是没有问题的,但是这里的异步的。

写入数据的时候,可能会出现数据已被覆盖,而索引未被更新的问题,所以这样子判断可以保证不会读错数据。

既然上下边界都检查过了,那么就读取数据吧!就当这里准备读数据的时候,写数据的线程竟然又写入了好多数据,导致读出来的数据已经被覆盖了!

所以,一定要在读完数据后,再次检查数据是否被覆盖。

最终,整个过程实现了无锁,高并发和最终一致性。

在 Puma 系统中,启用缓存和关闭缓存,一写五读的情况下,性能整整提高了一倍。测试还是在我 SSD 上进行的,如果是传统硬盘,提升会更明显。

利用 RingBuffer 实现后的优点

代码实现完,就可以和BlockingQueue对比一下了。

首先,RingBuffer 完全是无锁的,没有任何锁冲突。而利用BlockingQueue的话它内部会加锁,虽然锁冲突不会很多,但是没锁肯定比有锁好。而且,当 Writer 往多个BlockingQueue中顺序写入数据的时候,会有相互影响。而利用 RingBuffer 实现的话,无论有多少 Reader,都不会影响写入性能。

然后是内存上的优势,每次多一个 Reader 仅仅是多一个对象,Reader对象内部也只有一个变量,占用内存非常非常小。而使用BlockingQueue的话,需要创建的就不仅仅是一个对象了,会有一系列的东西。

目前看来,该实现能满足我们的需求且无明显缺点,而且已经在系统中平稳运行了将近一年了。在我离职之前的最后一个项目,就包括为点评订单系统接入 Puma,订单系统在活动期间的写入 QPS 非常高。但对 Puma 来说也是毫无压力的,最终的瓶颈都不是在 Puma 上,而是在目标数据库的写入性能上。

高并发系统的设计思路

首先,这部分的代码可以在这里找到:传送门

完整的代码还包含了老数据被覆盖无数据可读时的数据源切换逻辑,还有当无消费者时关闭 RingBuffer 的逻辑。上面的代码已经被简化了很多,想看完整代码的话可以在上面的链接中看到。

以后有空还会再介绍更多 Puma 中遇到的问题和解决的思路。

然后谈谈高并发系统的设计。

Java 并发编程的第一重境界是善用各种锁,尽量减少锁冲突,不能有死锁。

第二重境界就是善用 Java 的各种并发包,Java 的并发包里有的是无锁的,例如AtomicLong中用了CAS;有的是用了各种手段减少锁冲突,例如ConcurrentHashMap中就用了锁分段技术。整体效率都非常高,能熟练应用后也能写出很高效的程序。

再下一个境界就非常搞脑子了,往往是放弃了强一致性,而去追求最终一致性。其中会用到AtomicLong等无锁,或锁分段技术,并且常常会把它们结合起来用。就像上面那部分代码,看似简单,但实际上却要把各种边界条件思考地很全面,因为是最终一致性,所以中间的状态非常多。

源地址:http://www.dozer.cc/2016/09/ringbuffer-in-puma.html