Skip to content

feat: websocket reconnect #405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,21 @@ It also supports an additional `rewriteRequestHeaders(headers, request)` functio
opening the WebSocket connection. This function should return an object with the given headers.
The default implementation forwards the `cookie` header.

## `wsReconnect`

The `wsReconnect` option configures the WebSocket reconnection feature. It is an object with the following properties:

- `pingInterval`: The interval between ping messages in ms (default: `30_000`).
- `maxReconnectionRetries`: The maximum number of reconnection retries (`1` to `Infinity`, default: `Infinity`).
- `reconnectInterval`: The interval between reconnection attempts in ms (default: `1_000`).
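- `reconnectDecay`: The decay factor for the reconnection interval: the first attempt waits `reconnectInterval` ms, and after `n` failed attempts the next one waits `reconnectInterval * reconnectDecay * n` ms (default: `1.5`).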
- `connectionTimeout`: The timeout for establishing the connection in ms (default: `5_000`).
- `reconnectOnClose`: Whether to reconnect on close, as long as the connection from the related client to the proxy is active (default: `false`).
- `logs`: Whether to log the reconnection process (default: `false`).

The reconnection feature detects and closes broken connections and reconnects automatically; see [how to detect and close broken connections](https://github.com/websockets/ws#how-to-detect-and-close-broken-connections).
A connection is considered broken if the target does not respond to ping messages and no data is received from the target.

## Benchmarks

The following benchmarks were generated on a dedicated server with an Intel(R) Core(TM) i7-7700 CPU @ 3.60GHz and 64GB of RAM:
Expand Down
191 changes: 185 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
'use strict'
const { setTimeout: wait } = require('node:timers/promises')
const From = require('@fastify/reply-from')
const { ServerResponse } = require('node:http')
const WebSocket = require('ws')
const { convertUrlToWebSocket } = require('./utils')
const fp = require('fastify-plugin')
const qs = require('fast-querystring')
const { validateOptions } = require('./src/options')

// HTTP verbs known to this plugin. NOTE(review): the code that consumes this list is
// outside this excerpt — presumably the default set of proxied methods; confirm.
const httpMethods = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT', 'OPTIONS']
// Matches strings that start with an explicit `http://` or `https://` scheme (see isExternalUrl).
const urlPattern = /^https?:\/\//
Expand All @@ -27,6 +29,7 @@ function liftErrorCode (code) {
}

function closeWebSocket (socket, code, reason) {
socket.isAlive = false
if (socket.readyState === WebSocket.OPEN) {
socket.close(liftErrorCode(code), reason)
}
Expand All @@ -40,11 +43,44 @@ function waitConnection (socket, write) {
}
}

/**
 * Waits until a WebSocket has finished its opening handshake.
 *
 * The listeners registered here are detached again as soon as the promise settles, so a
 * socket that opened successfully is handed back without a stale `error` handler and a
 * socket that failed does not keep a stale `open` handler.
 *
 * @param {WebSocket} target socket to observe; must expose `readyState` and the EventEmitter API
 * @param {number} timeout maximum time to wait for the `open` event, in milliseconds
 * @returns {Promise<void>} resolves once `target` is open; rejects with the emitted error,
 *   with `WebSocket connection timeout` when the timer fires first, or with
 *   `WebSocket is closed` when the socket is neither open nor connecting
 */
function waitForConnection (target, timeout) {
  return new Promise((resolve, reject) => {
    /* c8 ignore start */
    if (target.readyState === WebSocket.OPEN) {
      return resolve()
    }
    if (target.readyState !== WebSocket.CONNECTING) {
      return reject(new Error('WebSocket is closed'))
    }
    /* c8 ignore stop */

    function onOpen () {
      clearTimeout(timeoutId)
      target.off('error', onError)
      resolve()
    }

    function onError (err) {
      clearTimeout(timeoutId)
      target.off('open', onOpen)
      reject(err)
    }

    const timeoutId = setTimeout(() => {
      /* c8 ignore start */
      // `onError` intentionally stays attached: the handshake is still pending and a late
      // 'error' event without any listener would be thrown as an uncaught exception
      target.off('open', onOpen)
      reject(new Error('WebSocket connection timeout'))
      /* c8 ignore stop */
    }, timeout)

    target.once('open', onOpen)
    target.once('error', onError)
  })
}

/**
 * Tells whether `url` begins with an explicit `http://` or `https://` scheme.
 *
 * @param {string} url value to check
 * @returns {boolean} `true` when `urlPattern` matches
 */
function isExternalUrl (url) {
  const startsWithHttpScheme = urlPattern.test(url)
  return startsWithHttpScheme
}

/** Does nothing; a placeholder for callbacks whose outcome is irrelevant. */
function noop () { }

function proxyWebSockets (source, target) {
function close (code, reason) {
Expand Down Expand Up @@ -76,6 +112,144 @@ function proxyWebSockets (source, target) {
/* c8 ignore stop */
}

/**
 * Disposes of an upstream socket that will not be used, so it cannot linger half-open.
 *
 * @param {WebSocket|undefined} socket socket to drop; `undefined` is ignored
 */
function discardReconnectTarget (socket) {
  if (!socket) {
    return
  }
  socket.removeAllListeners()
  // aborting a pending handshake makes `ws` emit 'error'; with no listener it would be thrown
  socket.on('error', () => { })
  socket.terminate()
}

/**
 * Re-establishes the upstream connection for a client whose target was lost, then links
 * the two sockets again through `proxyWebSocketsWithReconnection`.
 *
 * Each attempt first waits `reconnectInterval * (reconnectDecay * attempts || 1)` ms.
 * Retrying stops when a connection is established, when `maxReconnectionRetries` failed
 * attempts were made, or when the client socket is no longer open. Sockets of failed
 * attempts are terminated, and the client is closed with code 1011 once retries run out,
 * because by then nothing else is listening on it.
 *
 * @param {object} logger logger exposing `info`, `warn` and `error`
 * @param {WebSocket} source client socket that must stay open for the reconnection to matter
 * @param {object} wsReconnectOptions validated `wsReconnect` options
 * @param {{ url: *, subprotocols: *, optionsWs: * }} targetParams arguments used to recreate the target
 * @returns {Promise<void>}
 */
async function reconnect (logger, source, wsReconnectOptions, targetParams) {
  const { url, subprotocols, optionsWs } = targetParams

  let attempts = 0
  let target
  do {
    const reconnectWait = wsReconnectOptions.reconnectInterval * (wsReconnectOptions.reconnectDecay * attempts || 1)
    wsReconnectOptions.logs && logger.warn({ target: targetParams.url }, `proxy ws reconnect in ${reconnectWait} ms`)
    await wait(reconnectWait)

    // the client may have disconnected while waiting: nothing left to reconnect for
    if (source.readyState !== WebSocket.OPEN) {
      break
    }

    let candidate
    try {
      candidate = new WebSocket(url, subprotocols, optionsWs)
      await waitForConnection(candidate, wsReconnectOptions.connectionTimeout)
      target = candidate
    } catch (err) {
      wsReconnectOptions.logs && logger.error({ target: targetParams.url, err, attempts }, 'proxy ws reconnect error')
      attempts++
      discardReconnectTarget(candidate)
    }
  } while (!target && attempts < wsReconnectOptions.maxReconnectionRetries)

  if (source.readyState !== WebSocket.OPEN) {
    // the client went away in the meantime: drop whatever was connected for it
    discardReconnectTarget(target)
    wsReconnectOptions.logs && logger.info({ target: targetParams.url, attempts }, 'proxy ws source closed, reconnection aborted')
    return
  }

  if (!target) {
    logger.error({ target: targetParams.url, attempts }, 'proxy ws failed to reconnect! No more retries')
    // the source listeners were detached before reconnecting, so close the client explicitly
    closeWebSocket(source, 1011, 'proxy ws failed to reconnect')
    return
  }

  wsReconnectOptions.logs && logger.info({ target: targetParams.url, attempts }, 'proxy ws reconnected')
  proxyWebSocketsWithReconnection(logger, source, target, wsReconnectOptions, targetParams)
}

/**
 * Pipes messages, pings and pongs between the client socket (`source`) and the upstream
 * socket (`target`) and keeps a heartbeat on `target`.
 *
 * Every `options.pingInterval` ms the target is pinged; if nothing was received from it
 * since the previous round it is flagged `broken` and terminated. When the target is lost
 * while the client is still alive (and it is `broken` or `options.reconnectOnClose` is
 * set), the listeners of this link are removed and `reconnect` is started; otherwise both
 * sockets are closed.
 *
 * @param {object} logger logger exposing `info`, `warn` and `error`
 * @param {WebSocket} source client-side socket
 * @param {WebSocket} target upstream socket
 * @param {object} options validated `wsReconnect` options (`pingInterval`, `reconnectOnClose`, `logs`, ...)
 * @param {{ url: *, subprotocols: *, optionsWs: * }} targetParams arguments needed to recreate the target
 * @param {boolean} [fromReconnection=false] currently unused; kept for signature compatibility
 */
function proxyWebSocketsWithReconnection (logger, source, target, options, targetParams, fromReconnection = false) {
  /**
   * Tears the link down, or hands over to `reconnect` when only the target was lost.
   * `sourceGone` is true when the client side caused the shutdown, in which case a
   * reconnection would have nobody to serve.
   */
  function close (code, reason, sourceGone = false) {
    // the heartbeat is an interval stored on the target
    if (target.pingTimer) {
      clearInterval(target.pingTimer)
    }
    target.pingTimer = undefined

    // reconnect target as long as the source connection is active
    if (!sourceGone && source.isAlive && (target.broken || options.reconnectOnClose)) {
      target.isAlive = false
      target.removeAllListeners()
      // need to specify the listeners to remove
      removeSourceListeners(source)
      // never leave the promise floating: a rejection would otherwise be unhandled
      Promise.resolve(reconnect(logger, source, options, targetParams)).catch((err) => {
        logger.error({ target: targetParams.url, err }, 'proxy ws reconnect error')
      })
      return
    }

    options.logs && logger.info({ msg: 'proxy ws close link' })
    closeWebSocket(source, code, reason)
    closeWebSocket(target, code, reason)
  }

  function removeSourceListeners (source) {
    source.off('message', sourceOnMessage)
    source.off('ping', sourceOnPing)
    source.off('pong', sourceOnPong)
    source.off('close', sourceOnClose)
    source.off('error', sourceOnError)
    source.off('unexpected-response', sourceOnUnexpectedResponse)
  }

  /* c8 ignore start */
  function sourceOnMessage (data, binary) {
    source.isAlive = true
    waitConnection(target, () => target.send(data, { binary }))
  }
  function sourceOnPing (data) {
    waitConnection(target, () => target.ping(data))
  }
  function sourceOnPong (data) {
    source.isAlive = true
    waitConnection(target, () => target.pong(data))
  }
  function sourceOnClose (code, reason) {
    options.logs && logger.warn({ target: targetParams.url, code, reason }, 'proxy ws source close event')
    close(code, reason, true)
  }
  function sourceOnError (error) {
    options.logs && logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws source error event')
    close(1011, error.message, true)
  }
  function sourceOnUnexpectedResponse () {
    options.logs && logger.warn({ target: targetParams.url }, 'proxy ws source unexpected-response event')
    close(1011, 'unexpected response', true)
  }
  /* c8 ignore stop */

  // source is alive since it is created by the proxy service
  // the pinger is not set since we can't reconnect from here
  source.isAlive = true
  source.on('message', sourceOnMessage)
  source.on('ping', sourceOnPing)
  source.on('pong', sourceOnPong)
  source.on('close', sourceOnClose)
  source.on('error', sourceOnError)
  source.on('unexpected-response', sourceOnUnexpectedResponse)

  // source WebSocket is already connected because it is created by ws server
  /* c8 ignore start */
  target.on('message', (data, binary) => {
    target.isAlive = true
    source.send(data, { binary })
  })
  target.on('ping', data => {
    target.isAlive = true
    source.ping(data)
  })
  target.on('pong', data => {
    target.isAlive = true
    source.pong(data)
  })
  /* c8 ignore stop */
  target.on('close', (code, reason) => {
    options.logs && logger.warn({ target: targetParams.url, code, reason }, 'proxy ws target close event')
    close(code, reason)
  })
  /* c8 ignore start */
  target.on('error', error => {
    options.logs && logger.warn({ target: targetParams.url, error: error.message }, 'proxy ws target error event')
    close(1011, error.message)
  })
  target.on('unexpected-response', () => {
    options.logs && logger.warn({ target: targetParams.url }, 'proxy ws target unexpected-response event')
    close(1011, 'unexpected response')
  })
  /* c8 ignore stop */

  target.isAlive = true
  target.pingTimer = setInterval(() => {
    /* c8 ignore start */
    // `ws` throws from ping() while the handshake is pending; check again on the next round
    if (target.readyState === WebSocket.CONNECTING) {
      return
    }
    /* c8 ignore stop */
    if (target.isAlive === false) {
      target.broken = true
      options.logs && logger.warn({ target: targetParams.url }, 'proxy ws connection is broken')
      target.pingTimer && clearInterval(target.pingTimer)
      target.pingTimer = undefined
      return target.terminate()
    }
    // flipped back to true by any message, ping or pong received from the target
    target.isAlive = false
    target.ping()
  }, options.pingInterval).unref()
}

function handleUpgrade (fastify, rawRequest, socket, head) {
// Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
rawRequest[kWs] = socket
Expand All @@ -91,7 +265,7 @@ function handleUpgrade (fastify, rawRequest, socket, head) {
}

class WebSocketProxy {
constructor (fastify, { wsServerOptions, wsClientOptions, upstream, wsUpstream, replyOptions: { getUpstream } = {} }) {
constructor (fastify, { wsReconnect, wsServerOptions, wsClientOptions, upstream, wsUpstream, replyOptions: { getUpstream } = {} }) {
this.logger = fastify.log
this.wsClientOptions = {
rewriteRequestHeaders: defaultWsHeadersRewrite,
Expand All @@ -101,6 +275,7 @@ class WebSocketProxy {
this.upstream = upstream ? convertUrlToWebSocket(upstream) : ''
this.wsUpstream = wsUpstream ? convertUrlToWebSocket(wsUpstream) : ''
this.getUpstream = getUpstream
this.wsReconnect = wsReconnect

const wss = new WebSocket.Server({
noServer: true,
Expand Down Expand Up @@ -190,7 +365,13 @@ class WebSocketProxy {

const target = new WebSocket(url, subprotocols, optionsWs)
this.logger.debug({ url: url.href }, 'proxy websocket')
proxyWebSockets(source, target)

if (this.wsReconnect) {
const targetParams = { url, subprotocols, optionsWs }
proxyWebSocketsWithReconnection(this.logger, source, target, this.wsReconnect, targetParams)
} else {
proxyWebSockets(source, target)
}
}
}

Expand Down Expand Up @@ -228,9 +409,7 @@ function generateRewritePrefix (prefix, opts) {
}

async function fastifyHttpProxy (fastify, opts) {
if (!opts.upstream && !opts.websocket && !((opts.upstream === '' || opts.wsUpstream === '') && opts.replyOptions && typeof opts.replyOptions.getUpstream === 'function')) {
throw new Error('upstream must be specified')
}
opts = validateOptions(opts)

const preHandler = opts.preHandler || opts.beforeHandler
const rewritePrefix = generateRewritePrefix(fastify.prefix, opts)
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
"http-errors": "^2.0.0",
"http-proxy": "^1.18.1",
"neostandard": "^0.12.0",
"pino": "^9.6.0",
"pino-test": "^1.1.0",
"simple-get": "^4.0.1",
"socket.io": "^4.7.5",
"socket.io-client": "^4.7.5",
Expand Down
67 changes: 67 additions & 0 deletions src/options.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict'

// Default for `wsReconnect.pingInterval`, in milliseconds.
const DEFAULT_PING_INTERVAL = 30_000
// Default for `wsReconnect.maxReconnectionRetries`: no upper bound on reconnection attempts.
const DEFAULT_MAX_RECONNECTION_RETRIES = Infinity
Copy link
Member

@climba03003 climba03003 Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Max retries should be limited by default.
There is no reason to reconnect an unlimited number of times, since during the reconnection period the client may continue sending data. That data will be buffered in the stream without limit.

Copy link
Contributor Author

@simone-sanfratello simone-sanfratello Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention is to keep the connection alive, assuming the error messages are being monitored, that's why I set Infinity

We can remove `Infinity` as the default value, but in that case we can't pick an arbitrary default either; it really depends on the context: ping interval, reconnection delay, and so on.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Those should be limited, possibly to 30 seconds.

// Default for `wsReconnect.reconnectInterval`, in milliseconds.
const DEFAULT_RECONNECT_INTERVAL = 1_000
// Default for `wsReconnect.reconnectDecay` (multiplier applied to the reconnection interval).
const DEFAULT_RECONNECT_DECAY = 1.5
// Default for `wsReconnect.connectionTimeout`, in milliseconds.
const DEFAULT_CONNECTION_TIMEOUT = 5_000
// Default for `wsReconnect.reconnectOnClose`.
const DEFAULT_RECONNECT_ON_CLOSE = false
// Default for `wsReconnect.logs`.
const DEFAULT_LOGS = false

/**
 * Throws `Error(message)` unless `value` is `undefined` or a number that is `>= min`.
 * The comparison is written as `!(value >= min)` so that `NaN` is rejected as well.
 */
function requireNumberAtLeast (value, min, message) {
  if (value !== undefined && (typeof value !== 'number' || !(value >= min))) {
    throw new Error(message)
  }
}

/** Throws `Error(message)` unless `value` is `undefined` or a boolean. */
function requireBoolean (value, message) {
  if (value !== undefined && typeof value !== 'boolean') {
    throw new Error(message)
  }
}

/**
 * Validates the plugin options and fills in the `wsReconnect` defaults.
 *
 * An upstream is required unless `websocket` is set, or `upstream`/`wsUpstream` is the
 * empty string and `replyOptions.getUpstream` is a function. When `wsReconnect` is
 * provided, each of its properties is checked and missing ones get their default value;
 * the `wsReconnect` object is updated in place.
 *
 * @param {object} options plugin options as passed by the user
 * @returns {object} the same `options` object
 * @throws {Error} when no upstream is specified or a `wsReconnect` property is invalid
 */
function validateOptions (options) {
  const hasEmptyUpstream = options.upstream === '' || options.wsUpstream === ''
  const hasGetUpstream = Boolean(options.replyOptions) && typeof options.replyOptions.getUpstream === 'function'
  if (!options.upstream && !options.websocket && !(hasEmptyUpstream && hasGetUpstream)) {
    throw new Error('upstream must be specified')
  }

  const wsReconnect = options.wsReconnect
  if (!wsReconnect) {
    return options
  }

  // defaults are assigned onto the object below, which is impossible on a primitive
  if (typeof wsReconnect !== 'object') {
    throw new Error('wsReconnect must be an object')
  }

  requireNumberAtLeast(wsReconnect.pingInterval, 0, 'wsReconnect.pingInterval must be a non-negative number')
  wsReconnect.pingInterval = wsReconnect.pingInterval ?? DEFAULT_PING_INTERVAL

  requireNumberAtLeast(wsReconnect.maxReconnectionRetries, 1, 'wsReconnect.maxReconnectionRetries must be a number greater than or equal to 1')
  wsReconnect.maxReconnectionRetries = wsReconnect.maxReconnectionRetries ?? DEFAULT_MAX_RECONNECTION_RETRIES

  requireNumberAtLeast(wsReconnect.reconnectInterval, 100, 'wsReconnect.reconnectInterval (ms) must be a number greater than or equal to 100')
  wsReconnect.reconnectInterval = wsReconnect.reconnectInterval ?? DEFAULT_RECONNECT_INTERVAL

  requireNumberAtLeast(wsReconnect.reconnectDecay, 1, 'wsReconnect.reconnectDecay must be a number greater than or equal to 1')
  wsReconnect.reconnectDecay = wsReconnect.reconnectDecay ?? DEFAULT_RECONNECT_DECAY

  requireNumberAtLeast(wsReconnect.connectionTimeout, 0, 'wsReconnect.connectionTimeout must be a non-negative number')
  wsReconnect.connectionTimeout = wsReconnect.connectionTimeout ?? DEFAULT_CONNECTION_TIMEOUT

  requireBoolean(wsReconnect.reconnectOnClose, 'wsReconnect.reconnectOnClose must be a boolean')
  wsReconnect.reconnectOnClose = wsReconnect.reconnectOnClose ?? DEFAULT_RECONNECT_ON_CLOSE

  requireBoolean(wsReconnect.logs, 'wsReconnect.logs must be a boolean')
  wsReconnect.logs = wsReconnect.logs ?? DEFAULT_LOGS

  return options
}

// Public surface of this module: the validator plus the `wsReconnect` default values.
module.exports = {
validateOptions,
DEFAULT_PING_INTERVAL,
DEFAULT_MAX_RECONNECTION_RETRIES,
DEFAULT_RECONNECT_INTERVAL,
DEFAULT_RECONNECT_DECAY,
DEFAULT_CONNECTION_TIMEOUT,
DEFAULT_RECONNECT_ON_CLOSE,
DEFAULT_LOGS
}
Loading
Loading