Building microservices for a Telegram bot using Node.js, RabbitMQ, MongoDB and Docker from scratch

Last week I felt inspired to write a weekend project, but I had no idea what to create. So I asked my best friend for help. Luckily she is more clever than me, so I got a blazing fast response: “Can you create a service which notifies me about changes happening on some web page?” In other words, grab the page content, cache it, grab it again after some interval, compare the two and then notify a user about any changes that may have occurred.

We could write a simple Node.js application and it would work just fine, but it isn’t as interesting as using a service-oriented approach.

Let’s describe what user experience we are going to implement:

  • The user should be able to subscribe to and unsubscribe from the service
  • The user should be able to set up the page to watch and select what content should be monitored for changes. The page URL and a query selector for the target DOM elements handle this setup
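
In practice this boils down to a handful of Telegram commands. The exact argument format is my assumption, but the interaction could look something like this:

/start – subscribe to the service
/help – show usage hints
/url https://example.com/some-page – set the page to watch
/selector #price .value – set the query selector for the target element
/stop – unsubscribe from the service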

Now that we have described the user requirements, let’s discuss a possible architecture. The bot’s workflow is described above. Let’s outline a more detailed schema:
[Diagram: general schema of the services]

This is the general overview. As you may guess, the green octagons are services, orange shapes represent stacks of papers – messaging queues, and yellow cylinders are databases. The blue cloud is a target web page to watch for changes.

General description of the services.

  1. BOT – communicates with the user through Telegram in both directions: it receives commands from the user and notifies them about changes or errors. It saves data to the DB and notifies the other services about changes using messaging
  2. PLANNER – takes all notification parameters from the DB at a specified interval and passes them to the working queue; it also consumes changes from the BOT service and passes the changed data immediately to the working queue. This is the master service.
  3. CRAWLER – consumes parameters from the working queue, fetches the HTML page from the specified URL, and finds the target content by query selector. Passes the result or error to the next service using another queue. This is the worker service.
  4. COMPARATOR – this is the caching service. It consumes the result of CRAWLER and compares it with the previous result, which is stored in the DB. If a difference is detected, the service sends a message to the resulting queue, and the BOT service consumes that message. And, voilà, we have completed one iteration. In addition to consuming messages from the crawler, this service is also subscribed to the notification changes. It clears the cache when the user changes the target URL or selector.
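
To make the data flow more concrete, here is a rough sketch of the messages these services exchange. The exact payload shapes appear later in the article’s code; the concrete values below are only illustrative:

// BOT -> PLANNER & COMPARATOR, via a fanout exchange: plain "type/chatId" strings
const notificationChange = 'urlChanged/123456789'

// PLANNER -> CRAWLER, via the working queue: the watch parameters as JSON
const workerTask = JSON.stringify({ url: 'https://example.com/page', selector: '.price', chatId: '123456789' })

// CRAWLER -> COMPARATOR: the content hash of the parsed result, or an error
const crawlerResult = JSON.stringify({ chatId: '123456789', hash: 'd41d8cd98f00b204e9800998ecf8427e' })
const crawlerError = JSON.stringify({ chatId: '123456789', error: 'FetchError: request failed' })

// COMPARATOR -> BOT, via the response queue: "type/chatId" strings again
const botNotification = 'changesDetected/123456789'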

Enough talk – let’s implement our project!

Development environment.

We will use Docker and Docker Compose for running all our services. Let’s start with a few words about the project structure. All services share a common package.json and Dockerfile. Some services won’t use all of the dependencies, but it’s handy to have one Docker image for all our services with pre-installed modules. This dramatically reduces build time, for example in CI/CD.

sneakyBot
-- package.json
-- Dockerfile
-- docker-compose.yml
-- bot
---- src
-- planner
---- src
-- crawler
---- src
-- comparator
---- src
-- common

The ‘common’ folder contains all reusable code: utils, mongoose models, queue helpers and so on.
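
Since every service shares the same package.json, it could look roughly like the sketch below. The scripts match the `start` and `start:dev` commands used later in the Dockerfile and docker-compose config; the dependency list and the versions are my assumption, not the project’s actual manifest:

{
  "name": "sneaky-bot",
  "version": "1.0.0",
  "scripts": {
    "start": "node ./service/src/index.js",
    "start:dev": "nodemon ./service/src/index.js"
  },
  "dependencies": {
    "amqplib": "^0.5.2",
    "mongoose": "^5.3.0",
    "telegraf": "^3.25.0"
  },
  "devDependencies": {
    "nodemon": "^1.18.0"
  }
}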

As you may notice, we have four services plus the RabbitMQ and MongoDB servers – six containers in total. The DB and message queue containers use standard images. For the other services we will build images using the following Dockerfile:

FROM node:latest
 
ARG service_src
 
WORKDIR /usr/src/app
COPY package*.json ./
RUN npm install
COPY ${service_src} ./service/src
COPY ./common ./common
CMD [ "npm", "run", "start" ]

There isn’t any magic here. Just a simple Dockerfile based on the official Node.js image. It copies package.json and package-lock.json to the working directory, installs dependencies, then takes the destination folder path from the build argument, and copies the service and ‘common’ sources to the working directory.

The Docker Compose config is simple too. Here is the description of one service, with mongo and rabbit below (the others are similar):

version: "3"
 
services:
  bot:
    build:
      context: .
      args:
        - service_src=./bot/src
    command: bash -c "npm run start:dev"
    restart: on-failure
    env_file:
      - ./bot/environment
    volumes:
      - ./bot/src:/usr/src/app/service/src:cached
      - ./common:/usr/src/app/common:cached
 
  mongo:
    image: "mongo:latest"
    ports:
      - 27017:27017
  rabbitmq:
    image: "rabbitmq:management"
    ports:
      - "5672:5672"
      - "15672:15672"

A quick overview of the above docker-compose.yml. In the ‘bot’ section we specify the following properties:
`build` – context and args; `service_src` is the path to the service sources folder
`command: bash -c "npm run start:dev"` – redefines the CMD instruction from the Dockerfile. In our case, it will run the index.js script using nodemon, which will watch for file changes and restart the node instance
`restart: on-failure` – restarts a crashed container
`env_file:` – specifies the path to the file with environment variables. We will use these vars to set up our services (a sketch of the resulting config object follows below)
`volumes` – specifies the local folders mounted into the container. Nodemon inside the container will observe all local changes
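
For reference, the `config` object used in all the code snippets below is just a thin wrapper around these environment variables, living in the ‘common’ folder. Here is a minimal sketch, assuming env var names of my own choosing and the compose service names as default hosts; the queue names and defaults are illustrative:

// common/config.js – a minimal sketch, not the project's actual file
module.exports = {
  botToken: process.env.BOT_TOKEN,
  mongoUrl: process.env.MONGO_URL || 'mongodb://mongo:27017',
  dbName: process.env.DB_NAME || 'sneakyBot',
  rabbitMqUrl: process.env.RABBITMQ_URL || 'amqp://rabbitmq',
  updateRate: Number(process.env.UPDATE_RATE) || 60000, // how often PLANNER pushes tasks, ms
  notificationsChangesQueue: 'notifications_changes',
  workerQueue: 'worker_queue',
  comparatorQueue: 'comparator_queue',
  responseQueue: 'response_queue'
}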

This approach to the local environment provides many benefits. We can quickly start all services by running `docker-compose up`; it will build the images if needed and start all the containers.

That’s it for infrastructure. At this point, we have efficient and straightforward tools which speed up our development process.

Dive into the code:

Telegram bot. Telegraf.

Telegraf (https://github.com/telegraf/telegraf) is a powerful and easy-to-use framework. We need a Telegram API token to configure it, so contact the BotFather (https://telegram.me/BotFather) to register your bot and get one.

The code to work with a bot is straightforward:

const bot = new Telegraf(config.botToken)
 
bot.start(start)
bot.help(help)
bot.command('url', url)
bot.command('selector', selector)
bot.command('stop', stop)
bot.startPolling()

We created a bot instance, added handlers for user commands and then started polling. Let’s take a closer look at one of the handlers:

const start = async ctx => {
  try {
    const chatId = ctx.from.id
    const firstName = ctx.from.first_name
    const lastName = ctx.from.last_name
 
    if ( await isNotificationExists(chatId, ctx) ) {
      return
    }
    await Notification.create({
      chatId,
      firstName,
      lastName,
      url: '',
      selector: ''
    })
    const message = `${qMessages.notificationCreated}/${chatId}` 
    await queue.publish(config.notificationsChangesQueue, 'fanout', message)
    ctx.reply(messages.start)
  } catch (error) {
    console.error(error)
    ctx.reply('ERROR')
  }
}

`ctx` is the object which Telegraf passes to the bot handlers. We can get some useful data from it: chatId, firstName, lastName. That’s enough to register and save a new Notification in the DB. Here we call `Notification.create` to do that using mongoose (I’ll explain it a little bit later). After that, we publish a message to the queue and answer the user by calling `ctx.reply(messages.start)`. That’s it. We aren’t going to wait for any responses from the other services. This workflow is similar for all the handlers in `botHandlers.js`.
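
The `isNotificationExists` helper isn’t shown here. Below is a minimal sketch of what it might do, assuming it simply checks the DB and tells the user they are already subscribed; the actual implementation in botHandlers.js may differ:

const isNotificationExists = async (chatId, ctx) => {
  // look for an existing subscription for this chat
  const existing = await Notification.findOne({ chatId })
  if (existing) {
    ctx.reply(messages.alreadySubscribed) // hypothetical message constant
    return true
  }
  return false
}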

Working with MongoDB

Let’s return to the `Notification.create`. First of all, we should establish the connection to the DB:

mongoose.connect(config.mongoUrl, {
  useNewUrlParser: true, 
  dbName: config.dbName
})
mongoose.Promise = global.Promise

And define the Notification schema:

const NotificationSchema = new mongoose.Schema({
  chatId: String,
  firstName: String,
  lastName: String,
  url: String,
  selector: String,
  date: { type: Date, default: Date.now }
})
module.exports = mongoose.model('Notification', NotificationSchema)

This defines the Notification schema and will create the collection in the DB if it doesn’t already exist.

Since we can now create, read, update and delete Notification data in the DB, we should make this code reusable for all services. Let’s place it in the ‘common’ directory.
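
Once the model lives in ‘common’, any service can reuse it. For example, the `url` command handler in the BOT service could update the stored URL roughly like this – a hedged sketch, assuming the URL is passed as a command argument; the real handler in botHandlers.js may differ:

const url = async ctx => {
  try {
    const chatId = ctx.from.id
    // assume the URL is passed as an argument: /url https://example.com
    const newUrl = ctx.message.text.split(' ')[1]
    await Notification.findOneAndUpdate({ chatId }, { $set: { url: newUrl } })
    // tell PLANNER and COMPARATOR that the target URL has changed
    const message = `${qMessages.urlChanged}/${chatId}`
    await queue.publish(config.notificationsChangesQueue, 'fanout', message)
    ctx.reply(messages.urlSaved) // hypothetical message constant
  } catch (error) {
    console.error(error)
    ctx.reply('ERROR')
  }
}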

PubSub.

We will use the `amqplib` module to work with the RabbitMQ server. There are a lot of tutorials and explanations on using Node.js with Rabbit at https://www.rabbitmq.com/getstarted.html. We will implement a few patterns in practice. The first is PubSub.

The bot service publishes messages to the queue by executing:

await queue.publish(config.notificationsChangesQueue, 'fanout', message)

And it calls this method:

const publish = async (exchangeName, exchangeType, message) => {
  const connect = await amqp.connect(config.rabbitMqUrl)
  const channel = await connect.createChannel()
 
  await channel.assertExchange(exchangeName, exchangeType, {durable: false})
  await channel.publish(exchangeName, '', Buffer.from(message))
 
  console.log('Message published: ', exchangeName, message)
}

I’m going to explain the code line-by-line:

We establish a connection to the RabbitMQ server using `amqp.connect` and create a communication channel from this connection using `connect.createChannel`.
If we want to broadcast notification changes to both PLANNER and COMPARATOR, we have to configure this part properly: we create an exchange of type `fanout` using `channel.assertExchange`. This method will create an exchange in Rabbit only if it doesn’t already exist. The diagram below will help us figure out what an exchange and ‘fanout’ actually are:
[Diagram: fanout exchange delivering messages to PLANNER and COMPARATOR]

This is a more specific schema based on the first diagram. In RabbitMQ you don’t push messages to a queue directly; instead, you publish them to exchanges. There are four types of exchanges in Rabbit. We are using `fanout` because we want to implement the PubSub pattern: this exchange delivers our messages to both PLANNER and COMPARATOR. If we used the default `direct` exchange here, each message would be consumed by only one of the services.

That’s it for publishing; let’s continue with subscribing. It is executed in the PLANNER service as follows:

const subscribeEmmitter = await queue.subscribe(config.notificationsChangesQueue, 'fanout')
And it calls this method from common/queue:

const subscribe = async (exchangeName, exchangeType) => {
  const connect = await amqp.connect(config.rabbitMqUrl)
  const channel = await connect.createChannel()
 
  await channel.assertExchange(exchangeName, exchangeType, {durable: false})
  const queue = await channel.assertQueue('', {exclusive: true})
  channel.bindQueue(queue.queue, exchangeName, '')
  const consumeEmitter = new EventEmitter()
 
  try {
    channel.consume(queue.queue, message => {
      if (message !== null) {
        consumeEmitter.emit('data', message.content.toString())
      } else {
        const error = new Error('NullMessageException')
        consumeEmitter.emit('error', error)
      }
    }, {noAck: true})
  } catch (error) {
    consumeEmitter.emit('error', error)
  }
  return consumeEmitter
}

We already know what `amqp.connect`, `connect.createChannel` and `channel.assertExchange` are. `await channel.assertQueue('', {exclusive: true})` creates an exclusive queue with a generated name if it doesn’t exist, and `channel.bindQueue` binds that queue to the exchange. After calling these methods, we get a structure like the one in the diagram above. The next step is to create a Node.js EventEmitter, start consuming messages from the queue and define the callback that emits events. The method returns the emitter instance so that we can listen to it in our services. Please notice the `{noAck: true}` option: it turns off the acknowledgement mechanism, so messages are removed from the queue immediately after being pushed to the consumer. That’s fine in this case, but we’ll use the ack mechanism in other patterns.
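
On the PLANNER side, using the returned emitter could look roughly like this: parse the `type/chatId` string, load that user’s notification and push it straight to the working queue via the `produce` helper described in the next section. This is a sketch under my own assumptions, not the exact planner code:

subscribeEmmitter.on('data', async message => {
  // a change arrived from BOT as "type/chatId"; push that user's task right away
  const chatId = message.split('/')[1]
  const notification = await Notification.findOne({ chatId }).select('url selector chatId -_id')
  if (notification && utils.isUrl(notification.url) && notification.selector) {
    await queue.produce(config.workerQueue, JSON.stringify(notification), true, true)
  }
})
subscribeEmmitter.on('error', error => console.error(error))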

Working queue.

The next significant pattern is the working queue. It’s implemented in a pair of services: PLANNER as the master and CRAWLER as a worker. The planner grabs all notifications from Mongo at a given interval and pushes them into the queue. It can also push tasks to the queue when it consumes notification changes, as sketched above.

Why should we use this pattern here? Because of the CRAWLER service. It’s the bottleneck in our application, as it does most of the heavy lifting: it fetches pages, parses them and calculates an MD5 hash of the result (we don’t want to store the raw content in the DB; a hash is enough). If the number of users grows, we should be able to scale the CRAWLER by adding more running containers, for example with `docker-compose up --scale crawler=3`. To do this, we have to implement a mechanism like the one described by the diagram below.
[Diagram: working queue mechanism with multiple crawler instances]

Each crawler instance should receive one unique message, process it and then send the ack signal to the queue to remove the message from it. This is important because errors are quite likely to occur in the CRAWLER service. We also want to avoid race conditions that would result in one task being executed by two or more instances in parallel.

Let’s look at a code sample for getting notifications from mongo, validating URLs and pushing data to the queue:

const pushToQueue = async () => {
  const query = {
    url: { "$exists": true, $ne: '' },
    selector: { "$exists": true, $ne: '' },
  }
  const notifications = await Notification.find(query).select('url selector chatId -_id')
  for (let notification of notifications) {
    if ( utils.isUrl(notification.url) ) {
      await queue.produce(config.workerQueue, JSON.stringify(notification), true, true)
    }
  }
}
setInterval(pushToQueue, config.updateRate)

And the “produce” method:

const produce = async (queue, message, durable = false, persistent = false) => {
  const connect = await amqp.connect(config.rabbitMqUrl)
  const channel = await connect.createChannel()
 
  await channel.assertQueue(queue, { durable })
  await channel.sendToQueue(queue, Buffer.from(message), { persistent })
 
  console.log('Message produced: ', queue, message)
}

It’s quite similar to the “publish” method, but here we use the default `direct` exchange, so we don’t define one explicitly. Also, we set the `durable` and `persistent` parameters to `true` – that’s essential for this pattern.

In the crawler we set up the “consume” method:

const consumeEmmitter = await queue.consume(config.workerQueue, false, true, 1)

As you can see from the call, we use acknowledgements, set `durable` to true and `prefetch` to 1. These settings are essential for a working queue:

const consume = async (queue, isNoAck = false, durable = false, prefetch = null) => {
  const connect = await amqp.connect(config.rabbitMqUrl)
  const channel = await connect.createChannel()
  await channel.assertQueue(queue, { durable })
 
  if (prefetch) {
    channel.prefetch(prefetch)
  }
  const consumeEmitter = new EventEmitter()
  try {
    channel.consume(queue, message => {
      if (message !== null) {
        consumeEmitter.emit('data', message.content.toString(), () => channel.ack(message))
      } else {
        const error = new Error('NullMessageException')
        consumeEmitter.emit('error', error)
      }
    }, {noAck: isNoAck})
  } catch (error) {
    consumeEmitter.emit('error', error)
  }
  return consumeEmitter
}

As you can see, the implementation of the “consume” method is similar to the “subscribe” method, but here we don’t need to set up an exchange. Also, we set the prefetch count with `channel.prefetch`. With this setting, only one message is pushed to each consumer until the ack signal is sent back to the queue. Take a look at this line: `consumeEmitter.emit('data', message.content.toString(), () => channel.ack(message))`. We emit an ack callback along with the received message; it will be called after the worker finishes the job.

Collecting the data.

Let’s find out how the crawler consumes and processes messages.
`consumeEmmitter` emits a data event, the message is parsed, and the destination URL is fetched. The page is then filtered by the query selector, and a hash is calculated from the result. The result or error is dispatched to the next service, and the ack() callback is called to remove the message from the queue.

    consumeEmmitter.on('data', async (message, ack) => {
      const { url, selector, chatId} = JSON.parse(message)
      const page = await fetchPage(url).catch(async (error) => {
        const errorResponse = {
          chatId,
          error: `${error.name}: ${error.message}`
        }
        await queue.produce(config.comparatorQueue, JSON.stringify(errorResponse))
        ack()
        throw error
      })
      const parsedString = parseHTML(page, selector)
      const hash = createHash(parsedString)
      const response = {chatId, hash}
      await queue.produce(config.comparatorQueue, JSON.stringify(response))
      ack()
    })
    consumeEmmitter.on('error', error => console.error(error))

You can find the implementations of `fetchPage` and `parseHTML` in /crawler/request.js and /crawler/parser.js.
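
The `createHash` helper is essentially a thin wrapper around Node’s built-in crypto module. Here is a minimal sketch, assuming MD5 as mentioned earlier; the actual implementation may differ:

const crypto = require('crypto')

// create an MD5 hex digest of the parsed content
const createHash = string => crypto.createHash('md5').update(string).digest('hex')

module.exports = createHash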

Detecting changes.

To detect changes we should create a Cache schema like so:

const Cache = new mongoose.Schema({
  chatId:  String,
  hash: String,
})
module.exports = mongoose.model('Cache', Cache)

After that it’s possible to consume messages from CRAWLER:

consumeEmmitter.on('data', async (message, ack) => {
      const { error, hash, chatId } = JSON.parse(message)
 
      if (error) {
        // notify the user about the fetch error and stop here, so the cache isn't touched
        const message = `${qMessages.fetchError}/${chatId}`
        await queue.produce(config.responseQueue, message)
        ack()
        return
      }
 
      const previous = await Cache.findOne({chatId})
 
      if ( previous && hash !== previous.hash ) {
        await Cache.findOneAndUpdate({chatId}, {$set: {hash}})
        const message = `${qMessages.changesDetected}/${chatId}`
        await queue.produce(config.responseQueue, message)
      }
      if (!previous) {
        await Cache.create({chatId, hash})
      }
      ack()
    })
As you can see, we take the cached hash from Mongo and compare it with the received one. If they aren’t equal, we update the cache and send a message to BOT through the response queue. If the cached data doesn’t exist in the DB, we create it. If an error is received from CRAWLER, we just notify the user and skip the comparison.
 
As you will recall, the COMPARATOR also listens to the notification changes:
 
subscribeEmmitter.on('data', async message => {
      const messageArr = message.split('/')
      const type = messageArr[0]
      const chatId = messageArr[1]
 
      if (type === qMessages.urlChanged || type === qMessages.selectorChanged) {
        await Cache.findOneAndDelete({chatId})
      }
    })

As you can see, the cache is cleared for `urlChanged` and `selectorChanged` events. This prevents the comparator from sending messages to the user right after they change the bot settings.
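
The `qMessages` constants used by all the services also live in the ‘common’ folder. The exact values are my assumption, but the module boils down to a set of shared string tags, roughly:

// common/qMessages.js – shared message type tags (values are illustrative)
module.exports = {
  notificationCreated: 'notificationCreated',
  urlChanged: 'urlChanged',
  selectorChanged: 'selectorChanged',
  changesDetected: 'changesDetected',
  fetchError: 'fetchError'
}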

Replying to the user.

Congrats, we have almost made a full circle on our quest! Let’s discuss the mechanism of replying to the user. The BOT service consumes messages from COMPARATOR and sends the result to the user by chatId:

consumeEmmitter.on('data', (message, ack) => {
      const messageArr = message.split('/')
      const type = messageArr[0]
      const chatId = messageArr[1]
 
      if (type === qMessages.changesDetected) {
        bot.telegram.sendMessage(chatId, messages.changeDetected)
      }
      if (type === qMessages.fetchError) {
        bot.telegram.sendMessage(chatId, messages.fetchError)
      }
      ack()
    })

Conclusion

That’s it for now. We have completed a full cycle of developing a Telegram bot using a microservices architecture. The implementation of some parts is naive, but it provides an understanding of asynchronous messaging patterns. I hope you found this interesting and useful. You can find the full project code at https://github.com/malykhin/sneakyBot. Please feel free to play with it, ask questions and provide feedback!