
concurrency threaded multiprocess requests upload

"""
   serial completed 1000 uploads in 4.77999997139 seconds
2 second sleep to allow the server to settle

2 threads completed 1000 uploads in 7.07800006866 seconds
2 second sleep to allow the server to settle

cpus = 8, currently using 2
ProcessUploadURL-1 exiting
ProcessUploadURL-2 exiting
2 processes completed 1000 uploads in 2.76099991798 seconds



   serial completed 2000 uploads in 11.625 seconds
2 second sleep to allow the server to settle

4 threads completed 2000 uploads in 16.5559999943 seconds
2 second sleep to allow the server to settle

cpus = 8, currently using 4
ProcessUploadURL-2 exiting
ProcessUploadURL-1 exiting
ProcessUploadURL-3 exiting
ProcessUploadURL-4 exiting
4 processes completed 2000 uploads in 7.42800021172 seconds
"""

import hashlib
import time
import requests
from cStringIO import StringIO      # Python 2 module (merged into io in Python 3)
import Queue                        # Python 2 name; renamed to queue in Python 3
import threading
import multiprocessing

# Helper classes for the multi-threaded and multiprocess benchmark tests

class ThreadUploadUrl( threading.Thread ):        # extend the thread class
    def __init__( self , queue ):
        threading.Thread.__init__( self )
        self.queue = queue

    def run( self ):
        while True:
            url,data = self.queue.get()     # get the tuple of url and data
            response = requests.put( url, data=data )
            assert response.status_code == 200
            self.queue.task_done()      # decrement the queue



class ProcessUploadURL( multiprocessing.Process ):
    def __init__( self , url_queue ):
        multiprocessing.Process.__init__( self )
        self.url_queue = url_queue

    def run( self ):
        while True:
            task = self.url_queue.get()         # get the tuple of url and data
            if not task:                        # None is the termination signal (poison pill)
                print '%s exiting' % self.name
                self.url_queue.task_done()
                break
            url, data = task
            response = requests.put( url, data=data )
            assert response.status_code == 200
            self.url_queue.task_done()      # decrement the queue
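
# Note: ThreadUploadUrl above has no exit path and relies on being started as a
# daemon thread, while these process workers exit explicitly when they pull the
# None sentinel (poison pill) off the queue.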



def test_old_sgw():

    JOBS = 2000
    THREADS = 4
    PROCESSES = 4

    # prepare a list of urls and data
    urls = list()
    for i in xrange( JOBS ):
        hash_func = hashlib.sha256()
        string_buffer = StringIO()
        for k in xrange( i + 1 ):
            string_buffer.write( 'k' )

        data = string_buffer.getvalue()
        hash_func.update( data )
        size = len( data )

        hash_func.update( str( size ) )      # legacy server quirk: the content length is folded into the hash (should be deprecated)
        fileHash = hash_func.hexdigest()

        url = ''.join( [ "http://10.10.10.201:8080/content/upload?hash=" , fileHash , "&len=" , str( size ) ] )
        urls.append( ( url, data ))
        string_buffer.close()


    # Upload serially
    start = time.time()
    for i in xrange( JOBS ):
        url,data = urls[i]  # get the url, data tuple
        response = requests.put( url, data=data )
        assert response.status_code == 200

    print "\n   serial completed {} uploads in {} seconds".format( JOBS, time.time() - start )
    print "2 second sleep to allow the sgw to settle\n"
    time.sleep( 2 )


    # Begin Multithreaded test
    queue = Queue.Queue()   # Queue is thread safe
    for i in xrange( THREADS ):                     # spin up each thread (doing nothing as queue is empty)
        my_thread = ThreadUploadUrl( queue )        # allow each thread access to the whole dataset
        my_thread.setDaemon( True )                 # allow the thread to run in the background
        my_thread.start()

    start = time.time()
    for url in urls:
        queue.put( url )    # seed the queue with the tuples of url and data
    queue.join()    # blocks the program until the queue is empty
    print "{} threads completed {} uploads in {} seconds".format( THREADS, JOBS, time.time() - start )

    print "2 second sleep to allow the sgw to settle\n"
    time.sleep( 2 )


    # Begin Multiprocessing test
    print 'cpus = %s, currently using %s' % ( multiprocessing.cpu_count(), PROCESSES )

    url_queue = multiprocessing.JoinableQueue()     # multi process safe Queue
    for i in xrange( PROCESSES ):                   # spin up each process (doing nothing as queue is empty)
        p = ProcessUploadURL( url_queue )           # allow each process access to the whole dataset
        p.start()

    start = time.time()
    for url in urls:
        url_queue.put( url )        # seed the queue with the tuples of url and data

    for i in xrange( PROCESSES ):
        url_queue.put( None )   # poison pills to ensure all processes quit
    url_queue.join()            # blocks until every upload (and every poison pill) has been marked task_done
    print "{} processes completed {} uploads in {} seconds".format( PROCESSES, JOBS, time.time() - start )



if __name__ == '__main__':
    test_old_sgw()


Published: May 15, 2013
Category: python
~525 words
Tags: concurrency, multiprocess, python, requests, threaded, upload