SENet Code Implementation
Development environment
- python 3.7
- torch 1.8+cu101
- torchsummary
- torchvision 0.6.1+cu101
- pillow (imported as PIL)
- numpy
- opencv-python
- imageio
Preparation
CIFAR-10 dataset download:
https://www.cs.toronto.edu/~kriz/cifar.html
Pretrained weights download:
https://github.com/moskomule/senet.pytorch/releases/download/archive/seresnet50-60a8950a85b2b.pkl
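The weight file keeps the .pkl extension used by the upstream repository. Assuming it is an ordinary PyTorch checkpoint (a state dict saved with torch.save), a quick optional sanity check after downloading might look like this (the path follows the project layout described next):

import torch

# Assumption: the .pkl released by moskomule/senet.pytorch is a regular
# PyTorch-saved state dict; adjust the loading code if your copy differs.
state_dict = torch.load("data/seresnet50-60a8950a85b2b.pkl", map_location="cpu")
print(len(state_dict))  # number of parameter tensors
for i, (name, tensor) in enumerate(state_dict.items()):
    print(name, tuple(tensor.shape))  # parameter names and shapes (conv/bn/fc layers)
    if i == 4:
        break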
Project code structure
- data holds the CIFAR-10 dataset (python version) and the SENet pretrained weights
- src holds the SENet inference demo and the SENet training program
- tools holds the shared utility files: the CIFAR-10-to-PNG conversion script, the CIFAR-10 dataset class, the SENet model construction script (se_resnet), and the model training utilities.
CIFAR-10 to PNG conversion script
import numpy as np
import os
import sys
import pickle
import imageio

BASE_DIR = os.path.dirname(os.path.abspath(__file__))


def unpickle(file):
    fo = open(file, 'rb')
    if sys.version_info < (3, 0):
        dict_ = pickle.load(fo)
    else:
        dict_ = pickle.load(fo, encoding='bytes')
    fo.close()
    return dict_


def my_mkdir(my_dir):
    if not os.path.isdir(my_dir):
        os.makedirs(my_dir)


if __name__ == '__main__':

    data_dir = os.path.join(BASE_DIR, "..", "..", "Data", "cifar-10", "cifar-10-batches-py")
    train_o_dir = os.path.join(BASE_DIR, "..", "..", "Data", "cifar-10", "cifar10_train")
    test_o_dir = os.path.join(BASE_DIR, "..", "..", "Data", "cifar-10", "cifar10_test")

    for j in range(1, 6):
        data_path = os.path.join(data_dir, "data_batch_" + str(j))  # data_batch_1 ... data_batch_5
        train_data = unpickle(data_path)
        print(data_path + " is loading...")

        for i in range(0, 10000):
            img = np.reshape(train_data[b'data'][i], (3, 32, 32))
            img = img.transpose(1, 2, 0)

            label_num = str(train_data[b'labels'][i])
            o_dir = os.path.join(train_o_dir, label_num)
            my_mkdir(o_dir)

            img_name = label_num + '_' + str(i + (j - 1) * 10000) + '.png'
            img_path = os.path.join(o_dir, img_name)
            imageio.imwrite(img_path, img)
        print(data_path + " loaded.")

    print("test_batch is loading...")

    test_data_path = os.path.join(data_dir, "test_batch")
    test_data = unpickle(test_data_path)
    for i in range(0, 10000):
        img = np.reshape(test_data[b'data'][i], (3, 32, 32))
        img = img.transpose(1, 2, 0)

        label_num = str(test_data[b'labels'][i])
        o_dir = os.path.join(test_o_dir, label_num)
        my_mkdir(o_dir)

        img_name = label_num + '_' + str(i) + '.png'
        img_path = os.path.join(o_dir, img_name)
        imageio.imwrite(img_path, img)

    print("test_batch loaded.")
Dataset construction file
import os
from PIL import Image
from torch.utils.data import Dataset


class CifarDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        assert (os.path.exists(data_dir)), "data_dir: {} does not exist!".format(data_dir)
        self.data_dir = data_dir
        self._get_img_info()
        self.transform = transform

    def __getitem__(self, index):
        fn, label = self.img_info[index]
        img = Image.open(fn).convert('RGB')

        if self.transform is not None:
            img = self.transform(img)

        return img, label

    def __len__(self):
        if len(self.img_info) == 0:
            raise Exception("No image paths were found; check the dataset directory and file paths!")
        return len(self.img_info)

    def _get_img_info(self):
        # Each sub-directory is named after its class index (0-9); collect (path, label) pairs.
        sub_dir_ = [name for name in os.listdir(self.data_dir) if os.path.isdir(os.path.join(self.data_dir, name))]
        sub_dir = [os.path.join(self.data_dir, c) for c in sub_dir_]

        self.img_info = []
        for c_dir in sub_dir:
            path_img = [(os.path.join(c_dir, i), int(os.path.basename(c_dir))) for i in os.listdir(c_dir) if
                        i.endswith("png")]
            self.img_info.extend(path_img)
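A minimal usage sketch of CifarDataset (the transform mirrors the validation transform of the training program further below; the data directory is illustrative and must point at the PNG folders produced by the conversion script):

import torchvision.transforms as transforms
from torch.utils.data import DataLoader

valid_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
])
valid_data = CifarDataset(data_dir="Data/cifar-10/cifar10_test", transform=valid_transform)  # illustrative path
valid_loader = DataLoader(valid_data, batch_size=32, shuffle=False)
imgs, labels = next(iter(valid_loader))
print(imgs.shape, labels[:5])  # torch.Size([32, 3, 32, 32]) and class indices in 0-9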
SENet model inference program
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
import time
import json
import torch
import torchvision.transforms as transforms
from PIL import Image
from matplotlib import pyplot as plt
from lesson.J_SENet.tools.se_resnet import get_se_resnet_50

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def img_transform(img_rgb, transform=None):
    """
    Convert a PIL image into the tensor form expected by the model.
    :param img_rgb: PIL Image
    :param transform: torchvision.transforms
    :return: tensor
    """
    if transform is None:
        raise ValueError("transform is None! A transform must be provided to process the image.")

    img_t = transform(img_rgb)
    return img_t


def process_img(path_img):

    # hard-coded ImageNet normalization statistics
    norm_mean = [0.485, 0.456, 0.406]
    norm_std = [0.229, 0.224, 0.225]
    inference_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std),
    ])

    # path --> img
    img_rgb = Image.open(path_img).convert('RGB')

    # img --> tensor
    img_tensor = img_transform(img_rgb, inference_transform)
    img_tensor.unsqueeze_(0)  # chw --> bchw
    img_tensor = img_tensor.to(device)

    return img_tensor, img_rgb


def load_class_names(p_clsnames, p_clsnames_cn):
    """
    Load the class names: an English name list (json) and a Chinese name list (text file).
    :param p_clsnames: path to imagenet1000.json
    :param p_clsnames_cn: path to imagenet_classnames.txt
    :return: (class_names, class_names_cn)
    """
    with open(p_clsnames, "r") as f:
        class_names = json.load(f)
    with open(p_clsnames_cn, encoding='UTF-8') as f:
        class_names_cn = f.readlines()
    return class_names, class_names_cn


if __name__ == "__main__":

    # config
    path_state_dict = os.path.join(BASE_DIR, "..", "data", "seresnet50-60a8950a85b2b.pkl")
    path_img = os.path.join(BASE_DIR, "..", "data", "Golden Retriever from baidu.jpg")
    # path_img = os.path.join(BASE_DIR, "..", "data", "tiger cat.jpg")
    path_classnames = os.path.join(BASE_DIR, "..", "data", "imagenet1000.json")
    path_classnames_cn = os.path.join(BASE_DIR, "..", "data", "imagenet_classnames.txt")

    # load class names
    cls_n, cls_n_cn = load_class_names(path_classnames, path_classnames_cn)

    # 1/5 load img
    img_tensor, img_rgb = process_img(path_img)

    # 2/5 load model
    se_resnet_model = get_se_resnet_50(path_state_dict, device)

    # 3/5 inference: tensor --> logits vector
    with torch.no_grad():
        time_tic = time.time()
        outputs = se_resnet_model(img_tensor)
        time_toc = time.time()

    # 4/5 index to class names
    _, pred_int = torch.max(outputs.data, 1)
    _, top5_idx = torch.topk(outputs.data, 5, dim=1)

    pred_idx = int(pred_int.cpu().numpy())
    pred_str, pred_cn = cls_n[pred_idx], cls_n_cn[pred_idx]
    print("img: {} is: {}\n{}".format(os.path.basename(path_img), pred_str, pred_cn))
    print("time consuming:{:.2f}s".format(time_toc - time_tic))

    # 5/5 visualization
    plt.imshow(img_rgb)
    plt.title("predict:{}".format(pred_str))
    top5_num = top5_idx.cpu().numpy().squeeze()
    text_str = [cls_n[t] for t in top5_num]
    for idx in range(len(top5_num)):
        plt.text(5, 15 + idx * 30, "top {}:{}".format(idx + 1, text_str[idx]), bbox=dict(fc='yellow'))
    plt.show()
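The demo indexes cls_n[pred_idx] directly after json.load, so imagenet1000.json is expected to be a JSON array of the 1000 ImageNet class names ordered by class index (an assumption inferred from how the file is used above, not a file shipped with this article). A minimal stand-in would look like:

import json

# Hypothetical stand-in for imagenet1000.json: a list ordered by class index (1000 entries in total).
class_names = ["tench", "goldfish", "great white shark"]  # truncated for illustration
with open("imagenet1000.json", "w") as f:
    json.dump(class_names, f)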
SENet model training program
import os
from datetime import datetime
import numpy as np
import torch.nn as nn
import torch
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torch.optim as optim
from tools.cifar10_dataset import CifarDataset
from tools.common_tools import ModelTrainer, show_confMat, plot_line
from tools.se_resnet import CifarSEBasicBlock
from tools.resnet import resnet20

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# print(device)

if __name__ == "__main__":

    # config
    train_dir = os.path.join(BASE_DIR, "..", "..", "Data", "cifar-10", "cifar10_train")
    test_dir = os.path.join(BASE_DIR, "..", "..", "Data", "cifar-10", "cifar10_test")

    now_time = datetime.now()
    time_str = datetime.strftime(now_time, '%m-%d_%H-%M')
    log_dir = os.path.join(BASE_DIR, "..", "results", time_str)
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    class_names = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
    num_classes = 10

    MAX_EPOCH = 300
    BATCH_SIZE = 64
    LR = 0.1
    log_interval = 1
    val_interval = 1
    start_epoch = -1
    milestones = [150, 225]

    # ============================ step 1/5 data ============================
    norm_mean = [0.485, 0.456, 0.406]
    norm_std = [0.229, 0.224, 0.225]

    train_transform = transforms.Compose([
        transforms.Resize(32),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomCrop(32, padding=4),
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std),
    ])

    valid_transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std),
    ])

    # build Dataset instances
    train_data = CifarDataset(data_dir=train_dir, transform=train_transform)
    valid_data = CifarDataset(data_dir=test_dir, transform=valid_transform)

    # build DataLoaders
    train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
    valid_loader = DataLoader(dataset=valid_data, batch_size=32, num_workers=2)

    # ============================ step 2/5 model ============================
    se_resnet_model = resnet20(CifarSEBasicBlock)
    # se_resnet_model = resnet20()
    print(se_resnet_model)
    se_resnet_model.to(device)

    # ============================ step 3/5 loss function ============================
    criterion = nn.CrossEntropyLoss()

    # ============================ step 4/5 optimizer ============================
    optimizer = optim.SGD(se_resnet_model.parameters(), lr=LR, momentum=0.9, weight_decay=1e-4)  # choose the optimizer
    scheduler = optim.lr_scheduler.MultiStepLR(optimizer, gamma=0.1, milestones=milestones)

    # ============================ step 5/5 training ============================
    loss_rec = {"train": [], "valid": []}
    acc_rec = {"train": [], "valid": []}
    best_acc, best_epoch = 0, 0

    for epoch in range(start_epoch + 1, MAX_EPOCH):

        # train / validate: (data_loader, model, loss_f, optimizer, epoch_id, device, max_epoch)
        loss_train, acc_train, mat_train = ModelTrainer.train(train_loader, se_resnet_model, criterion, optimizer, epoch, device, MAX_EPOCH)
        loss_valid, acc_valid, mat_valid = ModelTrainer.valid(valid_loader, se_resnet_model, criterion, device)
        print("Epoch[{:0>3}/{:0>3}] Train Acc: {:.2%} Valid Acc:{:.2%} Train loss:{:.4f} Valid loss:{:.4f} LR:{}".format(
            epoch + 1, MAX_EPOCH, acc_train, acc_valid, loss_train, loss_valid, optimizer.param_groups[0]["lr"]))

        scheduler.step()  # update the learning rate

        # plotting
        loss_rec["train"].append(loss_train), loss_rec["valid"].append(loss_valid)
        acc_rec["train"].append(acc_train), acc_rec["valid"].append(acc_valid)

        show_confMat(mat_train, class_names, "train", log_dir, verbose=epoch == MAX_EPOCH - 1)
        show_confMat(mat_valid, class_names, "valid", log_dir, verbose=epoch == MAX_EPOCH - 1)

        plt_x = np.arange(1, epoch + 2)
        plot_line(plt_x, loss_rec["train"], plt_x, loss_rec["valid"], mode="loss", out_dir=log_dir)
        plot_line(plt_x, acc_rec["train"], plt_x, acc_rec["valid"], mode="acc", out_dir=log_dir)

        if epoch > (MAX_EPOCH / 2) and best_acc < acc_valid:
            best_acc = acc_valid
            best_epoch = epoch

            checkpoint = {"model_state_dict": se_resnet_model.state_dict(),
                          "optimizer_state_dict": optimizer.state_dict(),
                          "epoch": epoch,
                          "best_acc": best_acc}

            path_checkpoint = os.path.join(log_dir, "checkpoint_best.pkl")
            torch.save(checkpoint, path_checkpoint)

    print(" done ~~~~ {}, best acc: {} in :{} epochs. ".format(datetime.strftime(datetime.now(), '%m-%d_%H-%M'),
                                                               best_acc, best_epoch))
    now_time = datetime.now()
    time_str = datetime.strftime(now_time, '%m-%d_%H-%M')
    print(time_str)
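The checkpoint saved above contains the model weights, the optimizer state, the epoch index, and the best accuracy, so an interrupted run can be resumed. A minimal sketch (the checkpoint path is illustrative; the model and optimizer must be constructed exactly as in the script before loading):

import torch

path_checkpoint = "results/<time_str>/checkpoint_best.pkl"  # illustrative; use the actual run folder
checkpoint = torch.load(path_checkpoint, map_location=device)
se_resnet_model.load_state_dict(checkpoint["model_state_dict"])
optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
start_epoch = checkpoint["epoch"]  # the training loop then continues from start_epoch + 1
best_acc = checkpoint["best_acc"]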
SENet network architecture construction program
import torch.nn as nn


class SELayer(nn.Module):
    def __init__(self, channel, reduction=16):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )
        """
        We found empirically that on ResNet
        architectures, removing the biases of the FC layers in the
        excitation operation facilitates the modelling of channel
        dependencies
        """

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)      # squeeze: global average pooling to (b, c)
        y = self.fc(y).view(b, c, 1, 1)      # excitation: per-channel weights in (0, 1)
        return x * y.expand_as(x)            # scale: reweight the input feature maps


def conv3x3(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)


class SEBasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None,
                 *, reduction=16):
        # The bare * in the parameter list ends the positional parameters;
        # everything after it (reduction) must be passed as a keyword argument.
        super(SEBasicBlock, self).__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes, 1)
        self.bn2 = nn.BatchNorm2d(planes)
        self.se = SELayer(planes, reduction)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.se(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


class SEBottleneck(nn.Module):
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None,
                 *, reduction=16):
        # As above, reduction is keyword-only because of the bare *.
        super(SEBottleneck, self).__init__()
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, planes * 4, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes * 4)
        self.relu = nn.ReLU(inplace=True)
        self.se = SELayer(planes * 4, reduction)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)
        out = self.se(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out
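The inference demo imports get_se_resnet_50 from tools/se_resnet.py, which is not reproduced in this article. Because SEBottleneck above follows the torchvision ResNet block interface (an expansion attribute plus the standard constructor arguments), such a helper can be sketched as follows. This is a hypothetical reconstruction, not the author's exact file; it assumes the downloaded .pkl is a plain state dict whose key names are compatible with this module layout, and strict=False is used to tolerate naming differences.

import torch
import torch.nn as nn
from torchvision.models.resnet import ResNet


def get_se_resnet_50(path_state_dict=None, device="cpu"):
    """Build SE-ResNet-50 from SEBottleneck (3, 4, 6, 3 blocks) and optionally load weights."""
    model = ResNet(SEBottleneck, [3, 4, 6, 3], num_classes=1000)
    if path_state_dict:
        state_dict = torch.load(path_state_dict, map_location="cpu")
        model.load_state_dict(state_dict, strict=False)  # key names may need remapping for an exact match
    model.eval()
    return model.to(device)

With torchsummary from the environment list, summary(model, (3, 224, 224)) prints the resulting layer-by-layer structure (pass device="cpu" to summary if the model is not on a GPU).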