agenda/find-and-lock-next-job.js

'use strict';
const debug = require('debug')('agenda:internal:_findAndLockNextJob');
const {createJob} = require('../utils');

/**
 * Find and lock jobs
 * @name Agenda#findAndLockNextJob
 * @function
 * @param {String} jobName name of job to try to lock
 * @param {Object} definition definition used to tell how job is run
 * @access protected
 * @caller jobQueueFilling() only
 * @returns Promise
 */
module.exports = async function(jobName, definition) {
  const self = this;
  const now = new Date();
  const lockDeadline = new Date(Date.now().valueOf() - definition.lockLifetime);
  debug('_findAndLockNextJob(%s, [Function])', jobName);

  // Don't try and access MongoDB if we've lost connection to it.
  // Trying to resolve crash on Dev PC when it resumes from sleep. NOTE: Does this still happen?
  const s = this._mdb.s || this._mdb.db.s;
  if (s.topology.connections && s.topology.connections().length === 0 && !this._mongoUseUnifiedTopology) {
    if (s.topology.autoReconnect && !s.topology.isDestroyed()) {
      // Continue processing but notify that Agenda has lost the connection
      debug('Missing MongoDB connection, not attempting to find and lock a job');
      self.emit('error', new Error('Lost MongoDB connection'));
    } else {
      // No longer recoverable
      debug('topology.autoReconnect: %s, topology.isDestroyed(): %s', s.topology.autoReconnect, s.topology.isDestroyed());
      throw new Error('MongoDB connection is not recoverable, application restart required');
    }
  } else {
    // /**
    // * Query used to find job to run
    // * @type {{$and: [*]}}
    // */
    const JOB_PROCESS_WHERE_QUERY = {
      $and: [{
        name: jobName,
        disabled: {$ne: true}
      }, {
        $or: [{
          lockedAt: {$eq: null},
          nextRunAt: {$lte: this._nextScanAt}
        }, {
          lockedAt: {$lte: lockDeadline}
        }]
      }]
    };

    /**
     * Query used to set a job as locked
     * @type {{$set: {lockedAt: Date}}}
     */
    const JOB_PROCESS_SET_QUERY = {$set: {lockedAt: now}};

    /**
     * Query used to affect what gets returned
     * @type {{returnOriginal: boolean, sort: object}}
     */
    const JOB_RETURN_QUERY = {returnOriginal: false, sort: this._sort};

    // Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
    const result = await this._collection.findOneAndUpdate(JOB_PROCESS_WHERE_QUERY, JOB_PROCESS_SET_QUERY, JOB_RETURN_QUERY);

    let job;
    if (result.value) {
      debug('found a job available to lock, creating a new job on Agenda with id [%s]', result.value._id);
      job = createJob(self, result.value);
    }

    return job;
  }
};