cassandra_store.rb 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. # encoding: UTF-8
require 'cassandra'
require 'multi_json'
require 'polipus'
require 'thread'
require 'time'
require 'zlib'
  7. module Polipus
  8. module Storage
  9. class CassandraStore < Base
  10. # CassandraStore wants to persists documents (please ignore the jargon
  11. # inherited from MongoDB) like the following JSON-ish entry:
  12. #
  13. # > db['linkedin-refresh'].find({})
  14. #
  15. # {
  16. # "_id" : ObjectId("...."),
  17. # "url" : "https://www.awesome.org/meh",
  18. # "code" : 200,
  19. # "depth" : 0,
  20. # "referer" : "",
  21. # "redirect_to" : "",
  22. # "response_time" : 1313,
  23. # "fetched" : true,
  24. # "user_data" :
  25. # {
  26. # "imported" : false,
  27. # "is_developer" : false,
  28. # "last_modified" : null
  29. # },
  30. # "fetched_at" : 1434977757,
  31. # "error" : "",
  32. # "uuid" : "4ddce293532ea2454356a4210e61c363"
  33. # }
  34. attr_accessor :cluster, :keyspace, :table
  35. BINARY_FIELDS = %w(body headers user_data)
  36. def initialize(options = {})
  37. @cluster = options[:cluster]
  38. @keyspace = options[:keyspace]
  39. @table = options[:table]
  40. @except = options[:except] || []
  41. @semaphore = Mutex.new
  42. end
  43. # {
  44. # 'url' => @url.to_s,
  45. # 'headers' => Marshal.dump(@headers),
  46. # 'body' => @body,
  47. # 'links' => links.map(&:to_s),
  48. # 'code' => @code,
  49. # 'depth' => @depth,
  50. # 'referer' => @referer.to_s,
  51. # 'redirect_to' => @redirect_to.to_s,
  52. # 'response_time' => @response_time,
  53. # 'fetched' => @fetched,
  54. # 'user_data' => @user_data.nil? ? {} : @user_data.marshal_dump,
  55. # 'fetched_at' => @fetched_at,
  56. # 'error' => @error.to_s
  57. # }
  58. def add(page)
  59. @semaphore.synchronize do
  60. table_ = [keyspace, table].compact.join '.'
  61. uuid_ = uuid(page)
  62. obj = page.to_hash
  63. Array(@except).each { |e| obj.delete(e.to_s) }
  64. begin
  65. BINARY_FIELDS.each do |field|
  66. obj[field] = obj[field].to_s.encode('UTF-8', {
  67. invalid: :replace,
  68. undef: :replace,
  69. replace: '?' }) if can_be_converted?(obj[field])
  70. # ec = Encoding::Converter.new("ASCII-8BIT", "UTF-8")
  71. # obj[field] = ec.convert(obj[field]) if can_be_converted?(obj[field])
  72. # obj[field] = obj[field].force_encoding('ASCII-8BIT').force_encoding('UTF-8') if can_be_converted?(obj[field])
  73. end
  74. json = MultiJson.encode(obj)
  75. url = obj.fetch('url', nil)
  76. code = obj.fetch('code', nil)
  77. depth = obj.fetch('depth', nil)
  78. referer = obj.fetch('referer', nil)
  79. redirectto = obj.fetch('redirect_to', nil)
  80. response_time = obj.fetch('response_time', nil)
  81. fetched = obj.fetch('fetched', nil)
  82. error = obj.fetch('error', nil)
  83. page = Zlib::Deflate.deflate(json)
  84. if obj.has_key?('user_data') && !obj['user_data'].empty?
  85. user_data = MultiJson.encode(obj['user_data'])
  86. else
  87. user_data = nil
  88. end
  89. value = obj.fetch('fetched_at', nil)
  90. fetched_at = case value
  91. when Fixnum
  92. Time.at(value)
  93. when String
  94. Time.parse(value)
  95. else
  96. nil
  97. end
  98. column_names = %w[ uuid url code depth referer redirect_to response_time fetched user_data fetched_at error page ]
  99. values_placeholders = column_names.map{|_| '?'}.join(',')
  100. statement = "INSERT INTO #{table_} ( #{column_names.join(',')} ) VALUES (#{values_placeholders});"
  101. session.execute(
  102. session.prepare(statement),
  103. arguments: [
  104. uuid_,
  105. url,
  106. code,
  107. depth,
  108. referer,
  109. redirectto,
  110. response_time,
  111. fetched,
  112. user_data,
  113. fetched_at,
  114. error,
  115. page
  116. ])
  117. rescue Encoding::UndefinedConversionError
  118. puts $!.error_char.dump
  119. puts $!.error_char.encoding
  120. end
  121. uuid_
  122. end
  123. end
  124. def clear
  125. table_ = [keyspace, table].compact.join '.'
  126. statement = "DROP TABLE #{table_};"
  127. session.execute statement
  128. end
  129. # TBH I'm not sure if being "defensive" and returning 0/nil in case
  130. # the results is_empty? ... I'm leaving (now) the code simple and noisy
  131. # if something went wrong in the COUNT.
  132. def count
  133. table_ = [keyspace, table].compact.join '.'
  134. statement = "SELECT COUNT (*) FROM #{table_} ;"
  135. result = session.execute(statement)
  136. result.first['count']
  137. end
  138. def each
  139. table_ = [keyspace, table].compact.join '.'
  140. statement = "SELECT * FROM #{table_};"
  141. session.execute(statement).each do |data|
  142. page = load_page(data) unless data.nil?
  143. yield data['uuid'], page
  144. end
  145. end
  146. def exists?(page)
  147. @semaphore.synchronize do
  148. table_ = [keyspace, table].compact.join '.'
  149. statement = "SELECT uuid FROM #{table_} WHERE uuid = ? LIMIT 1;"
  150. results = session.execute(session.prepare(statement),
  151. arguments: [uuid(page)])
  152. !results.first.nil?
  153. end
  154. end
  155. def get(page)
  156. @semaphore.synchronize do
  157. table_ = [keyspace, table].compact.join '.'
  158. statement = "SELECT * FROM #{table_} WHERE uuid = ? LIMIT 1;"
  159. results = session.execute(session.prepare(statement),
  160. arguments: [uuid(page)])
  161. data = results.first
  162. load_page(data) unless data.nil?
  163. end
  164. end
  165. def keyspace!(replication = nil, durable_writes = true)
  166. replication ||= "{'class': 'SimpleStrategy', 'replication_factor': '3'}"
  167. statement = "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = #{replication} AND durable_writes = #{durable_writes};"
  168. cluster.connect.execute statement
  169. end
  170. def remove(page)
  171. @semaphore.synchronize do
  172. table_ = [keyspace, table].compact.join '.'
  173. statement = "DELETE FROM #{table_} WHERE uuid = ?;"
  174. session.execute(session.prepare(statement),
  175. arguments: [uuid(page)])
  176. true
  177. end
  178. end
  179. def session
  180. @session ||= @cluster.connect(keyspace)
  181. end
  182. def table!(properties = nil)
  183. table_ = [keyspace, table].compact.join '.'
  184. def_ = "CREATE TABLE IF NOT EXISTS #{table_}
  185. (
  186. uuid TEXT PRIMARY KEY,
  187. url TEXT,
  188. code INT,
  189. depth INT,
  190. referer TEXT,
  191. redirect_to TEXT,
  192. response_time BIGINT,
  193. fetched BOOLEAN,
  194. user_data TEXT,
  195. fetched_at TIMESTAMP,
  196. error TEXT,
  197. page BLOB
  198. )"
  199. props = properties.to_a.join(' AND ')
  200. statement = props.empty? ? "#{def_};" : "#{def_} WITH #{props};"
  201. session.execute statement
  202. end
  203. def load_page(data)
  204. json = Zlib::Inflate.inflate(data['page'])
  205. hash = MultiJson.decode(json)
  206. page = Page.from_hash(hash)
  207. page.fetched_at = 0 if page.fetched_at.nil?
  208. page
  209. end
  210. private
  211. def can_be_converted?(field)
  212. !field.nil? && field.is_a?(String) && !field.empty?
  213. end
  214. end
  215. end
  216. end