var _ = require('./utility');

/*
 * Queue - an object which handles a queue of items to be sent to Rollbar.
 * This object handles rate limiting via a passed in rate limiter, retries based on connection
 * errors, and filtering of items based on a set of configurable predicates. The communication to
 * the backend is performed via a given API object.
 *
 * @param rateLimiter - An object which conforms to the interface
 *    rateLimiter.shouldSend(item) -> {shouldSend: bool, error: (Error|undefined), payload: (Object|undefined)}
 * @param api - An object which conforms to the interface
 *    api.postItem(payload, function(err, response))
 *    api.configure(options)
 * @param logger - An object used to log verbose messages if desired
 * @param options - see Queue.prototype.configure
 */
function Queue(rateLimiter, api, logger, options) {
  this.rateLimiter = rateLimiter;
  this.api = api;
  this.logger = logger;
  this.options = options;
  this.predicates = [];
  this.pendingItems = [];
  this.pendingRequests = [];
  this.retryQueue = [];
  this.retryHandle = null;
  this.waitCallback = null;
  this.waitIntervalID = null;
}

/*
 * configure - updates the options this queue uses, and forwards the new options to the API
 * object when one is present.
 *
 * @param options - options to merge over the current ones
 * @returns this queue, to allow chaining
 */
Queue.prototype.configure = function(options) {
  this.api && this.api.configure(options);
  var oldOptions = this.options;
  this.options = _.merge(oldOptions, options);
  return this;
};

/*
 * addPredicate - adds a predicate to the end of the list of predicates for this queue.
 * Values that are not functions are silently ignored.
 *
 * @param predicate - function(item, options) -> (bool|{err: Error})
 *  Returning true means that this predicate passes and the item is okay to go on the queue
 *  Returning false means do not add the item to the queue, but it is not an error
 *  Returning {err: Error} means do not add the item to the queue, and the given error explains why
 *  Returning {err: undefined} is equivalent to returning true but don't do that
 * @returns this queue, to allow chaining
 */
Queue.prototype.addPredicate = function(predicate) {
  if (_.isFunction(predicate)) {
    this.predicates.push(predicate);
  }
  return this;
};

/*
 * addPendingItem - records an item as pending. While any pending item is recorded, a callback
 * registered through `wait` is not invoked.
 *
 * @param item - the item to track
 */
Queue.prototype.addPendingItem = function(item) {
  this.pendingItems.push(item);
};

/*
 * removePendingItem - stops tracking an item previously passed to addPendingItem. Items that
 * are not currently tracked are ignored.
 *
 * @param item - the item to stop tracking
 */
Queue.prototype.removePendingItem = function(item) {
  var idx = this.pendingItems.indexOf(item);
  if (idx !== -1) {
    this.pendingItems.splice(idx, 1);
  }
};

/*
 * addItem - Send an item to the Rollbar API if all of the predicates are satisfied
 *
 * @param item - The payload to send to the backend
 * @param callback - function(error, response) which will be called with the response from the API
 *  in the case of a success, otherwise response will be null and error will have a value. If both
 *  error and response are null then the item was stopped by a predicate which did not consider this
 *  to be an error condition, but nonetheless did not send the item to the API.
 * @param originalError - The original error before any transformations that is to be logged if any
 * @param originalItem - The item that was registered via addPendingItem (if any); it is removed
 *  from the pending items list whether or not the item ends up being sent.
 */
Queue.prototype.addItem = function(item, callback, originalError, originalItem) {
  if (!callback || !_.isFunction(callback)) {
    callback = function() { return; };
  }
  var predicateResult = this._applyPredicates(item);
  if (predicateResult.stop) {
    this.removePendingItem(originalItem);
    callback(predicateResult.err);
    return;
  }
  this._maybeLog(item, originalError);
  this.removePendingItem(originalItem);
  if (!this.options.transmit) {
    callback(new Error('Transmit disabled'));
    return;
  }
  this.pendingRequests.push(item);
  try {
    this._makeApiRequest(item, function(err, resp) {
      this._dequeuePendingRequest(item);
      callback(err, resp);
    }.bind(this));
  } catch (e) {
    // A synchronous failure must still release the pending request, otherwise a `wait`
    // callback would never fire.
    this._dequeuePendingRequest(item);
    callback(e);
  }
};

/*
 * wait - Register a callback to be invoked once there are no pending items and no pending
 * requests. If that is already the case the callback is invoked immediately; otherwise the
 * condition is re-checked every 500ms (and whenever a pending request completes).
 *
 * @param callback - function() called when all pending items have been sent
 */
Queue.prototype.wait = function(callback) {
  if (!_.isFunction(callback)) {
    return;
  }
  this.waitCallback = callback;
  if (this._maybeCallWait()) {
    return;
  }
  if (this.waitIntervalID) {
    // clearInterval returns undefined, so this also resets the stored id.
    this.waitIntervalID = clearInterval(this.waitIntervalID);
  }
  this.waitIntervalID = setInterval(function() {
    this._maybeCallWait();
  }.bind(this), 500);
};

/*
 * _applyPredicates - Sequentially applies the predicates that have been added to the queue to the
 * given item with the currently configured options. Evaluation stops at the first predicate
 * that rejects the item.
 *
 * @param item - An item in the queue
 * @returns {stop: bool, err: (Error|null)} - stop being true means do not add item to the queue,
 *  the error value should be passed up to a callback if we are stopping.
 */
Queue.prototype._applyPredicates = function(item) {
  var p = null;
  for (var i = 0, len = this.predicates.length; i < len; i++) {
    p = this.predicates[i](item, this.options);
    if (!p || p.err !== undefined) {
      // A predicate may return null/undefined (e.g. a missing return statement); treat that
      // like `false` instead of throwing while reading `.err` from it.
      return {stop: true, err: p ? p.err : undefined};
    }
  }
  return {stop: false, err: null};
};

/*
 * _makeApiRequest - Send an item to Rollbar, callback when done, if there is an error make an
 * effort to retry if we are configured to do so.
 *
 * When the rate limiter declines the item, its error (if any) is passed to the callback;
 * otherwise the payload supplied by the rate limiter is posted instead of the item, without
 * retry handling.
 *
 * @param item - an item ready to send to the backend
 * @param callback - function(err, response)
 */
Queue.prototype._makeApiRequest = function(item, callback) {
  var rateLimitResponse = this.rateLimiter.shouldSend(item);
  if (rateLimitResponse.shouldSend) {
    this.api.postItem(item, function(err, resp) {
      if (err) {
        this._maybeRetry(err, item, callback);
      } else {
        callback(err, resp);
      }
    }.bind(this));
  } else if (rateLimitResponse.error) {
    callback(rateLimitResponse.error);
  } else {
    this.api.postItem(rateLimitResponse.payload, callback);
  }
};

// These error codes basically mean there is no internet connection
var RETRIABLE_ERRORS = ['ECONNRESET', 'ENOTFOUND', 'ESOCKETTIMEDOUT', 'ETIMEDOUT', 'ECONNREFUSED', 'EHOSTUNREACH', 'EPIPE', 'EAI_AGAIN'];

/*
 * _maybeRetry - Given the error returned by the API, decide if we should retry or just callback
 * with the error. A retry only happens when options.retryInterval is set and err.code is one
 * of RETRIABLE_ERRORS. When options.maxRetries is a finite number, the attempt count is stored
 * on item.retries and the retry is abandoned once it exceeds that limit.
 *
 * @param err - an error returned by the API transport
 * @param item - the item that was trying to be sent when this error occurred
 * @param callback - function(err, response)
 */
Queue.prototype._maybeRetry = function(err, item, callback) {
  var shouldRetry = false;
  if (this.options.retryInterval) {
    shouldRetry = RETRIABLE_ERRORS.indexOf(err.code) !== -1;
    if (shouldRetry && _.isFiniteNumber(this.options.maxRetries)) {
      item.retries = item.retries ? item.retries + 1 : 1;
      if (item.retries > this.options.maxRetries) {
        shouldRetry = false;
      }
    }
  }
  if (shouldRetry) {
    this._retryApiRequest(item, callback);
  } else {
    callback(err);
  }
};

/*
 * _retryApiRequest - Add an item and a callback to a queue and possibly start a timer to process
 * that queue based on the retryInterval in the options for this queue.
 *
 * The timer is stopped again once the retry queue has been emptied, so an idle queue does not
 * leave an interval running; it is restarted on demand by the next call.
 *
 * @param item - an item that failed to send due to an error we deem retriable
 * @param callback - function(err, response)
 */
Queue.prototype._retryApiRequest = function(item, callback) {
  this.retryQueue.push({item: item, callback: callback});

  if (!this.retryHandle) {
    this.retryHandle = setInterval(function() {
      // Only process the entries present when this tick started. An entry that fails again
      // synchronously is re-queued behind them and handled on the next tick, which keeps
      // this loop bounded.
      var remaining = this.retryQueue.length;
      while (remaining-- > 0 && this.retryQueue.length) {
        var retryObject = this.retryQueue.shift();
        this._makeApiRequest(retryObject.item, retryObject.callback);
      }
      if (this.retryQueue.length === 0) {
        clearInterval(this.retryHandle);
        this.retryHandle = null;
      }
    }.bind(this), this.options.retryInterval);
  }
};

/*
 * _dequeuePendingRequest - Removes the item from the pending request queue, this queue is used to
 * enable the functionality of providing a callback that clients can pass to `wait` to be notified
 * when the pending request queue has been emptied. This must be called when the API finishes
 * processing this item. If a `wait` callback is configured, it is called by this function once
 * nothing is pending any more.
 *
 * @param item - the item previously added to the pending request queue
 */
Queue.prototype._dequeuePendingRequest = function(item) {
  var idx = this.pendingRequests.indexOf(item);
  if (idx !== -1) {
    this.pendingRequests.splice(idx, 1);
    this._maybeCallWait();
  }
};

/*
 * _maybeLog - When a logger is present and options.verbose is set, log a message for the item.
 * The original error, or failing that an exception message found in the payload, is logged via
 * logger.error; otherwise a plain message body found in the payload is logged via logger.log.
 *
 * @param data - the item payload
 * @param originalError - the original error (or message) to prefer over the payload contents
 */
Queue.prototype._maybeLog = function(data, originalError) {
  if (this.logger && this.options.verbose) {
    var message = originalError;
    message = message || _.get(data, 'body.trace.exception.message');
    message = message || _.get(data, 'body.trace_chain.0.exception.message');
    if (message) {
      this.logger.error(message);
      return;
    }
    message = _.get(data, 'body.message.body');
    if (message) {
      this.logger.log(message);
    }
  }
};

/*
 * _maybeCallWait - Invoke the `wait` callback if one is registered and there are neither
 * pending items nor pending requests. Any polling interval started by `wait` is stopped first.
 *
 * @returns true if the callback was invoked, false otherwise
 */
Queue.prototype._maybeCallWait = function() {
  if (_.isFunction(this.waitCallback) && this.pendingItems.length === 0 && this.pendingRequests.length === 0) {
    if (this.waitIntervalID) {
      // clearInterval returns undefined, so this also resets the stored id.
      this.waitIntervalID = clearInterval(this.waitIntervalID);
    }
    this.waitCallback();
    return true;
  }
  return false;
};

module.exports = Queue;