john pfeiffer
  • Home
  • Categories
  • Tags
  • Archives

oxygen snapshot

import sys
import time
import Queue
from collections import deque

import logging      # logging acquires locks so it will affect multithreaded performance
import os

# Locate bundled resources. PyInstaller one-file builds set sys.frozen and unpack their
# data files into sys._MEIPASS; otherwise the resources sit next to this script.
if getattr( sys , 'frozen' , None ):
    basedir = sys._MEIPASS
else:
    # abspath makes the location independent of the current working directory: for a bare
    # 'oxygen_snapshot.py', dirname() alone would yield '' and the result would be cwd-relative.
    basedir = os.path.dirname( os.path.abspath( __file__ ) )

# Point requests at the bundled CA file so a PyInstaller build does not fall back to
# some default certificate location for lib/requests.
cacert_location = os.path.join( basedir , 'cacert.pem' )
os.environ[ 'REQUESTS_CA_BUNDLE' ] = cacert_location

from lib import o2lib
from lib.oxygen_agent_factory import AgentFactory
from lib.oxygen_path_object import OxygenPathObject
from lib.oxygen_scanner import OxygenScanner
from lib.oxygen_downloader import Downloader


# Application name and version string; main() prints it and writes it to the log at startup.
APPLICATION_NAME = 'OxygenSnapshot-0.99'


class OxygenSnapshot:
    """Enumerate an Oxygen account's spaces and user drives, scan them, and download their files.

    Typical sequence: list_root() seeds `unvisited` with the top-level containers,
    start_threaded_scan() runs OxygenScanner threads over the queues and totals the
    remote file sizes, and download_all() hands `file_queue` to a Downloader.
    """

    # Immutable defaults are safe at class level; assigning through self creates instance attributes.
    remote_file_size_total_in_bytes = 0
    local_file_size_total_in_bytes = 0
    logging_level = None        # forwarded to the Downloader by download_all()
    retry_count_max = 3         # default retry limit for the scanners and the downloader


    def __init__( self ):
        # Queues are created per instance. As class attributes they would be shared by every
        # OxygenSnapshot object, so one snapshot's work items would leak into another.
        self.unvisited = Queue.Queue()       # containers still to scan (OxygenPathObject items), so be careful what you .put()
        self.visited = Queue.Queue()         # scanned containers; FIFO + BFS scanning keeps root levels as the oldest entries
        self.file_queue = Queue.Queue()      # files found in the cloud, waiting to be downloaded
        self.remote_file_size_queue = Queue.Queue()  # remote file sizes, summed to report total bytes to download


    def list_root( self , agent_factory ):
        """ Queue every top level container (spaces and user drives) for scanning.

            Exits the process with status 1 if agent_factory cannot produce a valid agent.
            Sets self.space_list and self.volume_list to the raw listings.

            SPACE: ApiSpaceInfo(capacity=0, name='1', writableDefault=False, oid='ff8080813cd059c7013d046b6ae5000d',
                ownerOid='ff8080813cf4e733013d046625f717d0', createdTimestamp=1361578781000L,
                utilized=0.0001659393310546875, ownerOxygenId='loadtest', storageName='Default loadtest Cloud',
                spaceDescription=None, repositoryNodeId='repositorynode1', listed=False)
            VOLUME: ApiSpaceInfo(capacity=0, name='.uv-ff8080813cf4e733013d046625f717d0', writableDefault=False,
                oid='ff8080813cd059c7013d046643230006', ownerOid='ff8080813cf4e733013d046625f717d0',
                createdTimestamp=1361578443000L, utilized=0.00017547607421875, ownerOxygenId='loadtest',
                storageName='Default loadtest Cloud', spaceDescription='', repositoryNodeId='repositorynode1', listed=False)
        """
        agent_list_root = agent_factory.create_valid_agent()
        if not agent_list_root:
            logging.error( 'Cannot list root , an Oxygen Agent must be initialized first' )
            sys.exit( 1 )

        self.space_list = OxygenPathObject.get_account_space_info_listing( agent_list_root )  # retrieves all batches!
        self._enqueue_root_containers( self.space_list , u'{} spaces found' , 'Spaces' , use_owner_name=False )

        self.volume_list = OxygenPathObject.get_account_volume_info_listing( agent_list_root )  # retrieves all batches!
        self._enqueue_root_containers( self.volume_list , u'{} user drives found' , 'UserDrives' , use_owner_name=True )


    def _enqueue_root_containers( self , info_list , count_message , path , use_owner_name ):
        """Convert each space-info record and put it on `unvisited`, printing one progress dot per item.

        count_message is a format string receiving the number of records.
        path is the parent path label given to each OxygenPathObject ('Spaces' / 'UserDrives').
        use_owner_name replaces the container name with ownerOxygenId; user drives are
        otherwise named like '.uv-<oid>' (see the VOLUME example in list_root).
        """
        message = count_message.format( len( info_list ) )
        print( message )
        logging.info( message )
        for counter , info in enumerate( info_list , 1 ):
            converted = OxygenPathObject.convert_space_info_to_api_object( info )
            if use_owner_name:
                converted.name = converted.ownerOxygenId
            self.unvisited.put( OxygenPathObject( converted.id , converted , path , 0 ) )  # no modified timestamp available
            sys.stdout.write( '.' )
            if counter % 100 == 0:
                sys.stdout.write( '\n' )
                self.log_status()
        if info_list:
            sys.stdout.write( '\n' )   # end the progress-dot line before the next message


    def log_status( self ):
        """Log the size (INFO) and, at DEBUG, the contents of each work queue, plus the byte totals."""
        self.__display_queue( self.visited , u'VISITED ({})'.format( self.visited.qsize() ) )
        self.__display_queue( self.unvisited , u'UNVISITED ({})'.format( self.unvisited.qsize() ) )
        self.__display_queue( self.file_queue , u'FILES ({})'.format( self.file_queue.qsize() ) )
        logging.info( u'{} bytes in the cloud'.format( self.remote_file_size_total_in_bytes ) )
        logging.info( u'{} bytes locally'.format( self.local_file_size_total_in_bytes ) )


    def __display_queue( self , q , title ):
        """Log `title` at INFO and, at DEBUG, '<path>/<name>' for every element of q.

        Elements are expected to expose `.path` and `.o2object.name`, as OxygenPathObject does.
        """
        logging.info( title )
        if not logging.getLogger().isEnabledFor( logging.DEBUG ):
            return  # skip copying and formatting every element when DEBUG output is off
        with q.mutex:  # worker threads may still hold the queue; copy its contents under the queue's own lock
            items = list( q.queue )
        for element in items:
            logging.debug( u'{}/{}'.format( OxygenPathObject.get_unicode_string( element.path ) , OxygenPathObject.get_unicode_string( element.o2object.name ) ) )


    def download_all( self , local_destination_path , agent_factory, download_agent_count ):
        """Download every entry of file_queue into local_destination_path.

        Stores the Downloader's downloaded-byte count in local_file_size_total_in_bytes.
        NOTE(review): download_agent_count is accepted but not forwarded to Downloader;
        confirm whether Downloader should be told how many agents to use.
        """
        Downloader.cacert_location = os.environ[ 'REQUESTS_CA_BUNDLE' ]  # set on the Downloader class itself
        downloader = Downloader( self.file_queue , local_destination_path , self.logging_level )
        downloader.download_all( self.retry_count_max , agent_factory )
        self.local_file_size_total_in_bytes = downloader.current_downloaded_bytes()


    def start_threaded_scan( self, agent_factory , scanner_agent_count, retry_count_max=None ):
        """Scan every queued container with scanner_agent_count OxygenScanner threads.

        Blocks until every item put on `unvisited` has been marked task_done.
        retry_count_max defaults to self.retry_count_max.
        Returns the total remote size in bytes, also stored in remote_file_size_total_in_bytes.
        Raises ValueError if scanner_agent_count < 1, because no thread would drain
        `unvisited` and the join() below could block forever.
        """
        if scanner_agent_count < 1:
            raise ValueError( 'scanner_agent_count must be at least 1, got {}'.format( scanner_agent_count ) )
        if retry_count_max is None:
            retry_count_max = self.retry_count_max

        for _ in range( scanner_agent_count ):
            scanner = OxygenScanner( agent_factory , self.unvisited, self.file_queue, self.visited, self.remote_file_size_queue, retry_count_max )
            scanner.daemon = True   # does not block the main program from exiting
            scanner.start()

        self.unvisited.join()     # wait on the queue until task_done count is correct

        self.remote_file_size_total_in_bytes = self._sum_remote_file_sizes()
        return self.remote_file_size_total_in_bytes


    def _sum_remote_file_sizes( self ):
        """Return the sum of every size in remote_file_size_queue without draining the queue.

        The total is recomputed from scratch on each call so repeated scans never double-count.
        """
        with self.remote_file_size_queue.mutex:
            sizes = list( self.remote_file_size_queue.queue )
        return sum( int( size ) for size in sizes )



# MAIN ################################################################

def create_agent_factory( api_url , api_key , user , password ):
    """Return an AgentFactory built from the command-line connection settings.

    'oxygen_snapshot.py' is passed as the first argument; presumably a client/application
    identifier. Confirm against AgentFactory.
    """
    return AgentFactory( 'oxygen_snapshot.py', api_url , api_key , user , password )


def _destination_path_error( local_destination_path ):
    """Return a user-facing error message if the destination is unusable, else None.

    Rejected: the empty string, paths whose first normalised component is '..', and
    anything that is not an existing directory.
    """
    first_component = os.path.normpath( local_destination_path ).split( os.sep )[0]
    if local_destination_path == '' or first_component == os.pardir or not os.path.isdir( local_destination_path ):
        return u'{} must be a valid target directory, please make sure the path exists and is valid.  Then try running the application again.'.format( local_destination_path )
    return None


def _announce( message ):
    """Log message at INFO level and echo it to stdout."""
    logging.info( message )
    print( message )


def _report_stage( snapshot , message ):
    """Log the snapshot's queue status, then log and print a stage-completion message."""
    snapshot.log_status()
    _announce( message )


def _fail( message ):
    """Log and print a fatal error, then exit with status 1."""
    logging.error( message )
    print( message )
    sys.exit( 1 )


def _api_error_reason( error ):
    """Translate an Oxygen API exception's numeric errorCode into its symbolic name.

    Falls back to the raw code when it is unknown, so reporting never raises KeyError.
    Assumes each *ErrorCode._VALUES_TO_NAMES is a dict (Thrift-style enum). TODO: confirm.
    """
    code_enums = (  # same precedence as the except clauses in main()
        ( o2lib.ApiInvalidInputException , o2lib.ApiInvalidInputErrorCode ),
        ( o2lib.ApiRuleException , o2lib.ApiRuleErrorCode ),
        ( o2lib.ApiSystemsException , o2lib.ApiSystemsErrorCode ),
        ( o2lib.ApiSessionException , o2lib.ApiSessionErrorCode ),
    )
    for exception_type , code_enum in code_enums:
        if isinstance( error , exception_type ):
            return code_enum._VALUES_TO_NAMES.get( error.errorCode , error.errorCode )
    return None


def main():
    """Command-line entry point: scan an Oxygen account and mirror it into a local directory.

    Usage: oxygensnapshot.exe https://API.URL apikey user password local_destination_path [DEBUG]
    Exits with status 1 on bad arguments, an unusable destination, any handled Oxygen API
    or IO failure, or Ctrl-C.
    """
    start_time = time.time()  # performance benchmarking
    download_agent_count = 30
    scanner_agent_count = 20

    CORRECTUSAGE = 'oxygensnapshot.exe https://API.URL apikey user password local_destination_path'
    if len( sys.argv ) < 6:
        print( 'ERROR: incorrect number of arguments (%s), correct usage: %s' % ( ( len( sys.argv ) - 1 ) , CORRECTUSAGE ) )
        sys.exit( 1 )

    api_url , api_key , user , password , local_destination_path = sys.argv[1:6]

    logging_level = logging.INFO
    if len( sys.argv ) == 7 and sys.argv[6].upper() == 'DEBUG':
        logging_level = logging.DEBUG

    # Validate before configuring logging: the log file lives inside this directory, so
    # basicConfig would raise IOError on a missing path before the problem could be reported.
    path_error = _destination_path_error( local_destination_path )
    if path_error:
        print( path_error )
        sys.exit( 1 )

    # basicConfig has no 'name' parameter (Python 3 rejects unknown keywords), so none is passed.
    logging.basicConfig(
        level = logging_level ,
        format = '%(asctime)s %(levelname)s %(message)s',
        filename = os.path.join( os.path.normpath( local_destination_path ) , 'oxygen_snapshot.log' ),
        filemode = 'w' )

    print( APPLICATION_NAME )
    logging.info( APPLICATION_NAME )
    logging.info( u'local_destination_path: {}'.format( local_destination_path ) )

    try:
        snapshot = OxygenSnapshot()
        snapshot.logging_level = logging_level   # download_all() forwards this to the Downloader
        agent_factory = create_agent_factory( api_url , api_key , user , password )

        snapshot.list_root( agent_factory )
        _report_stage( snapshot , '\nroot level scan completed, current application run time: %.3f seconds' % ( time.time() - start_time ) )

        total_bytes_in_cloud = snapshot.start_threaded_scan( agent_factory , scanner_agent_count=scanner_agent_count )
        _report_stage( snapshot , '\nfull scan completed (%s bytes), current application run time: %.3f seconds' % ( total_bytes_in_cloud , time.time() - start_time ) )

        _announce( 'creating local folder tree ...' )
        Downloader.create_local_folder_tree( snapshot.visited , local_destination_path )
        _report_stage( snapshot , '\nlocal file tree creation complete, current application run time: %.3f seconds' % ( time.time() - start_time ) )

        _announce( 'downloading {} files from the cloud...'.format( snapshot.file_queue.qsize() ) )
        snapshot.download_all( local_destination_path , agent_factory , download_agent_count )
        _report_stage( snapshot , '\ndownloading completed (%s bytes), current application run time: %.3f seconds' % ( snapshot.local_file_size_total_in_bytes , time.time() - start_time ) )

        # Scanner threads are started as daemons, so they end with the process. Rebinding
        # snapshot.file_queue would not reach threads that already hold the queue object.
        _announce( 'application completed' )

    except ( o2lib.ApiInvalidInputException , o2lib.ApiRuleException , o2lib.ApiSystemsException , o2lib.ApiSessionException ) as error:
        _fail( u'{} REASON: {}'.format( error , _api_error_reason( error ) ) )

    except ( IOError , o2lib.ApiUnexpectedException ) as error:
        _fail( u'REASON: {}'.format( error ) )

    except KeyboardInterrupt as error:
        _fail( u'EXIT RECEIVED: {}'.format( error ) )



# Run the command-line workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

  • « google app engine fixtures testbed unit testing
  • oxygen scanner »

Published

Jun 1, 2013

Category

python-oxygencloud-snapshot

~823 words

Tags

  • oxygen 14
  • python 180
  • snapshot 12