From 51c53f64d031b2da75ccd33d3d2c245e45395ce5 Mon Sep 17 00:00:00 2001 From: syuilo Date: Wed, 10 Oct 2018 20:59:10 +0900 Subject: [PATCH] Fix #2881, Fix #2879 --- src/client/app/common/scripts/stream.ts | 212 ++++++++++++++++-------- 1 file changed, 141 insertions(+), 71 deletions(-) diff --git a/src/client/app/common/scripts/stream.ts b/src/client/app/common/scripts/stream.ts index c588d1bb39..215f93703a 100644 --- a/src/client/app/common/scripts/stream.ts +++ b/src/client/app/common/scripts/stream.ts @@ -11,6 +11,7 @@ export default class Stream extends EventEmitter { private stream: ReconnectingWebsocket; private state: string; private buffer: any[]; + private sharedConnectionPools: Pool[] = []; private sharedConnections: SharedConnection[] = []; private nonSharedConnections: NonSharedConnection[] = []; @@ -29,22 +30,21 @@ export default class Stream extends EventEmitter { } public useSharedConnection = (channel: string): SharedConnection => { - const existConnection = this.sharedConnections.find(c => c.channel === channel); + let pool = this.sharedConnectionPools.find(p => p.channel === channel); - if (existConnection) { - existConnection.use(); - return existConnection; - } else { - const connection = new SharedConnection(this, channel); - connection.use(); - this.sharedConnections.push(connection); - return connection; + if (pool == null) { + pool = new Pool(this, channel); + this.sharedConnectionPools.push(pool); } + + const connection = new SharedConnection(this, channel, pool); + this.sharedConnections.push(connection); + return connection; } @autobind public removeSharedConnection(connection: SharedConnection) { - this.sharedConnections = this.sharedConnections.filter(c => c.id !== connection.id); + this.sharedConnections = this.sharedConnections.filter(c => c !== connection); } public connectToChannel = (channel: string, params?: any): NonSharedConnection => { @@ -55,7 +55,7 @@ export default class Stream extends EventEmitter { @autobind public disconnectToChannel(connection: NonSharedConnection) { - this.nonSharedConnections = this.nonSharedConnections.filter(c => c.id !== connection.id); + this.nonSharedConnections = this.nonSharedConnections.filter(c => c !== connection); } /** @@ -77,8 +77,8 @@ export default class Stream extends EventEmitter { // チャンネル再接続 if (isReconnect) { - this.sharedConnections.forEach(c => { - c.connect(); + this.sharedConnectionPools.forEach(p => { + p.connect(); }); this.nonSharedConnections.forEach(c => { c.connect(); @@ -104,8 +104,18 @@ export default class Stream extends EventEmitter { if (type == 'channel') { const id = body.id; - const connection = this.sharedConnections.find(c => c.id === id) || this.nonSharedConnections.find(c => c.id === id); - connection.emit(body.type, body.body); + + let connections: Connection[]; + + connections = this.sharedConnections.filter(c => c.id === id); + + if (connections.length === 0) { + connections = [this.nonSharedConnections.find(c => c.id === id)]; + } + + connections.filter(c => c != null).forEach(c => { + c.emit(body.type, body.body); + }); } else { this.emit(type, body); } @@ -140,19 +150,131 @@ export default class Stream extends EventEmitter { } } -abstract class Connection extends EventEmitter { +class Pool { public channel: string; public id: string; - protected params: any; protected stream: Stream; + private users = 0; + private disposeTimerId: any; + private isConnected = false; - constructor(stream: Stream, channel: string, params?: any) { + constructor(stream: Stream, channel: string) { + this.channel = channel; + this.stream = stream; + + this.id = Math.random().toString(); + } + + @autobind + public inc() { + if (this.users === 0 && !this.isConnected) { + this.connect(); + } + + this.users++; + + // タイマー解除 + if (this.disposeTimerId) { + clearTimeout(this.disposeTimerId); + this.disposeTimerId = null; + } + } + + @autobind + public dec() { + this.users--; + + // そのコネクションの利用者が誰もいなくなったら + if (this.users === 0) { + // また直ぐに再利用される可能性があるので、一定時間待ち、 + // 新たな利用者が現れなければコネクションを切断する + this.disposeTimerId = setTimeout(() => { + this.disconnect(); + }, 3000); + } + } + + @autobind + public connect() { + this.isConnected = true; + this.stream.send('connect', { + channel: this.channel, + id: this.id + }); + } + + @autobind + private disconnect() { + this.isConnected = false; + this.disposeTimerId = null; + this.stream.send('disconnect', { id: this.id }); + } +} + +abstract class Connection extends EventEmitter { + public channel: string; + protected stream: Stream; + public abstract id: string; + + constructor(stream: Stream, channel: string) { super(); this.stream = stream; this.channel = channel; + } + + @autobind + public send(id: string, typeOrPayload, payload?) { + const type = payload === undefined ? typeOrPayload.type : typeOrPayload; + const body = payload === undefined ? typeOrPayload.body : payload; + + this.stream.send('ch', { + id: id, + type: type, + body: body + }); + } + + public abstract dispose(): void; +} + +class SharedConnection extends Connection { + private pool: Pool; + + public get id(): string { + return this.pool.id; + } + + constructor(stream: Stream, channel: string, pool: Pool) { + super(stream, channel); + + this.pool = pool; + this.pool.inc(); + } + + @autobind + public send(typeOrPayload, payload?) { + super.send(this.pool.id, typeOrPayload, payload); + } + + @autobind + public dispose() { + this.pool.dec(); + this.removeAllListeners(); + this.stream.removeSharedConnection(this); + } +} + +class NonSharedConnection extends Connection { + public id: string; + protected params: any; + + constructor(stream: Stream, channel: string, params?: any) { + super(stream, channel); + this.params = params; this.id = Math.random().toString(); + this.connect(); } @@ -167,59 +289,7 @@ abstract class Connection extends EventEmitter { @autobind public send(typeOrPayload, payload?) { - const type = payload === undefined ? typeOrPayload.type : typeOrPayload; - const body = payload === undefined ? typeOrPayload.body : payload; - - this.stream.send('ch', { - id: this.id, - type: type, - body: body - }); - } - - public abstract dispose(): void; -} - -class SharedConnection extends Connection { - private users = 0; - private disposeTimerId: any; - - constructor(stream: Stream, channel: string) { - super(stream, channel); - } - - @autobind - public use() { - this.users++; - - // タイマー解除 - if (this.disposeTimerId) { - clearTimeout(this.disposeTimerId); - this.disposeTimerId = null; - } - } - - @autobind - public dispose() { - this.users--; - - // そのコネクションの利用者が誰もいなくなったら - if (this.users === 0) { - // また直ぐに再利用される可能性があるので、一定時間待ち、 - // 新たな利用者が現れなければコネクションを切断する - this.disposeTimerId = setTimeout(() => { - this.disposeTimerId = null; - this.removeAllListeners(); - this.stream.send('disconnect', { id: this.id }); - this.stream.removeSharedConnection(this); - }, 3000); - } - } -} - -class NonSharedConnection extends Connection { - constructor(stream: Stream, channel: string, params?: any) { - super(stream, channel, params); + super.send(this.id, typeOrPayload, payload); } @autobind