下面是详细讲解“Node.js中child_process实现多进程”的完整攻略。
一、什么是child_process模块
在Node.js中,使用child_process模块可以创建并控制子进程。这个模块提供了四个函数:spawn、exec、execFile、fork,分别对应不同类型的子进程。
二、何时使用多进程
在一些需要高并发处理的场景中,单进程的性能可能会遇到瓶颈,此时可以考虑使用多进程来提高性能。
三、使用child_process实现多进程的步骤
下面以创建多个子进程来同时处理请求为例,讲解如何使用child_process模块实现多进程。
1. 启动子进程
具体的代码如下所示:
const cp = require('child_process');
const numCPUs = require('os').cpus().length;
for (let i = 0; i < numCPUs; i++) {
const child = cp.fork('./worker.js');
console.log(`Started worker ${child.pid}`);
}
以上代码会启动和CPU数量相等的子进程,每个子进程执行的脚本为worker.js
。
2. 子进程执行任务
子进程在启动后,会执行worker.js
脚本中的代码。在worker.js
中,可以监听主进程发送的消息,并根据不同的消息类型执行不同的任务。具体的代码如下所示:
process.on('message', ({ type, payload }) => {
switch (type) {
case 'TASK_A':
doTaskA(payload);
break;
case 'TASK_B':
doTaskB(payload);
break;
default:
break;
}
});
function doTaskA(payload) {
// 处理任务A
process.send({ type: 'TASK_A_RESULT', payload: result });
}
function doTaskB(payload) {
// 处理任务B
process.send({ type: 'TASK_B_RESULT', payload: result });
}
上述代码中,process.on('message')
用于监听主进程发送的消息,根据不同的消息类型执行不同的任务。任务执行完毕后,会通过process.send
方法向主进程返回执行结果。
3. 主进程发送任务
主进程中可以根据需要将不同类型的任务分发给子进程执行,具体的代码如下所示:
const workers = [];
for (let i = 0; i < numCPUs; i++) {
const child = cp.fork('./worker.js');
console.log(`Started worker ${child.pid}`);
workers.push(child);
}
workers.forEach((worker, index) => {
worker.send({ type: 'TASK_A', payload: `Task A for worker ${index}` });
worker.send({ type: 'TASK_B', payload: `Task B for worker ${index}` });
});
以上代码会将TASK_A
和TASK_B
两种任务分发给每个子进程执行。每个任务的payload
可以根据需要进行设置。
四、使用示例
下面分别提供两个使用child_process实现多进程的示例:
示例一:使用child_process处理大文件
以下代码会使用两个子进程,分别对两个大文件进行排序,并将结果写入新的文件中。
const fs = require('fs');
const path = require('path');
const cp = require('child_process');
const util = require('util');
const numCPUs = require('os').cpus().length;
const inFile = path.join(__dirname, 'bigfile.txt');
const outFile = path.join(__dirname, 'output.txt');
let lineCount = 0;
const readStream = fs.createReadStream(inFile);
readStream.on('data', chunk => {
const lines = chunk.toString().split('\n');
lineCount += lines.length;
// 分配给每个子进程的行数
const chunkSize = Math.ceil(lines.length / numCPUs);
for (let i = 0; i < numCPUs; i++) {
const start = chunkSize * i;
const end = start + chunkSize;
const child = cp.fork(path.join(__dirname, 'child.js'));
const subLines = lines.slice(start, end);
const payload = {
inFile,
outFile,
subLines
};
child.send(payload);
console.log(`Forked process with PID ${child.pid}.`);
}
});
readStream.on('end', () => {
console.log(`There are ${lineCount} lines in total.`);
});
let processedCount = 0;
function messageHandler(childProcess) {
return function (message) {
processedCount++;
if (message.type === 'done') {
console.log(`Process with PID ${childProcess.pid} is done.`);
}
if (processedCount === numCPUs) {
console.log(`All processes are done. Start merging files.`);
mergeFiles(numCPUs);
}
};
}
function mergeFiles(count) {
const readerStreams = [];
for (let i = 0; i < count; i++) {
const reader = fs.createReadStream(
path.join(__dirname, `part-${i}.txt`),
{ encoding: 'utf-8' }
);
readerStreams.push(reader);
}
let writeStream = fs.createWriteStream(outFile, { encoding: 'utf-8' });
util.promisify(writeStream.on).call(writeStream, 'finish', () => {
console.log(`Merging files is done.`);
});
let index = 0;
function writeNext() {
const reader = readerStreams.shift();
if (reader) {
reader.pipe(writeStream, { end: false });
reader.once('end', () => {
console.log(`Finished writing file ${index}.`);
index++;
writeNext();
});
}
}
writeNext();
}
子进程的代码如下:
const fs = require('fs');
const path = require('path');
process.on('message', message => {
const { inFile, outFile, subLines } = message;
subLines.sort();
const content = subLines.join('\n');
fs.writeFile(path.join(__dirname, `part-${process.pid}.txt`), content, err => {
if (err) {
process.send({ type: 'error', message: err.message });
} else {
process.send({ type: 'done' });
}
process.exit();
});
});
示例二:使用child_process处理CPU密集型计算
以下代码会使用多个子进程,计算质数。主进程会将计算任务分发给子进程并进行计算结果的汇总。
const cp = require('child_process');
const numCPUs = require('os').cpus().length;
let idx = 1;
for (let i = 0; i < numCPUs; i++) {
const child = cp.fork('./worker.js');
child.on('message', msg => {
console.log(`Worker ${msg.workerId} calculated ${msg.count} prime numbers.`);
idx++;
if (idx > 100) {
console.log(`All workers are done.`);
process.exit();
}
});
const payload = {
workerId: i,
start: i * 100000,
end: (i + 1) * 100000 - 1
};
child.send(payload);
console.log(`Forked worker ${child.pid}.`);
}
上述代码中,主进程会启动和CPU数量相等的子进程,每个子进程执行的代码为worker.js
。主进程将需要计算的数值范围分配给每个子进程,并等待每个子进程返回计算结果。
子进程的代码如下所示:
process.on('message', msg => {
const { start, end } = msg;
let count = 0;
for (let i = start; i <= end; i++) {
if (isPrime(i)) {
count++;
}
}
process.send({ count, workerId: msg.workerId });
process.exit();
});
function isPrime(num) {
for (let i = 2; i <= Math.sqrt(num); i++) {
if (num % i === 0) {
return false;
}
}
return num !== 1;
}
上述代码中,子进程会接收到主进程分配的数值范围,并计算出这个范围内的质数个数。计算完毕后,子进程通过process.send
方法向主进程返回计算结果。
以上就是使用child_process模块实现多进程的完整攻略和两个示例。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Node.js中child_process实现多进程 - Python技术站