MongoDB Aggregation Pipeline and Pagination

One of the most common problems in web development is paginating a set of data.
The set of data we need to show our users is often pretty big, so we might want to show only a part of it, retrieving the next slices only when requested.

Skip/Limit Pagination

The most common way to achieve this is to count the number of items and split them into pages, each holding a fixed number of items. This is usually achieved through the limit, skip and count operators.

For the purpose of this post I’ll be relying on a collection containing tickets for a project management tool. Each document looks like:

{u'_id': ObjectId('4eca327260fc00346500000f'),
 u'_sprint': ObjectId('4eca318460fc00346500000b'),
 u'complexity': u'days',
 u'description': u'This is the ticket description',
 u'priority': u'low',
 u'status': u'new',
 u'title': u'This is the ticket main title'}

stored in the projects.ticket collection:

import pymongo
con = pymongo.MongoClient()
db = con.projects
tickets = db.ticket

Then we can work out the total number of pages by counting the entries in the collection and dividing that count by the number of entries we want to show on each page. In this case we are going to paginate over the list of tickets in status done (those that have been completed by a developer):

import math
pagecount = math.ceil(float(tickets.find({'status': 'done'}).count()) / ITEMS_PER_PAGE)
>>> pagecount

Now we know that to show all the items in our set we need 471 pages.
Then we just need to get the items of each page through limit and skip:

page1 = list(tickets.find({'status': 'done'}).skip(0).limit(ITEMS_PER_PAGE))
page2 = list(tickets.find({'status': 'done'}).skip(1 * ITEMS_PER_PAGE).limit(ITEMS_PER_PAGE))
page3 = list(tickets.find({'status': 'done'}).skip(2 * ITEMS_PER_PAGE).limit(ITEMS_PER_PAGE))

and so on…
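The three queries above all follow the same pattern, which can be captured in a small helper. This is only a sketch: page_count and page_bounds are hypothetical names that are not part of pymongo or of the code in this post, and page numbers are assumed to start at 1.

```python
import math

ITEMS_PER_PAGE = 10  # assumed page size, matching the value used later in the post


def page_count(total_items, items_per_page=ITEMS_PER_PAGE):
    """Number of pages needed to show total_items entries."""
    # Dividing by a float keeps the fractional part on Python 2 as well.
    return int(math.ceil(total_items / float(items_per_page)))


def page_bounds(page_number, items_per_page=ITEMS_PER_PAGE):
    """Return the (skip, limit) pair for a 1-based page number."""
    if page_number < 1:
        raise ValueError("page numbers start at 1")
    return (page_number - 1) * items_per_page, items_per_page
```

With it, the third page would be fetched as skip, limit = page_bounds(3) followed by tickets.find({'status': 'done'}).skip(skip).limit(limit).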

While this is not the most efficient paradigm (as skip is actually a pretty slow function), it’s one of the most common solutions to the pagination problem. It’s so common that it is usually the one you find in pagination support provided by libraries or frameworks.

Purely for the sake of comparison with other techniques, I’m going to time how long it takes to get the number of pages and retrieve a page:

>>> def get_page():
...   pagecount = math.ceil(float(tickets.find({'status': 'done'}).count()) / ITEMS_PER_PAGE)
...   page = list(tickets.find({'status': 'done'}).skip(1 * ITEMS_PER_PAGE).limit(ITEMS_PER_PAGE))
...   return pagecount, page

>>> import timeit
>>> timeit.timeit(get_page, number=1000)

So retrieving a page 1000 times for a set of 4703 items (there are 6313 items in the collection in total) required 2.3 seconds with this approach.

Aggregation based Pagination before 3.2

Trying to improve over this solution one might notice that to render each page we are required to perform two queries: one to get the total amount of items and one to retrieve the page items themselves. So we might try to achieve the same result using a single query to the database.

If there is a tool that allows us to perform multiple operations in a single command, it is the MongoDB aggregation pipeline, so we might try to see if there is a way to retrieve the item count and a page with a single aggregation pipeline.

First of all we know that we are only looking for the tickets in status done, so the first step of our pipeline will be a $match stage for those tickets:

pipeline = [
    {'$match': {'status': 'done'}}
]

Then we want to fetch those tickets while also counting them. We can achieve this through the $group stage, which puts all the items in an array whose size we can then ask for in a $project stage:

pipeline = [
    {'$match': {'status': 'done'}},
    {'$group': {'_id': 'results', 'result': {'$push': '$$CURRENT'}}},
    {'$project': {'_id': 0, 'result': 1, 'pages': {'$divide': [{'$size': '$result'}, ITEMS_PER_PAGE]}}}
]

This is already enough to give us all the entries with their total, but we want to avoid having to send them all from the database to the client, so we can slice out the page we are looking for through the $skip and $limit stages. The only side effect is that before being able to apply the $limit stage we must $unwind our array to get back a list of documents we can then limit:

pipeline = [
    {'$match': {'status': 'done'}},
    {'$group': {'_id': 'results', 'result': {'$push': '$$CURRENT'}}},
    {'$project': {'_id': 0, 'result': 1, 'pages': {'$divide': [{'$size': '$result'}, ITEMS_PER_PAGE]}}},
    {'$unwind': '$result'},
    {'$skip': 1 * ITEMS_PER_PAGE},
    {'$limit': ITEMS_PER_PAGE}
]

Now if we run our pipeline we will actually get 10 results (ITEMS_PER_PAGE is 10) with the total number of pages:

>>> r = list(tickets.aggregate(pipeline))
>>> len(r)
>>> r[0]
{u'pages': 470.3, u'result': {u'status': u'done', u'description': u"TICKET_DESCRIPTION", u'title': u'TICKET_TITLE', u'priority': u'HIGH', u'complexity': u'hour', u'_sprint': ObjectId('4eca331460fc00358d000005'), u'_id': ObjectId('4ecce02760fc0009fe00000d')}}

We will have to apply math.ceil to pages, but most of the work is already done by MongoDB, so we actually achieved our target.
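To make the data flow of this pipeline easier to follow, here is a plain Python emulation of what each stage does to the documents. It does not talk to MongoDB at all: emulate_pipeline and the sample data are made up for illustration, and the real stages are of course executed by the server.

```python
import math


def emulate_pipeline(docs, page_index, items_per_page):
    """Mimic $match, $group/$push, $project, $unwind, $skip and $limit."""
    # $match: keep only the tickets in status done
    matched = [d for d in docs if d.get('status') == 'done']
    # $group with $push: a single document holding every matched ticket
    grouped = {'_id': 'results', 'result': matched}
    # $project: drop _id, keep result, compute the fractional page count
    pages = len(grouped['result']) / float(items_per_page)
    # $unwind: one output document per array element, each carrying pages
    unwound = [{'pages': pages, 'result': d} for d in grouped['result']]
    # $skip + $limit: slice out the requested page
    start = page_index * items_per_page
    return unwound[start:start + items_per_page]


# Hypothetical sample: 50 tickets, every other one in status done.
sample = [{'_id': i, 'status': 'done' if i % 2 == 0 else 'new'} for i in range(50)]
page = emulate_pipeline(sample, 1, 10)
total_pages = int(math.ceil(page[0]['pages']))
```

The emulation also makes the cost visible: the whole matched set is materialised in grouped and again in unwound before a single page is cut out of it.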

Let’s see if this approach is actually faster for our data than the previous one:

>>> def get_page():
...   r = list(tickets.aggregate(pipeline))
...   return math.ceil(r[0]['pages']), r

>>> import timeit
>>> timeit.timeit(get_page, number=1000)

Sadly this approach is actually slower.

I wanted to show it empirically, but it was pretty clear that it would be slower, because we are retrieving the whole set of data to store it inside an array that we then have to unwind. First of all we retrieve far more data than before; then we push it all into an array, and MongoDB arrays are not really performant for big amounts of data. As far as I remember they are implemented as in-memory arrays to support random indexing, so they have to be reallocated when growing, with the consequent cost of copying all the data each time.

Aggregation based Pagination on 3.2

One of the reasons why the aggregation based approach is pretty slow is that it has to pass through the whole array of data twice, the second time just to unwind it. MongoDB 3.2 comes to our help with a new array operator for the aggregation pipeline, the $slice operator, which removes the need to use $unwind, $skip and $limit to retrieve our page.

Let’s build a new pipeline based on the new operator and see if it can help us:

pipeline = [
    {'$match': {'status': 'done'}},
    {'$group': {'_id': 'results', 'result': {'$push': '$$CURRENT'}}},
    {'$project': {'_id': 0,
                  'result': {'$slice': ['$result', 1 * ITEMS_PER_PAGE, ITEMS_PER_PAGE]},
                  'pages': {'$divide': [{'$size': '$result'}, ITEMS_PER_PAGE]}}}
]

Now the result will be a single document with pages and result values:

>>> r = next(tickets.aggregate(pipeline), None)
>>> r['pages']
>>> len(r['result'])
>>> r['result'][0]
{u'status': u'done', u'description': u'TICKET_DESCRIPTION', u'title': u'TICKET_TITLE', u'priority': u'HIGH', u'complexity': u'days', u'_sprint': ObjectId('4ece227d60fc003675000009'), u'_id': ObjectId('4ecbc52060fc0074bb00000d')}

so we are actually getting the same data as before with far fewer operations…
Let’s check if this approach is faster than the previous one:

>>> def get_page():
...   r = next(tickets.aggregate(pipeline), None)
...   return math.ceil(r['pages']), r['result']
>>> import timeit
>>> timeit.timeit(get_page, number=1000)

Well, we gained a 25% speed up compared to the previous attempt, but retrieving all the data and pushing it inside an array still costs too much to follow this road. So far, if we want to display the total number of pages in the pagination, it seems that doing two separate queries is still the fastest solution for MongoDB.
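For anyone who wants to repeat the experiment on pages other than the second one, the hard-coded 1 * ITEMS_PER_PAGE offset can be turned into a parameter. The following is a minimal sketch; build_slice_pipeline is a hypothetical helper name, and the page index is assumed to be 0-based like in the pipelines of this post.

```python
def build_slice_pipeline(page_index, items_per_page, query):
    """Build the $slice based pipeline for a 0-based page index."""
    return [
        {'$match': query},
        {'$group': {'_id': 'results', 'result': {'$push': '$$CURRENT'}}},
        {'$project': {
            '_id': 0,
            # The aggregation $slice takes [array, position, n]
            'result': {'$slice': ['$result',
                                  page_index * items_per_page,
                                  items_per_page]},
            'pages': {'$divide': [{'$size': '$result'}, items_per_page]},
        }},
    ]


# Same pipeline as the one timed in this section: second page, 10 items.
pipeline = build_slice_pipeline(1, 10, {'status': 'done'})
```

The resulting list can be passed to tickets.aggregate(pipeline) exactly like the hand-written one.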

There is actually another technique for pagination, called Range Based Pagination. It’s usually the way to achieve the best performance when paginating, and I plan to write another blog post to show how to do it with MongoDB.

MongoDB and UnitOfWork: love or hate?

One of the most common sources of issues for MongoDB newcomers is the lack of transactions. People have been used to working with transactions for the past 10 years, and their web frameworks probably start, commit and roll back transactions for them automatically whenever something goes wrong. So we are pretty used to web development environments where the problem of writing only a part of our changes is usually solved out of the box.

When people first approach MongoDB, I have noticed that this behaviour is often taken for granted, and messed up data can arise from code that crashes while creating or updating entities on the database.

No transaction, no party?

To showcase the issue, I’ll try to come up with an example. Suppose you must create users and put them randomly in the Group3, Group5 or Group10 groups. For the sake of the example we came up with the idea of dividing 10 by a random number up to 3, which (with integer division) leads to 3, 5 and 10. The resulting code fails whenever randint returns 0:

import pymongo, random
c = pymongo.MongoClient()
user_id = c.test.users.insert({'user_name': 'User1'})
group_id = c.test.groups.insert({'user_id': user_id, 'group_name': "Group {}".format(10 / random.randint(0, 3))})

I know that this is a terrible schema design, and that using random.choice((3, 5, 10)) would prevent the crash, but it perfectly showcases the issue as it randomly crashes from time to time:

>>> group_id = c.test.groups.insert({'user_id': user_id, 'group_name': "Group {}".format(10 / random.randint(0, 3))})
>>> group_id = c.test.groups.insert({'user_id': user_id, 'group_name': "Group {}".format(10 / random.randint(0, 3))})
>>> group_id = c.test.groups.insert({'user_id': user_id, 'group_name': "Group {}".format(10 / random.randint(0, 3))})
Traceback (most recent call last):
  File "", line 1, in 
ZeroDivisionError: integer division or modulo by zero

What happens is that whenever our group creation fails we end up with a user on the database that is not related to any group, which might even cause crashes in other parts of our code that take for granted that each user has a group.

That can be verified as we should have the same amount of users and groups if everything works correctly, while it is not the case when our code breaks:

>>> c.test.users.count()
>>> c.test.groups.count()

As we create the groups after the users, it’s easy to see that whenever the code fails we end up with more users than groups.

This is an issue you rarely face when working with classic database engines, as you would usually run in a transaction that gets rolled back for you whenever the code crashes (at least this is how it works on TurboGears when the transaction manager is enabled), and so neither the user nor the group would ever have existed.

Working with a Unit Of Work

The good news is that a similar behaviour can be achieved through the UnitOfWork design pattern, which the Ming library for MongoDB provides on Python. When working with a Unit of Work, all the changes to the database happen together when we flush the unit of work. When something fails, we just clear the unit of work and nothing has happened.
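Before looking at Ming itself, the idea can be illustrated with a toy version of the pattern. This is not how Ming is implemented: ToyUnitOfWork, create_user_and_group and the dict standing in for the database are all made up for this sketch, which only shows the queue, flush and clear mechanics.

```python
class ToyUnitOfWork(object):
    """Queue new documents in memory and write them only on flush()."""

    def __init__(self, database):
        self.database = database  # dict: collection name -> list of documents
        self.pending = []         # (collection, document) pairs not yet written

    def add(self, collection, document):
        self.pending.append((collection, document))

    def flush(self):
        # Everything queued so far reaches the "database" in one go.
        for collection, document in self.pending:
            self.database.setdefault(collection, []).append(document)
        self.pending = []

    def clear(self):
        # Forget the queued changes: the database never sees them.
        self.pending = []


def create_user_and_group(uow, user_name, divisor):
    """Queue a user and its group; write them only if nothing crashed."""
    try:
        uow.add('users', {'user_name': user_name})
        uow.add('groups', {'group_name': 'Group {}'.format(10 // divisor)})
        uow.flush()
        return True
    except ZeroDivisionError:
        uow.clear()
        return False
```

Calling create_user_and_group(uow, 'User1', 0) leaves the database untouched, because the crash happens before the flush. Keep in mind that this protects against failures in our own code before the flush; it is not a database transaction.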

To start working with the UnitOfWork we need to declare a database session that is aware of the unit of work:

from ming import create_datastore
from ming.odm import ThreadLocalODMSession

session = ThreadLocalODMSession(bind=create_datastore('test'))

Then we can create the models for our data, which are used to represent the User and the Group:

from ming import schema
from ming.odm import FieldProperty
from ming.odm.declarative import MappedClass

class User(MappedClass):
    class __mongometa__:
        session = session
        name = 'users'

    _id = FieldProperty(schema.ObjectId)
    user_name = FieldProperty(schema.String(required=True))

class Group(MappedClass):
    class __mongometa__:
        session = session
        name = 'groups'

    _id = FieldProperty(schema.ObjectId)
    user_id = FieldProperty(schema.ObjectId)
    group_name = FieldProperty(schema.String(required=True))

Now we can finally create our users and groups like we did before. The only major change is that we flush the session at the end and clear it in case of crashes:

import random
try:
    u = User(user_name='User1')
    g = Group(user_id=u._id, group_name="Group {}".format(10 / random.randint(0, 3)))
    session.flush()
finally:
    session.clear()

Now running the same code three times leads to a crash like before:

Traceback (most recent call last):
  File "", line 33, in 
    g = Group(user_id=u._id, group_name="Group {}".format(10 / random.randint(0, 3)))
ZeroDivisionError: integer division or modulo by zero

The major difference is that now, if we look at the count of groups and users, they always coincide:

>>> c.test.users.count()
>>> c.test.groups.count()

because in case of a failure we clear the unit of work and so we never created the user.

Uh?! But the relation?!

A really interesting thing to note is that we created a relation between the User and the Group before either of them even existed. That is explicitly visible in Group(user_id=u._id): we are storing the id of a user that doesn’t exist yet.

How is that even possible?

The answer is in the ObjectId generation algorithm. On MongoDB you would expect object ids to be generated by the database, like in any other database management system, but due to the distributed nature of MongoDB that is not required at all. The way the object id is generated ensures that it never collides, even when ids are generated on different machines by different processes at different times. That is because the ObjectId itself contains the machine, process and time that generated it.

This allows MongoDB clients to generate the object ids themselves, so when we created the User it actually already had an ObjectId, one provided by Ming itself, even though the object didn’t yet exist on the database.
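As a rough illustration of why such ids can be built on the client, here is a sketch that assembles a 12-byte identifier from the time, the machine, the process and a counter, using only the standard library. toy_object_id is a hypothetical function written for this post, not the driver’s code; it assumes the classic 4 + 3 + 2 + 3 byte layout, and recent drivers use a somewhat different one. In real code you would simply call bson.ObjectId().

```python
import binascii
import hashlib
import itertools
import os
import socket
import struct
import time

# The counter starts from a random value and grows with every generated id.
_counter = itertools.count(int(binascii.hexlify(os.urandom(3)), 16))


def toy_object_id():
    """Build a 12-byte id: 4 bytes time, 3 machine, 2 process, 3 counter."""
    timestamp = struct.pack('>I', int(time.time()))
    # Hash of the hostname, truncated to 3 bytes, identifies the machine.
    machine = hashlib.sha256(socket.gethostname().encode('utf-8')).digest()[:3]
    process = struct.pack('>H', os.getpid() % 0xFFFF)
    # Pack as 4 bytes and drop the first to keep the low 3 bytes.
    counter = struct.pack('>I', next(_counter) % 0x1000000)[1:]
    raw = timestamp + machine + process + counter
    return binascii.hexlify(raw).decode('ascii')
```

Two calls in the same process share the machine and process parts and differ in the counter, so no round trip to the database is needed to get a fresh id.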

This makes it possible to fully leverage the power of the unit of work, as otherwise it would be really hard (if not impossible) to handle relations between different objects inside the same unit of work.

UnitOfWork looks great, but it might play bad tricks

Thanks to the convenience of working with such a pattern it’s easy to see everything as black or white: we get used to thinking that, as we didn’t flush the unit of work yet, nothing happened on the database and so we are safe.

While this is usually true, there are cases where it’s not, and they are actually pretty common cases when leveraging the full power of MongoDB.

MongoDB provides a really powerful feature, the Update Operators. Whenever we use them through update or findAndModify we can change the object atomically and in relation to its current state. They are the way to go to avoid race conditions and implement some common patterns in MongoDB, but they do not cope well with a UnitOfWork.

Whenever we issue an update operator we must contact the database and perform the operation immediately, as the result of the operation might change depending on the time it’s performed. So we cannot queue an update operator in the unit of work and then flush it.
The general rule is to flush the unit of work before performing an update operator, or to perform the update operators before starting the work inside the unit of work, but never to mix the two. Otherwise unexpected behaviour is what you are asking for.

What happens if we create a user with count=1 in the unit of work, then perform an $inc on count, and then flush the unit of work? You would expect the user to have count=2, but you actually end up with a user with count=1. Why?

If we think for a moment about it, it’s easy to see why.

When we perform the $inc, the unit of work has not been flushed yet, so our user doesn’t exist yet and there is no count for the $inc to increase.
Then, when we flush the unit of work, the $inc has already been performed outside of it, so the unit of work knows nothing about it and creates a user with count=1.
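The sequence of events can be replayed with a tiny simulation. Nothing here is Ming or pymongo code: the database dict, the pending list, inc_count and flush are stand-ins invented for this sketch, and the update is assumed to run without upsert, so it does nothing when no document matches.

```python
database = {'users': {}}   # stands in for the real collection, keyed by _id
pending = []               # stands in for the unit of work queue


def inc_count(user_id, amount):
    """Like an update with $inc on count: it hits the database at once."""
    user = database['users'].get(user_id)
    if user is None:
        return 0           # nothing matched, nothing modified
    user['count'] += amount
    return 1


def flush():
    """Write the queued documents as they were created in memory."""
    for document in pending:
        database['users'][document['_id']] = dict(document)
    del pending[:]


# Mixed order: the $inc runs while the user only exists in the unit of work.
pending.append({'_id': 'user1', 'count': 1})
modified = inc_count('user1', 1)
flush()

# Safe order: flush first, then run the update operator.
pending.append({'_id': 'user2', 'count': 1})
flush()
inc_count('user2', 1)
```

In the first case modified is 0 and user1 ends up with count=1; in the second case user2 ends up with count=2, which is what following the general rule gives us.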

UnitOfWork or Not?

In the end, UnitOfWork is a very powerful and convenient pattern when working with MongoDB.

In 99% of the cases it will solve data consistency problems and make our life easier, but pay attention to that 1% that comes up whenever you accidentally start performing operations outside the unit of work, mixing them with operations inside it. That will be the origin of unexpected behaviours that are pretty hard to track down to the real cause.

As soon as you realise this and start paying attention to what’s going to happen inside your unit of work the few times you need to perform operations outside of it, I’m sure you will live a happy life and enjoy the convenience provided by frameworks like Ming and the UnitOfWork pattern.