import { Callbag } from "callbag"; import pipe from "callbag-pipe"; import share from "callbag-share"; import { INPUT_FREQUENCY } from "../ecs/Lockstep"; import { catchTalkback, defer, interval, makeSubject, map, merge } from "../utilities/Callbag"; import { ClientMessage, MessageTypes, ServerMessage } from "./LockstepClient"; /** Stub loopback server that handles multiple clients, for schemes where GlobalInput = LocalInput[] */ export class LoopbackServer { private nextClientId = 0; private inputBuffer: LocalInput[] = []; private heartbeat = pipe( interval(INPUT_FREQUENCY), map(() => ({ t: MessageTypes.INPUT, i: this.inputBuffer.slice() } as ServerMessage)), ); private broadcast = makeSubject>(); private serverFeed = share(merge(this.heartbeat, this.broadcast)); public readonly socket = defer(() => { const playerNumber = this.nextClientId++; return pipe( this.serverFeed, catchTalkback((message: ClientMessage) => this.processMessage(playerNumber, message)), map(message => this.postprocessResponse(playerNumber, message)) ); }); private processMessage(playerNumber: number, message: ClientMessage) { switch (message.t) { case MessageTypes.INPUT: if (playerNumber >= 0) { this.inputBuffer[playerNumber] = message.i; } break; case MessageTypes.SET_STATE: this.broadcast(1, { t: MessageTypes.SET_STATE, u: -1, s: message.s }); break; } } private postprocessResponse(playerNumber: number, message: ServerMessage): ServerMessage { if (message.t === MessageTypes.SET_STATE && playerNumber >= 0) { return { ...message, u: playerNumber, }; } else { return message; } } }