之前研究了苹果超级签,为了方便的注册udid并且下载描述文件,特意写此API,方便程序调用
2022.1.8 更新

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# project: 4月 
# author: NinEveN
# date: 2020/4/17

# pip install pyjwt
import base64
import logging
import os
import time
from collections import namedtuple
from functools import wraps

import jwt
import requests
from django.conf import settings

logger = logging.getLogger(__name__)


# https://developer.apple.com/documentation/appstoreconnectapi/creating_api_keys_for_app_store_connect_api
#  https://appstoreconnect.apple.com/access/api 去申请秘钥
#

proxies = {}

timeout = 120


def request_format_log(req):
    try:
        logger.info(f"url:{req.url} header:{req.headers} code:{req.status_code} body:{req.content}")
    except Exception as e:
        logger.error(e)
    return req


# 需要和model里面的对应起来
capability_info = [
    [],
    ["PUSH_NOTIFICATIONS"],
    [
        "PERSONAL_VPN",
        "PUSH_NOTIFICATIONS",
        "NETWORK_EXTENSIONS",
    ],
    [
        "PERSONAL_VPN",
        "PUSH_NOTIFICATIONS",
        "NETWORK_EXTENSIONS",
        "WALLET",
        "ICLOUD",
        "INTER_APP_AUDIO",
        "ASSOCIATED_DOMAINS",
        "APP_GROUPS",
        "HEALTHKIT",
        "HOMEKIT",
        "WIRELESS_ACCESSORY_CONFIGURATION",
        "APPLE_PAY",
        "DATA_PROTECTION",
        "SIRIKIT",
        "MULTIPATH",
        "HOT_SPOT",
        "NFC_TAG_READING",
        "CLASSKIT",
        "AUTOFILL_CREDENTIAL_PROVIDER",
        "ACCESS_WIFI_INFORMATION",
        "COREMEDIA_HLS_LOW_LATENCY",
    ]
]


def get_capability(s_type):
    return capability_info[s_type]


class DevicesAPI(object):
    # https://developer.apple.com/documentation/appstoreconnectapi/devices
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.devices_url = '%s/devices' % base_uri

    def list_devices(self, query_parameters=None):
        """
        :param query_parameters:
        :return:
            200 DevicesResponse OK  Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json

        """
        params = {
            "fields[devices]": "addedDate, deviceClass, model, name, platform, status, udid",
            "filter[platform]": "IOS",
            "limit": 200
        }
        if query_parameters:
            for k, v in query_parameters.items():
                params[k] = v
        return request_format_log(
            requests.get(self.devices_url, params=params, headers=self.headers, proxies=self.proxies,
                         timeout=self.timeout))

    def list_enabled_devices(self):
        return self.list_devices({"filter[status]": "ENABLED"})

    def list_disabled_devices(self):
        return self.list_devices({"filter[status]": "DISABLED"})

    def list_device_by_device_id(self, device_id):
        return self.list_devices({"filter[id]": device_id})

    def register_device(self, device_name, device_udid, platform="IOS"):
        """
        :param device_name:
        :param device_udid:
        :param platform:
        :return:
            201 DeviceResponse Created Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json

        """
        json = {
            'data': {
                'type': 'devices',
                'attributes': {
                    'name': device_name,
                    'udid': device_udid,
                    'platform': platform  # IOS or MAC_OS
                }
            }
        }
        return request_format_log(
            requests.post(self.devices_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))

    def read_device_information(self, device_id):
        """
        :param device_id:
        :return:
            200 DeviceResponse OK Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            404 ErrorResponse Not Found Resource not found. Content-Type: application/json
        """
        base_url = '%s/%s' % (self.devices_url, device_id)
        params = {
            "fields[devices]": "addedDate, deviceClass, model, name, platform, status, udid",
        }
        return request_format_log(
            requests.get(base_url, params=params, headers=self.headers, proxies=self.proxies, timeout=self.timeout))

    def enabled_device(self, device_id, device_name):
        return self.modify_registered_device(device_id, device_name, 'ENABLED')

    def disabled_device(self, device_id, device_name):
        return self.modify_registered_device(device_id, device_name, 'DISABLED')

    def modify_registered_device(self, device_id, device_name, status):
        """
        :param device_id:
        :param device_name:
        :param status:
        :return:
            200 DeviceResponse OK Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            404 ErrorResponse Not Found Resource not found. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        base_url = '%s/%s' % (self.devices_url, device_id)
        json = {
            'data': {
                'type': 'devices',
                'id': device_id,
                'attributes': {
                    'name': device_name,
                    'status': status
                }
            }
        }
        return request_format_log(
            requests.patch(base_url, json=json, headers=self.headers, proxies=self.proxies, timeout=self.timeout))


class BundleIDsAPI(object):
    # https://developer.apple.com/documentation/appstoreconnectapi/bundle_ids
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.bundle_ids_url = '%s/bundleIds' % base_uri

    def register_bundle_id(self, bundle_id_name, bundle_id_identifier, platform="IOS", seed_id=''):
        """
        :param bundle_id_name:
        :param bundle_id_identifier:
        :param platform:
        :param seed_id:
        :return:
            201 BundleIdResponse Created Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        json = {
            'data': {
                'type': 'bundleIds',
                'attributes': {
                    'name': bundle_id_name,
                    'identifier': bundle_id_identifier,
                    'platform': platform,
                    'seedId': seed_id
                }
            }
        }
        return request_format_log(
            requests.post(self.bundle_ids_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))

    def delete_bundle_id_by_id(self, bundle_id):
        """
        :param bundle_id:
        :return:
            204	No Content
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            404 ErrorResponse Not Found Resource not found. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        base_url = '%s/%s' % (self.bundle_ids_url, bundle_id)
        json = {}
        return request_format_log(
            requests.delete(base_url, json=json, headers=self.headers, proxies=self.proxies, timeout=self.timeout))

    def list_bundle_ids(self, query_parameters=None):
        """
        :param query_parameters:
        :return:
            200 BundleIdsResponse  OK  Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
        """
        params = {
            "fields[bundleIds]": "identifier, name, platform, profiles, seedId",
            # "filter[platform]": "IOS",
            "limit": 200
        }
        if query_parameters:
            for k, v in query_parameters.items():
                params[k] = v
        return request_format_log(
            requests.get(self.bundle_ids_url, params=params, headers=self.headers, proxies=self.proxies,
                         timeout=self.timeout))

    def list_bundle_id_by_identifier(self, identifier):
        return self.list_bundle_ids({"filter[identifier]": identifier})

    def list_bundle_id_by_id(self, bundle_id):
        return self.list_bundle_ids({"filter[id]": bundle_id})

    def modify_bundle_id(self, bundle_id, bundle_name):
        """
        :param bundle_id:
        :param bundle_name:
        :return:
            200 BundleIdResponse OK Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            404 ErrorResponse Not Found Resource not found. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        base_url = '%s/%s' % (self.bundle_ids_url, bundle_id)
        json = {
            'data': {
                'type': 'bundleIds',
                'id': bundle_id,
                'attributes': {
                    'name': bundle_name,
                }
            }
        }
        return request_format_log(
            requests.patch(base_url, json=json, headers=self.headers, proxies=self.proxies, timeout=self.timeout))


class BundleIDsCapabilityAPI(object):
    # https://developer.apple.com/documentation/appstoreconnectapi/bundle_id_capabilities
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.bundle_ids_capability_url = '%s/bundleIdCapabilities' % base_uri

    def disable_capability(self, bundle_id, capability_type):
        """
        :param capability_type:
        :param bundle_id:
        :return:
            204	No Content
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            404 ErrorResponse Not Found Resource not found. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        base_url = '%s/%s_%s' % (self.bundle_ids_capability_url, bundle_id, capability_type)
        json = {}
        return request_format_log(
            requests.delete(base_url, json=json, headers=self.headers, proxies=self.proxies, timeout=self.timeout))

    def enable_capability(self, bundle_id, capability_type):
        """
        :param bundle_id:
        :param capability_type:
        :return:
            201 BundleIdCapabilityResponse Created Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        json = {
            'data': {
                'type': 'bundleIdCapabilities',
                'attributes': {
                    'capabilityType': capability_type,  # 'PUSH_NOTIFICATIONS',#PERSONAL_VPN
                    'settings': []
                },
                'relationships': {
                    'bundleId': {
                        'data': {
                            'id': bundle_id,
                            'type': 'bundleIds',
                        }
                    }
                }
            }
        }
        return request_format_log(
            requests.post(self.bundle_ids_capability_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))


class ProfilesAPI(object):
    # https://developer.apple.com/documentation/appstoreconnectapi/profiles
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.profiles_url = '%s/profiles' % base_uri

    def create_profile(self, bundle_id, certificate_id_list, profile_name, device_id_list,
                       profile_type='IOS_APP_ADHOC'):
        """

        :param bundle_id:
        :param certificate_id_list:
        :param profile_name:
        :param device_id_list:
        :param profile_type:
        :return:
            201 ProfileResponse Created Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        json = {
            'data': {
                'type': 'profiles',
                'attributes': {
                    'name': profile_name,
                    'profileType': profile_type,
                    # Possible values: IOS_APP_DEVELOPMENT, IOS_APP_STORE, IOS_APP_ADHOC, IOS_APP_INHOUSE,
                    # MAC_APP_DEVELOPMENT, MAC_APP_STORE, MAC_APP_DIRECT, TVOS_APP_DEVELOPMENT, TVOS_APP_STORE,
                    # TVOS_APP_ADHOC, TVOS_APP_INHOUSE, MAC_CATALYST_APP_DEVELOPMENT, MAC_CATALYST_APP_STORE,
                    # MAC_CATALYST_APP_DIRECT
                },
                'relationships': {
                    'bundleId': {
                        'data': {'id': bundle_id, 'type': 'bundleIds'}
                    },
                    'certificates': {
                        'data': [
                            {'id': certificate_id, 'type': 'certificates'} for certificate_id in certificate_id_list
                        ]
                    },
                    'devices': {
                        'data': [
                            {'id': device_id, 'type': 'devices'} for device_id in device_id_list
                        ]
                    },
                }
            }
        }
        return request_format_log(
            requests.post(self.profiles_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))

    def delete_profile(self, profile_id):
        """
        :param profile_id:
        :return:
            204	No Content
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            404 ErrorResponse Not Found Resource not found. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        base_url = '%s/%s' % (self.profiles_url, profile_id)
        json = {}
        return request_format_log(
            requests.delete(base_url, json=json, headers=self.headers, proxies=self.proxies, timeout=self.timeout))

    def download_profile(self, profile_id):
        # n=base64.b64decode(profileContent)
        # with open('profilea','wb') as f:
        #     f.write(n)
        # print(n)
        pass

    def list_profiles(self, query_parameters=None):
        """

        :param query_parameters:
        :return:
            200 ProfilesResponse OK Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json

        """
        params = {
            "limit": 200
        }
        if query_parameters:
            for k, v in query_parameters.items():
                params[k] = v
        return request_format_log(
            requests.get(self.profiles_url, params=params, headers=self.headers, proxies=self.proxies,
                         timeout=self.timeout))

    def list_profile_by_profile_id(self, profile_id):
        return self.list_profiles({"filter[id]": profile_id, "include": ""})

    def list_profile_by_profile_name(self, profile_name):
        return self.list_profiles({"filter[name]": profile_name, "include": ""})


class CertificatesAPI(object):
    # https://developer.apple.com/documentation/appstoreconnectapi/certificates
    def __init__(self, base_uri, jwt_headers):
        self.headers = jwt_headers
        self.certificates_url = '%s/certificates' % base_uri

    def create_certificate(self, csr_content, certificate_type='IOS_DISTRIBUTION'):
        """
        :param csr_content:
        :param certificate_type:
        :return:
            201 CertificateResponse Created Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        json = {
            'data': {
                'type': 'certificates',
                'attributes': {
                    'csrContent': csr_content,
                    'certificateType': certificate_type,
                    # https://developer.apple.com/documentation/appstoreconnectapi/certificatetype
                }
            }
        }
        return request_format_log(
            requests.post(self.certificates_url, json=json, headers=self.headers, proxies=self.proxies,
                          timeout=self.timeout))

    def download_certificate(self, certificate_id):
        # req.json()['data'][0]['attributes']['certificateContent']
        # n=base64.b64decode(certificateContent)
        # with open('xxxxxx','wb') as f:
        #     f.write(n)
        # print(n)
        pass

    def list_certificate(self, query_parameters=None):
        """
        :param query_parameters:
        :return:
            200 CertificatesResponse  OK  Content-Type: application/json
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
        """
        params = {
            "fields[certificates]": "certificateContent, certificateType, csrContent, displayName, expirationDate, "
                                    "name, platform, serialNumber",
        }
        if query_parameters:
            for k, v in query_parameters.items():
                params[k] = v
        return request_format_log(
            requests.get(self.certificates_url, params=params, headers=self.headers, proxies=self.proxies,
                         timeout=self.timeout))

    def list_certificate_by_certificate_id(self, certificate_id):
        return self.list_certificate({"filter[id]": certificate_id, })

    def revoke_certificate(self, certificate_id):
        """
        :param certificate_id:
        :return:
            204	No Content
            400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json
            403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json
            404 ErrorResponse Not Found Resource not found. Content-Type: application/json
            409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json
        """
        base_url = '%s/%s' % (self.certificates_url, certificate_id)
        json = {}
        return request_format_log(
            requests.delete(base_url, json=json, headers=self.headers, proxies=self.proxies, timeout=self.timeout))


class BaseInfoObj(object):
    @staticmethod
    def filter(obj_lists, query_parameters=None):
        if not isinstance(obj_lists, list):
            obj_lists = [obj_lists]
        if query_parameters:
            new_obj_lists = []
            for obj in obj_lists:
                flag = True
                for k, v in query_parameters.items():
                    if getattr(obj, k) != v:
                        flag = False
                        continue
                if flag:
                    new_obj_lists.append(obj)
            return new_obj_lists
        return obj_lists

    @staticmethod
    def update(obj_lists, up_obj_list):
        conn_obj = []
        conn_obj.extend(obj_lists)
        if not isinstance(up_obj_list, list):
            up_obj_list = [up_obj_list]
        conn_obj.extend(up_obj_list)
        repeat_id = []
        repeat_obj = []
        for i in range(len(conn_obj) - 1):
            for j in range(i + 1, len(conn_obj)):
                if conn_obj[i].id == conn_obj[j].id:
                    repeat_obj.append(conn_obj[j])
                    repeat_id.append(conn_obj[i].id)
        new_list = []
        for ob in conn_obj:
            if ob.id in repeat_id:
                continue
            new_list.append(ob)
        new_list.extend(repeat_obj)
        return new_list

    @staticmethod
    def delete(obj_lists, up_obj_list):
        new_obj_list = []
        for obj in obj_lists:
            flag = True
            for up_obj in up_obj_list:
                if obj.id == up_obj.id:
                    flag = False
            if flag:
                new_obj_list.append(obj)
        return new_obj_list


class Devices(namedtuple("Devices", ["id", "addedDate", "name", "deviceClass", "model", "udid", "platform", "status"])):

    @classmethod
    def from_json_list(cls, json_list):
        new_cls_list = []
        for json in json_list:
            new_cls_list.append(cls.from_json(json))
        return new_cls_list

    @classmethod
    def from_json(cls, json):
        new_dict = {'id': json.get('id', '')}
        attributes = json.get("attributes", {})
        for k, v in attributes.items():
            new_dict[k] = v
        return cls(**new_dict)

    def copy_and_replace(self, **kwargs):
        return self._replace(**kwargs)


class BundleIds(namedtuple("BundleIds", ["id", "name", "identifier", "platform", "seedId", ]), ):
    @classmethod
    def from_json_list(cls, json_list):
        new_cls_list = []
        for json in json_list:
            new_cls_list.append(cls.from_json(json))
        return new_cls_list

    @classmethod
    def from_json(cls, json):
        new_dict = {'id': json.get('id', '')}
        attributes = json.get("attributes", {})
        for k, v in attributes.items():
            new_dict[k] = v
        return cls(**new_dict)

    def copy_and_replace(self, **kwargs):
        return self._replace(**kwargs)


class Profiles(namedtuple("Profiles",
                          ["id", "name", "profileState", "createdDate", "profileType", "profileContent", "uuid",
                           "platform",
                           "expirationDate"]), ):
    @classmethod
    def from_json_list(cls, json_list):
        new_cls_list = []
        for json in json_list:
            new_cls_list.append(cls.from_json(json))
        return new_cls_list

    @classmethod
    def from_json(cls, json):
        new_dict = {'id': json.get('id', '')}
        attributes = json.get("attributes", {})
        for k, v in attributes.items():
            new_dict[k] = v
        return cls(**new_dict)

    def copy_and_replace(self, **kwargs):
        return self._replace(**kwargs)

    def download_profile(self, filepath):
        dirname = os.path.dirname(filepath)
        if os.path.isdir(dirname) and os.path.exists(dirname):
            pass
        else:
            os.makedirs(dirname)
        n = base64.b64decode(self.profileContent)
        with open(filepath, 'wb') as f:
            f.write(n)
        return filepath


class Certificates(namedtuple("Certificates",
                              ["id", "serialNumber", "certificateContent", "displayName", "name", "csrContent",
                               "platform",
                               "expirationDate",
                               "certificateType"]), ):
    @classmethod
    def from_json_list(cls, json_list):
        new_cls_list = []
        for json in json_list:
            new_cls_list.append(cls.from_json(json))
        return new_cls_list

    @classmethod
    def from_json(cls, json):
        new_dict = {'id': json.get('id', '')}
        attributes = json.get("attributes", {})
        for k, v in attributes.items():
            new_dict[k] = v
        return cls(**new_dict)

    def copy_and_replace(self, **kwargs):
        return self._replace(**kwargs)

    def download_certificate(self, filepath):
        dirname = os.path.dirname(filepath)
        if os.path.isdir(dirname) and os.path.exists(dirname):
            pass
        else:
            os.makedirs(dirname)
        n = base64.b64decode(self.certificateContent)
        with open(filepath, 'wb') as f:
            f.write(n)
        return filepath


def call_function_try_attempts(try_attempts=3, sleep_time=3):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            flag = False
            res = ''
            for i in range(try_attempts):
                try:
                    res = func(*args, **kwargs)
                    flag = True
                    break
                except Exception as e:
                    if 'Cannot connect to proxy' in str(e) or 'Read timed out' in str(
                            e) or 'Max retries exceeded with' in str(e):
                        logger.error('access apple api failed . change proxy ip again')
                        
                    logger.warning(
                        f'exec {func} failed. Failed:{e} {try_attempts} times in total. now {sleep_time} later try '
                        f'again...{i}')
                    res = str(e)
                    # 'Authentication credentials are missing or invalid' in str(e) or
                    if 'FORBIDDEN.REQUIRED_AGREEMENTS_MISSING_OR_EXPIRED' in str(e):
                        raise Exception(res)
                    time.sleep(sleep_time)
            logger.info(f"exec {func} finished. time:{time.time() - start_time}")
            if not flag:
                logger.error(f'exec {func} failed after the maximum number of attempts. Failed:{res}')
                raise Exception(res)
            return res

        return wrapper

    return decorator


class AppStoreConnectApi(DevicesAPI, BundleIDsAPI, BundleIDsCapabilityAPI, ProfilesAPI, CertificatesAPI):
    BASE_URI = 'https://api.appstoreconnect.apple.com/v1'
    JWT_AUD = 'appstoreconnect-v1'
    JWT_ALG = 'ES256'

    def __init__(self, issuer_id, private_key_id, p8_private_key, exp_seconds=1200):
        """
        根据 Apple 文档,会话最长持续时间为 20 分钟:https : //developer.apple.com/documentation/appstoreconnectapi/generating_tokens_for_api_requests
        :param issuer_id:
        :param private_key_id:
        :param p8_private_key:
        :param exp_seconds: max 20*60
        """
        self.issuer_id = issuer_id
        self.private_key_id = private_key_id
        self.p8_private_key = p8_private_key
        self.exp_seconds = exp_seconds
        self.proxies = proxies
        self.timeout = timeout
        self.__make_jwt_headers()
        DevicesAPI.__init__(self, self.BASE_URI, self.headers)
        BundleIDsAPI.__init__(self, self.BASE_URI, self.headers)
        BundleIDsCapabilityAPI.__init__(self, self.BASE_URI, self.headers)
        ProfilesAPI.__init__(self, self.BASE_URI, self.headers)
        CertificatesAPI.__init__(self, self.BASE_URI, self.headers)
        self.rate_limit_info = {}

    def __set_rate_limit_info(self, req_headers):
        for par in req_headers.get('X-Rate-Limit').split(";"):
            if par:
                limit_info_list = par.split(":")
                self.rate_limit_info[limit_info_list[0]] = limit_info_list[1]
        user_rem_info = self.rate_limit_info
        if int(user_rem_info.get('user-hour-rem')) < 3595:
            logger.warning(f"user-hour-rem over limit. so get jwt headers")
            self.__init__(self.issuer_id, self.private_key_id, self.p8_private_key)
            self.rate_limit_info = user_rem_info
        logger.info(f"rate_limit_info:{self.rate_limit_info}")

    def __make_jwt_headers(self):
        data = {
            "iss": self.issuer_id,
            "iat": int(time.time()),
            "exp": int(time.time()) + self.exp_seconds,
            "aud": self.JWT_AUD,
        }
        jwt_headers = {
            "alg": self.JWT_ALG,
            "kid": self.private_key_id,
            "typ": "JWT"
        }
        jwt_encoded = jwt.encode(data, self.p8_private_key, algorithm=self.JWT_ALG, headers=jwt_headers)
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.39 (KHTML, like Gecko) '
                          'Chrome/72.0.3626.109 Safari/537.39',
            'Authorization': 'Bearer %s' % jwt_encoded
        }
        self.headers = headers

    def __base_format(self, s_type, req, success_code):
        # self.__set_rate_limit_info(req.headers)
        if req.status_code == success_code:
            req_data = req.json()
            data = req_data.get('data')
            if isinstance(data, list) or isinstance(data, dict):
                obj = None
                if isinstance(data, dict):
                    data = [data]
                if s_type == 'devices':
                    obj = Devices.from_json_list(data)
                elif s_type == 'bundleIds':
                    obj = BundleIds.from_json_list(data)
                elif s_type == 'profiles':
                    obj = Profiles.from_json_list(data)
                elif s_type == 'certificates':
                    obj = Certificates.from_json_list(data)
                if len(obj) == 1:
                    return obj[0]
                return obj
            else:
                # self.__init_jwt_headers()
                raise Exception(f'error instance: {req.text}')
        elif req.status_code == 401:  # 授权问题
            raise Exception(req.text)
        elif req.status_code == 429:  # 请求超过每小时限制 {'user-hour-lim': '3600', 'user-hour-rem': '3586'}
            raise Exception(req.text)
        elif req.status_code == 500:
            time.sleep(60)
            raise Exception(req.text)
        else:
            raise Exception('unknown error: %s  code:%s' % (req.text, req.status_code))

    def __device_store(self, req, success_code=200):
        return self.__base_format('devices', req, success_code)

    def __profile_store(self, req, success_code=200):
        return self.__base_format('profiles', req, success_code)

    def __certificates_store(self, req, success_code=200):
        return self.__base_format('certificates', req, success_code)

    def __bundle_ids_store(self, req, success_code=200):
        return self.__base_format('bundleIds', req, success_code)

    @call_function_try_attempts()
    def get_all_devices(self):
        req = self.list_devices()
        return self.__device_store(req)

    def list_enabled_devices(self):
        req = super().list_enabled_devices()
        return self.__device_store(req)

    def get_all_bundle_ids(self):
        req = self.list_bundle_ids()
        return self.__bundle_ids_store(req)

    def get_all_profiles(self):
        req = self.list_profiles()
        return self.__profile_store(req)

    @call_function_try_attempts(try_attempts=2)
    def get_all_certificates(self):
        req = self.list_certificate()
        return self.__certificates_store(req)

    @call_function_try_attempts()
    def get_certificate_by_cid(self, certificate_id):
        req = self.list_certificate_by_certificate_id(certificate_id)
        return self.__certificates_store(req)

    def list_device_by_udid(self, udid):
        device_obj_list = BaseInfoObj.filter(self.get_all_devices(), {"udid": udid})
        if not device_obj_list:
            raise Exception('Device obj is None')
        if len(device_obj_list) != 1 and len(set([device_obj.udid for device_obj in device_obj_list])) != 1:
            raise Exception('more than one Device obj')
        return device_obj_list

    @call_function_try_attempts()
    def register_device(self, device_name, device_udid, platform="IOS"):
        device_obj_list = BaseInfoObj.filter(self.get_all_devices(), {"udid": device_udid})
        # 发现同一个开发者账户里面有两个一样的udid,奇了怪
        if device_obj_list and (
                len(device_obj_list) == 1 or len(set([device_obj.udid for device_obj in device_obj_list])) == 1):
            device_obj = device_obj_list[0]
            req = self.modify_registered_device(device_obj.id, device_name, 'ENABLED')
            return self.__device_store(req)
        else:
            req = super().register_device(device_name, device_udid, platform)
            return self.__device_store(req, 201)

    @call_function_try_attempts()
    def enabled_device(self, device_id, device_name, udid):
        if device_id and device_name:
            req = super().enabled_device(device_id, device_name)
            if req.status_code == 200:
                return self.__device_store(req)
        if udid:
            device_obj_list = self.list_device_by_udid(udid)
            for device_obj in device_obj_list:
                req = self.modify_registered_device(device_obj.id, device_obj.name, 'ENABLED')
                return self.__device_store(req)

    @call_function_try_attempts()
    def disabled_device(self, device_id, device_name, udid):
        if device_id and device_name:
            req = super().disabled_device(device_id, device_name)
            if req.status_code == 200:
                return self.__device_store(req)
        if udid:
            device_obj_list = self.list_device_by_udid(udid)
            for device_obj in device_obj_list:
                req = self.modify_registered_device(device_obj.id, device_obj.name, 'DISABLED')
                return self.__device_store(req)

    def list_bundle_ids_by_identifier(self, identifier):
        req = super().list_bundle_id_by_identifier(identifier)
        return self.__bundle_ids_store(req)

    def __do_success(self, req, status=200):
        if req.status_code == status:
            return True
        return False

    @call_function_try_attempts()
    def enable_capability_by_s_type(self, bundle_id, s_type):
        capability_list = get_capability(s_type)
        if capability_list:
            for capability in capability_list:
                req = super().enable_capability(bundle_id, capability)
                if self.__do_success(req, 201):
                    logger.info(f"{bundle_id} enable_capability {capability} success")
                else:
                    logger.warning(f"{bundle_id} enable_capability {capability} failed {req.content}")
        return True

    @call_function_try_attempts()
    def disable_capability_by_s_type(self, bundle_id, s_type=len(capability_info) - 1):
        capability_list = get_capability(s_type)
        if capability_list:
            for capability in capability_list:
                req = super().disable_capability(bundle_id, capability)
                if self.__do_success(req, 204):
                    logger.info(f"{bundle_id} enable_capability {capability} success")
                else:
                    logger.warning(f"{bundle_id} enable_capability {capability} failed {req.content}")
        return True

    def enable_push_vpn_capability(self, bundle_id):
        req = super().enable_capability(bundle_id, 'PUSH_NOTIFICATIONS')
        if self.__do_success(req, 201):
            req = super().enable_capability(bundle_id, 'PERSONAL_VPN')
            if self.__do_success(req, 201):
                return True
        return False

    def disable_push_vpn_capability(self, bundle_id):
        req = super().disable_capability(bundle_id, 'PUSH_NOTIFICATIONS')
        if self.__do_success(req, 204):
            req = super().disable_capability(bundle_id, 'PERSONAL_VPN')
            if self.__do_success(req, 204):
                return True
        return False

    @call_function_try_attempts()
    def register_bundle_id(self, bundle_id_name, bundle_id_identifier, platform="IOS", seed_id=''):
        identifier_obj = self.list_bundle_ids_by_identifier(bundle_id_identifier)
        if isinstance(identifier_obj, BundleIds):
            req = self.modify_bundle_id(identifier_obj.id, bundle_id_name)
            return self.__bundle_ids_store(req)
        else:
            req = super().register_bundle_id(bundle_id_name, bundle_id_identifier, platform, seed_id)
            return self.__bundle_ids_store(req, 201)

    @call_function_try_attempts()
    def register_bundle_id_enable_capability(self, bundle_id_name, bundle_id_identifier, s_type, platform="IOS",
                                             seed_id=''):
        bundle_ids = self.register_bundle_id(bundle_id_name, bundle_id_identifier, platform, seed_id)
        if isinstance(bundle_ids, BundleIds):
            if self.enable_capability_by_s_type(bundle_ids.id, s_type):
                return bundle_ids

    @call_function_try_attempts()
    def delete_bundle_by_identifier(self, identifier_id, identifier_name):
        if identifier_id:
            req = self.delete_bundle_id_by_id(identifier_id)
            if req.status_code == 204:
                return True
        identifier_obj = self.list_bundle_ids_by_identifier(identifier_name)
        if isinstance(identifier_obj, BundleIds):
            req = self.delete_bundle_id_by_id(identifier_obj.id)
            if req.status_code == 204:
                return True

    @call_function_try_attempts()
    def create_profile(self, profile_id, bundle_id, certificate_id, profile_name, device_id_list=None,
                       profile_type='IOS_APP_ADHOC'):
        if device_id_list is None:
            device_id_list = []
        if not device_id_list:
            device_id_list = [device.id for device in self.list_enabled_devices()]

        self.delete_profile_by_id(profile_id, profile_name)
        # profile_obj = self.list_profile_by_profile_name(profile_name)
        # if isinstance(profile_obj, Profiles):
        #     self.delete_profile_by_id(profile_obj.id, profile_name)

        req = super().create_profile(bundle_id, [certificate_id], profile_name, device_id_list)
        if req.status_code == 201:
            self.__profile_store(req, 201)
            return Profiles.from_json(req.json().get("data"))
        raise KeyError(req.text)

    def list_profile_by_profile_name(self, profile_name):
        req = super().list_profile_by_profile_name(profile_name)
        return self.__profile_store(req)

    @call_function_try_attempts()
    def delete_profile_by_id(self, profile_id, profile_name):
        if profile_id:
            req = super().delete_profile(profile_id)
            if self.__do_success(req, 204):
                return True
        profile_obj = self.list_profile_by_profile_name(profile_name)
        if profile_obj:
            req = super().delete_profile(profile_obj.id)
            if self.__do_success(req, 204):
                return True

    @call_function_try_attempts()
    def create_certificate(self, csr_content, certificate_type='IOS_DISTRIBUTION'):
        req = super().create_certificate(csr_content, certificate_type)
        if req.status_code == 201:
            return self.__certificates_store(req, 201)
        raise KeyError(req.text)

    @call_function_try_attempts()
    def revoke_certificate(self, certificate_id):
        req = super().revoke_certificate(certificate_id)
        if req.status_code == 204:
            return True
        return False
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐