Node.js 인터뷰 질문 81
질문: Node.js의 스트림(Streams)에 대해 설명하고, 대용량 데이터 처리에 어떻게 활용할 수 있는지 예제와 함께 설명해주세요.
답변:
Node.js 스트림은 데이터를 작은 청크(chunk)로 나누어 처리하는 추상화된 인터페이스로, 대용량 데이터를 효율적으로 처리할 수 있게 해줍니다. 스트림을 사용하면 전체 데이터를 메모리에 로드하지 않고도 데이터를 연속적으로 처리할 수 있어 메모리 사용량을 크게 줄일 수 있습니다.
1. 스트림의 기본 개념
Node.js에는 네 가지 기본 스트림 유형이 있습니다:
- Readable 스트림: 데이터를 읽기 위한 스트림 (예: fs.createReadStream)
- Writable 스트림: 데이터를 쓰기 위한 스트림 (예: fs.createWriteStream)
- Duplex 스트림: 읽기와 쓰기가 모두 가능한 스트림 (예: net.Socket)
- Transform 스트림: 데이터를 읽고 쓰면서 변환할 수 있는 스트림 (예: zlib.createGzip)
2. 파일 스트림 기본 사용법
2.1 파일 읽기/쓰기 스트림
const fs = require("fs");
// 읽기 스트림 생성
const readStream = fs.createReadStream("source.txt", {
encoding: "utf8",
highWaterMark: 64 * 1024, // 64KB 청크
});
// 쓰기 스트림 생성
const writeStream = fs.createWriteStream("destination.txt");
// 데이터 이벤트 처리
readStream.on("data", (chunk) => {
console.log(`${chunk.length} 바이트를 받았습니다`);
writeStream.write(chunk);
});
// 종료 이벤트 처리
readStream.on("end", () => {
writeStream.end();
console.log("읽기가 완료되었습니다");
});
// 오류 처리
readStream.on("error", (err) => {
console.error("읽기 오류:", err);
});
writeStream.on("error", (err) => {
console.error("쓰기 오류:", err);
});
writeStream.on("finish", () => {
console.log("쓰기가 완료되었습니다");
});
2.2 파이프 메서드 사용
위 코드를 pipe()
메서드를 사용하여 간소화할 수 있습니다:
const fs = require("fs");
const readStream = fs.createReadStream("source.txt");
const writeStream = fs.createWriteStream("destination.txt");
// pipe 메서드로 읽기 스트림을 쓰기 스트림에 연결
readStream.pipe(writeStream);
// 완료 이벤트 처리
writeStream.on("finish", () => {
console.log("파일 복사가 완료되었습니다");
});
// 오류 처리
readStream.on("error", (err) => {
console.error("읽기 오류:", err);
});
writeStream.on("error", (err) => {
console.error("쓰기 오류:", err);
});
3. 대용량 데이터 처리 사례
3.1 대용량 파일 압축
const fs = require("fs");
const zlib = require("zlib");
// 파일 압축 예제
function compressFile(inputPath, outputPath) {
const readStream = fs.createReadStream(inputPath);
const gzipStream = zlib.createGzip();
const writeStream = fs.createWriteStream(outputPath);
// 파이프 체인으로 연결
readStream.pipe(gzipStream).pipe(writeStream);
writeStream.on("finish", () => {
console.log(`${inputPath}를 ${outputPath}로 압축했습니다`);
});
// 오류 처리
const handleError = (err) => console.error("오류 발생:", err);
readStream.on("error", handleError);
gzipStream.on("error", handleError);
writeStream.on("error", handleError);
}
// 사용 예시
compressFile("large-file.txt", "large-file.txt.gz");
3.2 CSV 파일 변환
const fs = require("fs");
const csv = require("csv-parser");
const { Transform } = require("stream");
// CSV 파일을 JSON으로 변환하고 처리하는 예제
function processLargeCsvFile(inputPath, outputPath) {
const results = [];
// 데이터 변환을 위한 Transform 스트림 생성
const transformData = new Transform({
objectMode: true, // 객체 모드 활성화
transform(row, encoding, callback) {
// 데이터 변환 로직
const transformedRow = {
name: row.name.toUpperCase(),
age: parseInt(row.age, 10),
email: row.email,
active: row.active === "true",
};
// 변환된 데이터를 스트림으로 전달
this.push(JSON.stringify(transformedRow) + "\n");
callback();
},
});
// 스트림 연결
fs.createReadStream(inputPath)
.pipe(csv())
.pipe(transformData)
.pipe(fs.createWriteStream(outputPath))
.on("finish", () => {
console.log(`CSV 파일 처리 완료: ${outputPath}`);
})
.on("error", (err) => {
console.error("처리 중 오류 발생:", err);
});
}
// 사용 예시
processLargeCsvFile("users.csv", "transformed-users.json");
3.3 HTTP 요청 스트리밍
const http = require("http");
const fs = require("fs");
// 파일을 HTTP 응답으로 스트리밍하는 서버
const server = http.createServer((req, res) => {
if (req.url === "/download") {
// 파일 정보 얻기
const stat = fs.statSync("large-video.mp4");
// 헤더 설정
res.writeHead(200, {
"Content-Type": "video/mp4",
"Content-Length": stat.size,
});
// 파일을 응답으로 스트리밍
const fileStream = fs.createReadStream("large-video.mp4");
fileStream.pipe(res);
// 오류 처리
fileStream.on("error", (err) => {
console.error("스트리밍 오류:", err);
res.statusCode = 500;
res.end("서버 오류");
});
} else {
res.statusCode = 404;
res.end("Not Found");
}
});
server.listen(3000, () => {
console.log("서버가 포트 3000에서 실행 중입니다");
});
4. 커스텀 스트림 구현
4.1 Transform 스트림 구현
const { Transform } = require("stream");
// 텍스트 라인을 처리하는 Transform 스트림
class LineCounter extends Transform {
constructor(options = {}) {
super(options);
this.lineCount = 0;
this.lastLine = "";
}
_transform(chunk, encoding, callback) {
// 이전의 마지막 라인과 현재 청크를 합침
const data = this.lastLine + chunk.toString();
const lines = data.split("\n");
// 마지막 라인 저장 (불완전할 수 있음)
this.lastLine = lines.pop();
// 라인 수 계산
this.lineCount += lines.length;
// 라인 수가 포함된 데이터를 다음 스트림으로 전달
this.push(`현재까지 ${this.lineCount}개의 라인이 처리되었습니다.\n`);
callback();
}
_flush(callback) {
// 마지막 라인 처리
if (this.lastLine) {
this.lineCount++;
this.push(`최종 라인 수: ${this.lineCount}\n`);
}
callback();
}
}
// 사용 예시
const fs = require("fs");
const lineCounter = new LineCounter();
fs.createReadStream("large-log-file.txt")
.pipe(lineCounter)
.pipe(process.stdout);
4.2 Readable 스트림 구현
const { Readable } = require("stream");
// 숫자를 생성하는 Readable 스트림
class NumberGenerator extends Readable {
constructor(max, options = {}) {
super(options);
this.max = max;
this.current = 0;
}
_read() {
this.current++;
if (this.current <= this.max) {
// 숫자를 문자열로 변환하여 스트림에 푸시
this.push(`${this.current}\n`);
} else {
// 더 이상 데이터가 없음을 알림
this.push(null);
}
}
}
// 사용 예시
const numberGenerator = new NumberGenerator(1000000);
const writeStream = fs.createWriteStream("numbers.txt");
numberGenerator.pipe(writeStream);
writeStream.on("finish", () => {
console.log("백만 개의 숫자가 생성되었습니다");
});
5. 스트림 백프레셔(Backpressure) 처리
백프레셔는 쓰기 스트림이 읽기 스트림보다 느릴 때 발생하는 현상으로, 이를 적절히 처리하지 않으면 메모리 사용량이 증가할 수 있습니다.
const fs = require("fs");
// 읽기 및 쓰기 스트림 생성
const readStream = fs.createReadStream("large-file.txt");
const writeStream = fs.createWriteStream("destination.txt");
// 백프레셔를 고려한 데이터 처리
readStream.on("data", (chunk) => {
// write() 메서드는 쓰기 버퍼 상태를 나타내는 불리언 값을 반환
const canContinue = writeStream.write(chunk);
// 버퍼가 가득 차면 읽기를 일시 중지
if (!canContinue) {
console.log("백프레셔: 읽기 일시 중지");
readStream.pause();
}
});
// 쓰기 버퍼가 비워지면 읽기 재개
writeStream.on("drain", () => {
console.log("백프레셔: 읽기 재개");
readStream.resume();
});
// 종료 처리
readStream.on("end", () => {
writeStream.end();
});
6. 스트림 오류 처리 패턴
스트림 작업에서는 적절한 오류 처리가 중요합니다:
const fs = require("fs");
const { pipeline } = require("stream");
const zlib = require("zlib");
// pipeline API를 사용한 오류 처리 (Node.js 10 이상)
function compressFileWithPipeline(input, output) {
pipeline(
fs.createReadStream(input),
zlib.createGzip(),
fs.createWriteStream(output),
(err) => {
if (err) {
console.error("파이프라인 오류:", err);
} else {
console.log("압축이 성공적으로 완료되었습니다");
}
}
);
}
// 사용 예시
compressFileWithPipeline("large-file.txt", "large-file.txt.gz");
// 여러 스트림 연결 시 오류 처리 (pipeline 사용 이전)
function compressFileManualErrorHandling(input, output) {
const readStream = fs.createReadStream(input);
const gzipStream = zlib.createGzip();
const writeStream = fs.createWriteStream(output);
// 각 스트림에 오류 리스너 추가
let hadError = false;
const handleError = (err) => {
hadError = true;
// 열린 스트림 정리
readStream.destroy();
gzipStream.destroy();
writeStream.destroy();
// 불완전한 출력 파일 삭제
fs.unlink(output, () => {
console.error("압축 중 오류 발생:", err);
});
};
readStream.on("error", handleError);
gzipStream.on("error", handleError);
writeStream.on("error", handleError);
// 스트림 연결
readStream
.pipe(gzipStream)
.pipe(writeStream)
.on("finish", () => {
if (!hadError) {
console.log("압축이 성공적으로 완료되었습니다");
}
});
}
7. 스트림 성능 최적화 팁
7.1 버퍼 크기 조정
const fs = require("fs");
// 버퍼 크기 최적화
const readStream = fs.createReadStream("large-file.txt", {
highWaterMark: 256 * 1024, // 256KB (기본값 64KB)
});
const writeStream = fs.createWriteStream("destination.txt", {
highWaterMark: 256 * 1024,
});
readStream.pipe(writeStream);
7.2 객체 모드 스트림 최적화
const { Transform } = require("stream");
const fs = require("fs");
const JSONStream = require("JSONStream");
// 대량의 JSON 데이터 처리
function processLargeJson(inputPath) {
const jsonParser = JSONStream.parse("*"); // 배열 항목마다 이벤트 발생
const processor = new Transform({
objectMode: true,
transform(obj, encoding, callback) {
// 가능한 가벼운 연산 수행
const result = {
id: obj.id,
processed: true,
timestamp: Date.now(),
};
callback(null, result);
},
});
const stringifier = JSONStream.stringify();
fs.createReadStream(inputPath)
.pipe(jsonParser)
.pipe(processor)
.pipe(stringifier)
.pipe(process.stdout);
}
요약
Node.js 스트림의 주요 이점:
- 메모리 효율성: 전체 데이터를 한 번에 메모리에 로드하지 않고 청크 단위로 처리
- 시간 효율성: 전체 데이터가 준비되기 전에 처리 시작 가능
- 조합 가능성: 여러 스트림을 파이프로 연결하여 복잡한 데이터 처리 파이프라인 구성
- 백프레셔 처리: 데이터 생산과 소비 속도 차이를 자동으로 관리
스트림은 다음과 같은 상황에서 특히 유용합니다:
- 대용량 파일 처리
- 네트워크 통신
- 데이터 변환 파이프라인
- 실시간 데이터 처리
Node.js의 스트림 API를 효과적으로 활용하면 메모리 사용량을 최소화하면서 대용량 데이터를 효율적으로 처리할 수 있습니다.