john pfeiffer
  • Home
  • Categories
  • Tags
  • Archives

oxygen scanner

import logging
import threading
from ssl import SSLError

from lib import o2lib
from lib.oxygen_path_object import OxygenPathObject

from lib.retry_decorator import retry
from lib.oxygen_skip_decorator import oxygen_skip_decorator




class OxygenScanner( threading.Thread ):
    """ Take an item from the UNVISITED Queue, get enumeration, and add to the UNVISITED and FILE Queues, then mark as VISITED
    """
    def __init__( self , agent_factory , unvisited, file_queue, visited, cloud_size_queue, retry_count_max ):
        self.agent = None
        self.agent_factory = agent_factory
        self.unvisited = unvisited
        self.file_queue = file_queue
        self.visited = visited
        self.retry_count_max = retry_count_max
        self.cloud_size_queue = cloud_size_queue

        threading.Thread.__init__( self )

    def run( self ):
        """ generate valid agents as needed, BFS Traversal: get a container from the unvisited queue, list the container contents
            put Files in the file_queue, folders or Spaces in the unvisited queue, and finally put current in Visited
        """
        while True:

            try:
                if not self.agent:  # if after parsing we've timed out regenerate the agent
                    self.agent = self.agent_factory.create_valid_agent()
                    logging.info( "Generating valid agent" )
                else:
                    container = self.unvisited.get()        # START iteration progress towards termination of the loop

                    if not container:                             # received poison pill so terminating the thread
                        logging.info( "Received QUIT job, exiting thread" )
                        self.unvisited.task_done()
                        break

                    listing = self.__get_listing( container.o2object )

                    current_level = container.path + '/' + container.o2object.name
                    logging.debug( u"BEGIN Parsing Listing for {}".format( OxygenPathObject.get_unicode_string( current_level ) ) )

                    for item in listing:
                        if not item.deleted:                      # do not backup or download files or folders flagged as deleted

                            if item.type == o2lib.ApiObjectType.FILE:
                                file_unique_id = '-'.join( [item.spaceId , item.id] )       # file id's are only guaranteed to be unique in a Space
                                self.file_queue.put( OxygenPathObject( file_unique_id , item , current_level , item.modifiedTimestamp ) )
                                self.cloud_size_queue.put( item.sizeInByte )

                            elif item.type == o2lib.ApiObjectType.FOLDER or item.type == o2lib.ApiObjectType.SPACE :
                                self.unvisited.put( OxygenPathObject( item.id , item , current_level , 0 ) ) # no modified timestamp available

                    logging.debug( u"DONE Parsing Listing size {} for {}".format( len( listing ) , OxygenPathObject.get_unicode_string( current_level ) ) )
                    self.visited.put( container )
                    self.unvisited.task_done()     # COMPLETED one iteration towards termination of the loop


            except KeyboardInterrupt:
                message = 'received keyboard interrupt, terminating...'
                logging.warning( message )
                print message
                break

        return


    @retry( ( SSLError, EOFError, AttributeError)  , tries = 3 , logger = logging.getLogger( 'oxygensnapshot' ) )
    @oxygen_skip_decorator( logger = logging.getLogger( 'oxygensnapshot' ) )
    def __get_listing( self, o2object ):

        try:
            listing = self.agent.getChildObjects( o2object )
            return listing

        except o2lib.ApiSessionException as error:      # retry once if session is expired
            if error.errorCode == 1:
                message = u'WARN: SessionExpired while enumerating {}, renewing session and retrying...'.format( o2object.name )
                logging.warn( message )
                print message
                self.agent = self.agent_factory.create_valid_agent()
                listing = self.agent.getChildObjects( o2object )

        except AttributeError:
            message = u'ERROR: AttributeError, unable to get enumeration for container {} , SKIPPING and continuing'.format( o2object.name )
            logging.error( message )
            print message
            return

  • « oxygen snapshot
  • oxygen path object »

Published

Jun 1, 2013

Category

python-oxygencloud-snapshot

~346 words

Tags

  • oxygen 14
  • python 180
  • scanner 3
  • snapshot 12