| Class | Delayed::Job |
| In: |
lib/delayed/job.rb
|
| Parent: | ActiveRecord::Base |
A job object that is persisted to the database. Contains the work object as a YAML field.
| MAX_ATTEMPTS | = | 25 |
| MAX_RUN_TIME | = | 4.hours |
| NextTaskSQL | = | '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL' |
| NextTaskOrder | = | 'priority DESC, run_at ASC' |
| ParseObjectFromYaml | = | /\!ruby\/\w+\:([^\s]+)/ |
When a worker is exiting, make sure we don‘t have any locked jobs.
# File lib/delayed/job.rb, line 37
37: def self.clear_locks!
38: update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
39: end
Add a job to the queue
# File lib/delayed/job.rb, line 107
107: def self.enqueue(*args, &block)
108: object = block_given? ? EvaledJob.new(&block) : args.shift
109:
110: unless object.respond_to?(:perform) || block_given?
111: raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
112: end
113:
114: priority = args.first || 0
115: run_at = args[1]
116:
117: Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
118: end
Find a few candidate jobs to run (in case some immediately get locked by others).
# File lib/delayed/job.rb, line 121
121: def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
122:
123: time_now = db_time_now
124:
125: sql = NextTaskSQL.dup
126:
127: conditions = [time_now, time_now - max_run_time, worker_name]
128:
129: if self.min_priority
130: sql << ' AND (priority >= ?)'
131: conditions << min_priority
132: end
133:
134: if self.max_priority
135: sql << ' AND (priority <= ?)'
136: conditions << max_priority
137: end
138:
139: conditions.unshift(sql)
140:
141: ActiveRecord::Base.silence do
142: find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
143: end
144: end
Run the next job we can get an exclusive lock on. If no jobs are left we return nil
# File lib/delayed/job.rb, line 148
148: def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)
149:
150: # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
151: # this leads to a more even distribution of jobs across the worker processes
152: find_available(5, max_run_time).each do |job|
153: t = job.run_with_lock(max_run_time, worker_name)
154: return t unless t == nil # return if we did work (good or bad)
155: end
156:
157: nil # we didn't do any work, all 5 were not lockable
158: end
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/job.rb, line 195
195: def self.work_off(num = 100)
196: success, failure = 0, 0
197:
198: num.times do
199: case self.reserve_and_run_one_job
200: when true
201: success += 1
202: when false
203: failure += 1
204: else
205: break # leave if no work could be done
206: end
207: break if $exit # leave if we're exiting
208: end
209:
210: return [success, failure]
211: end
Moved into its own method so that new_relic can trace it.
# File lib/delayed/job.rb, line 214
214: def invoke_job
215: payload_object.perform
216: end
Lock this job for this worker. Returns true if we have the lock, false otherwise.
# File lib/delayed/job.rb, line 162
162: def lock_exclusively!(max_run_time, worker = worker_name)
163: now = self.class.db_time_now
164: affected_rows = if locked_by != worker
165: # We don't own this job so we will update the locked_by name and the locked_at
166: self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
167: else
168: # We already own this job, this may happen if the job queue crashes.
169: # Simply resume and update the locked_at
170: self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
171: end
172: if affected_rows == 1
173: self.locked_at = now
174: self.locked_by = worker
175: return true
176: else
177: return false
178: end
179: end
This is a good hook if you need to report job processing errors in additional or different ways
# File lib/delayed/job.rb, line 188
188: def log_exception(error)
189: logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
190: logger.error(error)
191: end
# File lib/delayed/job.rb, line 50
50: def name
51: @name ||= begin
52: payload = payload_object
53: if payload.respond_to?(:display_name)
54: payload.display_name
55: else
56: payload.class.name
57: end
58: end
59: end
# File lib/delayed/job.rb, line 46
46: def payload_object
47: @payload_object ||= deserialize(self['handler'])
48: end
# File lib/delayed/job.rb, line 61
61: def payload_object=(object)
62: self['handler'] = object.to_yaml
63: end
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/job.rb, line 67
67: def reschedule(message, backtrace = [], time = nil)
68: if (self.attempts += 1) < MAX_ATTEMPTS
69: time ||= Job.db_time_now + (attempts ** 4) + 5
70:
71: self.run_at = time
72: self.last_error = message + "\n" + backtrace.join("\n")
73: self.unlock
74: save!
75: else
76: logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
77: destroy_failed_jobs ? destroy : update_attribute(:failed_at, Time.now)
78: end
79: end
Try to run one job. Returns true/false (work done/work failed) or nil if job can‘t be locked.
# File lib/delayed/job.rb, line 83
83: def run_with_lock(max_run_time, worker_name)
84: logger.info "* [JOB] acquiring lock on #{name}"
85: unless lock_exclusively!(max_run_time, worker_name)
86: # We did not get the lock, some other worker process must have
87: logger.warn "* [JOB] failed to acquire exclusive lock for #{name}"
88: return nil # no work done
89: end
90:
91: begin
92: runtime = Benchmark.realtime do
93: Timeout.timeout(max_run_time.to_i) { invoke_job }
94: destroy
95: end
96: # TODO: warn if runtime > max_run_time ?
97: logger.info "* [JOB] #{name} completed after %.4f" % runtime
98: return true # did work
99: rescue Exception => e
100: reschedule e.message, e.backtrace
101: log_exception(e)
102: return false # work failed
103: end
104: end
Unlock this job (note: not saved to DB)
# File lib/delayed/job.rb, line 182
182: def unlock
183: self.locked_at = nil
184: self.locked_by = nil
185: end