| Class | Dsadmin::Admind::MirrorManager |
| In: |
lib/dsadmin/admind/mirror_manager.rb
|
| Parent: | Object |
FIX: Reexamine threadsafety
| cron | [R] | |
| max_update_duration | [R] |
# File lib/dsadmin/admind/mirror_manager.rb, line 198
198: def initialize
199: @cronjobs = Hash.new
200: @cron = CronProxy.new
201:
202: @running_updates = Hash.new # mirrorid => thread
203: @mutex = Mutex.new
204:
205: @max_update_duration = cfg.get("modules/mirror/max_update_duration/minutes", Integer) * 60
206: end
Pause the scheduling of updates of the specified mirror.
This does not pause a running update - it just makes sure that no more updates are initiated (until cronResume() is called) for that mirror.
| Throws: | ArgumentError if the given mirror does not exist |
# File lib/dsadmin/admind/mirror_manager.rb, line 133
133: def cron_pause(mirrorid)
134: find_mirror(mirrorid)
135:
136: job = @cronjobs[mirrorid]
137: if(job)
138: req = Dsadmin::Request.new_simple(:meta, :cron_pause, 'jobid' => job)
139: RequestProcessor.instance.process(req)
140: end
141: end
Resume the scheduling of updates of the specified mirror.
| Throws: | ArgumentError if the given mirror does not exist |
# File lib/dsadmin/admind/mirror_manager.rb, line 148
148: def cron_resume(mirrorid)
149: # For checking the validity of mirrorid
150: find_mirror(mirrorid)
151:
152: job = @cronjobs[mirrorid]
153: if(job)
154: req = Dsadmin::Request.new_simple(:meta, :cron_resume, 'jobid' => job)
155: RequestProcessor.instance.process(req)
156: end
157: end
Force the system to reload / reevaluate the specified mirror.
Call this every time some mirror is created/edited/destroyed
| Throws: | ArgumentError if the given mirror does not exist |
# File lib/dsadmin/admind/mirror_manager.rb, line 115
115: def invalidate(mirrorid)
116: reschedule(mirrorid)
117:
118: begin
119: trigger_update(mirrorid)
120: rescue ArgumentError => e
121: # This is the case when invalidate() is called after the mirror has been destroyed.
122: # A perfectly fine case, and nothing to worry about.
123: end
124: end
Returns an array of configfile resources used by this class (for mapping the MirrorController to the proper admind instance)
# File lib/dsadmin/admind/mirror_manager.rb, line 181
181: def resources
182: [cfg.get("modules/mirror/commands/update/du", ConfigSection)] +
183: FtpUpdater.resources +
184: RsyncUpdater.resources
185: end
Schedule all active mirrors. FIX: Thread safety?
# File lib/dsadmin/admind/mirror_manager.rb, line 162
162: def schedule_all
163: mirrors = Mirror.find(:all)
164:
165: for mirror in mirrors do
166: next if(@cronjobs.has_key?(mirror.id))
167:
168: if(mirror.active)
169: schedule_update(mirror)
170: else
171: @cronjobs.delete(mirror.id)
172: end
173: end
174:
175: schedule_index_building
176: end
Trigger an update of the specified mirror.
Returns immediately.
| Throws: | ArgumentError if the given mirror does not exist |
# File lib/dsadmin/admind/mirror_manager.rb, line 93
93: def trigger_update(mirrorid)
94: if(@cronjobs.has_key?(mirrorid))
95: log.debug("Triggering forced cronjob run for mirror ##{mirrorid}")
96: req = Dsadmin::Request.new_simple(:meta, :cron_forcerun, 'jobid' => @cronjobs[mirrorid])
97: RequestProcessor.instance.process(req)
98: else
99: # For checking the validity of mirrorid
100: find_mirror(mirrorid)
101:
102: log.debug("Triggering update of mirror ##{mirrorid} in separate thread")
103: spawnThread {
104: update(mirrorid)
105: }
106: end
107: end
Perform an update of the specified mirror.
Blocks until the mirror update is complete or timed out (which can take several hours!) May not be called via an external request - use trigger_update() for that!
| Throws: | ArgumentError if the given mirror does not exist |
# File lib/dsadmin/admind/mirror_manager.rb, line 54
54: def update(mirrorid)
55: res = nil
56: mirror = find_mirror(mirrorid)
57: time_start = sys.now
58: msg = ""
59:
60: return wrap_update(mirrorid) do
61: updater = MirrorUpdater.create(mirror)
62:
63: res, msg = perform_update(mirror, updater)
64:
65: response = nil;
66:
67: if(res == Mirror::S_SUCCESS)
68: mirror.size = mirror_size(mirror)
69: mirror.last_update = sys.now
70: mirror.save
71: response = Dsadmin::Response::SUCCESS
72: elsif(res == Mirror::S_DELAYED)
73: response = Dsadmin::Response.new(:temporarily_unavailable, :message => "Delayed - Load limit reached")
74: else
75: response = Dsadmin::Response.new(:operation_failed, :message => msg)
76: end
77:
78: write_statistics(mirror, res, time_start, updater.transfer_size, msg)
79:
80: rebuild_symlinks
81:
82: response
83: end
84: end
Little wrapper around Mirror#find(id) that raises an ArgumentError instead of ActiveRecord::RecordNotFound
# File lib/dsadmin/admind/mirror_manager.rb, line 211
211: def find_mirror(mirrorid)
212: begin
213: Mirror.find(mirrorid)
214: rescue ActiveRecord::RecordNotFound
215: raise ArgumentError.new("Unknown mirror ##{mirrorid}")
216: end
217: end
Determine the current actual size of the given mirror (using du(1))
# File lib/dsadmin/admind/mirror_manager.rb, line 332
332: def mirror_size(mirror)
333: csect = cfg.get("modules/mirror/commands/update/du", ConfigSection)
334: cmd = Dsadmin::SystemCommand.new(csect)
335: dir = File.join(mirror_path, mirror.local_dir)
336:
337: res = cmd.exec3(dir)
338:
339: unless(res['status'].success?)
340: log.error("Could not determine size of mirror '#{mirror.name}' " +
341: "- Command '#{cmd.to_s}' failed with '#{res['stderr']}'")
342: end
343:
344: return res['stdout'][/^\d+/].to_i
345: end
# File lib/dsadmin/admind/mirror_manager.rb, line 302
302: def perform_update (mirror, updater)
303: begin
304: timeout(max_update_duration) {
305: res = updater.update
306: return [res, updater.error_msg]
307: }
308: rescue TimeoutError => xcept
309: msg = "Update of mirror '#{mirror.name}' timed out"
310: log.notice(msg)
311: return [Mirror::S_FAILURE, msg]
312: end
313: end
# File lib/dsadmin/admind/mirror_manager.rb, line 348
348: def rebuild_symlinks
349: # FIX: Do this via cfengine??
350: end
(Re-)schedule regular updates of the given mirror.
# File lib/dsadmin/admind/mirror_manager.rb, line 258
258: def reschedule(mirrorid)
259: @mutex.synchronize {
260: if(@cronjobs.has_key?(mirrorid))
261: cron.remove(@cronjobs[mirrorid])
262: @cronjobs.delete(mirrorid)
263: end
264: }
265:
266: mirror = find_mirror(mirrorid)
267: schedule_update(mirror) if(mirror and mirror.active?) # mirror might have been deleted
268: end
# File lib/dsadmin/admind/mirror_manager.rb, line 245
245: def reschedule_all
246: @mutex.synchronize {
247: @cronjobs.each { |mirror, job|
248: cron.remove(job) unless(job.nil?)
249: @cronjobs.delete[mirror]
250: }
251: }
252:
253: schedule_all
254: end
# File lib/dsadmin/admind/mirror_manager.rb, line 287
287: def schedule_index_building
288: req = Dsadmin::Request.new_simple(:mirror, :buildindex)
289:
290: spec = CronjobSpec.new
291: spec.request = req
292: spec.last_run_time = Time.at(0)
293: spec.timespec = "+5m"
294: spec.failure_policy = Cron::RETRY_EXP015
295: spec.delay_policy = Cron::RETRY_REG15
296:
297: job = cron.add(spec)
298: return job
299: end
# File lib/dsadmin/admind/mirror_manager.rb, line 271
271: def schedule_update(mirror)
272: req = Dsadmin::Request.new_simple(:mirror, :update, 'id' => mirror.id)
273:
274: spec = CronjobSpec.new
275: spec.request = req
276: spec.last_run_time = mirror.last_update || Time.at(0)
277: spec.timespec = mirror.update_timespec
278: spec.failure_policy = Cron::RETRY_EXP015
279: spec.delay_policy = Cron::RETRY_REG15
280:
281: job = cron.add(spec)
282: @cronjobs[mirror.id] = job
283: return job
284: end
Wrap the given block of mirror updating code (a) in code to keep track of mirrors being updates (see @running_updates) and (b) in a catchall exception handler
# File lib/dsadmin/admind/mirror_manager.rb, line 222
222: def wrap_update(mirrorid, &block)
223: @mutex.synchronize {
224: if(@running_updates.has_key?(mirrorid))
225: log.notice("Mirror ##{mirrorid} is already being updated")
226: return Dsadmin::Response::SUCCESS
227: end
228:
229: @running_updates[mirrorid] = Thread.current
230: }
231:
232: begin
233: return block.yield
234: rescue StandardError => xcept
235: log.bug(xcept)
236: return response500("Internal Error: #{xcept.message}")
237: ensure
238: @mutex.synchronize {
239: @running_updates.delete(mirrorid)
240: }
241: end
242: end
Write information of an update run to the "mirror_updates" table
# File lib/dsadmin/admind/mirror_manager.rb, line 317
317: def write_statistics(mirror, result, time_start, transfer_size, error_msg = nil)
318: stat = MirrorUpdate.new
319:
320: stat.mirror = mirror.id
321: stat.mirror_size = mirror.size
322: stat.transfer_size = transfer_size
323: stat.time_start = time_start
324: stat.duration = (sys.now - time_start).to_i
325: stat.result = result
326: stat.error_msg = error_msg unless((error_msg.nil?) || (error_msg == ''))
327: stat.save
328: end