从s3的GLACIER上批量下载对象的Python代码

python代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import boto3
from botocore.exceptions import ClientError
import sys
import os
import time
import botocore


# Settings read by the test_* driver functions in this file. They are empty
# here and are assigned real values in the ``__main__`` section.
# Bucket name passed to the S3 helpers by the drivers.
BUCKET_NAME=""
# Object key for the single-object drivers; test_download_objects() passes it
# as the key prefix for the batch download.
OBJECT_NAME=""
# Local directory prefix handed to download_objects(); object keys are
# appended to it by plain string concatenation.
LOCAL_PATH=""
# Key prefix used by test_list_bucket_objects().
PREFIX=""


# restore object 
def restore_object(bucket_name, object_name, days, retrieval_type='Standard'):
    """Ask S3 to restore an archived object so that it can be downloaded.

    :param bucket_name: string, bucket that holds the object
    :param object_name: string, key of the object to restore
    :param days: number of days the restored copy is retained
    :param retrieval_type: 'Standard' | 'Expedited' | 'Bulk'
    :return: True when the restore request was accepted, False when the
    call raised a ClientError (the error is logged)
    """
    client = boto3.client('s3')
    restore_spec = {
        'Days': days,
        'GlacierJobParameters': {'Tier': retrieval_type},
    }
    try:
        client.restore_object(
            Bucket=bucket_name,
            Key=object_name,
            RestoreRequest=restore_spec,
        )
    except ClientError as err:
        # Typical causes: the bucket or key does not exist, or the object is
        # not in an archived storage class (InvalidObjectState).
        logging.error(err)
        return False
    else:
        return True



# head object
def head_object(bucket_name, object_name):
    """Fetch the metadata of an object in an Amazon S3 bucket (HEAD request)

    :param bucket_name: string
    :param object_name: string
    :return: The head_object response dict, or None if the request failed.
    Callers in this file read its optional 'Restore' entry to find out
    whether a restore is in progress or finished.
    """

    s3 = boto3.client('s3')
    try:
        return s3.head_object(Bucket=bucket_name, Key=object_name)
    except ClientError as e:
        # The previous message was copied from restore_object() and wrongly
        # blamed the storage class; report what actually failed instead.
        logging.error(f"head_object failed for {bucket_name} {object_name}: {e}")
        return None


# list object
def list_bucket_objects(bucket_name, prefix=None):
    """List the objects in an Amazon S3 bucket, following pagination

    :param bucket_name: string
    :param prefix: optional key prefix; when empty or None every object in
    the bucket is listed
    :return: List of bucket objects (the 'Contents' entries of every result
    page). If error or no keys were found, return None.
    """

    s_3 = boto3.client('s3')
    params = {'Bucket': bucket_name}
    if prefix:
        params['Prefix'] = prefix

    contents = []
    try:
        # One ListObjectsV2 call returns at most 1000 keys, so a single call
        # silently truncates larger listings. The paginator follows the
        # continuation tokens until every page has been read.
        paginator = s_3.get_paginator('list_objects_v2')
        for page in paginator.paginate(**params):
            contents.extend(page.get('Contents', []))
    except ClientError as e:
        # e.g. the bucket does not exist or access is denied
        logging.error(e)
        return None

    # Only return the contents if we found some keys
    return contents or None


# download file
def download_object(bucket_name, object_name, local):
    """Download a single object to the local file ``local + object_name``

    :param bucket_name: string
    :param object_name: string, key of the object to download
    :param local: string, local path prefix; the key is appended to it as-is,
    so it should end with a path separator
    :return: True if the object was downloaded, False if S3 reported an error
    """
    s3 = boto3.resource('s3')
    try:
        s3.Bucket(bucket_name).download_file(object_name, local + object_name)
    except ClientError as e:
        if e.response['Error']['Code'] == "404":
            logging.error(f"The object does not exist. {bucket_name} {object_name}")
        else:
            # Previously every non-404 error (access denied, object still
            # archived, ...) was swallowed without any trace.
            logging.error(f"Failed to download {bucket_name} {object_name}: {e}")
        return False
    return True


# mkdir 
def mkdir(path):
    """Make sure ``path`` exists, creating missing directories with mode 0o770.

    Nothing is created when the path already exists.

    :param path: string, directory path to create
    :return: always True
    """
    if not os.path.exists(path):
        os.makedirs(path, mode=0o770, exist_ok=True)
    return True


# download objects in batch
def _needs_restore(obj):
    """Return True if a listed object must be restored before downloading."""
    # Keys ending in "/" are folder placeholders; there is nothing to restore.
    if obj['Key'].endswith('/'):
        return False
    # Only the archive classes need a restore. The previous test
    # (!= 'STANDRAD' or != 'STANDRAD_IA') was always true, so objects that
    # were directly readable were "restored" too, which S3 rejects.
    return obj.get('StorageClass') in ('GLACIER', 'DEEP_ARCHIVE')


def _request_restore(bucket_name, obj):
    """Submit a Standard-tier restore, retained one day per started 20 GB."""
    days = obj['Size'] // 20000000000 + 1
    return restore_object(bucket_name, obj['Key'], days, 'Standard')


def download_objects(bucket_name, object_name, local, progress_call=None):
    """Restore (when archived) and download every object under a key prefix

    The function blocks until every listed object has been downloaded: after
    each pass over the objects it sleeps (one hour after the first pass if a
    restore had to be requested, five minutes otherwise) and tries again.

    :param bucket_name: string
    :param object_name: string, key prefix selecting the objects to download
    :param local: string, local directory prefix; each key is appended to it
    as-is, so it should end with a path separator
    :param progress_call: optional callable(count, downloads, key) invoked
    after each object is completed
    :return: True when all objects were downloaded; False if nothing was
    listed, or a head/restore request failed
    """
    objects = list_bucket_objects(bucket_name, object_name)
    # no object
    if not objects:
        return False
    count = len(objects)
    # show object list
    logging.info(f"*********object list begin************")
    for obj in objects:
        logging.info(f"{obj['Key']}")
    logging.info(f"*********object list count {count}************")

    # Phase 1: submit a restore request for every archived object that has
    # no restore in progress or completed yet.
    long_time_wait = False
    for obj in objects:
        if not _needs_restore(obj):
            continue
        response = head_object(bucket_name, obj['Key'])
        if not response:
            # head failed; previously this crashed on ``None.get``
            return False
        if not response.get('Restore'):
            if not _request_restore(bucket_name, obj):
                return False
            long_time_wait = True

    # Phase 2: poll until every object has been downloaded.
    downloaded = set()
    download_count = 0
    while True:
        for obj in objects:
            key = obj['Key']
            if key in downloaded:
                continue
            if key.endswith('/'):
                # Folder placeholder: it cannot be written as a file, so just
                # create the matching local directory.
                mkdir(local + key)
            else:
                response = head_object(bucket_name, key)
                if not response:
                    return False
                if _needs_restore(obj):
                    restore = response.get('Restore')
                    if not restore:
                        # No restore on record (e.g. it expired): ask again.
                        if not _request_restore(bucket_name, obj):
                            return False
                        continue
                    if 'ongoing-request="false"' not in restore:
                        logging.info(f"{key} 正在还原中...")
                        continue
                    logging.debug(f"{key} 还原成功. {restore[restore.find('expiry-date='):]}")
                # download
                mkdir(os.path.dirname(local + key) + "/")
                logging.info(f"{key} 开始下载...")
                if not download_object(bucket_name, key, local):
                    # keep it pending; it is retried on the next pass
                    continue
                logging.info(f"{key} 下载成功.")
            downloaded.add(key)
            download_count += 1
            if progress_call:
                progress_call(count, download_count, key)

        # Completion is recomputed on every pass. The previous flag was never
        # reset to True, so the loop could not end after one incomplete pass.
        if all(obj['Key'] in downloaded for obj in objects):
            logging.info(f"All objects downloads successful!")
            break
        # waiting
        logging.info("Sleep and wait restore...")
        if long_time_wait:
            # restores were just requested: give them an hour before polling
            long_time_wait = False
            time.sleep(60*60)
        else:
            time.sleep(5*60)
    return True


# progress reporting
# Next percentage at which progress_call() logs; advanced in steps of 5.
next_commint = 0


def progress_call(count, downloads, obj_name):
    """Log the download progress, at most once per 5% step

    :param count: total number of objects to download
    :param downloads: number of objects downloaded so far
    :param obj_name: key of the object just downloaded (currently unused)
    :return: always True
    """
    global next_commint
    if count <= 0:
        # nothing to report, and avoids dividing by zero
        return True
    percent = (100 * downloads) // count
    if percent >= next_commint:
        # The log call used to sit outside this check, so the threshold had
        # no effect and every single download was logged.
        logging.info(f"downloads percent {percent}%!")
        # Jump to the next multiple of 5 above the current percentage.
        next_commint = (percent // 5 + 1) * 5
    return True


# test 
def test_mkdir():
    """Manually exercise mkdir() with a nested path on drive E:."""
    target = 'e:/data/111/222/333'
    mkdir(target)
    return True


def test_download_object():
    """Exercise download_object()"""
    # Assign these values before running the program
    test_bucket_name = BUCKET_NAME
    test_object_name = OBJECT_NAME

    # Download the object below E:/data/.
    success = download_object(test_bucket_name, test_object_name, 'E:/data/')
    if success:
        # The old message claimed a restore request had been submitted.
        logging.info(f'Downloaded {test_object_name} '
                     f'from {test_bucket_name}')
    logging.info(f'{success}')
    return success


def test_restore_object():
    """Manually exercise restore_object() with a 2-day Expedited restore.

    Reads the target from BUCKET_NAME / OBJECT_NAME, which must be assigned
    before running. Returns whatever restore_object() returned.
    """
    test_bucket_name, test_object_name = BUCKET_NAME, OBJECT_NAME

    success = restore_object(
        test_bucket_name,
        test_object_name,
        days=2,
        retrieval_type='Expedited',
    )
    if success:
        logging.info(f'Submitted request to restore {test_object_name} '
                     f'in {test_bucket_name}')
    logging.info(f'restore object {success}')
    return success


def test_head_object():
    """Exercise head_object() and report the object's restore status"""
    # Assign these values before running the program
    test_bucket_name = BUCKET_NAME
    test_object_name = OBJECT_NAME

    # head object.
    response = head_object(test_bucket_name, test_object_name)
    if not response:
        return response

    # The old message claimed a restore request had been submitted here.
    logging.info(f'Got metadata for {test_object_name} '
                 f'in {test_bucket_name}')
    restore = response.get('Restore')
    if not restore:
        logging.info('need restore object {} {}'.format(test_bucket_name, test_object_name))
        return response

    logging.info('Restore {}'.format(restore))
    if 'ongoing-request="false"' not in restore:
        # restore still running
        logging.info("正在还原...")
    else:
        # restore finished: show the expiry part of the header
        logging.info(restore[restore.find('expiry-date='):])
        logging.info("还原成功.")
    return response


def test_list_bucket_objects():
    """Manually exercise list_bucket_objects() using BUCKET_NAME and PREFIX.

    Logs every key that was found, or a note that nothing was found.
    """
    bucket_name = BUCKET_NAME
    objects = list_bucket_objects(bucket_name, PREFIX)

    if objects is None:
        # Didn't get any keys
        logging.info(f'No objects in {bucket_name}')
        return

    # List the object names
    logging.info(f'Objects in {bucket_name}')
    for obj in objects:
        logging.info(f'  {obj["Key"]}')


def test_download_objects():
    """Run the batch download for OBJECT_NAME (used as key prefix) into LOCAL_PATH.

    Returns the result of download_objects().
    """
    return download_objects(
        bucket_name=BUCKET_NAME,
        object_name=OBJECT_NAME,
        local=LOCAL_PATH,
        progress_call=progress_call,
    )


if __name__ == '__main__':
    # Set up logging. Change ``level`` to logging.DEBUG, logging.WARNING or
    # logging.ERROR for more or less output.
    logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(asctime)s: %(message)s', datefmt='%Y-%m-%d  %H:%M:%S')

    # This code already runs at module scope, so the settings are assigned
    # directly. Declaring them ``global`` here is a SyntaxError ("assigned to
    # before global declaration") because they are first assigned near the
    # top of the file.
    BUCKET_NAME = "mybucket"  # bucket name
    OBJECT_NAME = "obj/list1/"  # key prefix for the batch download
    LOCAL_PATH = "e:/download/"  # local destination directory
    PREFIX = "show"  # key prefix used when listing

    # Other manual drivers: test_download_object(), test_mkdir(), ...
    test_download_objects()



Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐