# Example: POST a JSON document with requests and dump the reply.
import json  # was missing: json.dumps below raised NameError

import requests

# Placeholder service root (the snippet never defined base_url); replace with the real one.
base_url = 'https://example.com'

headers = {'Content-type': 'application/json'}
body = dict(title='my title', body='my body stuff here blah')
json_body = json.dumps(body)  # serialize by hand so the payload matches the declared content type
response = requests.post(base_url + "/mystuff", headers=headers, data=json_body)

# Single-argument print(...) produces the same output on Python 2 and Python 3.
print(response.status_code)
print(response.headers)
print(response.text)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
import requests
import tempfile
import os
# @staticmethod
def download_binary(url, local_file_path, original_modified_timestamp_milliseconds):
    """Stream url into local_file_path and stamp the file with its original timestamp.

    The body is read from the raw (undecoded) response stream in 1KB chunks,
    printing a '.' per chunk as a progress indicator. Afterwards both the access
    and modified times of the local file are set to the supplied timestamp.

    url: address to fetch (the certificate is verified).
    local_file_path: file to create or overwrite.
    original_modified_timestamp_milliseconds: unix epoch time in milliseconds.
    """
    CHUNK = 1024  # 1KB
    response = requests.get(url, verify=True, stream=True)  # ssl, streaming
    try:
        raw_stream = response.raw  # raw mode
        # print response.text # small text files only
        with open(local_file_path, 'wb') as local_file:
            raw_data_chunk = raw_stream.read(CHUNK)
            while raw_data_chunk:
                print('.')
                local_file.write(raw_data_chunk)
                raw_data_chunk = raw_stream.read(CHUNK)
    finally:
        # a streamed response keeps its connection open until it is closed
        response.close()

    metadata = os.stat(local_file_path)
    original_modified_timestamp_seconds = original_modified_timestamp_milliseconds / 1000  # unix epoch
    print('current modified: %s' % metadata.st_mtime)
    print('original modified: %s' % original_modified_timestamp_seconds)
    os.utime(local_file_path, (original_modified_timestamp_seconds, original_modified_timestamp_seconds))  # works with python 2.7.3 win7
# Example usage: save the remote ISO into the system temp directory and pass
# 1361498096000 (milliseconds since the unix epoch) as its original modified timestamp.
target_filename = 'data.iso'
local_file_path = os.path.join( tempfile.gettempdir() , target_filename )
download_binary( 'https://example.com/data.iso' , local_file_path , 1361498096000 )
import requests
def manual_download(self, url, location='/var/cache/apt/archives/'):
    """ reuse the remote filename as the local filename

    Streams url into a file under location, named after the last path segment
    of the url, and returns the path of the saved file.

    self: unused by this function.
    url: address of the file to fetch.
    location: directory to save into (a trailing slash is optional).

    Raises IOError when the server does not answer with a success status; in
    that case nothing is written, so an existing local copy is left untouched.
    """
    # Local imports: this snippet is called before the file's later `import logging`.
    import logging
    import os

    logging.info('saving: {} to: {}'.format(url, location))
    remote_filename = url.split('/')[-1]
    target = os.path.join(location, remote_filename)

    response = requests.get(url, stream=True)
    try:
        # Check the status before opening the target so a failed request
        # cannot truncate a file that is already there.
        if not response.ok:
            raise IOError('unable to download: {} , {}'.format(response.status_code, response.content))
        with open(target, 'wb') as f:
            for block in response.iter_content(10240):
                if block:  # skip empty chunks instead of ending the download early
                    f.write(block)
    finally:
        response.close()  # release the streamed connection

    logging.info('downloaded: {} result {}'.format(url, response.status_code))
    return target
# manual_download's first parameter is `self` (unused in its body), so pass None for it;
# with a single argument the URL was bound to `self` and the call failed with a
# TypeError for the missing `url` argument.
manual_download(None, 'https://example.com/mirrors/packagename.deb')
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# 2013-03-07 johnpfeiffer
import logging
import Queue
from collections import deque
import threading
import sys
import os
from ssl import SSLError
# Path of the CA bundle, passed as `verify=` to requests.get in Downloader.download_binary.
# NOTE: indexing os.environ raises KeyError at import time when REQUESTS_CA_BUNDLE is not set.
cacert_location = os.environ[ 'REQUESTS_CA_BUNDLE' ] # required to avoid the pyinstaller with requests cacert bug
import requests
from oxygen_path_object import OxygenPathObject
from retry_decorator import retry
class Downloader():
    """Download a queue of cloud files into a local folder, one worker thread per agent.

    Attributes:
        agent: never assigned by this class; stays None.
        files_remaining: queue of pending file entries consumed by the worker threads.
        local_files: queue collecting one byte count per processed file.
        local_destination_path: local root folder; oxygen_snapshot.log is written there.
    """

    agent = None
    files_remaining = None
    local_files = None  # created per instance in __init__ so instances do not share totals
    local_destination_path = None

    # TODO: input validation
    def __init__(self, file_queue, local_destination_path, logging_level):
        """Store the work queue and destination, then configure file logging.

        Raises SyntaxWarning when file_queue or local_destination_path is missing.
        """
        self.files_remaining = file_queue
        self.local_destination_path = local_destination_path
        self.local_files = Queue.Queue()

        # Validate the destination first: it is needed to build the log file path,
        # and os.path.normpath( None ) would fail before this error could be reported.
        if not self.local_destination_path:
            error_message = 'ERROR: local_destination_path is not initialized'
            logging.error(error_message)
            raise SyntaxWarning(error_message)

        logging_output_filename = 'oxygen_snapshot.log'
        logging_output = os.path.join(os.path.normpath(local_destination_path), logging_output_filename)
        logging.basicConfig(level=logging_level,
                            format='%(asctime)s %(levelname)s %(message)s',
                            filename=logging_output,
                            filemode='a')

        if not self.files_remaining:
            error_message = 'ERROR: file_queue is not initialized'
            logging.error(error_message)
            raise SyntaxWarning(error_message)

    @staticmethod
    def create_local_folder_tree(folder_queue, local_destination_path):
        """Drain folder_queue, creating each folder under local_destination_path.

        Failures are logged and printed but never raised, so one bad folder does
        not stop the rest of the tree from being created.
        """
        while not folder_queue.empty():
            oxygen_folder_and_path = folder_queue.get()
            new_folder_full_local_path = OxygenPathObject.convert_to_local_file_path(
                local_destination_path, oxygen_folder_and_path.path, oxygen_folder_and_path.o2object.name)
            logging.debug('cloud path and name = %s , %s converts to %s' % (
                repr(oxygen_folder_and_path.path), oxygen_folder_and_path.o2object.name, new_folder_full_local_path))
            try:
                os.makedirs(new_folder_full_local_path)  # this will create intermediate level directories too # TODO: default permission of directory?
            except OSError as error:
                # Only call it "already exists" when the directory is really there;
                # other OSErrors (permissions, bad name, ...) get an accurate message.
                if os.path.isdir(new_folder_full_local_path):
                    message = '%s , directory %s already exists' % (str(error), new_folder_full_local_path)
                else:
                    message = '%s , directory %s could not be created' % (str(error), new_folder_full_local_path)
                logging.error(message)
                print(message)

    @staticmethod
    def download_binary(url, local_file_path, original_modified_timestamp_milliseconds):
        """https connection gets a binary stream in chunks and sets the finished download's modified timestamp

        Returns the size in bytes of the file written to local_file_path. A
        timestamp that cannot be applied (TypeError) is logged and does not
        affect the returned size.
        """
        CHUNK = 1024 * 2048  # 2048 * 1KB = 2MB per read
        response = requests.get(url, verify=cacert_location, stream=True, timeout=30)  # ssl enabled, streaming, 30 second timeout
        try:
            raw_stream = response.raw  # raw mode
            with open(local_file_path, 'wb') as local_file:
                raw_data_chunk = raw_stream.read(CHUNK)
                while raw_data_chunk:
                    local_file.write(raw_data_chunk)
                    raw_data_chunk = raw_stream.read(CHUNK)
        finally:
            # a streamed response keeps its connection open until it is closed
            response.close()

        # Measure the file before touching the timestamp, so a timestamp failure
        # cannot make a complete download look like a 0 byte one.
        downloaded_bytes = os.stat(local_file_path).st_size
        try:  # TODO: extract into a helper method?
            original_modified_timestamp_seconds = original_modified_timestamp_milliseconds / 1000  # unix epoch
            os.utime(local_file_path, (original_modified_timestamp_seconds, original_modified_timestamp_seconds))  # works with python 2.7.3 win7
        except TypeError as error:
            logging.error('Updating timestamp failed: %s' % error)
        return downloaded_bytes

    @staticmethod
    @retry( (SSLError, requests.RequestException , requests.exceptions.RequestException , requests.exceptions.SSLError , AttributeError , BufferError) , tries = 4 , backoff = 1 , logger = logging.getLogger( 'oxygensnapshot' ) ) #TODO: figure out how to reference an external argument
    def download_binary_retry_wrapper(current_download, new_file_full_local_path, original_modified_timestamp_milliseconds):
        """Download current_download.downloadURL and verify the size on disk.

        Returns the downloaded byte count. Raises BufferError when it differs
        from current_download.sizeInByte; BufferError is one of the exception
        types handed to the retry decorator above.
        """
        bytes_downloaded = Downloader.download_binary(
            current_download.downloadURL, new_file_full_local_path, original_modified_timestamp_milliseconds)
        if bytes_downloaded != current_download.sizeInByte:
            message = '%s downloaded bytes (%s) does not match the remote size in bytes (%s)' % (
                new_file_full_local_path, bytes_downloaded, current_download.sizeInByte)
            # logging.warn( message )
            raise BufferError(message)
        return bytes_downloaded

    def download_all(self, retry_count_max, agent_pool):
        """a queue of File objects are converted to a queue of Download objects

        Starts one ThreadUrl worker per agent in agent_pool and blocks until the
        file queue has been fully processed. Prints an error and exits the
        process when retry_count_max is not an integer or agent_pool is empty.

        FILE: ApiObject(canManage=False, sizeInByte=3094, name='getoxygenspaceinfo.txt', modifiedByUserName='John Pfeiffer',
        canWrite=False, repositoryNodeServiceId='repositorynode1', modifiedTimestamp=1361299824000L, createdTimestamp=1361299824000L,
        deleted=False, versionId=1, ownerOxygenId='', spaceId='uv-ff80808134f2fda80134f75ab6de0108', parentId='',
        createdByUserName='John Pfeiffer', type=0, id='c4b17469-c926-4b15-b579-f736fafcb513')
        ApiDownload(downloadURL='https://i-d0a4e2ae.o2cloud.net/storagegateway/download?
        c=5bbae56c-78a2-48fc-ab47-fb9caa0664a9&s=58ea90b5ed0215f40831589ad756372d241e7d1c&h=0ad3a2be906e05681d8ed24c2eb8c1d38ec4578c',
        sizeInByte=570, versionId='86bb79ca-cb61-40c5-ac02-17e130423e8f')
        """
        try:
            retry_count_max = int(retry_count_max)
        except (TypeError, ValueError):  # TypeError covers None and other non-numeric objects
            message = 'ERROR: Retry count max must be an integer, cannot continue'
            logging.error(message)
            print(message)
            sys.exit(1)

        agent_count = agent_pool.qsize()
        if agent_count < 1:
            message = 'ERROR: no agents available for downloading, cannot continue'
            logging.error(message)
            print(message)
            sys.exit(1)

        for _ in range(agent_count):
            self.files_remaining.put(None)  # Poison Pill to terminate the threads cleanly

        try:
            for _ in range(agent_count):
                my_thread = ThreadUrl(agent_pool.get(), self.files_remaining, self.local_destination_path,
                                      self.local_files, retry_count_max)
                my_thread.daemon = True  # does not block the main program from exiting, must use a join() to block
                my_thread.start()  # starts this thread
            self.files_remaining.join()  # wait on the queue until task_done count is correct
        except KeyboardInterrupt:
            # Queue objects have no clear() method; empty the underlying deque
            # while holding the queue's own lock.
            with self.files_remaining.mutex:
                self.files_remaining.queue.clear()

    def current_downloaded_bytes(self):
        """Return the sum of the byte counts recorded in local_files so far."""
        return sum(list(self.local_files.queue))  # sum a snapshot, the workers may still be adding
class ThreadUrl( threading.Thread ):
    """ Each thread for generating a download link and getting the binary must have its own agent (connection)

    A worker takes entries from files_remaining until it receives None (the
    poison pill). For every entry it puts one byte count into local_files
    (0 when the file was skipped) and calls task_done() exactly once, so a
    join() on files_remaining cannot hang because of a failed file.
    """

    def __init__(self, agent, files_remaining, local_destination_path, local_files, retry_count_max):  # fork-join design - each thread has the whole data set
        threading.Thread.__init__(self)
        self.agent = agent
        self.files_remaining = files_remaining
        self.local_destination_path = local_destination_path
        self.local_files = local_files
        self.retry_count_max = retry_count_max

    @staticmethod
    def _remove_incomplete(local_path):
        """Delete a partial download; a file that was never created is not an error."""
        try:
            os.remove(local_path)
        except OSError as error:
            logging.warning('could not remove incomplete download %s: %s' % (local_path, error))

    def _download_one(self, oxygen_file_and_path):
        """Download a single queue entry and return its byte count (0 when skipped)."""
        oxygen_file = oxygen_file_and_path.o2object
        new_file_full_local_path = OxygenPathObject.convert_to_local_file_path(
            self.local_destination_path, oxygen_file_and_path.path, oxygen_file.name)
        current_download = self.agent.create_download_object(oxygen_file)
        logging.debug('%s from %s' % (new_file_full_local_path, oxygen_file))
        try:
            bytes_downloaded = Downloader.download_binary_retry_wrapper(
                current_download, new_file_full_local_path, oxygen_file.modifiedTimestamp)
        except BufferError:
            logging.error('SKIPPING FILE %s (and removing incomplete download), after %s retries: downloaded bytes does not match the remote size in bytes (%s)' % (new_file_full_local_path, self.retry_count_max, current_download.sizeInByte))
            self._remove_incomplete(new_file_full_local_path)
            return 0
        except SSLError:
            logging.error('SKIPPING FILE %s (and removing incomplete download), after %s retries: SSLError' % (new_file_full_local_path, self.retry_count_max))
            self._remove_incomplete(new_file_full_local_path)
            return 0
        # only reached when the download really succeeded
        logging.debug('%s downloaded successfully (%s bytes)' % (new_file_full_local_path, bytes_downloaded))
        return bytes_downloaded

    def run(self):
        """Process queue entries until the poison pill (None) arrives."""
        while True:
            try:
                if not self.files_remaining:  # a convenient way to kill the thread
                    break
                oxygen_file_and_path = self.files_remaining.get()  # iteration progress towards termination of the loop
                if oxygen_file_and_path is None:  # received poison pill so terminating the thread
                    self.files_remaining.task_done()
                    break
                bytes_downloaded = 0
                try:
                    bytes_downloaded = self._download_one(oxygen_file_and_path)
                except Exception:
                    # Keep the worker alive: a thread that dies here would leave its
                    # poison pill unconsumed and files_remaining.join() would never return.
                    logging.exception('SKIPPING FILE %s: unexpected error' % repr(oxygen_file_and_path))
                finally:
                    self.local_files.put(bytes_downloaded)
                    self.files_remaining.task_done()
            except KeyboardInterrupt as error:
                message = 'received keyboard interrupt, terminating...'
                logging.warning(message)
                print(message)
                break