Node.js Internals

EventEmitter: Паттерн событий Node.js

Почему Socket.io может обрабатывать миллионы WebSocket соединений на одном сервере? Как Express middleware знает, когда вызвать next(), не блокируя другие запросы? Секрет в EventEmitter - паттерне, который превращает Node.js из просто JavaScript runtime в мощную платформу для real-time и event-driven приложений.

  • **Socket.io** использует EventEmitter для каждого WebSocket соединения. События 'message', 'disconnect', 'error' позволяют обрабатывать тысячи соединений через единый API. Под капотом - тысячи EventEmitter объектов, каждый эмитит события независимо.
  • **Bull (job queue)** построен на EventEmitter + Redis. События 'completed', 'failed', 'progress' позволяют отслеживать миллионы фоновых задач. Каждый job - EventEmitter, каждая очередь - EventEmitter, вся архитектура event-driven.
  • **Node.js HTTP server** - это EventEmitter. Каждый request, connection, upgrade (WebSocket) - события. Это позволяет фреймворкам (Express, Fastify, NestJS) строить middleware архитектуры без изменения core кода сервера.

Observer Pattern

Аналогия с редакцией газеты: когда выходит новый выпуск, редактору не нужно обзванивать каждого подписчика - подписчики сами получают газету. Редактор просто публикует (emit), подписчики получают (listen). Это **Observer pattern**, и именно на нём построен EventEmitter - основа асинхронности в Node.js.

**EventEmitter** - это встроенный класс Node.js, реализующий паттерн событий (pub/sub). Он лежит в основе всех асинхронных API: Streams (data, end, error), HTTP (request, close), net (connection), child_process (exit) и многих других. Понимание EventEmitter критично, потому что разработчик работает с ним даже не осознавая - каждый раз при использовании `.on('data', ...)` или `.once('end', ...)`.

**Почему EventEmitter, а не callbacks?** Callback подходит для одноразовых операций (fs.readFile). Но когда событие может произойти многократно (каждый HTTP-запрос, каждый чанк данных в stream), нужен механизм подписки. EventEmitter позволяет множеству слушателей реагировать на одно событие, не зная друг о друге (loose coupling).

EventEmitter под капотом Node.js

**HTTP Server:** ```typescript const server = http.createServer(); server.on('request', handler); // EventEmitter! ``` Server наследуется от EventEmitter. Когда приходит TCP-соединение, libuv уведомляет Node.js → HTTP парсер обрабатывает headers → server.emit('request', req, res). **Readable Stream:** ```typescript stream.on('data', chunk => {}); // EventEmitter! stream.on('end', () => {}); // EventEmitter! ``` Stream читает данные из файла/сети чанками → emit('data', chunk) для каждого чанка → emit('end') при завершении. **Child Process:** ```typescript const child = spawn('ls'); child.on('exit', code => {}); // EventEmitter! ``` Процесс завершился → libuv получил SIGCHLD → emit('exit', code). Всё это работает через EventEmitter!

**КРИТИЧЕСКОЕ отличие от браузерных событий:** В браузере `addEventListener` выполняет handlers асинхронно через event loop. В Node.js `emitter.emit()` вызывает handlers **СИНХРОННО** в порядке регистрации. Это означает, что если handler бросит исключение, оно всплывёт к месту вызова emit().

**Когда использовать EventEmitter vs Promise:** **EventEmitter:** - Множественные события (stream data чанки, websocket сообщения) - Неизвестное количество событий (HTTP server может получить 0 или миллион запросов) - Разделение логики (один emit, много независимых handlers) **Promise:** - Одноразовая операция (fetch данных, запись файла) - Известный результат (success/error) - Цепочки трансформаций (then/catch)

Дан код: ```typescript const emitter = new EventEmitter(); emitter.on('test', () => console.log('A')); emitter.emit('test'); console.log('B'); ``` Какой порядок вывода?

EventEmitter API

EventEmitter предоставляет компактный, но богатый API для работы с событиями. Основные методы: **on** (подписка), **emit** (публикация), **once** (одноразовая подписка), **off/removeListener** (отписка). Но есть критически важные детали, которые многие пропускают: **error событие** (единственное обязательное), **maxListeners** (защита от утечек), **eventNames/listenerCount** (интроспекция).

**КРИТИЧЕСКИ ВАЖНО: error событие** - единственное специальное событие в EventEmitter. При вызове `emit('error', err)` без подписчиков на 'error' Node.js бросит исключение и крашнет процесс! Это сделано намеренно - необработанная ошибка в асинхронном коде должна быть заметна.

**maxListeners - защита от memory leaks:** По умолчанию EventEmitter предупреждает, если на одно событие подписано больше 10 listeners. Это не ограничение, а warning - часто это признак утечки памяти (забыли removeListener в цикле). Можно изменить через `setMaxListeners(n)` или `EventEmitter.defaultMaxListeners = n`.

Реальный баг: prependListener для порядка выполнения

```typescript // Библиотека, которая валидирует данные class Validator extends EventEmitter { validate(data: unknown) { // Сначала базовая валидация if (!data) return this.emit('error', new Error('Empty data')); // Затем emit для кастомных проверок this.emit('validate', data); } } const validator = new Validator(); // Пользователь добавляет свою проверку validator.on('validate', (data) => { console.log('User validation:', data); }); // Нужно добавить ПРИОРИТЕТНУЮ проверку, // которая должна выполниться ПЕРВОЙ validator.prependListener('validate', (data) => { console.log('Priority check:', data); }); validator.validate({ foo: 'bar' }); // Вывод: // Priority check: { foo: 'bar' } ← prependListener выполнился первым // User validation: { foo: 'bar' } ``` **prependListener/prependOnceListener** добавляют handler в НАЧАЛО очереди, а не в конец. Это полезно для framework-уровневой логики, которая должна выполниться до пользовательских handlers.

**Лучшие практики работы с EventEmitter API:** 1. **ВСЕГДА** подписка на 'error', даже просто для логирования 2. **once** - для одноразовых событий (ready, close) 3. **Удаление listeners** через off/removeListener, когда они больше не нужны 4. Хранить **ссылки на handlers** при планируемом удалении (анонимные функции не подходят) 5. **maxListeners** как canary - если warning появился, искать утечку 6. emit() **синхронный** - тяжёлые операции в handlers блокируют Event Loop

Что произойдёт при выполнении этого кода? ```typescript const emitter = new EventEmitter(); emitter.emit('error', new Error('Oops')); console.log('After emit'); ```

Паттерны использования

EventEmitter - не просто API, это строительный блок для event-driven архитектур. Три ключевых паттерна: **(1) Наследование от EventEmitter** для создания событийных классов, **(2) TypeScript типизация событий** для type safety, **(3) Async обработка событий** для неблокирующих handlers. Эти паттерны используются во всех major Node.js фреймворках.

**Альтернатива наследованию - composition:** Вместо `extends EventEmitter` можно создать приватное поле `private events = new EventEmitter()`. Это предпочтительнее, если класс уже наследуется от другого или нужно скрыть некоторые методы EventEmitter от публичного API.

Реальный паттерн: typed-emitter библиотека

Для более удобной типизации используйте библиотеку `typed-emitter`: ```typescript import { EventEmitter } from 'events'; import TypedEmitter from 'typed-emitter'; type DatabaseEvents = { connected: () => void; query: (sql: string, duration: number) => void; error: (error: Error) => void; }; // Просто приводим к типу TypedEmitter class Database extends (EventEmitter as new () => TypedEmitter<DatabaseEvents>) { // Теперь все методы типизированы автоматически! connect() { this.emit('connected'); // ✅ } } const db = new Database(); db.on('query', (sql, duration) => { // sql и duration автоматически типизированы }); ``` Это паттерн используется в TypeORM, NestJS, Socket.io и других библиотеках.

**ОПАСНОСТЬ: emit() НЕ ЖДЁТ async handlers!** Если listener асинхронный, emit() вернёт управление немедленно, не дожидаясь завершения. Это может привести к race conditions: ```typescript emitter.on('data', async (data) => { await db.save(data); // 100ms }); emitter.emit('data', { foo: 'bar' }); process.exit(); // ❌ БД не сохранила данные! ``` Решение - emitAsync() паттерн или events.once() Promise wrapper.

**Выбор паттерна по use case:** **Наследование (extends EventEmitter):** - Класс является источником событий (Stream, Server, Database) - Нужен полный API EventEmitter в публичном интерфейсе **Composition (приватный EventEmitter):** - Класс использует события внутренне - Нужен контроль над тем, какие методы доступны (только on/off, без emit) **TypeScript типизация:** - Средние и большие проекты (ловит опечатки в event names) - Публичные библиотеки (лучший DX для пользователей) **emitAsync() паттерн:** - Handlers выполняют I/O операции (БД, API) - Нужно дождаться завершения всех handlers перед продолжением

Есть класс Logger extends EventEmitter с async handlers: ```typescript logger.on('log', async (msg) => { await db.insert({ message: msg }); // 50ms }); logger.emit('log', 'Test'); logger.emit('log', 'Test2'); ``` Что произойдёт?

Memory Leaks

Memory leaks в EventEmitter - одна из самых частых причин роста памяти в Node.js приложениях. Проблема простая: подписка на события через `.on()` без отписки через `.off()`. Listeners остаются в памяти, даже когда объект, который их создал, больше не нужен. Через несколько часов работы - гигабайты утекшей памяти.

**Почему это утечка?** JavaScript GC собирает объекты, на которые нет ссылок. Но EventEmitter ХРАНИТ ссылку на каждый listener в массиве `_events['broadcast']`. Listener (стрелочная функция) замыкается на `this`, т.е. на UserSession. Пока listener жив - UserSession не может быть собран GC.

**КРИТИЧЕСКАЯ ОШИБКА: удаление анонимной функции не работает!** ```typescript // ❌ НЕ РАБОТАЕТ: server.on('data', (chunk) => this.handle(chunk)); server.off('data', (chunk) => this.handle(chunk)); // Новая функция! // Каждая стрелочная функция - НОВЫЙ объект. // .off() ищет listener по ссылке, не находит → не удаляет. ``` Всегда сохраняйте ссылку на handler, если планируете его удалять.

Реальный баг: WeakMap для auto-cleanup

```typescript // Продвинутый паттерн: автоматическая отписка через WeakMap class SmartEmitter extends EventEmitter { // Храним cleanup функции для каждого объекта private cleanups = new WeakMap<object, Set<() => void>>(); // Подписка с автоматической отпиской при GC объекта onFor(owner: object, event: string, listener: Function) { this.on(event, listener); // Сохраняем cleanup if (!this.cleanups.has(owner)) { this.cleanups.set(owner, new Set()); } this.cleanups.get(owner)!.add(() => { this.off(event, listener); }); } // Явная отписка для объекта offFor(owner: object) { const cleanups = this.cleanups.get(owner); if (cleanups) { cleanups.forEach(cleanup => cleanup()); this.cleanups.delete(owner); } } } const emitter = new SmartEmitter(); class Session { constructor() { emitter.onFor(this, 'data', this.handle.bind(this)); } handle(data: unknown) {} destroy() { emitter.offFor(this); // Автоматически удалит все listeners } } ``` WeakMap не держит ссылку на ключ - когда Session собирается GC, запись в WeakMap тоже удаляется.

**Золотые правила предотвращения утечек:** 1. **Симметрия:** Каждый `.on()` должен иметь парный `.off()` при cleanup 2. **Сохраняйте ссылки:** Никогда не используйте анонимные функции, если планируете удалять listener 3. **WeakMap/WeakSet:** Для связывания listeners с lifecycle объектов 4. **once() вместо on():** Если событие нужно только один раз - автоматическая отписка 5. **maxListeners:** Держите низкий лимит в dev/test окружении (5-10), чтобы ловить утечки рано 6. **Мониторинг:** Логируйте listenerCount в production, алертите при аномалиях 7. **AbortController:** Для отмены всех операций через сигнал (современный паттерн)

Есть WebSocket сервер. Каждое новое соединение создаёт объект Connection, который подписывается на глобальный EventEmitter для broadcast сообщений. После disconnect соединение закрывается. Через 24 часа работы память выросла с 50MB до 2GB. В чём проблема?

Custom EventEmitter

Создание собственных EventEmitter-based классов - это искусство проектирования event-driven архитектур. Базовые примеры (Database, Server) уже разобраны. Теперь - продвинутые паттерны: **event namespacing** (разделение событий по типам), **error propagation** (как правильно обрабатывать ошибки), **lifecycle events** (connected, ready, closing, closed), **real-world примеры** из production систем.

Реальный паттерн: Redis PubSub Adapter

```typescript import { EventEmitter } from 'events'; import Redis from 'ioredis'; // Adapter для Redis Pub/Sub с EventEmitter API class RedisPubSub extends EventEmitter { private publisher: Redis; private subscriber: Redis; private channels = new Set<string>(); constructor(redisUrl: string) { super(); this.publisher = new Redis(redisUrl); this.subscriber = new Redis(redisUrl); // Подписываемся на Redis messages this.subscriber.on('message', (channel, message) => { try { const data = JSON.parse(message); // Эмитим как EventEmitter событие this.emit(channel, data); } catch (err) { this.emit('error', err); } }); // Error propagation this.publisher.on('error', (err) => this.emit('error', err)); this.subscriber.on('error', (err) => this.emit('error', err)); } // Override on() для автоматической подписки на Redis channel on(event: string, listener: (...args: any[]) => void): this { // Если это первый listener для этого channel if (event !== 'error' && this.listenerCount(event) === 0) { this.channels.add(event); this.subscriber.subscribe(event); } return super.on(event, listener); } // Override off() для автоматической отписки off(event: string, listener: (...args: any[]) => void): this { super.off(event, listener); // Если больше нет listeners - отписываемся от Redis if (event !== 'error' && this.listenerCount(event) === 0) { this.channels.delete(event); this.subscriber.unsubscribe(event); } return this; } // Публикация async publish(channel: string, data: unknown): Promise<void> { await this.publisher.publish(channel, JSON.stringify(data)); } // Cleanup async close() { await this.publisher.quit(); await this.subscriber.quit(); this.removeAllListeners(); } } // Использование: EventEmitter API скрывает Redis внутри! const pubsub = new RedisPubSub('redis://localhost:6379'); pubsub.on('chat:messages', (message) => { console.log('New message:', message); }); await pubsub.publish('chat:messages', { user: 'Alice', text: 'Hello!' }); ``` Этот паттерн используют Socket.io, Bull (job queue), NestJS для микросервисной коммуникации.

**Продвинутая альтернатива: eventemitter2** - библиотека с расширенным функционалом: - **Wildcard подписки**: `emitter.on('server.*.error', handler)` - **Namespaces**: события разделены точками (`user.created`, `user.updated`) - **TTL для событий**: автоматическая отписка через timeout - **Max listeners per event**: более гибкий контроль Используется в NestJS, Seneca (микросервисы), PM2.

Итоги

  • **EventEmitter реализует Observer pattern:** издатель (emit) + подписчики (on/once). Это основа всех асинхронных API в Node.js - Streams, HTTP, net, child_process. Понимание EventEmitter критично для работы с любым асинхронным кодом.
  • **emit() синхронный, error события обязательны:** emit() вызывает handlers в текущем call stack, не через Event Loop. 'error' - единственное событие, которое крашит процесс, если нет обработчика. Подписка на 'error' обязательна.
  • **Memory leaks - главная опасность:** каждый .on() должен иметь парный .off() при cleanup. Сохраняйте ссылки на handlers (не анонимные функции), используйте maxListeners как canary, мониторьте listenerCount в production.
  • **TypeScript типизация + продвинутые паттерны:** типизация событий через declaration merging даёт type safety. Наследование vs composition, emitAsync для async handlers, middleware для валидации, namespacing для сложных систем - инструменты для production-ready архитектур.

Связанные темы

EventEmitter - это фундамент event-driven архитектуры Node.js. Для полного понимания изучите связанные концепции:

  • Streams — Streams наследуются от EventEmitter. События 'data', 'end', 'error' - это EventEmitter API. Понимание backpressure, piping, transform streams требует глубокого знания EventEmitter lifecycle.
  • HTTP Server и Middleware — http.Server - это EventEmitter. События 'request', 'connection', 'upgrade' лежат в основе всех HTTP фреймворков. Express middleware - это цепочка event handlers.
  • Child Processes — child_process.spawn() возвращает ChildProcess, который наследуется от EventEmitter. События 'exit', 'message', 'error' - для IPC и управления процессами.
  • Cluster и Worker Threads — Worker объекты в cluster и worker_threads - это EventEmitter. События 'online', 'message', 'exit' для межпроцессной коммуникации.

Вопросы для размышления

  • Микросервис с EventEmitter для внутренних событий нужно масштабировать на несколько процессов (pm2 cluster mode). Какие изменения в архитектуре потребуются, чтобы события работали между процессами?
  • В production найден memory leak: listenerCount растёт до миллионов за неделю. Heap snapshot показывает массивы в _events['data']. Как диагностировать, какой код забывает removeListener, если в проекте тысячи строк кода?
  • Сравните EventEmitter (in-process) с RabbitMQ/Kafka (distributed). В каких сценариях EventEmitter достаточно, а когда нужен message broker? Как комбинировать оба подхода (EventEmitter локально, Redis PubSub для inter-service)?

Связанные уроки

  • os-05-sync
EventEmitter: Паттерн событий Node.js

0

1

Войти

**Опасность переусложнения:** EventEmitter - простой и быстрый инструмент. Не добавляйте middleware, namespaces, wildcards без реальной необходимости. Для сложных event-driven систем рассмотрите: - **Message Broker** (RabbitMQ, Kafka) для distributed events - **CQRS/Event Sourcing** для domain events - **State Machine** (XState) для сложных lifecycle EventEmitter отлично работает для in-process событий. Для inter-process или microservices - другие инструменты.

**Checklist для production-ready EventEmitter класса:** ✅ **TypeScript типы** для всех событий (через interface + declaration merging) ✅ **Обязательный 'error' handler** - документируется, что пользователь ДОЛЖЕН подписаться ✅ **Lifecycle события** в строгом порядке (connecting → connected → ready → closing → closed) ✅ **Cleanup метод** (close/destroy) с removeAllListeners() ✅ **maxListeners настроен** адекватно use case ✅ **Async handlers** - документируется, что emit() не ждёт, либо предоставляется emitAsync() ✅ **Tests** на утечки памяти (создать 10000 объектов, проверить listenerCount) ✅ **Примеры использования** в JSDoc с правильным паттерном cleanup

EventEmitter.emit() добавляет handlers в Event Loop очередь, как Promise.then()

emit() вызывает handlers СИНХРОННО в текущем call stack, в порядке регистрации. Это не асинхронная операция

Это фундаментальное отличие Node.js EventEmitter от браузерных событий. В браузере dispatchEvent() использует task queue (асинхронно). В Node.js emit() - это обычный синхронный вызов функций из массива. Почему так? 1. Производительность - нет overhead на scheduling 2. Предсказуемость - порядок выполнения детерминирован 3. Error handling - исключения всплывают к месту emit(). Если вам нужен async emit, используйте emitAsync() паттерн с Promise.all() или библиотеку eventemitter3 с async support.

Создаётся библиотека Database client с EventEmitter. Пользователь пишет: ```typescript const db = new Database(); db.connect(); db.query('SELECT * FROM users'); ``` И получает ошибку "Not connected". Какой паттерн решит проблему наиболее чисто?