john pfeiffer

concurrency threading get url parse html output queue

import requests
import time
import threading

def get_page(thread_id, url):
    response = requests.get(url)
    print 'thread %s: %s' % (thread_id, response.status_code)


start_time = time.time()
threads = list()
url = 'http://kittyandbear.net'
for i in xrange(0, 5):
    threads.append(threading.Thread(target=get_page, args=(i, url)))
for x in threads:
    x.start()
for x in threads:
    x.join()
print 'all threads finished: elapsed time: %s' % (time.time() - start_time)
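
Plain threads cannot return a value, so the status codes above are only printed, never collected. A minimal sketch (my addition, reusing the same kittyandbear.net URL) that pushes each result onto a shared Queue, the same synchronized structure the next snippet builds on:

import requests
import Queue
import threading

results = Queue.Queue()     # thread-safe, so workers can push results concurrently

def get_status(url):
    results.put((url, requests.get(url).status_code))

url = 'http://kittyandbear.net'
workers = [threading.Thread(target=get_status, args=(url,)) for _ in xrange(5)]
for w in workers:
    w.start()
for w in workers:
    w.join()
while not results.empty():      # safe here: every worker has already joined
    print results.get()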


- - -
import requests
import Queue
import threading

class HTTPRequestThread(threading.Thread):

    def __init__(self, jobs):
        threading.Thread.__init__(self)
        self.jobs = jobs

    def run(self):
        while True:
            request_dictionary = self.jobs.get()
            self.send_request(request_dictionary)
            self.jobs.task_done()

    @staticmethod
    def send_request(request_dictionary):
        url = request_dictionary.get('url')
        verify_ssl = request_dictionary.get('verifyssl', False)
        method = request_dictionary.get('method', 'GET')
        parameters = request_dictionary.get('parameters', None)
        headers = request_dictionary.get('headers', None)
        data = request_dictionary.get('data', None)
        if method == 'GET':
            r = requests.get(url=url, headers=headers, params=parameters, verify=verify_ssl)
        elif method == 'POST':
            r = requests.post(url=url, headers=headers, data=data, verify=verify_ssl)
        # print r.status_code, r.content  # DEBUG


if __name__ == '__main__':

    THREADCOUNT = 3     # was undefined in the original snippet
    token = 'replace-with-a-real-auth-token'    # placeholder: the original referenced an undefined self.token

    jobs = Queue.Queue()
    for i in xrange(3):
        data = {'from': 'intTest', 'message': '{} message'.format(i), 'auth_token': token}
        request_dictionary = {'method': 'POST', 'url': 'http://kittyandbear.net/message', 'data': data}
        jobs.put(request_dictionary)

    for x in xrange(THREADCOUNT):
        t = HTTPRequestThread(jobs)
        t.setDaemon(True)   # daemon workers die with the main thread once jobs.join() returns
        t.start()
    jobs.join()
    print('done')
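
The workers above loop forever and are marked daemon so the process can exit after jobs.join(); daemon threads, though, are killed abruptly when the main thread ends. A common alternative (a sketch of the sentinel pattern, not from the original snippet) is to enqueue one None per worker so each thread breaks out of its loop and can be join()ed normally:

import Queue
import threading

def worker(jobs):
    while True:
        job = jobs.get()
        if job is None:             # sentinel: no more work for this thread
            jobs.task_done()
            break
        print 'processing %s' % job
        jobs.task_done()

THREADCOUNT = 2
jobs = Queue.Queue()
for i in xrange(3):
    jobs.put('job-%d' % i)

threads = [threading.Thread(target=worker, args=(jobs,)) for _ in xrange(THREADCOUNT)]
for t in threads:
    t.start()
for _ in threads:
    jobs.put(None)                  # one sentinel per worker
jobs.join()
for t in threads:
    t.join()                        # clean shutdown, no daemon flag needed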

- - -

# 2013-02-22 johnpfeiffer
# python get_urls.py 1024 threaded kittyandbear.net yahoo.com apple.com ibm.com slashdot.org = ~1 second
# python get_urls.py 1024 nonthreaded kittyandbear.net yahoo.com apple.com ibm.com slashdot.org = ~2 seconds
# TODO: use a thread pool

import urllib2
import time
import sys

import Queue        # synchronized data structure
import threading

from HTMLParser import HTMLParser       # TODO: http://www.crummy.com/software/BeautifulSoup


# import urllib
# import urlparse
# from urlparse import parse_qs
# def parse_url_string( url_string ):
#    """Turn a URL string into a parameters dictionary, e.g. parse_url_string( 'http://example.org/resource?a=1&b=2' )"""
#    parsed = urlparse.urlparse( url_string )
#    parameters = parse_qs( parsed.query.encode( 'utf-8' ), keep_blank_values=True )
#    for key, value in parameters.iteritems():
#        parameters[ key ] = urllib.unquote( value[0] )      # assumes only a single value per key
#    return parameters



class MyHTMLParser( HTMLParser ):
    def handle_starttag( self, tag, attrs ):
        print "Encountered a start tag:", tag
#    def handle_endtag( self, tag ):
#        print "Encountered an end tag :", tag
    def handle_data( self, data ):
        print "Encountered some data  :", data


class ThreadUrl( threading.Thread ):        # extend the thread class
    def __init__( self , queue , bytes_to_download , out_queue ):   # every worker shares the input and output queues
        self.queue = queue
        self.bytes_to_download = bytes_to_download
        self.out_queue = out_queue
        threading.Thread.__init__( self )

    def run( self ):
        while True:
            host = self.queue.get()
            print '\ndownloading %s' % host
            urlsock = urllib2.urlopen( host )
            chunk = urlsock.read( self.bytes_to_download )
            urlsock.close()
            self.out_queue.put( chunk )
            self.queue.task_done()


class DatamineThread( threading.Thread ):

    def __init__(self, out_queue):
        threading.Thread.__init__( self )
        self.out_queue = out_queue

    def run(self):
        while True:
            chunk = self.out_queue.get()
            parser = MyHTMLParser()
            parser.feed( chunk )
            self.out_queue.task_done()


def threaded( hosts , bytes_to_download ):
    start = time.time()
    queue = Queue.Queue()
    out_queue = Queue.Queue()

    for i in range( 5 ):            # 5 worker threads, so max speedup needs at least 5 hosts
        my_thread = ThreadUrl( queue , bytes_to_download , out_queue )
        my_thread.setDaemon( True )         # daemon threads do not block program exit; queue.join() below does the waiting
        my_thread.start()

    for host in hosts:
        queue.put( host )

    for i in range(5):
        dt = DatamineThread( out_queue )
        dt.setDaemon(True)
        dt.start()

    queue.join()
    out_queue.join()        # block until every chunk is marked task_done(); note Queue.join() takes no timeout, unlike Thread.join( 1 )
    print "Elapsed Time: %s" % (time.time() - start)


def non_threaded( hosts , bytes_to_download ):
    start_time = time.time()
    for host in hosts:
        print '\ndownloading %s' % host
        urlsock = urllib2.urlopen( host )
        chunk = urlsock.read( bytes_to_download )
        urlsock.close()
        parser = MyHTMLParser()
        parser.feed( chunk )

    print "Elapsed Time: %s" % ( time.time() - start_time )



def convert_dns_name_to_url( hosts ):
    urls = list()
    for host in hosts:
        urls.append( ''.join( [ 'http://' , host ] )  )
    return urls


# MAIN ################################################################

def main():
    CORRECTUSAGE = 'python get_urls.py 1024 [nonthreaded OR threaded] yahoo.com slashdot.org apple.com kittyandbear.net more-urls.example.com'
    if len( sys.argv ) < 4:
        print 'ERROR: incorrect number of arguments, correct usage: %s' % CORRECTUSAGE
        sys.exit( 1 )

    sys.argv.reverse()      # pop() takes from the end, so reverse to consume the arguments in order
    app_name = sys.argv.pop()
    bytes_to_download = sys.argv.pop()
    try:
        bytes_to_download = int( bytes_to_download )
    except ValueError as e:
        print 'ERROR: bytes_to_download (%s) must be an integer' % bytes_to_download
        sys.exit( 1 )

    threading_option = sys.argv.pop()
    urls = convert_dns_name_to_url( sys.argv )

    if threading_option == 'threaded':
        threaded( urls , bytes_to_download )
    else:
        non_threaded( urls , bytes_to_download )


if __name__ == "__main__":
    main()
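
The TODO at the top asks for a thread pool. A minimal sketch using multiprocessing.pool.ThreadPool (present but undocumented in Python 2; my choice, not the original author's) that replaces the hand-rolled worker class for the download step:

from multiprocessing.pool import ThreadPool
import urllib2

def fetch(url, bytes_to_download=1024):
    urlsock = urllib2.urlopen(url)
    try:
        return urlsock.read(bytes_to_download)
    finally:
        urlsock.close()

urls = ['http://kittyandbear.net', 'http://slashdot.org']
pool = ThreadPool(5)                # 5 worker threads, matching threaded() above
chunks = pool.map(fetch, urls)      # blocks until every URL has been fetched
pool.close()
pool.join()
for url, chunk in zip(urls, chunks):
    print url, len(chunk)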
