Foundations for pushing notifications to WebSocket clients and receiving commands from them

Foundations for pushing notifications to WebSocket clients and receiving commands from them

Hello there. Today I will write about how I lay down the foundations for async communications between the clients and the server using WebSockets for my online persistent-universe single-player/co-op space game. I will also write a bit about my next steps which include wiring up all the things needed for handling user actions and processes in the game.

Most WebSocket tutorials end up in a sequential send/recv loop, but in my game, where I need to push messages asynchronously such as "construction complete" or "new stars were discovered" or a dreadful "space dragons are inbound in 25 hours" without waiting for an input from the WebSocket client, a slightly different tactic should be used.

The basic concept, taken straight from my WebSocket route handling class looks like this:

# start_loop is called after successful authentication
async def start_loop(self):
    sub = self.auth_data["sub"]
    try:
        listener = self.listener(self.websocket)
        poster = self.poster(self.websocket)
        await gather(listener, poster) # from the asyncio module
    except WebSocketDisconnect:
        logger.info(f"{sub} disconnected, bye")
        self.disconnect()

The listener coroutine looks like the following. Receive from the socket, then handle the command (not implemented yet).

async def listener(self, websocket: WebSocket):
    sub = self.auth_data["sub"]
    while True:
        try:
            data = await websocket.receive_json()
            command = data["command"]
        except JSONDecodeError:
            await self.websocket.send_text("JSON. JSON please")
            logger.warning(f"{sub} sent a bad JSON in request, very suspicious!")
            continue
        except KeyError:
            command = None
        await self.websocket.send_json(
            ok_response(None, "thank you", command=command)
        )

And the poster, with a POC for posting to the WebSocket asynchronously every 5 seconds:

async def poster(self, websocket: WebSocket):
    while True:
        await sleep(5.0) # TODO: await an Event from some Events Queue
        await websocket.send_json(
            ok_response(MessageType.Unknown, "Jamaica. Let's go to Jamaica.")
        )

And in the browser, we can see pushes and commands sent independently:

image.png

Nice!

As for my next steps, I plan to implement the following logic as I described in a previous post (details below):

04 - workers and queue.png

As I receive a WebSocket command, say... "Build Something", the Web Process will create a Celery task for that. A Worker Process will catch the task and create a DB entry in some Build Table for later reference and also create a delayed task ETA-ed for the time it would be completed. When the delayed task is due, a Worker will catch that and will check if the task is still valid in the Build Table (e.g. has not been cancelled). If valid, it will publish to a PubSub saying that the Build Something is complete. The Web Worker which is subscribed to that PubSub will receive the message and send a notification to all connected clients.

Phew.

I hope that will work.

Some thoughts about game restarts/crashes pop up now, especially for delayed tasks. I tend to cancel all tasks when a worker dies rather than relying on Redis to be persistent enough for me. When a game starts, I can read the Build Table and set up delayed tasks again. I think that would work better.

That would be all for now. I'll keep on updating.