Biweekly links for 01/19/2009

  • Towards a Wiki For Formally Verified Mathematics
    • “the wiki will state all of known mathematics in a machine-readable language and verify all theorems for correctness, thus providing a knowledge base for interactive proof assistants.”
  • The Art Of Community | jonobacon@home
    • “The Art of Community” is a new book being written by Jono Bacon, the Ubuntu Community Manager. It’s not yet done, but will be published by O’Reilly, and will also be released under a CC license.
  • AcaWiki
    • Site intended to be a “Wikipedia for academic research”, summarizing academic papers.

Click here for all of my del.icio.us bookmarks.

Biweekly links for 01/12/2009

Click here for all of my del.icio.us bookmarks.

Travel and talks

I’ll be in Santa Fe, New Mexico, on Wednesday and Thursday of this coming week. While there, I’m giving a talk about scientific collaboration on the internet at the Santa Fe Institute on Wednesday afternoon, and the after-dinner banquet speech (related topic, very different talk) at the Quantum Information Processing (QIP) 2009 conference. QIP was one of my favourite annual events when I worked in quantum computing, and I’m really looking forward to the chance to catch up with old friends.

From Friday through Sunday I’ll be at the third annual science blogging conference, otherwise known as Science Online 2009, in Raleigh-Durham, North Carolina. People who went to the first two all say they had a great time, so I’m really looking forward to this!

Finally, while I’m advertising talks, I may as well add that I’ll be giving the University of Toronto Physics Colloquium on Thursday, January 22, at 4pm, in room 102 of the McLennan Building, on the main University of Toronto campus.

Using MapReduce to compute PageRank

In this post I explain how to compute PageRank using the MapReduce approach to parallelization. This gives us a way of computing PageRank that can in principle be automatically parallelized, and so potentially scaled up to very large link graphs, i.e., to very large collections of webpages. In this post I describe a single-machine implementation which easily handles a million or so pages. In future posts we’ll use a cluster to scale out much further – it’ll be interesting to see how far we can get.

I’ve discussed PageRank and MapReduce at length in earlier posts – see here for MapReduce, and here and here for PageRank – so in this post we’ll just quickly review the basic facts. Let’s start with PageRank. The idea is that we number webpages [tex]0,\ldots,n-1[/tex]. Associated with webpage number [tex]j[/tex] is a PageRank [tex]q_j[/tex] measuring the importance of page [tex]j[/tex]: the bigger the PageRank, the more important the page. The vector [tex]q = (q_0,\ldots,q_{n-1})[/tex] of PageRanks is a probability distribution, i.e., the PageRanks are numbers between [tex]0[/tex] and [tex]1[/tex] that sum to one.

How is the PageRank vector [tex]q[/tex] computed? I’ll just describe the mathematical upshot here; the full motivation in terms of a crazy websurfer who randomly surfs the web is described in an earlier post. The upshot is that the PageRank vector [tex]q[/tex] can be defined by the equation (explanation below):

[tex] q \leftarrow M^j P. [/tex]

What this equation represents is a starting distribution [tex]P[/tex] for the crazy websurfer, and then [tex]j[/tex] steps of “surfing”, with each action of [tex]M[/tex] representing how the distribution changes in a single step. [tex]P[/tex] is an [tex]n[/tex]-dimensional vector, and [tex]M[/tex] is an [tex]n \times n[/tex] matrix whose entries reflect the link structure of the web in a way I’ll make precise below. The PageRank [tex]q[/tex] is defined in the limit of large [tex]j[/tex] – in our examples, convergence typically occurs for [tex]j[/tex] in the range [tex]10[/tex] to [tex]30[/tex]. You might wonder how [tex]P[/tex] is chosen, but part of the magic of PageRank is that it doesn’t matter how [tex]P[/tex] is chosen, provided it’s a probability distribution. The intuition is that the starting distribution for the websurfer doesn’t matter to the websurfer’s long-run behaviour. We’ll start with the uniform probability distribution, [tex]P = (1/n,1/n,\ldots)[/tex], since it’s easy to generate.

How is the matrix [tex]M[/tex] defined? It can be broken up into three pieces

[tex] M = s A + s D + t E, [/tex]

including: a contribution [tex]sA[/tex] representing the crazy websurfer randomly picking links to follow; a contribution [tex]sD[/tex] due to the fact that the websurfer can’t randomly pick a link when they hit a dangling page (i.e., one with no outbound links), and so something else needs to be done in that case; and finally a contribution [tex]tE[/tex] representing the websurfer getting bored and “teleporting” to a random webpage.

We’ll set [tex]s = 0.85[/tex] and [tex]t = 1-s = 0.15[/tex] as the respective probabilities for randomly selecting a link and teleporting. See this post for a discussion of the reasons for this choice.

The matrix [tex]A[/tex] describes the crazy websurfer’s link-following behaviour, and so, in some sense, encodes the link structure of the web. In particular, suppose we define [tex]\#(j)[/tex] to be the number of links outbound from page [tex]j[/tex]. Then [tex]A_{kj}[/tex] is defined to be [tex]0[/tex] if page [tex]j[/tex] does not link to page [tex]k[/tex], and [tex]1/\#(j)[/tex] if page [tex]j[/tex] does link to page [tex]k[/tex]. Stated another way, the entries of the [tex]j[/tex]th column of [tex]A[/tex] are zero, except at locations corresponding to outgoing links, where they are [tex]1/\#(j)[/tex]. The intuition is that [tex]A[/tex] describes the action of a websurfer at page [tex]j[/tex] randomly choosing an outgoing link.

The matrix [tex]D[/tex] is included to deal with dangling pages, i.e., pages with no outgoing links. For such pages it is obviously ambiguous what it means to choose an outgoing link at random. The conventional resolution is to choose another page uniformly at random from the entire set of pages. What this means is that if [tex]j[/tex] is a dangling page, then the [tex]j[/tex]th column of [tex]D[/tex] should have all its entries [tex]1/n[/tex], otherwise, if [tex]j[/tex] is not dangling, all the entries should be zero. A compact way of writing this is

[tex] D = e d^T/ n, [/tex]

where [tex]d[/tex] is the vector of dangling pages, i.e., the [tex]j[/tex]th entry of [tex]d[/tex] is [tex]1[/tex] if page [tex]j[/tex] is dangling, and otherwise is zero. [tex]e[/tex] is the vector whose entries are all [tex]1[/tex]s.

The final piece of [tex]M[/tex] is the matrix [tex]E[/tex], describing the bored websurfer teleporting somewhere else at random. This matrix has entries [tex]1/n[/tex] everywhere, representing a uniform probability of going to another webpage.
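
To make the decomposition concrete, here’s a small illustrative snippet, separate from the MapReduce code later in the post, which builds [tex]M = sA + sD + tE[/tex] for a hypothetical three-page web in which page 0 links to pages 1 and 2, page 1 links back to page 0, and page 2 is dangling, and then iterates [tex]q \leftarrow Mq[/tex]. The example web is made up purely for illustration, and the snippet uses numpy directly, not MapReduce:

import numpy

n = 3
s, t = 0.85, 0.15

# A: column j holds 1/#(j) in the rows corresponding to the pages j
# links to, and 0 elsewhere.  In this made-up web, page 0 links to
# pages 1 and 2, page 1 links back to page 0, and page 2 is dangling.
A = numpy.array([[0.0, 1.0, 0.0],
                 [0.5, 0.0, 0.0],
                 [0.5, 0.0, 0.0]])

d = numpy.array([0.0, 0.0, 1.0])       # indicator vector of dangling pages
D = numpy.outer(numpy.ones(n), d) / n  # D = e d^T / n
E = numpy.ones((n, n)) / n             # uniform teleportation matrix

M = s * A + s * D + t * E              # each column of M sums to one

q = numpy.ones(n) / n                  # uniform starting distribution P
for step in xrange(30):                # roughly the 10-30 steps mentioned above
  q = numpy.dot(M, q)

print q, sum(q)                        # q is a probability distribution

Page 0 ends up with the largest PageRank, as you’d expect, since both of the other pages funnel probability towards it.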

Okay, that’s PageRank in a mathematical nutshell. What about MapReduce? Again, I’ll just remind you of the basic details – if you want an introduction, see this post. MapReduce is one of those ideas where understanding is really helped by first working through an example, rather than starting with an abstract description, like I’m about to give, so if you’re not familiar with MapReduce, I strongly suggest reading the earlier post.

The input to a MapReduce job is a set of (input_key,input_value) pairs. Each pair is used as input to a function mapper(input_key,input_value) which produces as output a list of intermediate keys and intermediate values:

[(intermediate_key,intermediate_value),
 (intermediate_key',intermediate_value'),
 ...]

The output from all the different input pairs is then sorted, so that intermediate values associated with the same intermediate_key are grouped together in a list of intermediate values. The reducer(intermediate_key,intermediate_value_list) function is then applied to each intermediate key and list of intermediate values, to produce the output from the MapReduce job.

Computing PageRank with MapReduce

Okay, so how can we compute PageRank using MapReduce? The approach we’ll take is to use MapReduce to repeatedly multiply a vector by the matrix [tex]M[/tex]. In particular, we’re going to show that if [tex]p[/tex] is a probability distribution, then we can easily compute [tex]Mp[/tex] using MapReduce. We can thus compute [tex]M^jP[/tex] using repeated invocations of MapReduce. Those invocations have to be done serially, but the individual MapReduce jobs are themselves all easily parallelized, and so we can potentially get a speedup by running those jobs on a big cluster. Much more about doing that in later posts.

The nub of the problem, then, is figuring out how to compute [tex]Mp[/tex], given a starting probability distribution [tex]p[/tex]. Let’s start out with a rough approach that gets the basic idea right, essentially using MapReduce to compute [tex]Ap[/tex]. We’ll see below that it’s easy to fix this up to take dangling nodes and teleportation into account. The fix involves introducing an additional MapReduce job, though, so each multiplication step [tex]p \rightarrow Mp[/tex] actually involves two MapReduce jobs, not just one. For now, though, let’s concentrate on roughing out a MapReduce job that computes [tex]Ap[/tex].

As input to the MapReduce computation, we’ll use (key,value) pairs where the key is just the number of the webpage, let’s call it j, and value contains several items of data describing the page, including [tex]p_j[/tex], the number [tex]\#(j)[/tex] of links outbound from the page, and a list [k_1,k_2,...] of pages that j links to.

For each of the pages k_l that j links to, the mapper outputs an intermediate key-value pair, with the intermediate key being k_l and the value just the contribution [tex]p_j/\#(j)[/tex] made to the PageRank. Intuitively, this corresponds to the crazy websurfer randomly moving to page k_l, with the probability [tex]p_j/\#(j)[/tex] combining both the probability [tex]p_j[/tex] that they start at page [tex]j[/tex], and the probability [tex]1/\#(j)[/tex] that they move to page k_l.

Between the map and reduce phases, MapReduce collects up all intermediate values corresponding to any given intermediate key, k, i.e., the list of all the probabilities of moving to page k. The reducer simply sums up all those probabilities, outputting the result as the second entry in the pair (k,p_k'), and giving us the entries of [tex]Ap[/tex], as was desired.
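
Concretely, this rough version might look something like the sketch below, written against the toy map_reduce library listed later in the post, and assuming each value is a list of the form [p_j, #(j), list of outlinked pages], as just described. The names rough_mapper and rough_reducer are placeholders for this sketch only; the real versions, which handle dangling pages and teleportation, appear below as pr_mapper and pr_reducer_inter.

# Rough sketch only: computes Ap, with no dangling-page or
# teleportation corrections, and no placeholder (j,0) pairs yet.
# Assumes value = [p_j, number of outlinks, list of outlinked pages].

def rough_mapper(j,value):
  # send probability p_j / #(j) along each outgoing link of page j
  # (note this divides by zero for a dangling page, one of the
  # problems fixed below)
  return [(k,value[0]/value[1]) for k in value[2]]

def rough_reducer(k,contributions):
  # sum the probability flowing into page k, giving (Ap)_k
  return (k,sum(contributions))

# e.g. ap = map_reduce.map_reduce(i,rough_mapper,rough_reducer)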

To modify this so it computes [tex]Mp[/tex] we need to make three changes.

The first change is to make sure we deal properly with dangling pages, i.e., we include the term [tex]sD[/tex]. One possible way is to treat dangling pages as though they have outgoing links to every single other page, [0,1,2,...]. While this works, it would require us to maintain many very large lists of links, and would be extremely inefficient.

A better way to go is to use our earlier expression [tex]D = e d^T/n[/tex], and thus [tex]Dp = e (d\cdot p)/n[/tex], where [tex]d \cdot p[/tex] is the inner product between the vector [tex]d[/tex] of dangling pages, and [tex]p[/tex]. Computing [tex]Dp[/tex] then really boils down to computing [tex]d \cdot p[/tex].

We can compute [tex]d \cdot p[/tex] using a separate MapReduce job which we run first. This job computes the inner product, and then passes it as a parameter to the second MapReduce job, which is based on the earlier rough description, and which finishes off the computation. This first MapReduce job uses the same input as the earlier job – a set of keys j corresponding to pages, and values describing the pages, i.e., containing the value for p_j, and a description of the outbound links from page j. If page j is dangling the mapper outputs the intermediate pair (1,p_j), otherwise it outputs nothing. All the intermediate keys are the same, so the reducer acts on just one big list, summing up all the values p_j for dangling pages, giving us the inner product we wanted.

As an aside, while this prescription for computing the inner product using MapReduce is obviously correct, you might worry about the fact that all the intermediate keys have the same value. This means all the intermediate values will go to a single reducer, running on just one machine in the cluster. If there are a lot of dangling pages, that means a lot of communication and computation overhead associated with that single machine – it doesn’t seem like a very parallel solution. There’s actually a simple solution to this problem, which is to modify the MapReduce framework just a little, introducing a “combine” phase in between map and reduce, which essentially runs little “mini-reducers” directly on the output from all the mappers, offloading some of the reduce functionality onto the machines used as mappers. We won’t explore this idea in detail here, but we will implement it in future posts, and we’ll see that in practice having just a single key isn’t a bottleneck.
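
To give a flavour of the idea, here’s a minimal sketch of how a combine phase could be grafted onto the toy map_reduce library used in this post. The function map_reduce_with_combiner and its interface are invented purely for this sketch; the combiner is just a mini-reducer that gets applied to each mapper’s output before the global grouping and reduce.

# Sketch: a variant of map_reduce with a "combine" phase.  The
# combiner takes an intermediate key and a list of values and returns
# a single combined value; it is run separately over each mapper's
# output before the global grouping and reduce.  The function name
# and interface are invented for this sketch.

import itertools

def group_by_key(pairs):
  # group a list of (key,value) pairs into a dictionary mapping each
  # key to the list of its associated values
  groups = {}
  for key, grp in itertools.groupby(sorted(pairs), lambda x: x[0]):
    groups[key] = [y for x, y in grp]
  return groups

def map_reduce_with_combiner(i,mapper,combiner,reducer):
  combined = []
  for (key,value) in i.items():
    # run the "mini-reducer" on this mapper's output
    for k, values in group_by_key(mapper(key,value)).items():
      combined.append((k,combiner(k,values)))
  groups = group_by_key(combined)
  return [reducer(k,groups[k]) for k in groups]

# For the inner product job described above, the combiner can simply
# be the same summing function used as the reducer.

On a single machine the combiner buys us nothing, but on a cluster it would mean each mapper machine ships a single partial sum across the network, rather than one value per dangling page.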

The second change we need to make in our rough MapReduce job is to include the teleportation step. This can be done easily by modifying the reducer to include a contribution from teleportation.

The third change we need to make in our rough MapReduce job is somewhat subtle; I actually didn’t realize I needed to make this change until after I ran the code, and realized I had a bug. Think about the set of intermediate keys produced by the mappers. The only way a given page can appear as an intermediate key is if it’s linked to by some other page. Pages with no links to them won’t appear in the list of intermediate keys, and so won’t appear in the output from the MapReduce job. The way we deal with this problem is by modifying the mapper so that it emits one extra key-value pair as output. Namely, if it takes as input (j,value), then it emits all the intermediate keys and values described earlier, and an additional pair (j,0), which represents a probability [tex]0[/tex] of moving to page j. This ensures that every page j will appear in the list of intermediate keys, but doesn’t have any impact on the probability of moving to page j; you can think of it simply as a placeholder output.

That completes the high-level theoretical description of computing PageRank using MapReduce. In the next section of the post I’ll describe a simple Python implementation of this MapReduce-based approach to PageRank. If you’re not interested in the implementation, you can skip to the final section, where I talk about how to think about programming with MapReduce – general heuristics you can use to put problems into a form where MapReduce can be used to attack them.

Implementation

The Python code to implement the above PageRank algorithm is straightforward. To run it on just a single machine we can use the exact same MapReduce module I described in my earlier post; for convenience, here’s the code:

# map_reduce.py
# Defines a single function, map_reduce, which takes an input
# dictionary i and applies the user-defined function mapper to each
# (input_key,input_value) pair, producing a list of intermediate 
# keys and intermediate values.  Repeated intermediate keys then 
# have their values grouped into a list, and the user-defined 
# function reducer is applied to the intermediate key and list of 
# intermediate values.  The results are returned as a list.

import itertools

def map_reduce(i,mapper,reducer):
  intermediate = []
  for (key,value) in i.items():
    intermediate.extend(mapper(key,value))
  groups = {}
  for key, group in itertools.groupby(sorted(intermediate), 
                                      lambda x: x[0]):
    groups[key] = list([y for x, y in group])
  return [reducer(intermediate_key,groups[intermediate_key])
          for intermediate_key in groups] 

With that code put in a file somewhere your Python interpreter can find it, here’s the code implementing PageRank:

# pagerank_mr.py
#
# Computes PageRank, using a simple MapReduce library.
#
# MapReduce is used in two separate ways: (1) to compute
# the inner product between the vector of dangling pages
# (i.e., pages with no outbound links) and the current
# estimated PageRank vector; and (2) to actually carry
# out the update of the estimated PageRank vector.
#
# For a web of one million webpages the program consumes
# about one gig of RAM, and takes an hour or so to run,
# on a (slow) laptop with 3 gig of RAM, running Vista and
# Python 2.5.

import map_reduce
import numpy.random
import random

def paretosample(n,power=2.0):
  # Returns a sample from a truncated Pareto distribution
  # with probability mass function p(l) proportional to
  # 1/l^power.  The distribution is truncated at l = n.

  m = n+1
  while m > n: m = numpy.random.zipf(power)
  return m

def initialize(n,power):
  # Returns a Python dictionary representing a web
  # with n pages, and where each page k is linked to by
  # L_k random other pages.  The L_k are independent and
  # identically distributed random variables with a
  # shifted and truncated Pareto probability mass function
  # p(l) proportional to 1/(l+1)^power.

  # The representation used is a Python dictionary with
  # keys 0 through n-1 representing the different pages.
  # i[j][0] is the estimated PageRank, initially set at 1/n,
  # i[j][1] the number of outlinks, and i[j][2] a list of
  # the outlinks.

  # This dictionary is used to supply (key,value) pairs to
  # both mapper tasks defined below.

  # initialize the dictionary
  i = {} 
  for j in xrange(n): i[j] = [1.0/n,0,[]]
  
  # For each page, generate inlinks according to the Pareto
  # distribution. Note that this is somewhat tedious, because
  # the Pareto distribution governs inlinks, NOT outlinks,
  # which is what our representation is adapted to represent.
  # A smarter representation would give easy
  # access to both, while remaining memory efficient.
  for k in xrange(n):
    lk = paretosample(n+1,power)-1
    values = random.sample(xrange(n),lk)
    for j in values:
      i[j][1] += 1 # increment the outlink count for page j
      i[j][2].append(k) # insert the link from j to k
  return i

def ip_mapper(input_key,input_value):
  # The mapper used to compute the inner product between
  # the vector of dangling pages and the current estimated
  # PageRank.  The input is a key describing a webpage, and
  # the corresponding data, including the estimated pagerank.
  # The mapper returns [(1,pagerank)] if the page is dangling,
  # and otherwise returns nothing.
  
  if input_value[1] == 0: return [(1,input_value[0])]
  else: return []

def ip_reducer(input_key,input_value_list):
  # The reducer used to compute the inner product.  Simply
  # sums the pageranks listed in the input value list, which
  # are all the pageranks for dangling pages.

  return sum(input_value_list)

def pr_mapper(input_key,input_value):
  # The mapper used to update the PageRank estimate.  Takes
  # as input a key for a webpage, and as a value the corresponding
  # data, as described in the function initialize.  It returns a
  # list with all outlinked pages as keys, and corresponding values
  # just the PageRank of the origin page, divided by the total
  # number of outlinks from the origin page.  Also appended to
  # that list is a pair with key the origin page, and value 0.
  # This is done to ensure that every single page ends up with at
  # least one corresponding (intermediate_key,intermediate_value)
  # pair output from a mapper.
  
  return [(input_key,0.0)]+[(outlink,input_value[0]/input_value[1])
          for outlink in input_value[2]]

def pr_reducer_inter(intermediate_key,intermediate_value_list,
                     s,ip,n):
  # This is a helper function used to define the reducer used
  # to update the PageRank estimate.  Note that the helper differs
  # from a standard reducer in having some additional inputs:
  # s (the PageRank parameter), ip (the value of the inner product
  # between the dangling pages vector and the estimated PageRank),
  # and n, the number of pages.  Other than that the code is
  # self-explanatory.
  
  return (intermediate_key,
          s*sum(intermediate_value_list)+s*ip/n+(1.0-s)/n)

def pagerank(i,s=0.85,tolerance=0.00001):
  # Returns the PageRank vector for the web described by i,
  # using parameter s.  The criterion for convergence is that
  # we stop when M^(j+1)P-M^jP has length less than tolerance,
  # in l1 norm.
  
  n = len(i)
  iteration = 1
  change = 2 # initial estimate of error
  while change > tolerance:
    print "Iteration: "+str(iteration)
    # Run the MapReduce job used to compute the inner product
    # between the vector of dangling pages and the estimated
    # PageRank.
    ip_list = map_reduce.map_reduce(i,ip_mapper,ip_reducer)

    # the if-else clause is needed in case there are no dangling
    # pages, in which case MapReduce returns ip_list as the empty
    # list.  Otherwise, set ip equal to the first (and only)
    # member of the list returned by MapReduce.
    if ip_list == []: ip = 0
    else: ip = ip_list[0]

    # Dynamically define the reducer used to update the PageRank
    # vector, using the current values for s, ip, and n.
    pr_reducer = lambda x,y: pr_reducer_inter(x,y,s,ip,n)

    # Run the MapReduce job used to update the PageRank vector.  The
    # output is sorted by page number, so that new_i[j] is the
    # (key,value) pair for page j, rather than relying on the order
    # in which the reducer outputs happen to be returned.
    new_i = sorted(map_reduce.map_reduce(i,pr_mapper,pr_reducer))

    # Compute the new estimate of error.
    change = sum([abs(new_i[j][1]-i[j][0]) for j in xrange(n)])
    print "Change in l1 norm: "+str(change)

    # Update the estimated PageRank vector.
    for j in xrange(n): i[j][0] = new_i[j][1]
    iteration += 1
  return i

n = 1000 # works up to about 1000000 pages
i = initialize(n,2.0)
new_i = pagerank(i,0.85,0.0001)

Mostly, the code is self-explanatory. But there are three points that deserve some comment.

First, we represent the web using a Python dictionary i, with keys 0,...,n-1 representing the different pages. The corresponding values are a list, with the first element of the list i[j][0] being just the current probability estimate, which we called earlier p_j, the second element of the list i[j][1] being the number of links outbound from page j, and the third element of the list i[j][2] being another list, this time just a list of all the pages that page j links to.

This representation is, frankly, pretty ugly, and leaves you having to keep track of the meaning of the different indices. I considered instead defining a Python class, say page_description, and using an instance of that class as the value, with sensible attributes like page_description.number_outlinks. This would have made the program a bit longer, but also more readable, and would perhaps be a better choice on those grounds.

Part of the reason I don’t do this is that the way the data is stored in this example already has other problems, problems that wouldn’t be helped by using a Python class. Observe that the MapReduce job takes as input a dictionary with keys 0,...,n-1, and corresponding values describing those pages; the output has the same key set, but the values are just the new values for Mp_j, not the entire page description. That is, the input dictionary and the output dictionary have the same key set, but their values are of quite a different nature. This is a problem, because we want to apply our MapReduce job iteratively, and it’s the reason that at the end of the pagerank function we have to go through and laboriously update our current estimate for the PageRank vector. This is not a good thing – it’s ugly, and it means that part of the job is not automatically parallelizable.

One way of solving this problem would be to pass through the entire MapReduce job a lot of extra information about page description. Doing that has some overhead, though, both conceptually and computationally. What we’ll see in later posts is that by choosing the way we represent data a bit more carefully, we can have our cake and eat it too. I’ll leave that for later posts, because it’s a fairly minor point, and I don’t want to distract from the big picture, which is the focus of this post.

Second, you’ll notice that in the pagerank function, we dynamically define the pr_reducer function, using the pr_reducer_inter function. As you can see from the code, the only difference between the two is that pr_reducer effectively has some of pr_reducer_inter’s slots filled in, most notably, the value ip for the inner product, produced by the first MapReduce job. The reason we need to do this is that the map_reduce function we’ve defined expects the reducer function to take just two arguments, an intermediate key, and a list of intermediate values.

There are other ways we could achieve the same effect, of course. Most obviously, we could modify the map_reduce function so that extra parameters can be passed to the mapper and reducer. There shouldn’t be too many extra parameters, of course, because those parameters will need to be communicated to all computers in the cluster, but a small set would be perfectly acceptable. I went with the dynamic definition of pr_reducer simply because it seemed fun and elegant.
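
Incidentally, the standard library offers another way to get the same effect: functools.partial (available since Python 2.5) pre-fills the extra arguments of pr_reducer_inter, leaving a two-argument function of the kind map_reduce expects. A sketch, equivalent to the lambda used in pagerank:

import functools

# inside pagerank(), once s, ip and n are known:
pr_reducer = functools.partial(pr_reducer_inter,s=s,ip=ip,n=n)

Unlike the closure, partial freezes the values of s, ip and n at the moment it’s created, but since pr_reducer is rebuilt on every iteration of the loop anyway, the two approaches behave identically here.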

Exercises

  • The dynamic definition of pr_reducer is very convenient in our code. Can you think of any problems that might arise in using such dynamic definitions on a cluster? Can you think of any ways you might avoid those problems, retaining the ability to use dynamically defined mappers and reducers?

Third, and finally, the way we compute the error estimate is not obviously parallelizable. It’s easy to see how you could parallelize it using MapReduce, but, as above, the particular data representation we’re using makes this inconvenient. This will also be easily fixed when we move to our new data representation, in a later post.
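
For completeness, here’s what that could look like as a MapReduce job, under the hypothetical assumption that we had a dictionary, call it pairs, in which the value for page j is the pair (old_pj,new_pj); the pattern is the same single-key trick used for the inner product. With the representation actually used in this post we don’t have such pairs conveniently available, which is exactly the inconvenience just mentioned.

# Sketch only: computing the l1 change with MapReduce, assuming a
# hypothetical dictionary "pairs" with pairs[j] = (old_pj,new_pj).
# The representation used in this post doesn't store both values,
# which is why the main code computes the change serially instead.

def change_mapper(j,value):
  old_pj, new_pj = value
  # a single intermediate key, as in the inner product job
  return [(1,abs(new_pj-old_pj))]

def change_reducer(key,differences):
  return sum(differences)

# change = map_reduce.map_reduce(pairs,change_mapper,change_reducer)[0]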

A MapReduce programming heuristic

We’ve now seen two examples of using MapReduce to solve programming problems. The first, in an earlier post, showed how to use MapReduce to count word occurrences in a collection of files. The second is the example of this post, namely, to compute PageRank.

As a general rule, when you take a programming task, even one that’s very familiar, it may be challenging to figure out how to implement the algorithm using MapReduce. Not only do you need to find a way of fitting it into the MapReduce framework, you need to make sure the resulting algorithm is well adapted to take advantage of the framework. Think of how we dealt with dangling pages in the PageRank example – we could easily have modelled a dangling page as being connected to every other page, but the overhead in MapReduce would be enormous. We needed to take another approach to get the advantages of MapReduce.

With that said, it’s worth stepping back and distilling out a heuristic for attacking problems using MapReduce. The heuristic is already implicit in earlier discussion, but I’ve found it has helped my thinking to make the heuristic really explicit.

Think back to the wordcount example. There are some interesting patterns in that example, patterns that we’ll see are also repeated in other examples of MapReduce:

  1. There is a large set of questions we want to answer: for each word w in our set of documents, how many times does w appear? The intermediate keys are simply labels for those questions, i.e., there is one intermediate key for each question we want answered. Naturally enough, we use the word itself as the label.
  2. The map phase takes a piece of input data (a file), and identifies all the questions to which that input might be relevant, i.e., all the words whose count might be affected by that document. For each such question it outputs the corresponding intermediate key (the word), and whatever information seems relevant to that particular question (in this case, a count).
  3. The reduce phase receives as input, for a particular intermediate key (i.e., question), all the information relevant to that question, which it can process to produce the answer to the question.

The same pattern is followed in the computation of PageRank using MapReduce. We have a large set of questions we’d like answered: what are the values for Mp_j? We label those questions using j, and so the j are the intermediate keys. The map phase takes a piece of input data (a particular page and its description), and identifies all the pages it links to, and therefore might contribute probability to, outputting the corresponding intermediate key (the page linked to), and the relevant information (in this case, the amount of probability that needs to be sent to the linked page). The reducer for any given page k thus receives all information relevant to computing the updated probability distribution.

This same pattern is also followed in the little MapReduce job we described for computing the inner product. There, it’s just a single question that we’re interested in: what’s the value of the inner product between [tex]p[/tex] and the vector of dangling pages? There is thus just a single intermediate key, for which we use the placeholder 1 – we could use anything. The mappers output all the information that’s relevant to that question, meaning they output nothing if a page isn’t dangling, and they output p_j if it is dangling. The reducer combines all this information to get the answer.

I should stress that this is just a heuristic for writing MapReduce programs; there are potentially other patterns for using MapReduce in algorithms. Furthermore, if you’re having trouble fitting your programming problem into the MapReduce approach, you’d be advised to consider things like changing the set of questions you’re considering, or otherwise changing the way you represent the data in the problem. It may also be that there’s no good way of solving your problem using MapReduce; MapReduce is a hammer, but not every programming problem is a nail. With these caveats in mind, the heuristic I’ve described can be a useful way of thinking about how to approach putting familiar problems into a form where they can be tackled using MapReduce.

About this post: This is one in a series of posts about the Google Technology Stack – PageRank, MapReduce, and so on. The posts are summarized here, and there is a FriendFeed room for the series here. Subscribe to my blog to follow future posts in the series.

How Are the Mighty Fallen

Joshua Gans writes to point out a paper he wrote with George Shepherd, “How Are the Mighty Fallen: Rejected Classic Articles by Leading Economists” (link to pdf), about the experience some leading economists have had with peer review:

We asked over 140 leading economists, including all living winners of the Nobel Prize and John Bates Clark Medal, to describe instances in which journals rejected their papers. We hit a nerve. More than 60 percent responded, many with several blistering pages. Paul Krugman expressed the tone of many letters: “Thanks for the opportunity to let off a bit of steam.”

The paper is extremely readable (and entertaining), if you have any interest in peer review. Among other tidbits: an extraordinary list of rejected papers, many of them among the classics of economics; the estimate from Krugman that 60% of his papers are rejected on first try; the remarkable story of George Akerlof’s Nobel Prize-winning paper “The Market for Lemons”, rejected by three separate journals before being published; the two rejections of the famous Black-Scholes options pricing paper, also Nobel Prize-winning; Krugman’s comment that “I am having a terrible time with my current work on economic geography: referees tell me that it’s obvious, it’s wrong, and anyway they said it years ago.” There’s much more.

Addendum: Joshua also pointed me to a retrospective on the article (pdf here), which makes for interesting followup reading.

Biweekly links for 01/09/2009

Click here for all of my del.icio.us bookmarks.

Three myths about scientific peer review

What’s the future of scientific peer review? The way science is communicated is currently changing rapidly, leading to speculation that the peer review system itself might change. For example, the wildly successful physics preprint arXiv is only very lightly moderated, which has led many people to wonder if the peer review process might perhaps die out, or otherwise change beyond recognition.

I’m currently finishing up a post on the future of peer review, which I’ll post in the near future. Before I get to that, though, I want to debunk three widely-believed myths about peer review, myths which can derail sensible discussion of the future of peer review.

A brief terminological note before I get to the myths: the term “peer review” can mean many different things in science. In this post, I restrict my focus to the anonymous peer review system scientific journals use to decide whether to accept or reject scientific papers.

Myth number 1: Scientists have always used peer review

The myth that scientists adopted peer review broadly and early in the history of science is surprisingly widely believed, despite being false. It’s true that peer review has been used for a long time – a process recognizably similar to the modern system was in use as early as 1731, in the Royal Society of Edinburgh’s Medical Essays and Observations (ref). But in most scientific journals, peer review wasn’t routine until the middle of the twentieth century, a fact documented in historical papers by Burnham, Kronick, and Spier.

Let me give a few examples to illustrate the point.

As a first example, we’ll start with the career of Albert Einstein, who wasn’t just an outstanding scientist, but was also a prolific scientist, publishing more than 300 journal articles between 1901 and 1955. Many of Einstein’s most ground-breaking papers appeared in his “miracle year” of 1905, when he introduced new ways of understanding space, time, energy, momentum, light, and the structure of matter. Not bad for someone unable to secure an academic position, and working as a patent clerk in the Swiss patent office.

How many of Einstein’s 300 plus papers were peer reviewed? According to the physicist and historian of science Daniel Kennefick, it may well be that only a single paper of Einstein’s was ever subject to peer review. That was a paper about gravitational waves, jointly authored with Nathan Rosen, and submitted to the journal Physical Review in 1936. The Physical Review had at that time recently introduced a peer review system. It wasn’t always used, but when the editor wanted a second opinion on a submission, he would send it out for review. The Einstein-Rosen paper was sent out for review, and came back with a (correct, as it turned out) negative report. Einstein’s indignant reply to the editor is amusing to modern scientific sensibilities, and suggests someone quite unfamiliar with peer review:

Dear Sir,

We (Mr. Rosen and I) had sent you our manuscript for publication and had not authorized you to show it to specialists before it is printed. I see no reason to address the in any case erroneous comments of your anonymous expert. On the basis of this incident I prefer to publish the paper elsewhere.

Respectfully,

P.S. Mr. Rosen, who has left for the Soviet Union, has authorized me to represent him in this matter.

As a second example, consider the use of peer review at the journal Nature. The prestige associated with publishing in Nature is, of course, considerable, and so competition to get published there is tough. According to Nature’s website, only 8 percent of submissions are accepted, and the rest are rejected. Given this, you might suppose that Nature has routinely used peer review for a long time as a way of filtering submissions. In fact, a formal peer review system wasn’t introduced by Nature until 1967. Prior to that, some papers were refereed, but some weren’t, and many of Nature’s most famous papers were not refereed. Instead, it was up to editorial judgement to determine which papers should be published, and which papers should be rejected.

This was a common practice in the days before peer review became widespread: decisions about what to publish and what to reject were usually made by journal editors, often acting largely on their own. These decisions were often made rapidly, with papers appearing days or weeks after submission, after a cursory review by the editor. Rejection rates at most journals were low, with only obviously inappropriate or unsound material being rejected; indeed, for some Society journals, Society members even asserted a “right” to publication, which occasionally caused friction with unhappy editors (ref).

What caused the change to the modern system of near-ubiquitous peer review? There were three main factors. The first was the increasing specialization of science (ref). As science became more specialized in the early 20th century, editors gradually found it harder to make informed decisions about what was worth publishing, even by the relatively relaxed standards common in many journals at the time.

The second factor in the move to peer review was the enormous increase in the number of scientific papers being published (ref). In the 1800s and early 1900s, journals often had too few submissions. Journal editors would actively round up submissions to make sure their journals remained active. The role of many editorial boards was to make sure enough papers were being submitted; if the journal came up short, members of the editorial board would be asked to submit papers themselves. As late as 1938, the editor-in-chief of the prestigious journal Science relied on personal solicitations for most articles (ref).

The twentieth century saw a massive increase in the number of scientists, a much easier process for writing papers, due to technologies such as typewriters, photocopiers, and computers, and a gradually increasing emphasis on publication in decisions about jobs, tenure, grants and prizes. These factors greatly increased the number of papers being written, and added pressure for filtering mechanisms, such as peer review.

The third factor in the move to peer review (ref) was the introduction of technologies for copying papers. It’s just plain editorially difficult to implement peer review if you can’t easily make copies of papers. The first step along this road was the introduction of typewriters and carbon paper in the 1890s, followed by the commercial introduction of photocopiers in 1959. Both technologies made peer review much easier to implement.

Nowadays, of course, the single biggest factor preserving the peer review system is probably social inertia: in most fields of science, a journal that’s not peer-reviewed isn’t regarded as serious, and so new journals invariably promote the fact that they are peer reviewed. But it wasn’t always that way.

Myth number 2: peer review is reliable

Update: Bill Hooker has pointed out that I’m using a very strong sense of “reliable” in this section, holding peer review to the standard that it nearly always picks up errors, is a very accurate gauge of quality, and rarely suppresses innovation. If you adopt a more relaxed notion of reliability, as many but not all scientists and members of the general public do, then I’d certainly back off describing this as a myth. As an approximate filter that eliminates or improves many papers, peer review may indeed function well.

Every scientist has a story (or ten) about how they were poorly treated by peer review – the important paper that was unfairly rejected, or the silly editor who ignored their sage advice as a referee. Despite this, many strongly presume that the system works “pretty well”, overall.

There’s not much systematic evidence for that presumption. In 2002 Jefferson et al (ref) surveyed published studies of biomedical peer review. After an extensive search, they found just 19 studies which made some attempt to eliminate obvious confounding factors. Of those, just two addressed the impact of peer review on quality, and just one addressed the impact of peer review on validity; most of the rest of the studies were concerned with questions like the effect of double-blind reviewing. Furthermore, for the three studies that addressed quality and validity, Jefferson et al concluded that there were other problems with the studies which meant the results were of limited general interest; as they put it, “Editorial peer review, although widely used, is largely untested and its effects are uncertain”.

In short, at least in biomedicine, there’s not much we know for sure about the reliability of peer review. My searches of the literature suggest that we don’t know much more in other areas of science. If anything, biomedicine seems to be unusually well served, in large part because several biomedical journals (perhaps most notably the Journal of the American Medical Association) have over the last 20 years put a lot of effort into building a community of people studying the effects of peer review; Jefferson et al‘s study is one of the outcomes from that effort.

In the absence of compelling systematic studies, is there anything we can say about the reliability of peer review?

The question of reliability should, in my opinion, really be broken up into three questions. First, does peer review help verify the validity of scientific studies; second, does peer review help us filter scientific studies, making the higher quality ones easier to find, because they get into the “best” journals, i.e., the ones with the most stringent peer review; third, to what extent does peer review suppress innovation?

As regards validity and quality, you don’t have to look far to find striking examples suggesting that peer review is at best partially reliable as a check of validity and a filter of quality.

Consider the story of the German physicist Jan Hendrik Schoen. In 2000 and 2001 Schoen made an amazing series of breakthroughs in organic superconductivity, publishing his 2001 work at a rate of one paper every 8 days, many in prestigious journals such as Nature, Science, and the Physical Review. Eventually, it all seemed a bit too good to be true, and other researchers in his community began to ask questions. His work was investigated, and much of it found to be fraudulent. Nature retracted seven papers by Schoen; Science retracted eight papers; and the Physical Review retracted six. What’s truly breathtaking about this case is the scale of it: it’s not that a few referees failed to pick up on the fraud, but rather that the refereeing system at several of the top journals systematically failed to detect the fraud. Furthermore, what ultimately brought Schoen down was not the anonymous peer review system used by journals, but rather investigation by his broader community of peers.

You might object to using this as an example on the grounds that the Schoen case involved deliberate scientific fraud, and the refereeing system isn’t intended to catch fraud so much as it is to catch mistakes. I think that’s a pretty weak objection – it can be a thin line between honest mistakes and deliberate fraud – but it’s not entirely without merit. As a second example, consider an experiment conducted by the editors of the British Medical Journal (ref). They inserted eight deliberate errors into a paper already accepted for publication, and sent the paper to 420 potential reviewers. 221 responded, catching on average only two of the errors. None of the reviewers caught more than five of the errors, and 16 percent caught no errors at all.

None of these examples is conclusive. But they do suggest that the refereeing system is far from perfect as a means of checking validity or filtering the quality of scientific papers.

What about the suppression of innovation? Every scientist knows of major discoveries that ran into trouble with peer review. David Horrobin has a remarkable paper (ref) where he documents some of the discoveries almost suppressed by peer review; as he points out, he can’t list the discoveries that were in fact suppressed by peer review, because we don’t know what those were. His list makes horrifying reading. Here’s just a few instances that I find striking, drawn in part from his list. Note that I’m restricting myself to suppression of papers by peer review; I believe peer review of grants and job applications probably has a much greater effect in suppressing innovation.

  • George Zweig’s paper announcing the discovery of quarks, one of the fundamental building blocks of matter, was rejected by Physical Review Letters. It was eventually issued as a CERN report.
  • Berson and Yalow’s work on radioimmunoassay, which led to a Nobel Prize, was rejected by both Science and the Journal of Clinical Investigation. It was eventually published in the Journal of Clinical Investigation.
  • Krebs’ work on the citric acid cycle, which led to a Nobel Prize, was rejected by Nature. It was published in Experientia.
  • Wiesner’s paper introducing quantum cryptography was initially rejected, finally appearing well over a decade after it was written.

To sum up: there is very little reliable evidence about the effect of peer review available from systematic studies; peer review is at best an imperfect filter for validity and quality; and peer review sometimes has a chilling effect, suppressing important scientific discoveries.

At this point I expect most readers will have concluded that I don’t much like the current peer review system. Actually, that’s not true, a point that will become evident in my post about the future of peer review. There’s a great deal that’s good about the current peer review system, and that’s worth preserving. However, I do believe that many people, both scientists and non-scientists, have a falsely exalted view of how well the current peer review system functions. What I’m trying to do in this post is to establish a more realistic view, and that means understanding some of the faults of the current system.

Myth number 3: Peer review is the way we determine what’s right and wrong in science

By now, it should be clear that the peer review system must play only a partial role in determining what scientists think of as established science. There’s no sign, for example, that the lack of peer review in the 19th and early 20th century meant that scientists then were more confused than now about what results should be regarded as well established, and what should not. Nor does it appear that the unreliability of the peer review process leaves us in any great danger of collectively coming to believe, over the long run, things that are false.

In practice, of course, nearly all scientists understand that peer review is only part of a much more complex process by which we evaluate and refine scientific knowledge, gradually coming to (provisionally) accept some findings as well established, and discarding the rest. So, in that sense, this third myth isn’t one that’s widely believed within the scientific community. But in many scientists’ shorthand accounts of how science progresses, peer review is given a falsely exaggerated role, and this is reflected in the understanding many people in the general public have of how science works. Many times I’ve had non-scientists mention to me that a paper has been “peer-reviewed!”, as though that somehow establishes that it is correct, or high quality. I’ve encountered this, for example, in some very good journalists, and it’s a concern, for peer review is only a small part of a much more complex and much more reliable system by which we determine what scientific discoveries are worth taking further, and what should be discarded.

Further reading

I’m writing a book about “The Future of Science”; this post is part of a series where I try out ideas from the book in an open forum. A summary of many of the themes in the book is available in this essay. If you’d like to be notified when the book is available, please send a blank email to the.future.of.science@gmail.com with the subject “subscribe book”. I’ll email you to let you know in advance of publication. I will not use your email address for any other purpose! You can subscribe to my blog here.

Biweekly links for 01/05/2009

Click here for all of my del.icio.us bookmarks.

Write your first MapReduce program in 20 minutes

The slow revolution

Some revolutions are marked by a single, spectacular event: the storming of the Bastille during the French Revolution, or the destruction of the towers of the World Trade Center on September 11, 2001, which so changed the US’s relationship with the rest of the world. But often the most important revolutions aren’t announced with the blare of trumpets. They occur softly, too slow for a single news cycle, but fast enough that if you aren’t alert, the revolution is over before you’re aware it’s happening.

Such a revolution is happening right now in computing. Microprocessor clock speeds have stagnated since about 2000. Major chipmakers such as Intel and AMD continue to wring out improvements in speed by improving on-chip caching and other clever techniques, but they are gradually hitting the point of diminishing returns. Instead, as transistors continue to shrink in size, the chipmakers are packing multiple processing units onto a single chip. Most computers shipped today use multi-core microprocessors, i.e., chips with 2 (or 4, or 8, or more) separate processing units on the main microprocessor.

The result is a revolution in software development. We’re gradually moving from the old world in which multiple processor computing was a special case, used only for boutique applications, to a world in which it is widespread. As this movement happens, software development, so long tailored to single-processor models, is seeing a major shift in some of its basic paradigms, to make the use of multiple processors natural and simple for programmers.

This movement to multiple processors began decades ago. Projects such as the Connection Machine demonstrated the potential of massively parallel computing in the 1980s. In the 1990s, scientists became large-scale users of parallel computing, using parallel computing to simulate things like nuclear explosions and the dynamics of the Universe. Those scientific applications were a bit like the early scientific computing of the late 1940s and 1950s: specialized, boutique applications, built with heroic effort using relatively primitive tools. As computing with multiple processors becomes widespread, though, we’re seeing a flowering of general-purpose software development tools tailored to multiple processor environments.

One of the organizations driving this shift is Google. Google is one of the largest users of multiple processor computing in the world, with its entire computing cluster containing hundreds of thousands of commodity machines, located in data centers around the world, and linked using commodity networking components. This approach to multiple processor computing is known as distributed computing; the characteristic feature of distributed computing is that the processors in the cluster don’t necessarily share any memory or disk space, and so information sharing must be mediated by the relatively slow network.

In this post, I’ll describe a framework for distributed computing called MapReduce. MapReduce was introduced in a paper written in 2004 by Jeffrey Dean and Sanjay Ghemawat from Google. What’s beautiful about MapReduce is that it makes parallelization almost entirely invisible to the programmer who is using MapReduce to develop applications. If, for example, you allocate a large number of machines in the cluster to a given MapReduce job, the job runs in a highly parallelized way. If, on the other hand, you allocate only a small number of machines, it will run in a much more serial way, and it’s even possible to run jobs on just a single machine.

What exactly is MapReduce? From the programmer’s point of view, it’s just a library that’s imported at the start of your program, like any other library. It provides a single library call that you can make, passing in a description of some input data and two ordinary serial functions (the “mapper” and “reducer”) that you, the programmer, specify in your favorite programming language. MapReduce then takes over, ensuring that the input data is distributed through the cluster, and computing those two functions across the entire cluster of machines, in a way we’ll make precise shortly. All the details – parallelization, distribution of data, tolerance of machine failures – are hidden away from the programmer, inside the library.

What we’re going to do in this post is learn how to use the MapReduce library. To do this, we don’t need a big sophisticated version of the MapReduce library. Instead, we can get away with a toy implementation (just a few lines of Python!) that runs on a single machine. By using this single-machine toy library we can learn how to develop for MapReduce. The programs we develop will run essentially unchanged when, in later posts, we improve the MapReduce library so that it can run on a cluster of machines.

Our first MapReduce program

Okay, so how do we use MapReduce? I’ll describe it with a simple example, which is a program to count the number of occurrences of different words in a set of files. The example is simple, but you’ll find it rather strange if you’re not already familiar with MapReduce: the program we’ll describe is certainly not the way most programmers would solve the word-counting problem! What it is, however, is an excellent illustration of the basic ideas of MapReduce. Furthermore, what we’ll eventually see is that by using this approach we can easily scale our wordcount program up to run on millions or even billions of documents, spread out over a large cluster of computers, and that’s not something a conventional approach could do easily.

The input to a MapReduce job is just a set of (input_key,input_value) pairs, which we’ll implement as a Python dictionary. In the wordcount example, the input keys will be the filenames of the files we’re interested in counting words in, and the corresponding input values will be the contents of those files:

filenames = ["text\\a.txt","text\\b.txt","text\\c.txt"]
i = {}
for filename in filenames:
  f = open(filename)
  i[filename] = f.read()
  f.close()

After this code is run the Python dictionary i will contain the input to our MapReduce job, namely, i has three keys containing the filenames, and three corresponding values containing the contents of those files. Note that I’ve used Windows’ filenaming conventions above; if you’re running a Mac or Linux you may need to tinker with the filenames. Also, to run the code you will of course need the text files text\a.txt, text\b.txt, and text\c.txt. You can create some simple example texts by cutting and pasting from the following:

text\a.txt:

The quick brown fox jumped over the lazy grey dogs.

text\b.txt:

That's one small step for a man, one giant leap for mankind.

text\c.txt:

Mary had a little lamb,
Its fleece was white as snow;
And everywhere that Mary went,
The lamb was sure to go.

The MapReduce job will process this input dictionary in two phases: the map phase, which produces output which (after a little intermediate processing) is then processed by the reduce phase. In the map phase what happens is that for each (input_key,input_value) pair in the input dictionary i, a function mapper(input_key,input_value) is computed, whose output is a list of intermediate keys and values. This function mapper is supplied by the programmer – we’ll show how it works for wordcount below. The output of the map phase is just the list formed by concatenating the list of intermediate keys and values for all of the different input keys and values.

I said above that the function mapper is supplied by the programmer. In the wordcount example, what mapper does is takes the input key and input value – a filename, and a string containing the contents of the file – and then moves through the words in the file. For each word it encounters, it returns the intermediate key and value (word,1), indicating that it found one occurrence of word. So, for example, for the input key text\a.txt, a call to mapper("text\\a.txt",i["text\\a.txt"]) returns:

[('the', 1), ('quick', 1), ('brown', 1), ('fox', 1), ('jumped', 1), 
 ('over', 1), ('the', 1), ('lazy', 1), ('grey', 1), ('dogs', 1)]

Notice that everything has been lowercased, so we don’t count words with different cases as distinct. Furthermore, the same key gets repeated multiple times, because words like the appear more than once in the text. This, incidentally, is the reason we use a Python list for the output, and not a Python dictionary, for in a dictionary the same key can only be used once.

Here’s the Python code for the mapper function, together with a helper function used to remove punctuation:

def mapper(input_key,input_value):
  return [(word,1) for word in 
          remove_punctuation(input_value.lower()).split()]

def remove_punctuation(s):
  return s.translate(string.maketrans("",""), string.punctuation)

mapper works by lowercasing the input file, removing the punctuation, splitting the resulting string around whitespace, and finally emitting the pair (word,1) for each resulting word. Note, incidentally, that I’m ignoring apostrophes, to keep the code simple, but you can easily extend the code to deal with apostrophes and other special cases.
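
For instance, one way to handle apostrophes – a sketch of my own, not part of the original code, and the name mapper_keeping_apostrophes is just mine – is to tokenize with a regular expression instead of translate-and-split, so that contractions like "that's" are kept intact:

import re

def mapper_keeping_apostrophes(input_key,input_value):
  # Match runs of letters, optionally joined by internal apostrophes, so
  # "that's" stays as one token instead of becoming "thats".  Note that this
  # variant keeps only alphabetic words, dropping digits.
  words = re.findall(r"[a-z]+(?:'[a-z]+)*", input_value.lower())
  return [(word,1) for word in words]

You could pass this variant to the MapReduce library in place of mapper, with no other changes.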

With this specification of mapper, the output of the map phase for wordcount is simply the result of combining the lists for mapper("text\\a.txt"), mapper("text\\b.txt"), and mapper("text\\c.txt"):

[('the', 1), ('quick', 1), ('brown', 1), ('fox', 1), 
 ('jumped', 1), ('over', 1), ('the', 1), ('lazy', 1), ('grey', 1), 
 ('dogs', 1), ('mary', 1), ('had', 1), ('a', 1), ('little', 1), 
 ('lamb', 1), ('its', 1), ('fleece', 1), ('was', 1), ('white', 1), 
 ('as', 1), ('snow', 1), ('and', 1), ('everywhere', 1), 
 ('that', 1), ('mary', 1), ('went', 1), ('the', 1), ('lamb', 1), 
 ('was', 1), ('sure', 1), ('to', 1), ('go', 1), ('thats', 1), 
 ('one', 1), ('small', 1), ('step', 1), ('for', 1), ('a', 1), 
 ('man', 1), ('one', 1), ('giant', 1), ('leap', 1), ('for', 1), 
 ('mankind', 1)]

The map phase of MapReduce is logically trivial, but when the input dictionary has, say, 10 billion keys, and those keys point to files held on thousands of different machines, implementing the map phase is actually quite non-trivial. What the MapReduce library handles is details like knowing which files are stored on what machines, making sure that machine failures don’t affect the computation, making efficient use of the network, and storing the output in a usable form. We won’t worry about these issues for now, but we will come back to them in future posts.

What the MapReduce library now does in preparation for the reduce phase is to group together all the intermediate values which have the same key. In our example the result of doing this is the following intermediate dictionary:

{'and': [1], 'fox': [1], 'over': [1], 'one': [1, 1], 'as': [1], 
 'go': [1], 'its': [1], 'lamb': [1, 1], 'giant': [1], 
 'for': [1, 1], 'jumped': [1], 'had': [1], 'snow': [1], 
 'to': [1], 'leap': [1], 'white': [1], 'was': [1, 1], 
 'mary': [1, 1], 'brown': [1], 'lazy': [1], 'sure': [1], 
 'that': [1], 'little': [1], 'small': [1], 'step': [1], 
 'everywhere': [1], 'mankind': [1], 'went': [1], 'man': [1], 
 'a': [1, 1], 'fleece': [1], 'grey': [1], 'dogs': [1], 
 'quick': [1], 'the': [1, 1, 1], 'thats': [1]}

We see, for example, that the word ‘and’, which appears only once in the three files, has as its associated value a list containing just a single 1, [1]. By contrast, the word ‘one’, which appears twice, has [1,1] as its value.
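
To make the grouping step concrete, here’s one simple way it could be implemented – a sketch of my own; the map_reduce module shown later in this post uses itertools.groupby instead:

from collections import defaultdict

def group_values(intermediate):
  # intermediate is the list of (key, value) pairs produced by the map phase.
  # Build a dictionary mapping each intermediate key to the list of its values.
  groups = defaultdict(list)
  for (key, value) in intermediate:
    groups[key].append(value)
  return dict(groups)

Applied to the map output shown above, group_values produces exactly the intermediate dictionary we just saw.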

The reduce phase now commences. A programmer-defined function reducer(intermediate_key,intermediate_value_list) is applied to each entry in the intermediate dictionary. For wordcount, reducer simply sums up the list of intermediate values, and returns both the intermediate_key and the sum as the output. This is done by the following code:

def reducer(intermediate_key,intermediate_value_list):
  return (intermediate_key,sum(intermediate_value_list))

The output from the reduce phase, and from the total MapReduce computation, is thus:

[('and', 1), ('fox', 1), ('over', 1), ('one', 2), ('as', 1), 
 ('go', 1), ('its', 1), ('lamb', 2), ('giant', 1), ('for', 2), 
 ('jumped', 1), ('had', 1), ('snow', 1), ('to', 1), ('leap', 1), 
 ('white', 1), ('was', 2), ('mary', 2), ('brown', 1), 
 ('lazy', 1), ('sure', 1), ('that', 1), ('little', 1), 
 ('small', 1), ('step', 1), ('everywhere', 1), ('mankind', 1), 
 ('went', 1), ('man', 1), ('a', 2), ('fleece', 1), ('grey', 1), 
 ('dogs', 1), ('quick', 1), ('the', 3), ('thats', 1)]

You can easily check that this is just a list of the words in the three files we started with, and the associated wordcounts, as desired.

We’ve looked at code defining the input dictionary i, and the mapper and reducer functions. Collecting it all up, and adding a call to the MapReduce library, here’s the complete word_count.py program:

#word_count.py

import string
import map_reduce

def mapper(input_key,input_value):
  return [(word,1) for word in 
          remove_punctuation(input_value.lower()).split()]

def remove_punctuation(s):
  return s.translate(string.maketrans("",""), string.punctuation)

def reducer(intermediate_key,intermediate_value_list):
  return (intermediate_key,sum(intermediate_value_list))

filenames = ["text\\a.txt","text\\b.txt","text\\c.txt"]
i = {}
for filename in filenames:
  f = open(filename)
  i[filename] = f.read()
  f.close()

print map_reduce.map_reduce(i,mapper,reducer)
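
If you’d rather see the most common words first, one small variation – my addition, not part of the original program – is to sort the results before printing them:

# Replace the final print statement in word_count.py with the following,
# which sorts the output so the most frequent words come first.
results = map_reduce.map_reduce(i,mapper,reducer)
for (word, count) in sorted(results, key=lambda pair: pair[1], reverse=True):
  print word, count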

The map_reduce module imported by this program implements MapReduce in pretty much the simplest possible way, using some useful functions from the itertools library:

# map_reduce.py
"""Defines a single function, map_reduce, which takes an input
dictionary i and applies the user-defined function mapper to each
(input_key,input_value) pair, producing a list of intermediate
keys and intermediate values.  Repeated intermediate keys then
have their values grouped into a list, and the user-defined
function reducer is applied to the intermediate key and list of
intermediate values.  The results are returned as a list."""

import itertools

def map_reduce(i,mapper,reducer):
  intermediate = []
  for (key,value) in i.items():
    intermediate.extend(mapper(key,value))
  groups = {}
  for key, group in itertools.groupby(sorted(intermediate), 
                                      lambda x: x[0]):
    groups[key] = [y for (x, y) in group]
  return [reducer(intermediate_key,groups[intermediate_key])
          for intermediate_key in groups] 

(Credit to a nice blog post from Dave Spencer for the use of itertools.groupby to simplify the reduce phase.)

Obviously, on a single machine an implementation of the MapReduce library is pretty trivial! In later posts we’ll extend this library so that it can distribute the execution of the mapper and reducer functions across multiple machines on a network. The payoff is that, with enough improvement to the library, we can use our word_count.py program essentially unchanged to count the words not just in 3 files, but in billions of files spread over thousands of computers in a cluster. What the MapReduce library does, then, is provide an approach to developing in a distributed environment where many simple tasks (like wordcount) remain simple for the programmer. Important (but boring) tasks like parallelization, getting the right data into the right places, dealing with the failure of computers and networking components, and even coping with racks of computers being taken offline for maintenance, are all taken care of under the hood of the library.

In the posts that follow, we’re thus going to do two things. First, we’re going to learn how to develop MapReduce applications. That means taking familiar tasks – things like computing PageRank – and figuring out how they can be done within the MapReduce framework. We’ll do that in the next post in this series. In later posts, we’ll also take a look at Hadoop, an open source platform that can be used to develop MapReduce applications.

Second, we’ll go under the hood of MapReduce, and look at how it works. We’ll scale up our toy implementation so that it can be used over small clusters of computers. This is not only fun in its own right, it will also make us better MapReduce programmers, in the same way as understanding the innards of an operating system (for example) can make you a better application programmer.

To finish off this post, though, we’ll do just two things. First, we’ll sum up what MapReduce does, stripping out the wordcount-specific material. It’s not any kind of formal specification, just a brief informal summary, together with a few remarks. We’ll refine this summary a little in some future posts, but this is the basic MapReduce model. Second, we’ll give an overview of how MapReduce takes advantage of a distributed environment to parallelize jobs.

MapReduce in general

Summing up our earlier description of MapReduce, and with the details about wordcount removed, the input to a MapReduce job is a set of (input_key,input_value) pairs. Each pair is used as input to a function mapper(input_key,input_value) which produces as output a list of intermediate keys and intermediate values:

[(intermediate_key,intermediate_value),
 (intermediate_key',intermediate_value'),
 ...]

The output from all the different input pairs is then sorted, so that values associated with the same intermediate_key are grouped together in a list of intermediate values. The reducer(intermediate_key,intermediate_value_list) function is then applied to each intermediate key and list of intermediate values, to produce the output from the MapReduce job.

A natural question is whether the order of values in intermediate_value_list matters. I must admit I’m not sure of the answer to this question – if it’s discussed in the original paper, then I missed it. In most of the examples I’m familiar with, the order doesn’t matter, because the reducer works by applying a commutative, associative operation across all intermediate values in the list. As we’ll see in a minute, because the mapper computations are potentially done in parallel, on machines which may be of varying speed, it’d be hard to guarantee the ordering, and this suggests that the ordering doesn’t matter. It’d be nice to know for sure – if anyone reading this does know the answer, I’d appreciate hearing it, and will update the post!

One of the most striking things about MapReduce is how restrictive it is. A priori it’s by no means clear that MapReduce should be all that useful in practical applications. It turns out, though, that many interesting computations can be expressed either directly in MapReduce, or as a sequence of a few MapReduce computations. We’ve seen wordcount implemented in this post, and we’ll see how to compute PageRank using MapReduce in the next post. Many other important computations can also be implemented using MapReduce, including finding shortest paths in a graph, grepping a large document collection, and many data mining algorithms. For many such problems, though, the standard approach doesn’t obviously translate into MapReduce. Instead, you need to think through the problem again from scratch, and find a way of doing it using MapReduce.
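
As one more small illustration – my own example, not something used elsewhere in this series – here’s how an inverted index, mapping each word to the list of files that contain it, can be expressed with the same map_reduce library, reusing the remove_punctuation helper from word_count.py; the names index_mapper and index_reducer are just mine:

def index_mapper(input_key,input_value):
  # Emit (word, filename) for every occurrence of every word in the file.
  return [(word,input_key) for word in
          remove_punctuation(input_value.lower()).split()]

def index_reducer(intermediate_key,intermediate_value_list):
  # Deduplicate and sort the filenames associated with each word.
  return (intermediate_key,sorted(set(intermediate_value_list)))

# print map_reduce.map_reduce(i,index_mapper,index_reducer)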

Exercises

  • How would you implement grep in MapReduce?

Problems

  • Take a well-known algorithms book, e.g., Cormen-Leiserson-Rivest-Stein, or a list of well-known algorithms. Which algorithms lend themselves to being expressed in MapReduce? Which do not? This isn’t so much a problem as it is a suggestion for a research program.
  • The early years of serial computing saw the introduction of powerful general-purpose ideas like those that went into the Lisp and Smalltalk programming languages. Arguably, those ideas are still the most powerful in today’s modern programming languages. MapReduce isn’t a programming language as we conventionally think of one, of course, but it’s similar in that it introduces a way of thinking about and approaching programming problems. I don’t think MapReduce has quite the same power as the ideas that went into Lisp and Smalltalk; it’s more like the Fortran of distributed computing. Can we find ideas as powerful as those behind Lisp and Smalltalk for distributed computing? What’s the hundred-year framework going to look like for distributed computing?

How MapReduce takes advantage of the distributed setting

You can probably already see how MapReduce takes advantage of a large cluster of computers, but let’s spell out some of the details. There are two key points. First, the mapper functions can be run in parallel, on different processors, because they don’t share any data. Provided the right data is in the local memory of the right processor – a task MapReduce manages for you – the different computations can be done in parallel. The more machines are in the cluster, the more mapper computations can be simultaneously executed. Second, the reducer functions can also be run in parallel, for the same reason, and, again, the more machines are available, the more computations can be done in parallel.
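
To make the first of these points a little more concrete, here’s a sketch – my own, and only a single-machine stand-in for what the real MapReduce library does across a cluster – of how the map phase of our toy map_reduce.py could be run in parallel across the cores of one machine, using Python’s multiprocessing module:

# map_reduce_parallel.py (a hypothetical variant of map_reduce.py): run the
# mapper calls in parallel using a pool of worker processes.  The mapper must
# be a top-level function so multiprocessing can pickle it, and on Windows the
# calling code needs the usual if __name__ == "__main__" guard.
import itertools
from multiprocessing import Pool

def _run_mapper(args):
  mapper, key, value = args
  return mapper(key, value)

def map_reduce(i, mapper, reducer, num_workers=4):
  pool = Pool(num_workers)
  # Apply the mapper to each (key, value) pair in parallel, then flatten the
  # resulting lists of intermediate (key, value) pairs into a single list.
  mapped = pool.map(_run_mapper,
                    [(mapper, key, value) for (key, value) in i.items()])
  pool.close()
  intermediate = list(itertools.chain.from_iterable(mapped))
  # The grouping and reduce steps are the same as in the serial version.
  groups = {}
  for key, group in itertools.groupby(sorted(intermediate), lambda x: x[0]):
    groups[key] = [y for (x, y) in group]
  return [reducer(key, groups[key]) for key in groups]

The reducer calls could be parallelized in exactly the same way, since they don’t share any data either.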

The difficulty in this story arises in the grouping step that takes place between the map phase and the reduce phase. For the reducer functions to work in parallel, we need to ensure that all the intermediate values corresponding to the same key get sent to the same machine. Obviously, this requires communication between machines, over the relatively slow network connections.

This looks tough to arrange, and you might think (as I did at first) that a lot of communication would be required to get the right data to the right machine. Fortunately, a simple and beautiful idea is used to make sure that the right data ends up in the right location, without there being too much communication overhead.

Imagine you’ve got 1000 machines that you’re going to use to run reducers on. As the mappers compute the intermediate keys and value lists, they compute hash(intermediate_key) mod 1000 for some hash function. This number is used to identify the machine in the cluster that the corresponding reducer will be run on, and the resulting intermediate key and value list is then sent to that machine. Because every machine running mappers uses the same hash function, this ensures that value lists corresponding to the same intermediate key all end up at the same machine. Furthermore, by using a hash we ensure that the intermediate keys end up pretty evenly spread over machines in the cluster. Now, I should warn you that in practice this hashing method isn’t literally quite what’s done (we’ll describe the actual approach in later posts), but that’s the main idea.
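
Expressed as code, the idea is just this – a minimal sketch of my own; a real system would use a hash function guaranteed to give the same answer on every machine, which Python’s built-in hash isn’t across versions and runs:

def partition(intermediate_key, num_reducers=1000):
  # Choose which of the num_reducers machines handles this key.  Every mapper
  # applies the same function, so all values for a given key reach the same
  # reducer, and hashing spreads the keys roughly evenly across machines.
  return hash(intermediate_key) % num_reducers

So, for example, every ('the', 1) pair produced by any mapper gets sent to reducer number partition('the').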

Needless to say, there’s a lot more to the story of how MapReduce works in practice, especially the way it handles data distribution and fault-tolerance. In future posts we’ll explore many more of the details. Nonetheless, hopefully this post gives you a basic understanding of how MapReduce works. In the next post we’ll apply MapReduce to the computation of PageRank.

About this post: This is one in a series of posts about the Google Technology Stack – PageRank, MapReduce, and so on. The posts are summarized here, and there is a FriendFeed room for the series here. You can subscribe to my blog to follow future posts in the series.

Published
Categorized as GTS

Biweekly links for 01/02/2009

  • Why is the free market letting us down?
    • Interesting summary of some of the problems and promise of markets.
  • Eric Drexler: Greenhouse Gases and Advanced Nanotechnology
  • Building Nutch: Open Source Search
    • A paper describing Nutch, an open source search engine, from the creator, Doug Cutting.
  • Now that’s what I call LEGO
    • A guy has built a LEGO aircraft carrier that looks to be about 25 feet long.
  • Multi-core in the Source Engine
    • A fun article describing some of the challenges involved in moving the Source Game Engine (used in Half-Life 2 and other games) to a world in which multi-core processors are the norm.
  • Steve Koch Research blog
    • Steve Koch is building up an experimental biophysics lab at the University of New Mexico. He’s blogging here about some of the work going on in the lab, in an open-sciencey way. The first post links to his lab’s “research principles”, which includes a discussion of some of the concrete problems surrounding open science.
  • Backreaction: We are Einstein
    • Excellent thoughtful post. Short quote (there’s much more): “The larger the body of knowledge grows that we are working with, the more important it thus becomes for scientists – as for everybody else – to not only have information available, but also have the tools to search, filter, and structure this information, and to direct it to where it is useful. Given this possibility, we could save a lot of time and effort by more efficiently sorting through available sources of information, by faster finding people with the right knowledge to complement our work, by outsourcing specialized tasks to those who have the best skills. And while the first of these points is readily under way with ever more powerful search tools, the latter two are only in the beginning and will need to bring changes in the way science is presently done.”
  • Daniel Lemire: Grabbing attention or building a reputation?
    • “However, I do not blog or write research papers merely to grab attention. Instead, I seek to increase my reputation. While attention fluctuates depending on your current actions, reputation builds up over time based on your reliability, your honesty, and your transparency. To build a good reputation, you do not need to do anything extraordinary: you just need to be consistent over a long time.”
  • Cloth Physics
    • Wonderful little demo that lets you manipulate a piece of cloth(!) by dragging bits of it around.
  • The Quantum Pontiff : A Curmudgeon’s and Improv’s Guide to Outliers: Introduction
    • Dave Bacon punches some holes in the opening story from Malcolm Gladwell’s new book.

Click here for all of my del.icio.us bookmarks.

Published