Redis 核心数据结构(三)

在五年前,D瓜哥写了 Redis 核心数据结构(一) 和 Redis 核心数据结构(二) 两篇文章,来对 Redis 内部的数据结构做了深入分析。随着时间的推移,Redis 的实现也在不断进化,现在这些内容已经跟不上最新发展了,推陈出新,现在重写文章,来介绍 Redis 的最新发展。
listpack
从 Redis 7.0 开始,使用 listpack 替换原来的 ziplist。至于替换原因,在 [NEW] listpack migration - replace all usage of ziplist with listpack 做了解释说明:
The reason for using listpack instead of ziplist is that ziplist may cause cascading updates when insert and delete in middle, which is the biggest problem.
翻译过来:当在中间进行插入和删除时,ziplist 也许会产生级联更新,这是一个大问题。
编码规范

相比 ziplist,listpack 更偏向空间换时间。淡化极致的内存使用率,向更快的方向发力。
对整数编码

Redis 对整数以及长整数的编码与 Hessian 协议解释与实战(一):布尔、日期、浮点数与整数:整数类型数据 和 Hessian 协议解释与实战(二):长整型、二进制数据与 Null:长整数类型数据 的编码类似:小数尽可能使用短编码;大数才用长编码。由于小数偏多,所以,可以节省相当一部分内存。
listpack.c#define LP_ENCODING_13BIT_INT 0xC0
#define LP_ENCODING_13BIT_INT_MASK 0xE0
#define LP_ENCODING_IS_13BIT_INT(byte) (((byte)&LP_ENCODING_13BIT_INT_MASK)==LP_ENCODING_13BIT_INT)
#define LP_ENCODING_13BIT_INT_ENTRY_SIZE 3
#define LP_ENCODING_16BIT_INT 0xF1
#define LP_ENCODING_16BIT_INT_MASK 0xFF
#define LP_ENCODING_IS_16BIT_INT(byte) (((byte)&LP_ENCODING_16BIT_INT_MASK)==LP_ENCODING_16BIT_INT)
#define LP_ENCODING_16BIT_INT_ENTRY_SIZE 4
#define LP_ENCODING_24BIT_INT 0xF2
#define LP_ENCODING_24BIT_INT_MASK 0xFF
#define LP_ENCODING_IS_24BIT_INT(byte) (((byte)&LP_ENCODING_24BIT_INT_MASK)==LP_ENCODING_24BIT_INT)
#define LP_ENCODING_24BIT_INT_ENTRY_SIZE 5
#define LP_ENCODING_32BIT_INT 0xF3
#define LP_ENCODING_32BIT_INT_MASK 0xFF
#define LP_ENCODING_IS_32BIT_INT(byte) (((byte)&LP_ENCODING_32BIT_INT_MASK)==LP_ENCODING_32BIT_INT)
#define LP_ENCODING_32BIT_INT_ENTRY_SIZE 6
#define LP_ENCODING_64BIT_INT 0xF4
#define LP_ENCODING_64BIT_INT_MASK 0xFF
#define LP_ENCODING_IS_64BIT_INT(byte) (((byte)&LP_ENCODING_64BIT_INT_MASK)==LP_ENCODING_64BIT_INT)
#define LP_ENCODING_64BIT_INT_ENTRY_SIZE 10
/* Stores the integer encoded representation of 'v' in the 'intenc' buffer. */
static inline void lpEncodeIntegerGetType(int64_t v,
unsigned char *intenc, uint64_t *enclen) {
if (v >= 0 && v <= 127) {
/* Single byte 0-127 integer. */
// if (intenc != NULL) intenc[0] = v;
// 0 <= num <= 127,则使用一个字节表示
if (enclen != NULL) *enclen = 1;
} else if (v >= -4096 && v <= 4095) {
/* 13 bit integer. */
if (v < 0) v = ((int64_t)1<<13)+v;
if (intenc != NULL) {
// -4096 <= num <= 4095 使用 13 个比特位表示,
// LP_ENCODING_13BIT_INT = 0xC0,前三位是类型标识
// 两个字节,剩下 13 个比特位用于存数字(首位表示符号,12位存数字)
intenc[0] = (v>>8)|LP_ENCODING_13BIT_INT;
intenc[1] = v&0xff;
}
// 编码长度为 2
if (enclen != NULL) *enclen = 2;
} else if (v >= -32768 && v <= 32767) {
/* 16 bit integer. */
if (v < 0) v = ((int64_t)1<<16)+v;
if (intenc != NULL) {
// 一个字节是类型标识
intenc[0] = LP_ENCODING_16BIT_INT;
// 剩下两个字节,16 个比特位存数字(首位表示符号,15位存数字)
intenc[1] = v&0xff;
intenc[2] = v>>8;
}
// 编码长度为 3
if (enclen != NULL) *enclen = 3;
} else if (v >= -8388608 && v <= 8388607) {
/* 24 bit integer. */
if (v < 0) v = ((int64_t)1<<24)+v;
if (intenc != NULL) {
// 一个字节是类型标识
intenc[0] = LP_ENCODING_24BIT_INT;
// 剩下 3 个字节,24 个比特位存数字(首位表示符号,15位存数字)
intenc[1] = v&0xff;
intenc[2] = (v>>8)&0xff;
intenc[3] = v>>16;
}
// 编码长度为 4
if (enclen != NULL) *enclen = 4;
} else if (v >= -2147483648 && v <= 2147483647) {
/* 32 bit integer. */
if (v < 0) v = ((int64_t)1<<32)+v;
if (intenc != NULL) {
// 一个字节是类型标识
intenc[0] = LP_ENCODING_32BIT_INT;
// 剩下 4 个字节,32 个比特位存数字(首位表示符号,15位存数字)
intenc[1] = v&0xff;
intenc[2] = (v>>8)&0xff;
intenc[3] = (v>>16)&0xff;
intenc[4] = v>>24;
}
// 编码长度为 5
if (enclen != NULL) *enclen = 5;
} else {
/* 64 bit integer. */
uint64_t uv = v;
if (intenc != NULL) {
// 一个字节是类型标识
intenc[0] = LP_ENCODING_64BIT_INT;
// 剩下 8 个字节,64 个比特位存数字(首位表示符号,15位存数字)
intenc[1] = uv&0xff;
intenc[2] = (uv>>8)&0xff;
intenc[3] = (uv>>16)&0xff;
intenc[4] = (uv>>24)&0xff;
intenc[5] = (uv>>32)&0xff;
intenc[6] = (uv>>40)&0xff;
intenc[7] = (uv>>48)&0xff;
intenc[8] = uv>>56;
}
// 编码长度为 9
if (enclen != NULL) *enclen = 9;
}
}对字符串编码

对比 Hessian 对字符串的编码 Hessian 协议解释与实战(三):字符串,两者也有相似之处:
类型标识+字符串长度+实际字符串内容;短字符串使用短码编码长度,长字符串使用长码编码长度。由于短字符串更多,也能节省很多内存。
| Redis 对字符串是如何编码的? |
listpack.c#define LP_ENCODING_6BIT_STR 0x80
#define LP_ENCODING_6BIT_STR_MASK 0xC0
#define LP_ENCODING_IS_6BIT_STR(byte) (((byte)&LP_ENCODING_6BIT_STR_MASK)==LP_ENCODING_6BIT_STR)
#define LP_ENCODING_12BIT_STR 0xE0
#define LP_ENCODING_12BIT_STR_MASK 0xF0
#define LP_ENCODING_IS_12BIT_STR(byte) (((byte)&LP_ENCODING_12BIT_STR_MASK)==LP_ENCODING_12BIT_STR)
#define LP_ENCODING_32BIT_STR 0xF0
#define LP_ENCODING_32BIT_STR_MASK 0xFF
#define LP_ENCODING_IS_32BIT_STR(byte) (((byte)&LP_ENCODING_32BIT_STR_MASK)==LP_ENCODING_32BIT_STR)
/* Encode the string element pointed by 's' of size 'len' in the target
* buffer 's'. The function should be called with 'buf' having always enough
* space for encoding the string. This is done by calling lpEncodeGetType()
* before calling this function. */
static inline void lpEncodeString(unsigned char *buf, unsigned char *s, uint32_t len) {
if (len < 64) {
// LP_ENCODING_6BIT_STR = 0x80 | length,length 最多有 6 个比特位,则最大数字是: 2^6 -1 = 63
buf[0] = len | LP_ENCODING_6BIT_STR;
// 将字符串内容复制到后面的内存中
memcpy(buf+1,s,len);
} else if (len < 4096) {
// LP_ENCODING_12BIT_STR = 0xE0,前四个比特位存类型标识
// 剩余 4 个比特位存长度: (lenght >> 8) | LP_ENCODING_12BIT_STR
// 剩余 8 个比特位存第一个字节,length & 0xff,12 个比特位,最大长度是 2^12 -1 = 4095
buf[0] = (len >> 8) | LP_ENCODING_12BIT_STR;
buf[1] = len & 0xff;
// 将字符串内容复制到后面的内存中
memcpy(buf+2,s,len);
} else {
// 长度大于等于 4096 时,使用一个类型字节 + 四个标识长度的字节,来标识长度。
// 最大长度是 2^32 -1 = 4294967295
buf[0] = LP_ENCODING_32BIT_STR;
buf[1] = len & 0xff;
buf[2] = (len >> 8) & 0xff;
buf[3] = (len >> 16) & 0xff;
buf[4] = (len >> 24) & 0xff;
// 将字符串内容复制到后面的内存中
memcpy(buf+5,s,len);
}
}元素长度编码
每个 listpack 元素在最后都保存当前元素占用的字节数,只包含前面的类型字节、长度字节和数据字节,不包含这个元素长度的字节数。

保存这个长度信息,主要是为了方便从右向左搜索:每个字节首位是 1 表示前面还有数据,是 0 则表示长度编码到此为止。
源码分析
新建 listpack 对象
listpack.c/* Create a new, empty listpack.
* On success the new listpack is returned, otherwise an error is returned.
* Pre-allocate at least `capacity` bytes of memory,
* over-allocated memory can be shrunk by `lpShrinkToFit`.
* */
unsigned char *lpNew(size_t capacity) {
unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
if (lp == NULL) return NULL;
// 设置 listpack 目前字节长度
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
// 元素为 0
lpSetNumElements(lp,0);
// 后面直接接结束标志符
lp[LP_HDR_SIZE] = LP_EOF;
return lp;
}通过分析上面的代码可以得知,listpack 的新对象初始化完成后,内容如下:

增删改
listpack 的增删改所有更新操作,都被一个函数包圆了: lpInsert,来看看它是如何实现的。
listpack.c/* Insert, delete or replace the specified string element 'elestr' of length
* 'size' or integer element 'eleint' at the specified position 'p', with 'p'
* being a listpack element pointer obtained with lpFirst(), lpLast(), lpNext(),
* lpPrev() or lpSeek().
*
* The element is inserted before, after, or replaces the element pointed
* by 'p' depending on the 'where' argument, that can be LP_BEFORE, LP_AFTER
* or LP_REPLACE.
*
* If both 'elestr' and `eleint` are NULL, the function removes the element
* pointed by 'p' instead of inserting one.
* If `eleint` is non-NULL, 'size' is the length of 'eleint', the function insert
* or replace with a 64 bit integer, which is stored in the 'eleint' buffer.
* If 'elestr` is non-NULL, 'size' is the length of 'elestr', the function insert
* or replace with a string, which is stored in the 'elestr' buffer.
*
* Returns NULL on out of memory or when the listpack total length would exceed
* the max allowed size of 2^32-1, otherwise the new pointer to the listpack
* holding the new element is returned (and the old pointer passed is no longer
* considered valid)
*
* If 'newp' is not NULL, at the end of a successful call '*newp' will be set
* to the address of the element just added, so that it will be possible to
* continue an interaction with lpNext() and lpPrev().
*
* For deletion operations (both 'elestr' and 'eleint' set to NULL) 'newp' is
* set to the next element, on the right of the deleted one, or to NULL if the
* deleted element was the last one. */
unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char *eleint,
uint32_t size, unsigned char *p, int where, unsigned char **newp)
{
unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
unsigned char backlen[LP_MAX_BACKLEN_SIZE];
uint64_t enclen; /* The length of the encoded element. */
// 如果没有传递内容,则就是删除
int delete = (elestr == NULL && eleint == NULL);
/* when deletion, it is conceptually replacing the element with a
* zero-length element. So whatever we get passed as 'where', set
* it to LP_REPLACE. */
if (delete) where = LP_REPLACE;
/* If we need to insert after the current element, we just jump to the
* next element (that could be the EOF one) and handle the case of
* inserting before. So the function will actually deal with just two
* cases: LP_BEFORE and LP_REPLACE. */
if (where == LP_AFTER) {
p = lpSkip(p);
where = LP_BEFORE;
ASSERT_INTEGRITY(lp, p);
}
/* Store the offset of the element 'p', so that we can obtain its
* address again after a reallocation. */
unsigned long poff = p-lp;
int enctype;
if (elestr) {
/* Calling lpEncodeGetType() results into the encoded version of the
* element to be stored into 'intenc' in case it is representable as
* an integer: in that case, the function returns LP_ENCODING_INT.
* Otherwise if LP_ENCODING_STR is returned, we'll have to call
* lpEncodeString() to actually write the encoded string on place later.
*
* Whatever the returned encoding is, 'enclen' is populated with the
* length of the encoded element. */
// 检查字符串是否可以使用整型表示。整型更加节省空间。
enctype = lpEncodeGetType(elestr,size,intenc,&enclen);
if (enctype == LP_ENCODING_INT) eleint = intenc;
} else if (eleint) {
enctype = LP_ENCODING_INT;
enclen = size; /* 'size' is the length of the encoded integer element. */
} else {
enctype = -1;
enclen = 0;
}
/* We need to also encode the backward-parsable length of the element
* and append it to the end: this allows to traverse the listpack from
* the end to the start. */
unsigned long backlen_size = (!delete) ? lpEncodeBacklen(backlen,enclen) : 0;
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
uint32_t replaced_len = 0;
if (where == LP_REPLACE) {
replaced_len = lpCurrentEncodedSizeUnsafe(p);
replaced_len += lpEncodeBacklenBytes(replaced_len);
ASSERT_INTEGRITY_LEN(lp, p, replaced_len);
}
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
- replaced_len;
if (new_listpack_bytes > UINT32_MAX) return NULL;
/* We now need to reallocate in order to make space or shrink the
* allocation (in case 'when' value is LP_REPLACE and the new element is
* smaller). However we do that before memmoving the memory to
* make room for the new element if the final allocation will get
* larger, or we do it after if the final allocation will get smaller. */
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
/* Realloc before: we need more room. */
// 扩容
if (new_listpack_bytes > old_listpack_bytes &&
new_listpack_bytes > lp_malloc_size(lp)) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
/* Setup the listpack relocating the elements to make the exact room
* we need to store the new one. */
if (where == LP_BEFORE) {
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
} else { /* LP_REPLACE. */
memmove(dst+enclen+backlen_size,
dst+replaced_len,
old_listpack_bytes-poff-replaced_len);
}
/* Realloc after: we need to free space. */
// 缩容
if (new_listpack_bytes < old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
/* Store the entry. */
// 新增元素
if (newp) {
*newp = dst;
/* In case of deletion, set 'newp' to NULL if the next element is
* the EOF element. */
if (delete && dst[0] == LP_EOF) *newp = NULL;
}
// 更新元素
if (!delete) {
if (enctype == LP_ENCODING_INT) {
memcpy(dst,eleint,enclen);
} else if (elestr) {
lpEncodeString(dst,elestr,size);
} else {
redis_unreachable();
}
dst += enclen;
memcpy(dst,backlen,backlen_size);
dst += backlen_size;
}
/* Update header. */
if (where != LP_REPLACE || delete) {
uint32_t num_elements = lpGetNumElements(lp);
if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
if (!delete)
lpSetNumElements(lp,num_elements+1);
else
lpSetNumElements(lp,num_elements-1);
}
}
lpSetTotalBytes(lp,new_listpack_bytes);
#if 0
/* This code path is normally disabled: what it does is to force listpack
* to return *always* a new pointer after performing some modification to
* the listpack, even if the previous allocation was enough. This is useful
* in order to spot bugs in code using listpacks: by doing so we can find
* if the caller forgets to set the new pointer where the listpack reference
* is stored, after an update. */
unsigned char *oldlp = lp;
lp = lp_malloc(new_listpack_bytes);
memcpy(lp,oldlp,new_listpack_bytes);
if (newp) {
unsigned long offset = (*newp)-oldlp;
*newp = lp + offset;
}
/* Make sure the old allocation contains garbage. */
memset(oldlp,'A',new_listpack_bytes);
lp_free(oldlp);
#endif
return lp;
}查看代码简单总结一下:
如果可以,会优先使用整型对内容进行编码;
如果没有传递内容,则表示删除。
整体流程是:先操作数据元素,然后更新整体长度和元素个数。
内容使用存放和废除使用内存拷贝
memcpy函数来完成。
查找
listpack.c/* Skip the current entry returning the next. It is invalid to call this
* function if the current element is the EOF element at the end of the
* listpack, however, while this function is used to implement lpNext(),
* it does not return NULL when the EOF element is encountered. */
static inline unsigned char *lpSkip(unsigned char *p) {
unsigned long entrylen = lpCurrentEncodedSizeUnsafe(p);
entrylen += lpEncodeBacklenBytes(entrylen);
p += entrylen;
return p;
}
/* Return the listpack element pointed by 'p'.
*
* The function changes behavior depending on the passed 'intbuf' value.
* Specifically, if 'intbuf' is NULL:
*
* If the element is internally encoded as an integer, the function returns
* NULL and populates the integer value by reference in 'count'. Otherwise if
* the element is encoded as a string a pointer to the string (pointing inside
* the listpack itself) is returned, and 'count' is set to the length of the
* string.
*
* If instead 'intbuf' points to a buffer passed by the caller, that must be
* at least LP_INTBUF_SIZE bytes, the function always returns the element as
* it was a string (returning the pointer to the string and setting the
* 'count' argument to the string length by reference). However if the element
* is encoded as an integer, the 'intbuf' buffer is used in order to store
* the string representation.
*
* The user should use one or the other form depending on what the value will
* be used for. If there is immediate usage for an integer value returned
* by the function, than to pass a buffer (and convert it back to a number)
* is of course useless.
*
* If 'entry_size' is not NULL, *entry_size is set to the entry length of the
* listpack element pointed by 'p'. This includes the encoding bytes, length
* bytes, the element data itself, and the backlen bytes.
*
* If the function is called against a badly encoded ziplist, so that there
* is no valid way to parse it, the function returns like if there was an
* integer encoded with value 12345678900000000 + <unrecognized byte>, this may
* be an hint to understand that something is wrong. To crash in this case is
* not sensible because of the different requirements of the application using
* this lib.
*
* Similarly, there is no error returned since the listpack normally can be
* assumed to be valid, so that would be a very high API cost. */
static inline unsigned char *lpGetWithSize(unsigned char *p, int64_t *count, unsigned char *intbuf, uint64_t *entry_size) {
int64_t val;
uint64_t uval, negstart, negmax;
assert(p); /* assertion for valgrind (avoid NPD) */
if (LP_ENCODING_IS_7BIT_UINT(p[0])) {
negstart = UINT64_MAX; /* 7 bit ints are always positive. */
negmax = 0;
uval = p[0] & 0x7f;
if (entry_size) *entry_size = LP_ENCODING_7BIT_UINT_ENTRY_SIZE;
} else if (LP_ENCODING_IS_6BIT_STR(p[0])) {
*count = LP_ENCODING_6BIT_STR_LEN(p);
if (entry_size) *entry_size = 1 + *count + lpEncodeBacklenBytes(*count + 1);
return p+1;
} else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
uval = ((p[0]&0x1f)<<8) | p[1];
negstart = (uint64_t)1<<12;
negmax = 8191;
if (entry_size) *entry_size = LP_ENCODING_13BIT_INT_ENTRY_SIZE;
} else if (LP_ENCODING_IS_16BIT_INT(p[0])) {
uval = (uint64_t)p[1] |
(uint64_t)p[2]<<8;
negstart = (uint64_t)1<<15;
negmax = UINT16_MAX;
if (entry_size) *entry_size = LP_ENCODING_16BIT_INT_ENTRY_SIZE;
} else if (LP_ENCODING_IS_24BIT_INT(p[0])) {
uval = (uint64_t)p[1] |
(uint64_t)p[2]<<8 |
(uint64_t)p[3]<<16;
negstart = (uint64_t)1<<23;
negmax = UINT32_MAX>>8;
if (entry_size) *entry_size = LP_ENCODING_24BIT_INT_ENTRY_SIZE;
} else if (LP_ENCODING_IS_32BIT_INT(p[0])) {
uval = (uint64_t)p[1] |
(uint64_t)p[2]<<8 |
(uint64_t)p[3]<<16 |
(uint64_t)p[4]<<24;
negstart = (uint64_t)1<<31;
negmax = UINT32_MAX;
if (entry_size) *entry_size = LP_ENCODING_32BIT_INT_ENTRY_SIZE;
} else if (LP_ENCODING_IS_64BIT_INT(p[0])) {
uval = (uint64_t)p[1] |
(uint64_t)p[2]<<8 |
(uint64_t)p[3]<<16 |
(uint64_t)p[4]<<24 |
(uint64_t)p[5]<<32 |
(uint64_t)p[6]<<40 |
(uint64_t)p[7]<<48 |
(uint64_t)p[8]<<56;
negstart = (uint64_t)1<<63;
negmax = UINT64_MAX;
if (entry_size) *entry_size = LP_ENCODING_64BIT_INT_ENTRY_SIZE;
} else if (LP_ENCODING_IS_12BIT_STR(p[0])) {
*count = LP_ENCODING_12BIT_STR_LEN(p);
if (entry_size) *entry_size = 2 + *count + lpEncodeBacklenBytes(*count + 2);
return p+2;
} else if (LP_ENCODING_IS_32BIT_STR(p[0])) {
*count = LP_ENCODING_32BIT_STR_LEN(p);
if (entry_size) *entry_size = 5 + *count + lpEncodeBacklenBytes(*count + 5);
return p+5;
} else {
uval = 12345678900000000ULL + p[0];
negstart = UINT64_MAX;
negmax = 0;
}
/* We reach this code path only for integer encodings.
* Convert the unsigned value to the signed one using two's complement
* rule. */
if (uval >= negstart) {
/* This three steps conversion should avoid undefined behaviors
* in the unsigned -> signed conversion. */
uval = negmax-uval;
val = uval;
val = -val-1;
} else {
val = uval;
}
/* Return the string representation of the integer or the value itself
* depending on intbuf being NULL or not. */
if (intbuf) {
*count = ll2string((char*)intbuf,LP_INTBUF_SIZE,(long long)val);
return intbuf;
} else {
*count = val;
return NULL;
}
}
/* Find pointer to the entry with a comparator callback.
*
* 'cmp' is a comparator callback. If it returns zero, current entry pointer
* will be returned. 'user' is passed to this callback.
* Skip 'skip' entries between every comparison.
* Returns NULL when the field could not be found. */
static inline unsigned char *lpFindCbInternal(unsigned char *lp, unsigned char *p,
void *user, lpCmp cmp, unsigned int skip)
{
int skipcnt = 0;
unsigned char *value;
int64_t ll;
uint64_t entry_size = 123456789; /* initialized to avoid warning. */
uint32_t lp_bytes = lpBytes(lp);
if (!p)
p = lpFirst(lp);
while (p) {
if (skipcnt == 0) {
value = lpGetWithSize(p, &ll, NULL, &entry_size);
if (value) {
/* check the value doesn't reach outside the listpack before accessing it */
assert(p >= lp + LP_HDR_SIZE && p + entry_size < lp + lp_bytes);
}
if (unlikely(cmp(lp, p, user, value, ll) == 0))
return p;
/* Reset skip count */
skipcnt = skip;
p += entry_size;
} else {
/* Skip entry */
skipcnt--;
/* Move to next entry, avoid use `lpNext` due to `lpAssertValidEntry` in
* `lpNext` will call `lpBytes`, will cause performance degradation */
p = lpSkip(p);
}
/* The next call to lpGetWithSize could read at most 8 bytes past `p`
* We use the slower validation call only when necessary. */
if (p + 8 >= lp + lp_bytes)
lpAssertValidEntry(lp, lp_bytes, p);
else
assert(p >= lp + LP_HDR_SIZE && p < lp + lp_bytes);
if (p[0] == LP_EOF) break;
}
return NULL;
}
/* Find pointer to the entry equal to the specified entry. Skip 'skip' entries
* between every comparison. Returns NULL when the field could not be found. */
unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s,
uint32_t slen, unsigned int skip)
{
struct lpFindArg arg = {
.s = s,
.slen = slen
};
return lpFindCbInternal(lp, p, &arg, lpFindCmp, skip);
}
/* Validate the integrity of a single listpack entry and move to the next one.
* The input argument 'pp' is a reference to the current record and is advanced on exit.
* the data pointed to by 'lp' will not be modified by the function.
* Returns 1 if valid, 0 if invalid. */
int lpValidateNext(unsigned char *lp, unsigned char **pp, size_t lpbytes) {
#define OUT_OF_RANGE(p) ( \
(p) < lp + LP_HDR_SIZE || \
(p) > lp + lpbytes - 1)
unsigned char *p = *pp;
if (!p)
return 0;
/* Before accessing p, make sure it's valid. */
if (OUT_OF_RANGE(p))
return 0;
if (*p == LP_EOF) {
*pp = NULL;
return 1;
}
/* check that we can read the encoded size */
uint32_t lenbytes = lpCurrentEncodedSizeBytes(p[0]);
if (!lenbytes)
return 0;
/* make sure the encoded entry length doesn't reach outside the edge of the listpack */
if (OUT_OF_RANGE(p + lenbytes))
return 0;
/* get the entry length and encoded backlen. */
unsigned long entrylen = lpCurrentEncodedSizeUnsafe(p);
unsigned long encodedBacklen = lpEncodeBacklenBytes(entrylen);
entrylen += encodedBacklen;
/* make sure the entry doesn't reach outside the edge of the listpack */
if (OUT_OF_RANGE(p + entrylen))
return 0;
/* move to the next entry */
p += entrylen;
/* make sure the encoded length at the end patches the one at the beginning. */
uint64_t prevlen = lpDecodeBacklen(p-1);
if (prevlen + encodedBacklen != entrylen)
return 0;
*pp = p;
return 1;
#undef OUT_OF_RANGE
}
/* Validate that the entry doesn't reach outside the listpack allocation. */
static inline void lpAssertValidEntry(unsigned char* lp, size_t lpbytes, unsigned char *p) {
assert(lpValidateNext(lp, &p, lpbytes));
}这篇文章已经够长,到此为止。下一篇文章,分析一下 listpack 在各个数据结构中的使用情况,敬请期待: Redis 核心数据结构(4)。
参考资料
Introduction to the Redis 7.0 release — 从 7.0 开始,将 Hash、 List、 Zset 中的 listpack 替换 ziplist。



