john pfeiffer
  • Home
  • Categories
  • Tags
  • Archives

concurrency threaded multiprocess queue urlib2 get url comparison

# 2013-03-04 johnpfeiffer
# python downloaders.py  1024 nonthreaded yahoo.com slashdot.org apple.com kittyandbear.net ibm.com msn.com = 2.5 seconds
# python downloaders.py  1024 threaded yahoo.com slashdot.org apple.com kittyandbear.net ibm.com msn.com = .7 seconds
# python downloaders.py  1024 multiprocess yahoo.com slashdot.org apple.com kittyandbear.net ibm.com msn.com = 1.5 seconds


import urllib2
import time
import sys

import Queue        # synchronized data structure
import threading

import multiprocessing


class Worker( multiprocessing.Process ):
    def __init__( self , url_queue , bytes_to_download ):
        multiprocessing.Process.__init__( self )
        self.url_queue = url_queue
        self.bytes_to_download = bytes_to_download

    def run( self ):
        while True:
            url = self.url_queue.get()
            if url == None:
                print '%s exiting' % self.name
                self.url_queue.task_done()
                break

            print '\ndownloading %s' % url
            urlsock = urllib2.urlopen( url )
            print urlsock.read( self.bytes_to_download )
            urlsock.close()
            self.url_queue.task_done()
        return


class ThreadedDownloader( threading.Thread ):        # extend the thread class
    def __init__( self , url_queue , bytes_to_download ):   # fork-join design - each thread has the whole data set
        threading.Thread.__init__( self )
        self.url_queue = url_queue
        self.bytes_to_download = bytes_to_download

    def run( self ):
        while True:
            url = self.url_queue.get()
            print '\ndownloading %s' % url
            urlsock = urllib2.urlopen( url )
            print urlsock.read( self.bytes_to_download )
            urlsock.close()
            self.url_queue.task_done()


def multiprocess( urls , bytes_to_download ):
    cpus = multiprocessing.cpu_count()
    cpus = 2
    print 'cpus = %s' % cpus

    url_queue = multiprocessing.JoinableQueue()

    jobs = []
    for i in range( cpus ):         # start consumers (they wait for the 'poison pill' None)
        p = Worker( url_queue , bytes_to_download )
        jobs.append( p )
        p.start()

    for url in urls:
        url_queue.put( url )        # add the tasks to be done into the shared Queue

    for i in xrange( cpus ):
        url_queue.put( None )   # poison pills to ensure all processes quit

    url_queue.join()            # wait for all queued worker jobs to complete?



def threaded( urls , bytes_to_download ):
    url_queue = Queue.Queue()
    for url in urls:
        url_queue.put( url )

    for i in range( 5 ):            # create 5 threads (so for max performance there should be 5 hosts)
        my_thread = ThreadedDownloader( url_queue , bytes_to_download )
        my_thread.setDaemon( True )         # does not block the main program from exiting, must use a join() to block
        my_thread.start()                   # starts this thread
    url_queue.join()    #wait on the queue until all threads mark task done, can be join( 1 ) for max wait 1 second


def non_threaded( urls , bytes_to_download ):
    for url in urls:
        print '\ndownloading %s' % url
        urlsock = urllib2.urlopen( url )
        print urlsock.read( bytes_to_download )
        urlsock.close()



def convert_dns_name_to_url( hosts ):
    urls = list()
    for host in hosts:
        urls.append( ''.join( [ 'http://' , host ] )  )
    return urls


# MAIN ################################################################

def main():
    CORRECTUSAGE = 'python get_urls.py 1024 [nonthreaded OR threaded] yahoo.com slashdot.org apple.com kittyandbear.net more-urls.example.com'
    if len( sys.argv ) < 4:
        print 'ERROR: incorrect number of arguments, correct usage: %s' % CORRECTUSAGE
        sys.exit( 1 )

    sys.argv.reverse()      # pop removes the last element so we need the reverse order
    app_name = sys.argv.pop()
    bytes_to_download = sys.argv.pop()
    try:
        bytes_to_download = int( bytes_to_download )
    except ValueError as e:
        print 'ERROR: bytes_to_download (%s) must be an integer' % bytes_to_download
        sys.exit( 1 )

    threading_option = sys.argv.pop()

    urls = convert_dns_name_to_url( sys.argv )

    start_time = time.time()
    if threading_option == 'threaded':
        threaded( urls , bytes_to_download )
    elif threading_option == 'multiprocess':
        multiprocess( urls , bytes_to_download )
    else:
        non_threaded( urls , bytes_to_download )

    print 'Elapsed Time: %s' % ( time.time() - start_time )

if __name__ == "__main__":
    main()

  • « concurrency multiprocess simple lock
  • decorator retry »

Published

Mar 5, 2013

Category

python

~424 words

Tags

  • comparison 4
  • concurrency 10
  • get 22
  • multiprocess 3
  • python 180
  • queue 3
  • threaded 3
  • url 14
  • urlib2 1