欢迎光临白事网
详情描述

mysql2 库进行批量更新(Bulk Update)有几种常见的方案,以下是详细的实现方法:

方案一:使用 CASE WHEN 语句(推荐)

这是最高效的批量更新方式,单次 SQL 查询更新多条记录。

const mysql = require('mysql2/promise');

async function bulkUpdateWithCaseWhen(updates) {
  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });

  try {
    // 构建 CASE WHEN 语句
    const ids = updates.map(update => update.id);
    const cases = updates.map(update => 
      `WHEN id = ${update.id} THEN ${update.value}`
    ).join(' ');

    const sql = `
      UPDATE your_table
      SET 
        column1 = CASE id ${cases} END,
        column2 = CASE id 
          ${updates.map(update => 
            `WHEN id = ${update.id} THEN '${update.otherValue}'`
          ).join(' ')} 
        END
      WHERE id IN (${ids.join(',')})
    `;

    const [result] = await connection.execute(sql);
    return result;
  } finally {
    await connection.end();
  }
}

// 使用示例
const updates = [
  { id: 1, value: 100, otherValue: 'A' },
  { id: 2, value: 200, otherValue: 'B' },
  { id: 3, value: 300, otherValue: 'C' }
];

bulkUpdateWithCaseWhen(updates)
  .then(result => console.log('更新成功:', result.affectedRows))
  .catch(err => console.error('更新失败:', err));

方案二:使用 INSERT ... ON DUPLICATE KEY UPDATE

适用于有唯一索引或主键的表:

async function bulkUpdateOnDuplicate(updates) {
  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });

  try {
    // 批量插入或更新
    const sql = `
      INSERT INTO your_table (id, column1, column2, column3)
      VALUES ?
      ON DUPLICATE KEY UPDATE
        column1 = VALUES(column1),
        column2 = VALUES(column2),
        column3 = VALUES(column3)
    `;

    const values = updates.map(update => [
      update.id,
      update.column1,
      update.column2,
      update.column3
    ]);

    const [result] = await connection.query(sql, [values]);
    return result;
  } finally {
    await connection.end();
  }
}

// 使用示例
const data = [
  { id: 1, column1: 'value1', column2: 100, column3: 'A' },
  { id: 2, column1: 'value2', column2: 200, column3: 'B' },
  { id: 3, column1: 'value3', column2: 300, column3: 'C' }
];

bulkUpdateOnDuplicate(data);

方案三:使用事务 + 循环更新(小批量数据)

async function bulkUpdateWithTransaction(updates) {
  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });

  try {
    await connection.beginTransaction();

    for (const update of updates) {
      await connection.execute(
        'UPDATE your_table SET column1 = ?, column2 = ? WHERE id = ?',
        [update.column1, update.column2, update.id]
      );
    }

    await connection.commit();
    console.log('批量更新完成');
  } catch (error) {
    await connection.rollback();
    throw error;
  } finally {
    await connection.end();
  }
}

方案四:使用批量预处理语句

async function bulkUpdateWithPreparedStatements(updates) {
  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });

  try {
    const updatePromises = updates.map(update => {
      return connection.execute(
        'UPDATE your_table SET name = ?, age = ? WHERE id = ?',
        [update.name, update.age, update.id]
      );
    });

    const results = await Promise.all(updatePromises);
    return results;
  } finally {
    await connection.end();
  }
}

方案五:使用 VALUES() 函数批量更新

async function bulkUpdateWithValues(updates) {
  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });

  try {
    // 创建临时表或使用 VALUES 语法
    const values = updates.map(update => 
      `(${update.id}, '${update.name}', ${update.age})`
    ).join(', ');

    const sql = `
      UPDATE your_table t
      JOIN (VALUES ${values}) AS tmp(id, name, age)
        ON t.id = tmp.id
      SET t.name = tmp.name, t.age = tmp.age
    `;

    const [result] = await connection.execute(sql);
    return result;
  } finally {
    await connection.end();
  }
}

封装为通用工具类

class BulkUpdater {
  constructor(config) {
    this.pool = mysql.createPool(config);
  }

  async bulkUpdateCaseWhen(table, updates, whereField = 'id') {
    if (!updates || updates.length === 0) return { affectedRows: 0 };

    const fields = Object.keys(updates[0]).filter(key => key !== whereField);
    const ids = updates.map(update => update[whereField]);

    // 构建 CASE WHEN 语句
    const caseStatements = fields.map(field => {
      const cases = updates.map(update => 
        `WHEN ${whereField} = ${update[whereField]} THEN ${this.escapeValue(update[field])}`
      ).join(' ');

      return `${field} = CASE ${cases} END`;
    }).join(', ');

    const sql = `
      UPDATE ${table}
      SET ${caseStatements}
      WHERE ${whereField} IN (${ids.join(',')})
    `;

    const [result] = await this.pool.execute(sql);
    return result;
  }

  escapeValue(value) {
    if (typeof value === 'string') {
      return `'${value.replace(/'/g, "''")}'`;
    }
    if (value === null || value === undefined) {
      return 'NULL';
    }
    return value;
  }

  async bulkUpdateOnDuplicate(table, updates, uniqueFields = ['id']) {
    if (!updates || updates.length === 0) return { affectedRows: 0 };

    const fields = Object.keys(updates[0]);
    const values = updates.map(update => 
      fields.map(field => update[field])
    );

    const fieldList = fields.join(', ');
    const updateList = fields
      .filter(field => !uniqueFields.includes(field))
      .map(field => `${field} = VALUES(${field})`)
      .join(', ');

    const sql = `
      INSERT INTO ${table} (${fieldList})
      VALUES ?
      ON DUPLICATE KEY UPDATE ${updateList}
    `;

    const [result] = await this.pool.query(sql, [values]);
    return result;
  }

  async close() {
    await this.pool.end();
  }
}

// 使用示例
const bulkUpdater = new BulkUpdater({
  host: 'localhost',
  user: 'root',
  password: 'password',
  database: 'test_db',
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});

// 使用 CASE WHEN
await bulkUpdater.bulkUpdateCaseWhen('users', [
  { id: 1, name: 'Alice', age: 25 },
  { id: 2, name: 'Bob', age: 30 }
]);

// 使用 ON DUPLICATE KEY UPDATE
await bulkUpdater.bulkUpdateOnDuplicate('users', [
  { id: 1, name: 'Alice', age: 26 },
  { id: 3, name: 'Charlie', age: 28 }
]);

await bulkUpdater.close();

性能优化建议

数据量较大时:分批处理,每批 1000-5000 条 使用连接池:避免频繁创建连接 添加索引:确保 WHERE 条件的字段有索引 监控性能:使用 EXPLAIN 分析查询 错误处理:添加重试机制和日志记录

注意事项

SQL 注入防护:始终使用参数化查询或转义输入 数据类型:确保传递的值类型与数据库列类型匹配 事务一致性:需要原子性操作时使用事务 连接管理:及时释放连接资源 数据验证:更新前验证数据的有效性

选择哪种方案取决于具体需求:

  • 性能优先:方案一(CASE WHEN)
  • 存在则更新,不存在则插入:方案二
  • 需要事务保证:方案三
  • 更新字段不同:方案一或自定义方案
相关帖子
Fedora系统外观怎么设置? Fedora Linux外观主题设置技巧
Fedora系统外观怎么设置? Fedora Linux外观主题设置技巧
在2026年,生孩子时医保的起付线和报销比例是怎么计算的?
在2026年,生孩子时医保的起付线和报销比例是怎么计算的?
对于刚入行的青年技能人才,有哪些切实可行的路径可以快速提升自身薪酬?
对于刚入行的青年技能人才,有哪些切实可行的路径可以快速提升自身薪酬?
为什么用微波炉热某些食物时会发出爆裂声该如何避免?
为什么用微波炉热某些食物时会发出爆裂声该如何避免?
面对不断变种的骚扰电话,普通用户如何建立有效的防御意识?
面对不断变种的骚扰电话,普通用户如何建立有效的防御意识?
hpsysdrv.exe是什么进程?hpsysdrv.exe是病毒吗?
hpsysdrv.exe是什么进程?hpsysdrv.exe是病毒吗?
汕尾市AI数字人制作小视频#企业网站建设公司,专业开发团队
汕尾市AI数字人制作小视频#企业网站建设公司,专业开发团队
湘潭市小视频制作与剪辑&商城网站定制,专业设计团队
湘潭市小视频制作与剪辑&商城网站定制,专业设计团队
coreldraw怎么勾画轮廓图?coreldraw勾画轮廓图方法
coreldraw怎么勾画轮廓图?coreldraw勾画轮廓图方法
Pandas索引器 loc 和 iloc 比较及代码示例
Pandas索引器 loc 和 iloc 比较及代码示例
MYSQL的日志文件详解
MYSQL的日志文件详解
益阳市殡葬一条龙公司电话|家庭白事服务,24小时服务热线
益阳市殡葬一条龙公司电话|家庭白事服务,24小时服务热线
2026年婚纱租赁店的卫生管理,是否有相关的行业规范或标准可以参考?
2026年婚纱租赁店的卫生管理,是否有相关的行业规范或标准可以参考?
面对一些生活必需品的价格波动,普通家庭可以采取哪些策略来平稳开支?
面对一些生活必需品的价格波动,普通家庭可以采取哪些策略来平稳开支?
手机号不再使用,但忘记绑定了哪些应用,有什么方法可以全面查询和解绑?
手机号不再使用,但忘记绑定了哪些应用,有什么方法可以全面查询和解绑?
对于网络上流传的历史类或社会类文章,普通读者可以从哪些角度评估其可信度?
对于网络上流传的历史类或社会类文章,普通读者可以从哪些角度评估其可信度?
Tomcat服务器日志超详细讲解
Tomcat服务器日志超详细讲解
AJAX POST数据中有特殊符号(转义字符)导致数据丢失的解决方法
AJAX POST数据中有特殊符号(转义字符)导致数据丢失的解决方法
明明知道熬夜不好,为什么就是控制不住自己?如何克服报复性熬夜?
明明知道熬夜不好,为什么就是控制不住自己?如何克服报复性熬夜?
不同城市间关于父母随迁落户的具体执行细则,主要差异体现在哪里?
不同城市间关于父母随迁落户的具体执行细则,主要差异体现在哪里?