Class Rddb::Worker::Ec2Worker
In: lib/rddb/worker/ec2_worker.rb
Parent: Object

Instances of the Ec2Worker class are started up on N EC2 instances and listen for notifications over SQS to begin processing.

Methods

create_message   new   process   queue   run  

Attributes

queue_name  [R]  The queue name

Public Class methods

private

[Source]

    # File lib/rddb/worker/ec2_worker.rb, line 47
47:       def self.create_message(task)
48:         Builder::XmlMarkup.new.message { |x|
49:           x.task_id(task.task_id)
50:           x.partition(task.partition)
51:           x.view_name(task.view_name)
52:           x.document_store(task.document_store.class.name)
53:           task.args.sort { |a, b| a[0].to_s <=> b[0].to_s }.each do |name, value|
54:             x.arg {
55:               x.name(name.to_s)
56:               x.value(value.to_s)
57:             }
58:           end
59:         }
60:       end

Initialize the worker with the specified options

Options:

  • :sqs: The SQS configuration options

:sqs options:

  • :queue_name: The queue name
  • :access_key_id: The access key ID
  • :secret_access_key: The secret access key

[Source]

    # File lib/rddb/worker/ec2_worker.rb, line 18
18:       def initialize(options={})
19:         @options = options
20:         @queue_name = options[:sqs][:queue_name] || 'rddb_queue'
21:         
22:         SQS.access_key_id = options[:sqs][:credentials][:access_key_id]
23:         SQS.secret_access_key = options[:sqs][:credentials][:secret_access_key]
24:       end

Process the tasks with the worker.

[Source]

    # File lib/rddb/worker/ec2_worker.rb, line 27
27:       def self.process(tasks)
28:         q = queue
29:         tasks.each do |task|
30:           q.send_message(create_message(task))
31:         end
32:       end

[Source]

    # File lib/rddb/worker/ec2_worker.rb, line 62
62:       def self.queue
63:         begin
64:           SQS.get_queue(options[:sqs][:queue_name])
65:         rescue SQS::UnavailableQueue
66:           SQS.create_queue(options[:sqs][:queue_name])
67:         end
68:       end

Public Instance methods

Run the worker service

[Source]

    # File lib/rddb/worker/ec2_worker.rb, line 35
35:       def run
36:         Daemons.run_proc('worker', :multiple => true, :log_output => true) do
37:           q = Ec2Worker.queue
38:           while true do
39:             q.peek_message do |message|
40:               puts "message received: #{message.inspect}"
41:             end
42:           end
43:         end
44:       end

[Validate]