+N Consulting, Inc.

Common Rollup $unionWith and $merge

TL;DR

Schedule periodic rollup and aggregation on historic data using $merge into a collection, then run ad-hoc reports on cumulative live data for the current period, $unionWith historic data.

WUT???

We often need to produce reports summarizing sales over time which include both current partial period, such as “sales this year to date”. While it’s perfectly fine to run ad hoc queries on the data set, this can become a performance issue when report frequency increases or data sets become extremely large or the periods are very long (eg: all sales to date since inception).

It’s been common practice to pre-aggregate historic data and persist it to some collection. This lets us query only the most recent yet-to-be-rolled-up period, and add it up to historic data queried from the other cumulative collection. Trivial concept, nothing terribly new here. So where’s the catch? Details (it’s always the details…):

sequenceDiagram
    autonumber
    participant a as Query
    participant ms as [monthlySales]
    participant s as [Sales]

    a->>+ms: January - July?
    ms-->>-a: <>

    a->>+s: Current Month?
    s-->>-a: <>

    a-->> a: Union

The implementation of this technique relies on 2 high level operations: the

  1. A periodic calculation to populate the historic data.
  2. An ad-hoc query over both historic and current data.

Historic Data

The technique used with MongoDB to persist historic data used to be mapReduce in the old days. Very old days. Using mapReduce allowed you to persist the results into a collection all server-side. Before or without that, you’d need to have client code computing a result-set, then turning around and writing it explicitly back into MongoDB.

With introduction the Aggregation Framework, more efficient query operators were introduced, as well as the $out pipeline operator. The issue with that though, was that $out did not play very well with sharded clusters, and did not allow for incremental data insertion. Instead, $out wiped the output collection clean and replaced its content with the new data. This implied that for a long history, you would have to compute again over a growing number of documents - not very efficient, and exacerbates the problem we were trying to avoid in the first place. You could of course run a query just across a year say, and store it into a yearly collection. But then the problem of summing up across years would require more and more collections be queried independently by the client, and you’d end up with many more collections just to handle this case. Not elegant.

Now with $merge the story becomes manageable again. $merge lets you perform incremental updates to an existing collection, so your historic data can largely remain save for the newest addition or update to the latest period.

Collection sales contains document-per-sale with a field total and date in the example below:

{ "date" : ISODate("2021-08-10T23:54:01.028Z"), "total" : NumberDecimal("178.4349") }
{ "date" : ISODate("2021-08-10T23:48:37.671Z"), "total" : NumberDecimal("17.6805") }
{ "date" : ISODate("2021-08-10T23:45:37.238Z"), "total" : NumberDecimal("135.4978") }
// etc.

To produce sum of sales, count of sale numbers across August, we just run an aggregation, producing the result into the monthlySales collection

db.sales.aggregate([
{$match: { date: {$gte: ISODate('2021-08-01'), $lt: ISODate('2021-09-01')}}},

{$group: { _id: ISODate('2021-08-01'), total: {$sum: '$total'}, saleCount: {$sum:1}}},

{$merge: { into: {db: 'test', coll: 'monthlySales'}}}
])

The shape of the document in monthlySales is verbatim the document(s) produced by the predecessor pipeline stage of $merge:

// db.monthlySales.findOne()

{ "_id" : ISODate("2021-08-01T00:00:00Z"), "saleCount" : 1613, "total" : NumberDecimal("815527.3523") }

What happens though if it is still August? A previous value for August may exist already. We have to address what would be done in case a document with the same _id is not present, as well as what do do if it is.

When a document is not present, $merge defaults to inserting it. But you can also tell it to discard the new document, or produce an error. For example, you may want to prevent accidental creation of documents in the target collection in order to prevent arbitrary items created unless some approved well known document already exists. The full universe of choices is expressed via the whenNotMatched field with value being one of insert,discard,or fail. Note that in case of fail, other documents may have been inserted so the state is kind of non-determinant , it is not wrapped in a transaction.

This example shows discarding silently and documents that $merge found no match for in the target collection. Someone would have had to create a document for August 2021 otherwise the result for August is effectively ignored by $merge and nothing will be written.

{$merge: { 
into: {db: 'test', coll: 'monthlySales'},
whenNotMatched: 'discard'
}}

When are document is matched, you have a few more choices. The default is to replace the existing document. The full list of options: replace, keepExisting, merge, fail, and pipeline. You can choose to keep the existing document to preserve things as they were. Replace and fail are pretty straightforward as well.

Specifying whenMatched: 'merge' will slam fields from the new document into the existing document.

Document Value
Existing {_id:1, name: 'bob', age: 33}
New Document {_id:1, age: 34, height: 1.74 }
Resultant Document {_id:1, name: 'bob', age: 34, height: 1.74 }

An existing field’s value is replaced and previously non-existing fields will be created. Fields not present in the new document which existed before are left as is.

That’s good for having multiple separate calculations affecting some destination fields. But for incremental update, we’ll be looking to combine the value of the existing field in the target collection with a value from the new calculation on the origin collection.

Given the document in an origin collection, and a corresponding document already in the destination collection:

db.origin.insert({ _id:1, x:2})
db.destination.insert({ _id:1, x:40})

We can now add the existing value in the destination‘s document field x to the sum of that value and the value of the field x from the preceding pipeline operator ($match for simplicity here):

db.origin.aggregate([{$match: {}}, {$merge: {into: 'destination', whenMatched: [{$set: {x: {$add: ["$x","$$new.x"]}}} ]}}] );

db.destination.find();
{ "_id" : 1, "x" : 42 }

Using the special pipeline variable $$new, we can distinguish the field x from the existing field x that was in the destination document.

The pipeline constructed to produce the merge of existing documents can only take one of the stages $addFields, $set, $project, $unset, $replaceRoot, or $replaceWith. This mini-pipeline only ever sees a single document input from the destination collection - the one that matched - so other pipeline operators really don’t make a lot of sense. Only single-document-modifier types are allowed - or at least that’s how I make sense of it.

Union Queries

With all this syntax background, let’s turn back to our original problem. We want to have cheaper queries over historical periods yet combine them with recent on-the-fly values. Storing the past data can be don in some frequency, either monthly or weekly or daily. Given that we can incrementally update the destination collection, we’re free to pick a frequency that makes sense.

To combine documents for a partial month, we’ll need to query 2 collections though. The historic sales in the monthlySales collection, and the results from the month-to-date query on the fly from sales. Would be nice to do it in one command to Mongo. Historically, we would shoot 2 queries and combine the results into one client-side. But that’s extra code that someone had to write, and makes populating generic graphs more involved.

With the introduction of $unionWith, a single aggregation command can now return results from more than one collection! Harnessing this for our scenario we’ll query the “main” collection sales for the current month-to-date, then union the results with a pipeline over the monthlySales for already computed document since the beginning of the year:

db.sales.aggregate([
{ $match: { date: { $gte: ISODate('2021-08-01'), $lt: ISODate('2021-09-01') } } },

{ $group: { _id: ISODate('2021-08-01'), saleCount: { $sum: 1 }, total: { $sum: '$total'} }},

{
$unionWith: {
coll: 'monthlySales',
pipeline: [
{ $match: { _id: { $gte: ISODate('2021-01-01'), $lt: ISODate('2021-08-01') } } },
]
}
},

{ $sort: { _id: -1 } }
])

{ "_id" : ISODate("2021-08-01T00:00:00Z"), "saleCount" : 1613, "total" : NumberDecimal("815527.3523") }
{ "_id" : ISODate("2021-07-01T00:00:00Z"), "saleCount" : 5255, "total" : NumberDecimal("2615967.9695") }
{ "_id" : ISODate("2021-06-01T00:00:00Z"), "saleCount" : 5134, "total" : NumberDecimal("2540986.0768") }
{ "_id" : ISODate("2021-05-01T00:00:00Z"), "saleCount" : 5294, "total" : NumberDecimal("2680115.7638") }
{ "_id" : ISODate("2021-04-01T00:00:00Z"), "saleCount" : 5083, "total" : NumberDecimal("2532271.9124") }
{ "_id" : ISODate("2021-03-01T00:00:00Z"), "saleCount" : 5270, "total" : NumberDecimal("2647041.1003") }
{ "_id" : ISODate("2021-02-01T00:00:00Z"), "saleCount" : 4809, "total" : NumberDecimal("2424930.7338") }
{ "_id" : ISODate("2021-01-01T00:00:00Z"), "saleCount" : 5252, "total" : NumberDecimal("2608231.7712") }

Mic drop. A single query processed all on the server side, giving us everything we need in one shot.

Which leaves us with the small matter of granularity. How much of the work do we want to do on the fly vs. just digging up the computed data?

If we populate the monthlySales monthly after the edn of each month, we’ll need to query up to 31 days worth of data on the ad-hoc side. If we update the sales daily we’ll have less than a day’s worth to sum on the fly, with the onus of determining exactly what cutoff window we set for the current vs the existing already merged data. Whatever we choose, we’ll just need to make sure our boundaries for the time window don’t overlap so we don’t count the same sale too many or too few times.

Getting There

To round things off, you might want to start accumulating pre-calculated data but already have a slew of individual documents in your live collection. You could write some script client side to iterate the months and years of the past. Or you could run a query to populate the monthlySales like so:

db.sales.aggregate([
{
$group:
{
_id: { $dateFromParts: { year: { $year: '$date' }, month: { $month: '$date' } } },
total: { $sum: '$total' },
saleCount: { $sum: 1 }
}
},
{
$merge:
{
into: { db: 'test', coll: 'monthlySales' }
}
}
])

There are more nuances bells and whistles that come along with $unionWith and $merge, but I think with the above techniques we can already achieve a good portion of the historic+current type scenarios. You will of course need to have a recent MongoDB version - $merge was introduced in 4.2, and $unionWith in 4.4.

Notice

We use cookies to personalise content, to allow you to contact us, to provide social media features and to analyse our site usage. Information about your use of our site may be combined by our analytics partners with other information that you’ve provided to them or that they’ve collected from your use of their services. You consent to our cookies if you continue to use our website.