import logging
import threading
from ssl import SSLError
from lib import o2lib
from lib.oxygen_path_object import OxygenPathObject
from lib.retry_decorator import retry
from lib.oxygen_skip_decorator import oxygen_skip_decorator
class OxygenScanner( threading.Thread ):
""" Take an item from the UNVISITED Queue, get enumeration, and add to the UNVISITED and FILE Queues, then mark as VISITED
"""
def __init__( self , agent_factory , unvisited, file_queue, visited, cloud_size_queue, retry_count_max ):
self.agent = None
self.agent_factory = agent_factory
self.unvisited = unvisited
self.file_queue = file_queue
self.visited = visited
self.retry_count_max = retry_count_max
self.cloud_size_queue = cloud_size_queue
threading.Thread.__init__( self )
def run( self ):
""" generate valid agents as needed, BFS Traversal: get a container from the unvisited queue, list the container contents
put Files in the file_queue, folders or Spaces in the unvisited queue, and finally put current in Visited
"""
while True:
try:
if not self.agent: # if after parsing we've timed out regenerate the agent
self.agent = self.agent_factory.create_valid_agent()
logging.info( "Generating valid agent" )
else:
container = self.unvisited.get() # START iteration progress towards termination of the loop
if not container: # received poison pill so terminating the thread
logging.info( "Received QUIT job, exiting thread" )
self.unvisited.task_done()
break
listing = self.__get_listing( container.o2object )
current_level = container.path + '/' + container.o2object.name
logging.debug( u"BEGIN Parsing Listing for {}".format( OxygenPathObject.get_unicode_string( current_level ) ) )
for item in listing:
if not item.deleted: # do not backup or download files or folders flagged as deleted
if item.type == o2lib.ApiObjectType.FILE:
file_unique_id = '-'.join( [item.spaceId , item.id] ) # file id's are only guaranteed to be unique in a Space
self.file_queue.put( OxygenPathObject( file_unique_id , item , current_level , item.modifiedTimestamp ) )
self.cloud_size_queue.put( item.sizeInByte )
elif item.type == o2lib.ApiObjectType.FOLDER or item.type == o2lib.ApiObjectType.SPACE :
self.unvisited.put( OxygenPathObject( item.id , item , current_level , 0 ) ) # no modified timestamp available
logging.debug( u"DONE Parsing Listing size {} for {}".format( len( listing ) , OxygenPathObject.get_unicode_string( current_level ) ) )
self.visited.put( container )
self.unvisited.task_done() # COMPLETED one iteration towards termination of the loop
except KeyboardInterrupt:
message = 'received keyboard interrupt, terminating...'
logging.warning( message )
print message
break
return
@retry( ( SSLError, EOFError, AttributeError) , tries = 3 , logger = logging.getLogger( 'oxygensnapshot' ) )
@oxygen_skip_decorator( logger = logging.getLogger( 'oxygensnapshot' ) )
def __get_listing( self, o2object ):
try:
listing = self.agent.getChildObjects( o2object )
return listing
except o2lib.ApiSessionException as error: # retry once if session is expired
if error.errorCode == 1:
message = u'WARN: SessionExpired while enumerating {}, renewing session and retrying...'.format( o2object.name )
logging.warn( message )
print message
self.agent = self.agent_factory.create_valid_agent()
listing = self.agent.getChildObjects( o2object )
except AttributeError:
message = u'ERROR: AttributeError, unable to get enumeration for container {} , SKIPPING and continuing'.format( o2object.name )
logging.error( message )
print message
return