The slow revolution
Some revolutions are marked by a single, spectacular event: the storming of the Bastille during the French Revolution, or the destruction of the towers of the World Trade Center on September 11, 2001, which so changed the US’s relationship with the rest of the world. But often the most important revolutions aren’t announced with the blare of trumpets. They occur softly, too slow for a single news cycle, but fast enough that if you aren’t alert, the revolution is over before you’re aware it’s happening.
Such a revolution is happening right now in computing. Microprocessor clock speeds have stagnated since about 2000. Major chipmakers such as Intel and AMD continue to wring out improvements in speed by improving on-chip caching and other clever techniques, but they are gradually hitting the point of diminishing returns. Instead, as transistors continue to shrink in size, the chipmakers are packing multiple processing units onto a single chip. Most computers shipped today use multi-core microprocessors, i.e., chips with 2 (or 4, or 8, or more) separate processing units on the main microprocessor.
The result is a revolution in software development. We’re gradually moving from the old world in which multiple processor computing was a special case, used only for boutique applications, to a world in which it is widespread. As this movement happens, software development, so long tailored to single-processor models, is seeing a major shift in some of its basic paradigms, to make the use of multiple processors natural and simple for programmers.
This movement to multiple processors began decades ago. Projects such as the Connection Machine demonstrated the potential of massively parallel computing in the 1980s. In the 1990s, scientists became large-scale users of parallel computing, using it to simulate things like nuclear explosions and the dynamics of the Universe. Those scientific applications were a bit like the early scientific computing of the late 1940s and 1950s: specialized, boutique applications, built with heroic effort using relatively primitive tools. As computing with multiple processors becomes widespread, though, we’re seeing a flowering of general-purpose software development tools tailored to multiple processor environments.
One of the organizations driving this shift is Google. Google is one of the largest users of multiple processor computing in the world, with its entire computing cluster containing hundreds of thousands of commodity machines, located in data centers around the world, and linked using commodity networking components. This approach to multiple processor computing is known as distributed computing; the characteristic feature of distributed computing is that the processors in the cluster don’t necessarily share any memory or disk space, and so information sharing must be mediated by the relatively slow network.
In this post, I’ll describe a framework for distributed computing called MapReduce. MapReduce was introduced in a paper written in 2004 by Jeffrey Dean and Sanjay Ghemawat from Google. What’s beautiful about MapReduce is that it makes parallelization almost entirely invisible to the programmer who is using MapReduce to develop applications. If, for example, you allocate a large number of machines in the cluster to a given MapReduce job, the job runs in a highly parallelized way. If, on the other hand, you allocate only a small number of machines, it will run in a much more serial way, and it’s even possible to run jobs on just a single machine.
What exactly is MapReduce? From the programmer’s point of view, it’s just a library that’s imported at the start of your program, like any other library. It provides a single library call that you can make, passing in a description of some input data and two ordinary serial functions (the “mapper” and “reducer”) that you, the programmer, specify in your favorite programming language. MapReduce then takes over, ensuring that the input data is distributed through the cluster, and computing those two functions across the entire cluster of machines, in a way we’ll make precise shortly. All the details – parallelization, distribution of data, tolerance of machine failures – are hidden away from the programmer, inside the library.
What we’re going to do in this post is learn how to use the MapReduce library. To do this, we don’t need a big sophisticated version of the MapReduce library. Instead, we can get away with a toy implementation (just a few lines of Python!) that runs on a single machine. By using this single-machine toy library we can learn how to develop for MapReduce. The programs we develop will run essentially unchanged when, in later posts, we improve the MapReduce library so that it can run on a cluster of machines.
Our first MapReduce program
Okay, so how do we use MapReduce? I’ll describe it with a simple example, which is a program to count the number of occurrences of different words in a set of files. The example is simple, but you’ll find it rather strange if you’re not already familiar with MapReduce: the program we’ll describe is certainly not the way most programmers would solve the word-counting problem! What it is, however, is an excellent illustration of the basic ideas of MapReduce. Furthermore, what we’ll eventually see is that by using this approach we can easily scale our wordcount program up to run on millions or even billions of documents, spread out over a large cluster of computers, and that’s not something a conventional approach could do easily.
The input to a MapReduce job is just a set of (input_key,input_value) pairs, which we’ll implement as a Python dictionary. In the wordcount example, the input keys will be the filenames of the files we’re interested in counting words in, and the corresponding input values will be the contents of those files:
filenames = ["text\\a.txt","text\\b.txt","text\\c.txt"]
i = {}
for filename in filenames:
    f = open(filename)
    i[filename] = f.read()
    f.close()
After this code is run the Python dictionary i will contain the input to our MapReduce job, namely, i has three keys containing the filenames, and three corresponding values containing the contents of those files. Note that I’ve used Windows’ filenaming conventions above; if you’re running a Mac or Linux you may need to tinker with the filenames. Also, to run the code you will of course need text files text\a.txt, text\b.txt, text\c.txt. You can create some simple example texts by cutting and pasting from the following:
text\a.txt:
The quick brown fox jumped over the lazy grey dogs.
text\b.txt:
That's one small step for a man, one giant leap for mankind.
text\c.txt:
Mary had a little lamb,
Its fleece was white as snow;
And everywhere that Mary went,
The lamb was sure to go.
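If you’d rather not create the files by hand, or you’re not on Windows, here’s a minimal sketch that writes the three sample texts and builds the input dictionary using os.path.join, so the path separator matches your platform. The helper names write_samples and build_input are my own, not part of any library, and I’ve assumed Python 3 and a temporary directory so nothing in your working directory gets overwritten.

```python
import os
import tempfile

# The three sample texts from this post, keyed by short filename.
SAMPLE_TEXTS = {
    "a.txt": "The quick brown fox jumped over the lazy grey dogs.\n",
    "b.txt": "That's one small step for a man, one giant leap for mankind.\n",
    "c.txt": ("Mary had a little lamb,\n"
              "Its fleece was white as snow;\n"
              "And everywhere that Mary went,\n"
              "The lamb was sure to go.\n"),
}

def write_samples(directory):
    """Write the sample texts into directory and return the list of paths."""
    os.makedirs(directory, exist_ok=True)
    paths = []
    for name in sorted(SAMPLE_TEXTS):
        path = os.path.join(directory, name)
        with open(path, "w") as f:
            f.write(SAMPLE_TEXTS[name])
        paths.append(path)
    return paths

def build_input(paths):
    """Return the {filename: contents} dictionary used as MapReduce input."""
    i = {}
    for path in paths:
        with open(path) as f:
            i[path] = f.read()
    return i

# Assumption: a fresh temporary directory stands in for the text folder.
filenames = write_samples(os.path.join(tempfile.mkdtemp(), "text"))
i = build_input(filenames)
```

Passing "text" to write_samples instead of the temporary directory puts the files where the rest of the post expects them.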
The MapReduce job will process this input dictionary in two phases: the map phase, which produces output which (after a little intermediate processing) is then processed by the reduce phase. In the map phase what happens is that for each (input_key,input_value) pair in the input dictionary i, a function mapper(input_key,input_value) is computed, whose output is a list of intermediate keys and values. This function mapper is supplied by the programmer – we’ll show how it works for wordcount below. The output of the map phase is just the list formed by concatenating the list of intermediate keys and values for all of the different input keys and values.
I said above that the function mapper is supplied by the programmer. In the wordcount example, what mapper does is take the input key and input value – a filename, and a string containing the contents of the file – and then move through the words in the file. For each word it encounters, it returns the intermediate key and value (word,1), indicating that it found one occurrence of word. So, for example, for the input key text\a.txt, a call to mapper("text\\a.txt",i["text\\a.txt"]) returns:
[('the', 1), ('quick', 1), ('brown', 1), ('fox', 1), ('jumped', 1),
('over', 1), ('the', 1), ('lazy', 1), ('grey', 1), ('dogs', 1)]
Notice that everything has been lowercased, so we don’t count words with different cases as distinct. Furthermore, the same key gets repeated multiple times, because words like the appear more than once in the text. This, incidentally, is the reason we use a Python list for the output, and not a Python dictionary, for in a dictionary the same key can only be used once.
Here’s the Python code for the mapper function, together with a helper function used to remove punctuation:
import string

def mapper(input_key,input_value):
    return [(word,1) for word in
            remove_punctuation(input_value.lower()).split()]

def remove_punctuation(s):
    # Drop punctuation characters; this form runs on Python 2 and Python 3
    return "".join(ch for ch in s if ch not in string.punctuation)
mapper works by lowercasing the input file, removing the punctuation, splitting the resulting string around whitespace, and finally emitting the pair (word,1) for each resulting word. Note, incidentally, that I’m ignoring apostrophes, to keep the code simple, but you can easily extend the code to deal with apostrophes and other special cases.
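As one possible way of handling apostrophes, here’s a sketch of a mapper that pulls out words with a regular expression instead of stripping punctuation. The name mapper_keeping_apostrophes and the pattern are my own assumptions for illustration: a word is taken to be a run of the letters a to z, optionally joined by single straight apostrophes, so digits and accented letters are dropped.

```python
import re

# Assumption: a word is a run of lowercase ASCII letters, optionally
# joined by single straight apostrophes (as in "that's").
WORD_PATTERN = re.compile(r"[a-z]+(?:'[a-z]+)*")

def mapper_keeping_apostrophes(input_key, input_value):
    """Emit (word, 1) for each word, keeping apostrophes inside words."""
    return [(word, 1) for word in WORD_PATTERN.findall(input_value.lower())]

pairs = mapper_keeping_apostrophes("text\\b.txt",
                                   "That's one small step for a man,")
```

With this variant the first key is that's rather than thats. The rest of this post sticks with the simpler mapper defined above.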
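With this specification of mapper, the output of the map phase for wordcount is simply the result of combining the lists returned by mapper for text\a.txt, text\b.txt, and text\c.txt. The lists appear in whatever order Python iterates over the dictionary i, which here happens to be a.txt, c.txt, b.txt: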
[('the', 1), ('quick', 1), ('brown', 1), ('fox', 1),
('jumped', 1), ('over', 1), ('the', 1), ('lazy', 1), ('grey', 1),
('dogs', 1), ('mary', 1), ('had', 1), ('a', 1), ('little', 1),
('lamb', 1), ('its', 1), ('fleece', 1), ('was', 1), ('white', 1),
('as', 1), ('snow', 1), ('and', 1), ('everywhere', 1),
('that', 1), ('mary', 1), ('went', 1), ('the', 1), ('lamb', 1),
('was', 1), ('sure', 1), ('to', 1), ('go', 1), ('thats', 1),
('one', 1), ('small', 1), ('step', 1), ('for', 1), ('a', 1),
('man', 1), ('one', 1), ('giant', 1), ('leap', 1), ('for', 1),
('mankind', 1)]
The map phase of MapReduce is logically trivial, but when the input dictionary has, say, 10 billion keys, and those keys point to files held on thousands of different machines, implementing the map phase is actually quite non-trivial. What the MapReduce library handles is details like knowing which files are stored on what machines, making sure that machine failures don’t affect the computation, making efficient use of the network, and storing the output in a usable form. We won’t worry about these issues for now, but we will come back to them in future posts.
What the MapReduce library now does in preparation for the reduce phase is to group together all the intermediate values which have the same key. In our example the result of doing this is the following intermediate dictionary:
{'and': [1], 'fox': [1], 'over': [1], 'one': [1, 1], 'as': [1],
'go': [1], 'its': [1], 'lamb': [1, 1], 'giant': [1],
'for': [1, 1], 'jumped': [1], 'had': [1], 'snow': [1],
'to': [1], 'leap': [1], 'white': [1], 'was': [1, 1],
'mary': [1, 1], 'brown': [1], 'lazy': [1], 'sure': [1],
'that': [1], 'little': [1], 'small': [1], 'step': [1],
'everywhere': [1], 'mankind': [1], 'went': [1], 'man': [1],
'a': [1, 1], 'fleece': [1], 'grey': [1], 'dogs': [1],
'quick': [1], 'the': [1, 1, 1], 'thats': [1]}
We see, for example, that the word ‘and’, which appears only once in the three files, has as its associated value a list containing just a single 1, [1]. By contrast, the word ‘one’, which appears twice, has [1,1] as its value.
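To make the grouping step concrete, here’s a minimal single-machine sketch of it. The function name group_by_key is hypothetical, and collections.defaultdict is just one convenient way to do the grouping; it isn’t meant to describe how Google’s library does it.

```python
from collections import defaultdict

def group_by_key(intermediate):
    """Collect the values of repeated keys into one list per key."""
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)
    return dict(groups)

# A small slice of map-phase output, with repeated keys.
intermediate = [("the", 1), ("quick", 1), ("the", 1),
                ("one", 1), ("one", 1), ("the", 1)]
groups = group_by_key(intermediate)
# groups == {'the': [1, 1, 1], 'quick': [1], 'one': [1, 1]}
```

Each key now appears once, and the length of its list is the number of times the mappers emitted it.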
The reduce phase now commences. A programmer-defined function reducer(intermediate_key,intermediate_value_list) is applied to each entry in the intermediate dictionary. For wordcount, reducer simply sums up the list of intermediate values, and returns both the intermediate_key and the sum as the output. This is done by the following code:
def reducer(intermediate_key,intermediate_value_list):
    return (intermediate_key,sum(intermediate_value_list))
The output from the reduce phase, and from the total MapReduce computation, is thus:
[('and', 1), ('fox', 1), ('over', 1), ('one', 2), ('as', 1),
('go', 1), ('its', 1), ('lamb', 2), ('giant', 1), ('for', 2),
('jumped', 1), ('had', 1), ('snow', 1), ('to', 1), ('leap', 1),
('white', 1), ('was', 2), ('mary', 2), ('brown', 1),
('lazy', 1), ('sure', 1), ('that', 1), ('little', 1),
('small', 1), ('step', 1), ('everywhere', 1), ('mankind', 1),
('went', 1), ('man', 1), ('a', 2), ('fleece', 1), ('grey', 1),
('dogs', 1), ('quick', 1), ('the', 3), ('thats', 1)]
You can easily check that this is just a list of the words in the three files we started with, and the associated wordcounts, as desired.
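One way to do that check is to count the words the conventional way and compare. Here’s a minimal sketch using collections.Counter from the standard library. The function name conventional_wordcount and the inlined copies of the three texts are my own choices for illustration.

```python
import string
from collections import Counter

# The contents of the three sample files, inlined so the check is self-contained.
texts = [
    "The quick brown fox jumped over the lazy grey dogs.",
    "That's one small step for a man, one giant leap for mankind.",
    "Mary had a little lamb,\nIts fleece was white as snow;\n"
    "And everywhere that Mary went,\nThe lamb was sure to go.",
]

def conventional_wordcount(texts):
    """Count words serially, applying the same cleaning as mapper."""
    counts = Counter()
    for text in texts:
        cleaned = "".join(ch for ch in text.lower()
                          if ch not in string.punctuation)
        counts.update(cleaned.split())
    return counts

counts = conventional_wordcount(texts)
```

If output is the list returned by the MapReduce job, then dict(output) == dict(counts) should hold: 36 distinct words, 44 words in total.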
We’ve looked at code defining the input dictionary i, the mapper and reducer functions. Collecting it all up, and adding a call to the MapReduce library, here’s the complete wordcount.py program:
# wordcount.py

import string
import map_reduce

def mapper(input_key,input_value):
    return [(word,1) for word in
            remove_punctuation(input_value.lower()).split()]

def remove_punctuation(s):
    # Drop punctuation characters; this form runs on Python 2 and Python 3
    return "".join(ch for ch in s if ch not in string.punctuation)

def reducer(intermediate_key,intermediate_value_list):
    return (intermediate_key,sum(intermediate_value_list))

filenames = ["text\\a.txt","text\\b.txt","text\\c.txt"]
i = {}
for filename in filenames:
    f = open(filename)
    i[filename] = f.read()
    f.close()

print(map_reduce.map_reduce(i,mapper,reducer))
The map_reduce module imported by this program implements MapReduce in pretty much the simplest possible way, using some useful functions from the itertools library:
# map_reduce.py
"""Defines a single function, map_reduce, which takes an input
dictionary i and applies the user-defined function mapper to each
(input_key,input_value) pair, producing a list of intermediate
keys and intermediate values. Repeated intermediate keys then
have their values grouped into a list, and the user-defined
function reducer is applied to the intermediate key and list of
intermediate values. The results are returned as a list."""

import itertools

def map_reduce(i,mapper,reducer):
    intermediate = []
    for (key,value) in i.items():
        intermediate.extend(mapper(key,value))
    groups = {}
    for key, group in itertools.groupby(sorted(intermediate),
                                        lambda x: x[0]):
        groups[key] = [y for x, y in group]
    return [reducer(intermediate_key,groups[intermediate_key])
            for intermediate_key in groups]
(Credit to a nice blog post from Dave Spencer for the use of itertools.groupby to simplify the reduce phase.)
Obviously, on a single machine an implementation of the MapReduce library is pretty trivial! In later posts we’ll extend this library so that it can distribute the execution of the mapper and reducer functions across multiple machines on a network. The payoff is that with enough improvement to the library we can with essentially no change use our wordcount.py program to count the words not just in 3 files, but rather the words in billions of files, spread over thousands of computers in a cluster. What the MapReduce library does, then, is provide an approach to developing in a distributed environment where many simple tasks (like wordcount) remain simple for the programmer. Important (but boring) tasks like parallelization, getting the right data into the right places, dealing with the failure of computers and networking components, and even coping with racks of computers being taken offline for maintenance, are all taken care of under the hood of the library.
In the posts that follow, we’re thus going to do two things. First, we’re going to learn how to develop MapReduce applications. That means taking familiar tasks – things like computing PageRank – and figuring out how they can be done within the MapReduce framework. We’ll do that in the next post in this series. In later posts, we’ll also take a look at Hadoop, an open source platform that can be used to develop MapReduce applications.
Second, we’ll go under the hood of MapReduce, and look at how it works. We’ll scale up our toy implementation so that it can be used over small clusters of computers. This is not only fun in its own right, it will also make us better MapReduce programmers, in the same way as understanding the innards of an operating system (for example) can make you a better application programmer.
To finish off this post, though, we’ll do just two things. First, we’ll sum up what MapReduce does, stripping out the wordcount-specific material. It’s not any kind of a formal specification, just a brief informal summary, together with a few remarks. We’ll refine this summary a little in some future posts, but this is the basic MapReduce model. Second, we’ll give an overview of how MapReduce takes advantage of a distributed environment to parallelize jobs.
MapReduce in general
Summing up our earlier description of MapReduce, and with the details about wordcount removed, the input to a MapReduce job is a set of (input_key,input_value) pairs. Each pair is used as input to a function mapper(input_key,input_value) which produces as output a list of intermediate keys and intermediate values:
[(intermediate_key,intermediate_value),
(intermediate_key',intermediate_value'),
...]
The output from all the different input pairs is then sorted, so that values associated with the same intermediate_key are grouped together in a list of intermediate values. The reducer(intermediate_key,intermediate_value_list) function is then applied to each intermediate key and list of intermediate values, to produce the output from the MapReduce job.
A natural question is whether the order of values in intermediate_value_list matters. I must admit I’m not sure of the answer to this question – if it’s discussed in the original paper, then I missed it. In most of the examples I’m familiar with, the order doesn’t matter, because the reducer works by applying a commutative, associative operation across all intermediate values in the list. As we’ll see in a minute, because the mapper computations are potentially done in parallel, on machines which may be of varying speed, it’d be hard to guarantee the ordering, and this suggests that the ordering doesn’t matter. It’d be nice to know for sure – if anyone reading this does know the answer, I’d appreciate hearing it, and will update the post!
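Here’s a small sketch of why the operation matters. It compares the wordcount reducer, which sums its values, against a made-up reducer that depends on order. The name order_sensitive_reducer is hypothetical, and the example says nothing about what ordering a real MapReduce implementation guarantees; it only shows which reducers would be affected.

```python
def reducer(intermediate_key, intermediate_value_list):
    # Integer addition is commutative and associative, so order is irrelevant.
    return (intermediate_key, sum(intermediate_value_list))

def order_sensitive_reducer(intermediate_key, intermediate_value_list):
    # Hypothetical reducer that returns whichever value arrived first.
    return (intermediate_key, intermediate_value_list[0])

values = [3, 1, 4, 1, 5]
reordered = list(reversed(values))  # same values, different arrival order

same = reducer("k", values) == reducer("k", reordered)
differs = (order_sensitive_reducer("k", values)
           != order_sensitive_reducer("k", reordered))
```

Both same and differs come out True: the sum is 14 either way, while the order-sensitive reducer returns 3 for one ordering and 5 for the other.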
One of the most striking things about MapReduce is how restrictive it is. A priori it’s by no means clear that MapReduce should be all that useful in practical applications. It turns out, though, that many interesting computations can be expressed either directly in MapReduce, or as a sequence of a few MapReduce computations. We’ve seen wordcount implemented in this post, and we’ll see how to compute PageRank using MapReduce in the next post. Many other important computations can also be implemented using MapReduce, including doing things like finding shortest paths in a graph, grepping a large document collection, or many data mining algorithms. For many such problems, though, the standard approach doesn’t obviously translate into MapReduce. Instead, you need to think through the problem again from scratch, and find a way of doing it using MapReduce.
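As a small illustration of a second computation that fits the model, here’s a sketch of an inverted index, which maps each word to the files it appears in. This example is mine rather than something developed in this post, and the map_reduce function is a compact single-machine stand-in written for the sketch, using collections.defaultdict for the grouping step.

```python
from collections import defaultdict

def map_reduce(i, mapper, reducer):
    """Single-machine stand-in for the MapReduce library call."""
    groups = defaultdict(list)
    for input_key, input_value in i.items():
        for intermediate_key, intermediate_value in mapper(input_key, input_value):
            groups[intermediate_key].append(intermediate_value)
    return [reducer(key, values) for key, values in groups.items()]

def mapper(input_key, input_value):
    # Emit (word, filename) rather than (word, 1).
    return [(word, input_key) for word in input_value.lower().split()]

def reducer(intermediate_key, intermediate_value_list):
    # Keep each filename once, in sorted order.
    return (intermediate_key, sorted(set(intermediate_value_list)))

i = {"a.txt": "the quick fox",
     "b.txt": "the lazy dogs",
     "c.txt": "quick quick dogs"}
index = dict(map_reduce(i, mapper, reducer))
# index["quick"] == ['a.txt', 'c.txt']
```

Only the mapper and reducer changed relative to wordcount; the call into the library has the same shape.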
Exercises
- How would you implement grep in MapReduce?
Problems
- Take a well-known algorithms book, e.g., Cormen-Leiserson-Rivest-Stein, or a list of well-known algorithms. Which algorithms lend themselves to being expressed in MapReduce? Which do not? This isn’t so much a problem as it is a suggestion for a research program.
- The early years of serial computing saw the introduction of powerful general-purpose ideas like those that went into the Lisp and Smalltalk programming languages. Arguably, those ideas are still the most powerful in today’s modern programming languages. MapReduce isn’t a programming language as we conventionally think of them, of course, but it’s similar in that it introduces a way of thinking about and approaching programming problems. I don’t think MapReduce has quite the same power as the ideas that went into Lisp and Smalltalk; it’s more like the Fortran of distributed computing. Can we find similarly powerful ideas to Lisp or Smalltalk in distributed computing? What’s the hundred-year framework going to look like for distributed computing?
How MapReduce takes advantage of the distributed setting
You can probably already see how MapReduce takes advantage of a large cluster of computers, but let’s spell out some of the details. There are two key points. First, the mapper functions can be run in parallel, on different processors, because they don’t share any data. Provided the right data is in the local memory of the right processor – a task MapReduce manages for you – the different computations can be done in parallel. The more machines are in the cluster, the more mapper computations can be simultaneously executed. Second, the reducer functions can also be run in parallel, for the same reason, and, again, the more machines are available, the more computations can be done in parallel.
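To see the shape of the parallel map phase on a single machine, here’s a sketch that hands each (input_key,input_value) pair to a pool of worker threads from Python’s concurrent.futures module. Threads here are only a stand-in for separate machines, and in CPython they won’t speed up CPU-bound mappers. The function name parallel_map_phase and the workers parameter are my own assumptions.

```python
from concurrent.futures import ThreadPoolExecutor

def mapper(input_key, input_value):
    return [(word, 1) for word in input_value.lower().split()]

def parallel_map_phase(i, mapper, workers=4):
    """Run mapper on each (key, value) pair using a pool of worker threads."""
    items = list(i.items())
    intermediate = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Each call to mapper is independent, so the calls can run concurrently.
        # pool.map yields results in the order of items, whichever finishes first.
        for pairs in pool.map(lambda kv: mapper(*kv), items):
            intermediate.extend(pairs)
    return intermediate

i = {"a.txt": "the quick brown fox", "b.txt": "the lazy dogs"}
intermediate = parallel_map_phase(i, mapper)
```

Because the mapper calls share no data, no locking is needed; the only coordination is collecting the results at the end.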
The difficulty in this story arises in the grouping step that takes place between the map phase and the reduce phase. For the reducer functions to work in parallel, we need to ensure that all the intermediate values corresponding to the same key get sent to the same machine. Obviously, this requires communication between machines, over the relatively slow network connections.
This looks tough to arrange, and you might think (as I did at first) that a lot of communication would be required to get the right data to the right machine. Fortunately, a simple and beautiful idea is used to make sure that the right data ends up in the right location, without there being too much communication overhead.
Imagine you’ve got 1000 machines that you’re going to use to run reducers on. As the mappers compute the intermediate keys and value lists, they compute hash(intermediate_key) mod 1000 for some hash function. This number is used to identify the machine in the cluster that the corresponding reducer will be run on, and the resulting intermediate key and value list is then sent to that machine. Because every machine running mappers uses the same hash function, this ensures that value lists corresponding to the same intermediate key all end up at the same machine. Furthermore, by using a hash we ensure that the intermediate keys end up pretty evenly spread over machines in the cluster. Now, I should warn you that in practice this hashing method isn’t literally quite what’s done (we’ll describe that in later posts), but that’s the main idea.
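Here’s a minimal sketch of that routing idea. I’ve used zlib.crc32 as the hash function, which is my choice for illustration and not necessarily what any real MapReduce implementation uses. The reason for not using Python’s built-in hash is that in Python 3 it is randomized per interpreter run for strings, so two mapper processes could disagree about where a key belongs. The names partition and route are hypothetical.

```python
import zlib
from collections import defaultdict

NUM_REDUCERS = 1000  # the 1000 reducer machines from the example above

def partition(intermediate_key, num_reducers=NUM_REDUCERS):
    """Return the index of the reducer machine responsible for this key."""
    # crc32 of the key's bytes is the same in every process and on every machine.
    return zlib.crc32(intermediate_key.encode("utf-8")) % num_reducers

def route(intermediate, num_reducers=NUM_REDUCERS):
    """Group a mapper's (key, value) pairs by destination reducer."""
    outboxes = defaultdict(list)
    for key, value in intermediate:
        outboxes[partition(key, num_reducers)].append((key, value))
    return outboxes

# Output from two different mappers, possibly on different machines.
mapper_1_output = [("the", 1), ("quick", 1), ("the", 1)]
mapper_2_output = [("the", 1), ("lamb", 1)]

outbox_1 = route(mapper_1_output)
outbox_2 = route(mapper_2_output)
target = partition("the")  # both mappers send every "the" pair here
```

Neither mapper needs to talk to the other to agree on target; each one computes it locally from the key alone.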
Needless to say, there’s a lot more to the story of how MapReduce works in practice, especially the way it handles data distribution and fault-tolerance. In future posts we’ll explore many more of the details. Nonetheless, hopefully this post gives you a basic understanding of how MapReduce works. In the next post we’ll apply MapReduce to the computation of PageRank.
About this post: This is one in a series of posts about the Google Technology Stack – PageRank, MapReduce, and so on. The posts are summarized here, and there is a FriendFeed room for the series here. You can subscribe to my blog to follow future posts in the series.