采用的故障转移方式是官方提供的步骤,实现方式是python3+pymssql

这里直接贴代码造福全人类:

需要注意的是,该脚本是针对特定的开发测试环境编写的,不具有通用性,不能直接使用;不过稍作修改就可以了。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
mssqlScript.py 处理sql server 数据库分布式可用性组的故障转移
故障转移方式采用官方推荐的5个步骤
Steps:
#####1.若要确保不会丢失任何数据,请停止全局主数据库(即主可用性组的数据库)上的所有事务,然后将分布式可用性组设置为同步提交。
#####2.等待直到分布式可用性组同步完成,且每个数据库具有相同的 last_hardened_lsn。
#####3.在全局主要副本上,将分布式可用性组角色设置为 SECONDARY。
#####4.测试故障转移就绪情况。
#####5.故障转移主要可用性组。
"""
# author: zhoujiajun@gsafety.com
# Copyright by GSafety.HeFei 2020
import pymssql
import decimal
import time
import sys
import configparser

# Bare attribute access; it has no effect on program state.
# NOTE(review): presumably kept so the otherwise unused `decimal` import stays
# referenced (e.g. for a bundler) - confirm before removing.
decimal.__version__

# Settings are read from database.ini, resolved against the current working
# directory. ConfigParser.read() silently skips a file it cannot open, so a
# missing file only surfaces as an error on the first cp.get() below.
cp = configparser.ConfigParser()
cp.read("database.ini")
system = 'system'
# Names of the distributed availability group and of its two member groups,
# read in this order from the [system] section.
distributeAG, primaryAG, secondaryAG = (
    cp.get(system, option)
    for option in (
        'distribute_availability_group_name',
        'primary_availability_group_name',
        'secondary_availability_group_name',
    )
)
# ------------------------------------------------sql-------------------------------------------------------------------
def _quote_name(name):
    """Return *name* as a bracket-delimited T-SQL identifier (``]`` is doubled)."""
    return "[" + name.replace("]", "]]") + "]"


def _availability_mode_sql(mode):
    """Build the ALTER statement that sets *mode* on both member groups of the DAG."""
    return (
        " ALTER AVAILABILITY GROUP " + _quote_name(distributeAG) + " "
        "MODIFY "
        "AVAILABILITY GROUP ON "
        "'" + primaryAG + "' WITH "
        "( "
        "AVAILABILITY_MODE = " + mode + " "
        "), "
        "'" + secondaryAG + "' WITH  "
        "( "
        "AVAILABILITY_MODE = " + mode + " "
        " );"
    )


# -- sets the distributed availability group to synchronous commit
sql1 = _availability_mode_sql("SYNCHRONOUS_COMMIT")

# -- verifies the commit state of the distributed availability group
sql2 = (
    "select ag.name, ag.is_distributed, ar.replica_server_name, ar.availability_mode_desc,"
    " ars.connected_state_desc, ars.role_desc, ars.operational_state_desc,"
    " ars.synchronization_health_desc from sys.availability_groups ag  "
    "join sys.availability_replicas ar on ag.group_id=ar.group_id "
    "left join sys.dm_hadr_availability_replica_states ars "
    "on ars.replica_id=ar.replica_id "
    "where ag.is_distributed=1 "
)

# -- Run this query on the Global Primary and the forwarder
# -- Check the results to see if synchronization_state_desc is SYNCHRONIZED,
# and the last_hardened_lsn is the same per database on both the global primary and forwarder
# -- If not rerun the query on both side every 5 seconds until it is the case
# NOTE(review): the query has no ORDER BY, while check_data_sync() compares rows
# by their position in the result set - confirm the row order is stable.
sql3 = (
    "SELECT ag.name"
    ", drs.database_id"
    ", db_name(drs.database_id) as database_name"
    ", drs.group_id"
    ", drs.replica_id"
    ", drs.synchronization_state_desc"
    ", drs.last_hardened_lsn "
    "FROM sys.dm_hadr_database_replica_states drs "
    "INNER JOIN sys.availability_groups ag on drs.group_id = ag.group_id;"
)

# -- On the global primary, switch the distributed availability group role to SECONDARY.
# The group name is bracket-quoted here exactly as in sql1/sql7, so names that
# are not plain identifiers (hyphens, spaces, ...) work in every statement.
sql4 = "ALTER AVAILABILITY GROUP " + _quote_name(distributeAG) + " SET (ROLE = SECONDARY);"

# -- Run this query on the Global Primary and the forwarder
# -- Check the results to see if the last_hardened_lsn is the same per database on both the global primary and forwarder
# -- The availability group is ready to fail over when the last_hardened_lsn is the same for both
# availability groups per database
sql5 = (
    "SELECT ag.name, "
    "drs.database_id,"
    "db_name(drs.database_id) as database_name,"
    "drs.group_id,"
    "drs.replica_id,"
    "drs.last_hardened_lsn "
    "FROM sys.dm_hadr_database_replica_states drs "
    "INNER JOIN sys.availability_groups ag ON drs.group_id = ag.group_id;"
)

# -- Once the last_hardened_lsn is the same per database on both sides
# -- We can Fail over from the primary availability group to the secondary availability group.
# -- Run the following command on the forwarder, the SQL Server instance that hosts
# the primary replica of the secondary availability group.
sql6 = "ALTER AVAILABILITY GROUP " + _quote_name(distributeAG) + " FORCE_FAILOVER_ALLOW_DATA_LOSS;"

# -- sets the distributed availability group to asynchronous commit
sql7 = _availability_mode_sql("ASYNCHRONOUS_COMMIT")
# -------------------------------------------------sql------------------------------------------------------------------
# Seconds to sleep between two synchronization polls (passed to time.sleep()).
interval = cp.getint(system, 'interval')
# Synchronization timeout, kept as the raw config string; the check functions
# convert it with int() and divide it by `interval` to get the attempt budget.
data_sync_time_out = cp.get(system, 'data_sync_time_out')


def check_data_sync(sql, cursor_):
    """
    Poll replica states until the DAG rows report the same LSN as the primary AG rows.

    The query is re-run every ``interval`` seconds. Rows are identified by their
    1-based position in the result set: rows 9, 10, 11 and 12 are compared with
    rows 1, 3, 5 and 7 respectively, and a later pair is only examined while all
    earlier pairs matched. According to the original author's note these
    positions were derived for an environment with 4 databases on 2 nodes
    (rows 1-8 belong to the primary AG, rows 9-12 to the DAG); they depend on
    the order in which the server returns the rows.

    If no poll succeeds within ``int(data_sync_time_out) / interval`` attempts,
    the fail-back statements ``sql6`` and ``sql7`` are executed on the same
    cursor and the process terminates through ``sys.exit()``.

    :param sql: query whose rows expose a ``last_hardened_lsn`` key
    :param cursor_: cursor yielding dict-like rows; executed, then iterated
    """
    max_attempts = int(data_sync_time_out) / interval
    # DAG row position -> position of the primary AG row it is compared with.
    counterpart_of = {9: 1, 10: 3, 11: 5, 12: 7}
    failure = 0
    synced = False
    while not synced:
        print("please waiting data sync ...")
        time.sleep(interval)
        cursor_.execute(sql)
        lsn_at = {}
        for num, result in enumerate(cursor_, start=1):
            end_of_log_lsn = result['last_hardened_lsn']
            lsn_at[num] = end_of_log_lsn
            print(num, result)
            if num not in counterpart_of:
                continue
            # Row 9 opens the comparison; rows 10-12 only count while every
            # earlier pair matched. The previous condition was written as
            # `ok == 0 & num == 10`, which Python parses as the chained
            # comparison `ok == (0 & num) == 10` and is therefore always False,
            # so rows 10-12 were never compared at all.
            if num == 9 or synced:
                synced = lsn_at[counterpart_of[num]] == end_of_log_lsn
        if synced:
            break
        failure += 1
        print("Synchronization is not complete, please wait.")
        print("trying times: ", failure)
        # Give up once the attempt budget derived from the timeout is used up.
        if failure >= max_attempts:
            break

    # A successful poll never triggers the fail-back below, even when the
    # configured attempt budget is zero or negative.
    if synced:
        return

    # -- If the last_hardened_lsn is not the same after a period of time, to avoid data loss,
    # -- we need to fail back to the global primary by running this command on the global primary
    # -- and then start over from the second step:
    # NOTE(review): in the visible call site this runs before sql4 has switched
    # the DAG role - confirm that sql6 is the intended statement at this stage.
    print("fail back ...")
    cursor_.execute(sql6)
    print("mode back to async ...")
    cursor_.execute(sql7)
    print("back steps executed ok !")
    print("Why data sync failure. May have to check database by your self !")
    print("we have try to fail back to the global primary by running this command on the global primary.")
    print("and this script will exit by 3 seconds ...")
    time.sleep(3)
    sys.exit()


def check_data_sync_ok(cursor_ag1, cursor_ag2, sql):
    """
    Final LSN comparison between the primary AG and the secondary AG.

    Every ``interval`` seconds the query is run on both cursors. Each row from
    ``cursor_ag2`` is matched, by ``database_name``, with the first row from
    ``cursor_ag1`` carrying the same name, and their ``last_hardened_lsn``
    values are compared. A poll succeeds when ``cursor_ag2`` returned at least
    one row and no row mismatched; the first mismatch aborts the poll.

    After ``int(data_sync_time_out) / interval`` failed polls the fail-back
    statements ``sql6`` and ``sql7`` are executed on ``cursor_ag1`` and the
    process terminates through ``sys.exit()``.

    :param cursor_ag1: cursor yielding dict-like rows for the primary AG
    :param cursor_ag2: cursor yielding dict-like rows for the secondary AG
    :param sql: query whose rows expose ``database_name`` and ``last_hardened_lsn``
    """
    max_attempts = int(data_sync_time_out) / interval
    failed_polls = 0
    pending = True
    # Per the original author's note, at this stage both groups report only 8
    # replicas each, and synchronization is complete once their LSNs agree.
    while pending:
        print("this is a last check for data sync ...")
        time.sleep(interval)

        # Collect the primary AG rows first.
        cursor_ag1.execute(sql)
        print("[ ", primaryAG, "] ")
        primary_rows = []
        for position, row in enumerate(cursor_ag1, start=1):
            primary_rows.append(row)
            print(position, row)

        # Then walk the secondary AG rows and compare each with its counterpart.
        cursor_ag2.execute(sql)
        print("[ ", secondaryAG, "] ")
        for offset, row in enumerate(cursor_ag2):
            secondary_lsn = row['last_hardened_lsn']
            wanted_db = row['database_name']
            # When the primary side has no row for this database the row is
            # compared with itself, i.e. it counts as matching (the original
            # author's fallback to keep the script running).
            counterpart = next(
                (candidate for candidate in primary_rows
                 if candidate['database_name'] == wanted_db),
                row,
            )
            if counterpart['last_hardened_lsn'] != secondary_lsn:
                pending = True
                print("failure at: ", offset)
                print("primary: ", counterpart)
                print("secondary: ", row)
                print("break check.")
                break
            pending = False
            print(offset + 1, row)

        if pending:
            failed_polls += 1
            print("Synchronization is not complete, please wait.")
            print("trying times: ", failed_polls)
        # Leave the loop once the attempt budget derived from the timeout is used up.
        if failed_polls >= max_attempts:
            pending = False

    # -- If the last_hardened_lsn is not the same after a period of time, to avoid data loss,
    # -- we need to fail back to the global primary by running this command on the global primary
    # -- and then start over from the second step:
    if failed_polls < max_attempts:
        return
    print("fail back ...")
    cursor_ag1.execute(sql6)
    print("mode back to async ...")
    cursor_ag1.execute(sql7)
    print("back steps executed ok !")
    print("Why data sync failure. May have to check database by your self !")
    print("we have try to fail back to the global primary by running this command on the global primary.")
    print("and this script will exit by 3 seconds ...")
    time.sleep(3)
    sys.exit()


# Primary AG------------------------------------------------------------------------------------------------------------
def _connection_settings(role):
    """Collect the pymssql.connect() keyword arguments stored in config section *role*."""
    return {key: cp.get(role, key) for key in ('server', 'user', 'password', 'port', 'database')}


role1 = 'primary'
conn = pymssql.connect(autocommit=True, **_connection_settings(role1))
cursor = conn.cursor(as_dict=True)

# Step 1: switch the distributed availability group to synchronous commit.
cursor.execute(sql1)
print("sets the distributed availability group to synchronous commit")

# Poll every 5 seconds until the first two rows returned by sql2 both report
# SYNCHRONOUS_COMMIT as their availability mode.
while True:
    print("please waiting state sync ...")
    time.sleep(5)
    cursor.execute(sql2)
    modes = []
    for index, o in enumerate(cursor, start=1):
        modes.append(o['availability_mode_desc'])
        print(index, o)
    if modes[:2] == ["SYNCHRONOUS_COMMIT"] * 2:
        break
    print("Please check that the master replica transaction is closed. If it closed, just wait a second.")

print("DAG Database state synchronization successful !")

# Step 2: wait for the LSNs to line up (exits the process on timeout).
check_data_sync(sql3, cursor)

print("DAG Data synchronization completed !")

# Step 3: demote the distributed availability group on the global primary.
cursor.execute(sql4)
print("Update Primary AG Role to SECONDARY success ...")
print("DAG disabled now ! Please wait for this script to do a final check ...")


# Secondary AG----------------------------------------------------------------------------------------------------------
role2 = 'secondary'
conn2 = pymssql.connect(autocommit=True, **_connection_settings(role2))
cursor2 = conn2.cursor(as_dict=True)

# Step 4: compare both sides once more before failing over (exits the process on timeout).
print("The last check to prevent data loss ...")
check_data_sync_ok(cursor, cursor2, sql5)
print("check over and success.")

conn.close()
print("close connection from ", primaryAG)

# Step 5: fail over on the secondary side, then return the DAG to asynchronous commit.
print("Failover starting ...")
cursor2.execute(sql6)
print("OK !")

cursor2.execute(sql7)
print("sets the distributed availability group to asynchronous commit")

print("Failover script execution is complete.")
conn2.close()
print("close connection from ", secondaryAG)

下面是配置文件,参数名应该可以见名知意。

对应文件名:database.ini

[primary]
server = 
user = 
password = 
port = 
database = master
[secondary]
server = 
user = 
password = 
port = 
database = master
[system]
interval = 10
data_sync_time_out = 60
distribute_availability_group_name = 
primary_availability_group_name = 
secondary_availability_group_name = 

 

 以上

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐