import sys
import time
import Queue
from collections import deque
import logging # logging acquires locks so it will affect multithreaded performance
import os
# Directory holding the bundled data files: PyInstaller onefile mode marks the
# interpreter with sys.frozen and exposes its extraction directory as
# sys._MEIPASS; otherwise the files sit next to this source file.
basedir = os.path.normpath(
    sys._MEIPASS if getattr(sys, 'frozen', None) else os.path.dirname(__file__)
)
# Required for pyinstaller to not use some default location for lib/requests.
cacert_location = os.path.join(basedir, 'cacert.pem')
os.environ['REQUESTS_CA_BUNDLE'] = cacert_location
from lib import o2lib
from lib.oxygen_agent_factory import AgentFactory
from lib.oxygen_path_object import OxygenPathObject
from lib.oxygen_scanner import OxygenScanner
from lib.oxygen_downloader import Downloader
# Application name and version string; main() prints and logs it at startup.
APPLICATION_NAME = 'OxygenSnapshot-0.99'
class OxygenSnapshot:
    """Coordinate the scan and download of everything in an Oxygen account.

    Work is handed between stages through four FIFO queues, created per
    instance so that two snapshots never share pending work:

    * ``unvisited`` - dynamic typing and use of the ApiObject means be careful
      of your .put() and .get()
    * ``visited`` - FIFO and BFS scanning means root levels are the oldest
      entries
    * ``file_queue`` - files that have been found in the cloud for downloading
    * ``remote_file_size_queue`` - enables displaying total bytes to be
      downloaded
    """

    # Immutable defaults; safe to keep at class level.
    remote_file_size_total_in_bytes = 0
    local_file_size_total_in_bytes = 0
    logging_level = None
    retry_count_max = 3

    def __init__(self):
        # Queues are mutable, so they must live on the instance: as class
        # attributes every OxygenSnapshot would share the same four queues.
        self.unvisited = Queue.Queue()
        self.visited = Queue.Queue()
        self.file_queue = Queue.Queue()
        self.remote_file_size_queue = Queue.Queue()
        self.space_list = []
        self.volume_list = []

    def list_root(self, agent_factory):
        """ List all top level container objects and queue them for scanning.

        Spaces are queued under the path 'Spaces' and user drives (volumes)
        under 'UserDrives'; a user drive is renamed to its owner's Oxygen id.
        Exits the process with status 1 when the factory yields no agent.

        SPACE: ApiSpaceInfo(capacity=0, name='1', writableDefault=False, oid='ff8080813cd059c7013d046b6ae5000d',
        ownerOid='ff8080813cf4e733013d046625f717d0', createdTimestamp=1361578781000L,
        utilized=0.0001659393310546875, ownerOxygenId='loadtest', storageName='Default loadtest Cloud',
        spaceDescription=None, repositoryNodeId='repositorynode1', listed=False)
        VOLUME: ApiSpaceInfo(capacity=0, name='.uv-ff8080813cf4e733013d046625f717d0', writableDefault=False,
        oid='ff8080813cd059c7013d046643230006', ownerOid='ff8080813cf4e733013d046625f717d0',
        createdTimestamp=1361578443000L, utilized=0.00017547607421875, ownerOxygenId='loadtest',
        storageName='Default loadtest Cloud', spaceDescription='', repositoryNodeId='repositorynode1', listed=False)
        """
        agent_list_root = agent_factory.create_valid_agent()
        if not agent_list_root:
            logging.error('Cannot list root , an Oxygen Agent must be initialized first')
            sys.exit(1)

        # retrieves all batches!
        self.space_list = OxygenPathObject.get_account_space_info_listing(agent_list_root)
        self._enqueue_root_containers(self.space_list, 'Spaces', u'{} spaces found', False)

        # retrieves all batches!
        self.volume_list = OxygenPathObject.get_account_volume_info_listing(agent_list_root)
        self._enqueue_root_containers(self.volume_list, 'UserDrives', u'{} user drives found', True)

    def _enqueue_root_containers(self, info_list, root_path, summary_format, name_after_owner):
        """Announce *info_list* and put each entry on the unvisited queue.

        info_list -- space/volume info objects returned by OxygenPathObject
        root_path -- path string stored on every queued OxygenPathObject
        summary_format -- unicode format string taking the number of entries
        name_after_owner -- when true, rename each entry to its ownerOxygenId
        """
        message = summary_format.format(len(info_list))
        print(message)
        logging.info(message)
        for counter, info in enumerate(info_list, 1):
            converted = OxygenPathObject.convert_space_info_to_api_object(info)
            if name_after_owner:
                converted.name = converted.ownerOxygenId
            # no modified timestamp available, hence the 0
            self.unvisited.put(OxygenPathObject(converted.id, converted, root_path, 0))
            sys.stdout.write('.')  # one progress dot per container
            if counter % 100 == 0:
                print('\n')  # break the row of dots every 100 containers
        self.log_status()

    def log_status(self):
        """Log the three work queues and the remote/local byte totals."""
        self.__display_queue(self.visited, u'VISITED ({})'.format(self.visited.qsize()))
        self.__display_queue(self.unvisited, u'UNVISITED ({})'.format(self.unvisited.qsize()))
        self.__display_queue(self.file_queue, u'FILES ({})'.format(self.file_queue.qsize()))
        logging.info(u'{} bytes in the cloud'.format(self.remote_file_size_total_in_bytes))
        logging.info(u'{} bytes locally'.format(self.local_file_size_total_in_bytes))

    def __display_queue(self, q, title):
        """Log *title* at INFO and every queued element's path/name at DEBUG."""
        logging.info(title)
        # Skip copying and formatting the whole queue when the DEBUG lines
        # would be discarded anyway; this is the same check logging.debug makes.
        if q.empty() or not logging.getLogger().isEnabledFor(logging.DEBUG):
            return
        for element in deque(q.queue):  # iterate a copy, the queue is left untouched
            logging.debug(u'{}/{}'.format(
                OxygenPathObject.get_unicode_string(element.path),
                OxygenPathObject.get_unicode_string(element.o2object.name)))

    def download_all(self, local_destination_path, agent_factory, download_agent_count):
        """Download every queued file and record the number of bytes fetched.

        local_destination_path -- directory handed to the Downloader
        agent_factory -- factory handed to Downloader.download_all
        download_agent_count -- accepted for interface compatibility
        """
        # NOTE(review): download_agent_count is never forwarded to the
        # Downloader; confirm whether the Downloader should receive it.
        Downloader.cacert_location = os.environ['REQUESTS_CA_BUNDLE']
        downloader = Downloader(self.file_queue, local_destination_path, self.logging_level)
        downloader.download_all(self.retry_count_max, agent_factory)
        self.local_file_size_total_in_bytes = downloader.current_downloaded_bytes()

    def start_threaded_scan(self, agent_factory, scanner_agent_count, retry_count_max=3):
        """Scan with *scanner_agent_count* threads; return total remote bytes.

        Blocks until every item put on the unvisited queue has been marked
        done, then totals the sizes found on remote_file_size_queue.
        """
        for _ in range(scanner_agent_count):
            scanner = OxygenScanner(agent_factory, self.unvisited, self.file_queue,
                                    self.visited, self.remote_file_size_queue, retry_count_max)
            scanner.daemon = True  # does not block the main program from exiting, must use a join() to block
            scanner.start()
        self.unvisited.join()  # wait on the queue until task_done count is correct
        # The size queue is never drained, so recompute the total from it
        # rather than adding to the previous total (a second scan would
        # otherwise count the earlier sizes twice).
        self.remote_file_size_total_in_bytes = sum(
            int(size) for size in deque(self.remote_file_size_queue.queue))
        return self.remote_file_size_total_in_bytes
# MAIN ################################################################
def create_agent_factory(api_url, api_key, user, password):
    """Return an AgentFactory built from the given API URL, key and credentials.

    The factory is always created with the fixed first argument
    'oxygen_snapshot.py'.
    """
    return AgentFactory('oxygen_snapshot.py', api_url, api_key, user, password)
def _is_usable_destination(path):
    """Return True when *path* is an existing directory not reached through '..'."""
    if not path:
        return False
    # Compare the first path component, so a directory merely named
    # '..something' is not mistaken for a parent-directory reference.
    if os.path.normpath(path).split(os.sep)[0] == os.pardir:
        return False
    return os.path.isdir(path)


def _report_failure(message):
    """Write *message* to the log as an error and echo it to stdout."""
    logging.error(message)
    print(message)


def _describe_error_code(error):
    """Return the symbolic name of *error*.errorCode, or the raw code if unknown."""
    code_tables = (
        (o2lib.ApiInvalidInputException, o2lib.ApiInvalidInputErrorCode),
        (o2lib.ApiRuleException, o2lib.ApiRuleErrorCode),
        (o2lib.ApiSystemsException, o2lib.ApiSystemsErrorCode),
        (o2lib.ApiSessionException, o2lib.ApiSessionErrorCode),
    )
    for exception_type, code_enum in code_tables:
        if isinstance(error, exception_type):
            # .get() so an unmapped code cannot raise KeyError while we are
            # already reporting a failure.
            return code_enum._VALUES_TO_NAMES.get(error.errorCode, error.errorCode)
    return error.errorCode


def main():
    """Command line entry point: scan the account, then download every file.

    Reads sys.argv as: api_url api_key user password local_destination_path
    and an optional sixth argument 'DEBUG' (case-insensitive) that raises the
    log level. The log file 'oxygen_snapshot.log' is written inside
    local_destination_path.

    Exits with status 1 on a usage error, an unusable destination directory,
    or any handled API / IO / keyboard-interrupt failure.
    """
    start_time = time.time()  # performance benchmarking
    download_agent_count = 30
    scanner_agent_count = 20
    CORRECTUSAGE = 'oxygensnapshot.exe https://API.URL apikey user password local_destination_path'
    if len(sys.argv) < 6:
        print('ERROR: incorrect number of arguments (%s), correct usage: %s' % ((len(sys.argv) - 1), CORRECTUSAGE))
        sys.exit(1)
    api_url, api_key, user, password, local_destination_path = sys.argv[1:6]

    logging_level = logging.INFO
    if len(sys.argv) > 6 and sys.argv[6].upper() == 'DEBUG':
        logging_level = logging.DEBUG
    # NOTE(review): the chosen level only configures the root logger below;
    # snapshot.logging_level (handed to the Downloader) stays at its class
    # default of None. Confirm whether it should be set from this value.

    # Validate the destination BEFORE configuring logging: the log file is
    # opened inside that directory, so a missing directory would otherwise
    # abort with an IOError traceback instead of this message.
    if not _is_usable_destination(local_destination_path):
        print(u'{} must be a valid target directory, please make sure the path exists and is valid. Then try running the application again.'.format(local_destination_path))
        sys.exit(1)

    logging_output_filename = 'oxygen_snapshot.log'
    logging_output = os.path.join(os.path.normpath(local_destination_path), logging_output_filename)
    logging.basicConfig(
        level=logging_level,
        format='%(asctime)s %(levelname)s %(message)s',
        filename=logging_output,
        filemode='w')
    print(APPLICATION_NAME)
    logging.info(APPLICATION_NAME)
    logging.info(u'local_destination_path: {}'.format(local_destination_path))

    try:
        snapshot = OxygenSnapshot()
        agent_factory = create_agent_factory(api_url, api_key, user, password)

        snapshot.list_root(agent_factory)
        message = '\nroot level scan completed, current application run time: %.3f seconds' % (time.time() - start_time)
        snapshot.log_status()
        print(message)

        total_bytes_in_cloud = snapshot.start_threaded_scan(agent_factory, scanner_agent_count=scanner_agent_count)
        message = '\nfull scan completed (%s bytes), current application run time: %.3f seconds' % (total_bytes_in_cloud, (time.time() - start_time))
        snapshot.log_status()
        print(message)

        message = 'creating local folder tree ...'
        logging.info(message)
        print(message)
        Downloader.create_local_folder_tree(snapshot.visited, local_destination_path)
        message = '\nlocal file tree creation complete, current application run time: %.3f seconds' % (time.time() - start_time)
        snapshot.log_status()
        print(message)

        message = 'downloading {} files from the cloud...'.format(snapshot.file_queue.qsize())
        logging.info(message)
        print(message)
        snapshot.download_all(local_destination_path, agent_factory, download_agent_count)
        message = '\ndownloading completed (%s bytes), current application run time: %.3f seconds' % (snapshot.local_file_size_total_in_bytes, (time.time() - start_time))
        snapshot.log_status()
        logging.info(message)
        print(message)

        # Originally commented as the "kill switch for all threads".
        # NOTE(review): this only rebinds the attribute on snapshot; confirm
        # that the worker threads actually observe it.
        snapshot.file_queue = None

        message = 'application completed'
        logging.info(message)
        print(message)
    except (o2lib.ApiInvalidInputException, o2lib.ApiRuleException,
            o2lib.ApiSystemsException, o2lib.ApiSessionException) as error:
        _report_failure(u'{} REASON: {}'.format(error, _describe_error_code(error)))
        sys.exit(1)  # every failure path reports a non-zero exit status
    except (IOError, o2lib.ApiUnexpectedException) as error:
        _report_failure(u'REASON: {}'.format(error))
        sys.exit(1)
    except KeyboardInterrupt as error:
        _report_failure(u'EXIT RECEIVED: {}'.format(error))
        sys.exit(1)
# Run the snapshot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()