MongoDB as a Message Queue System

Introduction

A message queue system is a very handy tool for processing messages sequentially. A message can be anything, from a notification to a command.

There are many specialized software projects implementing message queue solutions, such as RabbitMQ, Amazon Simple Queue Service, Firebase Cloud Messaging, etc. Usually, they offer message persistence for statistics generation.

Today I will show you how to build a similar solution based on the MongoDB database.

Applications

Take a look at some possible applications. You can implement each of them with the safe solution (always works) or the alternative one (works with WiredTiger only).

Event sourcing

In event sourcing, you have three options for updating the Read Model after inserting a new event into the event collection.

  • Manually call the Read Model update code

  • Wait until the periodically scheduled Read Model update code runs

  • Use one of the cursor-based solutions described below to shape the Read Model contents

Command Queue

In a command queue application, you also have three options after inserting a new command into the queue.

  • Manually call the Command Handler code

  • Wait until the periodically scheduled Command Handler code runs

  • Use one of the cursor-based solutions described below to run the Command Handler code

But you don't have to create a separate collection for finished commands. After a command has been handled, you can delete it or keep it for statistics generation, depending on your case.
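
For illustration, one tick of a command handler could look like the sketch below. The commands collection, the done flag and the handleCommand function are assumptions made up for this example, not part of any standard API.

// Hypothetical sketch: process one pending command, then clean up
var cmd = db.commands.findOne({ done: false })
if (cmd !== null) {
	handleCommand(cmd)  // your Command Handler code (assumed to exist)
	// Option A: delete the finished command
	db.commands.remove({ _id: cmd._id })
	// Option B: keep it for statistics generation instead
	// db.commands.update({ _id: cmd._id }, { $set: { done: true } })
}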

Safe Solution

For all MongoDB versions and all use cases, you can use tailable cursors. To use them, you must fulfill the following requirement.

Store your messages in capped collections

A capped collection is a fixed-size collection that automatically overwrites its oldest entries when it reaches its maximum size.

One important caveat: you can't change the size of a document when updating it. You can update field values, but you can't add a new field to an existing document or remove one. That's the main limitation of capped collections.

Collection Setup

Here is a sample of creating a capped collection in the Mongo Shell:

db.createCollection( "notifications", { capped: true, size: 100000 } )

Note that the size is specified in bytes and will be pre-allocated by the database for the collection, rounded up by MongoDB to a multiple of 256 bytes. The minimum size of a capped collection is 4096 bytes. You can read more about these collections in the MongoDB docs.
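
You can verify what was actually allocated right after creating the collection. There is also an optional max option that additionally caps the number of documents:

db.notifications.isCapped()        // true
db.notifications.stats().maxSize   // allocated size in bytes, after rounding
// optionally cap the number of documents as well:
// db.createCollection( "notifications", { capped: true, size: 100000, max: 5000 } )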

Let's also add an index on the field we will query by. (Strictly speaking, tailable cursors return documents in natural order and don't use indexes, but the index is handy for regular queries against the collection.)

db.notifications.createIndex({ createdAt: 1 })

And we need at least one document in the collection.

db.notifications.insert({ createdAt: new Date(), message: 'First message', sent: false, received: false })

Tailable Cursor Usage

We can use the tailable cursor now. For example, let's watch for a second document being inserted into our notifications capped collection.

Let’s create our tailable cursor now!

var c = db.notifications.find({ createdAt: { $gt: new Date()} } ).tailable()
c.hasNext()

Our cursor doesn't have a next element yet, so the hasNext() method returns false.

The magic starts when you insert the second document into the collection.

db.notifications.insert({ createdAt: new Date(), message: 'Second message', sent: false, received: false })
c.hasNext()

Yay! Our cursor has a next item now. Let's get the document:

c.next()

We will get a document like the one below.

{
	"_id" : ObjectId("5b75f3eee01183a90c80651d"),
	"createdAt" : ISODate("2018-08-16T22:00:14.338Z"),
	"message" : "Second message",
	"sent" : false,
	"received" : false
}
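
Putting it all together, a minimal worker loop could look like the sketch below. The awaitData option asks the server to block briefly instead of returning immediately when there is no new data, and the sent flag comes from our example documents.

// Minimal worker-loop sketch: tail the collection and process new messages
var cursor = db.notifications.find({ createdAt: { $gt: new Date() } }).tailable({ awaitData: true })
while (true) {
	if (cursor.hasNext()) {
		var doc = cursor.next()
		print('Sending: ' + doc.message)
		// a boolean-for-boolean update keeps the document size unchanged
		db.notifications.update({ _id: doc._id }, { $set: { sent: true } })
	} else {
		sleep(100)  // back off briefly when the queue is idle
	}
}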

Please remember that you must set all the fields when inserting a document into the collection. You should also be careful when updating a document: you cannot change its size. Otherwise, the queue will not work.
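
Here is that limitation in practice (the exact error text may vary between MongoDB versions):

// OK: replacing a boolean with a boolean keeps the document size unchanged
db.notifications.update({ message: 'First message' }, { $set: { received: true } })
// Fails: adding a new field grows the document, which a capped collection rejects
db.notifications.update({ message: 'First message' }, { $set: { extra: 'x' } })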

An Alternative, More Modern and Powerful Solution

Sometimes you can't create the brand new capped collection described in the first section. I have a nice solution for you: Change Streams. But here you must satisfy a few requirements:

  • use MongoDB 3.6 or higher
  • use the WiredTiger storage engine
  • use a replica set or sharded cluster
  • use replica set protocol version 1

You can read more on the MongoDB docs pages.

Change Streams vs oplog

Change streams work differently from tailable cursors. A tailable cursor returns the document that was just inserted. A change stream, however, returns a more complex document, somewhat similar to the oplog document structure but more user-friendly.

The oplog is a special replica set collection, embedded as part of the core replication functionality. Its disadvantage is that it only offers global monitoring of all changes in the whole deployment.
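
For comparison, you can peek at the raw oplog yourself; it lives in the local database of every replica set member. The sketch below assumes our database is named blog, as in the examples later in this post.

use local
// the most recent insert into our collection as recorded in the oplog
// ("i" means insert, ns is the namespace); the format is quite low-level
db.oplog.rs.find({ op: 'i', ns: 'blog.notifications' }).sort({ $natural: -1 }).limit(1)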

Change streams can be opened against a database, a collection or a whole deployment (replica set or sharded cluster). You can filter change events in a very flexible way. I will show you the basics for a collection.
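
For orientation, here is a quick sketch of the three scopes. Collection-level watch() works on MongoDB 3.6+, while db.watch() and the deployment-level Mongo.watch() were added in MongoDB 4.0; the connection string is just a placeholder.

db.queue.watch()   // a single collection (MongoDB 3.6+)
db.watch()         // every collection in the current database (MongoDB 4.0+)
// the whole deployment, across all databases (MongoDB 4.0+):
var conn = new Mongo('localhost:27017')
conn.watch()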

Watch!

To create a change stream cursor, use the db.collection.watch() method. It takes the following arguments:

  1. Aggregation pipeline array. Available stages: $match, $project, $addFields, $replaceRoot and $redact.

  2. Options document (optional). Check out the docs for details.

The second parameter is too boring to write more about 😉 Let's focus on the first argument. For practice, let's create the first document in a brand new collection.

db.queue.insert({createdAt: new Date(), message: 'First message'})

And now let's create the cursor…

var c = db.queue.watch([{$match: {}}])
c.hasNext()

Of course, the cursor doesn't have a next document yet. Let's create the second document.

db.queue.insert({createdAt: new Date(), message: 'Second message'})

And check out the cursor's next document:

c.hasNext()
c.next()

You should see a change stream document, something like this:

{
	"_id" : {
		"_data" : BinData(0,"gluGiKAAAAABRmRfaWQAZFuGiKAagDVcbeV7WwBaEATiFVe3Zy5Nj6tLUo6lELiZBA==")
	},
	"operationType" : "insert",
	"fullDocument" : {
		"_id" : ObjectId("5b8688a01a80355c6de57b5b"),
		"createdAt" : ISODate("2018-08-29T11:50:56.415Z"),
		"message" : "Second message"
	},
	"ns" : {
		"db" : "blog",
		"coll" : "queue"
	},
	"documentKey" : {
		"_id" : ObjectId("5b8688a01a80355c6de57b5b")
	}
}

Now you can see the power of change streams! The document contains some very interesting fields:

  • operationType – one of the values: insert, update, replace, delete or invalidate

  • fullDocument – the inserted document

  • ns – the working namespace: database and collection

  • documentKey – the _id of the modified document

For more details see the reference.
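
One more handy property: the _id field doubles as a resume token. Here is a hypothetical sketch of resuming a stream after a restart, assuming you persisted the token of the last processed change:

var lastChange = c.next()
var resumeToken = lastChange._id  // persist this token somewhere durable
// ...after a restart, continue exactly where you left off:
var resumed = db.queue.watch([], { resumeAfter: resumeToken })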

Match and Replace Root

So let's produce a simple output containing just the inserted document. You need to change your cursor.

c = db.queue.watch([
	{$match: {operationType: 'insert'}},
	{$replaceRoot: {newRoot: '$fullDocument'}}])

Insert the third document into the collection.

db.queue.insert({createdAt: new Date(), message: 'Third message'})

And check out the cursor.

c.hasNext()
c.next()

It will produce a simple output.

{
	"_id" : ObjectId("5b86a9331a80355c6de57b5d"),
	"createdAt" : ISODate("2018-08-29T14:09:55.044Z"),
	"message" : "Third message"
}

Note that the cursor won't return updated (or deleted, etc.) documents, because the $match pipeline stage passes insert operations only.

Where are the missing flags?

Nah… As you can see, with change streams you don't have to declare all the data when inserting a document. You can modify an existing document at any moment. That's very flexible!
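
If you also want to observe those modifications, a sketch like the one below widens the $match filter and uses the fullDocument: 'updateLookup' option, which attaches the current full document to update events:

var u = db.queue.watch(
	[{ $match: { operationType: { $in: ['insert', 'update'] } } }],
	{ fullDocument: 'updateLookup' })
db.queue.update({ message: 'Third message' }, { $set: { received: true } })
u.next()  // the change document now has fullDocument populated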

Summary

When I hit on the idea for this article, I intended to promote capped collections and tailable cursors. But then I started reading and practicing with the watch() method, so…

Forget about capped collections – change streams rock!
