作者:解琛
时间:2021 年 2 月 3 日

十三、文件锁

13.1 写文件

#!/usr/bin/env python
# coding=utf-8
import json
import fcntl
import os


def write_file(path, data):
    """Replace the contents of ``path`` with ``data`` under an exclusive flock.

    The file is opened without truncation and is emptied only after the lock
    has been acquired, and the data is flushed before the lock is released.
    A reader that takes the same lock therefore never observes the file
    half-way through a rewrite.

    Args:
        path: File to write; it is created when it does not exist.
        data: Text to write. ``None``, ``""``, ``{}`` and ``[]`` are rejected
            with a message and the file is left untouched.

    Returns:
        None. I/O errors while locking or writing are reported and retried,
        up to 10 attempts in total; any other error is reported once and the
        write is abandoned.
    """
    if data is None or data == "" or data == {} or data == []:
        print(">> 写入无效的数据;")
        return

    for _ in range(10):
        # "a+" creates a missing file but, unlike "w+", does not truncate on
        # open, so the old content survives until we actually own the lock.
        with open(path, "a+") as file_obj:
            try:
                fcntl.flock(file_obj.fileno(), fcntl.LOCK_EX)
                try:
                    print("文件锁已开;")
                    file_obj.seek(0)
                    file_obj.truncate()
                    file_obj.write(data)
                    # Hand the buffered text to the OS *before* unlocking;
                    # otherwise it would only be written by close(), after
                    # the lock is gone.
                    file_obj.flush()
                finally:
                    fcntl.flock(file_obj.fileno(), fcntl.LOCK_UN)
                break
            except IOError:
                print(">> 文件已经被加锁;")
            except Exception as err:
                # E.g. TypeError for non-text data: retrying cannot help.
                print(">> 写入失败: {}".format(err))
                break


# Demo driver: rewrite ./test.json in an endless, unthrottled loop. Intended
# to be run alongside the reader script from section 13.2 to exercise the lock.
while True:
    write_file("./test.json", "jerome")

13.2 读文件

#!/usr/bin/env python
# coding=utf-8
import json
import fcntl
import os


def read_file(path, file_type):
    """Read ``path`` under a shared flock and return its contents.

    Args:
        path: File to read.
        file_type: ``"json"`` parses the file with ``json.load``; any other
            value returns the raw text.

    Returns:
        The parsed JSON value or the file text. ``None`` when the file does
        not exist or when all 10 attempts failed.
    """
    res = None
    if not os.path.exists(path):
        return res

    for _ in range(10):
        with open(path, "r") as file_obj:
            try:
                # A reader only needs a shared lock: it still excludes
                # writers holding LOCK_EX, but lets other readers proceed.
                fcntl.flock(file_obj.fileno(), fcntl.LOCK_SH)
                try:
                    print("文件锁已开;")
                    if file_type == "json":
                        res = json.load(file_obj)
                    else:
                        res = file_obj.read()
                finally:
                    fcntl.flock(file_obj.fileno(), fcntl.LOCK_UN)
                break
            except IOError:
                print(">> 文件已经被加锁;")
            except ValueError as err:
                # Malformed JSON or undecodable bytes. Retry, since a writer
                # that does not honour the lock may be mid-rewrite.
                print(">> 读取失败: {}".format(err))

    return res


# Demo driver: poll ./test.json forever and print every result. repr() keeps
# an empty string ('') distinguishable from a None result.
while True:
    data = read_file("./test.json", "")
    print(repr(data))

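经过测试,文件锁极容易出现“失效”,导致写入的文件为空。常见原因有两个:一是以 "w+" 模式打开文件时,文件在加锁之前就已经被清空;二是写入的数据停留在缓冲区中,直到解锁之后(关闭文件时)才真正写入文件。因此应当在获得锁之后再截断文件,并在解锁之前调用 flush()。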

13.3 基于全局变量、文件锁的多线程文件保护

#!/usr/bin/env python
# coding=utf-8
import json
import fcntl
import os
import time
import threading


# Serialises file access between the threads of this process. It replaces a
# plain boolean flag, whose check-then-set was not atomic and which made a
# caller silently give up whenever another thread was busy.
_FILE_LOCK = threading.Lock()


def write_file(path, data):
    """Replace the contents of ``path`` with ``data``.

    Access is guarded twice: ``_FILE_LOCK`` serialises threads of this
    process, and an exclusive ``flock`` guards against other processes that
    take the same lock. The file is emptied only once both are held, and the
    data is flushed before either is released.

    Args:
        path: File to write; it is created when it does not exist.
        data: Text to write. ``None``, ``""``, ``{}`` and ``[]`` are rejected
            with a message and the file is left untouched.

    Returns:
        None. I/O errors while locking or writing are reported and retried,
        up to 10 attempts in total; any other error is reported once and the
        write is abandoned.
    """
    if data is None or data == "" or data == {} or data == []:
        print(">> 写入无效的数据;")
        return

    # Blocks until the other thread is done instead of dropping the write.
    with _FILE_LOCK:
        for _ in range(10):
            # "a+" creates a missing file but, unlike "w+", does not truncate
            # on open, so the old content survives until the lock is held.
            with open(path, "a+") as file_obj:
                try:
                    fcntl.flock(file_obj.fileno(), fcntl.LOCK_EX)
                    try:
                        file_obj.seek(0)
                        file_obj.truncate()
                        file_obj.write(data)
                        # Flush before unlocking; otherwise the text would
                        # only be written by close(), after the unlock.
                        file_obj.flush()
                    finally:
                        fcntl.flock(file_obj.fileno(), fcntl.LOCK_UN)
                    break
                except IOError:
                    print(">> 文件已经被加锁;")
                except Exception as err:
                    # E.g. TypeError for non-text data: retrying cannot help.
                    print(">> 写入失败: {}".format(err))
                    break


def read_file(path, file_type):
    """Read ``path`` and return its contents.

    Uses the same two-level guard as ``write_file``: ``_FILE_LOCK`` between
    threads, plus a shared ``flock`` that excludes exclusive-lock writers.

    Args:
        path: File to read.
        file_type: ``"json"`` parses the file with ``json.load``; any other
            value returns the raw text.

    Returns:
        The parsed JSON value or the file text. ``None`` when the file does
        not exist or when all 10 attempts failed.
    """
    res = None
    if not os.path.exists(path):
        return res

    # Blocks until the other thread is done instead of returning None.
    with _FILE_LOCK:
        for _ in range(10):
            with open(path, "r") as file_obj:
                try:
                    # Shared lock: sufficient for reading.
                    fcntl.flock(file_obj.fileno(), fcntl.LOCK_SH)
                    try:
                        if file_type == "json":
                            res = json.load(file_obj)
                        else:
                            res = file_obj.read()
                    finally:
                        fcntl.flock(file_obj.fileno(), fcntl.LOCK_UN)
                    break
                except IOError:
                    print(">> 文件已经被加锁;")
                except ValueError:
                    # Malformed JSON or undecodable bytes; report and retry.
                    print("失败;")

    return res


def test1():
    """Writer thread body: rewrite ./test.json forever, pausing 10 ms per pass."""
    target, payload = "./test.json", "jerome\n"
    while True:
        write_file(target, payload)
        time.sleep(0.01)


def test2():
    """Reader thread body: read ./test.json forever and print each result's repr."""
    while True:
        print(repr(read_file("./test.json", "")))


# Run the writer and the reader concurrently. Both targets loop forever and
# neither thread is marked as a daemon, so the script runs until it is killed.
threading.Thread(target=test1).start()
threading.Thread(target=test2).start()
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐