Dawn's Blogs

分享技术 记录成长

0%

godis源码阅读 (4) 存储数据结构之List和SortedSet

List

simple-redis 中定义了 List 接口,以定义 List 的各种操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Expected check whether given item is equals to expected value
type Expected func(a interface{}) bool

// Consumer traverses list.
// It receives index and value as params, returns true to continue traversal, while returns false to break
type Consumer func(i int, v interface{}) bool

type List interface {
Add(val interface{})
Get(index int) (val interface{})
Set(index int, val interface{})
Insert(index int, val interface{})
Remove(index int) (val interface{})
RemoveLast() (val interface{})
RemoveAllByVal(expected Expected) int
RemoveByVal(expected Expected, count int) int
ReverseRemoveByVal(expected Expected, count int) int
Len() int
ForEach(consumer Consumer)
Contains(expected Expected) bool
Range(start int, stop int) []interface{}
}

数据结构

快速链表

在 simple-redis 中,采用快速链表作为 List 的数据结构。

快速链表实际上就是一个双向链表,但是双向链表中的每一个节点不是存储一个数据,而是将数据连续存放形成一段连续的存储空间作为链表的节点。

这一段连续的存储空间在 godis 中被称为 page(每一个 page 的大小为 1024),page 的类型为空接口切片

1
2
3
4
5
6
7
8
9
const (
pageSize = 1024
)

// QuickList 快速链表,用于实现list
type QuickList struct {
data *list.List
size int
}

迭代器

又定义了 iterator 为快速链表的迭代器,在 [-1, QuickList.Len()] 范围内移动。

  • node 表示元素所在的 page。
  • offset 表示这个元素在 page 中的下标。
1
2
3
4
5
6
// iterator of QuickList, move between [-1, ql.Len()]
type iterator struct {
node *list.Element
offset int
ql *QuickList
}

下面以 QuickList.find 方法、iterator.get 方法、iterator.next 方法为例说明迭代器的实现。

find 查找 index 对应的迭代器

比如 QuckList 定义了一个方法 find,用于查找对应 index 的迭代器

1
2
// find returns page and in-page-offset of given index
func (ql *QuickList) find(index int) *iterator

其流程为:

  • 首先查看 index 在快速链表的前半部分还是后半部分。若在前半部分,则从前向后查找
    • 从前向后查找,找到 index 所在的 page,pageBeg 表示这个 index 所在 page 之前的元素个数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
if index < ql.size/2 {
// search from front
n = ql.data.Front()
pageBeg = 0
for {
// assert: n != nil
page = n.Value.([]interface{})
if pageBeg+len(page) > index {
break
}
pageBeg += len(page)
n = n.Next()
}
}
  • 若 index 在快速链表的后半部分,则从后向前查找
1
2
3
4
5
6
7
8
9
10
11
12
13
else {
// search from back
n = ql.data.Back()
pageBeg = ql.size
for {
page = n.Value.([]interface{})
pageBeg -= len(page)
if pageBeg <= index {
break
}
n = n.Prev()
}
}
  • 计算这个 index 在 page 中的相对位置 offset,返回迭代器。
1
2
3
4
5
6
pageOffset := index - pageBeg
return &iterator{
node: n,
offset: pageOffset,
ql: ql,
}

get 获取迭代器所指向的数据

interator.get 方法用于获取迭代器所执行的数据:

  • 它首先调用 iterator.page 方法获取所在 page(类型为空接口切片)。
  • 接着获取 page 中 offset 下标中的数据内容。
1
2
3
4
5
6
7
func (iter *iterator) get() interface{} {
return iter.page()[iter.offset]
}

func (iter *iterator) page() []interface{} {
return iter.node.Value.([]interface{})
}

next 移动到下一个数据元素

iterator.nex 方法将迭代器移动到下一个元素上。

1
2
// next returns whether iter is in bound
func (iter *iterator) next() bool

其流程为:

  • 首先获取所在 page,若当前迭代器的元素不是 page 的最后一个元素,直接 offset + 1即可返回。
1
2
3
4
5
page := iter.page()
if iter.offset < len(page)-1 {
iter.offset++
return true
}
  • 否则则移动到下一个 page 上:
    • 移动之前首先看看当前 page 是不是最后一个 page,若是则说明迭代器当前指向的元素是整个快速链表的最后一个元素。此时将迭代器移动到 QuickList.Len() 位置上。
    • 不是最后一个 page,就移动到下一个 page 的第一个数据上。
1
2
3
4
5
6
7
8
9
// move to next page
if iter.node == iter.ql.data.Back() {
// already at last node
iter.offset = len(page)
return false
}
iter.offset = 0
iter.node = iter.node.Next()
return true

Get Set 查询、更新数据

在快速链表上查询数据非常简单,首先获取 index 所在的迭代器,然后调用 iterator.get 方法获取其指向的值。

1
2
3
4
5
// Get returns value at the given index
func (ql *QuickList) Get(index int) (val interface{}) {
iter := ql.find(index)
return iter.get()
}

在快速链表上更新数据也是同样的,首先获取 index 所在的迭代器,接着调用 iterator.set 方法更改其值。

1
2
3
4
5
6
7
8
9
10
func (iter *iterator) set(val interface{}) {
page := iter.page()
page[iter.offset] = val
}

// Set updates value at the given index, the index should between [0, list.size]
func (ql *QuickList) Set(index int, val interface{}) {
iter := ql.find(index)
iter.set(val)
}

Add 在尾部插入数据

Add 用于在快速链表的尾部插入数据。

1
2
// Add adds value to the tail
func (ql *QuickList) Add(val interface{})

流程如下:

  • 首先检查是否是空链表,若是空链表则初始化一个链表插入到第一个位置上。
1
2
3
4
5
6
7
ql.size++
if ql.data.Len() == 0 { // empty list
page := make([]interface{}, 0, pageSize)
page = append(page, val)
ql.data.PushBack(page)
return
}
  • 找到最后一个 page,检查最后一个 page 是否已满。若最后一个 page 满了则新增一个 page 到后面,插入数据到新增 page 的第一个位置。
1
2
3
4
5
6
7
8
backNode := ql.data.Back()
backPage := backNode.Value.([]interface{})
if len(backPage) == cap(backPage) { // full page, create new page
page := make([]interface{}, 0, pageSize)
page = append(page, val)
ql.data.PushBack(page)
return
}
  • 最后一个 page 没有满则直接插入到最后一个 page 的最后面。
1
2
3
// append into page
backPage = append(backPage, val)
backNode.Value = backPage

Insert 插入数据

Insert 用于在快速链表的 index 位置插入数据,这个过程比较复杂。

1
func (ql *QuickList) Insert(index int, val interface{})

其流程如下:

  • 若插入到最后,则调用 Add 进行末尾上的插入。
1
2
3
4
if index == ql.size { // insert at
ql.Add(val)
return
}
  • 找到 index 所在的 page,若 page 没有满,则直接插入到 page 的相应位置上并返回。
1
2
3
4
5
6
7
8
9
10
iter := ql.find(index)
page := iter.node.Value.([]interface{})
if len(page) < pageSize {
// insert into not full page
page = append(page[:iter.offset+1], page[iter.offset:]...)
page[iter.offset] = val
iter.node.Value = page
ql.size++
return
}
  • 若 page 满了,则将一个满的 page 分裂成两个 page
1
2
3
4
// insert into a full page may cause memory copy, so we split a full page into two half pages
var nextPage []interface{}
nextPage = append(nextPage, page[pageSize/2:]...) // pageSize must be even
page = page[:pageSize/2]
  • 若迭代器的 offset 在前半部分则在第一个 page 上插入;否则在第二个位置上插入数据。
1
2
3
4
5
6
7
8
9
10
11
12
if iter.offset < len(page) {
page = append(page[:iter.offset+1], page[iter.offset:]...)
page[iter.offset] = val
} else {
i := iter.offset - pageSize/2
nextPage = append(nextPage[:i+1], nextPage[i:]...)
nextPage[i] = val
}
// store current page and next page
iter.node.Value = page
ql.data.InsertAfter(nextPage, iter.node)
ql.size++

Sorted Set

跳跃链表

godis 使用跳跃链表实现有序集合(Sorted Set),实现有序集合最简单的方法是有序链表,但是其查找的时间复杂度很高为 O(n)

跳表的优化思路是添加上层链表,上层链表中会跳过一些节点。

img

其数据结构为还是类似于双向链表,只不过链表上的节点有了层的概念。其两个最核心的概念节点和层的解释如下:

  • 节点:链表上的节点,一个节点存储一个元素,节点有指向下一个节点的后向指针,同时还保存了这个节点上的每一层(是一个切片,0 号为最下层)。
  • 层:表示一个节点的某一层,包括指向同一层中下一个节点的前向指针,以及到下一个节点跳过的节点数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 对外的元素抽象
type Element struct {
Member string
Score float64
}

type Node struct {
Element // 元素的名称和 score
backward *Node // 后向指针
level []*Level // 前向指针, level[0] 为最下层
}

// 节点中每一层的抽象
type Level struct {
forward *Node // 指向同层中的下一个节点
span int64 // 到 forward 跳过的节点数
}

// 跳表的定义
type skiplist struct {
header *Node
tail *Node
length int64
level int16
}

img

查找节点

getByRank

skiplist.getByRank 方法用于查找排名第 rank 的节点。它从顶层向下一层一层查询,从当前层向前搜索,若当前层的下一个节点已经超过目标值,则结束搜索进入下一层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (skiplist *skiplist) getByRank(rank int64) *node {
var i int64 = 0
n := skiplist.header
// scan from top level
for level := skiplist.level - 1; level >= 0; level-- {
for n.level[level].forward != nil && (i+n.level[level].span) <= rank {
i += n.level[level].span
n = n.level[level].forward
}
if i == rank {
return n
}
}
return nil
}

getFirstInScoreRange

skiplist.getFirstInScoreRange 方法用于查找分数范围内的第一个节点。

1
func (skiplist *skiplist) getFirstInScoreRange(min *ScoreBorder, max *ScoreBorder) *node

其流程为:

  • 判断跳表和范围是否有交集,若无交集提早返回。
1
2
3
if !skiplist.hasInRange(min, max) {
return nil
}
  • 从顶层向下查询,若 forward 节点未进入范围则继续向前若 forward 节点已经进入范围,当 level > 0 时 forward 节点不能保证是第一个在范围内的节点,因此需要进入下一层查找
1
2
3
4
5
6
7
8
n := skiplist.header
// scan from top level
for level := skiplist.level - 1; level >= 0; level-- {
// if forward is not in range than move forward
for n.level[level].forward != nil && !min.less(n.level[level].forward.Score) {
n = n.level[level].forward
}
}
  • 查找结束,此时 n.level[0].forward 节点一定是范围内的第一个节点。
1
2
3
4
5
n = n.level[0].forward
if !max.greater(n.Score) {
return nil
}
return n

插入节点

skiplist.insert 方法用于插入节点。基本思想就是,寻找每一层的前驱节点,前驱节点的前向指针 forward 指向新插入的节点。

1
func (skiplist *skiplist) insert(member string, score float64) *node

流程如下:

  • 因为每一层都要找到插入节点的前驱节点,所以用 update 数组记录每一层的前驱节点。并且用 rank 数组保存各层先驱节点的排名,用于计算 span。
1
2
update := make([]*node, maxLevel) // link new node with node in `update`
rank := make([]int64, maxLevel)
  • 从上向下一层一层寻找,到达每一层时只要当前节点的前向指针 forward 不为空并且 forward 指向的节点分数小于插入节点,就向后移动,当找到这一层的前驱节点之后再去下一层中寻找
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// find position to insert
node := skiplist.header
for i := skiplist.level - 1; i >= 0; i-- {
if i == skiplist.level-1 {
rank[i] = 0
} else {
rank[i] = rank[i+1] // store rank that is crossed to reach the insert position
}
if node.level[i] != nil {
// traverse the skip list
for node.level[i].forward != nil &&
(node.level[i].forward.Score < score ||
(node.level[i].forward.Score == score && node.level[i].forward.Member < member)) { // same score, different key
rank[i] += node.level[i].span
node = node.level[i].forward
}
}
update[i] = node
}
  • 随机决定新节点的层数,若新的节点层数大于当前跳表的最高层数,则创建新的层
1
2
3
4
5
6
7
8
9
10
level := randomLevel()
// extend skiplist level
if level > skiplist.level {
for i := skiplist.level; i < level; i++ {
rank[i] = 0
update[i] = skiplist.header
update[i].level[i].span = skiplist.length
}
skiplist.level = level
}
  • 创建新节点,并且插入到跳表中,对于每一层都修改新节点和前驱节点的 forward 指针和 span。
1
2
3
4
5
6
7
8
9
10
// make node and link into skiplist
node = makeNode(level, score, member)
for i := int16(0); i < level; i++ {
node.level[i].forward = update[i].level[i].forward
update[i].level[i].forward = node

// update span covered by update[i] as node is inserted here
node.level[i].span = update[i].level[i].span - (rank[0] - rank[i])
update[i].level[i].span = (rank[0] - rank[i]) + 1
}
  • 若新节点的层数小于当前跳表的最高层数,也就是新节点可能不包含所有的层。那么对于没有的层,先驱节点的 span 会加1(后面插入了新节点导致span+1)
1
2
3
4
// increment span for untouched levels
for i := level; i < skiplist.level; i++ {
update[i].level[i].span++
}
  • 最后,更新后向指针
1
2
3
4
5
6
7
8
9
10
11
12
13
// set backward node
if update[0] == skiplist.header {
node.backward = nil
} else {
node.backward = update[0]
}
if node.level[0].forward != nil {
node.level[0].forward.backward = node
} else {
skiplist.tail = node
}
skiplist.length++
return node

删除节点

skiplist.RemoveRangeByRank 方法用于删除 [start, stop) 范围内的节点,其思想和插入节点差不多,就是找到被删除的范围内的第一个节点的前驱节点,然后依次删除。

1
func (skiplist *skiplist) RemoveRangeByRank(start int64, stop int64) (removed []*Element)

其流程如下:

  • 因为每一层都要找到删除节点的前驱节点,所以用 update 数组记录每一层的前驱节点。并且用 removed 记录被删除的节点。
1
2
3
var i int64 = 0 // rank of iterator
update := make([]*node, maxLevel)
removed = make([]*Element, 0)
  • 从顶向下,寻找前驱节点。
1
2
3
4
5
6
7
8
9
// scan from top level
node := skiplist.header
for level := skiplist.level - 1; level >= 0; level-- {
for node.level[level].forward != nil && (i+node.level[level].span) < start {
i += node.level[level].span
node = node.level[level].forward
}
update[level] = node
}
  • 移动到被目标范围内的第一个节点
1
2
i++
node = node.level[0].forward // first node in range
  • 依次删除范围内的所有节点。
1
2
3
4
5
6
7
8
9
10
// remove nodes in range
for node != nil && i < stop {
next := node.level[0].forward
removedElement := node.Element
removed = append(removed, &removedElement)
skiplist.removeNode(node, update)
node = next
i++
}
return removed

removeNode

skiplist.removeNode 是删除一个节点操作的具体方法,传入被删除的节点和每一层的前驱节点。

1
func (skiplist *skiplist) removeNode(node *node, update []*node)

其流程如下:

  • 修改每一层前驱节点,如果前驱节点的 forward 指针指向了目标节点,则需要修改前驱的 forward 指针跳过要删除的目标节点,同时更新前驱节点的 span
1
2
3
4
5
6
7
8
for i := int16(0); i < skiplist.level; i++ {
if update[i].level[i].forward == node {
update[i].level[i].span += node.level[i].span - 1
update[i].level[i].forward = node.level[i].forward
} else {
update[i].level[i].span--
}
}
  • 修改目标节点后继节点的 backward 指针
1
2
3
4
5
if node.level[0].forward != nil {
node.level[0].forward.backward = node.backward
} else {
skiplist.tail = node.backward
}
  • 必要时删除空白的层
1
2
3
4
for skiplist.level > 1 && skiplist.header.level[skiplist.level-1].forward == nil {
skiplist.level--
}
skiplist.length--

有序集合的实现

goredis 中,有序集合通过字典和跳跃链表。

  • 字典记录集合内元素和分数的映射关系。
  • 跳跃链表用于对分数的顺序存储。
1
2
3
4
5
6
7
8
9
10
11
12
13
// SortedSet is a set which keys sorted by bound score
type SortedSet struct {
dict map[string]*Element
skiplist *skiplist
}

// Make makes a new SortedSet
func Make() *SortedSet {
return &SortedSet{
dict: make(map[string]*Element),
skiplist: makeSkiplist(),
}
}

下面举几个例子。

Add 插入元素

SortedSet.Add 方法用于在有序集合中插入元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Add puts member into set,  and returns whether has inserted new node
func (sortedSet *SortedSet) Add(member string, score float64) bool {
element, ok := sortedSet.dict[member]
sortedSet.dict[member] = &Element{
Member: member,
Score: score,
}
if ok {
if score != element.Score {
sortedSet.skiplist.remove(member, element.Score)
sortedSet.skiplist.insert(member, score)
}
return false
}
sortedSet.skiplist.insert(member, score)
return true
}

RemoveByRank 删除范围内的元素

SortedSet.RemoveByRank 方法用于删除 [start, stop) 范围内的元素。

1
2
3
4
5
6
7
8
9
// RemoveByRank removes member ranking within [start, stop)
// sort by ascending order and rank starts from 0
func (sortedSet *SortedSet) RemoveByRank(start int64, stop int64) int64 {
removed := sortedSet.skiplist.RemoveRangeByRank(start+1, stop+1)
for _, element := range removed {
delete(sortedSet.dict, element.Member)
}
return int64(len(removed))
}