Node.js 인터뷰 질문 84

질문: Node.js의 워커 스레드(Worker Threads)에 대해 설명하고, CPU 집약적 작업에 어떻게 활용할 수 있는지 예제와 함께 설명해주세요.

답변:

Node.js는 기본적으로 단일 스레드 모델을 사용하지만, 10.5.0 버전부터 worker_threads 모듈을 통해 멀티스레딩을 지원합니다. 이는 CPU 집약적인 작업을 병렬로 처리할 수 있게 해주어 성능을 크게 향상시킬 수 있습니다.

1. 워커 스레드의 기본 개념

워커 스레드는 메인 스레드와 별도로 실행되는 백그라운드 스레드로, 다음과 같은 특징을 가집니다:

  • 메인 이벤트 루프 차단 없이 CPU 집약적 작업 수행 가능
  • 스레드 간 메모리 공유 지원
  • 메시지 패싱을 통한 통신
  • 각 워커는 자체 V8 인스턴스와 이벤트 루프 보유

2. 기본 워커 스레드 사용법

// main.js
const { Worker } = require("worker_threads");

// 워커 생성
function runWorker(workerData) {
  return new Promise((resolve, reject) => {
    const worker = new Worker("./worker.js", { workerData });

    worker.on("message", resolve);
    worker.on("error", reject);
    worker.on("exit", (code) => {
      if (code !== 0) reject(new Error(`워커가 종료 코드 ${code}로 종료됨`));
    });
  });
}

// 워커 실행
async function main() {
  try {
    const result = await runWorker({ num: 50 });
    console.log("피보나치 결과:", result);
  } catch (err) {
    console.error(err);
  }
}

main();
// worker.js
const { parentPort, workerData } = require("worker_threads");

// CPU 집약적인 작업 - 피보나치 계산
function fibonacci(n) {
  if (n <= 1) return n;
  return fibonacci(n - 1) + fibonacci(n - 2);
}

// 워커 데이터로부터 입력 받기
const result = fibonacci(workerData.num);

// 결과를 메인 스레드로 전송
parentPort.postMessage(result);

3. 워커 풀을 사용한 부하 분산

여러 CPU 코어를 효율적으로 활용하기 위해 워커 풀을 구성할 수 있습니다:

// worker-pool.js
const { Worker } = require("worker_threads");
const os = require("os");

class WorkerPool {
  constructor(workerPath, numWorkers = os.cpus().length) {
    this.workerPath = workerPath;
    this.numWorkers = numWorkers;
    this.workers = [];
    this.freeWorkers = [];

    for (let i = 0; i < this.numWorkers; i++) {
      this.addNewWorker();
    }
  }

  addNewWorker() {
    const worker = new Worker(this.workerPath);

    worker.on("message", (result) => {
      // 작업 완료 시 콜백 호출
      worker.taskCallback(null, result);
      worker.taskCallback = null;

      // 워커를 다시 풀에 추가
      this.freeWorkers.push(worker);
      this.processQueue();
    });

    worker.on("error", (err) => {
      // 에러 처리
      if (worker.taskCallback) {
        worker.taskCallback(err, null);
      }

      // 문제가 있는 워커 제거 및 새 워커 추가
      this.workers = this.workers.filter((w) => w !== worker);
      this.addNewWorker();
    });

    this.workers.push(worker);
    this.freeWorkers.push(worker);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // 대기열에 작업 추가 로직 구현 가능
      // ...
      return;
    }

    const worker = this.freeWorkers.pop();
    worker.taskCallback = callback;
    worker.postMessage(task);
  }

  processQueue() {
    // 대기열 처리 로직 구현 가능
    // ...
  }

  close() {
    for (const worker of this.workers) {
      worker.terminate();
    }
  }
}

module.exports = WorkerPool;
// main-with-pool.js
const WorkerPool = require("./worker-pool");
const pool = new WorkerPool("./worker-task.js");

// 여러 작업 실행
for (let i = 0; i < 10; i++) {
  pool.runTask({ num: 40 + i }, (err, result) => {
    if (err) {
      return console.error(`작업 ${i} 오류:`, err);
    }
    console.log(`작업 ${i} 결과:`, result);
  });
}

// 모든 작업이 끝나면 풀 종료
// 실제 구현에서는 더 복잡한 종료 로직이 필요할 수 있음
setTimeout(() => {
  pool.close();
}, 60000);
// worker-task.js
const { parentPort } = require("worker_threads");

// CPU 집약적인 작업 함수
function fibonacci(n) {
  if (n <= 1) return n;
  return fibonacci(n - 1) + fibonacci(n - 2);
}

// 메시지 수신 시 작업 처리
parentPort.on("message", (task) => {
  const result = fibonacci(task.num);
  parentPort.postMessage(result);
});

4. 데이터 공유 및 전송

워커 스레드는 다양한 방법으로 데이터를 공유하고 전송할 수 있습니다:

4.1 메시지 전송 (복사 방식)

// 기본 메시지 전송 (직렬화 후 복사)
worker.postMessage({ data: largeArray });

4.2 SharedArrayBuffer를 통한 메모리 공유

// main.js
const { Worker } = require("worker_threads");

// 공유 메모리 버퍼 생성
const sharedBuffer = new SharedArrayBuffer(4 * 10); // 10개의 정수를 위한 공간
const sharedArray = new Int32Array(sharedBuffer);

// 초기 데이터 설정
for (let i = 0; i < sharedArray.length; i++) {
  Atomics.store(sharedArray, i, i);
}

const worker = new Worker("./shared-worker.js", {
  workerData: { sharedBuffer },
});

worker.on("message", (msg) => {
  console.log("워커에서 처리 완료");

  // 결과 확인
  for (let i = 0; i < sharedArray.length; i++) {
    console.log(`sharedArray[${i}] = ${Atomics.load(sharedArray, i)}`);
  }
});
// shared-worker.js
const { parentPort, workerData } = require("worker_threads");
const { sharedBuffer } = workerData;

// 공유 배열에 접근
const sharedArray = new Int32Array(sharedBuffer);

// 값을 두 배로 만들기
for (let i = 0; i < sharedArray.length; i++) {
  // Atomics API를 사용한 안전한 읽기/쓰기
  const value = Atomics.load(sharedArray, i);
  Atomics.store(sharedArray, i, value * 2);
}

parentPort.postMessage("완료");

4.3 MessageChannel을 통한 양방향 통신

// main.js
const { Worker, MessageChannel } = require("worker_threads");

// 채널 생성
const { port1, port2 } = new MessageChannel();

// 워커 생성 및 포트 전달
const worker = new Worker("./channel-worker.js");
worker.postMessage({ port: port2 }, [port2]);

// 포트1에서 메시지 수신
port1.on("message", (msg) => {
  console.log("워커로부터 메시지:", msg);

  // 응답 전송
  port1.postMessage("메인에서 응답");
});
// channel-worker.js
const { parentPort, workerData } = require("worker_threads");

// MessagePort 수신
parentPort.once("message", ({ port }) => {
  // 메시지 전송
  port.postMessage("워커에서 메시지 전송");

  // 메시지 수신
  port.on("message", (msg) => {
    console.log("메인으로부터 메시지:", msg);
  });
});

4.4 transferList를 통한 제로 복사 전송

// 버퍼 소유권 이전 (제로 복사)
const buffer = new ArrayBuffer(1024 * 1024 * 32); // 32MB 버퍼
worker.postMessage({ buffer }, [buffer]);

5. 워커 스레드를 활용한 CPU 집약적 작업 처리

5.1 이미지 처리 예제

// image-processor.js
const { Worker } = require("worker_threads");
const path = require("path");
const fs = require("fs");

// 이미지 처리 작업을 여러 워커로 분산
async function processImage(imagePath, outputPath, numWorkers = 4) {
  // 이미지 데이터 로드
  const imageBuffer = fs.readFileSync(imagePath);

  // 워커별 작업 분할
  const workerPromises = [];
  const chunkSize = Math.ceil(imageBuffer.length / numWorkers);

  for (let i = 0; i < numWorkers; i++) {
    const start = i * chunkSize;
    const end = Math.min(start + chunkSize, imageBuffer.length);

    // 각 워커에 대한 슬라이스 계산
    const chunk = imageBuffer.slice(start, end);

    workerPromises.push(
      runWorker("./image-worker.js", {
        chunk,
        index: i,
        total: numWorkers,
      })
    );
  }

  // 모든 워커 완료 대기
  const results = await Promise.all(workerPromises);

  // 결과 병합
  const resultBuffers = results.map((r) => r.processedChunk);
  const resultBuffer = Buffer.concat(resultBuffers);

  // 출력 파일 저장
  fs.writeFileSync(outputPath, resultBuffer);

  return {
    success: true,
    processingTime: Math.max(...results.map((r) => r.processingTime)),
  };
}

// 워커 실행 헬퍼 함수
function runWorker(workerPath, workerData) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(path.resolve(workerPath), { workerData });

    worker.on("message", resolve);
    worker.on("error", reject);
    worker.on("exit", (code) => {
      if (code !== 0) reject(new Error(`워커가 코드 ${code}로 종료됨`));
    });
  });
}

// 사용 예시
async function main() {
  console.time("이미지 처리");

  try {
    const result = await processImage("./input.jpg", "./output.jpg");

    console.log("처리 완료:", result);
  } catch (err) {
    console.error("처리 오류:", err);
  }

  console.timeEnd("이미지 처리");
}

main();
// image-worker.js
const { parentPort, workerData } = require("worker_threads");
const { chunk, index, total } = workerData;

// 이미지 처리 시작 시간
const startTime = Date.now();

// CPU 집약적인 이미지 처리 시뮬레이션
function processImageChunk(buffer) {
  // 이 예제에서는 간단하게 밝기 조정을 구현
  // 실제로는 더 복잡한 알고리즘이 적용될 수 있음
  const result = Buffer.alloc(buffer.length);

  for (let i = 0; i < buffer.length; i++) {
    // 픽셀 밝기 증가 (실제 이미지 처리는 더 복잡함)
    result[i] = Math.min(buffer[i] + 50, 255);
  }

  // 처리 지연 시뮬레이션 (실제 무거운 작업)
  let sum = 0;
  for (let i = 0; i < 1000000; i++) {
    sum += Math.sqrt(i);
  }

  return result;
}

// 이미지 청크 처리
const processedChunk = processImageChunk(chunk);

// 처리 시간 계산
const processingTime = Date.now() - startTime;

// 결과 반환
parentPort.postMessage({
  index,
  processingTime,
  processedChunk, // 처리된 버퍼
});

5.2 대규모 데이터 병렬 처리

// data-processor.js
const { Worker } = require("worker_threads");
const os = require("os");

async function processLargeDataset(
  data,
  processingFn,
  numWorkers = os.cpus().length
) {
  // 데이터 청크로 분할
  const chunkSize = Math.ceil(data.length / numWorkers);
  const chunks = [];

  for (let i = 0; i < numWorkers; i++) {
    const start = i * chunkSize;
    const end = Math.min(start + chunkSize, data.length);
    chunks.push(data.slice(start, end));
  }

  // 각 청크에 대해 워커 생성
  const workerPromises = chunks.map((chunk, index) => {
    return new Promise((resolve, reject) => {
      const worker = new Worker("./data-worker.js", {
        workerData: {
          chunk,
          index,
          processingFn: processingFn.toString(), // 함수를 문자열로 전달
        },
      });

      worker.on("message", resolve);
      worker.on("error", reject);
      worker.on("exit", (code) => {
        if (code !== 0) reject(new Error(`워커가 코드 ${code}로 종료됨`));
      });
    });
  });

  // 모든 워커의 결과 수집
  const results = await Promise.all(workerPromises);

  // 결과 병합
  return [].concat(...results.map((r) => r.processedData));
}

// 사용 예시
async function main() {
  // 대규모 데이터셋 생성
  const largeDataset = Array.from({ length: 10000000 }, (_, i) => i);

  console.time("데이터 처리");

  // 각 항목을 제곱하는 처리 함수
  const processingFn = (x) => {
    // CPU 작업 시뮬레이션
    let result = 0;
    for (let i = 0; i < 1000; i++) {
      result += Math.sin(x) * Math.cos(x);
    }
    return x * x + result;
  };

  const result = await processLargeDataset(largeDataset, processingFn);

  console.log(`처리된 항목 수: ${result.length}`);
  console.log(`첫 5개 결과:`, result.slice(0, 5));

  console.timeEnd("데이터 처리");
}

main().catch(console.error);
// data-worker.js
const { parentPort, workerData } = require("worker_threads");
const { chunk, index, processingFn } = workerData;

// 문자열에서 함수 복원
const processFn = eval(`(${processingFn})`);

// 데이터 처리
const processedData = chunk.map(processFn);

// 결과 반환
parentPort.postMessage({
  index,
  processedData,
});

6. 실제 응용 사례 및 고려사항

6.1 워커 스레드 사용이 적합한 경우

  1. CPU 집약적 연산:

    • 복잡한 수학 연산, 암호화, 해시 계산
    • 대규모 데이터 정렬 및 검색
    • 인공지능/머신러닝 추론
  2. 이미지/비디오 처리:

    • 이미지 변환 및 필터 적용
    • 비디오 인코딩/디코딩
    • 이미지 객체 인식
  3. 데이터 압축/해제:

    • 대용량 파일 압축/해제
    • 데이터 마이닝 작업

6.2 주의사항 및 모범 사례

// 워커 스레드 모범 사례 예시
const { Worker } = require("worker_threads");
const os = require("os");

class WorkerManager {
  constructor(workerScript, options = {}) {
    this.workerScript = workerScript;
    this.maxWorkers = options.maxWorkers || os.cpus().length;
    this.terminateOnComplete = options.terminateOnComplete !== false;
    this.workers = new Map();
    this.taskQueue = [];
    this.activeWorkers = 0;
  }

  runTask(data) {
    return new Promise((resolve, reject) => {
      const task = { data, resolve, reject };

      if (this.activeWorkers < this.maxWorkers) {
        this.startWorkerWithTask(task);
      } else {
        this.taskQueue.push(task);
      }
    });
  }

  startWorkerWithTask(task) {
    const worker = new Worker(this.workerScript, {
      workerData: task.data,
    });

    const workerId = Date.now() + Math.random();
    this.workers.set(workerId, worker);
    this.activeWorkers++;

    worker.on("message", (result) => {
      task.resolve(result);

      if (this.terminateOnComplete) {
        worker.terminate();
        this.workers.delete(workerId);
        this.activeWorkers--;

        // 대기 중인 작업이 있으면 처리
        if (this.taskQueue.length > 0) {
          this.startWorkerWithTask(this.taskQueue.shift());
        }
      }
    });

    worker.on("error", (err) => {
      task.reject(err);
      worker.terminate();
      this.workers.delete(workerId);
      this.activeWorkers--;

      // 오류 발생 시에도 다음 작업 처리
      if (this.taskQueue.length > 0) {
        this.startWorkerWithTask(this.taskQueue.shift());
      }
    });

    worker.on("exit", (code) => {
      if (code !== 0 && !this.terminateOnComplete) {
        task.reject(new Error(`워커가 코드 ${code}로 종료됨`));
      }

      // 비정상 종료 시 관리 상태 정리
      if (this.workers.has(workerId)) {
        this.workers.delete(workerId);
        this.activeWorkers--;

        if (this.taskQueue.length > 0) {
          this.startWorkerWithTask(this.taskQueue.shift());
        }
      }
    });
  }

  terminateAll() {
    for (const worker of this.workers.values()) {
      worker.terminate();
    }

    this.workers.clear();
    this.activeWorkers = 0;
    this.taskQueue = [];
  }
}

// 사용 예시
const manager = new WorkerManager("./heavy-task.js", {
  maxWorkers: 4,
  terminateOnComplete: true,
});

async function runTasks() {
  const tasks = Array.from({ length: 20 }, (_, i) => ({
    taskId: i,
    value: Math.random() * 100,
  }));

  try {
    const results = await Promise.all(
      tasks.map((task) => manager.runTask(task))
    );

    console.log("모든 작업 완료:", results.length);
  } catch (err) {
    console.error("작업 오류:", err);
  } finally {
    manager.terminateAll();
  }
}

runTasks();

요약

Node.js의 워커 스레드는 다음과 같은 장점을 제공합니다:

  1. CPU 병렬 처리: 다중 코어 활용을 통한 성능 향상
  2. 메인 스레드 블로킹 방지: 이벤트 루프를 차단하지 않고 무거운 연산 수행
  3. 메모리 공유 옵션: SharedArrayBuffer를 통한 효율적인 데이터 공유
  4. 격리된 실행 환경: 각 워커는 독립적인 V8 인스턴스 보유

워커 스레드는 다음과 같은 상황에서 특히 유용합니다:

  • 암호화 및 해싱
  • 이미지/비디오 처리
  • 대규모 데이터셋 분석
  • 복잡한 수학 연산

하지만 다음 사항을 고려해야 합니다:

  • 워커 생성 및 통신 오버헤드
  • 추가적인 메모리 사용
  • 데이터 동기화 복잡성

적절한 사용 사례에서 워커 스레드는 Node.js 애플리케이션의 성능을 크게 향상시킬 수 있는 강력한 도구입니다.

results matching ""

    No results matching ""