/**
 * Adaptive concurrency limiter using Event Loop Utilization (ELU).
 *
 * The limiter dynamically adjusts the maximum number of concurrent requests
 * based on Node.js Event Loop Utilization — a direct measure of how saturated
 * the server's single-threaded event loop is. This is the most reliable signal
 * for a Node.js server because:
 *
 * - Unlike latency-based algorithms (Vegas, Gradient), ELU is unaffected by
 *   workload heterogeneity. Switching from a fast route to a slow route
 *   increases latency naturally but does NOT mean the server is overloaded.
 *   ELU only rises when the event loop itself is saturated.
 *
 * - Unlike CPU%, ELU directly measures event loop busy/idle time, which is
 *   the actual bottleneck for a single-threaded server.
 *
 * The control loop uses AIMD (Additive Increase, Multiplicative Decrease):
 * - **ELU < eluMax**: increase limit by sqrt(limit) per window (fast recovery)
 * - **ELU ≥ eluMax**: decrease limit by 10% per window (gentle backoff)
 *
 * The limiter starts wide open (initialLimit = maxLimit) and should be
 * invisible under normal load. It only tightens when the event loop is
 * genuinely saturated.
 *
 * When a request cannot be immediately admitted, it is placed in a bounded FIFO
 * queue with a per-request timeout. Slots are released to queued waiters before
 * becoming available for new `acquire()` calls, ensuring fair ordering.
 *
 * @module
 */
| 31 | + | |
import { performance } from "node:perf_hooks";
| 33 | + | |
/**
 * @typedef {Object} AdaptiveLimiterConfig
 * @property {number} [initialLimit=1000] Starting concurrency limit (defaults to maxLimit — start wide open)
 * @property {number} [minLimit=1] Floor for the adaptive limit
 * @property {number} [maxLimit=1000] Ceiling for the adaptive limit
 * @property {number} [eluMax=0.95] ELU level that triggers limit decrease and queue skip (0–1)
 * @property {number} [sampleWindow=1000] Interval (ms) for recalculation and ELU sampling
 * @property {number} [smoothingFactor=0.2] EWMA factor for `smoothedLatency` in stats (observability only — not used in the control loop)
 * @property {number} [queueSize=100] Max requests waiting in the backpressure queue
 * @property {number} [queueTimeout=5000] Max time (ms) a request waits in the queue before 503
 * @property {{ info?: Function, warn?: Function }} [logger] Optional logger; transitions (limit shrink/recover, queue saturation, 503 firing) are reported here.
 */
| 46 | + | |
/**
 * Create an adaptive concurrency limiter.
 *
 * @param {AdaptiveLimiterConfig} [config]
 */
export function createAdaptiveLimiter(config = {}) {
  const {
    minLimit = 1,
    maxLimit = 1000,
    // Default to maxLimit so the limiter starts wide open and only
    // tightens under genuine event-loop saturation.
    initialLimit = maxLimit,
    eluMax = 0.95,
    sampleWindow = 1000,
    smoothingFactor = 0.2,
    queueSize = 100,
    queueTimeout = 5000,
    logger = null,
  } = config;

  // Counters for the optional periodic log line — reset every recalc tick.
  let rejected503 = 0; // requests rejected with 503 during the current window
  let queuedTotal = 0; // requests that entered the wait queue during the current window

  // ── Limiter state ──
  // Starting limit is clamped into [minLimit, maxLimit].
  let limit = Math.max(minLimit, Math.min(maxLimit, initialLimit));
  let inflight = 0; // requests currently holding a slot
  let sampleCount = 0; // per-window sample counter (reset each tick; incremented elsewhere in this file — not visible in this chunk)

  // ── Latency tracking (for observability, not used in control loop) ──
  // EWMA of request latency weighted by `smoothingFactor`.
  // NOTE(review): updated outside this chunk — presumably on release; verify against the rest of the file.
  let smoothedLatency = 0;

  // ── ELU state ──
  // Cumulative ELU snapshot serving as the baseline for per-window deltas.
  let prevELU = performance.eventLoopUtilization();
  let currentELU = 0; // utilization (0–1) measured over the most recent window

  // ── Wait queue (bounded FIFO) ──
  // Each entry: { resolve, timer, abortHandler, signal }
  //   resolve(true)  = slot acquired, proceed
  //   resolve(false) = timed out or destroyed, reject with 503
  /** @type {{ resolve: (v: boolean) => void, timer: ReturnType<typeof setTimeout>, abortHandler: (() => void) | null, signal: AbortSignal | null }[]} */
  const waitQueue = [];
| 87 | + | |
| 88 | + | /** |
| 89 | + | * Try to hand a slot to the next queued waiter. |
| 90 | + | * |
| 91 | + | * Critically, this respects the current adaptive limit: if inflight >= limit |
| 92 | + | * after the release, we do NOT wake a waiter. This lets the server drain back |
| 93 | + | * to the computed limit under overload. Without this check, drainOne() would |
| 94 | + | * defeat the adaptive algorithm by keeping inflight permanently above the |
| 95 | + | * limit — every finished request would immediately be replaced. |
| 96 | + | * |
| 97 | + | * Skipped entries (aborted clients) are cleaned up without consuming a slot. |
| 98 | + | */ |
| 99 | + | function drainOne() { |
| 100 | + | while (waitQueue.length > 0) { |
| 101 | + | // Respect the adaptive limit — let inflight drain before admitting more |
| 102 | + | if (inflight >= limit) { |
| 103 | + | return false; |
| 104 | + | } |
| 105 | + | const waiter = waitQueue.shift(); |
| 106 | + | clearTimeout(waiter.timer); |
| 107 | + | if (waiter.signal) { |
| 108 | + | waiter.signal.removeEventListener("abort", waiter.abortHandler); |
| 109 | + | } |
| 110 | + | // Client already disconnected — skip without consuming a slot |
| 111 | + | if (waiter.signal?.aborted) { |
| 112 | + | continue; |
| 113 | + | } |
| 114 | + | inflight++; |
| 115 | + | waiter.resolve(true); |
| 116 | + | return true; |
| 117 | + | } |
| 118 | + | return false; |
| 119 | + | } |
| 120 | + | |
| 121 | + | // ── Periodic recalculation (AIMD based on ELU) ── |
| 122 | + | const recalcInterval = setInterval(() => { |
| 123 | + | // Sample ELU over the last window. The `prev` argument must be a |
| 124 | + | // cumulative snapshot, NOT a delta: Node computes `current - prev` |
| 125 | + | // and a diff object's idle/active fields aren't cumulative values. |
| 126 | + | // So we call `eventLoopUtilization()` again with no args to capture |
| 127 | + | // a fresh cumulative baseline for the next window. The few ns gap |
| 128 | + | // between the two calls is unobservable. |
| 129 | + | const nowELU = performance.eventLoopUtilization(prevELU); |
| 130 | + | currentELU = nowELU.utilization; |
| 131 | + | prevELU = performance.eventLoopUtilization(); |
| 132 | + | |
| 133 | + | const prevLimit = limit; |
| 134 | + | |
| 135 | + | if (currentELU >= eluMax) { |
| 136 | + | // ── Decrease: multiplicative (gentle 10% backoff) ── |
| 137 | + | // Only shrink when we're actually at capacity. If inflight is well |
| 138 | + | // below the limit, the high ELU is transient (GC, etc.), not sustained. |
| 139 | + | if (inflight >= limit * 0.5) { |
| 140 | + | limit = Math.max(minLimit, Math.floor(limit * 0.9)); |
| 141 | + | } |
| 142 | + | } else { |
| 143 | + | // ── Increase: additive (sqrt scaling for proportional exploration) ── |
| 144 | + | // No dead zone — always recover toward maxLimit unless overloaded. |
| 145 | + | // The limiter starts wide open and should stay wide open under normal load. |
| 146 | + | limit = Math.min( |
| 147 | + | maxLimit, |
| 148 | + | limit + Math.max(1, Math.ceil(Math.sqrt(limit))) |
| 149 | + | ); |
| 150 | + | } |
| 151 | + | |
| 152 | + | // Wake queued waiters if limit grew |
| 153 | + | if (limit > prevLimit) { |
| 154 | + | while (inflight < limit && waitQueue.length > 0) { |
| 155 | + | if (!drainOne()) break; |
| 156 | + | } |
| 157 | + | } |
| 158 | + | |
| 159 | + | // ── Operator-visible transitions ── |
| 160 | + | // We log only when something changes — silent under steady-state. |
| 161 | + | if (logger) { |
| 162 | + | if (limit < prevLimit) { |
| 163 | + | logger.warn?.( |
| 164 | + | `[adaptive-limiter] limit ${prevLimit} → ${limit} (ELU=${currentELU.toFixed(2)}, inflight=${inflight}, queued=${waitQueue.length})` |
| 165 | + | ); |
| 166 | + | } else if (limit > prevLimit && prevLimit < maxLimit) { |
| 167 | + | logger.info?.( |
| 168 | + | `[adaptive-limiter] limit ${prevLimit} → ${limit} (recovering)` |
| 169 | + | ); |
| 170 | + | } |
| 171 | + | if (rejected503 > 0 || queuedTotal > 0) { |
| 172 | + | logger.warn?.( |
| 173 | + | `[adaptive-limiter] window: ${rejected503} rejected, ${queuedTotal} queued, queue depth ${waitQueue.length}/${queueSize}` |
| 174 | + | ); |
| 175 | + | } |
| 176 | + | } |
| 177 | + | rejected503 = 0; |
| 178 | + | queuedTotal = 0; |
| 179 | + | |
| 180 | + | // Reset sample count for next window |
| 181 | + | sampleCount = 0; |
| 182 | + | }, sampleWindow); |
| 183 | + | |
| 184 | + | // Don't keep the process alive just for this timer |
| 185 | + | recalcInterval.unref(); |
| 186 | + | |
| 187 | + | return { |
| 188 | + | /** |
| 189 | + | * Try to acquire a slot, optionally waiting in a bounded queue. |
| 190 | + | * |
| 191 | + | * @param {AbortSignal} [signal] - Client connection abort signal. When the |
| 192 | + | * client disconnects while queued, the waiter is removed automatically. |
| 193 | + | * @returns {boolean | Promise<boolean>} `true` if the request may proceed, |
| 194 | + | * `false` if rejected. Returns a plain boolean for the fast path (no |
| 195 | + | * Promise overhead), a Promise only when the request is queued. |
| 196 | + | * |
| 197 | + | * Resolution paths: |
| 198 | + | * - Slot available (inflight < limit) → returns `true` (sync, no Promise) |
| 199 | + | * - At limit + ELU > eluMax → returns `false` (sync, no Promise) |
| 200 | + | * - At limit + queue full → returns `false` (sync, no Promise) |