#!usr/bin/python
# -*- coding: utf-8 -*-
import shutil

import testlink
import win32com
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

from WagAutoTestLibrary.Logger.logger import log
import time
import sys
import os

reload(sys)
sys.setdefaultencoding('utf-8')

# 创建cert文件夹
cur_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
cert_path = os.path.join(cur_path, 'cert')

# 如果不存在这个证书cert文件夹,就自动创建一个
if not os.path.exists(cert_path): os.mkdir(cert_path)

from WagAutoTestLibrary.config import *

browsername = browsername


class CommonAW:

    def open_browser(self, browsername=browsername):
        '''
        打开浏览器
        :param browsername:浏览器名称 "firefox"、"chrome"、"ie"、"phantomjs"
        :return:
        '''
        try:
            if browsername == "firefox" or browsername == "Firefox" or browsername == "ff":
                profile = webdriver.FirefoxProfile()
                # profile.set_preference("browser.link.open_newwindow", 3)
                # profile.set_preference("browser.link.open_newwindow.restriction",0)
                profile.set_preference('browser.download.dir', cert_path)
                profile.set_preference('browser.download.folderList', 2)
                profile.set_preference('browser.download.manager.showWhenStarting', False)
                profile.set_preference('browser.helperApps.neverAsk.saveToDisk', 'application/octet-stream,text/plain,'
                                                                                 'text/html,application/x-tar')
                # profile.set_preference('browser.helperApps.neverAsk.saveToDisk', 'text/plain')
                # profile.set_preference('browser.helperApps.neverAsk.saveToDisk', 'text/html')
                self.driver = webdriver.Firefox(firefox_profile=profile)
                # self.driver = webdriver.Firefox()
            elif browsername == "chrome" or browsername == "Chrome":
                self.driver = webdriver.Chrome()
            elif browsername == "ie" or browsername == "Ie":
                self.driver = webdriver.Ie()
            elif browsername == "phantomjs" or browsername == "Phantomjs":
                self.driver = webdriver.PhantomJS()
            else:
                log.error("Not found this browser,You can use 'firefox', 'chrome', 'ie' or 'phantomjs'")
            self.driver.maximize_window()
            return self.driver
        except Exception as msg:
            log.error("启动浏览器出现异常:%s" % str(msg))

    def close_browser(self):
        '''
        关闭当前页面
        :return:
        '''
        self.driver.close()

    def quit_browser(self):
        '''
        退出浏览器
        :return:
        '''
        if os.path.exists(cert_path):
            shutil.rmtree(cert_path)
        if os.path.exists(download_file_pth):
            shutil.rmtree(download_file_pth)
        self.driver.quit()
        self.close_process()

    def find_element(self, *loc):
        '''
        重写的定位元素方法
        :param loc:查找元素的元组如 (By.ID,'com.topsec.topsap:id/ip')
        :return:
        '''
        try:
            WebDriverWait(self.driver, 10, 0.5).until(EC.visibility_of_element_located(loc))
            return self.driver.find_element(*loc)
        except:
            log.error("查找不到%s为%s的元素" % loc)

    def find_elements(self, *loc):
        '''
        重写查找多个元素方法(多个元素具有相同属性)
        :param loc:查找元素的元组如 (By.ID,'com.topsec.topsap:id/ip')
        :return:
        '''
        try:
            WebDriverWait(self.driver, 10, 0.5).until(EC.visibility_of_element_located(loc))
            return self.driver.find_elements(*loc)
        except:
            log.error("查找不到%s为%s的元素列表" % loc)

    def is_element_exits(self, *loc):
        '''
        判断元素是否存在
        :param loc: 查找元素的元组如 (By.ID,'com.topsec.topsap:id/ip')
        :return: True,元素存在,False元素不存在
        '''
        flag = None
        try:
            WebDriverWait(self.driver, 5, 0.5).until(EC.visibility_of_element_located(loc))
            flag = True
        except NoSuchElementException:
            # log.error("%s为%s的元素不存在" % loc)
            # log.error(e.message)
            flag = False
        except TimeoutException:
            flag = False
        finally:
            return flag

    def is_toast_exist(self, message, fuzzy="False"):
        '''
        判断toast是否存在
        :param message: toast的文本
        :param fuzzy: 是否模糊匹配,"False"精确匹配,"True"模糊匹配
        :return: 返回True表示存在,False不存在
        '''
        try:
            if fuzzy == "False":
                element = WebDriverWait(self.driver, 10, 0.01).until(
                    EC.presence_of_all_elements_located((By.XPATH, '//*[text()=\'{}\']'.format(message))))
            elif fuzzy == "True":
                element = WebDriverWait(self.driver, 10, 0.01).until(
                    EC.presence_of_all_elements_located((By.XPATH, '//*[contains(text(),\'{}\')]'.format(message))))
            return True
        except Exception, e:
            log.error(e.message)
            return False

    def click_by_id(self, element_id):
        '''
        通过id点击
        :param element_id: 要点击的控件的id,需要当前界面唯一
        :return:
        '''
        self.find_element(By.ID, element_id).click()

    def input_text_by_id(self, element_id, text):
        '''
        输入文本
        :param element_id: 要输入文本的控件的id,需要当前界面唯一
        :param text:要输入的文本
        :return:
        '''
        input_element = self.find_element(By.ID, element_id)
        input_element.clear()
        input_element.send_keys(text)

    def get_url(self, vpn_b_url=vpn_b_url, browsername=browsername):
        '''
        打开网页
        :param vpn_b_url:要打开的url地址
        :param browsername:浏览器名称
        :return:
        '''
        self.driver = self.open_browser(browsername)
        self.driver.get(vpn_b_url)

    def login_vpn(self):
        '''
        登录vpn设备
        :return:
        '''
        self.find_element(By.ID, "password").send_keys("talent")
        self.click_by_id("login")
        time.sleep(2)

    def click_nav_list(self, menu_text1, menu_text2):
        '''
        点击导航列表,eg:click_nav_list("VPN设置","静态隧道")
        :param menu_text1: 要点击的一级文本
        :param menu_text2: 要点击的二级文本
        :return:
        '''
        if not self.is_element_exits(By.XPATH, "//a[text()='%s']" % menu_text2):
            self.click_element_obscuresed(By.XPATH, "//a/span[text()='%s']" % menu_text1)
            self.click_element_obscuresed(By.XPATH, "//a[text()='%s']" % menu_text2)
        else:
            self.click_element_obscuresed(By.XPATH, "//a[text()='%s']" % menu_text2)

    def click_refresh_btn(self):
        '''
        点击右上角刷新按钮
        :return:
        '''
        self.find_element(By.XPATH, "//ul[@class='nav ace-nav']/li[1]").click()

    def click_logout_btn(self):
        '''
        点击右上角退出登录按钮
        :return:
        '''
        self.find_element(By.XPATH, "//ul[@class='nav ace-nav']/li[2]").click()

    def get_custom_value(self, testcase_name, project_name="自动化WAG新硬件平台自动化测试基础用例"):
        '''
        获取testlink中testcase中的自定义字段
        :param testcase_name:用例名称
        :param project_name:自动化项目名称
        :return:
        '''
        my_testlink = testlink.TestlinkAPIClient(testlink_url, testlink_key)
        project_id = my_testlink.getProjectIDByName(project_name)
        testcase_id = my_testlink.getTestCaseIDByName(testcase_name, testprojectname=project_name)[0]["id"]
        testcase = my_testlink.getTestCase(testcaseid=testcase_id)[0]
        custom_value_list = ["tunnel_name", "protocol_type", "authentication_method", "psk", "local_identification",
                             "peer_identification", "peerhost", "ipsec_link",
                             "encryption_algorithm_1", "calibration_algorithm_1",
                             "DH_algorithm", "ISAKMP_SA_1", "consult", "ike_consult_pattern", "localsubnet",
                             "localsubmask", "peersubnet", "peersubmask", "encryption_algorithm_2",
                             "calibration_algorithm_2"
            , "tunnel_preservation", "metric_cardinality", "ISAKMP_SA_2", "ipsec_sa_security",
                             "cert_peer_identification"]

        args = {'devKey': my_testlink.devKey,
                'testprojectid': project_id,
                'testcaseexternalid': testcase['full_tc_external_id'],
                'version': int(testcase['version'])}
        tunnel_name_str = my_testlink.getTestCaseCustomFieldDesignValue(
            args['testcaseexternalid'], args['version'],
            args['testprojectid'], "localsubnet", 'value')
        tunnel_name_list = tunnel_name_str.split("|")
        custom_value_dict_list = []
        for i in range(len(tunnel_name_list)):
            custom_value_dict_list.append({})
        for custom_value_name in custom_value_list:
            custom_value = my_testlink.getTestCaseCustomFieldDesignValue(
                args['testcaseexternalid'], args['version'],
                args['testprojectid'], custom_value_name, 'value')
            for i in range(len(custom_value_dict_list)):
                if "|" in custom_value:
                    custom_value_dict_list[i][custom_value_name] = custom_value.split("|")[i]
                else:
                    custom_value_dict_list[i][custom_value_name] = custom_value
        return custom_value_dict_list

    def update_custom_value(self, testcase_name, project_name="自动化WAG新硬件平台自动化测试基础用例"):
        '''
        更新testlink中testcase中的自定义字段
        :param testcase_name:要更新的自动化用例的名称
        :param project_name:工程名称
        :return:
        '''
        # custom_value_dict1为testlink中自定义自段的参数值,|前后分别是本端和对端参数,如果没有|表示两端参数一样
        custom_value_dict1 = {"tunnel_name": "隧道1",  # 隧道名称
                              "protocol_type": "国际",  # 协议类型
                              "authentication_method": "预共享秘钥",  # 认证方式
                              "psk": "123",  # 预共享秘钥
                              "local_identification": "",  # 本地标识
                              "peer_identification": "",  # 对端标识
                              "peerhost": "172.18.40.236|172.18.40.235",  # 对方地址或域名,|后边是隧道另一端参数
                              "ipsec_link": "ipsec0-EXTERNAL",  # 选择ipsec链路
                              "encryption_algorithm_1": "",  # 第一阶段加密算法
                              "calibration_algorithm_1": "",  # 第一阶段校验算法
                              "DH_algorithm": "",  # 第一阶段DH算法
                              "ISAKMP_SA_1": "86400",  # ISAKMP-SA存活时间
                              "consult": "是",  # 是否主动协商
                              "ike_consult_pattern": "主模式",  # IKE协商模式
                              "localsubnet": "1.1.1.0|2.2.2.0",  # 本地子网
                              "localsubmask": "255.255.255.0",  # 本地
                              "peersubnet": "2.2.2.0|1.1.1.0",  # 对方子网
                              "peersubmask": "255.255.255.0",  # 对方子网掩码
                              "encryption_algorithm_2": "",  # 第二阶段加密算法
                              "calibration_algorithm_2": "",  # 第二阶段校验算法
                              "tunnel_preservation": "否",  # 启用私有隧道保活
                              "metric_cardinality": "30",  # 度量基数
                              "ISAKMP_SA_2": "28800",  # IPSEC-SA存活时间
                              "ipsec_sa_security": "01010",  # IPSEC-SA的安全政策属性,0代表未勾选,1代表勾选
                              "cert_peer_identification": "",
                              "autotest": testcase_name
                              }
        my_testlink = testlink.TestlinkAPIClient(testlink_url, testlink_key)
        project_id = my_testlink.getProjectIDByName(project_name)
        testcase_id = my_testlink.getTestCaseIDByName(testcase_name, testprojectname=project_name)[0]["id"]
        testcase = my_testlink.getTestCase(testcaseid=testcase_id)[0]
        args = {'devKey': my_testlink.devKey,
                'testprojectid': project_id,
                'testcaseexternalid': testcase['full_tc_external_id'],
                'version': int(testcase['version'])}
        my_testlink.updateTestCaseCustomFieldDesignValue(args['testcaseexternalid'], args['version'],
                                                         args['testprojectid'],
                                                         custom_value_dict1)

    def window_switch(self):
        '''
        两个窗口,切换到另一个窗口
        :return:
        '''
        window_handlers = self.driver.window_handles
        current_handler = self.driver.current_window_handle
        for window_handler in window_handlers:
            if window_handler != current_handler:
                self.driver.switch_to.window(window_handler)

    def open_local_and_peer_web(self):
        '''
        打开本端和对端网页并登陆
        :return:
        '''
        self.open_browser()
        # 勾选火狐浏览器打开链接在新标签页而非新窗口(W)
        self.driver.get("about:preferences")
        self.find_element(By.ID, "linkTargeting").click()
        # 打开235网站,
        self.driver.get(vpn_a_url)
        self.login_vpn()
        # 新窗口打开236网站,
        js = 'window.open("%s");' % vpn_b_url
        self.driver.execute_script(js)
        self.window_switch()
        self.login_vpn()
        # 切换到235
        self.window_switch()

    def click_element_obscuresed(self, *locator):
        '''
        被遮挡元素点击方法
        :param locator: 定位该元素方法
        :return:
        '''
        WebDriverWait(self.driver, 10, 0.5).until(
            element_be_clickable(locator))

    def handle_safe_page(self):
        '''
        处理此站点不安全问题方法
        :return:
        '''
        if self.driver.title == "此站点不安全":
            self.click_by_id("moreInfoContainer")
            self.click_by_id("overridelink")
        elif self.driver.title == "证书错误: 导航已阻止":
            self.driver.get("javascript:document.getElementById('overridelink').click();")

    def close_process(self, process_name="geckodriver.exe"):
        '''
        杀掉进程
        :param process_name: 要杀掉的进程名称
        :return:
        '''
        WMI = win32com.client.GetObject('winmgmts:')
        processCodeCov = WMI.ExecQuery('select * from Win32_Process where Name="%s"' % process_name)
        if len(processCodeCov) > 0:
            os.system("taskkill /f /im " + process_name)


class element_be_clickable(object):
    '''
    判断元素是否被遮挡类
    '''

    def __init__(self, locator):
        self.locator = locator

    def __call__(self, driver):

        element = driver.find_element(*self.locator)
        try:
            element.click()
            return True
        except Exception:
            return False


if __name__ == '__main__':
    common = CommonAW()
    common.close_process()
    # common.get_url(browsername="ie")
    # common.handle_safe_page()
    # common.login_vpn()
    # common.get_url()
    # common.login_vpn()
    # common.get_url()
    # common.login_vpn()
    # common.click_nav_list("PKI设置", "本机VPN证书")
    # common.click_nav_list("PKI设置", "本地CA策略")
    # common.get_aaa()
    # common.update_custom_value("国际静态隧道")

    # common.update_custom_value("时分3")
    # common.login_vpn()
    # common.click_refresh_btn()
    # common.click_logout_btn()
    # common.click_nav_list("VPN设置", "静态隧道")
    # common.find_elements(By.CLASS_NAME, "icon-plus")[0].click()
    # a = common.find_element(By.XPATH,"//div[@id='modal-form3']//button[@class='btn btn-primary']").text
    # print '1111'+a+"11111"
    # common.click_nav_list("VPN设置", "静态隧道")
    # common.find_element(By.XPATH, "//a[text()='添加静态隧道']").click()
    # common.input_text_by_id("tunnel_name", "1111111")
    # common.input_text_by_id("presharekey", "12345")
    # common.input_text_by_id("peerhost", "172.18.40.5")
    # common.find_element(By.XPATH, "//a[@href='#feed']").click()
    # common.input_text_by_id("localsubnet", "36.36.0.0")
    # common.input_text_by_id("peersubnet", "36.36.32.0")
    # common.input_text_by_id("localsubmask", "255.255.255.0")
    # common.input_text_by_id("peersubmask", "255.255.255.0")
    # common.find_element(By.CLASS_NAME, "btn.btn-sm.btn-primary").click()

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐