CNN



Directory structure

[Image: project directory structure]

Code

train.py

import os
import random
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torch.optim as optim
from matplotlib import pyplot as plt
from model.lenet import LeNet
from model.se_resnet import se_resnet18
from model.se_resnet import se_resnet50  # import the model
from my_dataset import MyDataset


def set_seed(seed=1):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)


set_seed()  # set the random seed

rmb_label = {"eyesclosed": 0, "lookingarroud": 1, "safedriving": 2, "smoking": 3, "yawning": 4}  # update this if the target classes change

# hyper-parameters
MAX_EPOCH = 200
BATCH_SIZE = 16
LR = 0.01
log_interval = 10
val_interval = 1

# ============================ step 1/5 data ============================
# data paths
split_dir = os.path.join("..", "..", "data", "split_data")
train_dir = os.path.join(split_dir, "train_test")
valid_dir = os.path.join(split_dir, "valid_test")

# per-channel mean and std of the images
norm_mean = [0.33424968, 0.33424437, 0.33428448]
norm_std = [0.24796878, 0.24796101, 0.24801227]

# training-data preprocessing
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(256, padding=4),
    # random erasing, rotation, etc. could be added here
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

# validation-data preprocessing
valid_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

# build MyDataset instances
train_data = MyDataset(data_dir=train_dir, transform=train_transform)
valid_data = MyDataset(data_dir=valid_dir, transform=valid_transform)

# build DataLoaders
train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)  # shuffle samples during training
valid_loader = DataLoader(dataset=valid_data, batch_size=BATCH_SIZE)

# ============================ step 2/5 model ============================
net = se_resnet50(num_classes=5, pretrained=True)  # swap in a different model here if needed

# ============================ step 3/5 loss function ============================
criterion = nn.CrossEntropyLoss()

# ============================ step 4/5 optimizer ============================
optimizer = optim.SGD(net.parameters(), lr=LR, momentum=0.9)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)  # learning-rate decay schedule

# ============================ step 5/5 training ============================
train_curve = list()
valid_curve = list()

for epoch in range(MAX_EPOCH):

    loss_mean = 0.
    correct = 0.
    total = 0.

    net.train()
    for i, data in enumerate(train_loader):  # fetch a batch

        # forward
        inputs, labels = data
        outputs = net(inputs)

        # backward
        optimizer.zero_grad()
        loss = criterion(outputs, labels)  # loss of one batch
        loss.backward()

        # update weights
        optimizer.step()

        # classification statistics
        _, predicted = torch.max(outputs.data, 1)  # dim=1: argmax over classes, returns indices
        total += labels.size(0)
        # torch.squeeze() removes dimensions of size 1, e.g. shape (1, 3) becomes (3,)
        # .numpy() and torch.from_numpy() convert between tensors and numpy ndarrays
        correct += (predicted == labels).squeeze().sum().numpy()  # running count of correct predictions
        # with a one-vs-rest strategy, e.g. label = [0, 0, 1, 2, 2], precision and recall can only be
        # computed per class (0, 1, 2); useful for analysing the data
        # Top-1 is plain accuracy; Top-5 is the "stricter" benchmark in the sense that with 10 classes
        # the prediction counts as correct if the true class is among the 5 highest-probability outputs

        # print training info
        loss_mean += loss.item()  # accumulate loss
        train_curve.append(loss.item())  # training curve, for plotting
        if (i + 1) % log_interval == 0:  # log_interval=10: print every 10 iterations; with BATCH_SIZE=16 that is 160 images
            loss_mean = loss_mean / log_interval  # mean loss over the interval
            print("Training:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
                epoch, MAX_EPOCH, i + 1, len(train_loader), loss_mean, correct / total))

            # write the log; 'a' appends to the file (created if missing) without overwriting
            with open("log_training.txt", 'a') as f:
                f.write("Training:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
                    epoch, MAX_EPOCH, i + 1, len(train_loader), loss_mean, correct / total))
                f.write("\n")
            loss_mean = 0.  # reset for the next interval

    scheduler.step()  # update the learning rate

    # validate the model
    if (epoch + 1) % val_interval == 0:  # val_interval=1: validate after every epoch

        correct_val = 0.  # correct predictions
        total_val = 0.    # total samples
        loss_val = 0.     # accumulated loss
        net.eval()  # evaluation mode: no parameter updates
        with torch.no_grad():  # no gradients: less memory, faster
            for j, data in enumerate(valid_loader):
                inputs, labels = data
                outputs = net(inputs)
                loss = criterion(outputs, labels)

                _, predicted = torch.max(outputs.data, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).squeeze().sum().numpy()

                loss_val += loss.item()

            loss_val_mean = loss_val / len(valid_loader)  # mean loss over the validation set
            valid_curve.append(loss_val_mean)
            print("Valid:\t Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
                epoch, MAX_EPOCH, j + 1, len(valid_loader), loss_val_mean, correct_val / total_val))

            with open("log_training.txt", 'a') as f:
                f.write("Valid:\t Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
                    epoch, MAX_EPOCH, j + 1, len(valid_loader), loss_val_mean, correct_val / total_val))
                f.write("\n")

train_x = range(len(train_curve))
train_y = train_curve

train_iters = len(train_loader)
valid_x = np.arange(1, len(valid_curve) + 1) * train_iters * val_interval  # valid_curve holds per-epoch losses; convert epoch ticks to iterations
valid_y = valid_curve

plt.plot(train_x, train_y, label='Train')
plt.plot(valid_x, valid_y, label='Valid')

plt.legend(loc='upper right')
plt.ylabel('loss value')
plt.xlabel('Iteration')
plt.show()

# ============================ inference ============================

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
test_dir = os.path.join(BASE_DIR, "test_data")

test_data = MyDataset(data_dir=test_dir, transform=valid_transform)
valid_loader = DataLoader(dataset=test_data, batch_size=1)

for i, data in enumerate(valid_loader):
    # forward
    inputs, labels = data
    outputs = net(inputs)
    _, predicted = torch.max(outputs.data, 1)  # index of the largest logit

    final_choose = [1, 2, 3, 4, 5]  # map class index 0..4 to output labels 1..5
    result = final_choose[predicted.numpy()[0]]
    print("Predicted class: {}".format(result))
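The script above runs entirely on the CPU. A minimal sketch of the changes needed for a single CUDA device follows; the `device` variable is not in the original script, it is only an illustration:

import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = net.to(device)  # move the model once, before the training loop

# inside both the training and validation loops, move each batch:
inputs, labels = inputs.to(device), labels.to(device)

# .numpy() only works on CPU tensors, so the accuracy count would become:
correct += (predicted == labels).squeeze().sum().cpu().numpy()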

my_dataset.py

"""各数据集的Dataset定义"""import osimport randomfrom PIL import Imagefrom torch.utils.data import Datasetrandom.seed(1)rmb_label = {   "eyesclosed": 0, "lookingarroud": 1, "safedriving": 2, "smoking": 3, "yawning": 4}  # 如果改了分类目标,这里需要修改# 主要是用来接受索引返回样本用的class MyDataset(Dataset):    def __init__(self, data_dir, transform=None):        """        :param data_dir: str, 数据集所在路径        :param transform: torch.transform,数据预处理        """        self.label_name = {   "eyesclosed": 0, "lookingarroud": 1, "safedriving": 2, "smoking": 3, "yawning": 4}  # 如果改了分类目标,这里需要修改        self.data_info = self.get_img_info(data_dir)  # data_info存储所有图片路径和标签,在DataLoader中通过index读取样本        self.transform = transform    #接受一个索引,返回一个样本 ---  img, label    def __getitem__(self, index):        path_img, label = self.data_info[index]        img = Image.open(path_img).convert('RGB')     # 0~255        if self.transform is not None:            img = self.transform(img)   # 在这里做transform,转为tensor等等        return img, label    def __len__(self):        return len(self.data_info)    @staticmethod    def get_img_info(data_dir):        data_info = list()        for root, dirs, _ in os.walk(data_dir):            # 遍历类别            for sub_dir in dirs:                img_names = os.listdir(os.path.join(root, sub_dir))                img_names = list(filter(lambda x: x.endswith('.jpg'), img_names))   # 如果改了图片格式,这里需要修改                # 遍历图片                for i in range(len(img_names)):                    img_name = img_names[i]                    path_img = os.path.join(root, sub_dir, img_name)                    label = rmb_label[sub_dir]                    data_info.append((path_img, int(label)))        return data_info

splite_dataset.py

# -*- coding: utf-8 -*-
"""Split the dataset into training, validation and test sets"""
import os
import random
import shutil


def makedir(new_dir):
    if not os.path.exists(new_dir):
        os.makedirs(new_dir)


if __name__ == '__main__':
    random.seed(1)

    dataset_dir = os.path.join("..", "..", "data", "raw_data", "training_images")
    split_dir = os.path.join("..", "..", "data", "split_data")
    train_dir = os.path.join(split_dir, "train_test")
    valid_dir = os.path.join(split_dir, "valid_test")
    test_dir = os.path.join(split_dir, "test")

    train_pct = 0.8
    valid_pct = 0.2
    test_pct = 0

    for root, dirs, files in os.walk(dataset_dir):
        for sub_dir in dirs:
            imgs = os.listdir(os.path.join(root, sub_dir))
            imgs = list(filter(lambda x: x.endswith('.jpg'), imgs))  # update this if the image format changes
            random.shuffle(imgs)
            img_count = len(imgs)

            train_point = int(img_count * train_pct)
            valid_point = int(img_count * (train_pct + valid_pct))

            for i in range(img_count):
                if i < train_point:
                    out_dir = os.path.join(train_dir, sub_dir)
                elif i < valid_point:
                    out_dir = os.path.join(valid_dir, sub_dir)
                else:
                    out_dir = os.path.join(test_dir, sub_dir)

                makedir(out_dir)

                target_path = os.path.join(out_dir, imgs[i])
                src_path = os.path.join(dataset_dir, sub_dir, imgs[i])

                shutil.copy(src_path, target_path)

            print('Class:{}, train:{}, valid:{}, test:{}'.format(sub_dir, train_point, valid_point - train_point,
                                                                 img_count - valid_point))
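Since train_pct=0.8 and valid_pct=0.2, the test split stays empty. The copy loop produces a layout like this (class folders taken from rmb_label):

split_data/
    train_test/
        eyesclosed/      # 80% of each class
        lookingarroud/
        safedriving/
        smoking/
        yawning/
    valid_test/          # remaining 20%, same class folders
    test/                # empty, since test_pct = 0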

caculate_mean_std.py

import torch
import numpy as np
import torchvision.transforms as transforms
import torchvision
from torch.utils.data import DataLoader
from my_dataset import MyDataset
import os

train_dir = os.path.join('.', "train_test")

train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

train_data = MyDataset(data_dir=train_dir, transform=train_transform)
train_loader = DataLoader(dataset=train_data, batch_size=1000, shuffle=True)

train = next(iter(train_loader))[0]  # mean/std over the first batch (up to 1000 images)
train_mean = np.mean(train.numpy(), axis=(0, 2, 3))
train_std = np.std(train.numpy(), axis=(0, 2, 3))
print(train_mean, train_std)
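Note that this takes a single batch, so at most the first 1000 images contribute to the statistics. If the dataset is larger, a sketch of a full pass is given below; the helper name compute_mean_std is ours, not from the original code:

import torch

def compute_mean_std(loader):
    """Per-channel mean/std accumulated over every batch in the loader."""
    n_pixels = 0
    channel_sum = torch.zeros(3)
    channel_sq_sum = torch.zeros(3)
    for imgs, _ in loader:                       # imgs: (B, 3, H, W)
        n_pixels += imgs.numel() / imgs.size(1)  # B * H * W pixels per channel
        channel_sum += imgs.sum(dim=(0, 2, 3))
        channel_sq_sum += (imgs ** 2).sum(dim=(0, 2, 3))
    mean = channel_sum / n_pixels
    std = (channel_sq_sum / n_pixels - mean ** 2).sqrt()  # Var[x] = E[x^2] - E[x]^2
    return mean, std

# mean, std = compute_mean_std(train_loader)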

se_resnet.py

import torch.nn as nn
from torch.hub import load_state_dict_from_url
from torchvision.models import ResNet


class SELayer(nn.Module):
    def __init__(self, channel, reduction=16):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)


def conv3x3(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)


class SEBasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None,
                 *, reduction=16):
        super(SEBasicBlock, self).__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes, 1)
        self.bn2 = nn.BatchNorm2d(planes)
        self.se = SELayer(planes, reduction)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.se(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


class SEBottleneck(nn.Module):
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None,
                 *, reduction=16):
        super(SEBottleneck, self).__init__()
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, planes * 4, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes * 4)
        self.relu = nn.ReLU(inplace=True)
        self.se = SELayer(planes * 4, reduction)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)
        out = self.se(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


def se_resnet18(num_classes=1_000):
    """Constructs a SE-ResNet-18 model."""
    model = ResNet(SEBasicBlock, [2, 2, 2, 2], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    return model


def se_resnet34(num_classes=1_000):
    """Constructs a SE-ResNet-34 model."""
    model = ResNet(SEBasicBlock, [3, 4, 6, 3], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    return model


def se_resnet50(num_classes=1_000, pretrained=False):
    """Constructs a SE-ResNet-50 model.

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
    """
    model = ResNet(SEBottleneck, [3, 4, 6, 3], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    if pretrained:
        model.load_state_dict(load_state_dict_from_url(
            "https://github.com/moskomule/senet.pytorch/releases/download/archive/seresnet50-60a8950a85b2b.pkl"))
    return model


def se_resnet101(num_classes=1_000):
    """Constructs a SE-ResNet-101 model."""
    model = ResNet(SEBottleneck, [3, 4, 23, 3], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    return model


def se_resnet152(num_classes=1_000):
    """Constructs a SE-ResNet-152 model."""
    model = ResNet(SEBottleneck, [3, 8, 36, 3], num_classes=num_classes)
    model.avgpool = nn.AdaptiveAvgPool2d(1)
    return model
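A quick smoke test of the factory functions (our own sketch with dummy input). Note that the downloaded checkpoint has a 1000-class fc layer, so calling se_resnet50(num_classes=5, pretrained=True) as train.py does would likely raise a size-mismatch error on load; building at 1000 classes and then replacing the head is one common workaround:

import torch
from model.se_resnet import se_resnet50

net = se_resnet50(num_classes=5)   # random init
x = torch.randn(2, 3, 256, 256)    # batch of 2 dummy images
print(net(x).shape)                # torch.Size([2, 5])

# hedged workaround for pretrained weights: load at 1000 classes, then swap the head
net = se_resnet50(num_classes=1000, pretrained=True)
net.fc = torch.nn.Linear(net.fc.in_features, 5)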

lenet.py

import torch.nn as nn
import torch.nn.functional as F


class LeNet(nn.Module):
    # model definition
    def __init__(self, num_classes):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16*5*5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    # forward pass
    def forward(self, x):
        out = F.relu(self.conv1(x))
        out = F.max_pool2d(out, 2)
        out = F.relu(self.conv2(out))
        out = F.max_pool2d(out, 2)
        out = out.view(out.size(0), -1)
        out = F.relu(self.fc1(out))
        out = F.relu(self.fc2(out))
        out = self.fc3(out)
        return out

    def initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_normal_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight.data, 0, 0.1)
                m.bias.data.zero_()


class LeNet2(nn.Module):
    def __init__(self, num_classes):
        super(LeNet2, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 6, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(6, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.classifier = nn.Sequential(
            nn.Linear(16*5*5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size()[0], -1)
        x = self.classifier(x)
        return x
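The 16*5*5 input to fc1 assumes 32x32 images (two 5x5 convs and two 2x2 poolings: 32 -> 28 -> 14 -> 10 -> 5), so LeNet as defined cannot consume the 256x256 tensors that train.py produces. A quick shape check, as a sketch of our own:

import torch
from model.lenet import LeNet

net = LeNet(num_classes=5)
x = torch.randn(1, 3, 32, 32)  # LeNet-sized input
print(net(x).shape)            # torch.Size([1, 5])
# a 256x256 input would fail at fc1 with a shape-mismatch error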
