john pfeiffer

requests binary download chunked temp directory os stat change modified timestamp

import requests
import json

base_url = 'https://example.com'        # placeholder api base url
headers = {'Content-type': 'application/json'}

body = dict(title='my title', body='my body stuff here blah')
json_body = json.dumps(body)
response = requests.post(base_url + "/mystuff", headers=headers, data=json_body)
print response.status_code
print response.headers
print response.text
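
Newer versions of requests can serialize the dict for you via the json keyword argument, which also sets the Content-Type header; a minimal sketch reusing the same base_url:

import requests

body = dict(title='my title', body='my body stuff here blah')
response = requests.post(base_url + "/mystuff", json=body)        # requests sets Content-Type: application/json
print response.status_code
print response.json()        # parsed body, if the server responds with json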


- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
import requests
import tempfile
import os


# @staticmethod
def download_binary( url , local_file_path , original_modified_timestamp_milliseconds ):
    response = requests.get( url , verify = True , stream = True ).raw  # ssl, streaming, raw mode
#        print response.text        # small text files only
    CHUNK = 1024        # 1KB
    with open( local_file_path, 'wb' ) as file:
        raw_data_chunk = response.read( CHUNK )
        while raw_data_chunk:
            print '.'
            file.write( raw_data_chunk )
            raw_data_chunk = response.read( CHUNK )

    metadata = os.stat( local_file_path )
    original_modified_timestamp_seconds = original_modified_timestamp_milliseconds / 1000   # unix epoch
    print 'current modified: %s' % metadata.st_mtime
    print 'original modified: %s' % original_modified_timestamp_seconds
    os.utime( local_file_path, ( original_modified_timestamp_seconds , original_modified_timestamp_seconds ) ) # works with python 2.7.3 win7



target_filename = 'data.iso'
local_file_path = os.path.join( tempfile.gettempdir() , target_filename )
download_binary( 'https://example.com/data.iso' , local_file_path , 1361498096000 )
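
To confirm that the os.utime change took effect, stat the file again and convert st_mtime to a readable datetime; a quick check against the same local_file_path:

import os
import datetime

metadata = os.stat( local_file_path )
print 'modified timestamp is now: %s' % datetime.datetime.fromtimestamp( metadata.st_mtime )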


import logging
import requests

def manual_download(url, location='/var/cache/apt/archives/'):
    """ reuse the remote filename as the local filename """
    logging.info('saving: {} to: {}'.format(url, location))
    deb_url_parts = url.split('/')
    target = location + deb_url_parts[-1]
    with open(target, 'wb') as f:
        response = requests.get(url, stream=True)
        if not response.ok:
            raise IOError('unable to download: {} , {}'.format(response.status_code, response.content))
        for block in response.iter_content(10240):
            if not block:
                break
            f.write(block)
    logging.info('downloaded: {} result {}'.format(url, response.status_code))
    return target

manual_download('https://example.com/mirrors/packagename.deb')
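
If the URL can carry a query string, splitting on '/' keeps the ?query part in the filename; deriving the name with urlparse and os.path.basename is a bit more robust (a small sketch using the Python 2 urlparse module):

import os
from urlparse import urlparse

def remote_filename(url):
    """ filename portion of the url path, ignoring any query string """
    return os.path.basename(urlparse(url).path)

print remote_filename('https://example.com/mirrors/packagename.deb?mirror=primary')        # packagename.deb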

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -


# 2013-03-07 johnpfeiffer
import logging
import Queue
from collections import deque
import threading

import sys
import os
from ssl import SSLError

cacert_location = os.environ[ 'REQUESTS_CA_BUNDLE' ]    # required to work around the pyinstaller + requests cacert bug
import requests

from oxygen_path_object import OxygenPathObject
from retry_decorator import retry

class Downloader():

    # TODO: input validation
    def __init__( self , file_queue , local_destination_path , logging_level ):
        self.files_remaining = file_queue
        self.local_destination_path = local_destination_path
        self.local_files = Queue.Queue()        # completed download sizes in bytes, filled in by the worker threads

        logging_output_filename = 'oxygen_snapshot.log'
        logging_output = os.path.join( os.path.normpath( local_destination_path ) , logging_output_filename )
        logging.basicConfig( level = logging_level ,
                             format = '%(asctime)s %(levelname)s %(message)s',
                             filename = logging_output,
                             filemode='a' )

        if not self.files_remaining:
            error_message = 'ERROR: file_queue is not initialized'
            logging.error( error_message )
            raise SyntaxWarning( error_message )

        if not self.local_destination_path:
            error_message = 'ERROR: local_destination_path is not initialized'
            logging.error( error_message )
            raise SyntaxWarning( error_message )


    @staticmethod
    def create_local_folder_tree( folder_queue , local_destination_path ):
        while not folder_queue.empty():
            oxygen_folder_and_path = folder_queue.get()
            new_folder_full_local_path = OxygenPathObject.convert_to_local_file_path( local_destination_path , oxygen_folder_and_path.path , oxygen_folder_and_path.o2object.name )
            logging.debug( 'cloud path and name = %s , %s converts to %s' % ( repr( oxygen_folder_and_path.path) , oxygen_folder_and_path.o2object.name , new_folder_full_local_path ) )
            try:
                os.makedirs( new_folder_full_local_path )         # this will create intermediate level directories too       # TODO: default permission of directory?
            except OSError as error:
                logging.error( '%s , could not create directory %s (it may already exist)' % ( str( error ) , new_folder_full_local_path ) )
                print '%s , could not create directory %s (it may already exist)' % ( str( error ) , new_folder_full_local_path )


    @staticmethod
    def download_binary( url , local_file_path , original_modified_timestamp_milliseconds ):
        """https connection gets a binary stream in chunks and sets the finished download's modified timestamp"""
        downloaded_bytes = 0
        response = requests.get( url , verify = cacert_location , stream = True , timeout=30 ).raw  # ssl enabled, streaming, raw mode, 30 second timeout
        CHUNK = 1024 * 2048      # 1024 bytes * 2048 = 2MB chunks
        with open( local_file_path, 'wb' ) as file:
            raw_data_chunk = response.read( CHUNK )
            while raw_data_chunk:
                file.write( raw_data_chunk )
                raw_data_chunk = response.read( CHUNK )

        try:    # TODO: extract into a helper method?
            original_modified_timestamp_seconds = original_modified_timestamp_milliseconds / 1000   # unix epoch
            os.utime( local_file_path, ( original_modified_timestamp_seconds , original_modified_timestamp_seconds )  ) # works with python 2.7.3 win7
            downloaded_bytes = os.stat( local_file_path ).st_size
        except TypeError as error:
            logging.error( 'Updating timestamp failed: %s' % error )
        return downloaded_bytes

    @staticmethod
    @retry( (SSLError, requests.RequestException , requests.exceptions.RequestException , requests.exceptions.SSLError , AttributeError , BufferError) , tries = 4 , backoff = 1 , logger = logging.getLogger( 'oxygensnapshot' ) )  #TODO: figure out how to reference an external argument
    def download_binary_retry_wrapper( current_download , new_file_full_local_path , original_modified_timestamp_milliseconds ):
        bytes_downloaded = Downloader.download_binary( current_download.downloadURL , new_file_full_local_path , original_modified_timestamp_milliseconds )
        if bytes_downloaded != current_download.sizeInByte:
            message = '%s downloaded bytes (%s) does not match the remote size in bytes (%s)' % ( new_file_full_local_path , bytes_downloaded , current_download.sizeInByte )
#            logging.warn( message )
            raise BufferError( message )
        else:
            return bytes_downloaded


    def download_all( self , retry_count_max , agent_pool ):
        """a queue of File objects are converted to a queue of Download objects
        FILE: ApiObject(canManage=False, sizeInByte=3094, name='getoxygenspaceinfo.txt', modifiedByUserName='John Pfeiffer',
            canWrite=False, repositoryNodeServiceId='repositorynode1', modifiedTimestamp=1361299824000L, createdTimestamp=1361299824000L,
            deleted=False, versionId=1, ownerOxygenId='', spaceId='uv-ff80808134f2fda80134f75ab6de0108', parentId='',
            createdByUserName='John Pfeiffer', type=0, id='c4b17469-c926-4b15-b579-f736fafcb513')

        ApiDownload(downloadURL='https://i-d0a4e2ae.o2cloud.net/storagegateway/download?
            c=5bbae56c-78a2-48fc-ab47-fb9caa0664a9&s=58ea90b5ed0215f40831589ad756372d241e7d1c&h=0ad3a2be906e05681d8ed24c2eb8c1d38ec4578c',
            sizeInByte=570, versionId='86bb79ca-cb61-40c5-ac02-17e130423e8f')
        """
        try:
            retry_count_max = int( retry_count_max )
        except ValueError:
            message = 'ERROR: Retry count max must be an integer, cannot continue'
            logging.error( message )
            print message
            sys.exit( 1 )

        if agent_pool.qsize() < 1:
            message = 'ERROR: no agents available for downloading, cannot continue'
            logging.error( message )
            print message
            sys.exit( 1 )

        for i in range( agent_pool.qsize() ):
            self.files_remaining.put( None )        # Poison Pill to terminate the threads cleanly

        try:
            for i in range( agent_pool.qsize() ):
                my_thread = ThreadUrl( agent_pool.get() , self.files_remaining , self.local_destination_path , self.local_files , retry_count_max )
                my_thread.daemon = True         # does not block the main program from exiting, must use a join() to block
                my_thread.start()                   # starts this thread

            self.files_remaining.join()     # block until task_done() has been called for every queued item

        except KeyboardInterrupt:
            with self.files_remaining.mutex:
                self.files_remaining.queue.clear()      # Queue has no clear() method, so empty the underlying deque while holding the mutex


    def current_downloaded_bytes( self ):
        total = 0
        for element in deque( self.local_files.queue ):
            total += element
        return total



class ThreadUrl( threading.Thread ):
    """ Each thread for generating a download link and getting the binary must have it's own agent (connection) """
    def __init__( self , agent , files_remaining , local_destination_path , local_files ,  retry_count_max ):   # fork-join design - each thread has the whole data set
        self.agent = agent
        self.files_remaining = files_remaining
        self.local_destination_path = local_destination_path
        self.local_files = local_files
        self.retry_count_max = retry_count_max
        threading.Thread.__init__( self )

    def run( self ):
        while True:
            try:
                if not self.files_remaining:        # a convenient way to kill the thread
                    break

                oxygen_file_and_path = self.files_remaining.get()        # iteration progress towards termination of the loop
                if oxygen_file_and_path is None:                # received the poison pill, so terminate this thread
                    self.files_remaining.task_done()
                    break

                new_file_full_local_path = OxygenPathObject.convert_to_local_file_path( self.local_destination_path , oxygen_file_and_path.path , oxygen_file_and_path.o2object.name )
                oxygen_file = oxygen_file_and_path.o2object
                current_download = self.agent.create_download_object( oxygen_file )
                logging.debug( '%s from %s' % ( new_file_full_local_path , oxygen_file )  )

                bytes_downloaded = 0
                try:
                    bytes_downloaded = Downloader.download_binary_retry_wrapper( current_download , new_file_full_local_path , oxygen_file.modifiedTimestamp )
                except BufferError as error:
                    logging.error( 'SKIPPING FILE %s (and removing incomplete download), after %s retries: downloaded bytes does not match the remote size in bytes (%s)' % ( new_file_full_local_path , self.retry_count_max , current_download.sizeInByte ) )
                    os.remove( new_file_full_local_path )
                except SSLError as error:
                    logging.error( 'SKIPPING FILE %s (and removing incomplete download), after %s retries: SSLError' % ( new_file_full_local_path , self.retry_count_max ) )
                    os.remove( new_file_full_local_path )

                logging.debug( '%s downloaded successfully (%s bytes)' % ( new_file_full_local_path , bytes_downloaded ) )
                self.local_files.put( bytes_downloaded )

                self.files_remaining.task_done()

            except KeyboardInterrupt as error:
                message = 'received keyboard interrupt, terminating...'
                logging.warning( message )
                print message
                break

        return
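
A rough sketch of how the pieces above could be wired together; the agent objects and the queue of OxygenPathObject entries come from elsewhere in the tool, so the names below are only illustrative placeholders:

import Queue
import logging
import tempfile

files = Queue.Queue()       # OxygenPathObject entries with .path and .o2object, produced elsewhere
agents = Queue.Queue()      # one authenticated api agent (connection) per worker thread
# files.put( ... )
# agents.put( ... )

downloader = Downloader( files , tempfile.gettempdir() , logging.INFO )
downloader.download_all( retry_count_max = 4 , agent_pool = agents )
print 'downloaded %s bytes so far' % downloader.current_downloaded_bytes()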
