import logging
import Queue
from collections import deque
import threading
import sys
import os
from ssl import SSLError
#cacert_location = os.environ[ 'REQUESTS_CA_BUNDLE' ] # required to avoid the pyinstaller with requests cacert bug
import requests
from oxygen_path_object import OxygenPathObject
from retry_decorator import retry
class Downloader( object ):
agent = None
files_remaining = None
local_files = Queue.Queue()
local_destination_path = None
cacert_location = None
def __init__( self , file_queue , local_destination_path , logging_level ):
self.files_remaining = file_queue
self.local_destination_path = local_destination_path
logging_output_filename = 'oxygen_snapshot.log'
logging_output = os.path.join( os.path.normpath( local_destination_path ) , logging_output_filename )
logging.basicConfig( level = logging_level ,
format = '%(asctime)s %(levelname)s %(message)s',
filename = logging_output,
filemode='a' )
if not self.files_remaining:
error_message = 'ERROR: file_queue is not initialized'
logging.error( error_message )
raise SyntaxWarning( error_message )
if not self.local_destination_path:
error_message = 'ERROR: local_destination_path is not initialized'
logging.error( error_message )
raise SyntaxWarning( error_message )
@staticmethod
def create_local_folder_tree( folder_queue , local_destination_path ):
while not folder_queue.empty():
oxygen_folder_and_path = folder_queue.get()
new_folder_full_local_path = OxygenPathObject.convert_to_local_file_path( local_destination_path , oxygen_folder_and_path.path , oxygen_folder_and_path.o2object.name )
logging.debug( u'cloud path and name = {} , {} converts to {}'.format(
OxygenPathObject.get_unicode_string( oxygen_folder_and_path.path) ,
OxygenPathObject.get_unicode_string( oxygen_folder_and_path.o2object.name) ,
OxygenPathObject.get_unicode_string( new_folder_full_local_path) ) )
try:
os.makedirs( new_folder_full_local_path ) # this will create intermediate level directories too # TODO: default permission of directory?
except OSError as error:
message = u'{} , directory {} already exists'.format( error , OxygenPathObject.get_unicode_string( new_folder_full_local_path ) )
logging.error( message )
print message
pass
@staticmethod
def download_binary( url , local_file_path , original_modified_timestamp_milliseconds ):
"""https connection gets a binary stream in chunks and sets the finished download's modified timestamp"""
downloaded_bytes = 0
response = requests.get( url , verify=Downloader.cacert_location , stream=True , timeout=30 ).raw # ssl enabled, streaming, raw mode, 30 second timeout
# response = requests.get( url , verify=True, stream=True , timeout=30 ).raw # ssl enabled, streaming, raw mode, 30 second timeout
CHUNK = 1024 * 2048 # 1KB * N = 2MB
with open( local_file_path, 'wb' ) as local_file:
raw_data_chunk = response.read( CHUNK )
while raw_data_chunk:
local_file.write( raw_data_chunk )
raw_data_chunk = response.read( CHUNK )
try: # TODO: extract into a helper method?
original_modified_timestamp_seconds = original_modified_timestamp_milliseconds / 1000 # unix epoch
os.utime( local_file_path, ( original_modified_timestamp_seconds , original_modified_timestamp_seconds ) ) # works with python 2.7.3 win7
downloaded_bytes = os.stat( local_file_path ).st_size
except TypeError as error:
logging.error( u'Updating timestamp failed: {}'.format( OxygenPathObject.get_unicode_string( error ) ) )
return downloaded_bytes
@staticmethod
@retry( (SSLError, requests.RequestException , requests.exceptions.RequestException , requests.exceptions.SSLError , AttributeError , BufferError) , tries = 4 , backoff = 1 , logger = logging.getLogger( 'oxygensnapshot' ) ) #TODO: figure out how to reference an external argument
def download_binary_retry_wrapper( current_download , new_file_full_local_path , original_modified_timestamp_milliseconds ):
bytes_downloaded = Downloader.download_binary( current_download.downloadURL , new_file_full_local_path , original_modified_timestamp_milliseconds )
if bytes_downloaded != current_download.sizeInByte:
message = u'{} downloaded bytes ({}) does not match the remote size in bytes ({})'.format(
OxygenPathObject.get_unicode_string( new_file_full_local_path ), str( bytes_downloaded ), str( current_download.sizeInByte ) )
raise BufferError( message )
else:
return bytes_downloaded
def download_all( self , retry_count_max , agent_factory, download_agent_count=1 ):
"""a queue of File objects are converted to a queue of Download objects
FILE: ApiObject(canManage=False, sizeInByte=3094, name='getoxygenspaceinfo.txt', modifiedByUserName='John Pfeiffer',
canWrite=False, repositoryNodeServiceId='repositorynode1', modifiedTimestamp=1361299824000L, createdTimestamp=1361299824000L,
deleted=False, versionId=1, ownerOxygenId='', spaceId='uv-ff80808134f2fda80134f75ab6de0108', parentId='',
createdByUserName='John Pfeiffer', type=0, id='c4b17469-c926-4b15-b579-f736fafcb513')
ApiDownload(downloadURL='https://i-d0a4e2ae.o2cloud.net/storagegateway/download?
c=5bbae56c-78a2-48fc-ab47-fb9caa0664a9&s=58ea90b5ed0215f40831589ad756372d241e7d1c&h=0ad3a2be906e05681d8ed24c2eb8c1d38ec4578c',
sizeInByte=570, versionId='86bb79ca-cb61-40c5-ac02-17e130423e8f')
"""
try:
retry_count_max = int( retry_count_max )
except ValueError:
message = 'ERROR: Retry count max must be an integer, cannot continue'
logging.error( message )
print message
sys.exit( 1 )
for i in range( download_agent_count ):
self.files_remaining.put( None ) # Poison Pill to terminate the threads cleanly
try:
for i in range( download_agent_count ):
my_thread = ThreadUrl( agent_factory.create_valid_agent() , self.files_remaining , self.local_destination_path , self.local_files , retry_count_max )
my_thread.daemon = True # does not block the main program from exiting, must use a join() to block
my_thread.start() # starts this thread
self.files_remaining.join() # wait on the que until task_done count is correct
except KeyboardInterrupt:
with self.files_remaining.mutex:
self.files_remaining.clear()
def current_downloaded_bytes( self ):
total = 0
for element in deque( self.local_files.queue ):
total += element
return total
class ThreadUrl( threading.Thread ):
""" Each thread for generating a download link and getting the binary must have it's own agent (connection) """
def __init__( self , agent , files_remaining , local_destination_path , local_files , retry_count_max ): # fork-join design - each thread has the whole data set
self.agent = agent
self.files_remaining = files_remaining
self.local_destination_path = local_destination_path
self.local_files = local_files
self.retry_count_max = retry_count_max
threading.Thread.__init__( self )
def run( self ):
while True:
try:
if not self.files_remaining: # a convenient way to kill the thread
break
oxygen_file_and_path = self.files_remaining.get() # iteration progress towards termination of the loop
if not oxygen_file_and_path: # received poison pill so terminating the thread
self.files_remaining.task_done()
break
new_file_full_local_path = OxygenPathObject.convert_to_local_file_path( self.local_destination_path , oxygen_file_and_path.path , oxygen_file_and_path.o2object.name )
oxygen_file = oxygen_file_and_path.o2object
current_download = self.agent.create_download_object( oxygen_file )
logging.debug( u'{} from {}'.format( OxygenPathObject.get_unicode_string( new_file_full_local_path ) , OxygenPathObject.get_unicode_string( oxygen_file ) ) )
bytes_downloaded = 0
try:
bytes_downloaded = Downloader.download_binary_retry_wrapper( current_download , new_file_full_local_path , oxygen_file.modifiedTimestamp )
except BufferError:
logging.error( u'SKIPPING FILE {} (and removing incomplete download), after {} retries: downloaded bytes does not match the remote size in bytes ({})'.format(
OxygenPathObject.get_unicode_string( new_file_full_local_path ) , str( self.retry_count_max ) , str( current_download.sizeInByte ) ) )
os.remove( new_file_full_local_path )
except SSLError:
logging.error( u'SKIPPING FILE {} (and removing incomplete download), after {} retries: SSLError'.format(
OxygenPathObject.get_unicode_string( new_file_full_local_path ) , str( self.retry_count_max ) ) )
os.remove( new_file_full_local_path )
logging.debug( u'{} downloaded successfully ({} bytes)'.format( OxygenPathObject.get_unicode_string( new_file_full_local_path ), str( bytes_downloaded ) ) )
self.local_files.put( bytes_downloaded )
self.files_remaining.task_done()
except KeyboardInterrupt:
message = 'received keyboard interrupt, terminating...'
logging.warning( message )
print message
break
return