mysql2 库进行批量更新(Bulk Update)有几种常见的方案,以下是详细的实现方法:
这是最高效的批量更新方式,单次 SQL 查询更新多条记录。
const mysql = require('mysql2/promise');
async function bulkUpdateWithCaseWhen(updates) {
const connection = await mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test_db'
});
try {
// 构建 CASE WHEN 语句
const ids = updates.map(update => update.id);
const cases = updates.map(update =>
`WHEN id = ${update.id} THEN ${update.value}`
).join(' ');
const sql = `
UPDATE your_table
SET
column1 = CASE id ${cases} END,
column2 = CASE id
${updates.map(update =>
`WHEN id = ${update.id} THEN '${update.otherValue}'`
).join(' ')}
END
WHERE id IN (${ids.join(',')})
`;
const [result] = await connection.execute(sql);
return result;
} finally {
await connection.end();
}
}
// 使用示例
const updates = [
{ id: 1, value: 100, otherValue: 'A' },
{ id: 2, value: 200, otherValue: 'B' },
{ id: 3, value: 300, otherValue: 'C' }
];
bulkUpdateWithCaseWhen(updates)
.then(result => console.log('更新成功:', result.affectedRows))
.catch(err => console.error('更新失败:', err));
适用于有唯一索引或主键的表:
async function bulkUpdateOnDuplicate(updates) {
const connection = await mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test_db'
});
try {
// 批量插入或更新
const sql = `
INSERT INTO your_table (id, column1, column2, column3)
VALUES ?
ON DUPLICATE KEY UPDATE
column1 = VALUES(column1),
column2 = VALUES(column2),
column3 = VALUES(column3)
`;
const values = updates.map(update => [
update.id,
update.column1,
update.column2,
update.column3
]);
const [result] = await connection.query(sql, [values]);
return result;
} finally {
await connection.end();
}
}
// 使用示例
const data = [
{ id: 1, column1: 'value1', column2: 100, column3: 'A' },
{ id: 2, column1: 'value2', column2: 200, column3: 'B' },
{ id: 3, column1: 'value3', column2: 300, column3: 'C' }
];
bulkUpdateOnDuplicate(data);
async function bulkUpdateWithTransaction(updates) {
const connection = await mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test_db'
});
try {
await connection.beginTransaction();
for (const update of updates) {
await connection.execute(
'UPDATE your_table SET column1 = ?, column2 = ? WHERE id = ?',
[update.column1, update.column2, update.id]
);
}
await connection.commit();
console.log('批量更新完成');
} catch (error) {
await connection.rollback();
throw error;
} finally {
await connection.end();
}
}
async function bulkUpdateWithPreparedStatements(updates) {
const connection = await mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test_db'
});
try {
const updatePromises = updates.map(update => {
return connection.execute(
'UPDATE your_table SET name = ?, age = ? WHERE id = ?',
[update.name, update.age, update.id]
);
});
const results = await Promise.all(updatePromises);
return results;
} finally {
await connection.end();
}
}
async function bulkUpdateWithValues(updates) {
const connection = await mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test_db'
});
try {
// 创建临时表或使用 VALUES 语法
const values = updates.map(update =>
`(${update.id}, '${update.name}', ${update.age})`
).join(', ');
const sql = `
UPDATE your_table t
JOIN (VALUES ${values}) AS tmp(id, name, age)
ON t.id = tmp.id
SET t.name = tmp.name, t.age = tmp.age
`;
const [result] = await connection.execute(sql);
return result;
} finally {
await connection.end();
}
}
class BulkUpdater {
constructor(config) {
this.pool = mysql.createPool(config);
}
async bulkUpdateCaseWhen(table, updates, whereField = 'id') {
if (!updates || updates.length === 0) return { affectedRows: 0 };
const fields = Object.keys(updates[0]).filter(key => key !== whereField);
const ids = updates.map(update => update[whereField]);
// 构建 CASE WHEN 语句
const caseStatements = fields.map(field => {
const cases = updates.map(update =>
`WHEN ${whereField} = ${update[whereField]} THEN ${this.escapeValue(update[field])}`
).join(' ');
return `${field} = CASE ${cases} END`;
}).join(', ');
const sql = `
UPDATE ${table}
SET ${caseStatements}
WHERE ${whereField} IN (${ids.join(',')})
`;
const [result] = await this.pool.execute(sql);
return result;
}
escapeValue(value) {
if (typeof value === 'string') {
return `'${value.replace(/'/g, "''")}'`;
}
if (value === null || value === undefined) {
return 'NULL';
}
return value;
}
async bulkUpdateOnDuplicate(table, updates, uniqueFields = ['id']) {
if (!updates || updates.length === 0) return { affectedRows: 0 };
const fields = Object.keys(updates[0]);
const values = updates.map(update =>
fields.map(field => update[field])
);
const fieldList = fields.join(', ');
const updateList = fields
.filter(field => !uniqueFields.includes(field))
.map(field => `${field} = VALUES(${field})`)
.join(', ');
const sql = `
INSERT INTO ${table} (${fieldList})
VALUES ?
ON DUPLICATE KEY UPDATE ${updateList}
`;
const [result] = await this.pool.query(sql, [values]);
return result;
}
async close() {
await this.pool.end();
}
}
// 使用示例
const bulkUpdater = new BulkUpdater({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test_db',
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
// 使用 CASE WHEN
await bulkUpdater.bulkUpdateCaseWhen('users', [
{ id: 1, name: 'Alice', age: 25 },
{ id: 2, name: 'Bob', age: 30 }
]);
// 使用 ON DUPLICATE KEY UPDATE
await bulkUpdater.bulkUpdateOnDuplicate('users', [
{ id: 1, name: 'Alice', age: 26 },
{ id: 3, name: 'Charlie', age: 28 }
]);
await bulkUpdater.close();
选择哪种方案取决于具体需求: