| Class | Rddb::Worker::Ec2Worker |
| In: |
lib/rddb/worker/ec2_worker.rb
|
| Parent: | Object |
Instances of the Ec2Worker class are started up on N EC2 instances and listen for notifications over SQS to begin processing.
| queue_name | [R] | The queue name |
private
# File lib/rddb/worker/ec2_worker.rb, line 47
47: def self.create_message(task)
48: Builder::XmlMarkup.new.message { |x|
49: x.task_id(task.task_id)
50: x.partition(task.partition)
51: x.view_name(task.view_name)
52: x.document_store(task.document_store.class.name)
53: task.args.sort { |a, b| a[0].to_s <=> b[0].to_s }.each do |name, value|
54: x.arg {
55: x.name(name.to_s)
56: x.value(value.to_s)
57: }
58: end
59: }
60: end
Initialize the worker with the specified options
Options:
:sqs options:
# File lib/rddb/worker/ec2_worker.rb, line 18
18: def initialize(options={})
19: @options = options
20: @queue_name = options[:sqs][:queue_name] || 'rddb_queue'
21:
22: SQS.access_key_id = options[:sqs][:credentials][:access_key_id]
23: SQS.secret_access_key = options[:sqs][:credentials][:secret_access_key]
24: end
Process the tasks with the worker.
# File lib/rddb/worker/ec2_worker.rb, line 27
27: def self.process(tasks)
28: q = queue
29: tasks.each do |task|
30: q.send_message(create_message(task))
31: end
32: end
# File lib/rddb/worker/ec2_worker.rb, line 62
62: def self.queue
63: begin
64: SQS.get_queue(options[:sqs][:queue_name])
65: rescue SQS::UnavailableQueue
66: SQS.create_queue(options[:sqs][:queue_name])
67: end
68: end
Run the worker service
# File lib/rddb/worker/ec2_worker.rb, line 35
35: def run
36: Daemons.run_proc('worker', :multiple => true, :log_output => true) do
37: q = Ec2Worker.queue
38: while true do
39: q.peek_message do |message|
40: puts "message received: #{message.inspect}"
41: end
42: end
43: end
44: end