Ryan Zhu — kv-store

a concurrent in-memory kv-store

Feburary 24, 2024


I tried writing FASTER, a concurrent kv-store from Microsoft Research. But it didn’t work. One day I’ll find out why and rage over it. But here’s the process to there.

Problem

A key value store aims to provide an interface to store and retrieve data associated with keys. Canonically, they are able to support 4 operations:

For the sake of simplicity, my kv-store reduces the problem to keys of 30 characters or less, and values being 4 byte signed integers.

The most trivial solution is to make a single linked list of key-value pairs, and iterate through the list to find the key. This is $O(n)$ for all operations, and it’s trash. It works, but if you ever find yourself with more than a mordicum; a morsel; a miniscule amount of data, it’ll take longer than that Somali runner’s 100m.

Binary Search Trees

Ok, you say, let’s sort these keys in a BST and traverse the tree. Then, we’ll have $O(\log n)$ for all operations. If you use a balanced tree, then this is guaranteed, and you’ll get pretty reasonable results.

If you’re satisfied with that, you are a far better man than I. There’s so much untapped potential here, and I must rip it out. Even if it means I must discard 48 straight hours in the middle of a school week.

Hash Tables

Our next trick is basically going to be the naive version, except we partition it up.

As a motivating example, let’s say we’re given a bunch of colored toys, with numbers written on them. We’d like to answer the questions like is there a green ball with the number 4? Or, change the number on the blue cube to 2. Well, it’d be pretty nice if we could just… group by color? Then, you look through the color that you actually care about, and depending on the number of colors and the colors of the toys, in reasonable cases you’ll be looking through less toys.

Good thing hash tables do exactly that, except instead of colors, we’re using a cold, mechanical, and unlovable hash function. We’ll make a nice and big number of buckets, something like $2^{20} \sim 10^6$. We then have a hash function with the signature

uint64_t hash(const char* key);

This will give us the “color” of the key, and we can get the bucket by taking the modulo of the hash with the number of buckets.

Is this any faster than the BST? God cares if I know, because I didn’t measure. It certainly depends on your hash function, and the distribution of your data, but under optimal conditions, we can expect $O(1)$ for all operations. I’m not going to talk about hashing functions. I could, and talk about things like Cuckoo hashing and probabilistic guarantees, but does anyone really care? Not really, and we can just salt our string and hope the avalanching of the hash function is good enough.

Either way, just make the table big enough, and you’ll get pretty good performance. Java and C++ use a load factor of around $0.75$, so we can expect $0.75$ items in each bucket on average. If you use Markov’s inequality and assuming our hash function is fully uniform (big assumption, but who cares) with similar load factor, then we see that $$ \mathbb{P}[\text{bucket has more than 10 items}] \leq \frac{0.75}{10} = 0.075. $$

I promised no math, but I lied. This is a pretty good guarantee, and we can expect $O(1)$ for lookups, since we’ll be looking through less than 10 items at least 92% of the time.

A balanced BST will have path lengths of $O(\log n),$ so on average we’re going to be doing $\log n$ comparisons. If $n > 2^{10} \sim 1000$ then we’re doing more comparisons on average. Unless you realy really care about the $0.1\text{%}$ performance, the hash table will clear, and even if you do care it’s probably better still from the expensive rebalancing operations.

I forget the speed of using one thread (and can’t be bothered to measure), but std::unordered_map<std::string, int> does about 3M operations per second on my machine, although the default string hash is known to be pretty slow. My table does similar, maybe a little faster from the lack of other goodies that unordered_map provides.

Our hash table kv will look like the following, and I can’t be bothered to inculde a balanced BST, so here’s a link to that:

typedef struct kv {
    char key[30];
    int value;
    struct kv* next;
} kv_t;

kv_t table[1 << 20];

int get(const char* key) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    kv_t* p = table[h];
    while (p) {
        if (strcmp(p->key, key) == 0) {
            return p->value;
        }
        p = p->next;
    }
    return -1;
}

void upsert(const char* key, int value) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    kv_t* p = table[h];
    while (p) {
        if (strcmp(p->key, key) == 0) {
            p->value = value;
            return;
        }
        p = p->next;
    }
    kv_t* new = malloc(sizeof(kv_t));
    strcpy(new->key, key);
    new->value = value;
    new->next = table[h];
    table[h] = new;
}

void read_modify_write(const char* key, int (*f)(int), int value) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    kv_t* p = table[h];
    while (p) {
        if (strcmp(p->key, key) == 0) {
            p->value = f(p->value);
            return;
        }
        p = p->next;
    }
    kv_t* new = malloc(sizeof(kv_t));
    strcpy(new->key, key);
    new->value = value;
    new->next = table[h];
    table[h] = new;
}

void delete(const char* key) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    kv_t* p = table[h];
    kv_t* q = NULL;
    if (!p) {
        return;
    }
    if (strcmp(p->key, key) == 0) {
        table[h] = p->next;
        free(p);
        return;
    }
    while (p) {
        if (strcmp(p->key, key) == 0) {
            q->next = p->next;
            free(p);
            return;
        }
        q = p;
        p = p->next;
    }
}

Cache Optimization

Another thing that Hash Tables may buy us is memory locality (not in our circumstance though lol, but it does in the original paper). Today’s CPUs are extremely fast, so memory access from main memory is slow, on the order of a few hundred cycles, or around 200 nanoseconds slow (source: me). To avoid fetching from main memory each time, when we fetch memory CPUs will actually load a whole cache line, which traditionally is 64 bytes, but on my M2, it’s 128 bytes and varies by architecture. Thus, if we load a memory address, we get the 63 other bytes basically for free, and want to use them.

Our memory design is actually pretty hard to cache optimize, but one thing we can do is to make sure the kv_t struct is aligned to cache lines, so we now have

typedef struct kv {
    _Alignas(64) char key[32];
    int value;
    struct kv* next;
} kv_t;

We can do more, but our data is already extremely compact, so beyond compressing records together (which causes extra cache misses anyways), there’s not much to do here to mitigate cache misses.

My mortal mind could also just be missing something, and this could be suboptimal.

Concurrency

Now, let’s just add more threads! What could go wrong? Nothing really, in my opinion, but in some people’s opinion, a lot does go wrong. In particular, if we call delete on two consecutive elements at once, we can potentially lose some of the elements. My face when thread compeition :sob:. The solution is to lock the system. We’re going to introduce a table wide lock, let’s make it a read write lock for fun. Then, when we want to do a read operation (get), we’ll lock the table for reading, and when we want to do a write operation (upsert, delete, read_modify_write), we’ll lock the table for writing. C already has these primitives in the pthreads library, so we’ll use those. The distinction between read-write locks and normal mutexes is that we can have multiple readers at once, while mutexes can’t be shared. Thus, if our workload is read heavy, we’ll reap the benefits.

I’m not writing it out, because that’s pretty bad. Instead, we’ll use per-bucket locks, which is still relatively heavyweight, but getting closer to optimal. Instead of locking the entire table, we’ll just lock one bucket at a time respectively. Because if we’re looking at green toys, why do we care about the blue things? If Kai from Kung Fu Panda comes and is happily jadeifying the blue bucket toys in its entirety, that still has nothing to do with the green things (we are assuming a dystopian and segregated toy society). So we’ll have a secondary array of mutexes, each one corresponding to a bucket. Our code now has the following changes

// remember to init these
pthread_rwlock_t locks[1 << 20];
...
int get(const char* key) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    pthread_rwlock_rdlock(&locks[h]);
    ...
    // in every return path
    pthread_rwlock_unlock(&locks[h]);
}

void upsert(const char* key, int value) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    pthread_rwlock_wrlock(&locks[h]);
    ...
    // in every return path
    pthread_rwlock_unlock(&locks[h]);
}

void read_modify_write(const char* key, int (*f)(int), int value) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    pthread_rwlock_wrlock(&locks[h]);
    ...
    // in every return path
    pthread_rwlock_unlock(&locks[h]);
}

void delete(const char* key) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    pthread_rwlock_wrlock(&locks[h]);
    ...
    // in every return path
    pthread_rwlock_unlock(&locks[h]);
}

Very menial. This is pretty good now, and on my M2 Pro with 10 cores, under a load of 80:10:5:5 R:W:RMW:D workload, it hits around 7 million operations per second if I remember correctly. These benchmarks are hand baked, and the proccess for that is basically

generate set of M keys of random length, M ~ 1e7

every thread:
sample operation from above ratio
sample key from above set
perform operation

Is this reasonable? I don’t know, since this was my first time really measuring systems performance here. But it felt really bad, since a single core hits around 3 million (source: me). The 9 extra cores are barely worth more than double performance.

Atomic Operations

the CAS godsend

To make things better, we realize that we really don’t need to lock the entire bucket when we’re operationg on it. Like, if we’re in a bucket of length 5, and we have a concurrent read on the first element and a delete for the third element, there’s no reason why these two can’t happen at the same time. So instead of locks, we’ll be using atomic operations. These basically are mutexes over operations, but basically to every thread atomic operations look like a single operation, and we won’t get in between states. They’re implemented at a hardware level, so hopefully they’re faster than locking mutexes each time. More importantly, we don’t lock the entire bucket at a time, we’re basically giving each record a lock, but without the extra storage.

The operation that we care about, since the others don’t matter, is the compare and swap (CAS) operation. If we have an atomic variable x, then CAS(&x, &old, new) is more or less

if (*x == *old) {
    *x = new;
    return true;
} else {
    *old = *x
    return false;
}

except this entire thing is atomic. Similarily, there are also store(&x, value) and get(&x), which you should be able to figure out what they do. Thus, what we really want here is to change our operations so that whenever we modify something in memory (shared), we’ll do a CAS and if it doesn’t work, we’ll just try again. We hope that the overhead of trying again will be larger than the overhead of locking, meaning that we have a smaller number being subtracted from a larger number, giving us still a positive decrease in time.

In symbols, we want $t_{\text{CAS}} < t_{\text{lock}}$. I know this equation can be hard to understand, but the time will roughly be $t_{\text{Base}} + t_{\text{CAS}} + t_{\text{retry}} < t_{\text{Base}} + t_{\text{lock}}$, so hopefully we will be able to have a smaller time. (there was no reason for this)

Anyways, we modify our operations as follows, where C provides the _Atomic keyword to make something atomic (shocker). Surprisingly it’s actually generic. I’ll split it up this time, since it’s a few more changes than before.

typedef struct kv {
    _Alignas(64) char key[32];
    _Atomic int value;
    _Atomic (struct kv*) next;
} kv_t;

_Atomic (kv_t*) table[1 << 20];

We change most things to be atomic, where the value must be atomic, as does the pointer to next, because these can be modified. The key is never modified, so it need not be atomic. The table now contains atomic pointers, since these pointers can have contention with inserts and deletes.

Get stays the same, since reading doesn’t cause contention. We’ll change the other operations.

void upsert(const char* key, int value) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    kv_t* head = atomic_load(&table[h]);
    kv_t* prev = NULL;
    while (head) {
        if (strcmp(head->key, key) == 0) {
            atomic_store(&head->value, value);
            return;
        }
        prev = head;
        head = atomic_load(&head->next);
    }
    kv_t* new = malloc(sizeof(kv_t));
    strcpy(new->key, key);
    new->value = value;
    new->next = NULL;
    prev = table[h];
    if (atomic_compare_exchange_strong(&table[h], &prev, new)) {
        return;
    }

    free(new);

    // try again
    upsert(key, value);
}

void read_modify_write(const char* key, int (*f)(int), int value) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    kv_t* head = table[h];
    while (head) {
        if (strcmp(head->key, key) == 0) {
            int old;
            do {
                old = atomic_load(&head->value);
            } while (!atomic_compare_exchange_strong(&head->value, &old, f(old)));
            return;
        }
        head = head->next;
    }
    // try to insert. we actually cannot call upsert here, because it may get inserted inbetween (i might be yapping out of my mind here)
    kv_t* new = malloc(sizeof(kv_t));
    strcpy(new->key, key);
    new->value = value;
    new->next = NULL;
    prev = table[h];
    if (atomic_compare_exchange_strong(&table[h], &prev, new)) {
        return;
    }

    free(new);
    // try again
    read_modify_write(key, f, value);
}

Delete is a bit more changed.

void delete(const char* key) {
    uint64_t h = hash(key) & ((1 << 20) - 1);
    kv_t* head = table[h];
    kv_t* prev = NULL;
    if (head == NULL) {
        return;
    }
    kv_t *next = atomic_load(&head->next);
    if (strcmp(head->key, key) == 0) {
        if (atomic_compare_exchange_strong(&table[h], &head, next)) {
            // technically wrong, we’ll correct this later
            free(head);
            return;
        }
        // try again
        delete(key);
        return;
    }
    prev = head;
    head = next;
    while (head) {
        next = atomic_load(&head->next);
        if (strcmp(head->key, key) == 0) {
            if (atomic_compare_exchange_strong(&prev->next, &head, next)) {
                free(head);
                return;
            }
            // try again
            delete(key);
            return;
        }
        prev = head;
        head = next;
    }
}

Epoching

If one thinks hard enough and submits themselves to the mercy of the scheduler, it will dawn on one’s wise mind that if there is a concurrent call to delete and get at the same time, it’s quite easy to get a segfault. This happens when you call free(head) in delete, but head happens to be the pointer you’re using in get. Thus, FASTER from Microsoft (I am finally referencing it) uses an epoch system. With such an epoch system, each thread has a thread local epoch time $E_t$ as well as a global epoch time $E_g$. It ensures that we only free things after everyone is done using the items we’re about to free.

On deletes, we swap out the pointer, meaning this pointer is no longer accessible, but another thread may have access to it still. So instead of calling free on the memory, we’ll increment the global epoch timer, and add the pointer to our thread local delete queue along with the current time. After every operation (or few), we’ll update our local epoch time to the global time, and every few cycles, we’ll check if all the threads have updated their local epoch time to be greater than the epoch time when we deleted the item, then we know that every thread is done with the item, and we can safely free it. No segfaults now!

Thread State:

_Thread_local uint64_t epoch;
_Atomic uint64_t global_epoch = 0;

typedef struct delete_queue {
    kv_t* ptr;
    uint64_t time;
    struct delete_queue* next;
} delete_queue_t;
_Thread_local *delete_queue_t delete_queue_head;
_Thread_local *delete_queue_t delete_queue_tail;

// what you think it is
void delete_insert(kv_t* ptr, uint64_t time);

Then, on deletes, we have

// if swapped
uint64_t time = atomic_fetch_add(&global_epoch, 1) + 1;
delete_insert(head, time);

And after every operation, we have

epoch = atomic_load(&global_epoch);

Every once in a while, we will go through our delete queue, and check if we can delete any items. For simplicity, I’m ignoring how we get the epoch times of other threads, but it’s another global array of atomics.

uint64_t safe_epoch = epoch;
for (int i=0;i<thread_count;i++) {
    safe_epoch = min(safe_epoch, atomic_load(&thread_epoch[i]));
}
while (delete_queue_head != NULL && delete_queue_head->time < safe_epoch) {
    free(head->ptr);
    delete_queue_head = delete_queue_head->next;
}
if (delete_queue_head == NULL) {
    delete_queue_tail = NULL;
}

Conclusion

My implementation segfaults. It’s ggs, and I really don’t know why. I had to give up, because I was balls deep doing work for a class I wasn’t taking, while I blissfully ignored my dreaded n*mber theory homework, watching the quadratic integer fields pile up.

Even so, I proceeded to spend around 10 hours debugging, tearing through a decade of data race tooling (drd is god awful, and I’m pretty sure it’s wrong because it said I had 10 million data races - if it’s right, I’m just a terrible programmer). I pulled out helgrind, but this turned out to be how drd was implemented because it was the same goddamn error messages. I rewrote it 3 (three) times, and each time it would work for while, before segfaulting spectacularly and crushing the dreams I had for finally making this work. On the benchmarks I ran, the scaling was much more linear with mutexes, and on the same workload as above, hit around 16–18 million operations per second, which is basically perfect linear scaling up to 6 cores (6 because of big.LITTLE). That’s pretty good! Microsoft reports 160Mop/s on 64 cores or something, from 2018, so unless the Xeon caches are enormous (M2 has huge caches), then I’m still missing something. Until then, I’ve been defeated, since the M2 IPC and memory latency must be far better than some skylake Xeon.

I am doing the FASTER paper a disservice, it’s a cool paper and an in memory store is the first part of it. It also allows dynamic allocation, log file system, and also out of too large for memory objects as well. Check it out here: Faster. Badrish Chandramouli et al. ACM SIGMOD ’18

Built with Pollen and Racket, inspired by
  Eric Zhang