Now that I’m working on infrastructure for day job, I figured it would be a good idea for me to learn distributed systems basics. So I’m going to go through the lectures for MIT 6.824 Distributed Systems and do the labs for the course.
In this post, I’ll go through the first lab, where the assignment was to implement MapReduce in Go.
What is MapReduce?
MapReduce is a very popular way to run parallel jobs for processing large datasets easily. The original paper by Jeff Dean and Sanjay Ghemawat is worth a read, but I’ll quickly go through the details and then discuss my implementation.
Let’s understand the programming model using the example from the paper: word counts. Our task is to count the occurrences of each unique word across a large collection of text files. To do this, we define a map function and a reduce function that we’ll pass to the MapReduce framework.
Map function
type KeyValue struct {
Key string
Value string
}
func Map(filename string, contents string) []KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }
// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)
kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}
The Map function is called once for each file of input. The first argument is the name of the input file and the second is the file’s contents. The return value is a slice of key-value pairs, with the word as the key and “1” as the value.
Reduce function
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}
The Reduce function is called once for each key generated by the map tasks, meaning that it’s called once for each unique word. The second parameter to the Reduce function is a slice of ALL the values that the map function generated for the key. So to count the number of times the word occured in all our text files, we can just return the length of the slice passed as the second parameter.
How MapReduce uses these functions
We provide these functions to the MapReduce framework. Then, the framework has two types of processes: a co-ordinator and a worker. Tasks are completed like this:
The co-ordinator breaks the input into M splits.
It then passes each split to a worker that calls map on the split. The intermediate key-value pairs returned by the map function call are written to disk in R splits (R is the number of reduce workers). This was the Map phase.
Once all the map tasks are done, the co-ordinator starts assigning reduce tasks. When a worker is assigned a reduce task by the co-ordinator, it reads all the intermediate data required for the task, sorts it by the intermediate keys so that all occurrences of the same key are grouped together.
After sorting, the reduce worker iterates over the sorted intermediate data and for each unique key, calls the reduce function with the key and all the values that the key has. The output is then written to disk. This was the Reduce phase.
When all map and reduce tasks are done, the workers are stopped and the user program can be told that the task has been completed.
A diagram can explain the flow a little better.
Implementation
(Disclaimer: this isn’t the cleanest code and there is lots of room for improvement, but for a first cut I’ve tried to keep it understandable.)
The code for all this can be found in `coordinator.go` and `worker.go`. For now, since it’s only for educational purposes, we assume that both the coordinator and the workers run on the same machine.
Coordinator
First we define a Task
and a Coordinator
.
type TaskStatus int
const (
NotStarted TaskStatus = iota
Assigned
Completed
)
type Task struct {
taskStatus TaskStatus
assignedAt time.Time
}
type Coordinator struct {
mu sync.Mutex
Files []string
MapTasks []Task
ReduceTasks []Task
done chan struct{}
}
A task tracks its current status and when it was assigned to a worker. The assignedAt field is used by the coordinator to see if some worker has not done the task in a while and re-assign to some other worker if needed.
The coordinator has the following fields:
mu
: Mutex to ensure thread-safe access to shared data like the MapTasks and ReduceTasks slicesFiles
: The input files we’ll process.MapTasks
: A slice to track all the Map tasks we need to doReduceTasks
: A slice to track all the Reduce tasksdone
: Channel to signal when all tasks are completed
The coordinator has a server()
method to start a server that listens for RPC calls from the workers.
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
We create a MakeCoordinator
method that takes a list of input files and the number of reduce tasks we want to run, starts a coordinator server and returns the coordinator.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
Files: files,
MapTasks: make([]Task, len(files)),
ReduceTasks: make([]Task, nReduce),
done: make(chan struct{}),
}
fmt.Printf("Coordinator: MakeCoordinator\n")
fmt.Printf("Coordinator: files %v\n", files)
fmt.Printf("Coordinator: map tasks %v\n", c.MapTasks)
fmt.Printf("Coordinator: reduce tasks %v\n", c.ReduceTasks)
c.startPeriodicChecks()
c.server()
return &c
}
(The startPeriodicChecks
call checks for tasks that have timed out. We’ll go into those details later, let’s first explore the happy path.)
GetTask
The workers can request the coordinator for a new task using a RPC call to `GetTask
`. The interface for this RPC is simple. You ask for a task (no params needed) and get a reply with an input file name, the operation you need to perform (map or reduce) and some metadata like operation number etc.
type GetTaskArgs struct {}
type GetTaskReply struct {
InputFileName string
Operation string
OperationNumber int
NMap int
NReduce int
WaitForTask bool
}
The implementation of the GetTask method is also pretty straightforward.
Look for map tasks, if you find any, assign and return.
If we didn’t find a map task to assign but all map tasks have not been completed by workers yet, tell the worker to wait for a while. Important note: we cannot assign a reduce task in this case, because we need all map tasks to have finished before we can move on to the reduce phase.
If all map tasks have been completed, try to find a reduce task. If found, assign and return.
Again, all reduce tasks are assigned, but not all of them have finished, tell the worker to wait. This is important because a worker who’s assigned a task might fail and the task would have to be reassigned to another worker.
If all reduce tasks have finished, all the work is done!
Another important note: we need to use the mutex to acquire a lock before making changes to our MapTasks
and ReduceTasks
slices, otherwise different workers asking for tasks at the same time might cause race conditions.
var ErrAllTasksCompleted = fmt.Errorf("all tasks completed")
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if i, task := c.findAvailableTask(c.MapTasks); task != nil {
c.assignTask("map", i, task, reply)
return nil
}
if !c.allTasksCompleted(c.MapTasks) {
reply.WaitForTask = true
fmt.Println("No map tasks available, worker should wait")
return nil
}
fmt.Println("All map tasks completed, looking for reduce tasks")
if i, task := c.findAvailableTask(c.ReduceTasks); task != nil {
c.assignTask("reduce", i, task, reply)
return nil
}
if c.allTasksCompleted(c.ReduceTasks) {
return ErrAllTasksCompleted
}
reply.WaitForTask = true
fmt.Println("No reduce tasks available, worker should wait")
return nil
}
func (c *Coordinator) findAvailableTask(tasks []Task) (int, *Task) {
for i, task := range tasks {
if task.taskStatus == NotStarted {
return i, &tasks[i]
}
}
return 0, nil
}
func (c *Coordinator) assignTask(operation string, index int, task *Task, reply *GetTaskReply) {
reply.Operation = operation
reply.OperationNumber = index
reply.NMap = len(c.MapTasks)
reply.NReduce = len(c.ReduceTasks)
reply.WaitForTask = false
task.taskStatus = Assigned
task.assignedAt = time.Now()
if operation == "map" {
reply.InputFileName = c.Files[index]
}
}
MarkTaskCompleted
The coordinator exposes a MarkTaskCompleted
RPC, which the workers call as soon as they finish a task they were assigned.
type MarkTaskCompletedArgs struct {
Operation string
OperationNumber int
}
type MarkTaskCompletedReply struct{}
func (c *Coordinator) MarkTaskCompleted(args *MarkTaskCompletedArgs, reply *MarkTaskCompletedReply) error {
c.mu.Lock()
defer c.mu.Unlock()
if args.Operation == "map" {
c.MapTasks[args.OperationNumber].taskStatus = Completed
return nil
} else if args.Operation == "reduce" {
c.ReduceTasks[args.OperationNumber].taskStatus = Completed
return nil
}
return fmt.Errorf("invalid operation")
}
Worker
Now that we have an understanding of the coordinator, let’s go through the worker. A worker spins up and starts requesting tasks. Once it gets assigned a task, it performs the task and tells the coordinator that the task was done. Then it asks for another task. It does this until it hears from the coordinator that there are no more tasks to do.
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
for {
task, taskExists := GetTask()
if !taskExists {
break
}
if task.WaitForTask {
// sleep before trying to get a new task
time.Sleep(500 * time.Millisecond)
continue
}
if task.Operation == "map" {
handleMapTask(task, mapf)
} else if task.Operation == "reduce" {
handleReduceTask(task, reducef)
} else {
log.Fatalf("unknown operation: %v", task.Operation)
panic(fmt.Errorf("unknown operation: %v", task.Operation))
}
MarkTaskCompleted(task.Operation, task.OperationNumber)
}
}
A few things to note:
The map and reduce functions are functions that the user provides. This is the point of the framework.
If the worker is told to wait for a while, it sleeps and then tries again.
Both the GetTask
and MarkTaskCompleted
functions make RPC calls to the coordinator.
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
sockname := coordinatorSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()
err = c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
func MarkTaskCompleted(operation string, operationNumber int) {
args := MarkTaskCompletedArgs{
Operation: operation,
OperationNumber: operationNumber,
}
reply := MarkTaskCompletedReply{}
ok := call("Coordinator.MarkTaskCompleted", &args, &reply)
if !ok {
fmt.Printf("call failed!\n")
}
}
func GetTask() (*GetTaskReply, bool) {
args := GetTaskArgs{}
reply := GetTaskReply{}
ok := call("Coordinator.GetTask", &args, &reply)
if ok {
return &reply, true
} else {
fmt.Printf("call failed!\n")
return nil, false
}
}
The other two important parts of the worker are handleMapTask
and handleReduceTask
. As the names signify, these handle map and reduce tasks respectively.
Map operation handler
The handleMapTask takes the input file, calls the map function on it. Here the key is the filename and the value is the entire content of the file. This gives us back a slice of intermediate keys and values. For each intermediate key, we write the key-value pair to a particular file based on what reduce task should handle that key.
func handleMapTask(task *GetTaskReply, mapf func(string, string) []KeyValue) {
fmt.Printf("Map task received...\n")
fmt.Printf("filename: %v\n", task.InputFileName)
fileName := task.InputFileName
contents, err := os.ReadFile(fileName)
if err != nil {
log.Fatalf("cannot read %v", fileName)
panic(err)
}
kva := mapf(fileName, string(contents))
filecontentsmap := make(map[string]string)
for _, kv := range kva {
reduceTaskNumberForKey := ihash(kv.Key) % task.NReduce
outputFileName := fmt.Sprintf("mr-%d-%d", task.OperationNumber, reduceTaskNumberForKey)
output := filecontentsmap[outputFileName]
filecontentsmap[outputFileName] = fmt.Sprintf("%s%s %s\n", output, kv.Key, kv.Value)
}
// ... write to output files ...
fmt.Printf("Map task completed: %v\n", task.InputFileName)
}
The important parts of this code are:
kva := mapf(fileName, string(contents))
This calls the map function for the input file.
filecontentsmap := make(map[string]string)
for _, kv := range kva {
outputFileName := fmt.Sprintf("mr-%d-%d", task.OperationNumber, ihash(kv.Key)%task.NReduce)
output := filecontentsmap[outputFileName]
filecontentsmap[outputFileName] = fmt.Sprintf("%s%s %s\n", output, kv.Key, kv.Value)
}
This code creates a map that contains the content for each intermediate output file. These intermediate output files are inputs for the reduce tasks.
This is the part that assigns each key to a reduce task based on the total number of reduce tasks we have.
reduceTaskNumberForKey := ihash(kv.Key) % task.NReduce
Reduce operation handler
The Reduce task handler reads ALL The input files created for it to create a list of intermediate keys.
intermediate := []KeyValue{}
for i := 0; i < task.NMap; i++ {
filename := fmt.Sprintf("mr-%d-%d", i, task.OperationNumber)
kva := parseKeyValuePairsFromFile(filename)
fmt.Printf("reduce task %v: got intermediate keys from %v\n", task.OperationNumber, filename)
intermediate = append(intermediate, kva...)
}
This gives us all the key value pairs that the reduce task works on. This list is then sorted, based on the key.
sort.Sort(ByKey(intermediate))
Now, for each key, a list of all values is created and reduce is called with the key and these values.
for i := 0; i < len(intermediate); {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// ... write output and move on to next key
}
Now we have all the parts to get a basic word count running.
Worker failure
The coordinator also has a thread running that checks for tasks that have been assigned for a long time but not finished. After a timeout, it assumes that the worker that was assigned the task has failed and marks the task as ready for re-assignment.
We use a ticker to do this.
func (c *Coordinator) startPeriodicChecks() {
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case <-ticker.C:
c.checkTimeoutsAndReassignTasks()
case <-c.done:
ticker.Stop()
return
}
}
}()
}
The checkTimeoutAndReassignTasks is simple to implement as we track when the task was assigned for each task.
func (c *Coordinator) checkTimeoutsAndReassignTasks() {
c.mu.Lock()
defer c.mu.Unlock()
for i, task := range c.MapTasks {
if task.taskStatus == Assigned && time.Since(task.assignedAt) > 10*time.Second {
c.MapTasks[i] = Task{}
}
}
for i, task := range c.ReduceTasks {
if task.taskStatus == Assigned && time.Since(task.assignedAt) > 10*time.Second {
c.ReduceTasks[i] = Task{}
}
}
}
That’s it! We made it through implementing MapReduce in Go!
Epilogue
There’s some things to improve in the code. A good exercise for the future for me is to get this code to run on multiple machines with some shared storage, as that’s how this would run in a real scenario.
If you liked this post and would like to see more, please let me know by leaving a comment. You can also support me by supporting my personal project, blunderfest.
hehe
it was a nice lab, it took me upwards of 4 days to built this, when i picked up the course.
Keep posting here, the progress you are making on the course
I will also re pick it, left it in the middle last time