Node.js Internals
Streams: Потоковая обработка данных
В 2019 году Netflix столкнулись с проблемой: пользователи жаловались на вылеты приложения при стриминге 4K видео. Проблема оказалась не в кодеках, а в том, что сервер загружал весь файл в память перед отправкой. Решение? Streams. Память упала с 8GB на 64KB, лаги исчезли.
- **Video streaming (YouTube, Netflix):** Отправка 2GB фильма клиенту без загрузки в память - стримим по 64KB chunks. Backpressure останавливает чтение если клиент на медленном WiFi
- **Log processing (Datadog, Splunk):** Парсинг 50GB логов в реальном времени - readline + Transform streams. Память фиксирована на 16KB, несмотря на размер файла
- **File uploads (AWS S3, Google Cloud):** Загрузка пользовательских файлов напрямую в S3 через multipart upload - стримим без сохранения на диск. Экономия: 0 байт диска, мгновенная загрузка
Зачем нужны стримы
Ситуация: нужно обработать видеофайл на 2GB. Можно загрузить его целиком в память - и получить крах приложения. Или читать по кусочкам, обрабатывая данные **по мере поступления**. Это и есть streams.
**Streams** - это абстракция для работы с данными, которые поступают **постепенно**. Вместо загрузки всего файла в RAM, чтение идёт по 64KB, обработка - по мере поступления, и так до конца.
**Память vs Streams:** Загрузить 2GB файл целиком - 2GB RAM. Прочитать стримом - 64KB буфер. Разница в **30,000 раз**.
**Когда нужны streams:** - Файлы больше 100MB (логи, видео, базы данных) - HTTP запросы/ответы (тело запроса может быть огромным) - Сетевые протоколы (TCP, WebSockets) - Трансформация данных на лету (сжатие, шифрование) - Бесконечные потоки (чат, метрики, IoT датчики)
Дан файл логов на 5GB. Нужно посчитать количество строк с ошибками. Какой подход лучше?
Readable Streams
**Readable stream** - источник данных, откуда происходит **чтение**. Это может быть файл, HTTP запрос, stdin, TCP соединение. Readable stream **пушит** данные в код приложения кусками (chunks).
**Два режима Readable stream:** - **Flowing mode** - данные автоматически поступают через `data` event - **Paused mode** - `.read()` вызывается явно, чтобы получить chunk
**Важные события Readable stream:** - `data` - пришёл новый chunk (переключает в flowing mode) - `end` - данные закончились - `error` - произошла ошибка - `readable` - данные доступны для `.read()` (paused mode) - `pause` / `resume` - управление потоком
HTTP request body читается через readable stream. Клиент отправляет 10MB JSON. Что произойдёт при простом stream.on('data', chunk => buffer += chunk)?
Writable Streams
**Writable stream** - приёмник данных, куда идёт **запись**. Это может быть файл, HTTP response, stdout, TCP socket. Writable stream **принимает** chunks и обрабатывает их (записывает на диск, отправляет по сети).
**Критический момент:** `.write()` возвращает `false`, когда внутренний буфер переполнен. Если игнорировать - память раздуется. Правильно: остановиться и ждать события `drain`.
**Важные события Writable stream:** - `drain` - буфер опустел, можно продолжать писать - `finish` - все данные записаны (вызвали `.end()`) - `error` - произошла ошибка записи - `pipe` / `unpipe` - readable stream подключился/отключился
Запись миллиона строк в файл через writable stream. Код: for (let i = 0; i < 1e6; i++) stream.write(data). Что произойдёт?
Duplex и Transform Streams
**Duplex stream** - это readable и writable одновременно. Например, TCP socket: можно писать данные (`socket.write()`) и читать (`socket.on('data')`).
**Transform stream** - особый вид duplex, который **трансформирует** данные: читает chunk, обрабатывает, пушит результат. Примеры: `zlib.createGzip()` (сжатие), `crypto.createCipher()` (шифрование), `csv-parser` (парсинг CSV).
**Transform vs Duplex:** Transform всегда связывает чтение и запись (что читаем - то и трансформируем). Duplex может работать независимо (TCP socket: отправляем запрос, получаем ответ).
Создаётся Transform stream для парсинга CSV. В _transform() пришёл chunk '"Name","Age"\n"Alice'. Что нужно сделать?
Pipe и Pipeline
**`.pipe()`** - метод соединения стримов: `readable.pipe(writable)`. Автоматически управляет backpressure: если writable медленный, readable приостанавливается.
**Проблема `.pipe()`:** не обрабатывает ошибки правильно. Если один стрим упал, остальные могут остаться висеть. **Решение:** `stream.pipeline()` из модуля `stream/promises` - автоматически закрывает все стримы при ошибке.
**Правило:** Используйте `pipeline()` вместо `.pipe()` для production кода. Pipeline гарантирует: ошибка в любом стриме → все стримы закрываются, утечек памяти нет.
**Чем `.pipe()` отличается от `pipeline()`:**
| `.pipe()` | `pipeline()` |
|---|---|
| Ручная обработка ошибок для каждого стрима | Автоматическая обработка ошибок |
| Стримы могут остаться висеть при ошибке | Все стримы закрываются при ошибке |
| Утечки дескрипторов файлов | Гарантия очистки ресурсов |
| Legacy API | Современный Promise-based API |
Дан код: fs.createReadStream('in.txt').pipe(gzip).pipe(fs.createWriteStream('out.gz')). В середине обработки диск заполнился, createWriteStream выбросил ENOSPC. Что произойдёт?
Backpressure и управление потоком
**Backpressure** - механизм контроля скорости данных. Когда приёмник (writable stream) не успевает обрабатывать данные, он сигнализирует источнику (readable stream): "Стоп, я перегружен, дай мне передохнуть".
**Почему backpressure критичен:** Без него producer может генерировать данные быстрее, чем consumer обрабатывает → буфер раздувается → out of memory → крах приложения.
**Как работает backpressure:** 1. Writable stream имеет `highWaterMark` (например, 16KB) 2. Когда буфер заполнен выше highWaterMark, `.write()` возвращает `false` 3. Readable stream видит `false` → вызывает `.pause()` на себе 4. Когда writable stream обработал данные → эмитит `drain` event 5. Readable stream слышит `drain` → вызывает `.resume()`
**Признаки проблем с backpressure:** - Память приложения растёт со временем (утечка в буферах) - Out of memory крэши при обработке больших файлов - Высокая latency в HTTP стриминге - Лог ошибок: 'write after end', 'stream destroyed' **Как диагностировать:** ```typescript const stream = createWriteStream('file.txt'); console.log(stream.writableHighWaterMark); // Лимит буфера console.log(stream.writableLength); // Текущий размер буфера if (stream.writableLength > stream.writableHighWaterMark) { console.warn('⚠️ Буфер переполнен!'); } ```
Streams автоматически управляют памятью - можно не думать о backpressure
Streams дают ИНСТРУМЕНТЫ для управления памятью (pause/resume, drain event), но их нужно ИСПОЛЬЗОВАТЬ. Без этого streams не лучше readFile()
Backpressure - это контракт между readable и writable стримами. Использование .on('data') без проверки .write() или игнорирование pipeline() ломает этот контракт. Streams не могут автоматически остановить пользовательский код
Сервер стримит 4K видео клиентам через HTTP. Клиент на медленном 3G соединении. Код: fs.createReadStream('video.mp4').on('data', chunk => res.write(chunk)). Что произойдёт?
Ключевые идеи
- **Streams = обработка данных по кусочкам:** Вместо загрузки 2GB файла в память, читаем по 64KB - память фиксирована, производительность не зависит от размера данных
- **4 типа стримов:** Readable (источник), Writable (приёмник), Duplex (оба сразу, например TCP socket), Transform (читает, трансформирует, пишет - например gzip)
- **Backpressure - защита от out of memory:** Когда writable stream перегружен, он возвращает false из write() → readable stream должен вызвать pause() и ждать drain event. Без этого буфер раздуется
- **pipeline() > .pipe():** pipeline() автоматически управляет ошибками и закрывает все стримы при сбое. .pipe() оставляет стримы висеть - утечки дескрипторов и памяти
- **Transform streams для трансформаций:** Сжатие (gzip), шифрование (crypto), парсинг (CSV, JSON) - всё через Transform. Накапливайте неполные chunks в буфере, обрабатывайте завершённые данные
- **Производственные баги:** Игнорирование backpressure (res.write() без проверки), накопление данных в .on('data') (потеря эффективности), отсутствие обработки ошибок (.pipe() без .on('error'))
Связанные темы
Streams - фундамент асинхронной обработки данных в Node.js. Они используются повсеместно:
- Event Loop — Streams работают через события (data, drain, error) → обрабатываются в event loop. Блокирующий код в _transform() заблокирует весь event loop
- Buffer — Chunks в streams - это Buffer объекты. highWaterMark определяет размер Buffer. Понимание Buffer критично для оптимизации streams
- Worker Threads — CPU-интенсивная трансформация (видео кодирование, ML) должна быть в Worker Thread, чтобы не блокировать event loop. Streams можно передавать между потоками через MessagePort
Вопросы для размышления
- Почему pipeline() безопаснее .pipe() для production кода? Что произойдёт если один из стримов упадёт?
- Как реализовать Transform stream для парсинга CSV, если chunks могут разрываться посередине строки?
- В чём разница между Duplex и Transform? Когда использовать каждый из них?