MongoDB Aggregation Pipeline and Pagination

One of the most common problems in web development is paginating a set of data.
Quite often the set of data we need to show our users is fairly big, so we want to show only a part of it and retrieve the next slices only when requested.

Skip/Limit Pagination

The most common way to achieve this is to count the number of items and split them into pages, each with a fixed number of items. This is usually done through the limit, skip and count operators.

For the purpose of this post I’ll be relying on a collection containing tickets for a project management tool; each document looks like:

{u'_id': ObjectId('4eca327260fc00346500000f'),
 u'_sprint': ObjectId('4eca318460fc00346500000b'),
 u'complexity': u'days',
 u'description': u'This is the ticket description',
 u'priority': u'low',
 u'status': u'new',
 u'title': u'This is the ticket main title'}

stored in the projects.ticket collection:

import pymongo
con = pymongo.MongoClient()
db = con.projects
tickets = db.ticket

then we can compute the total number of pages by counting the entries in the collection and dividing that by the number of entries we want to show on each page. In this case we are going to paginate over the list of tickets in status done (those that have been completed by a developer):

import math
 
ITEMS_PER_PAGE = 10
 
pagecount = math.ceil(float(tickets.find({'status': 'done'}).count()) / ITEMS_PER_PAGE)
>>> pagecount
471.0

Now we know that to show all the items in our set we need 471 pages.
Then we just need to actually get the items of each page through limit and skip:

page1 = list(tickets.find({'status': 'done'}).skip(0).limit(ITEMS_PER_PAGE))
page2 = list(tickets.find({'status': 'done'}).skip(1 * ITEMS_PER_PAGE).limit(ITEMS_PER_PAGE))
page3 = list(tickets.find({'status': 'done'}).skip(2 * ITEMS_PER_PAGE).limit(ITEMS_PER_PAGE))

and so on…
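Wrapped in a small helper (just a sketch reusing the tickets collection and ITEMS_PER_PAGE defined above), the skip/limit approach looks like:

def get_ticket_page(page_number, items_per_page=ITEMS_PER_PAGE):
    # page_number starts from 1: skip the previous pages and take one page worth of items
    cursor = tickets.find({'status': 'done'})
    return list(cursor.skip((page_number - 1) * items_per_page).limit(items_per_page))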

While this is not the most efficient approach (skip is actually a pretty slow operation), it’s one of the most common solutions to the pagination problem. It’s so common that it is usually the one you find in the pagination support provided by libraries and frameworks.

Purely for comparison with the other techniques, I’m going to time how long it takes to get the number of pages and retrieve a page:

>>> def get_page():
...   pagecount = math.ceil(float(tickets.find({'status': 'done'}).count()) / ITEMS_PER_PAGE)
...   page = list(tickets.find({'status': 'done'}).skip(1 * ITEMS_PER_PAGE).limit(ITEMS_PER_PAGE))
...   return pagecount, page

>>> import timeit
>>> timeit.timeit(get_page, number=1000)
2.3415567874908447

So retrieving a page 1000 times, for a set of 4703 items (out of 6313 items in the whole collection), required 2.3 seconds with this approach.

Aggregation based Pagination before 3.2

Trying to improve over this solution, one might notice that to render each page we are required to perform two queries: one to get the total number of items and one to retrieve the page items themselves. So we might try to achieve the same result with a single query to the database.

If there is a tool that allows us to perform multiple operations in a single command, it is the MongoDB aggregation pipeline, so we might try to see whether there is a way to retrieve the item count and a page with a single aggregation pipeline.

First of all we know that we are only looking for the tickets in status done, so the first step of our pipeline will be a $match stage for those tickets:

pipeline = [
   {'$match': {'status': 'done'}}
]

Then we want to fetch those documents while also counting them, which we can achieve with a $group stage that pushes all the items into an array, whose size we can then compute in a $project stage:

pipeline = [
    {'$match': {'status': 'done'}},
    {'$group': {'_id': 'results', 'result': {'$push': '$$CURRENT'}}},
    {'$project': {'_id': 0, 'result': 1, 'pages': {'$divide': [{'$size': '$result'}, ITEMS_PER_PAGE]}}},
]

This is already enough to give us all the entries together with their total, but we want to avoid sending them all from the database to the client, so we can slice out the page we are looking for with the $skip and $limit stages. The only side effect is that before being able to apply the $limit stage we must $unwind our array to get back a list of documents we can then limit:

pipeline = [
    {'$match': {'status': 'done'}},
    {'$group': {'_id': 'results', 'result': {'$push': '$$CURRENT'}}},
    {'$project': {'_id': 0, 'result': 1, 'pages': {'$divide': [{'$size': '$result'}, ITEMS_PER_PAGE]}}},
    {'$unwind': '$result'},
    {'$skip': 1 * ITEMS_PER_PAGE},
    {'$limit': ITEMS_PER_PAGE}
]

Now if we run our pipeline we will actually get 10 results (ITEMS_PER_PAGE is 10) with the total number of pages:

>>> r = list(tickets.aggregate(pipeline))
>>> len(r)
10
>>> r[0]
{u'pages': 470.3, u'result': {u'status': u'done', u'description': u"TICKET_DESCRIPTION", u'title': u'TICKET_TITLE', u'priority': u'HIGH', u'complexity': u'hour', u'_sprint': ObjectId('4eca331460fc00358d000005'), u'_id': ObjectId('4ecce02760fc0009fe00000d')}}

We still have to apply math.ceil to pages, but most of the work is already done by MongoDB, so we actually achieved our goal.

Let’s see if this approach is actually faster for our data than the previous one:

>>> def get_page():
...   r = list(tickets.aggregate(pipeline))
...   return math.ceil(r[0]['pages']), r

>>> import timeit
>>> timeit.timeit(get_page, number=1000)
33.202540159225464

Sadly this approach is actually slower.

I wanted to show it empirically, but it was pretty clear that it would be slower, because we are retrieving the whole set of data to store it inside an array that we then have to unwind. First of all we retrieve far more data than before, then we even push it into an array, and MongoDB arrays are not really performant for big amounts of data. As far as I remember they are implemented as in-memory arrays to support random indexing, so they have to be reallocated as they grow, with the consequent cost of copying all the data each time.

Aggregation based Pagination on 3.2

One of the reasons why the aggregation-based approach is pretty slow is that it has to pass through the whole array of data twice, the second time just to unwind it. MongoDB 3.2 comes to our help with a new array operator, $slice, which removes the need to use $unwind, $skip and $limit to retrieve our page.

Let’s build a new pipeline based on the new operator and see if it can help us:

pipeline = [
    {'$match': {'status': 'done'}},
    {'$group': {'_id': 'results', 'result': {'$push': '$$CURRENT'}}},
    {'$project': {'_id': 0, 
                  'result': {'$slice': ['$result', 1 * ITEMS_PER_PAGE, ITEMS_PER_PAGE]}, 
                  'pages': {'$divide': [{'$size': '$result'}, ITEMS_PER_PAGE]}}},
]

Now the result will be a single document with pages and result values:

>>> r = next(tickets.aggregate(pipeline), None)
>>> r['pages']
470.3
>>> len(r['result'])
10
>>> r['result'][0]
{u'status': u'done', u'description': u'TICKET_DESCRIPTION', u'title': u'TICKET_TITLE', u'priority': u'HIGH', u'complexity': u'days', u'_sprint': ObjectId('4ece227d60fc003675000009'), u'_id': ObjectId('4ecbc52060fc0074bb00000d')}

so we are actually getting the same data as before with far fewer operations…
Let’s check if this approach is faster than the previous one:

>>> def get_page():
...   r = next(tickets.aggregate(pipeline), None)
...   return math.ceil(r['pages']), r['result']
... 
>>> import timeit
>>> timeit.timeit(get_page, number=1000)
26.79308009147644

Well, we actually gained a 25% speed-up compared to the previous attempt, but retrieving all the data and pushing it into an array still costs too much to follow this road. So far, if we want to display the total number of pages when paginating, it seems that doing two separate queries is still the fastest solution for MongoDB.

There is actually another technique for pagination, called Range Based Pagination; it’s usually the way to achieve the best performance, and I plan to write another blog post to show how to do it with MongoDB.
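Just to give an idea of what it looks like, here is a minimal sketch of range-based pagination over the same tickets collection, assuming we page by _id (the follow-up post will cover the technique properly):

# Instead of skipping N documents, remember the last _id seen on the previous page
# and ask only for documents that come after it.
def get_next_page(last_id=None):
    query = {'status': 'done'}
    if last_id is not None:
        query['_id'] = {'$gt': last_id}
    page = list(tickets.find(query).sort('_id', pymongo.ASCENDING).limit(ITEMS_PER_PAGE))
    last_id = page[-1]['_id'] if page else None
    return page, last_id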

MongoDB and UnitOfWork: love or hate?

One of the most common sources of issues for MongoDB newcomers is the lack of transactions. People have been used to working with transactions for the past 10 years, and their web frameworks probably start, commit and roll back transactions for them whenever something goes wrong. So we are pretty used to web development environments where the problem of writing only a part of our changes is solved out of the box.

I noticed that when people first approach MongoDB this behaviour is often taken for granted, and messed-up data might arise from code that crashes while creating or updating entities in the database.

No transaction, no party?

To showcase the issue, I’ll try to come up with an example. Suppose you must create users and put them randomly in the Group3, Group5 or Group10 group. For the sake of the example we came up with the idea of dividing 10 by a random number up to 3, which leads to 10, 5 and 3, and to code that randomly fails whenever randint returns 0:

import pymongo, random
c = pymongo.MongoClient()
user_id = c.test.users.insert({'user_name': 'User1'})
group_id = c.test.groups.insert({'user_id': user_id, 'group_name': "Group {}".format(10 / random.randint(0, 3))})
  

I know that this is a terrible schema design and that using random.choice((3, 5, 10)) would prevent the crash, but it perfectly showcases the issue, as it randomly crashes from time to time:

>>> group_id = c.test.groups.insert({'user_id': user_id, 'group_name': "Group {}".format(10 / random.randint(0, 3))})
>>> group_id = c.test.groups.insert({'user_id': user_id, 'group_name': "Group {}".format(10 / random.randint(0, 3))})
>>> group_id = c.test.groups.insert({'user_id': user_id, 'group_name': "Group {}".format(10 / random.randint(0, 3))})
Traceback (most recent call last):
  File "", line 1, in 
ZeroDivisionError: integer division or modulo by zero
  

Now, whenever our group creation fails, we end up with a user in the database that is not related to any group, which might even cause crashes in other parts of our code that take for granted that each user has a group.

That can be verified: we should have the same number of users and groups if everything works correctly, which is not the case when our code breaks:

>>> c.test.users.count()
3
>>> c.test.groups.count()
2

As we create the groups after the users, it’s easy to see that whenever the code fails we end up with more users than groups.

This is an issue you rarely face when working with classic database engines, as you would usually run inside a transaction that gets rolled back for you whenever the code crashes (at least this is how it works in TurboGears when the transaction manager is enabled), so neither the user nor the group would ever have existed.

Working with a Unit Of Work

The good news is that a similar behaviour can actually be achieved through the UnitOfWork design pattern, which the Ming library provides for MongoDB in Python. When working with a unit of work, all the changes to the database happen together when we flush it; when something fails we just clear the unit of work and nothing has happened.

To start working with the UnitOfWork we need to declare a unit-of-work-aware database session:

from ming import create_datastore
from ming.odm import ThreadLocalODMSession

session = ThreadLocalODMSession(bind=create_datastore('test'))

Then we can create the models for our data, which are used to represent the User and the Group:

from ming import schema
from ming.odm import FieldProperty
from ming.odm.declarative import MappedClass

class User(MappedClass):
    class __mongometa__:
        session = session
        name = 'users'

    _id = FieldProperty(schema.ObjectId)
    user_name = FieldProperty(schema.String(required=True))


class Group(MappedClass):
    class __mongometa__:
        session = session
        name = 'groups'

    _id = FieldProperty(schema.ObjectId)
    user_id = FieldProperty(schema.ObjectId)
    group_name = FieldProperty(schema.String(required=True))

Now we can finally create our users and groups like we did before; the only major change is that we flush the session at the end and clear it in case of a crash:

import random
try:
    u = User(user_name='User1')
    g = Group(user_id=u._id, group_name="Group {}".format(10 / random.randint(0, 3)))
    session.flush()
finally:
    session.clear()

Now running the same code three times leads to a crash like before:

Traceback (most recent call last):
  File "test.py", line 33, in 
    g = Group(user_id=u._id, group_name="Group {}".format(10 / random.randint(0, 3)))
ZeroDivisionError: integer division or modulo by zero

The major difference is that now, if we look at the counts of groups and users, they always coincide:

>>> c.test.users.count()
2
>>> c.test.groups.count()
2

because in case of a failure we clear the unit of work and so we never created the user.

Uh?! But the relation?!

A really interesting thing to note is that we actually created a relation between the User and the Group before either of them even existed. That is explicitly visible in Group(user_id=u._id): we are storing the id of a user that doesn’t even exist yet.

How is that even possible?

The answer lies in the ObjectId generation algorithm. With MongoDB you might expect object ids to be generated by the database, like in any other database management system, but due to the distributed nature of MongoDB that is not required at all. The way the object id is generated ensures that it never collides, even when it is generated on different machines by different processes at different times. That is because the ObjectId itself contains the machine, process and time that generated it.

This allows MongoDB clients to generate the object ids themselves, so when we created the User it actually already had an ObjectId, one provided by Ming itself for us, even though the object didn’t yet exist in the database.

This makes it possible to fully leverage the power of the unit of work, as otherwise it would be really hard (if not impossible) to handle relations between different objects inside the same unit of work.
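Client-side id generation is easy to see with the bson package that ships with PyMongo (just a quick illustration, unrelated to the models above):

from bson import ObjectId

oid = ObjectId()            # generated entirely on the client, no database round-trip
print(oid.generation_time)  # the creation timestamp embedded in the id itself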

UnitOfWork looks great, but it might play bad tricks

Thanks to the convenience of working with such a pattern, it’s easy to see everything as black or white: we get used to thinking that, since we haven’t flushed the unit of work yet, nothing has happened on the database and so we are safe.

While this is usually true, there are cases where it’s not, and they are actually pretty common cases when leveraging the full power of MongoDB.

MongoDB provides a really powerful feature, the update operators: when using them through update or findAndModify we can change a document atomically, based on its current state. They are the way to go to avoid race conditions and implement some common patterns in MongoDB, but they do not cope well with a UnitOfWork.
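For example, an atomic counter based on an update operator looks like this (a sketch: the counters collection and the page_views document are purely illustrative, and c is the MongoClient from the previous snippets):

import pymongo

# Atomically increment the counter and get the updated document back in a single
# round trip, avoiding the read-modify-write race condition.
counter = c.test.counters.find_one_and_update(
    {'_id': 'page_views'},
    {'$inc': {'value': 1}},
    upsert=True,
    return_document=pymongo.ReturnDocument.AFTER)
print(counter['value'])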

Whenever we issue an update operator we must contact the database immediately and perform the operation, as its result might change depending on when it is performed. So we cannot queue an update operator in the unit of work and flush it later.
The general rule is to flush the unit of work before performing an update operator, or to perform the update operators before starting the work inside the unit of work, but never to mix the two; otherwise unexpected behaviour is what you are looking for.

What happens if we create a user with count=1 in the unit of work, then perform an $inc on count and then flush the unit of work? You would expect the user to have count=2, but you actually end up with a user with count=1. Why?

If we think for a moment about it, it’s easy to see why.

When we perform the $inc, the unit of work has not been flushed yet, so our user doesn’t exist yet, nor does it have 1 as its count.
Then, when we flush the unit of work, the $inc operation has already been performed outside of it, so the unit of work knows nothing about it and creates a user with count=1.
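Here is a minimal sketch of that pitfall and of the correct ordering (assuming User also defined a count = FieldProperty(schema.Int) field; the raw c.test.users access stands for the underlying pymongo collection):

# WRONG ordering: the $inc hits the database before the unit of work has created the user.
u = User(user_name='User2', count=1)   # only queued in the unit of work, not yet on the database
c.test.users.update_one({'user_name': 'User2'}, {'$inc': {'count': 1}})  # matches nothing yet
session.flush()                        # the user is finally written, with count=1

# RIGHT ordering: flush first, then perform the atomic update.
u = User(user_name='User3', count=1)
session.flush()
c.test.users.update_one({'user_name': 'User3'}, {'$inc': {'count': 1}})  # count becomes 2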

UnitOfWork or Not?

At the end, UnitOfWork is a very powerful and convenient pattern when working with MongoDB.

In 99% of the cases it will solve data consistency problems and make our life easier, but pay attention to the remaining 1%, which shows up whenever you accidentally start performing operations outside the unit of work and mix them with operations inside it. That will be the origin of unexpected behaviours which will be pretty hard to trace back to their real cause.

As soon as you realise this and start paying attention to what’s going to happen inside your unit of work the few times you need to perform operations outside of it, I’m sure you will live a happy life and enjoy the convenience provided by frameworks like Ming and the UnitOfWork pattern.

Redis and MongoDB insertion performance analysis

Recently we had to study a piece of software where reads can be slow, but writes need to be as fast as possible. Starting from this requirement we thought about which one of Redis and MongoDB would better fit the problem. Redis should be the obvious choice, as its simpler data structures should make it lightning fast, and that is actually true, but we found a few interesting things that we would like to share.

The first graph compares MongoDB insertion vs Redis RPUSH.
Up to 2000 entries the two are roughly equivalent, then Redis starts to get faster, usually twice as fast as MongoDB. I expected this, and I have to say that antirez did a good job in designing the Redis paradigm; in some situations it is the perfect match.
Anyway, I would have expected MongoDB to be even slower, considering the features that a MongoDB collection has over a simple list.

The second graph compares Redis RPUSH vs Mongo $push vs Mongo insert, and I find it really interesting.
Up to 5000 entries MongoDB $push is faster even when compared to Redis RPUSH, then it becomes incredibly slow; probably the MongoDB array type has linear insertion time, so it gets slower and slower. MongoDB might gain a bit of performance by exposing a constant-time-insertion list type, but even the linear-time array type (which can guarantee constant-time look-up) has its applications for small sets of data.

I would like to say that, as usual, these benchmarks have no real value and have been performed just out of curiosity.

Here you can find the three benchmark snippets:

# Benchmark 1: Redis RPUSH
import redis, time
MAX_NUMS = 1000

r = redis.Redis(host='localhost', port=6379, db=0)
del r['list']

nums = range(0, MAX_NUMS)
clock_start = time.clock()
time_start = time.time()
for i in nums:
    r.rpush('list', i)
time_end = time.time()
clock_end = time.clock()

print 'TOTAL CLOCK', clock_end-clock_start
print 'TOTAL TIME', time_end-time_start

# Benchmark 2: MongoDB insert into a collection
import pymongo, time
MAX_NUMS = 1000

con = pymongo.Connection()
db = con.test_db
db.testcol.remove({})
db.testlist.remove({})

nums = range(0, MAX_NUMS)
clock_start = time.clock()
time_start = time.time()
for i in nums:
    db.testlist.insert({'v':i})
time_end = time.time()
clock_end = time.clock()

print 'TOTAL CLOCK', clock_end-clock_start
print 'TOTAL TIME', time_end-time_start

# Benchmark 3: MongoDB $push into a single document's array
import pymongo, time
MAX_NUMS = 1000

con = pymongo.Connection()
db = con.test_db
db.testcol.remove({})
db.testlist.remove({})
oid = db.testcol.insert({'name':'list'})

nums = range(0, MAX_NUMS)
clock_start = time.clock()
time_start = time.time()
for i in nums:
    db.testcol.update({'_id':oid}, {'$push':{'values':i}})
time_end = time.time()
clock_end = time.clock()

print 'TOTAL CLOCK', clock_end-clock_start
print 'TOTAL TIME', time_end-time_start

Turbogears authentication over mongodb users database

As we saw that there isn’t a lot of documentation around about how to perform authentication in TurboGears over MongoDB, we decided to create a simple code snippet and publish it here to help people trying to achieve the same thing.

This is mainly a proof of concept and a quick and dirty way to obtain it. You will probably have something like Ming as your model, instead of directly accessing Mongo.

This code also validates the password against the clear-text one; you will probably have hashed passwords in your database, so remember to change the validate_password method as required.
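For instance, a validate_password for salted SHA-256 hashes might look like this (the password_salt and password_hash field names are just an assumption about how your user documents are laid out):

import hashlib

def validate_password(user, password):
    # Assumes each user document stores a per-user salt and the hex digest of salt + password.
    digest = hashlib.sha256((user['password_salt'] + password).encode('utf-8')).hexdigest()
    return digest == user['password_hash']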

To make it work you will have to place the following code inside your config.app_cfg module; it also expects you to have your database exposed as db inside your model:

import my_app
from my_app.model import db

from tg.configuration import AppConfig
from pylons import config as pylons_config

from zope.interface import implements
from repoze.who.interfaces import IAuthenticator, IMetadataProvider
from repoze.who.plugins.friendlyform import FriendlyFormPlugin
from repoze.who.plugins.auth_tkt import AuthTktCookiePlugin
from repoze.who.middleware import PluggableAuthenticationMiddleware

def validate_password(user, password):
    return user['password'] == password

class MongoAuthenticatorPlugin(object):
    implements(IAuthenticator)

    # IAuthenticator
    def authenticate(self, environ, identity):
        if not ('login' in identity and 'password' in identity):
            return None

        login = identity.get('login')
        user = db.users.find_one({'user_name':login})
        if user and validate_password(user, identity.get('password')):
            return identity['login']

class MongoUserMDPlugin(object):
    implements(IMetadataProvider)

    def add_metadata(self, environ, identity):
        user_data = {'user_name':identity['repoze.who.userid']}
        identity['user'] = db.users.find_one(user_data)

class MyAppConfig(AppConfig):
    auth_backend = 'sqlalchemy' #this is a fake, but it's needed to enable
                                #auth middleware at least on TG2.0

    login_url = '/login'
    login_handler = '/login_handler'
    post_login_url = None
    logout_handler = '/logout_handler'
    post_logout_url = None
    login_counter_name = None

    def add_auth_middleware(self, app, skip_authentication):
        cookie_secret = pylons_config.get('auth_cookie_secret', 
                                          'myapp_adsfsdfh3423')
        cookie_name = pylons_config.get('auth_cookie_name', 
                                        'myapp_auth')

        who_args = {}

        form_plugin = FriendlyFormPlugin(self.login_url,
                              self.login_handler,
                              self.post_login_url,
                              self.logout_handler,
                              self.post_logout_url,
                              login_counter_name=self.login_counter_name,
                              rememberer_name='cookie')
        challengers = [('form', form_plugin)]

        auth = MongoAuthenticatorPlugin()
        authenticators = [('mongoauth', auth)]

        cookie = AuthTktCookiePlugin(cookie_secret, cookie_name)

        identifiers = [('cookie', cookie), ('form', form_plugin)]

        provider = MongoUserMDPlugin()
        mdproviders = [('mongoprovider', provider)]

        from repoze.who.classifiers import default_request_classifier
        from repoze.who.classifiers import default_challenge_decider
        log_stream = None

        app = PluggableAuthenticationMiddleware(app,
                                          identifiers,
                                          authenticators,
                                          challengers,
                                          mdproviders,
                                          default_request_classifier,
                                          default_challenge_decider)

        return app

base_config = MyAppConfig()
base_config.renderers = []

base_config.package = my_app

#Set the default renderer
base_config.default_renderer = 'genshi'
base_config.renderers.append('genshi')
base_config.renderers.append('json')

#Disable the SQLAlchemy setup as we rely on MongoDB
base_config.use_sqlalchemy = False
base_config.model = my_app.model

Tweelter, the twitter filter

While speaking with the top-ix people during a meeting, we started to talk about the need for a way to filter out “noise” from Twitter searches.

Probably everyone has found that searching for something on Twitter returns a big list of retweets and duplicated tweets. As those reduce the ability to follow a discussion or an event on Twitter, they are usually more of a problem than a useful result.

At the end of that meeting Tweelter was born.

Tweelter is a Twitter search engine which filters out duplicated entries and retweets, and permits searching results older than one month on the most followed topics. Even more interesting, Tweelter performs those searches in parallel on a distributed MongoDB. While retrieving all the results of the same search using the Twitter API would require more than 10-20 seconds, with Tweelter you will get the same results in 2-3 seconds, and the more a search is performed the faster it gets.

So give Tweelter a try if you need to follow a discussion on Twitter; it might make following the discussion easier.