mysql_store.rb 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # coding: utf-8
  2. require 'polipus/storage'
  3. require 'polipus/page'
  4. require 'mysql2'
  5. require 'thread'
  6. module Polipus
  7. module Storage
  # Convenience factory for a MySQL-backed page store.
  #
  # @param mysql_options [Hash] connection options handed on to MysqlStore
  #   (which passes them to Mysql2::Client.new).
  # @param table_name [String] name of the table that holds the pages.
  # @return [MysqlStore] a new store instance.
  def self.mysql_store(mysql_options = {}, table_name = 'pages')
    # merge returns a new hash, so the caller's options are left untouched.
    store_options = mysql_options.merge(table_name: table_name)
    self::MysqlStore.new(store_options)
  end
  # Page storage backed by a MySQL table, accessed through the mysql2 client.
  #
  # Every page is one row keyed by +uuid(page)+. +uuid+ is not defined in this
  # file; it is presumably inherited from Base -- confirm against
  # polipus/storage. All statements go through a single Mysql2::Client and are
  # serialised with a Mutex.
  class MysqlStore < Base
    # Table used when the caller does not supply :table_name (same default as
    # the Storage.mysql_store factory).
    DEFAULT_TABLE_NAME = 'pages'.freeze

    # @param options [Hash] Mysql2::Client connection options, plus an optional
    #   :table_name entry naming the table to use.
    def initialize(options = {})
      # Work on a copy: deleting :table_name must not mutate the caller's hash.
      options = options.dup
      @tbl = options.delete(:table_name) || DEFAULT_TABLE_NAME
      @my = Mysql2::Client.new(options)
      @mutex = Mutex.new
      setup
    end

    # Inserts the page (see #page_to_sql for the duplicate-key behaviour).
    #
    # @param page [Page]
    # @return the value of uuid(page)
    def add(page)
      @mutex.synchronize do
        @my.query(page_to_sql(page))
        uuid(page)
      end
    end

    # @param page [Page]
    # @return [Boolean] true when a row with the page's uuid is stored.
    def exists?(page)
      @mutex.synchronize do
        sql = "SELECT EXISTS (SELECT 1 FROM #{@tbl} " \
              "WHERE uuid = '#{@my.escape(uuid(page))}') AS CNT"
        @my.query(sql).first['CNT'] == 1
      end
    end

    # Loads the stored copy of a page.
    #
    # @param page [Page]
    # @return [Page, nil] the stored page, or nil when no row matches.
    def get(page)
      @mutex.synchronize do
        sql = "SELECT * FROM #{@tbl} WHERE uuid = '#{@my.escape(uuid(page))}' LIMIT 1"
        # .first is nil when nothing matched; load_page passes that nil through
        # instead of raising NoMethodError.
        load_page(@my.query(sql, cast_booleans: true).first)
      end
    end

    # Deletes the row holding the given page, if any.
    #
    # @param page [Page]
    def remove(page)
      @mutex.synchronize do
        @my.query("DELETE FROM #{@tbl} WHERE uuid = '#{@my.escape(uuid(page))}'")
      end
    end

    # @return [Integer] number of rows in the table.
    def count
      @mutex.synchronize do
        @my.query("SELECT COUNT(*) AS CNT FROM #{@tbl}").first['CNT'].to_i
      end
    end

    # Iterates over every stored page.
    #
    # The lock is held only while the rows are fetched and materialised, not
    # while yielding, so the block may call other store methods without
    # dead-locking on the non-reentrant Mutex.
    #
    # @yield [uuid, page] the row's uuid and the page rebuilt from the row
    # @return [Enumerator] when no block is given
    def each
      return enum_for(:each) unless block_given?

      rows = @mutex.synchronize do
        # cast_booleans keeps `fetched` a true/false value, as in #get; the raw
        # tinyint 0 would otherwise be truthy in Ruby.
        @my.query("SELECT * FROM #{@tbl}", cast_booleans: true).to_a
      end
      rows.each do |row|
        yield row['uuid'], load_page(row)
      end
    end

    # Deletes every row in the table.
    def clear
      @mutex.synchronize do
        @my.query("DELETE FROM #{@tbl}")
      end
    end

    private

    # Creates the backing table when it does not exist yet. An existing table
    # is left untouched, whatever its schema.
    def setup
      create_table = %Q(
        CREATE TABLE IF NOT EXISTS #{@tbl} (
          uuid          varchar(32) PRIMARY KEY,
          url           varchar(255),
          headers       blob,
          body          blob,
          links         blob,
          code          int,
          depth         int,
          referer       varchar(255),
          redirect_to   varchar(255),
          response_time int,
          fetched       boolean,
          user_data     blob,
          fetched_at    int,
          error         varchar(255)
        )
      )
      @my.query(create_table)
    end

    # Builds the INSERT statement for a page.
    #
    # Values are positional, so their order must match the column order in
    # #setup. When the uuid already exists only fetched_at is refreshed; the
    # other columns keep their stored values.
    #
    # @param page [Page]
    # @return [String] SQL text
    def page_to_sql(page)
      %Q(
        INSERT INTO #{@tbl}
          VALUES (
            '#{@my.escape(uuid(page).to_s)}',
            '#{@my.escape(page.url.to_s)}',
            '#{@my.escape(Marshal.dump(page.headers))}',
            '#{@my.escape(page.body.to_s)}',
            '#{@my.escape(Marshal.dump(page.links))}',
            #{page.code.to_i},
            #{page.depth.to_i},
            '#{@my.escape(page.referer.to_s)}',
            '#{@my.escape(page.redirect_to.to_s)}',
            #{page.response_time.to_i},
            #{page.fetched? ? true : false},
            '#{@my.escape(Marshal.dump(page.user_data))}',
            #{page.fetched_at.to_i},
            '#{@my.escape(page.error.to_s)}'
          )
          ON DUPLICATE KEY UPDATE
            fetched_at = UNIX_TIMESTAMP()
      )
    end

    # Rebuilds a Page from a result row.
    #
    # NOTE(review): Marshal.load runs on data read back from the database;
    # that is only safe while the table contents are trusted.
    # NOTE(review): `headers` is written with Marshal.dump but is not decoded
    # here -- presumably Page.from_hash handles it; confirm.
    #
    # @param hash [Hash, nil] row as returned by mysql2 (string keys)
    # @return [Page, nil] nil when no row was given
    def load_page(hash)
      return nil if hash.nil?

      %w(links user_data).each do |f|
        hash[f] = Marshal.load(hash[f]) unless hash[f].nil?
      end
      Page.from_hash(hash)
    end
  end
  112. end
  113. end