咱们今天来聊一个话题:基于CNN的时间序列分类方法。
其中给大家展示了其中详细的原理,以及完整的一个案例,完整的代码~
假设我们有一组一维时间序列数据,每个样本的输入为长度为 的信号 ,目标是将其分为
个类别中的一个。为此,我们构建一个基于 CNN 的分类器,利用局部特征提取和层级特征融合实现分类。 数据预处理 通常,输入数据需要经过归一化或标准化处理,例如将每个时间序列归一化到均值为0、方差为1:
其中 和 分别为该序列的均值和标准差。
CNN 模型架构 对于一维时间序列,常采用 1D 卷积操作。典型的 CNN 模型结构可以包含以下几个模块:
激活层(Activation Layer,如 ReLU) 全连接层(Fully Connected Layer) 下面咱们就来详细介绍各部分的计算过程~
卷积层
卷积操作:
给定输入信号 和一个长度为 的卷积核(滤波器)
,以及偏置 ,卷积操作在位置 上的输出 定义为:
其中
。
对于多通道(例如前几层特征图)的情况,假设输入为 通道,每个通道的信号为 ,则卷积核也扩展为
,卷积结果为:
步幅与填充:
若使用填充,输出长度 为:
激活函数 常用的激活函数为 ReLU,其公式为:
即,对卷积层输出的每个元素 应用:
池化层 池化层用于降维和提取最显著的局部特征。以 1D 最大池化(Max Pooling)为例,假设池化窗口大小为 ,步幅为 ,在窗口
内,输出为:
这样可以减少特征图的长度,同时增强特征的不变性。
多层 CNN 结构 一个典型的 CNN 模型可以堆叠多个卷积层和池化层。
例如,假设模型包含两层卷积层,每层后接 ReLU 激活和池化层:
经过以上层级提取后,将得到形状为
的特征图,再将其展平为一个向量,输入到全连接层。
全连接层与 Softmax 分类器 设展平后的特征向量为 ,其中
。全连接层对 的线性变换为:
其中
, 。
接下来通过 Softmax 层计算各类别的预测概率:
损失函数与模型训练 通常采用交叉熵损失函数(Cross-Entropy Loss)来衡量模型输出与真实标签之间的差距。对于单个样本,若真实标签的 one-hot 编码为 (其中只有正确类别为1,其余为0),则交叉熵损失定义为:
其中
为真实类别。
在训练过程中,模型参数(卷积核权重、全连接层权重等)通过反向传播算法(Backpropagation)和梯度下降(如 Adam、SGD 等优化器)进行更新。
模型训练流程概述 对输入时间序列 依次经过各个卷积、激活、池化层提取局部特征。 将多层提取的特征展平后输入全连接层,得到类别得分 。 完整案例 传统方法通常依赖特征工程和经典机器学习算法,而深度学习中的卷积神经网络(CNN)在自动提取局部特征、捕捉时序数据局部模式等方面具有明显优势。
这里,主要想给大家展现的是:利用一维卷积网络对时间序列数据进行特征提取,并结合全连接层进行分类。相较于传统的基于 RNN 或 LSTM 的模型,CNN具有并行计算和局部感受野的优势,能够更快收敛且具有较好的鲁棒性。
数据集 为了说明问题,我们构造一个虚拟时间序列数据集。数据集包括三类信号,分别代表不同的模式(例如:正弦波、方波、锯齿波),并加入噪声模拟真实数据中的随机扰动。数据集共包含3000个样本,每个样本的时间步长为128,标签取值为0、1、2。这样可以模拟多类别分类场景。
数据可视化说明 在数据构造阶段,我们会绘制以下图形:
样本波形图 :原始时间序列数据的波形形态,便于观察各类别信号的不同模式。
训练过程损失曲线 :模型在训练集和验证集上的损失变化。
分类准确率曲线 :每个训练周期(epoch)在训练集和验证集上的分类准确率。
预测曲线(混淆矩阵、ROC 曲线等) :模型在测试集上的预测结果,如混淆矩阵可以直观反映各类别的识别效果;另外我们也可以绘制各类别的 ROC 曲线。
import numpy as npimport
matplotlib.pyplot as pltimport seaborn as snsimport torchimport torch.nn as nnimport torch.optim as optimfrom torch.utils.data import Dataset, DataLoader, random_splitfrom sklearn.metrics import confusion_matrix, roc_curve, aucimport itertoolsimport random# 固定随机种子,保证结果可重复 np.random.seed(42 ) torch.manual_seed(42 ) random.seed(42 )# 虚拟数据集生成 def generate_time_series (n_samples=3000 , seq_length=128 ) : """ 生成包含三类的虚拟时间序列数据:正弦波、方波、锯齿波,并加入高斯噪声。 参数: n_samples: 总样本数 seq_length: 每个样本的时间步数 返回: X: 数据矩阵,形状 (n_samples, seq_length) y: 标签向量,取值 0,1,2 """ X = [] y = [] t = np.linspace(0 , 2 *np.pi, seq_length) for i in range(n_samples): label = np.random.choice([0 ,1 ,2 ]) if label == 0 : # 正弦波 signal = np.sin(t) + np.random.normal(0 , 0.1 , seq_length) elif label == 1 : # 方波:利用正弦波阈值化 signal = np.where(np.sin(t) > 0 , 1.0 , -1.0 ) + np.random.normal(0 , 0.1 , seq_length) else : # 锯齿波:使用线性函数再取周期性 signal = ((t / np.pi) - 1 ) + np.random.normal(0 , 0.1 , seq_length) X.append(signal) y.append(label) X = np.array(X) y = np.array(y) return X, y# 生成数据 X, y = generate_time_series()# 数据可视化:样本波形图 def plot_sample_waveforms (X, y, n_samples=3 ) : """ 随机选择n_samples个样本,并绘制波形图,每种类别选择一个样本。 """ plt.figure(figsize=(12 , 6 )) colors = ['dodgerblue' , 'crimson' , 'limegreen' ] # 鲜艳的蓝色、红色、绿色 labels = ['Sine Wave' , 'Square Wave' , 'Sawtooth Wave' ] for class_label in range(3 ): idx = np.where(y==class_label)[0 ] sample_idx = np.random.choice(idx, 1 )[0 ] plt.plot(X[sample_idx], color=colors[class_label], linewidth=2 , label=labels[class_label]) plt.title("Sample Waveforms of Time Series" , fontsize=16 ) plt.xlabel("Time Step" , fontsize=14 ) plt.ylabel("Signal Value" , fontsize=14 ) plt.legend(fontsize=12 ) plt.grid(alpha=0.3 ) plt.tight_layout() plt.show() plot_sample_waveforms(X, y)# 数据可视化:数据分布直方图 def plot_data_distribution (y) : """ 绘制不同类别的样本数直方图 """ plt.figure(figsize=(8 ,6 )) colors = ['darkorange' , 'mediumorchid' , 'teal' ] # 橙色、紫色、青色 sns.countplot(x=y, palette=colors, edgecolor='black' ) plt.title("Class Distribution" , fontsize=16 ) plt.xlabel("Class Label" , fontsize=14 ) plt.ylabel("Count" , fontsize=14 ) plt.tight_layout() plt.show() plot_data_distribution(y)
样本波形图 :展示了三个典型时间序列样本(正弦波、方波、锯齿波),便于观察各信号形态及噪声干扰情况。数据分布直方图 :统计了各类别的样本数量,验证数据集均衡性,有助于判断是否需要后续数据重采样或加权处理。构建 CNN 模型进行时间序列分类 我们需要将生成的数据封装为 PyTorch 的 Dataset 对象。
构造了一个 TimeSeriesDataset 类,其内部将数据转换为 FloatTensor,并保证数据维度符合 CNN 输入要求(例如:[batch, channels, seq_length])。
class TimeSeriesDataset (Dataset) : def __init__ (self, X, y) : self.X = torch.tensor(X, dtype=torch.float32) # [N, seq_length] self.y = torch.tensor(y, dtype=torch.long) def __len__ (self) : return len(self.X) def __getitem__ (self, idx) : # 这里增加channel维度: [1, seq_length] return self.X[idx].unsqueeze(0 ), self.y[idx]# 实例化数据集对象 dataset = TimeSeriesDataset(X, y)# 划分训练集、验证集、测试集 (70%,15%,15%) n_total = len(dataset) n_train = int(n_total * 0.7 ) n_val = int(n_total * 0.15 ) n_test = n_total - n_train - n_val train_dataset, val_dataset, test_dataset = random_split(dataset, [n_train, n_val, n_test]) print("Train: {}, Val: {}, Test: {}" .format(len(train_dataset), len(val_dataset), len(test_dataset)))# DataLoader batch_size = 64 train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True ) val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False ) test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False )
CNN 模型构建 这里我们构造一个简单的一维卷积神经网络,包括多个卷积层、激活函数和池化层,以及最后的全连接层。模型结构如下:
Conv1d :输入通道为1,输出通道较多,滤波器尺寸较小,可捕获局部时序特征。
class CNN1D (nn.Module) : def __init__ (self, num_classes=3 ) : super(CNN1D, self).__init__() self.features = nn.Sequential( nn.Conv1d(in_channels=1 , out_channels=16 , kernel_size=3 , padding=1 ), nn.BatchNorm1d(16 ), nn.ReLU(inplace=True ), nn.MaxPool1d(kernel_size=2 ), # 输出长度: 128/2=64 nn.Conv1d(in_channels=16 , out_channels=32 , kernel_size=3 , padding=1 ), nn.BatchNorm1d(32 ), nn.ReLU(inplace=True ), nn.MaxPool1d(kernel_size=2 ), # 输出长度: 64/2=32 nn.Conv1d(in_channels=32 , out_channels=64 , kernel_size=3 , padding=1 ), nn.BatchNorm1d(64 ), nn.ReLU(inplace=True ), nn.MaxPool1d(kernel_size=2 ) # 输出长度: 32/2=16 ) self.classifier = nn.Sequential( nn.Dropout(0.5 ), nn.Linear(64 * 16 , 128 ), nn.ReLU(inplace=True ), nn.Linear(128 , num_classes) ) def forward (self, x) : x = self.features(x) x = x.view(x.size(0 ), -1 ) # flatten x = self.classifier(x) return x# 实例化模型 model = CNN1D(num_classes=3 ) device = torch.device("cuda" if torch.cuda.is_available() else "cpu" ) model.to(device) print(model)
模型训练、验证及可视化 训练过程中,咱们记录每个 epoch 的训练损失、验证损失以及准确率。这样可以通过曲线图直观地评估模型是否收敛,是否存在过拟合或欠拟合。
def train_model (model, train_loader, val_loader, num_epochs=30 , learning_rate=0.001 ) : criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=learning_rate) train_losses = [] val_losses = [] train_accuracies = [] val_accuracies = [] for epoch in range(num_epochs): # 训练阶段 model.train() running_loss = 0.0 correct = 0 total = 0 for inputs, labels in train_loader: inputs, labels = inputs.to(device), labels.to(device) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() running_loss += loss.item() * inputs.size(0 ) _, predicted = torch.max(outputs.data, 1 ) total += labels.size(0 ) correct += (predicted == labels).sum().item() epoch_loss = running_loss / total epoch_acc = correct / total train_losses.append(epoch_loss) train_accuracies.append(epoch_acc) # 验证阶段 model.eval() val_running_loss = 0.0 val_correct = 0 val_total = 0 with torch.no_grad(): for inputs, labels in val_loader:
inputs, labels = inputs.to(device), labels.to(device) outputs = model(inputs) loss = criterion(outputs, labels) val_running_loss += loss.item() * inputs.size(0 ) _, predicted = torch.max(outputs.data, 1 ) val_total += labels.size(0 ) val_correct += (predicted == labels).sum().item() val_epoch_loss = val_running_loss / val_total val_epoch_acc = val_correct / val_total val_losses.append(val_epoch_loss) val_accuracies.append(val_epoch_acc) print("Epoch [{}/{}] Train Loss: {:.4f} | Train Acc: {:.4f} || Val Loss: {:.4f} | Val Acc: {:.4f}" .format( epoch+1 , num_epochs, epoch_loss, epoch_acc, val_epoch_loss, val_epoch_acc)) return train_losses, val_losses, train_accuracies, val_accuracies# 开始训练模型 num_epochs = 30 learning_rate = 0.001 train_losses, val_losses, train_accuracies, val_accuracies = train_model(model, train_loader, val_loader, num_epochs=num_epochs, learning_rate=learning_rate)
可视化训练过程 训练完成后,我们将绘制损失曲线和准确率曲线:
# 可视化训练损失曲线 def plot_loss_curve (train_losses, val_losses) : plt.figure(figsize=(10 ,6 )) epochs = np.arange(1 , len(train_losses)+1 ) plt.plot(epochs, train_losses, label="Train Loss" , color="firebrick" , linewidth=2 ) plt.plot(epochs, val_losses, label="Validation Loss" , color="royalblue" , linewidth=2 ) plt.title("Training and Validation Loss Curve" , fontsize=16 ) plt.xlabel("Epoch" , fontsize=14 ) plt.ylabel("Loss" , fontsize=14 ) plt.legend(fontsize=12 ) plt.grid(alpha=0.3 ) plt.tight_layout() plt.show() plot_loss_curve(train_losses, val_losses)# 可视化准确率曲线 def plot_accuracy_curve (train_acc, val_acc) : plt.figure(figsize=(10 ,6 )) epochs = np.arange(1 , len(train_acc)+1 ) plt.plot(epochs, train_acc, label="Train Accuracy" , color="seagreen" , linewidth=2 ) plt.plot(epochs, val_acc, label="Validation Accuracy" , color="darkorchid" , linewidth=2 ) plt.title("Training and Validation Accuracy Curve" , fontsize=16 ) plt.xlabel("Epoch" , fontsize=14 ) plt.ylabel("Accuracy" , fontsize=14 ) plt.legend(fontsize=12 ) plt.grid(alpha=0.3 ) plt.tight_layout() plt.show() plot_accuracy_curve(train_accuracies, val_accuracies)
训练与验证损失曲线 :图中火红色线表示训练损失,蓝色线表示验证损失。该图直观展示了模型的收敛情况以及是否存在过拟合(若验证损失开始上升)。
训练与验证准确率曲线 :绿色线表示训练准确率,紫色线表示验证准确率。通过准确率变化判断模型性能提升及泛化能力。
模型评估与预测结果可视化 测试集评估与混淆矩阵 使用测试集对模型进行评估,并绘制混淆矩阵。混淆矩阵能够展示各类别被正确分类和误判的情况,帮助我们发现哪些类别易混淆。
def evaluate_model (model, test_loader) : model.eval() all_labels = [] all_preds = [] with torch.no_grad(): for inputs, labels in test_loader: inputs = inputs.to(device) outputs = model(inputs) _, predicted = torch.max(outputs.data, 1 ) all_labels.extend(labels.cpu().numpy()) all_preds.extend(predicted.cpu().numpy()) return np.array(all_labels), np.array(all_preds) true_labels, predictions = evaluate_model(model, test_loader)# 绘制混淆矩阵 def plot_confusion_matrix (cm, classes, title='Confusion Matrix' , cmap=plt.cm.Blues) : plt.figure(figsize=(8 ,6 )) sns.heatmap(cm, annot=True , fmt="d" , cmap=cmap, cbar=True , xticklabels=classes, yticklabels=classes, linewidths=0.5 , linecolor='gray' ) plt.title(title, fontsize=16 ) plt.ylabel('True Label' , fontsize=14 ) plt.xlabel('Predicted Label' , fontsize=14 ) plt.tight_layout() plt.show() cm = confusion_matrix(true_labels, predictions) plot_confusion_matrix(cm, classes=["Sine" , "Square" , "Sawtooth" ], title="Confusion Matrix on Test Data" )
ROC 曲线绘制 由于我们的任务是多分类,我们可以针对每个类别绘制 ROC 曲线。此处我们采用 One-vs-Rest 策略计算各类别的 ROC 曲线及 AUC 值。
from sklearn.preprocessing import label_binarizefrom sklearn.metrics import roc_auc_score# Binarize the labels for ROC analysis n_classes = 3 true_labels_binarized = label_binarize(true_labels, classes=[0 ,1 ,2 ])# 获取各类别的预测概率 def get_pred_probabilities (model, loader) : model.eval() all_probs = [] with torch.no_grad(): for inputs, _ in loader: inputs = inputs.to(device) outputs = model(inputs) probs = nn.functional.softmax(outputs, dim=1 ) all_probs.extend(probs.cpu().numpy()) return np.array(all_probs) y_score = get_pred_probabilities(model, test_loader)# 计算并绘制 ROC 曲线 plt.figure(figsize=(10 ,8 )) colors = ['darkorange' , 'darkgreen' , 'navy' ]for i in range(n_classes): fpr, tpr, _ = roc_curve(true_labels_binarized[:, i], y_score[:, i]) roc_auc = auc(fpr, tpr) plt.plot(fpr, tpr, color=colors[i], lw=2 , label='Class {} (AUC = {:.2f})' .format(i, roc_auc)) plt.plot([0 ,1 ], [0 ,1 ], color='grey' , lw=2 , linestyle='--' ) plt.xlim([0.0 , 1.0 ]) plt.ylim([0.0 , 1.05 ])
plt.xlabel("False Positive Rate" , fontsize=14 ) plt.ylabel("True Positive Rate" , fontsize=14 ) plt.title("Multi-class ROC Curve" , fontsize=16 ) plt.legend(loc="lower right" , fontsize=12 ) plt.grid(alpha=0.3 ) plt.tight_layout() plt.show()
混淆矩阵热力图 :使用蓝色渐变热力图显示各类别的正确预测和错误分类情况,帮助识别分类器在各类别间的性能差异。
多分类 ROC 曲线 :分别绘制了三条 ROC 曲线,每条曲线对应一个类别,曲线下方面积(AUC)直观反映分类器对该类别的识别能力。采用暗橙、深绿、藏蓝色区分不同类别。
算法优化点与超参数调节流程 优势 :CNN 能够通过卷积核自动捕捉局部时序模式,对不同信号形态具有鲁棒性;同时参数量相对较少,计算效率高。优化 :可以尝试多尺度卷积核(不同 kernel_size 的并行卷积)来进一步捕捉多尺度特征;或结合残差网络结构加深模型深度以增强表达能力。优势 :本案例使用原始波形数据作为输入,保留了时序信息。优化 :可尝试数据标准化、滤波降噪、数据扩充(如随机裁剪、平移、缩放)等预处理手段,进一步提高模型鲁棒性。优势 :模型采用多层卷积、批归一化、激活及池化层,结构清晰,收敛较快。优化 :可以尝试加入注意力机制(例如时序注意力模块)来突出关键信息;或者结合深度残差网络、DenseNet 等结构以提升特征传递效率。优势 :在全连接层中加入 Dropout 防止过拟合。优化 :进一步可考虑 L2 正则化、数据增强、Early Stopping 等策略,以平衡模型复杂度与泛化能力。优化 :尝试不同的学习率调度策略,如 ReduceLROnPlateau、Cosine Annealing 等,根据验证集误差动态调整学习率,提高训练稳定性。超参数调节流程 一个典型的调参流程:
1. 初步调参
初步观察训练和验证曲线,确定大致收敛区间及是否出现过拟合现象。 2. 细化调参
固定较优的网络结构后,对学习率、正则化系数、Dropout 比例等进行细调。 利用网格搜索或随机搜索方式,结合交叉验证评估每个参数组合的表现。 3. 动态调整策略
根据训练过程中验证集的表现,采用动态学习率调度:
根据损失曲线平滑趋势,决定是否提前停止训练(Early Stopping)。 4. 观察指标与模型复杂度平衡
重点观察混淆矩阵、ROC 曲线等多角度指标,判断模型在不同类别上的表现。 如果模型在某些类别上识别效果较差,考虑调整数据采样比例或设计更针对性的特征提取模块。 5. 最终验证与部署
在调参过程中保存不同模型的参数和性能日志,最终选取在验证集上表现最优的模型进行测试集评估。 分析误判样本,进一步分析模型不足,以便在后续版本中引入更多数据或更复杂的模型结构。 进一步的优化思路 集成学习
:结合多个模型(例如 CNN 与 Transformer 混合模型)进行集成,取多个模型预测的平均或投票结果,进一步提高鲁棒性。模型压缩与加速 :在模型部署时,采用剪枝、量化、知识蒸馏等技术对模型进行压缩,保证实时性。迁移学习 :对于实际应用中数据较少的情况,可尝试利用预训练模型(例如在大规模时序数据上预训练的 CNN 模型)进行微调,提升分类效果。预测结果展示 为了展示模型在预测阶段的表现,我们挑选部分测试样本,并绘制预测结果曲线,对比真实标签与模型预测值。
def plot_prediction_examples (model, dataset, num_examples=5 ) : model.eval() indices = np.random.choice(len(dataset), num_examples, replace=False ) plt.figure(figsize=(14 , num_examples*3 )) for i, idx in enumerate(indices): x, label = dataset[idx] x_input = x.unsqueeze(0 ).to(device) output = model(x_input) _, pred = torch.max(output, 1 ) x = x.squeeze().cpu().numpy() plt.subplot(num_examples, 1 , i+1 ) plt.plot(x, color='magenta' , linewidth=2 , label="Time Series Signal" ) plt.axhline(y=0 , color='grey' , linestyle='--' , linewidth=1 ) plt.title("Example {}: True Label = {} | Predicted Label = {}" .format(i+1 , label.item(), pred.item()), fontsize=14 ) plt.xlabel("Time Step" , fontsize=12 ) plt.ylabel("Signal Value" , fontsize=12 ) plt.legend(fontsize=10 ) plt.grid(alpha=0.3 ) plt.tight_layout() plt.show()# 绘制部分预测示例 plot_prediction_examples(model, test_dataset, num_examples=5 )
预测示例图 :每幅图展示一个测试样本的时间序列曲线(采用鲜艳的洋红色 magenta),并在图标题中标注真实标签与模型预测结果。该图形直观展示了模型在时序数据分类任务中的预测能力。
整个内容通过虚拟数据集构建了一个基于 CNN 的时间序列分类案例。全流程包括数据生成、数据可视化、模型构建、训练过程监控、测试评估与预测结果展示。
数据可视化环节采用了多种鲜艳且富有吸引力的图形,如波形图、直方图、损失曲线、准确率曲线、混淆矩阵及 ROC 曲线,全面展示数据特征与模型表现。 模型采用了 3 层卷积网络结构,结合批归一化、激活、池化和 Dropout,有效防止过拟合,并在多类别上均取得较好效果。 针对算法优化,本文详细说明了如何调整卷积核、网络深度、正则化、数据增强以及动态学习率等关键超参数,形成一套完善的调参流程。