5.基于改进算法优化BP神经网络的浮选精矿品位预测
一、如何使用神经网络(NN)算法训练 data_cleaned.csv
训练前需要先将 data_cleaned.csv 训练集中的 NewDateTime 时间列删除,否则无法训练成功。
- 删除 NewDateTime 列
if 'NewDateTime' in data.columns:
data = data.drop(columns=['NewDateTime'])
- 检查数据集是否为空
if data.empty:
raise ValueError("数据集为空,请检查数据预处理步骤。")
- 处理缺失值
if data.isnull().values.any():
data = data.fillna(data.mean())
- 完整代码
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
def tanh(x):
    """Hyperbolic tangent activation."""
    return np.tanh(x)


def tanh_deriv(x):
    """Derivative of tanh evaluated at x: 1 - tanh(x)^2."""
    t = np.tanh(x)
    return 1.0 - t * t


def logistic(x):
    """Logistic (sigmoid) activation: 1 / (1 + e^-x)."""
    return 1 / (1 + np.exp(-x))


def logistic_derivative(x):
    """Derivative of the logistic function: s(x) * (1 - s(x))."""
    s = logistic(x)
    return s * (1 - s)
class NeuralNetwork:
    """Back-propagation neural network with one hidden layer.

    Trained by stochastic gradient descent: each iteration picks one random
    training sample, runs a forward pass, and back-propagates the error.

    Bug fixed vs. the original: the activation derivative is now evaluated at
    the pre-activation values z, not at the already-activated outputs a
    (the original effectively computed 1 - tanh(tanh(z))^2, a wrong gradient).
    """

    @staticmethod
    def _tanh(x):
        return np.tanh(x)

    @staticmethod
    def _tanh_deriv(x):
        # d/dx tanh(x) = 1 - tanh(x)^2
        return 1.0 - np.tanh(x) ** 2

    @staticmethod
    def _logistic(x):
        return 1 / (1 + np.exp(-x))

    @staticmethod
    def _logistic_deriv(x):
        s = 1 / (1 + np.exp(-x))
        return s * (1 - s)

    def __init__(self, layers, activation='tanh'):
        """layers: [n_inputs, n_hidden, n_outputs]; activation: 'tanh' or 'logistic'.

        Raises ValueError for an unknown activation name (the original
        silently left the attributes unset, failing later with AttributeError).
        """
        if activation == 'logistic':
            self.activation = self._logistic
            self.activation_deriv = self._logistic_deriv
        elif activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        else:
            raise ValueError(f"unknown activation: {activation!r}")
        # Small random weights in [-0.25, 0.25): input->hidden, hidden->output.
        self.weights = [
            (2 * np.random.random((layers[0], layers[1])) - 1) * 0.25,
            (2 * np.random.random((layers[1], layers[2])) - 1) * 0.25,
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000):
        """Train on (X, y) for `epochs` single-sample SGD steps."""
        X = np.atleast_2d(X)
        y = np.array(y)
        for _ in range(epochs):
            i = np.random.randint(X.shape[0])
            # Forward pass: keep both pre-activations (zs) and activations.
            activations = [X[i]]
            zs = []
            for w in self.weights:
                z = np.dot(activations[-1], w)
                zs.append(z)
                activations.append(self.activation(z))
            # Backward pass: derivative evaluated at the pre-activation z.
            error = y[i] - activations[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 2, -1, -1):
                deltas.append(deltas[-1].dot(self.weights[l + 1].T) * self.activation_deriv(zs[l]))
            deltas.reverse()
            # Gradient step (ascent on -error, i.e. descent on squared error).
            for j in range(len(self.weights)):
                layer = np.atleast_2d(activations[j])
                delta = np.atleast_2d(deltas[j])
                self.weights[j] += learning_rate * layer.T.dot(delta)

    def predict(self, x):
        """Forward-pass a single sample (1-D array) and return the output activations."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a
# Load dataset
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\data_cleaned.csv')
# Drop the timestamp column (assumed to be named 'NewDateTime'); the network
# can only train on numeric features.
if 'NewDateTime' in data.columns:
    data = data.drop(columns=['NewDateTime'])
# Coerce every column to numeric; non-parsable entries become NaN.
data = data.apply(pd.to_numeric, errors='coerce')
# Abort early if preprocessing left no rows.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")
# Fill any missing values with the column mean.
if data.isnull().values.any():
    data = data.fillna(data.mean())
# Assuming the last column is the target variable
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values
# Standardize features and target to zero mean / unit variance.
scaler_X = StandardScaler()
X = scaler_X.fit_transform(X)
scaler_y = StandardScaler()
y = scaler_y.fit_transform(y.reshape(-1, 1))
# Hold out 20% of the data for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Build and train the network (inputs -> 100 hidden units -> 1 output).
nn = NeuralNetwork([X_train.shape[1], 100, 1], 'tanh')
print("开始训练神经网络...")
nn.fit(X_train, y_train, learning_rate=0.01, epochs=10000)
print("神经网络训练完成。")
# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)
# Map predictions and targets back to the original scale before scoring.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)
mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)
print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")
# Scatter of predicted vs. actual values; the dashed line marks a perfect fit.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()
二、训练 feed.csv 数据集
由于数据集规模过大,这里只使用前 10000 条数据进行训练。
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_regression
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
# Activation functions and their derivatives.
def tanh(x):
    """Hyperbolic tangent activation."""
    return np.tanh(x)


def tanh_deriv(x):
    """Derivative of tanh evaluated at x: 1 - tanh(x)^2."""
    t = np.tanh(x)
    return 1.0 - t * t


def logistic(x):
    """Logistic (sigmoid) activation: 1 / (1 + e^-x)."""
    return 1 / (1 + np.exp(-x))


def logistic_derivative(x):
    """Derivative of the logistic function: s(x) * (1 - s(x))."""
    s = logistic(x)
    return s * (1 - s)
# Neural-network class (variant with L2 weight decay during training).
class NeuralNetwork:
    """Back-propagation network with one hidden layer and optional L2 regularization.

    Trained by stochastic gradient descent: each iteration picks one random
    training sample, runs a forward pass, and back-propagates the error.

    Bug fixed vs. the original: the activation derivative is now evaluated at
    the pre-activation values z, not at the already-activated outputs a
    (the original effectively computed 1 - tanh(tanh(z))^2, a wrong gradient).
    """

    @staticmethod
    def _tanh(x):
        return np.tanh(x)

    @staticmethod
    def _tanh_deriv(x):
        # d/dx tanh(x) = 1 - tanh(x)^2
        return 1.0 - np.tanh(x) ** 2

    @staticmethod
    def _logistic(x):
        return 1 / (1 + np.exp(-x))

    @staticmethod
    def _logistic_deriv(x):
        s = 1 / (1 + np.exp(-x))
        return s * (1 - s)

    def __init__(self, layers, activation='tanh'):
        """layers: [n_inputs, n_hidden, n_outputs]; activation: 'tanh' or 'logistic'.

        Raises ValueError for an unknown activation name (the original
        silently left the attributes unset, failing later with AttributeError).
        """
        if activation == 'logistic':
            self.activation = self._logistic
            self.activation_deriv = self._logistic_deriv
        elif activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        else:
            raise ValueError(f"unknown activation: {activation!r}")
        # Small random weights in [-0.25, 0.25): input->hidden, hidden->output.
        self.weights = [
            (2 * np.random.random((layers[0], layers[1])) - 1) * 0.25,
            (2 * np.random.random((layers[1], layers[2])) - 1) * 0.25,
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000, l2_lambda=0.01):
        """Train for `epochs` single-sample SGD steps with L2 penalty `l2_lambda`."""
        X = np.atleast_2d(X)
        y = np.array(y)
        for _ in range(epochs):
            i = np.random.randint(X.shape[0])
            # Forward pass: keep both pre-activations (zs) and activations.
            activations = [X[i]]
            zs = []
            for w in self.weights:
                z = np.dot(activations[-1], w)
                zs.append(z)
                activations.append(self.activation(z))
            # Backward pass: derivative evaluated at the pre-activation z.
            error = y[i] - activations[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 2, -1, -1):
                deltas.append(deltas[-1].dot(self.weights[l + 1].T) * self.activation_deriv(zs[l]))
            deltas.reverse()
            # Gradient step with L2 weight decay.
            for j in range(len(self.weights)):
                layer = np.atleast_2d(activations[j])
                delta = np.atleast_2d(deltas[j])
                self.weights[j] += learning_rate * (layer.T.dot(delta) - l2_lambda * self.weights[j])

    def predict(self, x):
        """Forward-pass a single sample (1-D array) and return the output activations."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a
# Load dataset and take the first 10,000 rows
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\feed.csv').iloc[:10000]
# Drop columns the model cannot use: the row-number column 'X1' and the 'date' column.
data = data.drop(columns=['X1', 'date'])
# Coerce every column to numeric; non-parsable entries become NaN.
data = data.apply(pd.to_numeric, errors='coerce')
# Abort early if preprocessing left no rows.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")
# Fill any missing values with the column mean.
if data.isnull().values.any():
    data = data.fillna(data.mean())
# Replace non-finite values (+/-inf) with NaN, then fill with the column mean.
if not np.isfinite(data).all().all():
    data = data.replace([np.inf, -np.inf], np.nan)
    data = data.fillna(data.mean())
# Feature matrix = all columns but the last; target = the last column.
X = data.iloc[:, :-1]
y = data.iloc[:, -1]
# Clean the target the same way in case it still contains non-finite values.
if not np.isfinite(y).all():
    y = y.replace([np.inf, -np.inf], np.nan)
    y = y.fillna(y.mean())
# Keep only the 5 features most correlated with the target (univariate F-test).
selector = SelectKBest(score_func=f_regression, k=5)
X_selected = selector.fit_transform(X, y)
# Standardize features and target to zero mean / unit variance.
scaler_X = StandardScaler()
X_scaled = scaler_X.fit_transform(X_selected)
scaler_y = StandardScaler()
y_scaled = scaler_y.fit_transform(y.values.reshape(-1, 1))
# Hold out 20% of the data for testing.
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.2, random_state=42)
# Build and train the network; larger hidden layer than the basic model.
nn = NeuralNetwork([X_train.shape[1], 200, 1], 'tanh')
print("开始训练神经网络...")
# Low learning rate with more iterations, plus L2 decay, to keep training stable.
nn.fit(X_train, y_train, learning_rate=0.0001, epochs=30000, l2_lambda=0.01)
print("神经网络训练完成。")
# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)
# Map predictions and targets back to the original scale before scoring.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)
mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)
print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")
# Scatter of predicted vs. actual values; the dashed line marks a perfect fit.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()
三、训练 scoringdataset.csv 数据集
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
# Activation functions and their derivatives.
def tanh(x):
    """Hyperbolic tangent activation."""
    return np.tanh(x)


def tanh_deriv(x):
    """Derivative of tanh evaluated at x: 1 - tanh(x)^2."""
    t = np.tanh(x)
    return 1.0 - t * t


def logistic(x):
    """Logistic (sigmoid) activation: 1 / (1 + e^-x)."""
    return 1 / (1 + np.exp(-x))


def logistic_derivative(x):
    """Derivative of the logistic function: s(x) * (1 - s(x))."""
    s = logistic(x)
    return s * (1 - s)
# Neural-network class.
class NeuralNetwork:
    """Back-propagation neural network with one hidden layer.

    Trained by stochastic gradient descent: each iteration picks one random
    training sample, runs a forward pass, and back-propagates the error.

    Bug fixed vs. the original: the activation derivative is now evaluated at
    the pre-activation values z, not at the already-activated outputs a
    (the original effectively computed 1 - tanh(tanh(z))^2, a wrong gradient).
    """

    @staticmethod
    def _tanh(x):
        return np.tanh(x)

    @staticmethod
    def _tanh_deriv(x):
        # d/dx tanh(x) = 1 - tanh(x)^2
        return 1.0 - np.tanh(x) ** 2

    @staticmethod
    def _logistic(x):
        return 1 / (1 + np.exp(-x))

    @staticmethod
    def _logistic_deriv(x):
        s = 1 / (1 + np.exp(-x))
        return s * (1 - s)

    def __init__(self, layers, activation='tanh'):
        """layers: [n_inputs, n_hidden, n_outputs]; activation: 'tanh' or 'logistic'.

        Raises ValueError for an unknown activation name (the original
        silently left the attributes unset, failing later with AttributeError).
        """
        if activation == 'logistic':
            self.activation = self._logistic
            self.activation_deriv = self._logistic_deriv
        elif activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        else:
            raise ValueError(f"unknown activation: {activation!r}")
        # Small random weights in [-0.25, 0.25): input->hidden, hidden->output.
        self.weights = [
            (2 * np.random.random((layers[0], layers[1])) - 1) * 0.25,
            (2 * np.random.random((layers[1], layers[2])) - 1) * 0.25,
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000):
        """Train on (X, y) for `epochs` single-sample SGD steps."""
        X = np.atleast_2d(X)
        y = np.array(y)
        for _ in range(epochs):
            i = np.random.randint(X.shape[0])
            # Forward pass: keep both pre-activations (zs) and activations.
            activations = [X[i]]
            zs = []
            for w in self.weights:
                z = np.dot(activations[-1], w)
                zs.append(z)
                activations.append(self.activation(z))
            # Backward pass: derivative evaluated at the pre-activation z.
            error = y[i] - activations[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 2, -1, -1):
                deltas.append(deltas[-1].dot(self.weights[l + 1].T) * self.activation_deriv(zs[l]))
            deltas.reverse()
            # Gradient step on the squared-error objective.
            for j in range(len(self.weights)):
                layer = np.atleast_2d(activations[j])
                delta = np.atleast_2d(deltas[j])
                self.weights[j] += learning_rate * layer.T.dot(delta)

    def predict(self, x):
        """Forward-pass a single sample (1-D array) and return the output activations."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a
# Load the dataset.
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\scoringdataset.csv')
# Drop columns the model cannot use: the first (row-number) column and the
# 'breaks' column.
data = data.drop(columns=[data.columns[0], 'breaks'])
# Coerce every column to numeric; non-parsable entries become NaN.
data = data.apply(pd.to_numeric, errors='coerce')
# Abort early if preprocessing left no rows.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")
# Fill any missing values with the column mean.
if data.isnull().values.any():
    data = data.fillna(data.mean())
# Assume the last column is the target variable.
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values
# Standardize features and target to zero mean / unit variance.
scaler_X = StandardScaler()
X = scaler_X.fit_transform(X)
scaler_y = StandardScaler()
y = scaler_y.fit_transform(y.reshape(-1, 1))
# Hold out 20% of the data for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Build and train the network (inputs -> 100 hidden units -> 1 output).
nn = NeuralNetwork([X_train.shape[1], 100, 1], 'tanh')
print("开始训练神经网络...")
nn.fit(X_train, y_train, learning_rate=0.01, epochs=10000)
print("神经网络训练完成。")
# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)
# Map predictions and targets back to the original scale before scoring.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)
# Compute regression metrics on the original scale.
mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)
print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")
# Scatter of predicted vs. actual values; the dashed line marks a perfect fit.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()
四、训练 flotation.csv 数据集
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
# Activation functions and their derivatives.
def tanh(x):
    """Hyperbolic tangent activation."""
    return np.tanh(x)


def tanh_deriv(x):
    """Derivative of tanh evaluated at x: 1 - tanh(x)^2."""
    t = np.tanh(x)
    return 1.0 - t * t


def logistic(x):
    """Logistic (sigmoid) activation: 1 / (1 + e^-x)."""
    return 1 / (1 + np.exp(-x))


def logistic_derivative(x):
    """Derivative of the logistic function: s(x) * (1 - s(x))."""
    s = logistic(x)
    return s * (1 - s)
# Neural-network class.
class NeuralNetwork:
    """Back-propagation neural network with one hidden layer.

    Trained by stochastic gradient descent: each iteration picks one random
    training sample, runs a forward pass, and back-propagates the error.

    Bug fixed vs. the original: the activation derivative is now evaluated at
    the pre-activation values z, not at the already-activated outputs a
    (the original effectively computed 1 - tanh(tanh(z))^2, a wrong gradient).
    """

    @staticmethod
    def _tanh(x):
        return np.tanh(x)

    @staticmethod
    def _tanh_deriv(x):
        # d/dx tanh(x) = 1 - tanh(x)^2
        return 1.0 - np.tanh(x) ** 2

    @staticmethod
    def _logistic(x):
        return 1 / (1 + np.exp(-x))

    @staticmethod
    def _logistic_deriv(x):
        s = 1 / (1 + np.exp(-x))
        return s * (1 - s)

    def __init__(self, layers, activation='tanh'):
        """layers: [n_inputs, n_hidden, n_outputs]; activation: 'tanh' or 'logistic'.

        Raises ValueError for an unknown activation name (the original
        silently left the attributes unset, failing later with AttributeError).
        """
        if activation == 'logistic':
            self.activation = self._logistic
            self.activation_deriv = self._logistic_deriv
        elif activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        else:
            raise ValueError(f"unknown activation: {activation!r}")
        # Small random weights in [-0.25, 0.25): input->hidden, hidden->output.
        self.weights = [
            (2 * np.random.random((layers[0], layers[1])) - 1) * 0.25,
            (2 * np.random.random((layers[1], layers[2])) - 1) * 0.25,
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000):
        """Train on (X, y) for `epochs` single-sample SGD steps."""
        X = np.atleast_2d(X)
        y = np.array(y)
        for _ in range(epochs):
            i = np.random.randint(X.shape[0])
            # Forward pass: keep both pre-activations (zs) and activations.
            activations = [X[i]]
            zs = []
            for w in self.weights:
                z = np.dot(activations[-1], w)
                zs.append(z)
                activations.append(self.activation(z))
            # Backward pass: derivative evaluated at the pre-activation z.
            error = y[i] - activations[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 2, -1, -1):
                deltas.append(deltas[-1].dot(self.weights[l + 1].T) * self.activation_deriv(zs[l]))
            deltas.reverse()
            # Gradient step on the squared-error objective.
            for j in range(len(self.weights)):
                layer = np.atleast_2d(activations[j])
                delta = np.atleast_2d(deltas[j])
                self.weights[j] += learning_rate * layer.T.dot(delta)

    def predict(self, x):
        """Forward-pass a single sample (1-D array) and return the output activations."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a
# Load the dataset.
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\flotation.csv')
# Keep only the first 10,000 rows to bound training time.
data = data.head(10000)
# Drop columns the model cannot use: the row-number column 'X1' and the 'date' column.
data = data.drop(columns=['X1', 'date'])
# Coerce every column to numeric; non-parsable entries become NaN.
data = data.apply(pd.to_numeric, errors='coerce')
# Abort early if preprocessing left no rows.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")
# Fill any missing values with the column mean.
if data.isnull().values.any():
    data = data.fillna(data.mean())
# Assume the last column is the target variable.
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values
# Standardize features and target to zero mean / unit variance.
scaler_X = StandardScaler()
X = scaler_X.fit_transform(X)
scaler_y = StandardScaler()
y = scaler_y.fit_transform(y.reshape(-1, 1))
# Hold out 20% of the data for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Build and train the network (inputs -> 100 hidden units -> 1 output).
nn = NeuralNetwork([X_train.shape[1], 100, 1], 'tanh')
print("开始训练神经网络...")
nn.fit(X_train, y_train, learning_rate=0.01, epochs=10000)
print("神经网络训练完成。")
# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)
# Map predictions and targets back to the original scale before scoring.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)
# Compute regression metrics on the original scale.
mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)
print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")
# Scatter of predicted vs. actual values; the dashed line marks a perfect fit.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()
五、改进实验:增大隐藏层节点数并改用 ReLU 激活函数重新训练 data_cleaned.csv
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with this font
# Activation functions and their derivatives.
def tanh(x):
    """Hyperbolic tangent activation."""
    return np.tanh(x)


def tanh_deriv(x):
    """Derivative of tanh evaluated at x: 1 - tanh(x)^2."""
    t = np.tanh(x)
    return 1.0 - t * t


def relu(x):
    """Rectified linear unit: elementwise max(0, x)."""
    return np.maximum(0, x)


def relu_deriv(x):
    """Derivative of ReLU: 1 where x > 0, else 0."""
    positive = x > 0
    return np.where(positive, 1, 0)
class NeuralNetwork:
    """Back-propagation network with one hidden layer; supports 'tanh' and 'relu'.

    Trained by stochastic gradient descent: each iteration picks one random
    training sample, runs a forward pass, and back-propagates the error.

    Bug fixed vs. the original: the activation derivative is now evaluated at
    the pre-activation values z, not at the already-activated outputs a
    (for tanh the original effectively computed 1 - tanh(tanh(z))^2, a wrong
    gradient; for relu the two happen to coincide except at z == 0).
    """

    @staticmethod
    def _tanh(x):
        return np.tanh(x)

    @staticmethod
    def _tanh_deriv(x):
        # d/dx tanh(x) = 1 - tanh(x)^2
        return 1.0 - np.tanh(x) ** 2

    @staticmethod
    def _relu(x):
        return np.maximum(0, x)

    @staticmethod
    def _relu_deriv(x):
        return np.where(x > 0, 1, 0)

    def __init__(self, layers, activation='tanh'):
        """layers: [n_inputs, n_hidden, n_outputs]; activation: 'tanh' or 'relu'.

        Raises ValueError for an unknown activation name (the original
        silently left the attributes unset, failing later with AttributeError).

        NOTE(review): with 'relu' the OUTPUT layer is also ReLU, so the network
        can never predict negative (standardized) targets — likely unintended
        for regression; confirm before relying on this configuration.
        """
        self.activation_name = activation
        if activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        elif activation == 'relu':
            self.activation = self._relu
            self.activation_deriv = self._relu_deriv
        else:
            raise ValueError(f"unknown activation: {activation!r}")
        # Small random weights in [-0.25, 0.25): input->hidden, hidden->output.
        self.weights = [
            (2 * np.random.random((layers[0], layers[1])) - 1) * 0.25,
            (2 * np.random.random((layers[1], layers[2])) - 1) * 0.25,
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000):
        """Train on (X, y) for `epochs` single-sample SGD steps."""
        X = np.atleast_2d(X)
        y = np.array(y)
        for _ in range(epochs):
            i = np.random.randint(X.shape[0])
            # Forward pass: keep both pre-activations (zs) and activations.
            activations = [X[i]]
            zs = []
            for w in self.weights:
                z = np.dot(activations[-1], w)
                zs.append(z)
                activations.append(self.activation(z))
            # Backward pass: derivative evaluated at the pre-activation z.
            error = y[i] - activations[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 2, -1, -1):
                deltas.append(deltas[-1].dot(self.weights[l + 1].T) * self.activation_deriv(zs[l]))
            deltas.reverse()
            # Gradient step on the squared-error objective.
            for j in range(len(self.weights)):
                layer = np.atleast_2d(activations[j])
                delta = np.atleast_2d(deltas[j])
                self.weights[j] += learning_rate * layer.T.dot(delta)

    def predict(self, x):
        """Forward-pass a single sample (1-D array) and return the output activations."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a
# Load dataset
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\data_cleaned.csv')
# Drop the timestamp column (assumed to be named 'NewDateTime'); the network
# can only train on numeric features.
if 'NewDateTime' in data.columns:
    data = data.drop(columns=['NewDateTime'])
# Coerce every column to numeric; non-parsable entries become NaN.
data = data.apply(pd.to_numeric, errors='coerce')
# Abort early if preprocessing left no rows.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")
# Fill any missing values with the column mean.
if data.isnull().values.any():
    data = data.fillna(data.mean())
# Assuming the last column is the target variable
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values
# Standardize features and target to zero mean / unit variance.
scaler_X = StandardScaler()
X = scaler_X.fit_transform(X)
scaler_y = StandardScaler()
y = scaler_y.fit_transform(y.reshape(-1, 1))
# Hold out 20% of the data for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Build and train the network: larger hidden layer (200) and ReLU activation.
nn = NeuralNetwork([X_train.shape[1], 200, 1], 'relu')
print("开始训练神经网络...")
nn.fit(X_train, y_train, learning_rate=0.02, epochs=15000)  # tuned learning rate / epochs
print("神经网络训练完成。")
# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)
# Map predictions and targets back to the original scale before scoring.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)
mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)
print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")
# Scatter of predicted vs. actual values; the dashed line marks a perfect fit.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()