Part 10 - 分割叶子节点

我们的 B 树并不像只有一个节点的树。 要解决此问题, 我们需要一些代码把一个叶子节点一分为二。 然后, 我们需要创建一个内部节点作为两个叶子节点的父节点。

基本上, 本文的目标是从此出发:

../../_images/btree2.png

one-node btree

到此:

../../_images/btree3.png

two-level btree

首先, 让我们删除完整叶节点的错误处理:

void leaf_node_insert(Cursor* cursor, uint32_t key, Row* value)
{
    void* node = get_page(cursor->table->pager, cursor->page_num);
    uint32_t num_cells = *leaf_node_num_cells(node);
    if (num_cells >= LEAF_NODE_MAX_CELLS)
    {
        // Node full
        leaf_node_split_and_insert(cursor, key, value);
        return;
    }
    if (cursor->cell_num < num_cells)
    {
        // Make room for new cell
        for (uint32_t i = num_cells; i > cursor->cell_num; i--)
        {
            memcpy(leaf_node_cell(node, i), leaf_node_cell(node, i - 1),LEAF_NODE_CELL_SIZE);
        }
    }
    *(leaf_node_num_cells(node)) += 1;
    *(leaf_node_key(node, cursor->cell_num)) = key;
    serialize_row(value, leaf_node_value(node, cursor->cell_num));
}

ExecuteResult execute_insert(Statement* statement, Table* table)
{
    void* node = get_page(table->pager, table->root_page_num);
    uint32_t num_cells = (*leaf_node_num_cells(node));

    Row* row_to_insert = &(statement->row_to_insert);
    uint32_t key_to_insert = row_to_insert->id;
    Cursor* cursor = table_find(table, key_to_insert);

    if (cursor->cell_num < num_cells)
    {
        uint32_t key_at_index = *leaf_node_key(node, cursor->cell_num);
        if (key_at_index == key_to_insert)
        {
            return EXECUTE_DUPLICATE_KEY;
        }
    }

    leaf_node_insert(cursor, row_to_insert->id, row_to_insert);
    free(cursor);
    return EXECUTE_SUCCESS;
}

10.1 分割算法

简单的部分结束了。 这是我们需要从 《 SQLite 数据库系统执行的操作的描述 : 设计和实现

If there is no space on the leaf node, we would split the existing entries residing there and the new one (being inserted) into two equal halves: lower and upper halves. (Keys on the upper half are strictly greater than those on the lower half.) We allocate a new leaf node, and move the upper half into the new node.

让我们获取旧节点的句柄并创建新节点:

[void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value)]
/*
 * Create a new node and move half the cells over.
 * Insert the new value in one of the two nodes.
 * Update parent or create a new parent.
 */

void *old_node = get_page(cursor->table->pager, cursor->page_num);
uint32_t new_page_num = get_unused_page_num(cursor->table->pager);
void *new_node = get_page(cursor->table->pager, new_page_num);
initialize_leaf_node(new_node);

接下来, 将每个单元格复制到新位置:

[void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value)]
/*
 * All existing keys plus new key should be divided
 * evenly between old (left) and new (right) nodes.
 * Starting from the right, move each key to correct position.
 */
for (int32_t i = LEAF_NODE_MAX_CELLS; i >= 0; i--)
{
    void *destination_node;
    if (i >= LEAF_NODE_LEFT_SPLIT_COUNT)
    {
        destination_node = new_node;
    } else {
        destination_node = old_node;
    }
    uint32_t index_within_node = i % LEAF_NODE_LEFT_SPLIT_COUNT;
    void *destination = leaf_node_cell(destination_node, index_within_node);

    if (i == cursor->cell_num)
    {
        serialize_row(value, destination);
    } else if (i > cursor->cell_num)
    {
        memcpy(destination, leaf_node_cell(old_node, i - 1), LEAF_NODE_CELL_SIZE);
    } else {
        memcpy(destination, leaf_node_cell(old_node, i), LEAF_NODE_CELL_SIZE);
    }
}

更新每个节点 header 中的单元格计数:

[void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value)]
/* Update cell count on both leaf nodes */
*(leaf_node_num_cells(old_node)) = LEAF_NODE_LEFT_SPLIT_COUNT;
*(leaf_node_num_cells(new_node)) = LEAF_NODE_RIGHT_SPLIT_COUNT;

然后我们需要更新节点的父节点。 如果原始节点是根节点, 则它没有父节点。 在这种情况下, 请创建一个新的根节点以充当父节点。 我现在暂存另一个分支:

if (is_node_root(old_node))
{
    return create_new_root(cursor->table, new_page_num);
} else {
    printf("Need to implement updating parent after split\n");
    exit(EXIT_FAILURE);
}

10.2 分配新页面

让我们回过头来定义一些新的函数和常量。 创建新的叶子节点时, 将其放入由 get_unused_pa​​ge_num() 确定的页面中:

/*
* Until we start recycling free pages, new pages will always
* go onto the end of the database file
*/
uint32_t get_unused_page_num(Pager* pager)
{
    return pager->num_pages;
}

现在我们假设在具有 N 页的数据库中, 分配了页码 0 到 N-1。 因此我们始终可以为新页面分配页码 N。 最终在我们实现删除操作后, 某些页面可能会变空并且其页码未使用。 为了提高效率, 我们可以重新分配那些空闲页面。

10.3 叶节点大小

为了使树保持平衡, 我们在两个新节点之间平均分配了单元。 如果叶节点可以容纳 N 个单元, 则在拆分期间, 我们需要在两个节点之间分配 N + 1 个单元 (N 个原始单元加一个新单元)。 如果 N + 1 为奇数, 我将任意选择左侧节点以得到一个单元格。

const uint32_t LEAF_NODE_RIGHT_SPLIT_COUNT = (LEAF_NODE_MAX_CELLS + 1) / 2;
const uint32_t LEAF_NODE_LEFT_SPLIT_COUNT = (LEAF_NODE_MAX_CELLS + 1) - LEAF_NODE_RIGHT_SPLIT_COUNT;

10.4 创建一个新的根节点

以下是 SQLite 数据库系统 如何解释创建新根节点的过程:

令 N 为根节点。 首先分配两个节点, 例如 L 和 R。 将 N 的下半部分移至 L, 将上半部分移至 R。 现在 N 为空。 在 N 中添加 <L,K,R>, 其中 K 是 L 中的最大键。 第 N 页仍然是根。 请注意, 树的深度增加了 1, 但是新树保持了高度平衡, 而没有违反任何 B+ tree 属性。

至此我们已经分配了合适的子节点并将上半部分移入其中。 我们的函数将右边的子节点作为输入, 并分配一个新页面来存储左边的子节点。

[create_new_root(Table* table, uint32_t right_child_page_num)]
/*
 * Handle splitting the root.
 * Old root copied to new page, becomes left child.
 * Address of right child passed in.
 * Re-initialize root page to contain the new root node.
 * New root node points to two children.
 */

void* root = get_page(table->pager, table->root_page_num);
void* right_child = get_page(table->pager, right_child_page_num);
uint32_t left_child_page_num = get_unused_page_num(table->pager);
void* left_child = get_page(table->pager, left_child_page_num);

旧的根节点将复制到左子节点, 因此我们可以重用根节点页:

[create_new_root(Table* table, uint32_t right_child_page_num)]
/* Left child has data copied from old root */
memcpy(left_child, root, PAGE_SIZE);
set_node_root(left_child, false);

最后, 我们将根页面初始化为具有两个子节点的新内部节点。

[create_new_root(Table* table, uint32_t right_child_page_num)]
/* Root node is a new internal node with one key and two children */
initialize_internal_node(root);
set_node_root(root, true);
*internal_node_num_keys(root) = 1;
*internal_node_child(root, 0) = left_child_page_num;
uint32_t left_child_max_key = get_node_max_key(left_child);
*internal_node_key(root, 0) = left_child_max_key;
*internal_node_right_child(root) = right_child_page_num;

10.5 内部节点格式

现在我们终于创建了一个内部节点, 我们必须定义它的布局。 它从公共 header 开始, 然后是它所包含的键的数量, 然后是它最右边的子节点的页码。 内部节点总是比它们的键多一个子节点指针。 这个额外的子节点指针被存储在 header 中。

/*
* Internal Node Header Layout
*/
const uint32_t INTERNAL_NODE_NUM_KEYS_SIZE = sizeof(uint32_t);
const uint32_t INTERNAL_NODE_NUM_KEYS_OFFSET = COMMON_NODE_HEADER_SIZE;
const uint32_t INTERNAL_NODE_RIGHT_CHILD_SIZE = sizeof(uint32_t);
const uint32_t INTERNAL_NODE_RIGHT_CHILD_OFFSET =
        INTERNAL_NODE_NUM_KEYS_OFFSET + INTERNAL_NODE_NUM_KEYS_SIZE;
const uint32_t INTERNAL_NODE_HEADER_SIZE = COMMON_NODE_HEADER_SIZE +
        INTERNAL_NODE_NUM_KEYS_SIZE +
        INTERNAL_NODE_RIGHT_CHILD_SIZE;

主体是一个单元格数组, 其中每个单元格都包含一个子节点指针和一个键。 每个键应该是子级左侧包含的最大键。

/*
* Internal Node Body Layout
*/
const uint32_t INTERNAL_NODE_KEY_SIZE = sizeof(uint32_t);
const uint32_t INTERNAL_NODE_CHILD_SIZE = sizeof(uint32_t);
const uint32_t INTERNAL_NODE_CELL_SIZE =
        INTERNAL_NODE_CHILD_SIZE + INTERNAL_NODE_KEY_SIZE;

根据这些常数, 以下是内部节点的布局:

../../_images/internal-node-format.png

Our internal node format

注意我们巨大的分支因子。 由于每个子节点指针 / 键对都非常小, 因此我们可以在每个内部节点中容纳 510 个键和 511 个子指针。 这意味着我们将不必遍历树的许多层来找到给定的键!

# internal node layers

max # leaf nodes

Size of all leaf nodes

0

511^0 = 1

4 KB

1

511^1 = 512

-2 MB

2

511^2 = 261,121

-1 GB

3

511^3 = 133,432,831

-550 GB

实际上由于头部 、 键和浪费的空间的开销, 我们不能在每个叶子节点上存储整整 4KB 的数据。 但是我们可以通过从磁盘加载 4 个页面来搜索大约 500GB 的数据。 这就是为什么 B 树是数据库的一个有用的数据结构。

下面是对内部节点进行读写的方法:

uint32_t* internal_node_num_keys(void* node)
{
    return node + INTERNAL_NODE_NUM_KEYS_OFFSET;
}

uint32_t* internal_node_right_child(void* node)
{
    return node + INTERNAL_NODE_RIGHT_CHILD_OFFSET;
}

uint32_t* internal_node_cell(void* node, uint32_t cell_num)
{
    return node + INTERNAL_NODE_HEADER_SIZE + cell_num * INTERNAL_NODE_CELL_SIZE;
}

uint32_t* internal_node_child(void* node, uint32_t child_num)
{
    uint32_t num_keys = *internal_node_num_keys(node);
    if (child_num > num_keys)
    {
        printf("Tried to access child_num %d > num_keys %d\n", child_num, num_keys);
        exit(EXIT_FAILURE);
    } else if (child_num == num_keys)
    {
        return internal_node_right_child(node);
    } else {
        return internal_node_cell(node, child_num);
    }
}

uint32_t* internal_node_key(void* node, uint32_t key_num)
{
    return internal_node_cell(node, key_num) + INTERNAL_NODE_CHILD_SIZE;
}

10.5 内部节点格式

对于内部节点, 最大键始终是其右键。 对于叶节点, 这是最大索引处的关键:

uint32_t get_node_max_key(void* node) {
    switch (get_node_type(node))
    {
        case NODE_INTERNAL:
            return *internal_node_key(node, *internal_node_num_keys(node) - 1);
        case NODE_LEAF:
            return *leaf_node_key(node, *leaf_node_num_cells(node) - 1);
    }
}

10.6 追踪根源

我们最后使用普通节点头中的 is_root 字段。 回顾一下, 我们用它来决定如何分割一个叶子节点:

[leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value)]
if (is_node_root(old_node))
{
    return create_new_root(cursor->table, new_page_num);
} else {
    printf("Need to implement updating parent after split\n");
    exit(EXIT_FAILURE);
}

这是 gettersetter

bool is_node_root(void* node)
{
    uint8_t value = *((uint8_t*)(node + IS_ROOT_OFFSET));
    return (bool)value;
}

void set_node_root(void* node, bool is_root)
{
    uint8_t value = is_root;
    *((uint8_t*)(node + IS_ROOT_OFFSET)) = value;
}

初始化这两种类型的节点是应该默认将 is_root 设置为 false:

void initialize_leaf_node(void* node)
{
    set_node_type(node, NODE_LEAF);
    set_node_root(node, false);
    *leaf_node_num_cells(node) = 0;
}

void initialize_internal_node(void* node)
{
    set_node_type(node, NODE_INTERNAL);
    set_node_root(node, false);
    *internal_node_num_keys(node) = 0;
}

我们应该在创建表的第一个节点时将 is_root 设置为 true:

Table* db_open(const char* filename)
{
    Pager* pager = pager_open(filename);

    Table* table = malloc(sizeof(Table));
    table->pager = pager;
    table->root_page_num = 0;

    if (pager->num_pages == 0)
    {
        // New database file. Initialize page 0 as leaf node.
        void* root_node = get_page(pager, 0);
        initialize_leaf_node(root_node);
        set_node_root(root_node, true);
    }

    return table;
}

10.7 打印树

为了帮助我们可视化数据库的状态, 我们应该更新 .btree 元指令以打印多级树。

我将替换当前的 print_leaf_node() 函数。

一个新的递归函数, 该函数接受任何节点, 然后打印该节点及其子节点。 它以缩进级别作为参数, 每次递归调用时都会增加。 我还添加了一个小的辅助函数来缩进。

void indent(uint32_t level)
{
    for (uint32_t i = 0; i < level; i++)
    {
        printf("  ");
    }
}

void print_tree(Pager* pager, uint32_t page_num, uint32_t indentation_level)
{
    void* node = get_page(pager, page_num);
    uint32_t num_keys, child;

    switch (get_node_type(node))
    {
        case (NODE_LEAF):
            num_keys = *leaf_node_num_cells(node);
            indent(indentation_level);
            printf("- leaf (size %d)\n", num_keys);
            for (uint32_t i = 0; i < num_keys; i++)
            {
                indent(indentation_level + 1);
                printf("- %d\n", *leaf_node_key(node, i));
            }
            break;
        case (NODE_INTERNAL):
            num_keys = *internal_node_num_keys(node);
            indent(indentation_level);
            printf("- internal (size %d)\n", num_keys);
            for (uint32_t i = 0; i < num_keys; i++)
            {
                child = *internal_node_child(node, i);
                print_tree(pager, child, indentation_level + 1);

                indent(indentation_level + 1);
                printf("- key %d\n", *internal_node_key(node, i));
            }
            child = *internal_node_right_child(node);
            print_tree(pager, child, indentation_level + 1);
            break;
    }
}

并更新对打印函数的调用, 缩进级别为零。

else if(strcmp(input_buffer->buffer, ".btree") == 0){
        printf("Tree:\n");
        print_tree(table->pager, 0, 0);
        return META_COMMAND_SUCCESS;
    }

这是新打印功能的测试用例!

it 'allows printing out the structure of a 3-leaf-node btree' do
    script = (1..14).map do |i|
    "insert #{i} user#{i} person#{i}@example.com"
    end
    script << ".btree"
    script << "insert 15 user15 person15@example.com"
    script << ".exit"
    result = run_script(script)

    expect(result[14...(result.length)]).to match_array([
    "db > Tree:",
    "- internal (size 1)",
    "  - leaf (size 7)",
    "    - 1",
    "    - 2",
    "    - 3",
    "    - 4",
    "    - 5",
    "    - 6",
    "    - 7",
    "  - key 7",
    "  - leaf (size 7)",
    "    - 8",
    "    - 9",
    "    - 10",
    "    - 11",
    "    - 12",
    "    - 13",
    "    - 14",
    "db > Need to implement searching an internal node",
    ])
end

新格式有所简化, 因此我们需要更新现有的 .btree 测试:

it 'allows printing out the structure of a one-node btree' do
    script = [3, 1, 2].map do |i|
    "insert #{i} user#{i} person#{i}@example.com"
    end
    script << ".btree"
    script << ".exit"
    result = run_script(script)

    expect(result).to match_array([
    "db > Executed.",
    "db > Executed.",
    "db > Executed.",
    "db > Tree:",
    "- leaf (size 3)",
    "  - 1",
    "  - 2",
    "  - 3",
    "db > "
    ])
end

这是新测试本身的 .btree 输出:

Tree:
- internal (size 1)
    - leaf (size 7)
        - 1
        - 2
        - 3
        - 4
        - 5
        - 6
        - 7
    - key 7
    - leaf (size 7)
        - 8
        - 9
        - 10
        - 11
        - 12
        - 13
        - 14

在最小缩进级别上, 我们看到根节点 (内部节点)。 之所以说是 1 号, 是因为它有一个 key。 缩进一个级别, 我们看到一个叶节点, 一个键和另一个叶节点。 根节点 (7) 中的密钥是第一个叶节点中的最大密钥。 每个大于 7 的键都在第二个叶子节点中。

10.8 严重问题

如果你一直在密切关注, 你可能会发现我们错过了一些重要的东西。 看看如果我们尝试插入一个额外的行会发生什么:

db > insert 15 user15 person15@example.com
Need to implement searching an internal node

哎呀! 谁写了那个 TODO 消息? :P

接下来, 我们将通过在多级树上执行搜索来继续史诗般的 B 树传奇。

这里[8] 是本节代码所有的改动。