elasticsearch_store.rb 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. # encoding: UTF-8
  2. require 'base64'
  3. require 'multi_json'
  4. require 'polipus'
  5. require 'polipus-elasticsearch'
  module Polipus
    module Storage
      # Polipus page storage backed by an Elasticsearch index.
      #
      # Pages are flattened with Page#to_hash. The fields listed in
      # BINARY_FIELDS are Base64-encoded before being stored (user_data is
      # JSON-encoded first) and are decoded again by #load_page.
      class ElasticSearchStore < Base
        # Page fields that are Base64-encoded before storage.
        BINARY_FIELDS = %w(body headers user_data).freeze
        # Index wrapper used when no :index option is given.
        DEFAULT_INDEX = Polipus::ElasticSearch::Page

        attr_accessor :index, :index_name, :except, :compress, :semaphore, :refresh

        # @param client Elasticsearch client, handed to index.setup
        # @param options [Hash] accepts symbol or string keys:
        #   :index      - index wrapper object (defaults to DEFAULT_INDEX)
        #   :index_name - name passed to index.setup / index.create_index!
        #   :except     - page fields removed before storing (default [])
        #   :compress   - stored on the instance; not read by this class
        #   :refresh    - passed to index.store / index.remove (default true;
        #                 an explicit false is passed through unchanged)
        def initialize(client, options = {})
          @index = option_value(options, :index) || DEFAULT_INDEX
          @index_name = option_value(options, :index_name)
          @except = option_value(options, :except) || []
          @compress = option_value(options, :compress)
          @semaphore = Mutex.new
          # `x || true` would turn an explicit `false` into `true`, so the
          # default applies only when the option is absent (nil).
          refresh_option = option_value(options, :refresh)
          @refresh = refresh_option.nil? ? true : refresh_option
          index.setup(client, index_name)
          index.create_index!(index_name) unless index.index_exists?
        end

        # Stores +page+ under its uuid.
        #
        # Fields named in +except+ are dropped, BINARY_FIELDS are encoded and
        # fetched_at is coerced with to_i before calling index.store.
        # @return the result of index.store, or nil on a version conflict
        def add(page)
          semaphore.synchronize do
            obj = page.to_hash
            Array(except).each { |field| obj.delete(field.to_s) }
            encode_binary_fields!(obj)
            obj['id'] = uuid(page)
            obj['fetched_at'] = obj['fetched_at'].to_i
            begin
              index.store(obj, refresh)
            rescue Elasticsearch::Transport::Transport::Errors::Conflict
              # Deliberately ignored: a conflict here means an old version of
              # the document is being stored, so the write is skipped.
              nil
            end
          end
        end

        # Clears the index if it exists.
        def clear
          index.clear_index! if index.index_exists?
        end

        # @return the value reported by index.count
        def count
          index.count
        end

        # Deletes the index if it exists.
        def drop
          index.delete_index! if index.index_exists?
        end

        # Yields (uuid, page) for the first 25 hits of a match_all search.
        #
        # Implemented only for testing purposes: it does not page through the
        # whole index. Without a block, returns an Enumerator.
        def each
          return enum_for(:each) unless block_given?

          response = index.client.search(
            index: index_name,
            body: {
              query: { match_all: {} },
              from: 0,
              size: 25
            }
          )
          response['hits']['hits'].each do |hit|
            page = load_page(hit['_source'])
            yield uuid(page), page
          end
        end

        # @return the result of index.exists? for the page's uuid
        def exists?(page)
          semaphore.synchronize { index.exists?(uuid(page)) }
        end

        # Fetches the stored document for +page+ and rebuilds a Page from it.
        # @return [Page, nil] nil when index.get returns nil
        def get(page)
          semaphore.synchronize { load_page(index.get(uuid(page))) }
        end

        # Removes the document for +page+, passing the +refresh+ setting on.
        def remove(page)
          semaphore.synchronize { index.remove(uuid(page), refresh) }
        end

        # Rebuilds a Page from a stored document, reversing the encoding
        # applied by #add. Note: +data+ is modified in place.
        #
        # @param data [Hash, nil] stored document with string keys
        # @return [Page, nil] nil when +data+ is nil; fetched_at defaults to 0
        def load_page(data)
          return nil if data.nil?

          decode_binary_fields!(data)
          page = Page.from_hash(data)
          page.fetched_at ||= 0
          page
        end

        private

        # Returns options[key] when non-nil, otherwise options[key.to_s].
        def option_value(options, key)
          value = options[key]
          value.nil? ? options[key.to_s] : value
        end

        # Base64-encodes the non-empty BINARY_FIELDS of +obj+ in place.
        # user_data is JSON-encoded before Base64 encoding.
        def encode_binary_fields!(obj)
          BINARY_FIELDS.each do |field|
            value = obj[field]
            next if value.nil? || value.empty?

            value = MultiJson.encode(value) if field == 'user_data'
            obj[field] = Base64.encode64(value)
          end
          obj
        end

        # Inverse of encode_binary_fields!: Base64-decodes the non-empty
        # BINARY_FIELDS of +data+ in place, then JSON-decodes user_data.
        def decode_binary_fields!(data)
          BINARY_FIELDS.each do |field|
            value = data[field]
            next if value.nil? || value.empty?

            value = Base64.decode64(value)
            value = MultiJson.decode(value) if field == 'user_data'
            data[field] = value
          end
          data
        end
      end
    end
  end