In a previous post, I went through Lab 1 of the MIT Distributed Systems course implementing MapReduce in Go. In this post, I’ll quickly go through lab 2, which had a much simpler assignment: implementing a simple key-value server in go. This was simple enough that I was thinking whether it’s even worthy of a post or not, but I decided to do a small one anyways.
Problem
The task is to implement a key/value server with the following specifications:
The server runs on a single machine and maintains an in-memory map of key/value pairs.
It should support three types of operations:
Put(key, value): Installs or replaces the value for a given key
Append(key, arg): Appends arg to the existing value of the key and returns the old value
Get(key): Retrieves the current value for the key
We should also build a client that can be used to calls the server to execute these operations.
Each operation should be executed exactly once, even in the presence of network failures.
Edge cases:
Get on a non-existent key should return an empty string.
Append to a non-existent key should treat the initial value as an empty string.
Solution
Building a first version of this without caring too much about client retries and exactly once execution is pretty simple. We keep a map in memory and make changes to the map as needed.
type KVServer struct {
mu sync.Mutex
Entries map[string]string
}
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
kv.mu.Lock()
defer kv.mu.Unlock()
if val, exists := kv.Entries[args.Key]; exists {
reply.Value = val
return
}
reply.Value = ""
}
func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
kv.mu.Lock()
defer kv.mu.Unlock()
kv.Entries[args.Key] = args.Value
reply.Value = args.Value
}
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
kv.mu.Lock()
defer kv.mu.Unlock()
oldValue, exists := kv.Entries[args.Key]
if exists {
kv.Entries[args.Key] = oldValue + args.Value
reply.Value = oldValue
} else {
kv.Entries[args.Key] = args.Value
reply.Value = ""
}
}
func MakeKVServer() *KVServer {
return &KVServer{
Entries: make(map[string]string),
}
}
The client then makes RPC calls to the server. The lab provides helpers that handle running the server and provide methods for making RPC calls, so this task is also relatively easy.
func MakeClerk(server *labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.server = server
return ck
}
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer.Get", &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) Get(key string) string {
args := GetArgs{Key: key}
reply := GetReply{}
ok := ck.server.Call("KVServer.Get", &args, &reply)
if ok {
return reply.Value
}
return ""
}
// shared by Put and Append.
//
// you can send an RPC with code like this:
// ok := ck.server.Call("KVServer."+op, &args, &reply)
//
// the types of args and reply (including whether they are pointers)
// must match the declared types of the RPC handler function's
// arguments. and reply must be passed as a pointer.
func (ck *Clerk) PutAppend(key string, value string, op string) string {
rpcName := "KVServer." + op
args := PutAppendArgs{Key: key, Value: value}
reply := PutAppendReply{}
ok := ck.server.Call(rpcName, &args, &reply)
if ok {
return reply.Value
}
return ""
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
// Append value to key's value and return that value
func (ck *Clerk) Append(key string, value string) string {
return ck.PutAppend(key, value, "Append")
}
The only thing that remains is to add some retries and make the server not execute the same request twice. To do this, we start sending a Client ID and a Request ID to the server with each request. For the purposes of the lab, whenever we initialize a new client, we give it a random number as the client ID. For each client, request IDs start with zero and go up by one on each request made.
Here’s what the Get and PutAppend methods looks like after we add retries (with sleeps of 100ms between each retry).
func (ck *Clerk) Get(key string) string {
args := GetArgs{Key: key, ClientId: ck.clientId, RequestId: ck.requestId}
ck.requestId++
for {
reply := GetReply{}
ok := ck.server.Call("KVServer.Get", &args, &reply)
if ok {
return reply.Value
}
time.Sleep(100 * time.Millisecond)
}
}
func (ck *Clerk) PutAppend(key string, value string, op string) string {
rpcName := "KVServer." + op
args := PutAppendArgs{Key: key, Value: value, ClientId: ck.clientId, RequestId: ck.requestId}
ck.requestId++
for {
reply := PutAppendReply{}
ok := ck.server.Call(rpcName, &args, &reply)
if ok {
return reply.Value
}
time.Sleep(100 * time.Millisecond)
}
}
After this, all that remains is making sure the server tracks these request and client IDs and does not execute something multiple times while replying to same request (if the client multiple requests). To track this, we add two maps to the server:
type KVServer struct {
mu sync.Mutex
Entries map[string]string
lastRequestId map[int64]int64 // key: clientId, value: lastRequestId
appendReplies map[int64]string // key: clientId, value: last reply for Append
}
The lastRequestId map tracks the last request that the server successfully replied to for each client. Then if we get a request that has an ID lesser than the lastRequestId, we know that we’ve already executed that request and return data instead of executing the request again.
func (kv *KVServer) requestAlreadyDone(clientId int64, requestId int64) bool {
lastRequestId, exists := kv.lastRequestId[clientId]
if !exists {
return false
}
return requestId <= lastRequestId
}
func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
kv.mu.Lock()
defer kv.mu.Unlock()
if kv.requestAlreadyDone(args.ClientId, args.RequestId) {
reply.Value = kv.Entries[args.Key]
return
}
kv.Entries[args.Key] = args.Value
reply.Value = args.Value
kv.lastRequestId[args.ClientId] = args.RequestId
}
We ned the appendReplies map for one specific reason. The Append operation returns the value of the key BEFORE the new value is appended to it. We need to use appendReplies to track this, because this old data will not be available in the Entries map after we’ve executed the request.
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
kv.mu.Lock()
defer kv.mu.Unlock()
if kv.requestAlreadyDone(args.ClientId, args.RequestId) {
reply.Value = kv.appendReplies[args.ClientId]
return
}
oldValue, exists := kv.Entries[args.Key]
if exists {
kv.Entries[args.Key] = oldValue + args.Value
reply.Value = oldValue
} else {
kv.Entries[args.Key] = args.Value
reply.Value = ""
}
kv.lastRequestId[args.ClientId] = args.RequestId
kv.appendReplies[args.ClientId] = reply.Value
}
That’s it! I ran the provided tests using `go test`
to make sure the code does what it’s supposed to.
Fin
If you read until here, thank you. This was not the most complicated assignment, but I figured I shouldn’t skip any assignment. Next up is Lab 3, which involves implementing Raft, which should require much more depth. See you then!
As always, if you liked this post and would like to see more, please let me know by leaving a comment. You can also support me by supporting my personal project, blunderfest.