# encoding: UTF-8
require 'cassandra'
require 'logger'
require 'multi_json'
require 'polipus'

module Polipus
  module QueueOverflow
    class CassandraQueue
      # CassandraQueue wants to persist documents (please, still ignore the
      # jargon inherited from Mongo) like the following JSON-ish entries.
      #
      # There is no superclass here, but I have in mind the interface
      # implicitly defined by Polipus::QueueOverflow::DevNullQueue, which
      # more or less has:
      #
      #   def initialize
      #   def length
      #   def empty?
      #   def clear
      #   def push(_data)
      #   def pop(_ = false)
      #
      # Taking some data from our backend.production.*****.com/polipus
      # I found:
      #
      #   mongos> db.getCollectionNames()
      #   [
      #     "data-com-companies",
      #     "data_com_companies",
      #     "googleplus",
      #     "linkedin",
      #     "linkedin-companies",
      #     "linkedin_companies_parsed",
      #     "linkedin_jobs",
      #     "linkedin_jobs_parsed",
      #     "linkedin_pages_errors",
      #     "polipus_q_overflow_data-com-companies_queue_overflow",
      #     "polipus_q_overflow_data_com_companies_queue_overflow",
      #     "polipus_q_overflow_googleplus_queue_overflow",
      #     "polipus_q_overflow_linkedin-companies_queue_overflow",
      #     "polipus_q_overflow_linkedin_jobs_queue_overflow",
      #     "polipus_q_overflow_linkedin_jobs_queue_overflow_old",
      #     "polipus_q_overflow_linkedin_refresh_queue_overflow",
      #     "system.indexes"
      #   ]
      #
      #   mongos> db.getCollection("polipus_q_overflow_linkedin_jobs_queue_overflow").find().limit(1)
      #   {
      #     "_id" : ObjectId("54506b98e3d55b20c40b32d3"),
      #     "payload" : "{\"url\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=7&trk=jserp_pagination_next\",\"depth\":6,\"referer\":\"https://www.linkedin.com/job/product-designer-jobs/?page_num=6&trk=jserp_pagination_6\",\"fetched\":false}"
      #   }
      #
      #   mongos> db.polipus_q_overflow_linkedin_refresh_queue_overflow.find().limit(10)
      #   {
      #     "_id" : ObjectId("544072b6e3d55b0db7000001"),
      #     "payload" : "{\"url\":\"http://www.linkedin.com/in/*****\",\"depth\":0,\"fetched\":false}"
      #   }
      #
      # We also assume this monkey patch:
      #
      #   Polipus::QueueOverflow.cassandra_queue(namespace, options = {})
      #
      # that returns instances of this class.
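      #
      # A minimal construction sketch (the host, keyspace, and table names
      # here are illustrative assumptions, not taken from production):
      #
      #   cluster = Cassandra.cluster(hosts: ['127.0.0.1'])
      #   queue = Polipus::QueueOverflow::CassandraQueue.new(
      #     cluster: cluster,
      #     keyspace: 'polipus_queue_overflow_example',
      #     table: 'example_overflow'
      #   )
      #   queue.keyspace!
      #   queue.table!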

      attr_accessor :cluster, :keyspace, :table

      # A validation is enforced on `:keyspace` and `:table` because
      # Cassandra is not happy when a keyspace or a table name contains a
      # hyphen.
      def initialize(options = {})
        raise ArgumentError unless options_are_valid?(options)
        @cluster = options[:cluster]
        @keyspace = options[:keyspace].gsub('-', '_')
        @table = options[:table].gsub('-', '_')
        @semaphore = Mutex.new
        @options = options
        @timeuuid_generator = Cassandra::Uuid::Generator.new
        @logger = @options[:logger] ||= Logger.new(STDOUT).tap { |l| l.level = Logger::INFO }
      end

      # Length aka size aka count is supported in Cassandra: as in your
      # plain old SQL, you can COUNT.
      #
      #   SELECT COUNT(*) FROM keyspace.table_name;
      #
      # To be honest, I'm not sure whether to be "defensive" and return
      # 0/nil when the result is empty; for now I'm leaving the code simple
      # and noisy if something goes wrong in the COUNT.
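      #
      # Illustrative usage (assuming the keyspace and table already exist):
      #
      #   queue.length #=> 0
      #   queue.size   #=> 0, via the alias below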
      def length
        table_ = [keyspace, table].compact.join('.')
        statement = "SELECT COUNT(*) FROM #{table_};"
        result = session.execute(statement)
        result.first['count']
      end

      # Return true if the table has no rows.
      # This is achieved with a 'SELECT ... LIMIT 1' query.
      def empty?
        get.first.nil?
      end

      # Clear is a fancy name for a DROP TABLE IF EXISTS <table_>.
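      # Since the whole table is dropped, #table! must be invoked again
      # before pushing new entries. An illustrative sequence:
      #
      #   queue.clear
      #   queue.table!
      #   queue.push(data)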
      def clear
        table_ = [keyspace, table].compact.join('.')
        statement = "DROP TABLE IF EXISTS #{table_};"
        session.execute(statement)
      end

      # push is the "write into Cassandra" method.
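      #
      # An illustrative push, mirroring the Mongo-style documents above
      # (the URL is a made-up example):
      #
      #   queue.push(MultiJson.encode(
      #     'payload' => '{"url":"http://www.example.com/","depth":0,"fetched":false}'
      #   ))
      #
      # Entries without a non-empty 'payload' key are stored with a NULL
      # payload.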
      def push(data)
        return nil if data.nil?
        obj = MultiJson.decode(data)
        table_ = [keyspace, table].compact.join('.')
        queue_name = @keyspace
        created_at = @timeuuid_generator.now
        begin
          @semaphore.synchronize do
            # Re-encode the 'payload' key if present and non-empty,
            # otherwise store NULL.
            payload =
              if obj.has_key?('payload') && !obj['payload'].empty?
                MultiJson.encode(obj['payload'])
              end
            column_names = %w[queue_name created_at payload]
            values_placeholders = column_names.map { '?' }.join(', ')
            statement = "INSERT INTO #{table_} (#{column_names.join(', ')}) VALUES (#{values_placeholders});"
            session.execute(
              session.prepare(statement),
              arguments: [
                queue_name,
                created_at,
                payload
              ])
          end
        rescue Encoding::UndefinedConversionError
          puts $!.error_char.dump
          puts $!.error_char.encoding
        end
        @logger.debug { "Writing this entry [#{[queue_name, created_at]}]" }
        [queue_name, created_at].to_s
      end

      # Pop removes 'n' entries from the overflow table (treated as a
      # queue) and returns a paged result:
      #
      #   results.class #=> Cassandra::Results::Paged
      #
      # Polipus expects a String that will be JSON-parsed in order to
      # rebuild the message for the main queue.
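      #
      # Illustrative pops (return values depend on what was pushed):
      #
      #   queue.pop    #=> the payload String of the oldest entry
      #   queue.pop(2) #=> a Cassandra::Results::Paged with the 2 oldest rows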
      def pop(n = 1)
        # A recap: pop should remove the oldest N messages and return them
        # to the caller.
        #
        # Let's see how this queue is implemented.
        # In Redis, messages are LPUSH-ed:
        #
        #   4 - 3 - 2 - 1 --> REDIS
        #   4 - 3 - 2     --> REDIS
        #   4 - 3         --> REDIS
        #   4             --> REDIS
        #
        # Then, in the fast_dequeue, they are RPOP-ped:
        #
        #   REDIS --> 1
        #   REDIS --> 2 - 1
        #   REDIS --> 3 - 2 - 1
        #   REDIS --> 4 - 3 - 2 - 1
        #
        # Then, they are received in this order:
        #
        #   [1] -> TimeUUID(1) = ...
        #   [2] -> TimeUUID(2) = ...
        #   [3] -> TimeUUID(3) = ...
        #   [4] -> TimeUUID(4) = ...
        #
        # As you can see below, rows are stored ORDER BY (created_at ASC),
        # which means "oldest first". When using 'LIMIT n' in a query, you
        # get the 'n' oldest entries.
        #
        #   cqlsh> SELECT * FROM polipus_queue_overflow_linkedin.linkedin_overflow ;
        #
        #    queue_name                      | created_at                           | payload
        #   ---------------------------------+--------------------------------------+---------
        #    polipus_queue_overflow_linkedin | 4632d49c-1c04-11e5-844b-0b314c777502 |     "1"
        #    polipus_queue_overflow_linkedin | 46339f8a-1c04-11e5-844b-0b314c777502 |     "2"
        #    polipus_queue_overflow_linkedin | 46349962-1c04-11e5-844b-0b314c777502 |     "3"
        #    polipus_queue_overflow_linkedin | 46351860-1c04-11e5-844b-0b314c777502 |     "4"
        #
        #   (4 rows)
        #
        #   cqlsh> SELECT * FROM polipus_queue_overflow_linkedin.linkedin_overflow LIMIT 1;
        #
        #    queue_name                      | created_at                           | payload
        #   ---------------------------------+--------------------------------------+---------
        #    polipus_queue_overflow_linkedin | 4632d49c-1c04-11e5-844b-0b314c777502 |     "1"
        #
        #   (1 rows)
        table_ = [keyspace, table].compact.join('.')
        results = get(n)
        results.each do |entry|
          statement = "DELETE FROM #{table_} WHERE queue_name = '#{entry['queue_name']}' AND created_at = #{entry['created_at']};"
          session.execute(statement)
        end
        # Let's respect the API as expected by Polipus: when a single entry
        # is popped, return its payload String.
        # Otherwise the execute returns a Cassandra::Results::Paged.
        if !results.nil? && results.respond_to?(:count) && results.count == 1
          return results.first['payload']
        end
        results
      end

      alias_method :size, :length
      alias_method :dec, :pop
      alias_method :shift, :pop
      alias_method :enc, :push
      alias_method :<<, :push
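
      # keyspace! creates the keyspace, defaulting to SimpleStrategy with a
      # replication factor of 3. An illustrative call with a different
      # strategy (the data-center name 'dc1' is an assumption):
      #
      #   queue.keyspace!("{'class': 'NetworkTopologyStrategy', 'dc1': '2'}")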
      def keyspace!(replication = nil, durable_writes = true)
        replication ||= "{'class': 'SimpleStrategy', 'replication_factor': '3'}"
        statement = "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = #{replication} AND durable_writes = #{durable_writes};"
        cluster.connect.execute(statement)
      end

      def session
        @session ||= @cluster.connect(keyspace)
      end

      # Taking a look at the Cassandra KEYSPACE you will find:
      #
      #   cqlsh> DESCRIBE KEYSPACE polipus_queue_overflow_linkedin ;
      #
      #   CREATE KEYSPACE polipus_queue_overflow_linkedin WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;
      #
      #   CREATE TABLE polipus_queue_overflow_linkedin.linkedin_overflow (
      #     queue_name text,
      #     created_at timeuuid,
      #     payload text,
      #     PRIMARY KEY (queue_name, created_at)
      #   ) WITH CLUSTERING ORDER BY (created_at ASC)
      #     AND bloom_filter_fp_chance = 0.01
      #     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
      #     AND comment = ''
      #     AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
      #     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
      #     AND dclocal_read_repair_chance = 0.1
      #     AND default_time_to_live = 0
      #     AND gc_grace_seconds = 864000
      #     AND max_index_interval = 2048
      #     AND memtable_flush_period_in_ms = 0
      #     AND min_index_interval = 128
      #     AND read_repair_chance = 0.0
      #     AND speculative_retry = '99.0PERCENTILE';
      #
      # This means that:
      # - queue_name is the partition key;
      # - created_at is the clustering key.
      #
      # With sample data:
      #
      #   cqlsh> SELECT * FROM polipus_queue_overflow_linkedin.linkedin_overflow LIMIT 1 ;
      #
      #    queue_name                      | created_at                           | payload
      #   ---------------------------------+--------------------------------------+----------------------------------------------------------------------------------
      #    polipus_queue_overflow_linkedin | de17ece6-1e5e-11e5-b997-47a87c40c422 | "{\"url\":\"http://www.linkedin.com/in/foobar\",\"depth\":0,\"fetched\":false}"
      #
      #   (1 rows)
      #   cqlsh>
      #
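      # An illustrative call with extra table properties (the values are
      # assumptions, not production settings):
      #
      #   queue.table!(['gc_grace_seconds = 86400', "comment = 'overflow queue'"])
      #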
      def table!(properties = nil)
        table_ = [keyspace, table].compact.join('.')
        def_ = "CREATE TABLE IF NOT EXISTS #{table_}
                (
                  queue_name TEXT,
                  created_at TIMEUUID,
                  payload TEXT,
                  PRIMARY KEY (queue_name, created_at)
                )"
        props = Array(properties).join(' AND ')
        statement = props.empty? ? "#{def_};" : "#{def_} WITH #{props};"
        session.execute(statement)
      end

      private

      def options_are_valid?(options)
        options.has_key?(:cluster) && options.has_key?(:keyspace) && options.has_key?(:table)
      end

      def limit_is_valid?(limit)
        !limit.nil? && limit.respond_to?(:to_i) && limit.to_i > 0
      end

      # results.class #=> Cassandra::Results::Paged
      def get(limit = 1)
        # Coerce to 1 if a TrueClass/FalseClass is given, since the
        # interface allows pop(_ = false).
        limit = 1 if [true, false].include?(limit)
        unless limit_is_valid?(limit)
          raise ArgumentError, "Invalid limit value: must be an Integer greater than 0 (got #{limit.inspect})."
        end
        table_ = [keyspace, table].compact.join('.')
        statement = "SELECT queue_name, created_at, payload FROM #{table_} LIMIT #{limit.to_i};"
        @semaphore.synchronize do
          session.execute(statement)
        end
      end
    end
  end
end