5.基于改进算法优化BP神经网络的浮选精矿品位预测

书诚小驿 · 2025/06/01 · 算法知识库 Algorithm

一、如何使用神经网络 NN 算法训练 data_cleaned.csv

该算法需要先将 data_cleaned.csv 训练集中的 NewDateTime 时间列删掉,然后才能训练成功。

  1. 删除 NewDateTime
if 'NewDateTime' in data.columns:
    data = data.drop(columns=['NewDateTime'])
  2. 检查数据集是否为空
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")
  3. 处理缺失值
if data.isnull().values.any():
    data = data.fillna(data.mean())
  4. 完整代码
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # keep minus signs rendering correctly with a CJK font

def tanh(x):
    """Hyperbolic tangent activation (thin wrapper over NumPy)."""
    return np.tanh(x)

def tanh_deriv(x):
    """Derivative of tanh evaluated at the pre-activation x: 1 - tanh(x)^2."""
    t = np.tanh(x)
    return 1.0 - t * t

def logistic(x):
    """Numerically stable logistic sigmoid 1 / (1 + e^(-x)).

    The naive form overflows inside np.exp for large negative inputs and
    emits RuntimeWarnings; clipping the argument to [-500, 500] avoids that
    without changing any result representable in double precision.
    """
    return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

def logistic_derivative(x):
    """Derivative of the logistic sigmoid at pre-activation x: s(x) * (1 - s(x)).

    Evaluates the sigmoid once (the original called logistic(x) twice) and
    clips the exponent so large-magnitude inputs cannot overflow np.exp.
    """
    s = 1 / (1 + np.exp(-np.clip(x, -500, 500)))
    return s * (1 - s)

class NeuralNetwork:
    """Fully-connected feed-forward network trained by single-sample
    stochastic backpropagation.

    Parameters
    ----------
    layers : sequence of int
        Units per layer, e.g. [n_inputs, n_hidden, n_outputs]. Any depth
        >= 2 is supported (the original hard-coded exactly three layers).
    activation : {'tanh', 'logistic'}
        Activation applied after every layer, including the output layer.
    """

    # Activations are static helpers so the class does not depend on
    # module-level globals. Each derivative takes the PRE-activation z,
    # which is the mathematically correct argument.
    @staticmethod
    def _tanh(z):
        return np.tanh(z)

    @staticmethod
    def _tanh_deriv(z):
        t = np.tanh(z)
        return 1.0 - t * t

    @staticmethod
    def _logistic(z):
        return 1 / (1 + np.exp(-z))

    @staticmethod
    def _logistic_deriv(z):
        s = 1 / (1 + np.exp(-z))
        return s * (1 - s)

    def __init__(self, layers, activation='tanh'):
        if activation == 'logistic':
            self.activation = self._logistic
            self.activation_deriv = self._logistic_deriv
        elif activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        else:
            # The original silently left the instance unusable here.
            raise ValueError(f"unsupported activation: {activation!r}")

        # Small symmetric random init in [-0.25, 0.25) for each layer pair.
        self.weights = [
            (2 * np.random.random((n_in, n_out)) - 1) * 0.25
            for n_in, n_out in zip(layers[:-1], layers[1:])
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000):
        """Run `epochs` single-sample SGD updates (one random row each).

        Note: one "epoch" here is one randomly drawn sample, not a full
        pass over the data (kept for backward compatibility).
        """
        X = np.atleast_2d(X)
        y = np.array(y)

        for _ in range(epochs):
            idx = np.random.randint(X.shape[0])

            # Forward pass, keeping pre-activations (zs) for backprop.
            # BUG FIX: the original applied activation_deriv to the already
            # activated outputs a[l], i.e. it computed f'(f(z)) instead of
            # f'(z), giving systematically wrong gradients.
            acts = [X[idx]]
            zs = []
            for w in self.weights:
                z = np.dot(acts[-1], w)
                zs.append(z)
                acts.append(self.activation(z))

            # Backward pass: output delta, then hidden layers in reverse.
            error = y[idx] - acts[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 1, 0, -1):
                deltas.append(
                    deltas[-1].dot(self.weights[l].T) * self.activation_deriv(zs[l - 1])
                )
            deltas.reverse()

            # Weight update. (The original reused `i` both for the sample
            # index and this loop variable; renamed for clarity.)
            for l in range(len(self.weights)):
                layer = np.atleast_2d(acts[l])
                delta = np.atleast_2d(deltas[l])
                self.weights[l] += learning_rate * layer.T.dot(delta)

    def predict(self, x):
        """Forward-propagate `x` (one sample or a batch) through the net."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a

# Load the dataset.
# NOTE(review): hard-coded absolute Windows path — consider parameterizing.
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\data_cleaned.csv')

# Drop the timestamp column (assumed to be named 'NewDateTime').
if 'NewDateTime' in data.columns:
    data = data.drop(columns=['NewDateTime'])

# Coerce every column to numeric; unparsable values become NaN.
data = data.apply(pd.to_numeric, errors='coerce')

# Abort early if preprocessing left an empty frame.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")

# Impute missing values with the column means.
if data.isnull().values.any():
    data = data.fillna(data.mean())

# Assuming the last column is the target variable
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values

# Standardize features and target to zero mean / unit variance.
# NOTE(review): scalers are fit on the full dataset before the train/test
# split, which leaks test statistics into training — confirm acceptable.
scaler_X = StandardScaler()
X = scaler_X.fit_transform(X)
scaler_y = StandardScaler()
y = scaler_y.fit_transform(y.reshape(-1, 1))

# Split dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train the neural network
nn = NeuralNetwork([X_train.shape[1], 100, 1], 'tanh')
print("开始训练神经网络...")
nn.fit(X_train, y_train, learning_rate=0.01, epochs=10000)
print("神经网络训练完成。")

# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)

# Undo the target scaling so metrics are in original units.
# NOTE(review): a tanh output layer bounds scaled predictions to [-1, 1],
# so extreme targets cannot be reached — confirm this is acceptable.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)

mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)

print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")

# Scatter of predicted vs. actual with the ideal y = x reference line.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()

二、训练 feed.csv 数据集

数据集内容太多了,就训练了前 10000 条

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_regression
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # keep minus signs rendering correctly with a CJK font

# 定义激活函数及其导数
def tanh(x):
    """Hyperbolic tangent activation (thin wrapper over NumPy)."""
    return np.tanh(x)

def tanh_deriv(x):
    """Derivative of tanh evaluated at the pre-activation x: 1 - tanh(x)^2."""
    return 1.0 - np.tanh(x)**2

def logistic(x):
    """Numerically stable logistic sigmoid 1 / (1 + e^(-x)).

    The naive form overflows inside np.exp for large negative inputs and
    emits RuntimeWarnings; clipping the argument to [-500, 500] avoids that
    without changing any result representable in double precision.
    """
    return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

def logistic_derivative(x):
    """Derivative of the logistic sigmoid at pre-activation x: s(x) * (1 - s(x)).

    Evaluates the sigmoid once (the original called logistic(x) twice) and
    clips the exponent so large-magnitude inputs cannot overflow np.exp.
    """
    s = 1 / (1 + np.exp(-np.clip(x, -500, 500)))
    return s * (1 - s)

# 定义神经网络类
class NeuralNetwork:
    """Fully-connected feed-forward network trained by single-sample
    stochastic backpropagation with optional L2 weight decay.

    Parameters
    ----------
    layers : sequence of int
        Units per layer, e.g. [n_inputs, n_hidden, n_outputs]. Any depth
        >= 2 is supported (the original hard-coded exactly three layers).
    activation : {'tanh', 'logistic'}
        Activation applied after every layer, including the output layer.
    """

    # Activations are static helpers so the class does not depend on
    # module-level globals. Each derivative takes the PRE-activation z,
    # which is the mathematically correct argument.
    @staticmethod
    def _tanh(z):
        return np.tanh(z)

    @staticmethod
    def _tanh_deriv(z):
        t = np.tanh(z)
        return 1.0 - t * t

    @staticmethod
    def _logistic(z):
        return 1 / (1 + np.exp(-z))

    @staticmethod
    def _logistic_deriv(z):
        s = 1 / (1 + np.exp(-z))
        return s * (1 - s)

    def __init__(self, layers, activation='tanh'):
        if activation == 'logistic':
            self.activation = self._logistic
            self.activation_deriv = self._logistic_deriv
        elif activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        else:
            # The original silently left the instance unusable here.
            raise ValueError(f"unsupported activation: {activation!r}")

        # Small symmetric random init in [-0.25, 0.25) for each layer pair.
        self.weights = [
            (2 * np.random.random((n_in, n_out)) - 1) * 0.25
            for n_in, n_out in zip(layers[:-1], layers[1:])
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000, l2_lambda=0.01):
        """Run `epochs` single-sample SGD updates with L2 regularization.

        `l2_lambda` scales the weight-decay term added to every update.
        """
        X = np.atleast_2d(X)
        y = np.array(y)

        for _ in range(epochs):
            idx = np.random.randint(X.shape[0])

            # Forward pass, keeping pre-activations (zs) for backprop.
            # BUG FIX: the original applied activation_deriv to the already
            # activated outputs a[l], i.e. it computed f'(f(z)) instead of
            # f'(z), giving systematically wrong gradients.
            acts = [X[idx]]
            zs = []
            for w in self.weights:
                z = np.dot(acts[-1], w)
                zs.append(z)
                acts.append(self.activation(z))

            # Backward pass: output delta, then hidden layers in reverse.
            error = y[idx] - acts[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 1, 0, -1):
                deltas.append(
                    deltas[-1].dot(self.weights[l].T) * self.activation_deriv(zs[l - 1])
                )
            deltas.reverse()

            # Gradient step plus L2 weight decay. (The original reused `i`
            # for both the sample index and this loop variable.)
            for l in range(len(self.weights)):
                layer = np.atleast_2d(acts[l])
                delta = np.atleast_2d(deltas[l])
                self.weights[l] += learning_rate * (layer.T.dot(delta) - l2_lambda * self.weights[l])

    def predict(self, x):
        """Forward-propagate `x` (one sample or a batch) through the net."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a

# Load the dataset and keep only the first 10,000 rows (the full file is large).
# NOTE(review): hard-coded absolute Windows path — consider parameterizing.
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\feed.csv').iloc[:10000]

# Drop non-feature columns: the row-number column 'X1' and the 'date' column.
# NOTE(review): raises KeyError if either column is absent — confirm schema.
data = data.drop(columns=['X1', 'date'])

# Coerce every column to numeric; unparsable values become NaN.
data = data.apply(pd.to_numeric, errors='coerce')

# Abort early if preprocessing left an empty frame.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")

# Impute missing values with the column means.
if data.isnull().values.any():
    data = data.fillna(data.mean())

# Replace non-finite entries (±inf) with NaN, then impute with column means.
if not np.isfinite(data).all().all():
    data = data.replace([np.inf, -np.inf], np.nan)
    data = data.fillna(data.mean())

# Split into features (all but last column) and target (last column).
X = data.iloc[:, :-1]
y = data.iloc[:, -1]

# Same non-finite cleanup for the target series.
if not np.isfinite(y).all():
    y = y.replace([np.inf, -np.inf], np.nan)
    y = y.fillna(y.mean())

# Keep only the 5 features most correlated with the target (univariate F-test).
selector = SelectKBest(score_func=f_regression, k=5)
X_selected = selector.fit_transform(X, y)

# Standardize features and target to zero mean / unit variance.
# NOTE(review): scalers (and the selector) are fit before the train/test
# split, which leaks test statistics into training — confirm acceptable.
scaler_X = StandardScaler()
X_scaled = scaler_X.fit_transform(X_selected)
scaler_y = StandardScaler()
y_scaled = scaler_y.fit_transform(y.values.reshape(-1, 1))

# Split dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.2, random_state=42)

# Initialize and train the neural network (wider hidden layer).
nn = NeuralNetwork([X_train.shape[1], 200, 1], 'tanh')
print("开始训练神经网络...")
# Lower learning rate with more iterations.
nn.fit(X_train, y_train, learning_rate=0.0001, epochs=30000, l2_lambda=0.01)
print("神经网络训练完成。")

# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)

# Undo the target scaling so metrics are in original units.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)

mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)

print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")

# Scatter of predicted vs. actual with the ideal y = x reference line.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()

三、训练scoringdataset.csv数据集

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # keep minus signs rendering correctly with a CJK font

# 定义激活函数及其导数
def tanh(x):
    """Hyperbolic tangent activation (thin wrapper over NumPy)."""
    return np.tanh(x)

def tanh_deriv(x):
    """Derivative of tanh evaluated at the pre-activation x: 1 - tanh(x)^2."""
    return 1.0 - np.tanh(x)**2

def logistic(x):
    """Numerically stable logistic sigmoid 1 / (1 + e^(-x)).

    The naive form overflows inside np.exp for large negative inputs and
    emits RuntimeWarnings; clipping the argument to [-500, 500] avoids that
    without changing any result representable in double precision.
    """
    return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

def logistic_derivative(x):
    """Derivative of the logistic sigmoid at pre-activation x: s(x) * (1 - s(x)).

    Evaluates the sigmoid once (the original called logistic(x) twice) and
    clips the exponent so large-magnitude inputs cannot overflow np.exp.
    """
    s = 1 / (1 + np.exp(-np.clip(x, -500, 500)))
    return s * (1 - s)

# 定义神经网络类
class NeuralNetwork:
    """Fully-connected feed-forward network trained by single-sample
    stochastic backpropagation.

    Parameters
    ----------
    layers : sequence of int
        Units per layer, e.g. [n_inputs, n_hidden, n_outputs]. Any depth
        >= 2 is supported (the original hard-coded exactly three layers).
    activation : {'tanh', 'logistic'}
        Activation applied after every layer, including the output layer.
    """

    # Activations are static helpers so the class does not depend on
    # module-level globals. Each derivative takes the PRE-activation z,
    # which is the mathematically correct argument.
    @staticmethod
    def _tanh(z):
        return np.tanh(z)

    @staticmethod
    def _tanh_deriv(z):
        t = np.tanh(z)
        return 1.0 - t * t

    @staticmethod
    def _logistic(z):
        return 1 / (1 + np.exp(-z))

    @staticmethod
    def _logistic_deriv(z):
        s = 1 / (1 + np.exp(-z))
        return s * (1 - s)

    def __init__(self, layers, activation='tanh'):
        if activation == 'logistic':
            self.activation = self._logistic
            self.activation_deriv = self._logistic_deriv
        elif activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        else:
            # The original silently left the instance unusable here.
            raise ValueError(f"unsupported activation: {activation!r}")

        # Small symmetric random init in [-0.25, 0.25) for each layer pair.
        self.weights = [
            (2 * np.random.random((n_in, n_out)) - 1) * 0.25
            for n_in, n_out in zip(layers[:-1], layers[1:])
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000):
        """Run `epochs` single-sample SGD updates (one random row each)."""
        X = np.atleast_2d(X)
        y = np.array(y)

        for _ in range(epochs):
            idx = np.random.randint(X.shape[0])

            # Forward pass, keeping pre-activations (zs) for backprop.
            # BUG FIX: the original applied activation_deriv to the already
            # activated outputs a[l], i.e. it computed f'(f(z)) instead of
            # f'(z), giving systematically wrong gradients.
            acts = [X[idx]]
            zs = []
            for w in self.weights:
                z = np.dot(acts[-1], w)
                zs.append(z)
                acts.append(self.activation(z))

            # Backward pass: output delta, then hidden layers in reverse.
            error = y[idx] - acts[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 1, 0, -1):
                deltas.append(
                    deltas[-1].dot(self.weights[l].T) * self.activation_deriv(zs[l - 1])
                )
            deltas.reverse()

            # Weight update. (The original reused `i` for both the sample
            # index and this loop variable; renamed for clarity.)
            for l in range(len(self.weights)):
                layer = np.atleast_2d(acts[l])
                delta = np.atleast_2d(deltas[l])
                self.weights[l] += learning_rate * layer.T.dot(delta)

    def predict(self, x):
        """Forward-propagate `x` (one sample or a batch) through the net."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a

# Load the dataset.
# NOTE(review): hard-coded absolute Windows path — consider parameterizing.
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\scoringdataset.csv')

# Drop non-feature columns: the first column (assumed to be a row index)
# and the 'breaks' column.
data = data.drop(columns=[data.columns[0], 'breaks'])  # drop index column (position 0) and 'breaks'

# Coerce every column to numeric; unparsable values become NaN.
data = data.apply(pd.to_numeric, errors='coerce')

# Abort early if preprocessing left an empty frame.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")

# Impute missing values with the column means.
if data.isnull().values.any():
    data = data.fillna(data.mean())

# Assume the last column is the target variable.
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values

# Standardize features and target to zero mean / unit variance.
# NOTE(review): scalers are fit on the full dataset before the train/test
# split, which leaks test statistics into training — confirm acceptable.
scaler_X = StandardScaler()
X = scaler_X.fit_transform(X)
scaler_y = StandardScaler()
y = scaler_y.fit_transform(y.reshape(-1, 1))

# Split into training and testing sets.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train the neural network.
nn = NeuralNetwork([X_train.shape[1], 100, 1], 'tanh')
print("开始训练神经网络...")
nn.fit(X_train, y_train, learning_rate=0.01, epochs=10000)
print("神经网络训练完成。")

# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)

# Undo the target scaling so metrics are in original units.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)

# Compute regression metrics.
mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)

print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")

# Scatter of predicted vs. actual with the ideal y = x reference line.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()

四、训练 flotation.csv 数据集

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # keep minus signs rendering correctly with a CJK font

# 定义激活函数及其导数
def tanh(x):
    """Hyperbolic tangent activation (thin wrapper over NumPy)."""
    return np.tanh(x)

def tanh_deriv(x):
    """Derivative of tanh evaluated at the pre-activation x: 1 - tanh(x)^2."""
    return 1.0 - np.tanh(x)**2

def logistic(x):
    """Numerically stable logistic sigmoid 1 / (1 + e^(-x)).

    The naive form overflows inside np.exp for large negative inputs and
    emits RuntimeWarnings; clipping the argument to [-500, 500] avoids that
    without changing any result representable in double precision.
    """
    return 1 / (1 + np.exp(-np.clip(x, -500, 500)))

def logistic_derivative(x):
    """Derivative of the logistic sigmoid at pre-activation x: s(x) * (1 - s(x)).

    Evaluates the sigmoid once (the original called logistic(x) twice) and
    clips the exponent so large-magnitude inputs cannot overflow np.exp.
    """
    s = 1 / (1 + np.exp(-np.clip(x, -500, 500)))
    return s * (1 - s)

# 定义神经网络类
class NeuralNetwork:
    """Fully-connected feed-forward network trained by single-sample
    stochastic backpropagation.

    Parameters
    ----------
    layers : sequence of int
        Units per layer, e.g. [n_inputs, n_hidden, n_outputs]. Any depth
        >= 2 is supported (the original hard-coded exactly three layers).
    activation : {'tanh', 'logistic'}
        Activation applied after every layer, including the output layer.
    """

    # Activations are static helpers so the class does not depend on
    # module-level globals. Each derivative takes the PRE-activation z,
    # which is the mathematically correct argument.
    @staticmethod
    def _tanh(z):
        return np.tanh(z)

    @staticmethod
    def _tanh_deriv(z):
        t = np.tanh(z)
        return 1.0 - t * t

    @staticmethod
    def _logistic(z):
        return 1 / (1 + np.exp(-z))

    @staticmethod
    def _logistic_deriv(z):
        s = 1 / (1 + np.exp(-z))
        return s * (1 - s)

    def __init__(self, layers, activation='tanh'):
        if activation == 'logistic':
            self.activation = self._logistic
            self.activation_deriv = self._logistic_deriv
        elif activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        else:
            # The original silently left the instance unusable here.
            raise ValueError(f"unsupported activation: {activation!r}")

        # Small symmetric random init in [-0.25, 0.25) for each layer pair.
        self.weights = [
            (2 * np.random.random((n_in, n_out)) - 1) * 0.25
            for n_in, n_out in zip(layers[:-1], layers[1:])
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000):
        """Run `epochs` single-sample SGD updates (one random row each)."""
        X = np.atleast_2d(X)
        y = np.array(y)

        for _ in range(epochs):
            idx = np.random.randint(X.shape[0])

            # Forward pass, keeping pre-activations (zs) for backprop.
            # BUG FIX: the original applied activation_deriv to the already
            # activated outputs a[l], i.e. it computed f'(f(z)) instead of
            # f'(z), giving systematically wrong gradients.
            acts = [X[idx]]
            zs = []
            for w in self.weights:
                z = np.dot(acts[-1], w)
                zs.append(z)
                acts.append(self.activation(z))

            # Backward pass: output delta, then hidden layers in reverse.
            error = y[idx] - acts[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 1, 0, -1):
                deltas.append(
                    deltas[-1].dot(self.weights[l].T) * self.activation_deriv(zs[l - 1])
                )
            deltas.reverse()

            # Weight update. (The original reused `i` for both the sample
            # index and this loop variable; renamed for clarity.)
            for l in range(len(self.weights)):
                layer = np.atleast_2d(acts[l])
                delta = np.atleast_2d(deltas[l])
                self.weights[l] += learning_rate * layer.T.dot(delta)

    def predict(self, x):
        """Forward-propagate `x` (one sample or a batch) through the net."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a

# Load the dataset.
# NOTE(review): hard-coded absolute Windows path — consider parameterizing.
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\flotation.csv')

# Keep only the first 10,000 rows.
data = data.head(10000)

# Drop non-feature columns: the row-number column 'X1' and the 'date' column.
# NOTE(review): raises KeyError if either column is absent — confirm schema.
data = data.drop(columns=['X1', 'date'])  # drop index column (X1) and the date column

# Coerce every column to numeric; unparsable values become NaN.
data = data.apply(pd.to_numeric, errors='coerce')

# Abort early if preprocessing left an empty frame.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")

# Impute missing values with the column means.
if data.isnull().values.any():
    data = data.fillna(data.mean())

# Assume the last column is the target variable.
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values

# Standardize features and target to zero mean / unit variance.
# NOTE(review): scalers are fit on the full dataset before the train/test
# split, which leaks test statistics into training — confirm acceptable.
scaler_X = StandardScaler()
X = scaler_X.fit_transform(X)
scaler_y = StandardScaler()
y = scaler_y.fit_transform(y.reshape(-1, 1))

# Split into training and testing sets.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train the neural network.
nn = NeuralNetwork([X_train.shape[1], 100, 1], 'tanh')
print("开始训练神经网络...")
nn.fit(X_train, y_train, learning_rate=0.01, epochs=10000)
print("神经网络训练完成。")

# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)

# Undo the target scaling so metrics are in original units.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)

# Compute regression metrics.
mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)

print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")

# Scatter of predicted vs. actual with the ideal y = x reference line.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese labels render
plt.rcParams['axes.unicode_minus'] = False  # keep minus signs rendering correctly with a CJK font

# 定义激活函数及其导数
def tanh(x):
    """Hyperbolic tangent activation (thin wrapper over NumPy)."""
    return np.tanh(x)

def tanh_deriv(x):
    """Derivative of tanh evaluated at the pre-activation x: 1 - tanh(x)^2."""
    return 1.0 - np.tanh(x)**2

def relu(x):
    """Rectified linear unit: element-wise max(0, x)."""
    return np.maximum(0, x)

def relu_deriv(x):
    """Derivative of ReLU: 1 where x > 0, else 0 (0 chosen at x == 0)."""
    return np.where(x > 0, 1, 0)

class NeuralNetwork:
    """Fully-connected feed-forward network trained by single-sample
    stochastic backpropagation.

    Parameters
    ----------
    layers : sequence of int
        Units per layer, e.g. [n_inputs, n_hidden, n_outputs]. Any depth
        >= 2 is supported (the original hard-coded exactly three layers).
    activation : {'tanh', 'relu'}
        Activation applied after every layer, including the output layer
        (note: a relu output layer can only emit non-negative values).
    """

    # Activations are static helpers so the class does not depend on
    # module-level globals. Each derivative takes the PRE-activation z,
    # which is the mathematically correct argument.
    @staticmethod
    def _tanh(z):
        return np.tanh(z)

    @staticmethod
    def _tanh_deriv(z):
        t = np.tanh(z)
        return 1.0 - t * t

    @staticmethod
    def _relu(z):
        return np.maximum(0, z)

    @staticmethod
    def _relu_deriv(z):
        return np.where(z > 0, 1, 0)

    def __init__(self, layers, activation='tanh'):
        self.activation_name = activation
        if activation == 'tanh':
            self.activation = self._tanh
            self.activation_deriv = self._tanh_deriv
        elif activation == 'relu':
            self.activation = self._relu
            self.activation_deriv = self._relu_deriv
        else:
            # The original silently left the instance unusable here.
            raise ValueError(f"unsupported activation: {activation!r}")

        # Small symmetric random init in [-0.25, 0.25) for each layer pair.
        self.weights = [
            (2 * np.random.random((n_in, n_out)) - 1) * 0.25
            for n_in, n_out in zip(layers[:-1], layers[1:])
        ]

    def fit(self, X, y, learning_rate=0.2, epochs=10000):
        """Run `epochs` single-sample SGD updates (one random row each)."""
        X = np.atleast_2d(X)
        y = np.array(y)

        for _ in range(epochs):
            idx = np.random.randint(X.shape[0])

            # Forward pass, keeping pre-activations (zs) for backprop.
            # BUG FIX: the original applied activation_deriv to the already
            # activated outputs a[l], i.e. it computed f'(f(z)) instead of
            # f'(z), giving wrong gradients for tanh (for relu the two
            # happen to coincide away from 0).
            acts = [X[idx]]
            zs = []
            for w in self.weights:
                z = np.dot(acts[-1], w)
                zs.append(z)
                acts.append(self.activation(z))

            # Backward pass: output delta, then hidden layers in reverse.
            error = y[idx] - acts[-1]
            deltas = [error * self.activation_deriv(zs[-1])]
            for l in range(len(self.weights) - 1, 0, -1):
                deltas.append(
                    deltas[-1].dot(self.weights[l].T) * self.activation_deriv(zs[l - 1])
                )
            deltas.reverse()

            # Weight update. (The original reused `i` for both the sample
            # index and this loop variable; renamed for clarity.)
            for l in range(len(self.weights)):
                layer = np.atleast_2d(acts[l])
                delta = np.atleast_2d(deltas[l])
                self.weights[l] += learning_rate * layer.T.dot(delta)

    def predict(self, x):
        """Forward-propagate `x` (one sample or a batch) through the net."""
        a = np.array(x)
        for w in self.weights:
            a = self.activation(np.dot(a, w))
        return a

# Load the dataset.
# NOTE(review): hard-coded absolute Windows path — consider parameterizing.
data = pd.read_csv(r'C:\Users\mtudou\Desktop\float\data_cleaned.csv')

# Drop the timestamp column (assumed to be named 'NewDateTime').
if 'NewDateTime' in data.columns:
    data = data.drop(columns=['NewDateTime'])

# Coerce every column to numeric; unparsable values become NaN.
data = data.apply(pd.to_numeric, errors='coerce')

# Abort early if preprocessing left an empty frame.
if data.empty:
    raise ValueError("数据集为空,请检查数据预处理步骤。")

# Impute missing values with the column means.
if data.isnull().values.any():
    data = data.fillna(data.mean())

# Assuming the last column is the target variable
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values

# Standardize features and target to zero mean / unit variance.
# NOTE(review): scalers are fit on the full dataset before the train/test
# split, which leaks test statistics into training — confirm acceptable.
scaler_X = StandardScaler()
X = scaler_X.fit_transform(X)
scaler_y = StandardScaler()
y = scaler_y.fit_transform(y.reshape(-1, 1))

# Split dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train the neural network: wider hidden layer, relu activation.
# NOTE(review): a relu output layer can only emit non-negative scaled
# predictions, so standardized targets below 0 are unreachable — verify.
nn = NeuralNetwork([X_train.shape[1], 200, 1], 'relu')
print("开始训练神经网络...")
nn.fit(X_train, y_train, learning_rate=0.02, epochs=15000)  # adjusted learning rate and iteration count
print("神经网络训练完成。")

# Evaluate on the held-out test set, one sample at a time.
print("\n使用测试集进行评估")
predictions = []
for i in range(X_test.shape[0]):
    prediction = nn.predict(X_test[i])
    predictions.append(prediction)
predictions = np.array(predictions)

# Undo the target scaling so metrics are in original units.
predictions = scaler_y.inverse_transform(predictions.reshape(-1, 1))
y_test = scaler_y.inverse_transform(y_test)

mse = mean_squared_error(y_test, predictions)
r_squared = r2_score(y_test, predictions)

print(f"\n均方误差 (MSE): {mse:.4f}")
print(f"决定系数 (R²): {r_squared:.4f}")

# Scatter of predicted vs. actual with the ideal y = x reference line.
plt.figure(figsize=(10, 6))
plt.scatter(y_test, predictions, c='blue', label='预测值')
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=2, label='理想情况')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title('神经网络预测结果')
plt.legend()
plt.show()
最后更新时间:2025/6/22 17:15:21