
CNN
Published: 2021-05-14
Directory structure
Code
train.py
import os
import random
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torch.optim as optim
from matplotlib import pyplot as plt
from model.lenet import LeNet
from model.se_resnet import se_resnet18
from model.se_resnet import se_resnet50  # import the model
from my_dataset import MyDataset


def set_seed(seed=1):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)


set_seed()  # set the random seed

rmb_label = {
    "eyesclosed": 0,
    "lookingarroud": 1,
    "safedriving": 2,
    "smoking": 3,
    "yawning": 4
}  # update this mapping if the target classes change

# hyper-parameters
MAX_EPOCH = 200
BATCH_SIZE = 16
LR = 0.01
log_interval = 10
val_interval = 1

# ============================ step 1/5 data ============================
# data paths
split_dir = os.path.join("..", "..", "data", "split_data")
train_dir = os.path.join(split_dir, "train_test")
valid_dir = os.path.join(split_dir, "valid_test")

# per-channel mean and std of the training images
norm_mean = [0.33424968, 0.33424437, 0.33428448]
norm_std = [0.24796878, 0.24796101, 0.24801227]

# training-set preprocessing
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(256, padding=4),  # random erasing, rotation, etc. could be added here as well
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

# validation-set preprocessing
valid_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

# build the MyDataset instances
train_data = MyDataset(data_dir=train_dir, transform=train_transform)
valid_data = MyDataset(data_dir=valid_dir, transform=valid_transform)

# build the DataLoaders
train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)  # shuffle samples during training
valid_loader = DataLoader(dataset=valid_data, batch_size=BATCH_SIZE)

# ============================ step 2/5 model ============================
net = se_resnet50(num_classes=5, pretrained=True)  # change the model here if needed

# ============================ step 3/5 loss function ============================
criterion = nn.CrossEntropyLoss()  # choose the loss function

# ============================ step 4/5 optimizer ============================
optimizer = optim.SGD(net.parameters(), lr=LR, momentum=0.9)  # choose the optimizer
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)  # learning-rate decay schedule

# ============================ step 5/5 training ============================
train_curve = list()
valid_curve = list()

for epoch in range(MAX_EPOCH):

    loss_mean = 0.
    correct = 0.
    total = 0.

    net.train()
    for i, data in enumerate(train_loader):  # fetch a batch

        # forward
        inputs, labels = data
        outputs = net(inputs)

        # backward
        optimizer.zero_grad()
        loss = criterion(outputs, labels)  # loss for one batch
        loss.backward()

        # update weights
        optimizer.step()

        # track the classification accuracy
        _, predicted = torch.max(outputs.data, 1)  # dim=1: return the index of the largest logit per sample
        total += labels.size(0)
        # torch.squeeze() removes dimensions of size 1, e.g. a (1, 3) tensor becomes (3,);
        # .numpy() and torch.from_numpy() convert between torch tensors and numpy ndarrays
        correct += (predicted == labels).squeeze().sum().item()  # count the correct predictions
        # For multi-class data viewed one-vs-rest, e.g. label = [0, 0, 1, 2, 2], recall and
        # precision can only be computed per class (0, 1, 2); useful when analysing the data.
        # Top-1 is plain accuracy; Top-5 is a looser criterion. Concretely, with 10 classes the
        # classifier outputs 10 probabilities summing to 1: Top-1 counts a hit only when the
        # single largest probability belongs to the correct class, while Top-5 counts a hit when
        # the correct class appears among the five largest probabilities.

        # print the training info
        loss_mean += loss.item()  # accumulate the loss
        train_curve.append(loss.item())  # training curve, for plotting
        if (i + 1) % log_interval == 0:  # log_interval=10: log every 10 iterations; with batch_size=16 that is 160 images
            loss_mean = loss_mean / log_interval  # average loss over the interval
            print("Training:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
                epoch, MAX_EPOCH, i + 1, len(train_loader), loss_mean, correct / total))
            # write the training log; mode 'a' appends and creates the file if it does not exist
            with open("log_training.txt", 'a') as f:
                f.write("Training:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}\n".format(
                    epoch, MAX_EPOCH, i + 1, len(train_loader), loss_mean, correct / total))
            loss_mean = 0.  # reset after every logging interval

    scheduler.step()  # update the learning rate once per epoch

    # validate the model
    if (epoch + 1) % val_interval == 0:  # val_interval=1: validate after every epoch

        correct_val = 0.
        total_val = 0.
        loss_val = 0.
        net.eval()  # evaluation mode: no parameter updates
        with torch.no_grad():  # no gradients: lower memory use, faster
            for j, data in enumerate(valid_loader):
                inputs, labels = data
                outputs = net(inputs)
                loss = criterion(outputs, labels)

                _, predicted = torch.max(outputs.data, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).squeeze().sum().item()

                loss_val += loss.item()

        loss_val_mean = loss_val / len(valid_loader)  # average validation loss per batch
        valid_curve.append(loss_val_mean)
        print("Valid:\t Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
            epoch, MAX_EPOCH, j + 1, len(valid_loader), loss_val_mean, correct_val / total_val))
        with open("log_training.txt", 'a') as f:
            f.write("Valid:\t Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}\n".format(
                epoch, MAX_EPOCH, j + 1, len(valid_loader), loss_val_mean, correct_val / total_val))

train_x = range(len(train_curve))
train_y = train_curve

train_iters = len(train_loader)
# valid_curve holds one loss per epoch, so convert its x positions to iterations
valid_x = np.arange(1, len(valid_curve) + 1) * train_iters * val_interval
valid_y = valid_curve

plt.plot(train_x, train_y, label='Train')
plt.plot(valid_x, valid_y, label='Valid')
plt.legend(loc='upper right')
plt.ylabel('loss value')
plt.xlabel('Iteration')
plt.show()

# ============================ inference ============================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
test_dir = os.path.join(BASE_DIR, "test_data")

test_data = MyDataset(data_dir=test_dir, transform=valid_transform)
test_loader = DataLoader(dataset=test_data, batch_size=1)

for i, data in enumerate(test_loader):
    # forward
    inputs, labels = data
    outputs = net(inputs)
    _, predicted = torch.max(outputs.data, 1)  # index of the largest logit
    final_choose = [1, 2, 3, 4, 5]  # map the 0-based class index to a 1-based class id
    result = final_choose[predicted.numpy()[0]]
    print("Predicted class: {}".format(result))
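The comments above contrast Top-1 and Top-5 accuracy. As a minimal sketch (not part of the original script), a generic top-k accuracy over a batch of logits can be computed like this:

import torch

def topk_accuracy(outputs, labels, k=1):
    # outputs: (batch, num_classes) logits; labels: (batch,) class indices
    _, topk_idx = outputs.topk(k, dim=1)           # indices of the k largest logits
    hits = topk_idx.eq(labels.view(-1, 1))         # compare each of the k guesses to the true label
    return hits.any(dim=1).float().mean().item()   # fraction of samples with a hit in the top k

# toy check: 2 samples, 5 classes
logits = torch.tensor([[0.1, 0.7, 0.05, 0.1, 0.05],
                       [0.3, 0.2, 0.25, 0.15, 0.1]])
labels = torch.tensor([1, 2])
print(topk_accuracy(logits, labels, k=1))  # 0.5: only sample 0 is a top-1 hit
print(topk_accuracy(logits, labels, k=3))  # 1.0: sample 1's class 2 is within its top 3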
my_dataset.py
"""各数据集的Dataset定义"""import osimport randomfrom PIL import Imagefrom torch.utils.data import Datasetrandom.seed(1)rmb_label = { "eyesclosed": 0, "lookingarroud": 1, "safedriving": 2, "smoking": 3, "yawning": 4} # 如果改了分类目标,这里需要修改# 主要是用来接受索引返回样本用的class MyDataset(Dataset): def __init__(self, data_dir, transform=None): """ :param data_dir: str, 数据集所在路径 :param transform: torch.transform,数据预处理 """ self.label_name = { "eyesclosed": 0, "lookingarroud": 1, "safedriving": 2, "smoking": 3, "yawning": 4} # 如果改了分类目标,这里需要修改 self.data_info = self.get_img_info(data_dir) # data_info存储所有图片路径和标签,在DataLoader中通过index读取样本 self.transform = transform #接受一个索引,返回一个样本 --- img, label def __getitem__(self, index): path_img, label = self.data_info[index] img = Image.open(path_img).convert('RGB') # 0~255 if self.transform is not None: img = self.transform(img) # 在这里做transform,转为tensor等等 return img, label def __len__(self): return len(self.data_info) @staticmethod def get_img_info(data_dir): data_info = list() for root, dirs, _ in os.walk(data_dir): # 遍历类别 for sub_dir in dirs: img_names = os.listdir(os.path.join(root, sub_dir)) img_names = list(filter(lambda x: x.endswith('.jpg'), img_names)) # 如果改了图片格式,这里需要修改 # 遍历图片 for i in range(len(img_names)): img_name = img_names[i] path_img = os.path.join(root, sub_dir, img_name) label = rmb_label[sub_dir] data_info.append((path_img, int(label))) return data_info
splite_dataset.py
# -*- coding: utf-8 -*-
"""Split the dataset into training, validation, and test sets"""

import os
import random
import shutil


def makedir(new_dir):
    if not os.path.exists(new_dir):
        os.makedirs(new_dir)


if __name__ == '__main__':

    random.seed(1)

    dataset_dir = os.path.join("..", "..", "data", "raw_data", "training_images")
    split_dir = os.path.join("..", "..", "data", "split_data")
    train_dir = os.path.join(split_dir, "train_test")
    valid_dir = os.path.join(split_dir, "valid_test")
    test_dir = os.path.join(split_dir, "test")

    train_pct = 0.8
    valid_pct = 0.2
    test_pct = 0

    for root, dirs, files in os.walk(dataset_dir):
        for sub_dir in dirs:

            imgs = os.listdir(os.path.join(root, sub_dir))
            imgs = list(filter(lambda x: x.endswith('.jpg'), imgs))  # update this if the image format changes
            random.shuffle(imgs)
            img_count = len(imgs)

            train_point = int(img_count * train_pct)
            valid_point = int(img_count * (train_pct + valid_pct))

            for i in range(img_count):
                if i < train_point:
                    out_dir = os.path.join(train_dir, sub_dir)
                elif i < valid_point:
                    out_dir = os.path.join(valid_dir, sub_dir)
                else:
                    out_dir = os.path.join(test_dir, sub_dir)

                makedir(out_dir)

                target_path = os.path.join(out_dir, imgs[i])
                src_path = os.path.join(dataset_dir, sub_dir, imgs[i])

                shutil.copy(src_path, target_path)

            print('Class:{}, train:{}, valid:{}, test:{}'.format(
                sub_dir, train_point, valid_point - train_point, img_count - valid_point))
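After running the split, it can be worth confirming that the 0.8/0.2/0 ratios were actually applied per class. A small sketch (not in the original), reusing the same directory names as the script:

import os

split_dir = os.path.join("..", "..", "data", "split_data")
for split in ("train_test", "valid_test", "test"):
    split_path = os.path.join(split_dir, split)
    if not os.path.isdir(split_path):
        continue  # the test split is empty when test_pct = 0
    for sub_dir in sorted(os.listdir(split_path)):
        n = len(os.listdir(os.path.join(split_path, sub_dir)))
        print("{}/{}: {} images".format(split, sub_dir, n))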
caculate_mean_std.py
import os
import numpy as np
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from my_dataset import MyDataset

train_dir = os.path.join('.', "train_test")

train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

train_data = MyDataset(data_dir=train_dir, transform=train_transform)
train_loader = DataLoader(dataset=train_data, batch_size=1000, shuffle=True)

train = next(iter(train_loader))[0]  # mean/std of one batch (up to 1000 images)
train_mean = np.mean(train.numpy(), axis=(0, 2, 3))
train_std = np.std(train.numpy(), axis=(0, 2, 3))
print(train_mean, train_std)
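Because this script takes its statistics from a single batch, they only cover the first batch_size images. A streaming alternative (a sketch, reusing train_data from above) accumulates per-channel sums over every batch, so the mean and std cover the whole dataset:

import torch

n_pixels = 0
channel_sum = torch.zeros(3)
channel_sq_sum = torch.zeros(3)
for imgs, _ in DataLoader(train_data, batch_size=64):
    n_pixels += imgs.numel() / 3                      # pixels per channel in this batch
    channel_sum += imgs.sum(dim=(0, 2, 3))
    channel_sq_sum += (imgs ** 2).sum(dim=(0, 2, 3))

mean = channel_sum / n_pixels
std = (channel_sq_sum / n_pixels - mean ** 2).sqrt()  # Var[X] = E[X^2] - E[X]^2
print(mean, std)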
se_resnet.py
import torch.nn as nn
from torch.hub import load_state_dict_from_url
from torchvision.models import ResNet


class SELayer(nn.Module):
    def __init__(self, channel, reduction=16):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)


def conv3x3(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)


class SEBasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None, *, reduction=16):
        super(SEBasicBlock, self).__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes, 1)
        self.bn2 = nn.BatchNorm2d(planes)
        self.se = SELayer(planes, reduction)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.se(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


class SEBottleneck(nn.Module):
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None, *, reduction=16):
        super(SEBottleneck, self).__init__()
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, planes * 4, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes * 4)
        self.relu = nn.ReLU(inplace=True)
        self.se = SELayer(planes * 4, reduction)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)
        out = self.se(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


def se_resnet18(num_classes=1_000):
    """Constructs a SE-ResNet-18 model."""
    model = ResNet(SEBasicBlock, [2, 2, 2, 2], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    return model


def se_resnet34(num_classes=1_000):
    """Constructs a SE-ResNet-34 model."""
    model = ResNet(SEBasicBlock, [3, 4, 6, 3], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    return model


def se_resnet50(num_classes=1_000, pretrained=False):
    """Constructs a SE-ResNet-50 model.

    Args:
        pretrained (bool): If True, loads weights pre-trained on ImageNet
    """
    model = ResNet(SEBottleneck, [3, 4, 6, 3], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    if pretrained:
        state_dict = load_state_dict_from_url(
            "https://github.com/moskomule/senet.pytorch/releases/download/archive/seresnet50-60a8950a85b2b.pkl")
        if num_classes != 1000:
            # the released checkpoint has a 1000-class fc head; drop it so the
            # remaining weights can load into a model with a different head
            state_dict.pop("fc.weight", None)
            state_dict.pop("fc.bias", None)
            model.load_state_dict(state_dict, strict=False)
        else:
            model.load_state_dict(state_dict)
    return model


def se_resnet101(num_classes=1_000):
    """Constructs a SE-ResNet-101 model."""
    model = ResNet(SEBottleneck, [3, 4, 23, 3], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    return model


def se_resnet152(num_classes=1_000):
    """Constructs a SE-ResNet-152 model."""
    model = ResNet(SEBottleneck, [3, 8, 36, 3], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    return model
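A quick shape check confirms what SELayer does: it squeezes each channel to a scalar with global average pooling, produces per-channel weights through the two-layer bottleneck, and rescales the input without changing its shape. A minimal sketch:

import torch

se = SELayer(channel=64, reduction=16)
x = torch.randn(2, 64, 32, 32)  # (batch, channels, H, W)
y = se(x)
print(y.shape)                  # torch.Size([2, 64, 32, 32]): same shape, channels rescaled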
lenet.py
import torch.nn as nn
import torch.nn.functional as F


class LeNet(nn.Module):
    # model definition
    def __init__(self, num_classes):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)  # assumes 32x32 input images
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    # forward pass
    def forward(self, x):
        out = F.relu(self.conv1(x))
        out = F.max_pool2d(out, 2)
        out = F.relu(self.conv2(out))
        out = F.max_pool2d(out, 2)
        out = out.view(out.size(0), -1)
        out = F.relu(self.fc1(out))
        out = F.relu(self.fc2(out))
        out = self.fc3(out)
        return out

    def initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_normal_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight.data, 0, 0.1)
                m.bias.data.zero_()


class LeNet2(nn.Module):
    def __init__(self, num_classes):
        super(LeNet2, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 6, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(6, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.classifier = nn.Sequential(
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size()[0], -1)
        x = self.classifier(x)
        return x
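Note that fc1 = nn.Linear(16*5*5, 120) hard-codes a 32x32 input: 32 → conv(5x5) → 28 → pool → 14 → conv(5x5) → 10 → pool → 5, giving 16*5*5 = 400 features. The 256x256 images used in train.py would therefore break LeNet at fc1 (it is imported there but not actually used). A quick check (sketch):

import torch

net = LeNet(num_classes=5)
x = torch.randn(1, 3, 32, 32)
print(net(x).shape)  # torch.Size([1, 5])
# a 256x256 input would reach fc1 with 16*61*61 features and raise a shape-mismatch error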