
How to Set Up a Job Queue in Node.js Using Agenda and MongoDB

What You Will Learn in This Tutorial

How to set up a job queue using Agenda and MongoDB and how to send jobs to that queue via an API endpoint managed by Express.

Getting Started

For this tutorial, we're going to use the CheatCode Node.js Boilerplate as a starting point for our work. First, let's clone a copy:

Terminal

git clone https://github.com/cheatcode/nodejs-server-boilerplate

Next, install the boilerplate's dependencies:

Terminal

cd nodejs-server-boilerplate && npm install

After that, install the dependency we'll use to create our job server, agenda:

Terminal

npm i agenda

With all of the dependencies installed, start the development server:

Terminal

npm run dev

With that, we're ready to get started.

Adding an API endpoint for defining jobs

To start, we need to add an API endpoint via the existing Express.js server in the boilerplate. This will allow us to send jobs to our server remotely.

/api/jobs/index.js

export default (app) => {
  // We'll define our API endpoint via Express here.
};

For the sake of organization, first, we're going to create a separate file for all of our jobs-related routes (technically we'll only have one, but this will keep our code flexible in case you want to expand it later).

Following the existing pattern in the boilerplate, here, we're defining a function and making it the default export from our file. This function expects an existing Express app instance to be passed in as its only argument. Before we implement our route, let's see how that existing app is set up and how this function needs to be called.

/index.js

import express from "express";
import startup from "./lib/startup";
import api from "./api/index";
import jobs from "./api/jobs";
import middleware from "./middleware/index";
import logger from "./lib/logger";

startup()
  .then(() => {
    const app = express();
    const port = process.env.PORT || 5001;

    middleware(app);
    api(app);
    jobs(app);

    app.listen(port, () => { ... });

    process.on("message", (message) => { ... });
  })
  .catch((error) => { ... });

Here, we've opened up the main index.js file for the boilerplate. This is where we set up our Express server and start our app. The part we want to pay attention to is right in the middle, where we call the function we just exported from the other file (imported into this file as jobs). Just above the call to this function, we create our Express app instance.

When we call our function, we pass in that app instance so that we can use it to add the route where we'll accept new jobs for our queue.

/api/jobs/index.js

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

Back in the file where we defined our function, now, using the app instance we've passed in, we're adding a route to our Express server at /jobs. This will serve as the API endpoint where we'll receive requests for scheduling new jobs (when running locally, this will be accessible at http://localhost:5001/jobs).

Inside of the callback for our route, we've added a response for requests to the route via the res.send() function. Now, let's make that response true by setting up the queue that we'll add our jobs to.
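Because the jobs module only receives app as an argument, it doesn't need to know where that instance comes from. As a minimal sketch of what this buys us, the following uses a hypothetical createFakeApp() stand-in (not part of Express or the boilerplate) that only records app.use() calls, so we can check what calling jobs(app) registers without starting a server.

```javascript
// Hypothetical stand-in for an Express app: it only records app.use() calls.
function createFakeApp() {
  const registered = [];
  return {
    registered,
    use(path, handler) {
      registered.push({ path, handler });
    },
  };
}

// Same shape as our jobs module: a function that receives the app instance.
const jobs = (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

const app = createFakeApp();
jobs(app);

// Call the registered handler with a minimal fake response object.
let sent;
app.registered[0].handler({}, { send: (body) => { sent = body; } });

console.log(app.registered.map((route) => route.path)); // [ '/jobs' ]
console.log(sent); // Job added to queue!
```

In the real server, index.js passes the actual Express instance instead, and the module's code stays the same.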

Setting up a job queue with Agenda

For the sake of simplicity, in the same file, let's import the agenda dependency that we installed at the start of the tutorial and create the queue for our jobs:

/api/jobs/index.js

import Agenda from "agenda";
import settings from "../../lib/settings";

const jobQueue = new Agenda({
  db: {
    address: settings.databases.mongodb.uri,
    collection: "jobs",
  },
});

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

Up at the top, we import Agenda from the agenda package we installed earlier (we use a capital A for the imported value because we expect it to be a JavaScript class constructor, and capitalizing constructors is a common convention in the language).

We also import the settings for the boilerplate. This is a feature that's built into the boilerplate and allows us to store configuration data for our app. Inside of that /lib/settings file, we have code that will attempt to load a settings file matching the name of the current environment. Right now, we're running in the development environment, so it attempts to load a settings-development.json file (from the root of the app). If we were in a production environment, it would try to load settings-production.json from the root of the app.
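The boilerplate's /lib/settings code isn't shown in this tutorial, so the following is only a hypothetical sketch of the lookup described above: build a file name from the environment name and read it from the app root using Node's built-in fs and path modules. The function names settingsFileFor and loadSettings, and the fallback to "development" when NODE_ENV is unset, are assumptions for illustration.

```javascript
const fs = require("fs");
const path = require("path");

// Hypothetical: map an environment name to its settings file name.
function settingsFileFor(env = "development") {
  return `settings-${env}.json`;
}

// Hypothetical: read and parse the matching settings file from the app root.
function loadSettings(rootDir, env = process.env.NODE_ENV || "development") {
  const filePath = path.join(rootDir, settingsFileFor(env));
  return JSON.parse(fs.readFileSync(filePath, "utf8"));
}

console.log(settingsFileFor("production")); // settings-production.json
```

With this approach, switching environments only means changing NODE_ENV; the code that reads settings.databases.mongodb.uri stays the same.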

In development, a settings-development.json file is already provided for us. It also includes the URL where our MongoDB database is running, which we'll need for our next step. Real quick, if we open up that file, we can see the structure:

/settings-development.json

{
  ...
  "databases": {
    "mongodb": {
      "uri": "mongodb://127.0.0.1/app"
    }
  },
  ...
}

In development, we just point to the copy of MongoDB that the boilerplate starts for us automatically on localhost (here denoted as 127.0.0.1, the loopback IP address that localhost refers to).
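To make the parts of that connection string concrete, here's a small sketch using Node's built-in URL class, which can parse a simple single-host URI like this one (multi-host strings, such as those used for replica sets, generally can't be parsed this way). The host is 127.0.0.1 and the /app path segment names the database. No port appears in the string, so MongoDB's default port, 27017, applies.

```javascript
// The development URI from settings-development.json.
const uri = "mongodb://127.0.0.1/app";

// The WHATWG URL parser also accepts non-web schemes like mongodb://.
const parsed = new URL(uri);

console.log(parsed.protocol); // mongodb:
console.log(parsed.hostname); // 127.0.0.1
console.log(parsed.port === ""); // true: no port given, so MongoDB's default (27017) applies
console.log(parsed.pathname.slice(1)); // app (the database name)
```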

Looking back at our code, with Agenda imported, we create a new instance of it with new Agenda(), passing in the settings for our queue. In order for Agenda to work, we need to tell it which MongoDB database to store our jobs in and, optionally, the name of the collection in that database where it will store our jobs (each job is stored as an object with information about when it's supposed to run, any data associated with it, etc.).

Here, we pass the settings.databases.mongodb.uri value we just saw in /settings-development.json as the db.address value and set the collection name to jobs (you can change this to whatever you'd like). It's important to note that we store the result of calling new Agenda() in a variable, jobQueue. This now contains the queue instance which we'll use to add and manage jobs.

Finally, just beneath our definition for const jobQueue we make sure to call jobQueue.start(). This ensures that Agenda actually processes the jobs we hand to it. Here, we just start it as soon as the file we're working in is loaded onto the server (i.e., on server startup). In your own app, you may want to start this on a more conditional basis.
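One way to start the queue on a more conditional basis is to gate start() behind an environment variable and call Agenda's stop() on shutdown, which the Agenda README describes as stopping processing and unlocking the jobs this process currently holds. The sketch below assumes a hypothetical RUN_JOB_QUEUE variable and takes the queue as a parameter, so it could wrap the jobQueue instance above (or any object with start() and stop() methods).

```javascript
// Hypothetical helper: start the queue unless RUN_JOB_QUEUE is "false".
// Resolves to true if the queue was started, false otherwise.
async function startQueueIfEnabled(queue, env = process.env) {
  if (env.RUN_JOB_QUEUE === "false") {
    return false;
  }

  await queue.start();

  // On SIGTERM, stop processing (releasing locked jobs) before exiting.
  process.once("SIGTERM", async () => {
    await queue.stop();
    process.exit(0);
  });

  return true;
}
```

In /api/jobs/index.js, this would replace the bare jobQueue.start() call with startQueueIfEnabled(jobQueue), letting you run, for example, a web-only process with RUN_JOB_QUEUE=false.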

Next, we need to set up the handler functions for our jobs. We're going to define two functions: one to demonstrate running jobs immediately after they've been added to the queue and another to demonstrate running jobs after a delay.

/api/jobs/index.js

import Agenda from "agenda";
import dayjs from "dayjs";
import settings from "../../lib/settings";

const jobQueue = new Agenda({
  db: {
    address: settings.databases.mongodb.uri,
    collection: "jobs",
  },
});

jobQueue.define("instantJob", async (job) => {
  const data = job?.attrs?.data;
  console.log(
    "This job is running as soon as it was received. This is the data that was sent:"
  );
  console.log(data);
});

jobQueue.define("delayedJob", async (job) => {
  const data = job?.attrs?.data;
  console.log(
    "This job is running after a 5 second delay. This is the data that was sent:"
  );
  console.log(data);
});

jobQueue.start();

export default (app) => {
  app.use("/jobs", (req, res) => {
    res.send("Job added to queue!");
  });
};

In between our jobQueue definition and the call to jobQueue.start(), we've added two calls to jobQueue.define(). This is the function we use to tell Agenda what to do when a job of a given type is next to run in the queue. Here, we define two types of jobs, instantJob and delayedJob (we pass these names as the first argument to jobQueue.define()).

Inside of the callback function for each job type, we pull out the data we expect to be passed to the job from job.attrs.data. Here, job is passed to us by Agenda and contains an object describing the current job we're trying to run; the custom data we pass in is stored on this object in its attrs.data value. For our example this will just be dummy data, but in your own app it will provide additional context when running your job (a userId, some data to store, etc.).

With that data, next, in both jobs we log out a message to tell us what type of job we're running, followed by a log of the data we passed along. In your own app, this is where you would run the code for your job.
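To see the shape Agenda hands to a handler, here is a sketch that exercises a handler with a hand-built object mimicking a job. Only attrs.name and attrs.data are filled in (a real Agenda job carries more), and the userId field is a hypothetical requirement used for illustration. When a handler throws or rejects, Agenda treats that run as failed rather than completed.

```javascript
// A handler in the same shape as those passed to jobQueue.define().
// Hypothetical: this job expects a userId in its data.
async function instantJobHandler(job) {
  const data = job?.attrs?.data;

  if (!data?.userId) {
    // Throwing (or rejecting) marks this run as failed.
    throw new Error("instantJob requires a userId in its data.");
  }

  // Replace this with your job's real work. The string is returned
  // only so the example below can print it.
  return `Processed ${job.attrs.name} for user ${data.userId}`;
}

// A hand-built object mimicking the parts of an Agenda job we read.
const fakeJob = { attrs: { name: "instantJob", data: { userId: "user_123" } } };

instantJobHandler(fakeJob).then((result) => console.log(result));
// Processed instantJob for user user_123
```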

Right now, this may seem confusing: we've defined two types of jobs that are nearly identical. Next, we'll learn how to take in jobs via our API endpoint and how we'll differentiate between the two types we've defined above.

Scheduling jobs via the API endpoint

To make comprehension easier, we're going to add all of the remaining code now and step through it.

/api/jobs/index.js

import Agenda from "agenda";
import dayjs from "dayjs";
import settings from "../../lib/settings";

const jobQueue = new Agenda({ ... });

jobQueue.define("instantJob", async (job) => { ... });

jobQueue.define("delayedJob", async (job) => { ... });

jobQueue.start();

export default (app) => {
  app.use("/jobs", async (req, res) => {
    const jobType = req?.query?.jobType;
    const allowedJobs = Object.keys(jobQueue._definitions);

    if (!jobType) {
      return res.send("Must pass a jobType in the query params.");
    }

    if (!allowedJobs.includes(jobType)) {
      return res.send(
        `${jobType} is not supported. Must pass one of ${allowedJobs.join(
          ", or "
        )} as jobType in the query params.`
      );
    }

    try {
      if (jobType === "instantJob") {
        // Save the job so Agenda runs it as soon as possible.
        await jobQueue.now(jobType, req.body);
      }

      if (jobType === "delayedJob") {
        // Save the job with a run time five seconds from now.
        await jobQueue.schedule(
          dayjs().add(5, "seconds").format(),
          jobType,
          req.body
        );
      }
    } catch (error) {
      // Report database errors instead of leaving an unhandled rejection.
      return res.status(500).send(`Failed to queue job: ${error.message}`);
    }

    res.send("Job added to queue!");
  });
};

Focusing on the callback function for our route, the code we've added here is solving three problems: figuring out which job to run (instantJob or delayedJob), validating whether or not that job is one we've defined, and then, if it is, adding that job to the queue.

In order to identify which job to run, we look at the query object of the incoming req object passed to our route callback. Here, query represents the query params included in the URL when calling the route, like ?jobType=instantJob. The idea is that whoever sends us a job uses the jobType query param to tell us which type of job it is.

Just beneath this, we get the list of allowed jobs by using the built-in JavaScript method Object.keys() to get back an array of the jobs we've defined on our jobQueue (these are stored in the _definitions object on our Agenda instance; the leading underscore marks it as an internal property, so it may change between Agenda versions).

First, we make sure that a jobType has been passed in our query params. If it hasn't, we respond to the request with a warning message.

If we do have a jobType, next, we validate that it's one of our allowedJobs. If it's not, we respond with another warning, letting the user know that the passed jobType is invalid and providing a list of the possible job types available (details!). Assuming that the value stored in jobType is in that array, we move on to adding the job to the queue.
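Pulled out of the route, those two checks can be sketched as a plain function, which makes them easy to test without a server or database. validateJobType is a hypothetical helper, and the definitions argument stands in for jobQueue._definitions (only its keys, the job names, matter here).

```javascript
// Hypothetical helper mirroring the two checks in the /jobs route.
// `definitions` stands in for jobQueue._definitions (keys are job names).
function validateJobType(jobType, definitions) {
  const allowedJobs = Object.keys(definitions);

  if (!jobType) {
    return { ok: false, message: "Must pass a jobType in the query params." };
  }

  if (!allowedJobs.includes(jobType)) {
    return {
      ok: false,
      message: `${jobType} is not supported. Must pass one of ${allowedJobs.join(
        ", or "
      )} as jobType in the query params.`,
    };
  }

  return { ok: true };
}

const definitions = { instantJob: {}, delayedJob: {} };

console.log(validateJobType("instantJob", definitions)); // { ok: true }
console.log(validateJobType("weeklyJob", definitions).message);
// weeklyJob is not supported. Must pass one of instantJob, or delayedJob as jobType in the query params.
```

Inside the route, you would call it with req?.query?.jobType and respond with result.message whenever result.ok is false.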

Moving down to queueing our jobs, recall that our goal is to either a) add our job to the queue to run immediately, or b) schedule the job to run in the future. Here, for jobs of the type instantJob, we call the .now() method on jobQueue, passing in the job type we want to run (from our query params) and the data we want to pass in (what we pull off of job.attrs.data inside of the job callback), which in this case is the body of the req object (hint: we're assuming our jobs are sent to our route as an HTTP POST request).

Next, for our delayedJob type, we call jobQueue.schedule(), passing the date we want our job to run at along with our jobType and req.body, just like we did for instantJob. To generate a date, we're using the dayjs library that's included in the boilerplate. The line dayjs().add(5, "seconds").format() says "get the current date and time, add five seconds to it, and then format it as an ISO 8601 string" ("ISO string" for short, a standardized date format; by default, dayjs's format() includes the local UTC offset, like 2021-07-29T23:00:00-04:00).
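If you'd rather not depend on dayjs for this step, the same five-second offset can be computed with the built-in Date. Agenda's schedule() accepts a Date object as well as a string (its README also shows human-readable strings such as "in 20 minutes"), so either form could be passed as the first argument. The helper name secondsFromNow is hypothetical.

```javascript
// Hypothetical helper: a Date `seconds` seconds after `now` (milliseconds since epoch).
function secondsFromNow(seconds, now = Date.now()) {
  return new Date(now + seconds * 1000);
}

// A fixed starting point so the output is predictable (months are zero-based: 6 = July).
const now = Date.UTC(2021, 6, 29, 23, 0, 0);
const runAt = secondsFromNow(5, now);

console.log(runAt.toISOString()); // 2021-07-29T23:00:05.000Z

// In the route, this could replace the dayjs call:
// jobQueue.schedule(secondsFromNow(5), jobType, req.body);
```

Note that toISOString() always reports UTC (the trailing Z) and includes milliseconds, while dayjs's format() uses the local offset; both describe the same instant.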

That's it! If we test out our different jobs, we'll see the logs we added earlier displayed in our server console.
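To send those test requests yourself, something like the following sketch works on Node.js 18 or later, where fetch is built in. The buildJobRequest helper and the sample { userId: "user_123" } body are hypothetical; the URL and jobType query param match the route above. Sending a JSON body with a Content-Type: application/json header assumes the boilerplate's middleware parses JSON bodies into req.body.

```javascript
// Hypothetical helper: build the URL and fetch() options for a job request.
function buildJobRequest(jobType, data, baseUrl = "http://localhost:5001") {
  const url = new URL("/jobs", baseUrl);
  url.searchParams.set("jobType", jobType);

  return {
    url: url.toString(),
    options: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(data),
    },
  };
}

const request = buildJobRequest("delayedJob", { userId: "user_123" });
console.log(request.url); // http://localhost:5001/jobs?jobType=delayedJob

// With the dev server running (inside an async function):
// const response = await fetch(request.url, request.options);
// console.log(await response.text());
```

For a delayedJob request, the "This job is running after a 5 second delay" log should appear in the server console roughly five seconds later (Agenda polls for due jobs on an interval, so the exact timing can vary).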

Wrapping up

In this tutorial, we learned how to implement a job queue using the Agenda library in conjunction with MongoDB. We learned how to set up a route via Express for receiving jobs, how to set up a queue with Agenda, how to define job types on that queue, and finally how to add jobs received via our API route to that queue.
