Class Dsadmin::Admind::RequestProcessor
In: lib/dsadmin/admind/request_processor.rb
Parent: Object

The class coordinating all request processing, i.e. effectively the central "router" for requests.

Singleton

This class (via the process method) checks the validity of requests’ authentication credentials and passes them on to the proper AdmindController or to a different admind instance. It also keeps track of all requests that are currently being processed.

On the master instance it also supports broadcasting a request to all slaves (broadcast method).

Methods

Included Modules

Dsadmin::CoreClient Dsadmin::Contractor Singleton

Classes and Modules

Class Dsadmin::Admind::RequestProcessor::RequestInfo

Public Class methods

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 189
189:       def initialize
190:         @mutex   = Mutex.new
191:       end

Public Instance methods

Send a Request to all slaves (not to the master instance).

Returns:Hash of instanceid => response

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 89
 89:       def broadcast(controller, action)
 90:         requireTrue(cfg.master?)
 91:         
 92:         log.notice("Broadcasting /#{controller}/#{action} to all slaves")
 93:         
 94:         result  = Hash.new
 95:         threads = Array.new
 96:         
 97:         req = Request.new
 98:         req.controller = controller
 99:         req.action = action
100:         
101:         @instances.keys.each { |id|
102:           next if(id == cfg.instanceid)
103:           log.debug("  Broadcasting to #{id}")
104:           
105:           threads << spawnThread {
106:             result[id] = send_to(id, req)
107:             log.debug("   #{id} -> #{result[id].status}")
108:           }
109:         }
110:         
111:         threads.each { |cur| cur.join }
112:         
113:         return result
114:       end

Process the given request.

Automatically forwards it to the correct instance if needed, and logs to the "access" log. Threadsafe — specifically intended for use by multiple parallel threads.

[Source]

    # File lib/dsadmin/admind/request_processor.rb, line 59
59:       def process(request)
60:         res   = nil
61:         Thread.current[:request] = request
62:         
63:         begin
64:           res = @active ? processImpl(request) : Dsadmin::Response::TEMP_UNAVAILABLE
65:         rescue ArgumentError => xcept
66:           log.error(xcept)
67:           res =  Dsadmin::Response::FAILED
68:         rescue SecurityError => xcept
69:           log.warning("#{request.uri}: #{xcept.class.name}: #{xcept.message}")
70:           res = Dsadmin::Response::FORBIDDEN
71:         rescue StandardError => xcept
72:           log.bug(xcept)
73:           res = Dsadmin::Response::INTERNAL_ERROR
74:         rescue Exception => xcept
75:           log.bug(xcept) # gives us a backtrace by default
76:           res = Dsadmin::Response::INTERNAL_ERROR
77:         ensure
78:           sys.killChild(Thread.current)
79:           Thread.current[:request] = nil
80:         end
81:         
82:         return res
83:       end

Get a list of currently running processes (requests being processed).

Returns:Array of Hashes, one per active request

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 165
165:       def ps
166:         res = Array.new
167:         
168:         @mutex.synchronize {
169:           @process_table.each { |key, value|
170:             entry = Hash.new
171:             entry['uri']        = value.request.uri
172:             entry['user']       = value.request.username
173:             entry['internal']   = value.request.internal
174:             entry['time_start'] = value.time_start
175:             entry['local']      = value.local?
176:             res << entry
177:           }
178:         }
179:         
180:         return res
181:       end

Shut down request processing

cascade:If both this is set to true and we‘re on the master instance, this will also shut down all slaves
restart:If set to true and we‘re on the master instance, the slaves will be restarted instead of shut down.

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 124
124:       def shutdown(cascade = false, restart = false)
125:         # Make sure we don't acept any more requests. We don't wait for
126:         # requests to finish here -- that's for the later call
127:         @active = false
128:         
129:         # cascade the shutdown/restart to slaves if desired
130:         if(cfg.master? and cascade)
131:           action = restart ? 'restart' : 'shutdown'
132:           broadcast('local', action)
133:         end
134:         
135:         # Wait up to 10s for requests to finish
136:         waitReq(10)
137:         
138:         # Force all still-running requests to terminate (timeout 5s)
139:         kill(5)
140:         
141:         AdmindController.shutdown
142:       end

Initialize the request processing system.

It‘s perfectly fine to do multiple startup/shutdown cycles. Just don‘t nest them.

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 148
148:       def startup
149:         # for simple authentication between admind instances
150:         @secrets_in    = Hash.new
151:         @secrets_out   = Hash.new
152:         
153:         @process_table = Hash.new # Request.object_id => RequestInfo
154:         
155:         init_instances
156:         AdmindController.startup
157: 
158:         @active  = true
159:       end

Private Instance methods

Check if the admind-admind authentication is ok. Also properly sets the request‘s internal attribute.

Raises:SecurityError if the request is from another admind instance, but the supplied credentials (callerid/auth_token) do not verify that

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 277
277:       def auth_internal(request)
278:         request.internal = false # assumption to be revised later.
279:         secret = request.callerid ? secret_from(request.callerid) : "not " + request.auth_token.to_s
280: 
281:         if(request.from_outside)
282:           if(request.callerid and (request.callerid != ''))
283:             if(request.callerid && (request.auth_token == secret))
284:               # The request poses as coming from another admind instance (the +callerid+ is set) and
285:               # the authentication credentials verify that. But it's only a real "internal" request
286:               # if no +username+ is set
287:               request.internal = (request.username == nil)
288:             else
289:               log.error("Internal auth failed for callerid '#{request.callerid}', from host '#{request.remote_host}', request '#{request.uri}'")
290:               sleep(3) unless TEST_MODE # make an attacker wait for the answer
291:               raise SecurityError.new("Forbidden!")
292:             end
293:           end
294:         else
295:           # The request was created in this instance. If there is a username attached to it, it's 
296:           # just a "part" of a user request and thus does not qualify as "internal". Otherwise it's
297:           # a real "internal" request
298:           request.internal = (request.username == nil)
299:         end
300:         
301:         return false # authentication failed
302:       end

Check if the supplied username/password combo is valid. If no username is set in the request (like for purely internal requests), nothing is done and the method returns true (for "success")

Returns:(boolean) whether the credentials are ok (or not given)

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 260
260:       def auth_user(request)
261:         if(request.username == nil) # no username specified => nothing to do
262:           return true
263:         end
264:       
265:         # Find the user and check the password
266:           
267:         request.user = User.find_by_credentials(request.username, request.password)
268:         return (request.user != nil)
269:       end

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 325
325:       def build_secret(from_id, dest_id)
326:         secret_s = cfg.get("globaldefs/internal_secret", String)
327:         tmp = from_id + secret_s + dest_id
328:         return Digest::MD5.hexdigest(tmp)
329:       end

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 332
332:       def init_instances
333:         @instances = Hash.new
334:         cfg.root.admind.each { |key, cfgsect|
335:           listen = cfgsect.get('net/listen', Array)
336:           
337:           if(listen.empty?)
338:             raise Dsadmin::BadConfigurationError("Adming '#{key}' does not have a <listen> section!")
339:           end
340:           
341:           listensect = listen.find { |cur| (cur.host != "127.0.0.1") && (cur.host != "localhost") }
342:           
343:           if(listensect == nil)
344:             log.warning("Could not find a non-localhost listen address for instance '#{key}' -- falling back to localhost")
345:             listensect = listen[0]
346:           end
347:             
348:           host = listensect.host
349:           port = listensect.port.to_i
350:           instance = Dsadmin::BackendInterface.new(host, port, cfg.instanceid, secret_to(key))
351:           @instances[key] = instance
352:         }
353:       end

Kill all running exec()’d processes, first via SIGTERM, then with SIGKILL.

aTimeout:Total amount of seconds to wait for the processes to finish. After each signal, we wait for up to half that time.

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 407
407:       def kill(aTimeout = 5)
408:         killall('SIGTERM')
409:         waitReq(aTimeout / 2.0) # /
410:         
411:         killall('SIGKILL')
412:         waitReq(aTimeout / 2.0) # /
413:       end

Send the given signal to all running exec()’d processes

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 417
417:       def killall(signal)
418:         @mutex.synchronize {
419:           @process_table.each { |id, info|
420:             pid = info.thread[:exec_pid]
421:             
422:             if(pid != nil)
423:               log.notice("  Sending #{signal} to pid #{pid} (Request: #{info.request.uri})")
424:               sys.runAs('root') {
425:                 Process.kill(signal, pid)
426:               }
427:             end
428:           }
429:         }
430:       end

The heart of this class.

Technically a part of process, but moved into a seperate method for readability.

Raises:SecurityError if access to the desired action is forbidden (typically if internal authentication fails)

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 201
201:       def processImpl(request)
202:         auth_internal(request)
203:         
204:         controller = AdmindController.controller_for(request.controller)
205:         return Dsadmin::Response::NOT_FOUND if(controller.nil?)
206:         
207:         iid = controller.host_for(request.action)
208:         return Dsadmin::Response::NOT_FOUND if(iid.nil?)
209:         
210:         local = (iid == cfg.instanceid)
211:         
212:         ps_add(request, local)
213:         
214:         begin
215:           if(iid != cfg.instanceid) # request has to be forwarded
216:             if(request.forwarded)
217:               log.bug("Forwarding loop on '#{request.uri}': #{request.callerid} -> #{cfg.instanceid} -> #{iid}")
218:               return Dsadmin::Response::INTERNAL_ERROR
219:             else
220:               return send_to(iid, request)
221:             end
222:           end
223:           
224:           # request will be executed in this instance
225:           
226:           unless auth_user(request) 
227:             log.warning("Auth for user '#{request.username}' from host '#{request.remote_host}' on '#{request.uri}' failed")
228:             return Dsadmin::Response::UNAUTHORIZED          
229:           end
230:           return controller.process(request)
231:         ensure
232:           ps_remove(request)
233:         end
234:       end

Add entry to @process_table

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 357
357:       def ps_add(request, local)
358:         ri = RequestInfo.new
359:         ri.request    = request
360:         ri.time_start = sys.now
361:         ri.local      = local
362:         ri.thread     = Thread.current
363:         
364:         @mutex.synchronize {
365:           @process_table[request.object_id] = ri
366:         }
367:       end

Remove entry from @process_table

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 371
371:       def ps_remove(request)
372:         @mutex.synchronize {
373:           @process_table.delete(request.object_id)
374:         }
375:       end

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 305
305:       def secret_from(from_id)
306:         res = @secrets_in[from_id]
307:         return res if res
308:         
309:         res = build_secret(from_id, cfg.instanceid)
310:         @secrets_in[from_id] = res
311:         return res
312:       end

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 315
315:       def secret_to(dest_id)
316:         res = @secrets_out[dest_id]
317:         return res if res
318:         
319:         res = build_secret(cfg.instanceid, dest_id)
320:         @secrets_out[dest_id] = res
321:         return res
322:       end

Forward a request to a different instance

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 238
238:       def send_to(instanceid, request)
239:         instance = @instances[instanceid]
240:         #pp instanceid, @instances
241:         assertNotNil(instance)
242:         
243:         log.debug("Forwarding request '#{request.uri}' to #{instanceid}")
244:         begin
245:           request.callerid   = cfg.instanceid
246:           request.auth_token = secret_to(instanceid)
247:           return instance.call(request)
248:         rescue StandardError => xcept
249:           log.bug(xcept)
250:           return Dsadmin::Response::INTERNAL_ERROR # Internal communication failure
251:         end
252:       end

Wait for all requests to finish, up to the given timeout (seconds)

[Source]

     # File lib/dsadmin/admind/request_processor.rb, line 380
380:       def waitReq(aTimeout = 5)
381:         log.debug("  #{cfg.instanceid}: Waiting for Requests to finish (timeout #{aTimeout}s)")
382:         
383:         begin
384:           timeout(aTimeout) do
385:             # Allow new requests to be added to the process table
386:             # if we interrupted that
387:             Thread.pass
388:             
389:             while(@process_table.size > 0)
390:               sleep(0.5)
391:             end
392:           end
393:           
394:           log.debug("  #{cfg.instanceid}: Waiting for Requests to finish -- done")
395:           return true
396:         rescue TimeoutError
397:           log.notice("  #{cfg.instanceid}: Waiting for Requests to finish -- timed out")
398:           return false
399:         end
400:       end

[Validate]