#!/usr/bin/python
import httplib
import socks
import urllib2
from Queue import Queue
from threading import Thread, Condition, Lock
from threading import active_count as threading_active_count
import time
from pymongo import Connection
import pymongo

url_format = 'http://www.imdb.com/user/ur{0}/ratings'
http_codes_counter = {}

MONGODB_HOSTNAME = '192.168.0.118'

"""
https://gist.github.com/869791
SocksiPy + urllib handler

version: 0.2
author: e

This module provides a Handler which you can use with urllib2 to allow it to tunnel your connection through a socks.socksocket socket, without monkey patching the original socket...
"""

class SocksiPyConnection(httplib.HTTPConnection):
    def __init__(self, proxytype, proxyaddr, proxyport=None, rdns=True, username=None, password=None, *args, **kwargs):
        self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password)
        httplib.HTTPConnection.__init__(self, *args, **kwargs)

    def connect(self):
        self.sock = socks.socksocket()
        self.sock.setproxy(*self.proxyargs)
        if isinstance(self.timeout, float):
            self.sock.settimeout(self.timeout)
        self.sock.connect((self.host, self.port))

class SocksiPyHandler(urllib2.HTTPHandler):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)

    def http_open(self, req):
        def build(host, port=None, strict=None, timeout=0):
            conn = SocksiPyConnection(*self.args, host=host, port=port, strict=strict, timeout=timeout, **self.kw)
            return conn
        return self.do_open(build, req)
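
# Minimal usage sketch for the handler above, mirroring how Worker builds its
# opener below (the proxy address/port are illustrative assumptions, e.g. a
# local Tor SOCKS proxy on 9050):
#
#   opener = urllib2.build_opener(SocksiPyHandler(socks.PROXY_TYPE_SOCKS4, 'localhost', 9050))
#   print opener.open('http://www.imdb.com/').getcode()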

class Monitor(Thread):
    def __init__(self, queue, discovery):
        Thread.__init__(self)
        self.queue = queue
        self.discovery = discovery
        self.finish_signal = False

    def finish(self):
        self.finish_signal = True

    def run(self):
        while not self.finish_signal:
            time.sleep(5)
            print "Elements in Queue:", self.queue.qsize(), "Active Threads:", threading_active_count(), "Exceptions Counter:", self.discovery.exception_counter

class Worker(Thread):
    def __init__(self, queue, discovery, socks_proxy_port):
        Thread.__init__(self)
        self.queue = queue
        self.discovery = discovery
        self.socks_proxy_port = socks_proxy_port
        self.opener = urllib2.build_opener(SocksiPyHandler(socks.PROXY_TYPE_SOCKS4, 'localhost', self.socks_proxy_port))
        self.conn = Connection(MONGODB_HOSTNAME, 27017)
        self.db = self.conn.scraping
        self.coll = self.db.imdb.ratings

    def get_url(self, url):
        try:
            #h = urllib2.urlopen(url)
            h = self.opener.open(url)
            return h.getcode()
        except urllib2.URLError, e:
            # Plain URLErrors carry no HTTP status; record None so the item is retried.
            return getattr(e, 'code', None)

    def _decrement_pending(self):
        # One record of the current cycle has been handled (or deferred).
        self.discovery.lock.acquire()
        self.discovery.records_to_process -= 1
        if self.discovery.records_to_process == 0:
            self.discovery.lock.notify()
        self.discovery.lock.release()

    def run(self):
        while True:
            try:
                index = self.queue.get()
                if index is None:
                    self.queue.put(None) # Notify the next worker
                    break
                url = url_format.format(index)
                code = self.get_url(url)
                self.coll.update({'index':index}, {'$set': {'last_response':code}})
                self._decrement_pending()
            except (socks.Socks4Error, httplib.BadStatusLine), e:
                print e
                self.discovery.exception_counter_lock.acquire()
                self.discovery.exception_counter += 1
                self.discovery.exception_counter_lock.release()
                # Leave this element for the next cycle, but still decrement the
                # pending counter so the Croupier can finish the current cycle.
                self._decrement_pending()
            time.sleep(1.5)

class Croupier(Thread):
    Base = 0
    Top = 25000000
    #Top = 1000

    def __init__(self, queue, discovery):
        Thread.__init__(self)
        self.conn = Connection(MONGODB_HOSTNAME, 27017)
        self.db = self.conn.scraping
        self.coll = self.db.imdb.ratings
        self.finish_signal = False
        self.queue = queue
        self.discovery = discovery
        self.discovery.records_to_process = 0

    def run(self):
        # Check whether the imdb collection is empty; only then create all the items.
        c = self.coll.count()
        if c == 0:
            print "Inserting items"
            self.coll.ensure_index([('index', pymongo.ASCENDING), ('last_response', pymongo.ASCENDING)])
            for i in xrange(Croupier.Base, Croupier.Top):
                self.coll.insert({'index':i, 'url': url_format.format(i), 'last_response': 0})
        else:
            print "Using", c, "persisted items"
        while True:
            #items = self.coll.find({'last_response': {'$ne': 200}})
            items = self.coll.find({'$and': [{'last_response': {'$ne': 200}}, {'last_response': {'$ne': 404}}]}, timeout=False)
            self.discovery.records_to_process = items.count()
            if self.discovery.records_to_process == 0:
                break
            for item in items:
                self.queue.put(item['index'])
            # Wait until the last item is updated on the db
            self.discovery.lock.acquire()
            while self.discovery.records_to_process != 0:
                self.discovery.lock.wait()
            self.discovery.lock.release()
            # time.sleep(5)
        # Send a 'signal' to workers to finish
        self.queue.put(None)

    def finish(self):
        self.finish_signal = True

class Discovery:
    NWorkers = 71
    SocksProxyBasePort = 9050
    Contention = 10000

    def __init__(self):
        self.queue = Queue(Discovery.Contention)
        self.workers = []
        self.lock = Condition()
        self.exception_counter_lock = Lock()
        self.records_to_process = 0
        self.exception_counter = 0

    def start(self):
        croupier = Croupier(self.queue, self)
        croupier.start()
        for i in range(Discovery.NWorkers):
            worker = Worker(self.queue, self, Discovery.SocksProxyBasePort + i)
            self.workers.append(worker)
        for w in self.workers:
            w.start()
        monitor = Monitor(self.queue, self)
        monitor.start()
        for w in self.workers:
            w.join()
        croupier.join()
        print "Queue finished with:", self.queue.qsize(), "elements"
        monitor.finish()

def main():
    discovery = Discovery()
    discovery.start()

if __name__ == '__main__':
    main()

#
# MISC NOTES
#
# - How many IMDB ratings pages are currently indexed by Google? query: inurl:www.imdb.com/user/*/ratings
# - [pymongo] cursor id '239432858681488351' not valid at server: http://groups.google.com/group/mongodb-user/browse_thread/thread/4ed6e3d77fb1c2cf?pli=1
#   That error generally means that the cursor timed out on the server -
#   this could be the case if you are performing a long running operation
#   while iterating over the cursor. The best bet is probably to turn off
#   the timeout by passing "timeout=False" in your call to find:
#
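# A minimal sketch of that no-timeout query as used in Croupier.run above
# (identifiers follow this script; in newer pymongo the equivalent flag is
# no_cursor_timeout=True):
#
#   items = coll.find({'$and': [{'last_response': {'$ne': 200}},
#                               {'last_response': {'$ne': 404}}]}, timeout=False)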