import * as egret from './ByteArray';
import * as WebSocket from 'ws';

/** Names of the connection-level events that can be subscribed to through `PinusWSClient.on`. */
export enum PinusWSClientEvent {
    EVENT_IO_ERROR = 'io-error',
    EVENT_CLOSE = 'close',
    EVENT_KICK = 'onKick',
    EVENT_HEART_BEAT_TIMEOUT = 'heartbeat timeout'
}

/**
 * WebSocket client that performs a handshake, keeps a heartbeat running,
 * sends request/notify messages and dispatches responses and server pushes.
 *
 * Listeners registered with `on` are invoked for server push routes and for
 * the events listed in `PinusWSClientEvent`.
 */
export class PinusWSClient {
    /** When true, requests, responses and pushes are logged to the console. */
    static DEBUG: boolean = false;

    private JS_WS_CLIENT_TYPE = 'js-websocket';
    private JS_WS_CLIENT_VERSION = '0.0.5';

    // Handshake / response status codes.
    private RES_OK: number = 200;
    private RES_FAIL: number = 500;
    private RES_OLD_CLIENT: number = 501;

    private socket: WebSocket = null;
    // Map from request id to the response callback of that request.
    private callbacks: any = {};
    // Map from package type to the method that handles it.
    private handlers: any = {};
    // Map from request id to route
    private routeMap: any = {};

    private heartbeatInterval: number = 0;
    private heartbeatTimeout: number = 0;
    private nextHeartbeatTimeout: number = 0;
    private gapThreshold: number = 100;
    private heartbeatId: any = null;
    private heartbeatTimeoutId: any = null;

    private handshakeCallback: any = null;
    private handshakeBuffer: any;
    private initCallback: Function = null;

    // Map from event name / route to the list of listeners registered via `on`.
    private _callbacks: any = {};
    private reqId: number = 0;

    private _package: IPackage;
    private _message: IMessage;

    constructor() {
        // The message codec shares routeMap so it can resolve response routes by id.
        this._message = new Message(this.routeMap);
        this._package = new Package();

        this.handshakeBuffer = {
            'sys': {
                type: this.JS_WS_CLIENT_TYPE,
                version: this.JS_WS_CLIENT_VERSION
            },
            'user': {
            }
        };

        this.handlers[Package.TYPE_HANDSHAKE] = this.handshake;
        this.handlers[Package.TYPE_HEARTBEAT] = this.heartbeat;
        this.handlers[Package.TYPE_DATA] = this.onData;
        this.handlers[Package.TYPE_KICK] = this.onKick;
    }

    /**
     * Opens the connection and starts the handshake.
     *
     * @param params connection options; `host`, `port`, `user` and
     *               `handshakeCallback` are read from it
     * @param cb     invoked once with the handshake response after a
     *               successful handshake
     */
    public init(params: any, cb: Function): void {
        console.log('init', params);
        this.initCallback = cb;
        const host = params.host;
        const port = params.port;

        this.handshakeBuffer.user = params.user;
        this.handshakeCallback = params.handshakeCallback;
        this.initWebSocket(host, port, cb);
    }

    /** Creates the socket for `ws://host[:port]` and wires its events to this client. */
    private initWebSocket(host: string, port: number, cb: Function): void {
        console.log('[Pinus] connect to:', host, port);
        let url = 'ws://' + host;
        if (port) {
            url += ':' + port;
        }

        const socket = new WebSocket(url);
        socket.binaryType = 'arraybuffer';
        socket.onopen = (event) => {
            this.onConnect();
        };
        socket.onmessage = (event) => {
            this.onMessage(event);
        };
        socket.onerror = (event) => {
            this.onIOError(event);
        };
        socket.onclose = (event) => {
            this.onClose(event);
        };
        this.socket = socket;
    }

    /**
     * Registers a listener for a connection event or for a server push route.
     *
     * @param event event name or push route
     * @param fn    listener to call each time the event is emitted
     */
    public on(event: PinusWSClientEvent.EVENT_IO_ERROR, fn: (err: Error) => void): void;
    public on(event: PinusWSClientEvent.EVENT_CLOSE, fn: (err: Error) => void): void;
    public on(event: PinusWSClientEvent.EVENT_KICK, fn: (err: Error) => void): void;
    public on(event: PinusWSClientEvent.EVENT_HEART_BEAT_TIMEOUT, fn: (err: Error) => void): void;
    public on(route: string, fn: (msg: any) => void): void;
    public on(event: string, fn: (msg: any) => void) {
        (this._callbacks[event] = this._callbacks[event] || []).push(fn);
    }

    /**
     * Sends a request and registers `cb` to receive the response body.
     * May also be called as `request(route, cb)`, in which case an empty
     * message is sent. Nothing is sent when no route can be determined.
     *
     * @param route target route; falls back to `msg.route` when empty
     * @param msg   message payload
     * @param cb    called with the response body
     */
    public request(route: string, msg: any, cb: Function) {
        if (arguments.length === 2 && typeof msg === 'function') {
            cb = msg;
            msg = {};
        } else {
            msg = msg || {};
        }
        route = route || msg.route;
        if (!route) {
            return;
        }

        // Request ids cycle through 1..127; id 0 is used for notifications.
        this.reqId++;
        if (this.reqId > 127) {
            this.reqId = 1;
        }
        const reqId = this.reqId;

        if (PinusWSClient.DEBUG) {
            console.log(`REQUEST:route:${route} , reqId:${reqId}, msg:${msg}`);
        }

        this.sendMessage(reqId, route, msg);

        this.callbacks[reqId] = cb;
        this.routeMap[reqId] = route;
    }

    /**
     * Sends a message for which no response callback is registered.
     *
     * @param route target route
     * @param msg   message payload
     */
    public notify(route: string, msg: any): void {
        this.sendMessage(0, route, msg);
    }

    /** Decodes an incoming socket frame into a package and dispatches it. */
    private onMessage(event: { data: string | Buffer | ArrayBuffer | Buffer[]; type: string; target: WebSocket }): void {
        this.processPackage(this._package.decode(new egret.ByteArray(event.data)));
    }

    /** Encodes `msg` as a data package and writes it to the socket. */
    private sendMessage(reqId: number, route: string, msg: any) {
        const encodedMessage = this._message.encode(reqId, route, msg);
        this.send(this._package.encode(Package.TYPE_DATA, encodedMessage));
    }

    /** Socket opened: send the handshake package. */
    private onConnect(): void {
        console.log('[Pinus] connect success');
        this.send(this._package.encode(Package.TYPE_HANDSHAKE, Protocol.strencode(JSON.stringify(this.handshakeBuffer))));
    }

    /** Socket closed: log and notify `EVENT_CLOSE` listeners. */
    private onClose(e: any): void {
        console.error('[Pinus] connect close:', e);
        this.emit(PinusWSClientEvent.EVENT_CLOSE, e);
    }

    /** Socket error: log and notify `EVENT_IO_ERROR` listeners. */
    private onIOError(e: any): void {
        console.error('socket error: ', e);
        this.emit(PinusWSClientEvent.EVENT_IO_ERROR, e);
    }

    /** Kick package received: forward its payload to `EVENT_KICK` listeners. */
    private onKick(event: any) {
        this.emit(PinusWSClientEvent.EVENT_KICK, event);
    }

    /** Data package received: decode the message and resolve the route of a response. */
    private onData(data: any) {
        const msg = this._message.decode(data);

        if (msg.id > 0) {
            msg.route = this.routeMap[msg.id];
            delete this.routeMap[msg.id];
            if (!msg.route) {
                // Response to a request this client does not know about.
                return;
            }
        }

        this.processMessage(msg);
    }

    /**
     * Delivers a decoded message: pushes (no id) go to the listeners of their
     * route, responses go to the callback registered by `request`.
     */
    private processMessage(msg: any) {
        if (!msg.id) {
            // server push message
            if (PinusWSClient.DEBUG) {
                console.log(`EVENT: Route:${msg.route} Msg:${msg.body}`);
            }
            this.emit(msg.route, msg.body);
            return;
        }

        if (PinusWSClient.DEBUG) {
            console.log(`RESPONSE: Id:${msg.id} Msg:${msg.body}`);
        }

        // A response: find the callback registered together with the request.
        const cb = this.callbacks[msg.id];
        delete this.callbacks[msg.id];
        if (typeof cb !== 'function') {
            return;
        }

        if (msg.body && msg.body.code === this.RES_FAIL) {
            msg.body.error = {
                'code': 500,
                'desc': '服务器内部错误',
                'key': 'INTERNAL_ERROR'
            };
        }
        cb(msg.body);
    }

    /**
     * Heartbeat package received: cancel the pending timeout and schedule the
     * next heartbeat to be sent after `heartbeatInterval`.
     */
    private heartbeat(data: any) {
        if (!this.heartbeatInterval) {
            // no heartbeat
            return;
        }

        const heartbeatPackage = this._package.encode(Package.TYPE_HEARTBEAT);
        if (this.heartbeatTimeoutId) {
            clearTimeout(this.heartbeatTimeoutId);
            this.heartbeatTimeoutId = null;
        }

        if (this.heartbeatId) {
            // already in a heartbeat interval
            return;
        }

        this.heartbeatId = setTimeout(() => {
            this.heartbeatId = null;
            this.send(heartbeatPackage);

            this.nextHeartbeatTimeout = Date.now() + this.heartbeatTimeout;
            this.heartbeatTimeoutId = setTimeout(() => this.heartbeatTimeoutCb(data), this.heartbeatTimeout);
        }, this.heartbeatInterval);
    }

    /**
     * Timeout check: re-arms itself while the deadline is still more than
     * `gapThreshold` ms away, otherwise reports the timeout and disconnects.
     */
    private heartbeatTimeoutCb(data: any) {
        const gap = this.nextHeartbeatTimeout - Date.now();
        if (gap > this.gapThreshold) {
            this.heartbeatTimeoutId = setTimeout(() => this.heartbeatTimeoutCb(data), gap);
        } else {
            console.error('server heartbeat timeout', data);
            this.emit(PinusWSClientEvent.EVENT_HEART_BEAT_TIMEOUT, data);
            this._disconnect();
        }
    }

    /** Alias of `removeAllListeners`. */
    public off(event?: string, fn?: Function) {
        this.removeAllListeners(event, fn);
    }

    /**
     * Removes listeners.
     *  - no arguments: removes every listener of every event
     *  - `event` only: removes every listener of that event
     *  - `event` and `fn`: removes that single listener
     */
    public removeAllListeners(event?: string, fn?: Function) {
        // all (checked by value so that a bare `off()` reaches this branch too)
        if (event === undefined && fn === undefined) {
            this._callbacks = {};
            return;
        }

        // specific event
        const callbacks = this._callbacks[event];
        if (!callbacks) {
            return;
        }

        // remove all handlers
        if (!fn) {
            delete this._callbacks[event];
            return;
        }

        // remove specific handler
        const position = callbacks.indexOf((fn as any)._off || fn);
        if (position !== -1) {
            callbacks.splice(position, 1);
        }
    }

    /** Closes the socket and cancels the heartbeat timers. */
    public disconnect(): void {
        this._disconnect();
    }

    private _disconnect(): void {
        console.warn('[Pinus] client disconnect ...');

        if (this.socket) {
            this.socket.close();
        }
        this.socket = null;

        if (this.heartbeatId) {
            clearTimeout(this.heartbeatId);
            this.heartbeatId = null;
        }

        if (this.heartbeatTimeoutId) {
            clearTimeout(this.heartbeatTimeoutId);
            this.heartbeatTimeoutId = null;
        }
    }

    /** Routes a decoded package to the handler registered for its type. */
    private processPackage(msg: any): void {
        const handler = this.handlers[msg.type];
        if (typeof handler !== 'function') {
            // Ignore unknown package types instead of throwing inside the socket callback.
            console.error('[Pinus] unknown package type:', msg.type);
            return;
        }
        handler.call(this, msg.body);
    }

    /**
     * Handshake package received: validate the status code, apply the
     * handshake data, acknowledge it and fire the `init` callback once.
     * A rejected handshake is reported through `EVENT_IO_ERROR`.
     */
    private handshake(resData: any) {
        const data = JSON.parse(Protocol.strdecode(resData));
        if (data.code === this.RES_OLD_CLIENT) {
            this.emit(PinusWSClientEvent.EVENT_IO_ERROR, new Error('client version not fulfilled'));
            return;
        }

        if (data.code !== this.RES_OK) {
            this.emit(PinusWSClientEvent.EVENT_IO_ERROR, new Error('handshake fail'));
            return;
        }

        this.handshakeInit(data);

        this.send(this._package.encode(Package.TYPE_HANDSHAKE_ACK));
        if (this.initCallback) {
            this.initCallback(data);
            this.initCallback = null;
        }
    }

    /** Applies the route dictionary, protobuf definitions and heartbeat settings from the handshake. */
    private handshakeInit(data: any): void {
        if (data.sys) {
            Routedic.init(data.sys.dict);
            Protobuf.init(data.sys.protos);
        }

        if (data.sys && data.sys.heartbeat) {
            this.heartbeatInterval = data.sys.heartbeat * 1000;   // heartbeat interval
            this.heartbeatTimeout = this.heartbeatInterval * 2;   // max heartbeat timeout
        } else {
            this.heartbeatInterval = 0;
            this.heartbeatTimeout = 0;
        }

        if (typeof this.handshakeCallback === 'function') {
            this.handshakeCallback(data.user);
        }
    }

    /** Writes the bytes to the socket; silently does nothing when there is no socket. */
    private send(byte: egret.ByteArray): void {
        if (this.socket) {
            this.socket.send(byte.buffer);
        }
    }

    /** Calls every listener registered for `event` with `args`. */
    private emit(event: string, ...args: any[]) {
        const listeners = this._callbacks[event];
        if (listeners) {
            // Iterate over a copy so listeners may add or remove listeners while being called.
            for (const listener of listeners.slice(0)) {
                listener.apply(this, args);
            }
        }
        return this;
    }
}

/**
 * Package codec. A package is a 4-byte header (1 byte type followed by a
 * 3-byte big-endian body length) and then the body bytes.
 */
class Package implements IPackage {
    static TYPE_HANDSHAKE: number = 1;
    static TYPE_HANDSHAKE_ACK: number = 2;
    static TYPE_HEARTBEAT: number = 3;
    static TYPE_DATA: number = 4;
    static TYPE_KICK: number = 5;

    /**
     * @param type one of the `TYPE_*` constants
     * @param body optional body bytes
     * @returns header followed by the body
     */
    public encode(type: number, body?: egret.ByteArray) {
        const length: number = body ? body.length : 0;

        const buffer: egret.ByteArray = new egret.ByteArray();
        buffer.writeByte(type & 0xff);
        buffer.writeByte((length >> 16) & 0xff);
        buffer.writeByte((length >> 8) & 0xff);
        buffer.writeByte(length & 0xff);

        if (body) {
            buffer.writeBytes(body, 0, body.length);
        }

        return buffer;
    }

    /**
     * @param buffer bytes positioned at the start of a package
     * @returns `{ type, body, length }`; `body` is undefined when fewer than
     *          `length` bytes are available
     */
    public decode(buffer: egret.ByteArray) {
        const type: number = buffer.readUnsignedByte();
        const len: number = (buffer.readUnsignedByte() << 16 | buffer.readUnsignedByte() << 8 | buffer.readUnsignedByte()) >>> 0;

        let body: egret.ByteArray;

        if (buffer.bytesAvailable >= len) {
            body = new egret.ByteArray();
            if (len) {
                buffer.readBytes(body, 0, len);
            }
        } else {
            console.log('[Package] no enough length for current type:', type);
        }

        return { type: type, body: body, length: len };
    }
}

/**
 * Message codec. A message is a flag byte (`type << 1 | compressedRoute`),
 * a base-128 encoded id (requests/responses only), a route (dictionary code
 * or length-prefixed string) and the body.
 */
class Message implements IMessage {

    public static MSG_FLAG_BYTES: number = 1;
    public static MSG_ROUTE_CODE_BYTES: number = 2;
    public static MSG_ID_MAX_BYTES: number = 5;
    public static MSG_ROUTE_LEN_BYTES: number = 1;

    public static MSG_ROUTE_CODE_MAX: number = 0xffff;

    public static MSG_COMPRESS_ROUTE_MASK: number = 0x1;
    public static MSG_TYPE_MASK: number = 0x7;

    static TYPE_REQUEST: number = 0;
    static TYPE_NOTIFY: number = 1;
    static TYPE_RESPONSE: number = 2;
    static TYPE_PUSH: number = 3;

    /** @param routeMap map from request id to route, used to resolve the route of responses */
    constructor(private routeMap: any) {
    }

    /**
     * @param id    request id; 0 encodes a notification
     * @param route route name
     * @param msg   payload, encoded with Protobuf when a definition exists for the route, else as JSON
     */
    public encode(id: number, route: string, msg: any) {
        const buffer: egret.ByteArray = new egret.ByteArray();

        const type: number = id ? Message.TYPE_REQUEST : Message.TYPE_NOTIFY;

        const byte: egret.ByteArray = Protobuf.encode(route, msg) || Protocol.strencode(JSON.stringify(msg));

        // Use the dictionary code of the route when there is one.
        const rot: any = Routedic.getID(route) || route;

        buffer.writeByte((type << 1) | ((typeof (rot) === 'string') ? 0 : 1));

        if (id) {
            // Base-128 encoding, least significant group first; the high bit
            // marks that another byte follows.
            do {
                let tmp: number = id % 128;
                const next: number = Math.floor(id / 128);

                if (next !== 0) {
                    tmp = tmp + 128;
                }

                buffer.writeByte(tmp);

                id = next;
            } while (id !== 0);
        }

        if (rot) {
            if (typeof rot === 'string') {
                // The length prefix counts encoded bytes, so encode first.
                const routeBytes: egret.ByteArray = new egret.ByteArray();
                routeBytes.writeUTFBytes(rot);
                buffer.writeByte(routeBytes.length & 0xff);
                buffer.writeBytes(routeBytes);
            } else {
                buffer.writeByte((rot >> 8) & 0xff);
                buffer.writeByte(rot & 0xff);
            }
        }

        if (byte) {
            buffer.writeBytes(byte);
        }

        return buffer;
    }

    /**
     * @param buffer bytes positioned at the start of a message
     * @returns `{ id, type, route, body }`
     */
    public decode(buffer: egret.ByteArray): any {
        // parse flag
        const flag: number = buffer.readUnsignedByte();
        const compressRoute: number = flag & Message.MSG_COMPRESS_ROUTE_MASK;
        const type: number = (flag >> 1) & Message.MSG_TYPE_MASK;
        let route: any;

        // parse id
        let id: number = 0;
        if (type === Message.TYPE_REQUEST || type === Message.TYPE_RESPONSE) {
            let i: number = 0;
            let m: number;
            do {
                m = buffer.readUnsignedByte();
                id = id + ((m & 0x7f) * Math.pow(2, (7 * i)));
                i++;
            } while (m >= 128);
        }

        // parse route
        if (type === Message.TYPE_REQUEST || type === Message.TYPE_NOTIFY || type === Message.TYPE_PUSH) {
            if (compressRoute) {
                route = buffer.readUnsignedShort();
            } else {
                const routeLen: number = buffer.readUnsignedByte();
                route = routeLen ? buffer.readUTFBytes(routeLen) : '';
            }
        } else if (type === Message.TYPE_RESPONSE) {
            route = this.routeMap[id];
        }

        // A message without id carrying a route code: translate the code back to its name.
        if (!id && !(typeof (route) === 'string')) {
            route = Routedic.getName(route);
        }

        const body: any = Protobuf.decode(route, buffer) || JSON.parse(Protocol.strdecode(buffer));

        return { id: id, type: type, route: route, body: body };
    }
}

/** Helpers converting between strings and UTF-8 byte arrays. */
class Protocol {

    /** Writes `str` as UTF bytes into a new ByteArray. */
    public static strencode(str: string): egret.ByteArray {
        const buffer: egret.ByteArray = new egret.ByteArray();
        buffer.length = str.length;
        buffer.writeUTFBytes(str);
        return buffer;
    }

    /** Reads all remaining bytes of `byte` as a UTF string. */
    public static strdecode(byte: egret.ByteArray): string {
        return byte.readUTFBytes(byte.bytesAvailable);
    }
}

/**
 * Protobuf-style codec driven by the definitions received in the handshake
 * (`protos.client` for outgoing routes, `protos.server` for incoming ones).
 */
class Protobuf {
    // Wire type written into the tag for each field type; anything else uses 2.
    static TYPES: any = {
        uInt32: 0,
        sInt32: 0,
        int32: 0,
        double: 1,
        string: 2,
        message: 2,
        float: 5
    };
    private static _clients: any = {};
    private static _servers: any = {};

    /** Stores the client/server definitions; missing ones become empty maps. */
    static init(protos: any): void {
        this._clients = protos && protos.client || {};
        this._servers = protos && protos.server || {};
    }

    /** @returns the encoded message, or null when no client definition exists for `route` */
    static encode(route: string, msg: any): egret.ByteArray {
        const protos: any = this._clients[route];

        if (!protos) {
            return null;
        }

        return this.encodeProtos(protos, msg);
    }

    /** @returns the decoded message, or null when no server definition exists for `route` */
    static decode(route: string, buffer: egret.ByteArray): any {
        const protos: any = this._servers[route];

        if (!protos) {
            return null;
        }

        return this.decodeProtos(protos, buffer);
    }

    /** Encodes every property of `msg` that has a definition in `protos`. */
    private static encodeProtos(protos: any, msg: any): egret.ByteArray {
        const buffer: egret.ByteArray = new egret.ByteArray();

        for (const name in msg) {
            if (protos[name]) {
                const proto: any = protos[name];

                switch (proto.option) {
                    case 'optional':
                    case 'required':
                        buffer.writeBytes(this.encodeTag(proto.type, proto.tag));
                        this.encodeProp(msg[name], proto.type, protos, buffer);
                        break;
                    case 'repeated':
                        if (!!msg[name] && msg[name].length > 0) {
                            this.encodeArray(msg[name], proto, protos, buffer);
                        }
                        break;
                }
            }
        }

        return buffer;
    }

    /** Decodes fields from `buffer` until it is exhausted. */
    static decodeProtos(protos: any, buffer: egret.ByteArray): any {
        const msg: any = {};

        while (buffer.bytesAvailable) {
            const head: any = this.getHead(buffer);
            const name: string = protos.__tags[head.tag];

            switch (protos[name].option) {
                case 'optional':
                case 'required':
                    msg[name] = this.decodeProp(protos[name].type, protos, buffer);
                    break;
                case 'repeated':
                    if (!msg[name]) {
                        msg[name] = [];
                    }
                    this.decodeArray(msg[name], protos[name].type, protos, buffer);
                    break;
            }
        }

        return msg;
    }

    /** Encodes `(tag << 3) | wireType` for a field of the given type name. */
    static encodeTag(type: string, tag: number): egret.ByteArray {
        const value: number = this.TYPES[type] !== undefined ? this.TYPES[type] : 2;

        return this.encodeUInt32((tag << 3) | value);
    }

    /** Reads a field header and splits it into wire type and tag number. */
    static getHead(buffer: egret.ByteArray): any {
        const tag: number = this.decodeUInt32(buffer);

        return { type: tag & 0x7, tag: tag >> 3 };
    }

    /** Appends the encoding of a single value of the given type to `buffer`. */
    static encodeProp(value: any, type: string, protos: any, buffer: egret.ByteArray): void {
        switch (type) {
            case 'uInt32':
                buffer.writeBytes(this.encodeUInt32(value));
                break;
            case 'int32':
            case 'sInt32':
                buffer.writeBytes(this.encodeSInt32(value));
                break;
            case 'float': {
                const floats: egret.ByteArray = new egret.ByteArray();
                floats.endian = egret.Endian.LITTLE_ENDIAN;
                floats.writeFloat(value);
                buffer.writeBytes(floats);
                break;
            }
            case 'double': {
                const doubles: egret.ByteArray = new egret.ByteArray();
                doubles.endian = egret.Endian.LITTLE_ENDIAN;
                doubles.writeDouble(value);
                buffer.writeBytes(doubles);
                break;
            }
            case 'string': {
                // The length prefix is the number of encoded bytes, which
                // differs from value.length for non-ASCII text.
                const strBytes: egret.ByteArray = new egret.ByteArray();
                strBytes.writeUTFBytes(value);
                buffer.writeBytes(this.encodeUInt32(strBytes.length));
                buffer.writeBytes(strBytes);
                break;
            }
            default: {
                const proto: any = protos.__messages[type] || this._clients['message ' + type];
                if (!!proto) {
                    const buf: egret.ByteArray = this.encodeProtos(proto, value);
                    buffer.writeBytes(this.encodeUInt32(buf.length));
                    buffer.writeBytes(buf);
                }
                break;
            }
        }
    }

    /**
     * Reads a single value of the given type from `buffer`.
     *
     * @returns the value; `false` for a zero-length nested message and
     *          `undefined` for an unknown type
     */
    static decodeProp(type: string, protos: any, buffer: egret.ByteArray): any {
        switch (type) {
            case 'uInt32':
                return this.decodeUInt32(buffer);
            case 'int32':
            case 'sInt32':
                return this.decodeSInt32(buffer);
            case 'float': {
                // Copy exactly the 4 bytes of the value and read them
                // little-endian; nothing else may be consumed from `buffer`.
                const floats: egret.ByteArray = new egret.ByteArray();
                buffer.readBytes(floats, 0, 4);
                floats.endian = egret.Endian.LITTLE_ENDIAN;
                return floats.readFloat();
            }
            case 'double': {
                const doubles: egret.ByteArray = new egret.ByteArray();
                buffer.readBytes(doubles, 0, 8);
                doubles.endian = egret.Endian.LITTLE_ENDIAN;
                return doubles.readDouble();
            }
            case 'string': {
                const length: number = this.decodeUInt32(buffer);
                return buffer.readUTFBytes(length);
            }
            default: {
                const proto: any = protos && (protos.__messages[type] || this._servers['message ' + type]);
                if (proto) {
                    const len: number = this.decodeUInt32(buffer);
                    let buf: egret.ByteArray;
                    if (len) {
                        buf = new egret.ByteArray();
                        buffer.readBytes(buf, 0, len);
                    }

                    return len ? Protobuf.decodeProtos(proto, buf) : false;
                }
                return undefined;
            }
        }
    }

    /** True for the numeric types whose repeated fields are written behind a single tag. */
    static isSimpleType(type: string): boolean {
        return (
            type === 'uInt32' ||
            type === 'sInt32' ||
            type === 'int32' ||
            type === 'uInt64' ||
            type === 'sInt64' ||
            type === 'float' ||
            type === 'double'
        );
    }

    /**
     * Encodes a repeated field. Simple types are written as one tag, the
     * element count and the elements; other types repeat the tag per element.
     */
    static encodeArray(array: any[], proto: any, protos: any, buffer: egret.ByteArray): void {
        // Methods are called through `this`: they rely on `this` internally,
        // so they must not be detached into local variables.
        if (this.isSimpleType(proto.type)) {
            buffer.writeBytes(this.encodeTag(proto.type, proto.tag));
            buffer.writeBytes(this.encodeUInt32(array.length));
            for (let i: number = 0; i < array.length; i++) {
                this.encodeProp(array[i], proto.type, protos, buffer);
            }
        } else {
            for (let j: number = 0; j < array.length; j++) {
                buffer.writeBytes(this.encodeTag(proto.type, proto.tag));
                this.encodeProp(array[j], proto.type, protos, buffer);
            }
        }
    }

    /**
     * Decodes one occurrence of a repeated field into `array`: a counted run
     * of elements for simple types, a single element otherwise.
     */
    static decodeArray(array: any[], type: string, protos: any, buffer: egret.ByteArray): void {
        if (this.isSimpleType(type)) {
            const length: number = this.decodeUInt32(buffer);
            for (let i: number = 0; i < length; i++) {
                array.push(this.decodeProp(type, protos, buffer));
            }
        } else {
            array.push(this.decodeProp(type, protos, buffer));
        }
    }

    /** Base-128 encoding, least significant group first, high bit set on all but the last byte. */
    static encodeUInt32(n: number): egret.ByteArray {
        const result: egret.ByteArray = new egret.ByteArray();

        do {
            let tmp: number = n % 128;
            const next: number = Math.floor(n / 128);

            if (next !== 0) {
                tmp = tmp + 128;
            }

            result.writeByte(tmp);
            n = next;
        }
        while (n !== 0);

        return result;
    }

    /** Inverse of `encodeUInt32`; stops at the first byte without the high bit. */
    static decodeUInt32(buffer: egret.ByteArray): number {
        let n: number = 0;

        for (let i: number = 0; i < buffer.length; i++) {
            const m: number = buffer.readUnsignedByte();
            n = n + ((m & 0x7f) * Math.pow(2, (7 * i)));
            if (m < 128) {
                return n;
            }
        }
        return n;
    }

    /** Maps negatives to odd and non-negatives to even unsigned values, then encodes them. */
    static encodeSInt32(n: number): egret.ByteArray {
        n = n < 0 ? (Math.abs(n) * 2 - 1) : n * 2;

        return this.encodeUInt32(n);
    }

    /** Inverse of `encodeSInt32`. */
    static decodeSInt32(buffer: egret.ByteArray): number {
        let n: number = this.decodeUInt32(buffer);

        // Odd values encode negatives.
        const flag: number = ((n % 2) === 1) ? -1 : 1;

        n = ((n % 2 + n) / 2) * flag;

        return n;
    }
}

/** Route dictionary: maps route names to numeric codes and back. */
class Routedic {
    private static _ids: any = {};
    private static _names: any = {};

    /** @param dict map from route name to code; the reverse map is filled from it */
    static init(dict: any): void {
        this._names = dict || {};
        const _names = this._names;
        const _ids = this._ids;
        for (const name in _names) {
            _ids[_names[name]] = name;
        }
    }

    /** @returns the code of a route name, or undefined when it is not in the dictionary */
    static getID(name: string) {
        return this._names[name];
    }

    /** @returns the route name of a code, or undefined when it is not in the dictionary */
    static getName(id: number) {
        return this._ids[id];
    }
}

interface IMessage {
    /**
     * encode
     * @param id
     * @param route
     * @param msg
     * @return ByteArray
     */
    encode(id: number, route: string, msg: any): egret.ByteArray;

    /**
     * decode
     * @param buffer
     * @return Object
     */
    decode(buffer: egret.ByteArray): any;
}

interface IPackage {

    encode(type: number, body?: egret.ByteArray): egret.ByteArray;

    decode(buffer: egret.ByteArray): any;
}