Class Rddb::Worker::RindaWorker
In: lib/rddb/worker/rinda_worker.rb
Parent: Object

Worker that pulls tasks from a Rinda tuple space.

Methods

new   process   run   tuple_space  

Public Class methods

Initialize with the specified options

[Source]

    # File lib/rddb/worker/rinda_worker.rb, line 11
11:       def initialize(options={})
12:       end

Process the specified tasks.

[Source]

    # File lib/rddb/worker/rinda_worker.rb, line 15
15:       def self.process(tasks)
16:         returning Array.new do |results|
17:           tasks.each do |task|
18:             tuple_space.write(['task', DRb.uri, task])
19:           end
20:           
21:           tasks.each do |task|
22:             puts "taking result from tuple space for partition '#{task.partition}'"
23:             tuple = tuple_space.take(['result', DRb.uri, task.partition, nil])
24:             results << tuple[3]
25:           end
26:         end
27:       end

Public Instance methods

Run the worker service.

[Source]

    # File lib/rddb/worker/rinda_worker.rb, line 30
30:       def run
31:         Daemons.run_proc('worker', :multiple => true, :log_output => true) do
32:           begin
33:             DRb.start_service
34:             ring_server = Rinda::RingFinger.primary
35: 
36:             ts = ring_server.read([:name, :TupleSpace, nil, nil])[2]
37:             ts = Rinda::TupleSpaceProxy.new ts
38: 
39:             # Wait for tasks, pull them off and run them
40:             puts "executing worker loop"
41:             loop do
42:               begin
43:                 tuple = ts.take(['task', nil, nil])
44:                 task = tuple[2]
45:                 puts "processing partition #{task.partition}"
46:                 puts "using datastore #{task.datastore_class}"
47: 
48:                 if task.respond_to?(:run)
49:                   result = task.run
50:                   puts "writing result to tuple space"
51:                   ts.write(['result', tuple[1], task.task_id, result])
52:                 else
53:                   puts "Task is not a task: #{task.class}"
54:                 end
55:               rescue Errno::ECONNREFUSED
56:                 puts "Ring server has gone down, stopping worker."
57:                 break
58:               rescue => e
59:                 puts "An error occured: #{e}"
60:                 puts e.backtrace.join("\n")
61:               end
62:             end
63:           rescue RuntimeError
64:             puts "Ring server not found, are you sure the ring server is running?"
65:           end
66:         end
67:       end

Get the tuple space for distributed processing

[Source]

    # File lib/rddb/worker/rinda_worker.rb, line 70
70:       def tuple_space
71:         unless @tuple_space 
72:           DRb.start_service
73:           ring_server = Rinda::RingFinger.primary
74: 
75:           ts = ring_server.read([:name, :TupleSpace, nil, nil])[2]
76:           @tuple_space = Rinda::TupleSpaceProxy.new ts
77:         end
78:         @tuple_space
79:       end

[Validate]