Class Rddb::DocumentStore::S3DocumentStore
In: lib/rddb/document_store/s3_document_store.rb
Parent: Base

DocumentStore that stores documents in Amazon S3.

Methods

Included Modules

AWS::S3 Enumerable

Public Class methods

Initialize the datastore.

Options:

  • :basedir: The base directory for data storage
  • :partition_strategy: A Proc that defines partitioning
  • :cache: Provide a cache implementation (such as a Hash) to enable caching. If not specified then caching will not be used. Note that caching only works on direct ID lookups, not on queries. Queries should use materialized views for performance.

[Source]

    # File lib/rddb/document_store/s3_document_store.rb, line 18
18:       def initialize(bucket_name, options={})
19:         AWS::S3::Base.establish_connection!(options[:s3])
20:         @bucket_name = bucket_name
21:         @options = options
22:         @options[:basedir] ||= 'data'
23:         load_index
24:       end

Public Instance methods

Get the bucket name.

[Source]

    # File lib/rddb/document_store/s3_document_store.rb, line 27
27:       def bucket_name
28:         returning @bucket_name do |name|
29:           begin
30:             Bucket.create(name)
31:           rescue => e
32:             #puts "Error creating bucket: #{e}"
33:           end
34:         end
35:       end

The number of documents in the datastore

[Source]

    # File lib/rddb/document_store/s3_document_store.rb, line 90
90:       def count
91:         index.length
92:       end

Delete the document at the given path.

[Source]

    # File lib/rddb/document_store/s3_document_store.rb, line 75
75:       def delete(id)
76:         raise RuntimeError, "File datastore does not support document deletion"
77:       end

Yield each document from the datastore to the given block

[Source]

     # File lib/rddb/document_store/s3_document_store.rb, line 95
 95:       def each(partition=nil, &block)
 96:         if partition.nil?
 97:           index.each do |id, info|
 98:             yield find(id)
 99:           end
100:         else
101:           S3Object.stream(File.join(basedir, p), bucket_name) do |f|
102:             until f.eof?
103:               yield Marshal.load(f)
104:             end
105:           end
106:         end
107:       end

Yield each partition name

[Source]

     # File lib/rddb/document_store/s3_document_store.rb, line 110
110:       def each_partition(&block)
111:         Bucket.objects(bucket_name, :prefix => basedir) do |o|
112:           f = File.basename(o.key)
113:           next if f == 'index'
114:           yield f
115:         end
116:       end

Return true if the document exists.

[Source]

    # File lib/rddb/document_store/s3_document_store.rb, line 80
80:       def exists?(id)
81:         !index[id.to_s].nil?
82:       end

Find the document for the given id.

[Source]

    # File lib/rddb/document_store/s3_document_store.rb, line 38
38:       def find(id)
39:         id = id.to_s
40:         return nil unless index[id]
41:         return cache[id] if cache? && cache.key?(id)
42:         p = index[id][:partition]
43:         v = S3Object.value(File.join(basedir, p), bucket_name)
44:         data = v[index[id][:offset]..(index[id][:offset] + index[id][:length])]
45:         #puts "Data: #{data}"
46:         document = Marshal.load(data)
47:         cache[id] = document if cache?
48:         document
49:       end

Store the document.

[Source]

    # File lib/rddb/document_store/s3_document_store.rb, line 52
52:       def store(document)
53:         id = document.id.to_s
54:         p = partition_for(document)
55:         index[id] = {:partition => p}
56:         if S3Object.exists?(File.join(basedir, p), bucket_name)
57:           v = S3Object.value(File.join(basedir, p), bucket_name)
58:           index[id][:offset] = v.length
59:           s = Marshal.dump(document)
60:           #puts "Marshalled value: #{s}"
61:           index[id][:length] = s.length
62:           S3Object.store(File.join(basedir, p), v + s, bucket_name)
63:         else
64:           s = Marshal.dump(document)
65:           #puts "Marshalled value: #{s}"
66:           S3Object.store(File.join(basedir, p), s, bucket_name)
67:           index[id][:offset] = 0
68:           index[id][:length] = s.length
69:         end
70:         cache[id] = document if cache?
71:         document
72:       end

Returns true

[Source]

     # File lib/rddb/document_store/s3_document_store.rb, line 119
119:       def supports_partitioning?
120:         true
121:       end

Trigger indexes to be stored if necessary.

[Source]

    # File lib/rddb/document_store/s3_document_store.rb, line 85
85:       def write_indexes
86:         S3Object.store(File.join(basedir, 'index'),  Marshal.dump(index), bucket_name)
87:       end

[Validate]