WebSocket 连接共享

1000 多个用户同时在线,用户打开多个 Tab 页,每个 Tab 页都会创建一个 WS 连接,最终创建了 140000 多个 WS 连接,导致服务端崩溃。通过使用 SharedWorker 转发 WS 消息,让所有页面共用一个 WS 连接来解决问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// socket.ts
import { v4 as uuidv4 } from 'uuid'

/**
 * Envelope for messages exchanged over the shared WebSocket connection.
 */
export interface Message {
  /** Routing key; incoming messages are dispatched to the listeners registered for this path. */
  path: string;
  /** Payload carried by the message. */
  data: any;
  /** Correlation id; `sendMessage` assigns a fresh UUID here and matches the reply by it. */
  messageId?: string;
  /** Status code of a reply; `sendMessage` rejects its promise for any value other than 200. */
  code?: number;
  /** Status text accompanying `code` (for example 'timeout' on a locally generated timeout error). */
  message?: string;
}

/** Callback that receives one parsed incoming message; its return value is ignored. */
type Handler = (incoming: Message) => void;

/**
 * Page-side client that talks to a SharedWorker instead of opening its own
 * WebSocket, so that every page using the same worker shares one connection.
 *
 * Two delivery styles are supported for incoming messages:
 * - request-response: `sendMessage` returns a promise settled by the reply
 *   carrying the same `messageId`;
 * - pub-sub: `on` / `off` register callbacks keyed by the message `path`.
 */
class SharedWebSocket {
  private worker!: SharedWorker
  private connected: boolean = false
  /** Pending request-response callbacks, keyed by messageId. */
  private handlers: Map<string, Handler> = new Map()
  /** Pub-sub callbacks, keyed by message path. */
  private listeners: Map<string, Handler[]> = new Map()
  /** Token from the most recent `connect` call; undefined until `connect` is called. */
  private token?: string
  /** Milliseconds to wait for a reply before `sendMessage` rejects. */
  private timeout: number = 10000
  /** Milliseconds to wait after a close notification before asking to reconnect. */
  private reconnectDelay: number = 1000
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null

  /**
   * Attaches to the shared worker and starts a 30-second heartbeat.
   *
   * @param url WebSocket endpoint handed to the worker on `connect`.
   */
  constructor(readonly url: string) {
    this.initSharedWorker()
    // Heartbeat: a literal 'ping' is posted to the worker every 30 s; 'pong'
    // payloads coming back are ignored in onMessage.
    setInterval(() => {
      if (this.worker.port) {
        this.worker.port.postMessage({ type: 'message', data: 'ping' })
      }
    }, 30000)
  }

  /** Creates the SharedWorker and routes its notifications by `type`. */
  private initSharedWorker(): void {
    this.worker = new SharedWorker(new URL('./sharedWorker.js', import.meta.url))
    this.worker.port.start()
    this.worker.port.onmessage = (event) => {
      switch (event.data.type) {
        case 'message':
          this.onMessage(event)
          break
        case 'open':
          this.onOpen()
          break
        case 'close':
          this.onClose()
          break
      }
    }
  }

  /**
   * Remembers the token and, unless this page already knows the connection is
   * open, asks the worker to connect.
   *
   * @param token Credential forwarded to the worker together with `url`.
   */
  public connect(token: string): void {
    this.token = token
    if (this.connected) {
      return
    }
    this.worker.port.postMessage({ type: 'connect', data: { token, url: this.url } })
  }

  /**
   * Parses a payload forwarded by the worker and dispatches it to the pending
   * request handler (by messageId) and to the path listeners.
   */
  private onMessage(event: MessageEvent): void {
    const { data } = event.data
    if (data === 'pong') {
      return
    }

    let message: Message
    try {
      message = JSON.parse(data)
    } catch (e) {
      // A payload that is not valid JSON must not throw out of the port's
      // message handler; log it and drop it.
      console.debug(e)
      return
    }
    if (message === null || typeof message !== 'object') {
      return
    }

    // request-response
    const messageId = message.messageId || ''
    const handler = this.handlers.get(messageId)
    if (handler) {
      // One-shot: unregister before invoking so the entry is always released.
      this.handlers.delete(messageId)
      handler(message)
    }

    // pub-sub
    const listeners = this.listeners.get(message.path)
    if (listeners) {
      listeners.forEach((listener) => {
        try {
          listener(message)
        } catch (e) {
          // One failing listener must not prevent the others from running.
          console.debug(e)
        }
      })
    }
  }

  private onOpen(): void {
    this.connected = true
  }

  private onClose(): void {
    this.connected = false
    this.reconnect()
  }

  /**
   * Schedules a single reconnect attempt after `reconnectDelay`.
   *
   * Nothing is scheduled when `connect` was never called (there is no token
   * to send) or when an attempt is already pending. The delay keeps repeated
   * close notifications from turning into a tight connect loop.
   */
  private reconnect(): void {
    if (this.token === undefined || this.reconnectTimer !== null) {
      return
    }
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      if (this.token !== undefined) {
        this.connect(this.token)
      }
    }, this.reconnectDelay)
  }

  /**
   * Sends a message through the worker and waits for the reply that carries
   * the same messageId.
   *
   * The passed object's `messageId` is overwritten with a fresh UUID.
   *
   * @param message Message to send.
   * @returns Promise resolved with the reply when its `code` is 200. It is
   *   rejected with the reply for any other code, or with a Message-shaped
   *   object (`code: 409`, `message: 'timeout'`) when no reply arrives within
   *   `timeout` milliseconds.
   */
  public sendMessage(message: Message): Promise<Message> {
    const messageId = uuidv4()
    message.messageId = messageId
    this.worker.port.postMessage({ type: 'message', data: message })
    return new Promise<Message>((resolve, reject) => {
      const timer = setTimeout(() => {
        // Use the captured id: the caller may have reused `message` for a later
        // request, in which case message.messageId no longer belongs to this one.
        this.handlers.delete(messageId)
        const error: Message = {
          messageId,
          path: message.path,
          data: null,
          code: 409,
          message: 'timeout',
        }
        reject(error)
      }, this.timeout)
      this.handlers.set(messageId, (value: Message) => {
        clearTimeout(timer)
        // Every reply settles the promise, whatever its path, so callers never
        // wait on a promise that cannot complete.
        if (value.code === 200) {
          resolve(value)
        } else {
          reject(value)
        }
      })
    })
  }

  /**
   * Registers a listener for messages whose `path` equals `event`.
   *
   * @param event Message path to subscribe to.
   * @param callback Invoked with every matching message.
   */
  public on(event: string, callback: Handler): void {
    const registered = this.listeners.get(event)
    if (registered) {
      registered.push(callback)
    } else {
      this.listeners.set(event, [callback])
    }
  }

  /**
   * Removes listeners for `event`.
   *
   * @param event Message path to unsubscribe from.
   * @param callback When given, only this listener is removed; when omitted,
   *   every listener for `event` is removed.
   */
  public off(event: string, callback?: Handler): void {
    if (!callback) {
      this.listeners.delete(event)
      return
    }
    const registered = this.listeners.get(event)
    if (!registered) {
      return
    }
    // Build a new array rather than splicing so a dispatch that is currently
    // iterating the old array is not disturbed.
    const remaining = registered.filter((listener) => listener !== callback)
    if (remaining.length > 0) {
      this.listeners.set(event, remaining)
    } else {
      this.listeners.delete(event)
    }
  }
}

/**
 * Module-level client instance. It is constructed when this module is first
 * evaluated, which creates the SharedWorker and starts the heartbeat timer.
 * NOTE(review): 'wss://xxx' looks like a placeholder endpoint — confirm the
 * real URL before use.
 */
export const useWsClient = new SharedWebSocket('wss://xxx')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// sharedWorker.ts
// The WebSocket owned by this worker; null until the first connect request.
// Initialised explicitly so the runtime value matches the declared type.
let socket: WebSocket | null = null
// Message ports of the pages attached to this worker; one is appended per
// 'connect' event and every socket event is fanned out to all of them.
const ports: MessagePort[] = []

/**
 * Registers each page that attaches to this worker and handles its requests:
 * - `connect`: open the shared socket if needed;
 * - `close`:   close the shared socket;
 * - anything else: forward `data` to the server when the socket is open
 *   (strings are sent as-is, other values are JSON-encoded).
 *
 * NOTE(review): ports are only ever appended in this code; a page that goes
 * away leaves its port in `ports`. Consider having pages announce unload so
 * their port can be removed.
 */
globalThis.addEventListener('connect', function (event: MessageEvent) {
  const port: MessagePort = event.ports[0]
  ports.push(port)

  port.onmessage = function (event: MessageEvent) {
    const { type, data } = event.data
    if (type === 'connect') {
      // A page that joins while the socket is already open would otherwise
      // never see an 'open' notification, because that is only broadcast from
      // the socket's own open event. Tell this port directly in that case.
      const alreadyOpen = socket?.readyState === WebSocket.OPEN
      connect(data)
      if (alreadyOpen) {
        port.postMessage({ type: 'open' })
      }
    } else if (type === 'close') {
      close()
    } else if (socket?.readyState === WebSocket.OPEN) {
      socket.send(typeof data === 'string' ? data : JSON.stringify(data))
    }
  }
})

/**
 * Opens the shared WebSocket unless one is already open or still connecting,
 * and fans its open / close / message events out to every registered port.
 *
 * Skipping sockets in the CONNECTING state matters: several pages may ask to
 * connect at almost the same time (for example all of them reacting to the
 * same close notification), and each request would otherwise create another
 * WebSocket whose events are also broadcast to every page.
 *
 * @param data.url   WebSocket endpoint.
 * @param data.token Passed to the WebSocket constructor as its only sub-protocol.
 */
function connect(data: { url: string; token: string}) {
  if (
    socket &&
    (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING)
  ) {
    return
  }

  // Sends one notification to every attached page.
  const broadcast = (payload: { type: string; data?: unknown }) => {
    ports.forEach((port) => {
      port.postMessage(payload)
    })
  }

  const { url, token } = data
  const ws = new WebSocket(url, [token])
  socket = ws
  ws.onopen = function () {
    broadcast({ type: 'open' })
  }
  ws.onclose = function () {
    broadcast({ type: 'close' })
  }
  ws.onmessage = function (event: MessageEvent) {
    broadcast({ type: 'message', data: event.data })
  }
}

/**
 * Closes the shared WebSocket when it is open or still connecting.
 *
 * Including the CONNECTING state means a close request that arrives during
 * the handshake aborts the attempt instead of being silently ignored. Sockets
 * that are already closing or closed, or a missing socket, are left alone.
 */
function close() {
  const state = socket?.readyState
  if (state === WebSocket.OPEN || state === WebSocket.CONNECTING) {
    socket?.close()
  }
}