|
@@ -0,0 +1,226 @@
|
|
|
+#!/usr/bin/python
|
|
|
+
|
|
|
+import httplib
|
|
|
+import socks
|
|
|
+import urllib2
|
|
|
+from Queue import Queue
|
|
|
+from threading import Thread, Condition, Lock
|
|
|
+from threading import active_count as threading_active_count
|
|
|
+
|
|
|
+import time
|
|
|
+from pymongo import Connection
|
|
|
+import pymongo
|
|
|
+
|
|
|
+url_format = 'http://www.imdb.com/user/ur{0}/ratings'
|
|
|
+
|
|
|
+http_codes_counter = {}
|
|
|
+
|
|
|
+MONGODB_HOSTNAME = '192.168.0.118'
|
|
|
+
|
|
|
+"""
|
|
|
+https://gist.github.com/869791
|
|
|
+
|
|
|
+SocksiPy + urllib handler
|
|
|
+
|
|
|
+version: 0.2
|
|
|
+author: e
|
|
|
+
|
|
|
+This module provides a Handler which you can use with urllib2 to allow it to tunnel your connection through a socks.sockssocket socket, without monkey patching the original socket...
|
|
|
+"""
|
|
|
+
|
|
|
+class SocksiPyConnection(httplib.HTTPConnection):
|
|
|
+ def __init__(self, proxytype, proxyaddr, proxyport = None, rdns = True, username = None, password = None, *args, **kwargs):
|
|
|
+ self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password)
|
|
|
+ httplib.HTTPConnection.__init__(self, *args, **kwargs)
|
|
|
+
|
|
|
+ def connect(self):
|
|
|
+ self.sock = socks.socksocket()
|
|
|
+ self.sock.setproxy(*self.proxyargs)
|
|
|
+ if isinstance(self.timeout, float):
|
|
|
+ self.sock.settimeout(self.timeout)
|
|
|
+ self.sock.connect((self.host, self.port))
|
|
|
+
|
|
|
+class SocksiPyHandler(urllib2.HTTPHandler):
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ self.args = args
|
|
|
+ self.kw = kwargs
|
|
|
+ urllib2.HTTPHandler.__init__(self)
|
|
|
+
|
|
|
+ def http_open(self, req):
|
|
|
+ def build(host, port=None, strict=None, timeout=0):
|
|
|
+ conn = SocksiPyConnection(*self.args, host=host, port=port, strict=strict, timeout=timeout, **self.kw)
|
|
|
+ return conn
|
|
|
+ return self.do_open(build, req)
|
|
|
+
|
|
|
+class Monitor(Thread):
|
|
|
+ def __init__(self, queue, discovery):
|
|
|
+ Thread.__init__(self)
|
|
|
+ self.queue = queue
|
|
|
+ self.discovery = discovery
|
|
|
+ self.finish_signal = False
|
|
|
+
|
|
|
+ def finish(self):
|
|
|
+ self.finish_signal = True
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ while not self.finish_signal:
|
|
|
+ time.sleep(5)
|
|
|
+ print "Elements in Queue:", self.queue.qsize(), "Active Threads:", threading_active_count(), "Exceptions Counter:", self.discovery.exception_counter
|
|
|
+
|
|
|
+class Worker(Thread):
|
|
|
+ def __init__(self, queue, discovery, socks_proxy_port):
|
|
|
+ Thread.__init__(self)
|
|
|
+ self.queue = queue
|
|
|
+ self.discovery = discovery
|
|
|
+ self.socks_proxy_port = socks_proxy_port
|
|
|
+ self.opener = urllib2.build_opener(SocksiPyHandler(socks.PROXY_TYPE_SOCKS4, 'localhost', self.socks_proxy_port))
|
|
|
+ self.conn = Connection(MONGODB_HOSTNAME, 27017)
|
|
|
+ self.db = self.conn.scraping
|
|
|
+ self.coll = self.db.imdb.ratings
|
|
|
+
|
|
|
+ def get_url(self, url):
|
|
|
+ try:
|
|
|
+ #h = urllib2.urlopen(url)
|
|
|
+ h = self.opener.open(url)
|
|
|
+
|
|
|
+ return h.getcode()
|
|
|
+
|
|
|
+ except urllib2.URLError, e:
|
|
|
+ return e.code
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ index = self.queue.get()
|
|
|
+
|
|
|
+ if index == None:
|
|
|
+ self.queue.put(None) # Notify the next worker
|
|
|
+ break
|
|
|
+
|
|
|
+ url = url_format.format(index)
|
|
|
+
|
|
|
+ code = self.get_url(url)
|
|
|
+
|
|
|
+ self.coll.update({'index':index}, {'$set': {'last_response':code}})
|
|
|
+
|
|
|
+ self.discovery.lock.acquire()
|
|
|
+ self.discovery.records_to_process -= 1
|
|
|
+ if self.discovery.records_to_process == 0:
|
|
|
+ self.discovery.lock.notify()
|
|
|
+ self.discovery.lock.release()
|
|
|
+
|
|
|
+ except (socks.Socks4Error, httplib.BadStatusLine), e:
|
|
|
+ # TypeError: 'Socks4Error' object is not callable
|
|
|
+ print e
|
|
|
+ self.discovery.exception_counter_lock.acquire()
|
|
|
+ self.discovery.exception_counter += 1
|
|
|
+ self.discovery.exception_counter_lock.release()
|
|
|
+ pass # leave this element for the next cycle
|
|
|
+
|
|
|
+ time.sleep(1.5)
|
|
|
+
|
|
|
+class Croupier(Thread):
|
|
|
+ Base = 0
|
|
|
+ Top = 25000000
|
|
|
+ #Top = 1000
|
|
|
+ def __init__(self, queue, discovery):
|
|
|
+ Thread.__init__(self)
|
|
|
+ self.conn = Connection(MONGODB_HOSTNAME, 27017)
|
|
|
+ self.db = self.conn.scraping
|
|
|
+ self.coll = self.db.imdb.ratings
|
|
|
+ self.finish_signal = False
|
|
|
+ self.queue = queue
|
|
|
+ self.discovery = discovery
|
|
|
+ self.discovery.records_to_process = 0
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ # Look if imdb collection is empty. Only if its empty we create all the items
|
|
|
+ c = self.coll.count()
|
|
|
+ if c == 0:
|
|
|
+ print "Inserting items"
|
|
|
+ self.coll.ensure_index([('index', pymongo.ASCENDING), ('last_response', pymongo.ASCENDING)])
|
|
|
+ for i in xrange(Croupier.Base, Croupier.Top):
|
|
|
+ self.coll.insert({'index':i, 'url': url_format.format(i), 'last_response': 0})
|
|
|
+
|
|
|
+ else:
|
|
|
+ print "Using #", c, " persisted items"
|
|
|
+
|
|
|
+ while True:
|
|
|
+ #items = self.coll.find({'last_response': {'$ne': 200}})
|
|
|
+ items = self.coll.find({'$and': [{'last_response': {'$ne': 200}}, {'last_response' : {'$ne': 404}}]}, timeout = False)
|
|
|
+
|
|
|
+ self.discovery.records_to_process = items.count()
|
|
|
+
|
|
|
+ if self.discovery.records_to_process == 0:
|
|
|
+ break
|
|
|
+
|
|
|
+ for item in items:
|
|
|
+ self.queue.put(item['index'])
|
|
|
+
|
|
|
+ # Wait until the last item is updated on the db
|
|
|
+ self.discovery.lock.acquire()
|
|
|
+ while self.discovery.records_to_process != 0:
|
|
|
+ self.discovery.lock.wait()
|
|
|
+ self.discovery.lock.release()
|
|
|
+
|
|
|
+# time.sleep(5)
|
|
|
+
|
|
|
+ # Send a 'signal' to workers to finish
|
|
|
+ self.queue.put(None)
|
|
|
+
|
|
|
+ def finish(self):
|
|
|
+ self.finish_signal = True
|
|
|
+
|
|
|
+class Discovery:
|
|
|
+ NWorkers = 71
|
|
|
+ SocksProxyBasePort = 9050
|
|
|
+ Contention = 10000
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.queue = Queue(Discovery.Contention)
|
|
|
+ self.workers = []
|
|
|
+ self.lock = Condition()
|
|
|
+ self.exception_counter_lock = Lock()
|
|
|
+ self.records_to_process = 0
|
|
|
+ self.exception_counter = 0
|
|
|
+
|
|
|
+ def start(self):
|
|
|
+ croupier = Croupier(self.queue, self)
|
|
|
+ croupier.start()
|
|
|
+
|
|
|
+ for i in range(Discovery.NWorkers):
|
|
|
+ worker = Worker(self.queue, self, Discovery.SocksProxyBasePort + i)
|
|
|
+ self.workers.append(worker)
|
|
|
+
|
|
|
+ for w in self.workers:
|
|
|
+ w.start()
|
|
|
+
|
|
|
+ monitor = Monitor(self.queue, self)
|
|
|
+ monitor.start()
|
|
|
+
|
|
|
+ for w in self.workers:
|
|
|
+ w.join()
|
|
|
+
|
|
|
+ croupier.join()
|
|
|
+
|
|
|
+ print "Queue finished with:", self.queue.qsize(), "elements"
|
|
|
+
|
|
|
+ monitor.finish()
|
|
|
+
|
|
|
+def main():
|
|
|
+ discovery = Discovery()
|
|
|
+ discovery.start()
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ main()
|
|
|
+
|
|
|
+#
|
|
|
+# MISC NOTES
|
|
|
+#
|
|
|
+# - How many IMDB ratings pages are currently indexed by Google? query: inurl:www.imdb.com/user/*/ratings
|
|
|
+# - [pymongo] cursor id '239432858681488351' not valid at server Options: http://groups.google.com/group/mongodb-user/browse_thread/thread/4ed6e3d77fb1c2cf?pli=1
|
|
|
+# That error generally means that the cursor timed out on the server -
|
|
|
+# this could be the case if you are performing a long running operation
|
|
|
+# while iterating over the cursor. The best bet is probably to turn off
|
|
|
+# the timeout by passing "timeout=False" in your call to find:
|
|
|
+#
|