Node.js 인터뷰 질문 84
질문: Node.js의 워커 스레드(Worker Threads)에 대해 설명하고, CPU 집약적 작업에 어떻게 활용할 수 있는지 예제와 함께 설명해주세요.
답변:
Node.js는 기본적으로 단일 스레드 모델을 사용하지만, 10.5.0 버전부터 worker_threads
모듈을 통해 멀티스레딩을 지원합니다. 이는 CPU 집약적인 작업을 병렬로 처리할 수 있게 해주어 성능을 크게 향상시킬 수 있습니다.
1. 워커 스레드의 기본 개념
워커 스레드는 메인 스레드와 별도로 실행되는 백그라운드 스레드로, 다음과 같은 특징을 가집니다:
- 메인 이벤트 루프 차단 없이 CPU 집약적 작업 수행 가능
- 스레드 간 메모리 공유 지원
- 메시지 패싱을 통한 통신
- 각 워커는 자체 V8 인스턴스와 이벤트 루프 보유
2. 기본 워커 스레드 사용법
// main.js
const { Worker } = require("worker_threads");
// 워커 생성
function runWorker(workerData) {
return new Promise((resolve, reject) => {
const worker = new Worker("./worker.js", { workerData });
worker.on("message", resolve);
worker.on("error", reject);
worker.on("exit", (code) => {
if (code !== 0) reject(new Error(`워커가 종료 코드 ${code}로 종료됨`));
});
});
}
// 워커 실행
async function main() {
try {
const result = await runWorker({ num: 50 });
console.log("피보나치 결과:", result);
} catch (err) {
console.error(err);
}
}
main();
// worker.js
const { parentPort, workerData } = require("worker_threads");
// CPU 집약적인 작업 - 피보나치 계산
function fibonacci(n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
// 워커 데이터로부터 입력 받기
const result = fibonacci(workerData.num);
// 결과를 메인 스레드로 전송
parentPort.postMessage(result);
3. 워커 풀을 사용한 부하 분산
여러 CPU 코어를 효율적으로 활용하기 위해 워커 풀을 구성할 수 있습니다:
// worker-pool.js
const { Worker } = require("worker_threads");
const os = require("os");
class WorkerPool {
constructor(workerPath, numWorkers = os.cpus().length) {
this.workerPath = workerPath;
this.numWorkers = numWorkers;
this.workers = [];
this.freeWorkers = [];
for (let i = 0; i < this.numWorkers; i++) {
this.addNewWorker();
}
}
addNewWorker() {
const worker = new Worker(this.workerPath);
worker.on("message", (result) => {
// 작업 완료 시 콜백 호출
worker.taskCallback(null, result);
worker.taskCallback = null;
// 워커를 다시 풀에 추가
this.freeWorkers.push(worker);
this.processQueue();
});
worker.on("error", (err) => {
// 에러 처리
if (worker.taskCallback) {
worker.taskCallback(err, null);
}
// 문제가 있는 워커 제거 및 새 워커 추가
this.workers = this.workers.filter((w) => w !== worker);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// 대기열에 작업 추가 로직 구현 가능
// ...
return;
}
const worker = this.freeWorkers.pop();
worker.taskCallback = callback;
worker.postMessage(task);
}
processQueue() {
// 대기열 처리 로직 구현 가능
// ...
}
close() {
for (const worker of this.workers) {
worker.terminate();
}
}
}
module.exports = WorkerPool;
// main-with-pool.js
const WorkerPool = require("./worker-pool");
const pool = new WorkerPool("./worker-task.js");
// 여러 작업 실행
for (let i = 0; i < 10; i++) {
pool.runTask({ num: 40 + i }, (err, result) => {
if (err) {
return console.error(`작업 ${i} 오류:`, err);
}
console.log(`작업 ${i} 결과:`, result);
});
}
// 모든 작업이 끝나면 풀 종료
// 실제 구현에서는 더 복잡한 종료 로직이 필요할 수 있음
setTimeout(() => {
pool.close();
}, 60000);
// worker-task.js
const { parentPort } = require("worker_threads");
// CPU 집약적인 작업 함수
function fibonacci(n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
// 메시지 수신 시 작업 처리
parentPort.on("message", (task) => {
const result = fibonacci(task.num);
parentPort.postMessage(result);
});
4. 데이터 공유 및 전송
워커 스레드는 다양한 방법으로 데이터를 공유하고 전송할 수 있습니다:
4.1 메시지 전송 (복사 방식)
// 기본 메시지 전송 (직렬화 후 복사)
worker.postMessage({ data: largeArray });
4.2 SharedArrayBuffer를 통한 메모리 공유
// main.js
const { Worker } = require("worker_threads");
// 공유 메모리 버퍼 생성
const sharedBuffer = new SharedArrayBuffer(4 * 10); // 10개의 정수를 위한 공간
const sharedArray = new Int32Array(sharedBuffer);
// 초기 데이터 설정
for (let i = 0; i < sharedArray.length; i++) {
Atomics.store(sharedArray, i, i);
}
const worker = new Worker("./shared-worker.js", {
workerData: { sharedBuffer },
});
worker.on("message", (msg) => {
console.log("워커에서 처리 완료");
// 결과 확인
for (let i = 0; i < sharedArray.length; i++) {
console.log(`sharedArray[${i}] = ${Atomics.load(sharedArray, i)}`);
}
});
// shared-worker.js
const { parentPort, workerData } = require("worker_threads");
const { sharedBuffer } = workerData;
// 공유 배열에 접근
const sharedArray = new Int32Array(sharedBuffer);
// 값을 두 배로 만들기
for (let i = 0; i < sharedArray.length; i++) {
// Atomics API를 사용한 안전한 읽기/쓰기
const value = Atomics.load(sharedArray, i);
Atomics.store(sharedArray, i, value * 2);
}
parentPort.postMessage("완료");
4.3 MessageChannel을 통한 양방향 통신
// main.js
const { Worker, MessageChannel } = require("worker_threads");
// 채널 생성
const { port1, port2 } = new MessageChannel();
// 워커 생성 및 포트 전달
const worker = new Worker("./channel-worker.js");
worker.postMessage({ port: port2 }, [port2]);
// 포트1에서 메시지 수신
port1.on("message", (msg) => {
console.log("워커로부터 메시지:", msg);
// 응답 전송
port1.postMessage("메인에서 응답");
});
// channel-worker.js
const { parentPort, workerData } = require("worker_threads");
// MessagePort 수신
parentPort.once("message", ({ port }) => {
// 메시지 전송
port.postMessage("워커에서 메시지 전송");
// 메시지 수신
port.on("message", (msg) => {
console.log("메인으로부터 메시지:", msg);
});
});
4.4 transferList를 통한 제로 복사 전송
// 버퍼 소유권 이전 (제로 복사)
const buffer = new ArrayBuffer(1024 * 1024 * 32); // 32MB 버퍼
worker.postMessage({ buffer }, [buffer]);
5. 워커 스레드를 활용한 CPU 집약적 작업 처리
5.1 이미지 처리 예제
// image-processor.js
const { Worker } = require("worker_threads");
const path = require("path");
const fs = require("fs");
// 이미지 처리 작업을 여러 워커로 분산
async function processImage(imagePath, outputPath, numWorkers = 4) {
// 이미지 데이터 로드
const imageBuffer = fs.readFileSync(imagePath);
// 워커별 작업 분할
const workerPromises = [];
const chunkSize = Math.ceil(imageBuffer.length / numWorkers);
for (let i = 0; i < numWorkers; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, imageBuffer.length);
// 각 워커에 대한 슬라이스 계산
const chunk = imageBuffer.slice(start, end);
workerPromises.push(
runWorker("./image-worker.js", {
chunk,
index: i,
total: numWorkers,
})
);
}
// 모든 워커 완료 대기
const results = await Promise.all(workerPromises);
// 결과 병합
const resultBuffers = results.map((r) => r.processedChunk);
const resultBuffer = Buffer.concat(resultBuffers);
// 출력 파일 저장
fs.writeFileSync(outputPath, resultBuffer);
return {
success: true,
processingTime: Math.max(...results.map((r) => r.processingTime)),
};
}
// 워커 실행 헬퍼 함수
function runWorker(workerPath, workerData) {
return new Promise((resolve, reject) => {
const worker = new Worker(path.resolve(workerPath), { workerData });
worker.on("message", resolve);
worker.on("error", reject);
worker.on("exit", (code) => {
if (code !== 0) reject(new Error(`워커가 코드 ${code}로 종료됨`));
});
});
}
// 사용 예시
async function main() {
console.time("이미지 처리");
try {
const result = await processImage("./input.jpg", "./output.jpg");
console.log("처리 완료:", result);
} catch (err) {
console.error("처리 오류:", err);
}
console.timeEnd("이미지 처리");
}
main();
// image-worker.js
const { parentPort, workerData } = require("worker_threads");
const { chunk, index, total } = workerData;
// 이미지 처리 시작 시간
const startTime = Date.now();
// CPU 집약적인 이미지 처리 시뮬레이션
function processImageChunk(buffer) {
// 이 예제에서는 간단하게 밝기 조정을 구현
// 실제로는 더 복잡한 알고리즘이 적용될 수 있음
const result = Buffer.alloc(buffer.length);
for (let i = 0; i < buffer.length; i++) {
// 픽셀 밝기 증가 (실제 이미지 처리는 더 복잡함)
result[i] = Math.min(buffer[i] + 50, 255);
}
// 처리 지연 시뮬레이션 (실제 무거운 작업)
let sum = 0;
for (let i = 0; i < 1000000; i++) {
sum += Math.sqrt(i);
}
return result;
}
// 이미지 청크 처리
const processedChunk = processImageChunk(chunk);
// 처리 시간 계산
const processingTime = Date.now() - startTime;
// 결과 반환
parentPort.postMessage({
index,
processingTime,
processedChunk, // 처리된 버퍼
});
5.2 대규모 데이터 병렬 처리
// data-processor.js
const { Worker } = require("worker_threads");
const os = require("os");
async function processLargeDataset(
data,
processingFn,
numWorkers = os.cpus().length
) {
// 데이터 청크로 분할
const chunkSize = Math.ceil(data.length / numWorkers);
const chunks = [];
for (let i = 0; i < numWorkers; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize, data.length);
chunks.push(data.slice(start, end));
}
// 각 청크에 대해 워커 생성
const workerPromises = chunks.map((chunk, index) => {
return new Promise((resolve, reject) => {
const worker = new Worker("./data-worker.js", {
workerData: {
chunk,
index,
processingFn: processingFn.toString(), // 함수를 문자열로 전달
},
});
worker.on("message", resolve);
worker.on("error", reject);
worker.on("exit", (code) => {
if (code !== 0) reject(new Error(`워커가 코드 ${code}로 종료됨`));
});
});
});
// 모든 워커의 결과 수집
const results = await Promise.all(workerPromises);
// 결과 병합
return [].concat(...results.map((r) => r.processedData));
}
// 사용 예시
async function main() {
// 대규모 데이터셋 생성
const largeDataset = Array.from({ length: 10000000 }, (_, i) => i);
console.time("데이터 처리");
// 각 항목을 제곱하는 처리 함수
const processingFn = (x) => {
// CPU 작업 시뮬레이션
let result = 0;
for (let i = 0; i < 1000; i++) {
result += Math.sin(x) * Math.cos(x);
}
return x * x + result;
};
const result = await processLargeDataset(largeDataset, processingFn);
console.log(`처리된 항목 수: ${result.length}`);
console.log(`첫 5개 결과:`, result.slice(0, 5));
console.timeEnd("데이터 처리");
}
main().catch(console.error);
// data-worker.js
const { parentPort, workerData } = require("worker_threads");
const { chunk, index, processingFn } = workerData;
// 문자열에서 함수 복원
const processFn = eval(`(${processingFn})`);
// 데이터 처리
const processedData = chunk.map(processFn);
// 결과 반환
parentPort.postMessage({
index,
processedData,
});
6. 실제 응용 사례 및 고려사항
6.1 워커 스레드 사용이 적합한 경우
CPU 집약적 연산:
- 복잡한 수학 연산, 암호화, 해시 계산
- 대규모 데이터 정렬 및 검색
- 인공지능/머신러닝 추론
이미지/비디오 처리:
- 이미지 변환 및 필터 적용
- 비디오 인코딩/디코딩
- 이미지 객체 인식
데이터 압축/해제:
- 대용량 파일 압축/해제
- 데이터 마이닝 작업
6.2 주의사항 및 모범 사례
// 워커 스레드 모범 사례 예시
const { Worker } = require("worker_threads");
const os = require("os");
class WorkerManager {
constructor(workerScript, options = {}) {
this.workerScript = workerScript;
this.maxWorkers = options.maxWorkers || os.cpus().length;
this.terminateOnComplete = options.terminateOnComplete !== false;
this.workers = new Map();
this.taskQueue = [];
this.activeWorkers = 0;
}
runTask(data) {
return new Promise((resolve, reject) => {
const task = { data, resolve, reject };
if (this.activeWorkers < this.maxWorkers) {
this.startWorkerWithTask(task);
} else {
this.taskQueue.push(task);
}
});
}
startWorkerWithTask(task) {
const worker = new Worker(this.workerScript, {
workerData: task.data,
});
const workerId = Date.now() + Math.random();
this.workers.set(workerId, worker);
this.activeWorkers++;
worker.on("message", (result) => {
task.resolve(result);
if (this.terminateOnComplete) {
worker.terminate();
this.workers.delete(workerId);
this.activeWorkers--;
// 대기 중인 작업이 있으면 처리
if (this.taskQueue.length > 0) {
this.startWorkerWithTask(this.taskQueue.shift());
}
}
});
worker.on("error", (err) => {
task.reject(err);
worker.terminate();
this.workers.delete(workerId);
this.activeWorkers--;
// 오류 발생 시에도 다음 작업 처리
if (this.taskQueue.length > 0) {
this.startWorkerWithTask(this.taskQueue.shift());
}
});
worker.on("exit", (code) => {
if (code !== 0 && !this.terminateOnComplete) {
task.reject(new Error(`워커가 코드 ${code}로 종료됨`));
}
// 비정상 종료 시 관리 상태 정리
if (this.workers.has(workerId)) {
this.workers.delete(workerId);
this.activeWorkers--;
if (this.taskQueue.length > 0) {
this.startWorkerWithTask(this.taskQueue.shift());
}
}
});
}
terminateAll() {
for (const worker of this.workers.values()) {
worker.terminate();
}
this.workers.clear();
this.activeWorkers = 0;
this.taskQueue = [];
}
}
// 사용 예시
const manager = new WorkerManager("./heavy-task.js", {
maxWorkers: 4,
terminateOnComplete: true,
});
async function runTasks() {
const tasks = Array.from({ length: 20 }, (_, i) => ({
taskId: i,
value: Math.random() * 100,
}));
try {
const results = await Promise.all(
tasks.map((task) => manager.runTask(task))
);
console.log("모든 작업 완료:", results.length);
} catch (err) {
console.error("작업 오류:", err);
} finally {
manager.terminateAll();
}
}
runTasks();
요약
Node.js의 워커 스레드는 다음과 같은 장점을 제공합니다:
- CPU 병렬 처리: 다중 코어 활용을 통한 성능 향상
- 메인 스레드 블로킹 방지: 이벤트 루프를 차단하지 않고 무거운 연산 수행
- 메모리 공유 옵션: SharedArrayBuffer를 통한 효율적인 데이터 공유
- 격리된 실행 환경: 각 워커는 독립적인 V8 인스턴스 보유
워커 스레드는 다음과 같은 상황에서 특히 유용합니다:
- 암호화 및 해싱
- 이미지/비디오 처리
- 대규모 데이터셋 분석
- 복잡한 수학 연산
하지만 다음 사항을 고려해야 합니다:
- 워커 생성 및 통신 오버헤드
- 추가적인 메모리 사용
- 데이터 동기화 복잡성
적절한 사용 사례에서 워커 스레드는 Node.js 애플리케이션의 성능을 크게 향상시킬 수 있는 강력한 도구입니다.