john pfeiffer
  • Home
  • Categories
  • Tags
  • Archives

oxygen downloader

import logging
import Queue
from collections import deque
import threading

import sys
import os
from ssl import SSLError

#cacert_location = os.environ[ 'REQUESTS_CA_BUNDLE' ]    # required to avoid the pyinstaller with requests cacert bug
import requests

from oxygen_path_object import OxygenPathObject
from retry_decorator import retry

class Downloader( object ):
    """Download a queue of Oxygen cloud files into a local folder tree.

    create_local_folder_tree() creates the local directories, download_all() starts ThreadUrl
    workers that download the queued files, and current_downloaded_bytes() sums the per-file
    byte counts those workers put on this instance's local_files queue.
    """
    agent = None                        # not referenced anywhere in this class
    files_remaining = None              # work queue of path objects, set in __init__
    local_files = None                  # per-instance Queue.Queue of byte counts, created in __init__
    local_destination_path = None       # root folder for the downloads and the log file
    cacert_location = None              # passed as ``verify`` to requests.get() in download_binary

    def __init__( self , file_queue , local_destination_path , logging_level ):
        """Store the work queue and destination folder, and configure file logging.

        :param file_queue: Queue of path objects (each exposing .path and .o2object) to download
        :param local_destination_path: local root folder; the log file oxygen_snapshot.log is appended there
        :param logging_level: level passed to logging.basicConfig
        :raises SyntaxWarning: if local_destination_path or file_queue is empty/None
        """
        self.files_remaining = file_queue
        self.local_destination_path = local_destination_path
        # one result queue per instance: a class-level Queue would be shared by every Downloader
        self.local_files = Queue.Queue()

        # validate the destination BEFORE using it to build the log file path, otherwise a None
        # path raises from os.path.normpath instead of producing the intended error
        if not self.local_destination_path:
            error_message = 'ERROR: local_destination_path is not initialized'
            logging.error( error_message )
            raise SyntaxWarning( error_message )

        logging_output_filename = 'oxygen_snapshot.log'
        logging_output = os.path.join( os.path.normpath( local_destination_path ) , logging_output_filename )
        logging.basicConfig( level = logging_level ,
                             format = '%(asctime)s %(levelname)s %(message)s',
                             filename = logging_output,
                             filemode='a' )

        if not self.files_remaining:
            error_message = 'ERROR: file_queue is not initialized'
            logging.error( error_message )
            raise SyntaxWarning( error_message )


    @staticmethod
    def create_local_folder_tree( folder_queue , local_destination_path ):
        """Create a local directory for every folder path object in folder_queue (draining it).

        Failures are logged and printed but do not stop the loop (best effort).
        """
        while not folder_queue.empty():
            oxygen_folder_and_path = folder_queue.get()
            new_folder_full_local_path = OxygenPathObject.convert_to_local_file_path( local_destination_path , oxygen_folder_and_path.path , oxygen_folder_and_path.o2object.name )
            logging.debug( u'cloud path and name = {} , {} converts to {}'.format(
                OxygenPathObject.get_unicode_string( oxygen_folder_and_path.path) ,
                OxygenPathObject.get_unicode_string( oxygen_folder_and_path.o2object.name) ,
                OxygenPathObject.get_unicode_string( new_folder_full_local_path)  ) )
            try:
                os.makedirs( new_folder_full_local_path )         # also creates intermediate directories       # TODO: default permission of directory?
            except OSError as error:
                if os.path.isdir( new_folder_full_local_path ):
                    message = u'{} , directory {} already exists'.format( error , OxygenPathObject.get_unicode_string(  new_folder_full_local_path ) )
                else:
                    # e.g. permission denied or a file in the way: report the real cause, not "already exists"
                    message = u'{} , could not create directory {}'.format( error , OxygenPathObject.get_unicode_string(  new_folder_full_local_path ) )
                logging.error( message )
                print( message )


    @staticmethod
    def download_binary( url , local_file_path , original_modified_timestamp_milliseconds ):
        """Stream url into local_file_path in chunks, then set the file's access/modified time.

        :param url: https download URL
        :param local_file_path: destination file (overwritten)
        :param original_modified_timestamp_milliseconds: remote modified time in milliseconds since the unix epoch
        :returns: size in bytes of the file written to disk
        """
        CHUNK = 1024 * 2048      # 1KB * 2048 = 2MB per read
        response = requests.get( url , verify=Downloader.cacert_location , stream=True , timeout=30 )  # ssl enabled, streaming, 30 second timeout
        try:
            raw_stream = response.raw       # raw mode: bytes exactly as received
            with open( local_file_path, 'wb' ) as local_file:
                raw_data_chunk = raw_stream.read( CHUNK )
                while raw_data_chunk:
                    local_file.write( raw_data_chunk )
                    raw_data_chunk = raw_stream.read( CHUNK )
        finally:
            response.close()        # release the connection even if reading or writing fails

        # measured independently of the timestamp update: previously a failed update left this at 0,
        # which made a complete download look like a size mismatch (and get retried, then deleted)
        downloaded_bytes = os.stat( local_file_path ).st_size

        try:    # best effort: a missing/odd timestamp must not fail the download
            original_modified_timestamp_seconds = original_modified_timestamp_milliseconds / 1000   # unix epoch
            os.utime( local_file_path, ( original_modified_timestamp_seconds , original_modified_timestamp_seconds )  ) # works with python 2.7.3 win7
        except ( TypeError , OSError ) as error:
            logging.error( u'Updating timestamp failed: {}'.format( OxygenPathObject.get_unicode_string( error ) ) )
        return downloaded_bytes


    @staticmethod
    @retry( (SSLError, requests.RequestException , requests.exceptions.RequestException , requests.exceptions.SSLError , AttributeError , BufferError) , tries = 4 , backoff = 1 , logger = logging.getLogger( 'oxygensnapshot' ) )  #TODO: figure out how to reference an external argument
    def download_binary_retry_wrapper( current_download , new_file_full_local_path , original_modified_timestamp_milliseconds ):
        """Download current_download.downloadURL and verify the local size against current_download.sizeInByte.

        The retry decorator re-invokes this on the listed exceptions (tries=4 is fixed here, not taken
        from the caller's retry count).

        :returns: number of bytes written
        :raises BufferError: if the local size differs from the remote size
        """
        bytes_downloaded = Downloader.download_binary( current_download.downloadURL , new_file_full_local_path , original_modified_timestamp_milliseconds )
        if bytes_downloaded != current_download.sizeInByte:
            message = u'{} downloaded bytes ({}) does not match the remote size in bytes ({})'.format(
                OxygenPathObject.get_unicode_string( new_file_full_local_path ), str( bytes_downloaded ), str( current_download.sizeInByte ) )
            raise BufferError( message )
        return bytes_downloaded


    def download_all( self , retry_count_max , agent_factory, download_agent_count=1 ):
        """Download every queued file using download_agent_count ThreadUrl workers; blocks until done.

        Each worker gets its own agent from agent_factory.create_valid_agent() and turns the queued
        File objects into Download objects, e.g.

        FILE: ApiObject(canManage=False, sizeInByte=3094, name='getoxygenspaceinfo.txt', modifiedByUserName='John Pfeiffer',
            canWrite=False, repositoryNodeServiceId='repositorynode1', modifiedTimestamp=1361299824000L, createdTimestamp=1361299824000L,
            deleted=False, versionId=1, ownerOxygenId='', spaceId='uv-ff80808134f2fda80134f75ab6de0108', parentId='',
            createdByUserName='John Pfeiffer', type=0, id='c4b17469-c926-4b15-b579-f736fafcb513')

        ApiDownload(downloadURL='https://i-d0a4e2ae.o2cloud.net/storagegateway/download?
            c=5bbae56c-78a2-48fc-ab47-fb9caa0664a9&s=58ea90b5ed0215f40831589ad756372d241e7d1c&h=0ad3a2be906e05681d8ed24c2eb8c1d38ec4578c',
            sizeInByte=570, versionId='86bb79ca-cb61-40c5-ac02-17e130423e8f')

        :param retry_count_max: integer (or int-convertible) retry count, passed to the workers
        :param agent_factory: object providing create_valid_agent()
        :param download_agent_count: number of worker threads (must be >= 1)
        Exits the process with status 1 on an invalid retry_count_max or download_agent_count.
        """
        try:
            retry_count_max = int( retry_count_max )
        except ( TypeError , ValueError ):      # int(None) raises TypeError, int('abc') ValueError
            message = 'ERROR: Retry count max must be an integer, cannot continue'
            logging.error( message )
            print( message )
            sys.exit( 1 )

        if download_agent_count < 1:            # with no workers, join() below would block forever
            message = 'ERROR: download agent count must be at least 1, cannot continue'
            logging.error( message )
            print( message )
            sys.exit( 1 )

        for _ in range( download_agent_count ):
            self.files_remaining.put( None )        # one poison pill per worker so every thread terminates cleanly

        try:
            for _ in range( download_agent_count ):
                worker = ThreadUrl( agent_factory.create_valid_agent() , self.files_remaining , self.local_destination_path , self.local_files , retry_count_max )
                worker.daemon = True            # does not block the main program from exiting
                worker.start()

            self.files_remaining.join()     # wait until task_done() was called for every queued item

        except KeyboardInterrupt:
            # NOTE(review): on Python 2, Queue.join() waits on a lock without a timeout, which typically
            # cannot be interrupted by Ctrl+C — confirm this handler is actually reachable.
            # Queue.Queue has no clear(); empty the underlying deque while holding the queue's lock.
            with self.files_remaining.mutex:
                self.files_remaining.queue.clear()


    def current_downloaded_bytes( self ):
        """Return the sum of the byte counts recorded so far by the worker threads."""
        # hold the queue's own lock so the snapshot is consistent with concurrent put() calls
        with self.local_files.mutex:
            return sum( self.local_files.queue )


class ThreadUrl( threading.Thread ):
    """Worker thread that downloads files taken from a shared work queue.

    Each thread for generating a download link and getting the binary must have its own agent
    (connection).  The thread stops when it dequeues the None poison pill, or when its
    files_remaining attribute has been set to a falsy value.
    """
    def __init__( self , agent , files_remaining , local_destination_path , local_files ,  retry_count_max ):   # fork-join design - each thread has the whole data set
        threading.Thread.__init__( self )
        self.agent = agent                                      # per-thread agent, used by create_download_object()
        self.files_remaining = files_remaining                  # shared work queue; None items are poison pills
        self.local_destination_path = local_destination_path
        self.local_files = local_files                          # shared queue receiving each file's byte count
        self.retry_count_max = retry_count_max                  # only used in log messages here

    @staticmethod
    def _remove_incomplete_download( local_file_path ):
        """Best-effort delete of a partial download; the file may never have been created."""
        try:
            os.remove( local_file_path )
        except OSError as error:
            logging.warning( u'Could not remove incomplete download {}: {}'.format(
                OxygenPathObject.get_unicode_string( local_file_path ) , OxygenPathObject.get_unicode_string( error ) ) )

    def _download_file( self , oxygen_file_and_path ):
        """Download one queued file and put its byte count (0 when skipped) on local_files."""
        new_file_full_local_path = OxygenPathObject.convert_to_local_file_path( self.local_destination_path , oxygen_file_and_path.path , oxygen_file_and_path.o2object.name )
        oxygen_file = oxygen_file_and_path.o2object
        current_download = self.agent.create_download_object( oxygen_file )
        logging.debug( u'{} from {}'.format( OxygenPathObject.get_unicode_string( new_file_full_local_path ) , OxygenPathObject.get_unicode_string( oxygen_file )  ) )

        bytes_downloaded = 0
        try:
            bytes_downloaded = Downloader.download_binary_retry_wrapper( current_download , new_file_full_local_path , oxygen_file.modifiedTimestamp )
        except BufferError:
            logging.error( u'SKIPPING FILE {} (and removing incomplete download), after {} retries: downloaded bytes does not match the remote size in bytes ({})'.format(
                OxygenPathObject.get_unicode_string( new_file_full_local_path ) , str( self.retry_count_max ) , str( current_download.sizeInByte ) ) )
            self._remove_incomplete_download( new_file_full_local_path )
        except SSLError:
            logging.error( u'SKIPPING FILE {} (and removing incomplete download), after {} retries: SSLError'.format(
                OxygenPathObject.get_unicode_string( new_file_full_local_path ) , str( self.retry_count_max ) ) )
            self._remove_incomplete_download( new_file_full_local_path )
        except Exception as error:
            # anything else escaping the download (e.g. requests errors, presumably re-raised by the
            # retry decorator once it gives up) must not kill the worker thread
            logging.error( u'SKIPPING FILE {} (and removing incomplete download): {}'.format(
                OxygenPathObject.get_unicode_string( new_file_full_local_path ) , OxygenPathObject.get_unicode_string( error ) ) )
            self._remove_incomplete_download( new_file_full_local_path )
        else:
            logging.debug( u'{} downloaded successfully ({} bytes)'.format( OxygenPathObject.get_unicode_string( new_file_full_local_path ), str( bytes_downloaded ) ) )

        self.local_files.put( bytes_downloaded )        # skipped files still record 0 bytes

    def run( self ):
        """Process queued files until the poison pill (None) or a falsy files_remaining is seen."""
        while True:
            try:
                work_queue = self.files_remaining
                if not work_queue:        # setting files_remaining to None is a convenient way to kill the thread
                    break

                oxygen_file_and_path = work_queue.get()       # blocks until a file or the poison pill is available
                try:
                    if oxygen_file_and_path is None:          # received poison pill so terminating the thread
                        break
                    self._download_file( oxygen_file_and_path )
                except Exception:
                    # e.g. path conversion or create_download_object failed: skip this file, keep the worker alive
                    logging.exception( u'SKIPPING FILE: unexpected error while preparing the download' )
                finally:
                    # always acknowledge the item (poison pill included), otherwise the Queue.join()
                    # in Downloader.download_all never returns
                    work_queue.task_done()

            except KeyboardInterrupt:
                message = 'received keyboard interrupt, terminating...'
                logging.warning( message )
                print( message )
                break

  • « oxygen path object
  • oxygen agent factory »

Published

Jun 1, 2013

Category

python-oxygencloud-snapshot

~671 words

Tags

  • downloader 1
  • oxygen 14
  • python 180
  • snapshot 12