Node.js 인터뷰 질문 81

질문: Node.js의 스트림(Streams)에 대해 설명하고, 대용량 데이터 처리에 어떻게 활용할 수 있는지 예제와 함께 설명해주세요.

답변:

Node.js 스트림은 데이터를 작은 청크(chunk)로 나누어 처리하는 추상화된 인터페이스로, 대용량 데이터를 효율적으로 처리할 수 있게 해줍니다. 스트림을 사용하면 전체 데이터를 메모리에 로드하지 않고도 데이터를 연속적으로 처리할 수 있어 메모리 사용량을 크게 줄일 수 있습니다.

1. 스트림의 기본 개념

Node.js에는 네 가지 기본 스트림 유형이 있습니다:

  1. Readable 스트림: 데이터를 읽기 위한 스트림 (예: fs.createReadStream)
  2. Writable 스트림: 데이터를 쓰기 위한 스트림 (예: fs.createWriteStream)
  3. Duplex 스트림: 읽기와 쓰기가 모두 가능한 스트림 (예: net.Socket)
  4. Transform 스트림: 데이터를 읽고 쓰면서 변환할 수 있는 스트림 (예: zlib.createGzip)

2. 파일 스트림 기본 사용법

2.1 파일 읽기/쓰기 스트림

const fs = require("fs");

// 읽기 스트림 생성
const readStream = fs.createReadStream("source.txt", {
  encoding: "utf8",
  highWaterMark: 64 * 1024, // 64KB 청크
});

// 쓰기 스트림 생성
const writeStream = fs.createWriteStream("destination.txt");

// 데이터 이벤트 처리
readStream.on("data", (chunk) => {
  console.log(`${chunk.length} 바이트를 받았습니다`);
  writeStream.write(chunk);
});

// 종료 이벤트 처리
readStream.on("end", () => {
  writeStream.end();
  console.log("읽기가 완료되었습니다");
});

// 오류 처리
readStream.on("error", (err) => {
  console.error("읽기 오류:", err);
});

writeStream.on("error", (err) => {
  console.error("쓰기 오류:", err);
});

writeStream.on("finish", () => {
  console.log("쓰기가 완료되었습니다");
});

2.2 파이프 메서드 사용

위 코드를 pipe() 메서드를 사용하여 간소화할 수 있습니다:

const fs = require("fs");

const readStream = fs.createReadStream("source.txt");
const writeStream = fs.createWriteStream("destination.txt");

// pipe 메서드로 읽기 스트림을 쓰기 스트림에 연결
readStream.pipe(writeStream);

// 완료 이벤트 처리
writeStream.on("finish", () => {
  console.log("파일 복사가 완료되었습니다");
});

// 오류 처리
readStream.on("error", (err) => {
  console.error("읽기 오류:", err);
});

writeStream.on("error", (err) => {
  console.error("쓰기 오류:", err);
});

3. 대용량 데이터 처리 사례

3.1 대용량 파일 압축

const fs = require("fs");
const zlib = require("zlib");

// 파일 압축 예제
function compressFile(inputPath, outputPath) {
  const readStream = fs.createReadStream(inputPath);
  const gzipStream = zlib.createGzip();
  const writeStream = fs.createWriteStream(outputPath);

  // 파이프 체인으로 연결
  readStream.pipe(gzipStream).pipe(writeStream);

  writeStream.on("finish", () => {
    console.log(`${inputPath}${outputPath}로 압축했습니다`);
  });

  // 오류 처리
  const handleError = (err) => console.error("오류 발생:", err);
  readStream.on("error", handleError);
  gzipStream.on("error", handleError);
  writeStream.on("error", handleError);
}

// 사용 예시
compressFile("large-file.txt", "large-file.txt.gz");

3.2 CSV 파일 변환

const fs = require("fs");
const csv = require("csv-parser");
const { Transform } = require("stream");

// CSV 파일을 JSON으로 변환하고 처리하는 예제
function processLargeCsvFile(inputPath, outputPath) {
  const results = [];

  // 데이터 변환을 위한 Transform 스트림 생성
  const transformData = new Transform({
    objectMode: true, // 객체 모드 활성화
    transform(row, encoding, callback) {
      // 데이터 변환 로직
      const transformedRow = {
        name: row.name.toUpperCase(),
        age: parseInt(row.age, 10),
        email: row.email,
        active: row.active === "true",
      };

      // 변환된 데이터를 스트림으로 전달
      this.push(JSON.stringify(transformedRow) + "\n");
      callback();
    },
  });

  // 스트림 연결
  fs.createReadStream(inputPath)
    .pipe(csv())
    .pipe(transformData)
    .pipe(fs.createWriteStream(outputPath))
    .on("finish", () => {
      console.log(`CSV 파일 처리 완료: ${outputPath}`);
    })
    .on("error", (err) => {
      console.error("처리 중 오류 발생:", err);
    });
}

// 사용 예시
processLargeCsvFile("users.csv", "transformed-users.json");

3.3 HTTP 요청 스트리밍

const http = require("http");
const fs = require("fs");

// 파일을 HTTP 응답으로 스트리밍하는 서버
const server = http.createServer((req, res) => {
  if (req.url === "/download") {
    // 파일 정보 얻기
    const stat = fs.statSync("large-video.mp4");

    // 헤더 설정
    res.writeHead(200, {
      "Content-Type": "video/mp4",
      "Content-Length": stat.size,
    });

    // 파일을 응답으로 스트리밍
    const fileStream = fs.createReadStream("large-video.mp4");
    fileStream.pipe(res);

    // 오류 처리
    fileStream.on("error", (err) => {
      console.error("스트리밍 오류:", err);
      res.statusCode = 500;
      res.end("서버 오류");
    });
  } else {
    res.statusCode = 404;
    res.end("Not Found");
  }
});

server.listen(3000, () => {
  console.log("서버가 포트 3000에서 실행 중입니다");
});

4. 커스텀 스트림 구현

4.1 Transform 스트림 구현

const { Transform } = require("stream");

// 텍스트 라인을 처리하는 Transform 스트림
class LineCounter extends Transform {
  constructor(options = {}) {
    super(options);
    this.lineCount = 0;
    this.lastLine = "";
  }

  _transform(chunk, encoding, callback) {
    // 이전의 마지막 라인과 현재 청크를 합침
    const data = this.lastLine + chunk.toString();
    const lines = data.split("\n");

    // 마지막 라인 저장 (불완전할 수 있음)
    this.lastLine = lines.pop();

    // 라인 수 계산
    this.lineCount += lines.length;

    // 라인 수가 포함된 데이터를 다음 스트림으로 전달
    this.push(`현재까지 ${this.lineCount}개의 라인이 처리되었습니다.\n`);

    callback();
  }

  _flush(callback) {
    // 마지막 라인 처리
    if (this.lastLine) {
      this.lineCount++;
      this.push(`최종 라인 수: ${this.lineCount}\n`);
    }
    callback();
  }
}

// 사용 예시
const fs = require("fs");
const lineCounter = new LineCounter();

fs.createReadStream("large-log-file.txt")
  .pipe(lineCounter)
  .pipe(process.stdout);

4.2 Readable 스트림 구현

const { Readable } = require("stream");

// 숫자를 생성하는 Readable 스트림
class NumberGenerator extends Readable {
  constructor(max, options = {}) {
    super(options);
    this.max = max;
    this.current = 0;
  }

  _read() {
    this.current++;

    if (this.current <= this.max) {
      // 숫자를 문자열로 변환하여 스트림에 푸시
      this.push(`${this.current}\n`);
    } else {
      // 더 이상 데이터가 없음을 알림
      this.push(null);
    }
  }
}

// 사용 예시
const numberGenerator = new NumberGenerator(1000000);
const writeStream = fs.createWriteStream("numbers.txt");

numberGenerator.pipe(writeStream);

writeStream.on("finish", () => {
  console.log("백만 개의 숫자가 생성되었습니다");
});

5. 스트림 백프레셔(Backpressure) 처리

백프레셔는 쓰기 스트림이 읽기 스트림보다 느릴 때 발생하는 현상으로, 이를 적절히 처리하지 않으면 메모리 사용량이 증가할 수 있습니다.

const fs = require("fs");

// 읽기 및 쓰기 스트림 생성
const readStream = fs.createReadStream("large-file.txt");
const writeStream = fs.createWriteStream("destination.txt");

// 백프레셔를 고려한 데이터 처리
readStream.on("data", (chunk) => {
  // write() 메서드는 쓰기 버퍼 상태를 나타내는 불리언 값을 반환
  const canContinue = writeStream.write(chunk);

  // 버퍼가 가득 차면 읽기를 일시 중지
  if (!canContinue) {
    console.log("백프레셔: 읽기 일시 중지");
    readStream.pause();
  }
});

// 쓰기 버퍼가 비워지면 읽기 재개
writeStream.on("drain", () => {
  console.log("백프레셔: 읽기 재개");
  readStream.resume();
});

// 종료 처리
readStream.on("end", () => {
  writeStream.end();
});

6. 스트림 오류 처리 패턴

스트림 작업에서는 적절한 오류 처리가 중요합니다:

const fs = require("fs");
const { pipeline } = require("stream");
const zlib = require("zlib");

// pipeline API를 사용한 오류 처리 (Node.js 10 이상)
function compressFileWithPipeline(input, output) {
  pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output),
    (err) => {
      if (err) {
        console.error("파이프라인 오류:", err);
      } else {
        console.log("압축이 성공적으로 완료되었습니다");
      }
    }
  );
}

// 사용 예시
compressFileWithPipeline("large-file.txt", "large-file.txt.gz");

// 여러 스트림 연결 시 오류 처리 (pipeline 사용 이전)
function compressFileManualErrorHandling(input, output) {
  const readStream = fs.createReadStream(input);
  const gzipStream = zlib.createGzip();
  const writeStream = fs.createWriteStream(output);

  // 각 스트림에 오류 리스너 추가
  let hadError = false;
  const handleError = (err) => {
    hadError = true;

    // 열린 스트림 정리
    readStream.destroy();
    gzipStream.destroy();
    writeStream.destroy();

    // 불완전한 출력 파일 삭제
    fs.unlink(output, () => {
      console.error("압축 중 오류 발생:", err);
    });
  };

  readStream.on("error", handleError);
  gzipStream.on("error", handleError);
  writeStream.on("error", handleError);

  // 스트림 연결
  readStream
    .pipe(gzipStream)
    .pipe(writeStream)
    .on("finish", () => {
      if (!hadError) {
        console.log("압축이 성공적으로 완료되었습니다");
      }
    });
}

7. 스트림 성능 최적화 팁

7.1 버퍼 크기 조정

const fs = require("fs");

// 버퍼 크기 최적화
const readStream = fs.createReadStream("large-file.txt", {
  highWaterMark: 256 * 1024, // 256KB (기본값 64KB)
});

const writeStream = fs.createWriteStream("destination.txt", {
  highWaterMark: 256 * 1024,
});

readStream.pipe(writeStream);

7.2 객체 모드 스트림 최적화

const { Transform } = require("stream");
const fs = require("fs");
const JSONStream = require("JSONStream");

// 대량의 JSON 데이터 처리
function processLargeJson(inputPath) {
  const jsonParser = JSONStream.parse("*"); // 배열 항목마다 이벤트 발생

  const processor = new Transform({
    objectMode: true,
    transform(obj, encoding, callback) {
      // 가능한 가벼운 연산 수행
      const result = {
        id: obj.id,
        processed: true,
        timestamp: Date.now(),
      };

      callback(null, result);
    },
  });

  const stringifier = JSONStream.stringify();

  fs.createReadStream(inputPath)
    .pipe(jsonParser)
    .pipe(processor)
    .pipe(stringifier)
    .pipe(process.stdout);
}

요약

Node.js 스트림의 주요 이점:

  1. 메모리 효율성: 전체 데이터를 한 번에 메모리에 로드하지 않고 청크 단위로 처리
  2. 시간 효율성: 전체 데이터가 준비되기 전에 처리 시작 가능
  3. 조합 가능성: 여러 스트림을 파이프로 연결하여 복잡한 데이터 처리 파이프라인 구성
  4. 백프레셔 처리: 데이터 생산과 소비 속도 차이를 자동으로 관리

스트림은 다음과 같은 상황에서 특히 유용합니다:

  • 대용량 파일 처리
  • 네트워크 통신
  • 데이터 변환 파이프라인
  • 실시간 데이터 처리

Node.js의 스트림 API를 효과적으로 활용하면 메모리 사용량을 최소화하면서 대용량 데이터를 효율적으로 처리할 수 있습니다.

results matching ""

    No results matching ""