| Class | Rddb::Worker::RindaWorker |
| In: | lib/rddb/worker/rinda_worker.rb |
| Parent: | Object |
Worker that pulls tasks from a Rinda tuple space.
Initialize with the specified options. The constructor body is empty, so the options are currently ignored.
# File lib/rddb/worker/rinda_worker.rb, line 11
# Build a worker instance.
#
# options - Hash of settings; accepted for interface compatibility but not
#           read anywhere in this constructor.
def initialize(options = {}); end
Process the specified tasks.
# File lib/rddb/worker/rinda_worker.rb, line 15
# Distribute the given tasks through the tuple space and collect the results.
#
# Every task is first written as a ['task', DRb.uri, task] tuple. One
# ['result', DRb.uri, task_id, result] tuple is then taken per task, in the
# order of +tasks+, and the result values are returned.
#
# Result tuples are matched on task.task_id, not task.partition, because
# task_id is the key under which the worker loop in #run writes its results.
#
# NOTE(review): tuple_space is invoked on the class here; confirm that a
# class-level tuple_space is available to this method.
#
# tasks - collection of task objects responding to +task_id+ and +partition+.
#
# Returns an Array holding one result per task (empty when +tasks+ is empty).
def self.process(tasks)
  # Publish every task before waiting on any result.
  tasks.each do |task|
    tuple_space.write(['task', DRb.uri, task])
  end

  # Plain #map builds the result list without ActiveSupport's removed
  # Object#returning helper.
  tasks.map do |task|
    puts "taking result from tuple space for partition '#{task.partition}'"
    tuple = tuple_space.take(['result', DRb.uri, task.task_id, nil])
    tuple[3]
  end
end
Run the worker service.
# File lib/rddb/worker/rinda_worker.rb, line 30
# Run the worker service.
#
# Hands the worker loop to Daemons.run_proc. Inside it the tuple space is
# looked up through the Rinda ring server, and the loop then repeatedly takes
# a ['task', sender, task] tuple, runs the task and writes a
# ['result', sender, task.task_id, result] tuple back.
#
# The loop stops once the connection to the ring server / tuple space is lost.
# Any other error raised while handling a tuple is logged and the loop goes on.
# Failing to find the ring server at startup is reported and ends the worker.
def run
  Daemons.run_proc('worker', :multiple => true, :log_output => true) do
    begin
      DRb.start_service
      ring_server = Rinda::RingFinger.primary

      ts = ring_server.read([:name, :TupleSpace, nil, nil])[2]
      ts = Rinda::TupleSpaceProxy.new ts

      # Wait for tasks, pull them off and run them
      puts "executing worker loop"
      loop do
        begin
          tuple = ts.take(['task', nil, nil])
          task = tuple[2]

          # Check the payload before touching any task attribute; otherwise a
          # foreign object raises NoMethodError and never reaches this message.
          unless task.respond_to?(:run)
            puts "Task is not a task: #{task.class}"
            next
          end

          puts "processing partition #{task.partition}"
          puts "using datastore #{task.datastore_class}"

          result = task.run
          puts "writing result to tuple space"
          ts.write(['result', tuple[1], task.task_id, result])
        rescue Errno::ECONNREFUSED, DRb::DRbConnError
          # DRb reports lost or refused connections as DRb::DRbConnError, so
          # both classes must end the loop instead of being retried forever.
          puts "Ring server has gone down, stopping worker."
          break
        rescue => e
          puts "An error occured: #{e}"
          puts e.backtrace.join("\n")
        end
      end
    rescue RuntimeError
      puts "Ring server not found, are you sure the ring server is running?"
    end
  end
end
Get the tuple space for distributed processing
# File lib/rddb/worker/rinda_worker.rb, line 70
# Return the tuple space used for distributed processing.
#
# On the first call the DRb service is started, the primary Rinda ring server
# is asked for the tuple registered as [:name, :TupleSpace, ...], and the third
# element of that tuple is wrapped in a Rinda::TupleSpaceProxy. The proxy is
# kept in @tuple_space and handed back unchanged on later calls.
def tuple_space
  return @tuple_space if @tuple_space

  DRb.start_service
  registration = Rinda::RingFinger.primary.read([:name, :TupleSpace, nil, nil])
  @tuple_space = Rinda::TupleSpaceProxy.new(registration[2])
end