
MapReduce multiple MongoDB collections into one

This post was imported from my old WordPress installation. If you notice display problems, broken links or missing images, please just leave a comment here. Thanks.


There are few better ways to take a SQL server down than SELECT ... JOIN statements. Don't get me wrong - a good JOIN can be really fast, but huge JOIN blocks adding 10 or more foreign tables, maybe even without using indexes, can easily slow down the biggest SQL server. MongoDB has MapReduce to do the job with much less server impact - but more developer brain usage.

A recent project of mine has 4 collections containing different information about users, and I'd like to merge all of them into one object per user, aggregating most data into a more usable form. I'll simplify the data for the samples.

The MongoDB team removed the good documentation from the web, but the WaybackMachine saved it. The bad documentation is still visible at docs.mongodb.org.

MongoDB can "reduce" (merge) the result of a MapReduce run with an existing collection by using the output mode "reduce".
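For reference, here is a minimal mongo shell sketch of that output mode; the collection and function bodies are placeholders (not part of the project described below), but the Perl calls later in this post follow exactly this shape:

// Minimal sketch of the "reduce" output mode (placeholder names).
// Existing documents in "merged_target" are combined with the new
// MapReduce results by running the reduce function once more.
db.some_source.mapReduce(
  function () { emit(this._id, { some_field: this.some_field }); }, // map
  function (key, values) { return values[0]; },                     // reduce (trivial placeholder)
  { out: { reduce: "merged_target" } }
);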

Collection 1

  • _id is user-id
  • last_seen is a timestamp for the last login of that user

My first MapReduce call is:

$database->run_command([
  'mapreduce' => 'users', # Collection to be read
  # Limit results to users who have been seen at least once
  'query' => {'last_seen' => {'$exists' => 'true'}},
  'map' => <<'_EOJS_',
function () {
  if (this.hasOwnProperty("last_seen")) {
    // Fill the (temporary internal) map with the user-id as key and
    // a new object as the value.
    emit(
      this._id,
      { last_seen: this.last_seen, _fresh: true }
    );
  }
}
_EOJS_
  'reduce' => <<'_EOJS_',
function (k, value_list) { // Merge existing and new keys, override only if _fresh is set
  var result = {}; // Prepare empty result object

  value_list.forEach(function (value) { // Loop through all items
    var field;
    for (field in value) { // check all keys (properties) of the object
      // Skip the "_fresh" key (it's only used as internal marker for the new value)
      // && do not override any existing fields unless they're new
      if (field != "_fresh" && (! result.hasOwnProperty(field) || value._fresh)) {
        result[field] = value[field];
      }
    }
  });

  return result;
}
_EOJS_
  # Remove the "_fresh" key from all objects before storing them into the database
  'finalize' => 'function (k, v) { delete v._fresh; return v; }',
  # Combine the new objects with the existing results
  'out' => {reduce => 'users_merged_info'},
]);

MapReduce isn't as obvious as most SQL queries, so I'll show the process step by step. Here is a sample "users" collection (showing only the relevant keys):

_id last_seen
111 2012/12/01
222 2012/12/15
333 2012/12/31

The "map" run creates a temporary collection with unique keys and objects as values:

key value
111 { last_seen: '2012/12/01', _fresh: true }
222 { last_seen: '2012/12/15', _fresh: true }
333 { last_seen: '2012/12/31', _fresh: true }

Each key has only one value - no reduce run is necessary. The "finalize" phase will remove the "_fresh" markers and the remainder will become the new result collection (called "users_merged_info") having the map-key as the new _id:

{ _id: 111, value: { last_seen: '2012/12/01' } }
{ _id: 222, value: { last_seen: '2012/12/15' } }
{ _id: 333, value: { last_seen: '2012/12/31' } }
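If you want to double-check that first run, the merged collection can be inspected directly in the mongo shell (the collection name is the one from the 'out' option above):

// Show all merged documents created by the first MapReduce run
db.users_merged_info.find()

// Or look up a single user; the map key became the _id
db.users_merged_info.findOne({ _id: 111 })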

That's all for the first run. Now let's run the same MapReduce call some days later after some users have been seen again:

_id last_seen
111 2013/01/01
222 2013/01/02
333 2012/12/31

The "map" run creates a temporary collection with unique keys and objects as values (the same as above):

key value
111 { last_seen: '2013/01/01', _fresh: true }
222 { last_seen: '2013/01/02', _fresh: true }
333 { last_seen: '2012/12/31', _fresh: true }

Reduce won't run on the temporary collection again, because every key still has only one value.

The result collection has some values itself (remember the first run), and the reduce function will be called with every key and two values: the fresh one and the old one.

reduce( 111, [ { last_seen: '2013/01/01', _fresh: true }, { last_seen: '2012/12/01' } ] )
reduce( 222, [ { last_seen: '2013/01/02', _fresh: true }, { last_seen: '2012/12/15' } ] )
reduce( 333, [ { last_seen: '2012/12/31', _fresh: true }, { last_seen: '2012/12/31' } ] )

The reduce function will merge everything down to one value object without a _fresh marker (because _fresh isn't copied by reduce!) - nothing to do for finalize. The new result collection contains the new values.
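Because the reduce function is plain JavaScript, this merge behaviour can be sanity-checked outside of MongoDB as well. Here is a small sketch; the function body is copied from the call above and the sample values are the ones from the reduce calls just shown:

// Stand-alone copy of the reduce function for testing
var reduceFn = function (k, value_list) {
  var result = {};
  value_list.forEach(function (value) {
    var field;
    for (field in value) {
      if (field != "_fresh" && (! result.hasOwnProperty(field) || value._fresh)) {
        result[field] = value[field];
      }
    }
  });
  return result;
};

// Second-run merge for user 111:
reduceFn(111, [ { last_seen: '2013/01/01', _fresh: true }, { last_seen: '2012/12/01' } ]);
// => { last_seen: '2013/01/01' } - the fresh value wins, _fresh itself is not copied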

Collection 2

The "_fresh" key isn't needed at all if there is only one input collection - but there are more. The second one has all messages for one user and we'ld like to import the number of unread messages into the merged result collection:

_id user_id unread_messages
aaa 111 [ { from: 222, text: 'Hello World' }, { from: 333, text: "How are you?" } ]
bbb 222 [ ]

User 111 has two unread messages (stored as an array of message objects or message hashes) and user 222 might have had some unread messages in the past but doesn't have any currently waiting (empty array). Here is the MapReduce call:

$database->run_command([
  'mapreduce' => 'user_unread_messages', # Collection to be read
  'map' => <<'_EOJS_',
function () {
  // Fill the (temporary internal) map with the user-id as key and
  // a new object as the value.
  emit(
    this.user_id,
    { unread: this.unread_messages.length, _fresh: true }
  );
}
_EOJS_
  'reduce' => <<'_EOJS_',
function (k, value_list) { // Merge existing and new keys, override only if _fresh is set
  var result = {}; // Prepare empty result object

  value_list.forEach(function (value) { // Loop through all items
    var field;
    for (field in value) { // check all keys (properties) of the object
      // Skip the "_fresh" key (it's only used as internal marker for the new value)
      // && do not override any existing fields unless they're new
      if (field != "_fresh" && (! result.hasOwnProperty(field) || value._fresh)) {
        result[field] = value[field];
      }
    }
  });

  return result;
}
_EOJS_
  # Remove the "_fresh" key from all objects before storing them into the database
  'finalize' => 'function (k, v) { delete v._fresh; return v; }',
  # Combine the new objects with the existing results
  'out' => {reduce => 'users_merged_info'},
]);

Pretty much the same as the first one: the "query" is gone (because we assume that every document in this collection must be processed) and the map function simply counts the messages (by using the .length property of the unread_messages array). A slightly more defensive variant of that map function is sketched right after the next table.

key value
111 { unread: 2, _fresh: true }
222 { unread: 0, _fresh: true }
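As promised above, one caveat with this map function: if a document lacks the unread_messages field entirely, this.unread_messages.length would throw and abort the MapReduce run. A slightly more defensive variant (my addition, not part of the original call) could look like this:

function () {
  // Treat a missing unread_messages field as "no unread messages"
  // instead of letting .length fail on undefined.
  var messages = this.unread_messages || [];
  emit(
    this.user_id,
    { unread: messages.length, _fresh: true }
  );
}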

Reduce won't run on the temporary collection again, because every key still has only one value.

The result collection already has values for the same keys (see collection 1 above), and the reduce function will be called with every key and two values: the new one and the old one.

reduce( 111, [ { unread: 2, _fresh: true }, { last_seen: '2013/01/01' } ] )
reduce( 222, [ { unread: 0, _fresh: true }, { last_seen: '2013/01/02' } ] )

There is no "new" value for user 333 and thus reduce won't run for this key.

Here is the result collection after all reduce and finalize calls:

{ _id: 111, value: { last_seen: '2013/01/01', unread: 2 } }
{ _id: 222, value: { last_seen: '2013/01/02', unread: 0 } }
{ _id: 333, value: { last_seen: '2012/12/31' } }

Notice that user 333 has no "unread" property for the value - quite normal for document-based databases.
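If the application later needs to tell those users apart from users with a counted value, an ordinary query on the merged collection does the job; here is a short mongo shell sketch using the collection and field names from above (remember that MapReduce nests everything under "value"):

// Users we have never counted messages for (no "unread" property at all)
db.users_merged_info.find({ 'value.unread': { $exists: false } })

// Users with at least one unread message
db.users_merged_info.find({ 'value.unread': { $gt: 0 } })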

Let's assume that user 111 read all messages and replied to user 333. The same MapReduce call runs again and map creates this temporary collection:

key value
111 { unread: 0, _fresh: true }
222 { unread: 0, _fresh: true }
333 { unread: 1, _fresh: true }

Reduce will merge these values with the old ones from the result collection:

reduce( 111, [ { unread: 0, _fresh: true }, { last_seen: '2013/01/01', unread: 2 } ] )
reduce( 222, [ { unread: 0, _fresh: true }, { last_seen: '2013/01/02', unread: 0 } ] )
reduce( 333, [ { unread: 1, _fresh: true }, { last_seen: '2012/12/31' } ] )

User 333 doesn't have any old "unread" value, but the reduce function doesn't care. Here is the result collection after all reduce calls:

{ _id: 111, value: { last_seen: '2013/01/01', unread: 0 } }
{ _id: 222, value: { last_seen: '2013/01/02', unread: 0 } }
{ _id: 333, value: { last_seen: '2012/12/31', unread: 1 } }

Merging two or more source collections into one isn't that hard as long as reduce doesn't need to process more than one map/emit value per key from the source collection. I'd really like MongoDB to accept two reduce functions: one for reducing the map results and one for merging everything with the existing values from the result collection.
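Until something like that exists, the _fresh-marker reduce and finalize can at least be written once and shared by every import run. Here is a mongo shell sketch of that idea - it only restates the two Perl calls from above with shared helper functions, nothing more:

// Shared reduce: merge old and new value objects, fresh fields win
var mergeReduce = function (k, value_list) {
  var result = {};
  value_list.forEach(function (value) {
    var field;
    for (field in value) {
      if (field != "_fresh" && (! result.hasOwnProperty(field) || value._fresh)) {
        result[field] = value[field];
      }
    }
  });
  return result;
};

// Shared finalize: strip the internal marker before storing
var stripFresh = function (k, v) { delete v._fresh; return v; };

// Import run for collection 1 (users)
db.users.mapReduce(
  function () {
    if (this.hasOwnProperty("last_seen")) {
      emit(this._id, { last_seen: this.last_seen, _fresh: true });
    }
  },
  mergeReduce,
  { query: { last_seen: { $exists: true } },
    finalize: stripFresh,
    out: { reduce: "users_merged_info" } }
);

// Import run for collection 2 (unread messages per user)
db.user_unread_messages.mapReduce(
  function () {
    emit(this.user_id, { unread: this.unread_messages.length, _fresh: true });
  },
  mergeReduce,
  { finalize: stripFresh,
    out: { reduce: "users_merged_info" } }
);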

 

4 comments

  1. joe

    If I have the requirement to apply map reduce for the scenario where I have two input files - one is a multidimensional input data file with time stamps and the other is meta data configuration - and they don't have anything in common, then how do I run map reduce over multiple files?
    > db.config.find();
    { "_id" : ObjectId("51419da08366ded56c483ac5"), "_dim_id" : 2, "Type" : "categorical", "gran" : "4", "value1" : "B", "value2" : "G", "value3" : "R", "value4" : "Y" }
    { "_id" : ObjectId("51419dc78366ded56c483ac6"), "_dim_id" : 1, "Type" : "Numeric", "gran" : "2", "value1" : "0", "value2" : "50" }
    { "_id" : ObjectId("51419ddb8366ded56c483ac7"), "_dim_id" : 0, "Type" : "Numeric", "gran" : "4", "value1" : "0", "value2" : "100" }
    >
    > db.datafile.find();
    { "_id" : ObjectId("51419f268366ded56c483ac8"), "_TS_id" : "6", "data" : [ "46", "26", "Y" ] }
    { "_id" : ObjectId("51419f4b8366ded56c483ac9"), "_TS_id" : 7, "data" : [ "90", "45", "B" ] }
    { "_id" : ObjectId("51419f5c8366ded56c483aca"), "_TS_id" : 8, "data" : [ "23", "11", "R" ] }
    { "_id" : ObjectId("51419f768366ded56c483acb"), "_TS_id" : 9, "data" : [ "22", "34", "G" ] }
    { "_id" : ObjectId("51419f9b8366ded56c483acc"), "_TS_id" : 10, "data" : [ "78", "45", "B" ] }
    { "_id" : ObjectId("51419faf8366ded56c483acd"), "_TS_id" : 11, "data" : [ "46", "26", "Y" ] }
    { "_id" : ObjectId("51419fc28366ded56c483ace"), "_TS_id" : 12, "data" : [ "56", "33", "R" ] }
    >

    I have to use map reduce to map the incoming data points with respect to the config file.

    Thanks in advance for your guidance.

  2. Sebastian

    Sorry, I don't get what you want to do. I'd suggest that you merge some sample lines manually (not by programming) and find out how you match them together. There is also some point where MapReduce can't help any longer and you need to use your favorite programming language to solve the problem - maybe you've reached this point?

  3. Phill Rosen

    If finalize is called multiple times, which it often can be, this won't work.

  4. Annika

    I'm having trouble with MapReduce right now, and where do I end up by chance while googling? Here! It's a small world ^^

    Best regards from your favorite ex-colleague ;D
    Annika
