| Class | Delayed::Worker |
| In: |
lib/delayed/worker.rb
|
| Parent: | Object |
| DEFAULT_SLEEP_DELAY | = | 5 |
| DEFAULT_MAX_ATTEMPTS | = | 25 |
| DEFAULT_MAX_RUN_TIME | = | 4.hours |
| DEFAULT_DEFAULT_PRIORITY | = | 0 |
| DEFAULT_DELAY_JOBS | = | true |
| DEFAULT_QUEUES | = | [] |
| DEFAULT_READ_AHEAD | = | 5 |
| name_prefix | [RW] | name_prefix is ignored if name is set directly |
# File lib/delayed/worker.rb, line 78
# Re-open the file handles captured by before_fork so the forked child
# gets its own descriptors instead of sharing the parent's, then lets
# the backend re-establish its own state (e.g. reconnect).
#
# Each file is reopened at its original path in append mode and switched
# to unbuffered writes (sync = true). Reopen failures are deliberately
# ignored (best effort), but we rescue StandardError rather than
# ::Exception so signals, SystemExit and NoMemoryError are not swallowed.
def self.after_fork
  # Re-open file handles
  @files_to_reopen.each do |file|
    begin
      file.reopen file.path, "a+"
      file.sync = true
    rescue ::StandardError
      # best effort: a handle that cannot be reopened is skipped
    end
  end

  backend.after_fork
end
# File lib/delayed/worker.rb, line 53
# Install the job backend. Accepts either a backend class or a Symbol
# (e.g. :active_record); for a Symbol the matching serialization and
# backend files are required first and the backend class is resolved
# from its conventional name. The chosen backend is also aliased as
# ::Delayed::Job.
def self.backend=(backend)
  if backend.is_a?(Symbol)
    %w[serialization backend].each do |layer|
      require "delayed/#{layer}/#{backend}"
    end
    backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
  end

  @@backend = backend
  silence_warnings { ::Delayed.const_set(:Job, backend) }
end
# File lib/delayed/worker.rb, line 67
# Snapshot every open File in the process (computed once and memoized)
# so after_fork can reopen them in the child, then give the backend a
# chance to prepare for the fork (e.g. release connections).
def self.before_fork
  @files_to_reopen ||= ObjectSpace.each_object(File).reject(&:closed?)

  backend.before_fork
end
# File lib/delayed/worker.rb, line 63
# Deprecated no-op kept for backwards compatibility: emits a deprecation
# warning on stderr and does nothing else.
def self.guess_backend
  warn '[DEPRECATION] guess_backend is deprecated. Please remove it from your code.'
end
# File lib/delayed/worker.rb, line 91
# Lazily build and cache the shared Delayed::Lifecycle instance used to
# run worker callbacks (:execute, :loop, :error, :failure, ...).
def self.lifecycle
  @lifecycle = Delayed::Lifecycle.new if @lifecycle.nil?
  @lifecycle
end
# File lib/delayed/worker.rb, line 95
# Build a worker. Recognised options (all optional):
#   :quiet - suppress stdout output (defaults to true)
#   :min_priority, :max_priority, :sleep_delay, :read_ahead, :queues -
#     copied onto the worker class setting of the same name when present
# Every registered plugin class is also instantiated.
def initialize(options = {})
  @quiet = options.fetch(:quiet, true)

  [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues].each do |setting|
    self.class.send(:"#{setting}=", options[setting]) if options.key?(setting)
  end

  plugins.each(&:new)
end
# File lib/delayed/worker.rb, line 21
# Restore every tunable worker setting to its shipped default value.
def self.reset
  {
    :sleep_delay      => DEFAULT_SLEEP_DELAY,
    :max_attempts     => DEFAULT_MAX_ATTEMPTS,
    :max_run_time     => DEFAULT_MAX_RUN_TIME,
    :default_priority => DEFAULT_DEFAULT_PRIORITY,
    :delay_jobs       => DEFAULT_DELAY_JOBS,
    :queues           => DEFAULT_QUEUES,
    :read_ahead       => DEFAULT_READ_AHEAD
  }.each { |setting, default| send(:"#{setting}=", default) }
end
# File lib/delayed/worker.rb, line 205
# Mark +job+ as permanently failed: run the :failure lifecycle callbacks
# and the job's own failure hook, then either destroy the record or flag
# it as failed, depending on the destroy_failed_jobs class setting.
def failed(job)
  self.class.lifecycle.run_callbacks(:failure, self, job) do
    job.hook(:failure)
    if self.class.destroy_failed_jobs
      job.destroy
    else
      job.fail!
    end
  end
end
# File lib/delayed/worker.rb, line 218
# Retry limit for +job+: the job's own max_attempts when it provides
# one, otherwise the worker-wide class setting.
def max_attempts(job)
  job_limit = job.max_attempts
  job_limit || self.class.max_attempts
end
Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
# File lib/delayed/worker.rb, line 110
# The worker's unique name. An explicitly assigned @name wins; otherwise
# the name is derived from the optional @name_prefix, the hostname and
# the pid, falling back to prefix + pid alone when the hostname cannot
# be determined.
def name
  return @name unless @name.nil?

  begin
    "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
  rescue StandardError
    "#{@name_prefix}pid:#{Process.pid}"
  end
end
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/worker.rb, line 193
# Reschedule the job in the future (when a job fails). The next run time
# comes from +time+ when given, otherwise from job.reschedule_at (an
# exponential scale based on the attempt count). Once the attempt count
# reaches the limit, the job is handed to #failed instead.
def reschedule(job, time = nil)
  job.attempts += 1
  if job.attempts < max_attempts(job)
    job.run_at = time || job.reschedule_at
    job.unlock
    job.save!
  else
    say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
    failed(job)
  end
end
# File lib/delayed/worker.rb, line 176
# Execute +job+, enforcing the class-wide max_run_time via Timeout, and
# destroy it on success. Returns true when the job completed, false when
# it raised and was handed to the :error callbacks.
#
# Fix: drop the stray "{" that was being prepended to job.last_error.
def run(job)
  runtime = Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  say "#{job.name} completed after %.4f" % runtime
  return true # did work
rescue DeserializationError => error
  # A job that cannot be deserialized can never succeed on retry,
  # so it goes straight to #failed.
  job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
  failed(job)
rescue Exception => error
  # Intentionally broader than StandardError so *any* job failure is
  # recorded and rescheduled instead of killing the worker loop.
  self.class.lifecycle.run_callbacks(:error, self, job){ handle_failed_job(job, error) }
  return false # work failed
end
# File lib/delayed/worker.rb, line 212
# Log +text+ tagged with the worker name: echoed to stdout unless the
# worker is quiet, and appended to the logger (when one is configured)
# with an ISO-8601 timestamp at the given severity.
def say(text, level = Logger::INFO)
  message = "[Worker(#{name})] #{text}"
  puts(message) unless @quiet
  return unless logger
  logger.add(level, "#{Time.now.strftime('%FT%T%z')}: #{message}")
end
# File lib/delayed/worker.rb, line 121
# Main worker entry point: poll and run jobs until told to stop.
#
# Installs TERM/INT handlers that request a graceful shutdown
# (presumably +stop+ sets the @exit flag checked below — confirm against
# the rest of the file), then drives the polling loop inside the
# :execute and :loop lifecycle callbacks so plugins can hook each phase.
def start
  trap('TERM') { say 'Exiting...'; stop }
  trap('INT') { say 'Exiting...'; stop }

  say "Starting job worker"

  self.class.lifecycle.run_callbacks(:execute, self) do
    loop do
      self.class.lifecycle.run_callbacks(:loop, self) do
        result = nil

        # Time one batch so throughput can be reported below.
        realtime = Benchmark.realtime do
          result = work_off
        end

        # work_off returns [successes, failures]; sum is total jobs done.
        count = result.sum

        # NOTE: this `break` only leaves the run_callbacks block; the
        # outer `break if @exit` below is what actually exits the loop.
        break if @exit

        if count.zero?
          # Nothing to do: back off before polling again.
          sleep(self.class.sleep_delay)
        else
          say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
        end
      end

      break if @exit
    end
  end
end
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/worker.rb, line 158
# Do num jobs and return stats on success/failure. Exit early if
# interrupted.
#
# Runs at most +num+ jobs (default 100) and returns [successes, failures].
# Stops as soon as no job could be reserved, or when shutdown is requested.
def work_off(num = 100)
  success, failure = 0, 0

  num.times do
    case reserve_and_run_one_job
    when true
      success += 1
    when false
      failure += 1
    else
      break # leave if no work could be done
    end
    # Fix: was `break if $exit` — the rest of the worker (see #start)
    # tracks shutdown in @exit, so the global check could never fire.
    break if @exit # leave if we're exiting
  end

  [success, failure]
end
# File lib/delayed/worker.rb, line 224
# Record the error on the job (message plus backtrace), log the failure,
# and hand the job to #reschedule for retry / permanent-failure handling.
#
# Fix: drop the stray "{" that was being prepended to job.last_error.
def handle_failed_job(job, error)
  job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
  say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
  reschedule(job)
end