"""
serial completed 1000 uploads in 4.77999997139 seconds
2 second sleep to allow the server to settle
2 threads completed 1000 uploads in 7.07800006866 seconds
2 second sleep to allow the serverto settle
cpus = 8, currently using 2
ProcessUploadURL-1 exiting
ProcessUploadURL-2 exiting
2 processes completed 1000 uploads in 2.76099991798 seconds
serial completed 2000 uploads in 11.625 seconds
2 second sleep to allow the server to settle
4 threads completed 2000 uploads in 16.5559999943 seconds
2 second sleep to allow the server to settle
cpus = 8, currently using 4
ProcessUploadURL-2 exiting
PProcessUploadURL-1 exitingrocessUploadURL-3 exiting
ProcessUploadURL-4 exiting
4 processes completed 2000 uploads in 7.42800021172 seconds
"""
import hashlib
import time
import requests
from cStringIO import StringIO
import Queue
import threading
import multiprocessing
# Helper Classes for multi-threaded benchmark testing
class ThreadUploadUrl( threading.Thread ): # extend the thread class
def __init__( self , queue ):
threading.Thread.__init__( self )
self.queue = queue
def run( self ):
while True:
url,data = self.queue.get() # get the tuple of url and data
response = requests.put( url, data=data )
assert response.status_code == 200
self.queue.task_done() # decrement the queue
class ProcessUploadURL( multiprocessing.Process ):
def __init__( self , url_queue ):
multiprocessing.Process.__init__( self )
self.url_queue = url_queue
def run( self ):
while True:
tuple = self.url_queue.get() # get the tuple of url and data
if not tuple: # received the termination signal
print '%s exiting' % self.name
self.url_queue.task_done()
break
url = tuple[0]
data = tuple[1]
response = requests.put( url, data=data )
assert response.status_code == 200
self.url_queue.task_done() # decrement the queue
def test_old_sgw():
JOBS = 2000
THREADS = 4
PROCESSES = 4
# prepare a list of urls and data
urls = list()
for i in xrange( JOBS ):
hash_func = hashlib.sha256()
string_buffer = StringIO()
for k in xrange( i + 1 ):
string_buffer.write( 'k' )
data = string_buffer.getvalue()
hash_func.update( data )
size = len( data )
hash_func.update( str( size ) ) # need to deprecate eric's stupid thing
fileHash = hash_func.hexdigest()
url = ''.join( [ "http://10.10.10.201:8080/content/upload?hash=" , fileHash , "&len=" , str( size ) ] )
urls.append( ( url, data ))
string_buffer.close()
# Upload serially
start = time.time()
for i in xrange( JOBS ):
url,data = urls[i] # get the url, data tuple
response = requests.put( url, data=data )
assert response.status_code == 200
print "\n serial completed {} uploads in {} seconds".format( JOBS, time.time() - start )
print "2 second sleep to allow the sgw to settle\n"
time.sleep( 2 )
# Begin Multithreaded test
queue = Queue.Queue() # Queue is thread safe
for i in xrange( THREADS ): # spin up each thread (doing nothing as queue is empty)
my_thread = ThreadUploadUrl( queue ) # allow each thread access to the whole dataset
my_thread.setDaemon( True ) # allow the thread to run in the background
my_thread.start()
start = time.time()
for url in urls:
queue.put( url ) # seed the queue with the tuples of url and data
queue.join() # blocks the program until the queue is empty
print "{} threads completed {} uploads in {} seconds".format( THREADS, JOBS, time.time() - start )
print "2 second sleep to allow the sgw to settle\n"
time.sleep( 2 )
# Begin Multiprocessing test
print 'cpus = %s, currently using %s' % ( ( multiprocessing.cpu_count() , PROCESSES) )
url_queue = multiprocessing.JoinableQueue() # multi process safe Queue
for i in xrange( PROCESSES ): # spin up each process (doing nothing as queue is empty)
p = ProcessUploadURL( url_queue ) # allow each process access to the whole dataset
p.start()
start = time.time()
for url in urls:
url_queue.put( url ) # seed the queue with the tuples of url and data
for i in xrange( PROCESSES ):
url_queue.put( None ) # poison pills to ensure all processes quit
url_queue.join() # blocks the program until the queue is empty (maybe unnecessary with the poison pills)
print "{} processes completed {} uploads in {} seconds".format( PROCESSES, JOBS, time.time() - start )
if __name__ == '__main__':
test_old_sgw()