
你好,据结我是构实码哥,一个拥抱硬核技术和对象,现原面向人民币编程的据结男人,设置星标不迷路。构实 我在【Redis 使用 List 实现消息队列的现原利与弊】说过使用 List 实现消息队列有很多局限性。 没有 ACK 机制。据结没有类似 Kafka 的构实 ConsumerGroup 消费组概念。消息堆积。现原List 是据结线性结构,查询指定数据需要遍历整个列表。构实1、现原是据结什么Stream 是 Redis 5.0 版本专门为消息队列设计的数据类型,借鉴了 Kafka 的构实 Consume Group 设计思路,提供了消费组概念。现原 同时提供了消息的持久化和主从复制机制,客户端可以访问任何时刻的数据,并且能记住每一个客户端的访问位置,从而保证消息不丢失。 以下几个是 Stream 类型的主要特性。源码下载 使用Radix Tree 和 listpack 结构来存储消息。消息 ID 序列化生成。借鉴 Kafka Consume Group 的概念,多个消费者划分到不同的 Consume Group 中,消费同一个 Streams,同一个 Consume Group 的多个消费者可以一起并行但不重复消费,提升消费能力。支持多播(多对多),阻塞和非阻塞读取。ACK 确认机制,保证了消息至少被消费一次。可设置消息保存上限阈值,我会把历史消息丢弃,防止内存占用过大。需要注意的是,Redis Stream 是一种超轻量级的 MQ,并没有完全实现消息队列的所有设计要点,所以它的使用场景需要考虑业务的数据量和对性能、可靠性的需求。 适合系统消息量不大,容忍数据丢失,使用 Redis Stream 作为消息队列就能享受高性能快速读写消息的b2b供应网优势。 2、修炼心法每个 Stream 都有一个唯一的名称,作为 Stream 在 Redis 的 key,在首次使用 xadd 指令添加消息的时候会自动创建。 可以看到 Stream 在一个 Redix Tree 树上,树上存储的是消息 ID,每个消息 ID 对应的消息通过一个指针指向 listpack。 Stream 流就像是一个仅追加内容的消息链表,把消息一个个串起来,每个消息都有一个唯一的 ID 和消息内容,消息内容则由多个 field/value 键值对组成。底层使用 Radix Tree 和 listpack 数据结构存储数据。 为了便于理解,我画了一张图,并对 Radix Tree 的存储数据做了下变形,使用列表来体现 Stream 中消息的逻辑有序性。 
这张图涉及很多概念,但是你不要慌。我一步步拆开说,最后你再回头看就懂了。WordPress模板 先带你屡下全局思路。 Consumer Group:消费组,每个消费组可以有一个或者多个消费者,消费者之间是竞争关系。不同消费组的消费者之间无任何关系。*pel,全称是 Pending Entries List,记录了当前被客户端读取但是还没有 ack(Acknowledge character 确认字符)的消息。如果客户端没有 ack,这个变量的消息 ID 会越来越多。这是一个核心数据结构,用来确保客户端至少消费消息一次。Stream 结构Streams 结构的源码定义在 stream.h 源码中的 stream 结构体中。 复制typedef struct stream { rax *rax; uint64_t length; streamID last_id; streamID first_id; streamID max_deleted_entry_id; uint64_t entries_added; rax *cgroups; } stream; typedef struct streamID { uint64_t ms; uint64_t seq; } streamID;1.2.3.4.5.6.7.8.9.10.11.12.13.14. *rax,是一个 rax 的指针,指向一个 Radix Tree,key 存储消息 ID,value 实际上指向一个 listpack 数据结构,存储了多条消息,每条消息的 ID 都大于等于 这个 key 的消息 ID。length,该 Stream 的消息条数。streamID结构体,消息 ID 抽象,一共占 128 位,内部维护了毫秒时间戳(字段 ms);一个毫秒内的自增序号(字段 seq),用于区分同一毫秒内插入多条消息。last_id,当前 Stream 最后一条消息的 ID。first_id,当前 Stream 第一条消息的 ID。max_deleted_entry_id,当前 Stream 被删除的最大的消息 ID。entries_added,总共有多少条消息添加到 Stream 中,entries_added = 已删除消息条数 + 未删除消息条数。*cgroups,rax 指针,也指向一个 Radix Tree ,记录当前 Stream 的所有 Consume Group,每个 Consume Group 的名称都是唯一标识,作为 Radix Tree 的 key,Consumer Group 实例作为 value。Consumer GroupConsumer Group 由 streamCG 结构体定义,每个 Stream 可以有多个 Consumer Group,一个消费组可以有多个消费者同时对组内消息进行消费。 复制/* Consumer group. */ typedef struct streamCG { streamID last_id; long long entries_read; rax *pel; rax *consumers; } streamCG;1.2.3.4.5.6.7. last_id,表示该消费组的消费者已经读取但还未 ACK 的最后一条消息 ID。*pel,是 pending entries list 简写,指向一个 Radix Tree 的指针,保存着 Consumer group 中所有消费者读取但还未 ACK 确认的消息,就是这玩意实现了 ACK 机制。该树的 key 是消息 ID,value 关联一个 streamNACK 实例。*consumers, Radix Tree 指针,表示消费组中的所有消费者,key 是消费者名称,value 指向一个 streamConsumer 实例。streamNACK streamCG -> *pel 对应的 value 是一个 streamNACK 实例,用于抽象消费者已经读取,但是未 ACK 的消息 ID 相关信息。 复制/* Pending (yet not acknowledged) message in a consumer group. */ typedef struct streamNACK { mstime_t delivery_time; uint64_t delivery_count; streamConsumer *consumer; } streamNACK;1.2.3.4.5.6. delivery_time,该消息最后一次推送给 Consumer 的时间戳。delivery_count,消息被推送次数。*consumer,消息推送的 Consumer 客户端。streamConsumer Consumer Group 中对 Consumer 的抽象。 复制/* A specific consumer in a consumer group. */ typedef struct streamConsumer { mstime_t seen_time; sds name; rax *pel; } streamConsumer;1.2.3.4.5.6. seen_time,消费者最近一次被激活的时间戳。name,消费者名称。*pel, Radix Tree 指针,对于同一个消息而言,``streamCG -> pel与streamConsumer -> pel的streamNACK` 实例是同一个。最后来一张图,便于你理解。 
肖材积:“Redis 你好,Stream 如何结合 Radix Tree 和 listpack 结构来存储消息?为什么不使用散列表来存储,消息 ID 作为散列表的 key,散列表的 value 存储消息键值对内容。’” 在回答之前,先插入几条消息到 Stream,让你对 Stream 消息的存储格式有个大体认知。 该命令的语法如下。 复制XADD key id field value [field value ...]1. Stream 中的每个消息可以包含不同数量的多个键值对,写入消息成功后,我会把消息的 ID 返回给客户端。 执行如下指令把用户购买书籍的下单消息存放到 hotlist:books队列,消息内容主要由 payerID、amount 和 orderID。 复制> XADD hotlist:books * payerID 1 amount 69.00 orderID 9 1679218539571-0 > XADD hotlist:books * payerID 1 amount 36.00 orderID 15 1679218572182-0 > XADD hotlist:books * payerID 2 amount 99.00 orderID 88 1679218588426-0 > XADD hotlist:books * payerID 3 amount 68.00 orderID 80 1679218604492-01.2.3.4.5.6.7.8. hotlist:books 是 Stream 的名称,后面的 “*” 表示让 Redis 为插入的消息自动生成一个唯一 ID,你也可以自定义。 消息 ID 由两部分组成。 当前毫秒内的时间戳。顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令。肖材积:“如何理解 Stream 是一种只执行追加操作(append only)的数据结构?” 通过将元素 ID 与时间进行关联,并强制要求新元素的 ID 必须大于旧元素的 ID, Redis 从逻辑上将 Stream 变成了一种只执行追加操作(append only)的数据结构。 用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的。 ❝ 肖材积:“插入的消息 ID 大部分相同,比如这四条消息的 ID 都是 1679218 前缀。另外,每条消息键值对的键通常都是一样的,比如这四条消息的键都是 payerID、amount 和 orderID。使用散列表存储的话会很多冗余数据,你这么抠门,所以不使用散列表对不对?” 没毛病,小老弟很聪明。为了节省内存,我使用了 Radix Tree 和 listpack。Radix Tree 的 key 存储消息 ID,value 使用 listpack 数据结构存储多个消息, listapck 中的消息 ID 都大于等于 key 存储的消息 ID。 我在前面已经讲过 listpack,这是一个紧凑型列表,非常节省内存。而 Radix Tree 数据结构的最大特点是适合保存具有相同前缀的数据,从而达到节省内存。 到底 Radix Tree 是怎样的数据结构,继续往下看。 Radix TreeRadix Tree,也被称为 Radix Trie,或者 Compact Prefix Tree),用于高效地存储和查找字符串集合。它将字符串按照前缀拆分成一个个字符,并将每个字符作为一个节点存储在树中。 当插入一个键值对时,Redis 会将键按照字符拆分成一个个字符,并根据字符在 Radix tree 中的位置找到合适的节点,如果该节点不存在,则创建新节点并添加到 Radix tree 中。 当所有字符都添加完毕后,将值对象指针保存到最后一个节点中。当查询一个键时,Redis 按照字符顺序遍历 Radix tree,如果发现某个字符不存在于树中,则键不存在;否则,如果最后一个节点表示一个完整的键,则返回对应的值对象。 如下图展示一个简单的前缀树,将根节点到叶子节点的路径对应字符拼接起来,就得到了两个 key(“他说碉堡了”、“他说碉炸了”)。 
你应该发现了,这两个 key 拥有公共前缀(他说碉),前缀树实现了共享使用,这样就可以避免相同字符串重复存储。如果采用散列表的保存方式,那个 key 的相同前缀就会被多次存储,导致内存浪费。 Radix Tree 改进每个节点只保存一个字符,一是会浪费内存空间,二是在进行查询时,还需要逐一匹配每个节点表示的字符,对查询性能也会造成影响。 所以,Redis 并没有直接使用标准前缀树,而是做了一次变种——Compact Prefix Tree(压缩前缀树)。通俗来说,当多个 key 具有相同的前缀时,那就将相同前缀的字符串合并在一个共享节点中,从而减少存储空间。 如下几个 key(test、toaster、toasting、slow、slowly)在 Radix Tree 上的布局。 
由于 Compact Prefix Tree 可以共享相同前缀的节点,所以在存储一组具有相同前缀的键时,Redis 的 Radix tree 比其他数据结构(如哈希表)具有更低的空间消耗和更快的查询速度。 Radix Tree 节点的数据结构由 rax.h文件中的 raxNode 定义。 复制typedef struct raxNode { uint32_t iskey:1; uint32_t isnull:1; uint32_t iscompr:1; uint32_t size:29; unsigned char data[]; } raxNode;1.2.3.4.5.6.7. iskey:从 Radix Tree 根节点到当前节点组成的字符串是否是一个完整的 key。是的话 iskey 的值为 1。isnull:当前节点是否为空节点,如果当前节点是空节点的话,就不需要为该节点分配指向 value 的指针内存。iscompr,是否为压缩节点。size,当前节点的大小,具体指会根据节点类型而改变。如果是压缩节点,该值表示压缩数据的长度;如果是非压缩节点,该值表示节点的子节点个数。data[],实际存储的数据,根据节点类型不同而有所不同。压缩节点,data 数据包括子节点对应的字符、指向子节点的指针,节点为最终 key 对应的 value 指针。压缩节点,data 数据包含子节点对应的合并字符串、指向子节点的指针,以及节点为最终 key 的 value 指针。value 指针指向一个 listpack 实例,里面保存了消息实际内容。Radix Tree 最大的特点就是适合保存具有相同前缀的数据,实现节省内存的目标,以及支持范围查找。而这个就是 Stream 采用 Radix Tree 作为底层数据结构的原因。 |