Foundations for pushing notifications to WebSocket clients and receiving commands from them

Amir Eldor

Published on Jun 17, 2021

Hello there. Today I will write about how I laid the foundations for async communication between the clients and the server using WebSockets in my online persistent-universe single-player/co-op space game. I will also write a bit about my next steps, which include wiring up everything needed to handle user actions and processes in the game.

Most WebSocket tutorials end up with a sequential send/recv loop. My game needs a slightly different tactic: the server has to push messages such as "construction complete", "new stars were discovered", or a dreadful "space dragons are inbound in 25 hours" asynchronously, without waiting for input from the WebSocket client.

The basic concept, taken straight from my WebSocket route handling class, looks like this:

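# Imports these snippets presumably rely on (module level):
#   from asyncio import gather, sleep
#   from json import JSONDecodeError
#   from starlette.websockets import WebSocket, WebSocketDisconnect

# start_loop is called after successful authentication
async def start_loop(self):
    sub = self.auth_data["sub"]
    try:
        # Run the listener and the poster concurrently on the same socket
        listener = self.listener(self.websocket)
        poster = self.poster(self.websocket)
        await gather(listener, poster)
    except WebSocketDisconnect:
        logger.info(f"{sub} disconnected, bye")
        self.disconnect()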
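
One detail worth knowing about gather: when one coroutine raises, the exception reaches the caller, but the other coroutine is not cancelled automatically. Below is a minimal, stdlib-only sketch of one way to stop the sibling. The names Disconnected, fake_listener, fake_poster and run_loop are hypothetical stand-ins, not part of the game code, and the real handler may deal with this differently.

```python
import asyncio


class Disconnected(Exception):
    """Hypothetical stand-in for a WebSocket disconnect exception."""


async def fake_listener(log):
    # Pretend the client goes away after a short while.
    await asyncio.sleep(0.05)
    log.append("listener: client disconnected")
    raise Disconnected()


async def fake_poster(log):
    # Pretend to push a message periodically until cancelled.
    try:
        while True:
            await asyncio.sleep(0.01)
            log.append("poster: push")
    except asyncio.CancelledError:
        log.append("poster: cancelled")
        raise


async def run_loop(log):
    tasks = [
        asyncio.create_task(fake_listener(log)),
        asyncio.create_task(fake_poster(log)),
    ]
    try:
        await asyncio.gather(*tasks)
    except Disconnected:
        log.append("loop: disconnect handled")
    finally:
        # gather() does not cancel siblings when one task fails,
        # so cancel whatever is still running and wait for it to finish.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    return log


def demo():
    return asyncio.run(run_loop([]))
```

Calling demo() returns the log of what happened; the last entry shows the poster being cancelled once the listener has failed.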

The listener coroutine looks like the following. Receive from the socket, then handle the command (not implemented yet).

async def listener(self, websocket: WebSocket):
    sub = self.auth_data["sub"]
    while True:
        try:
            # receive_json() raises JSONDecodeError when the frame is not valid JSON
            data = await websocket.receive_json()
            command = data["command"]
        except JSONDecodeError:
            await websocket.send_text("JSON. JSON please")
            logger.warning(f"{sub} sent bad JSON in request, very suspicious!")
            continue
        except KeyError:
            # Valid JSON, but without a "command" key
            command = None
        # TODO: handle the command; for now just acknowledge it
        await websocket.send_json(
            ok_response(None, "thank you", command=command)
        )
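
The "handle the command" step is not implemented yet. As a rough idea of what it could become, here is a small sketch of a command registry with a dispatch function. Everything in it (HANDLERS, the command decorator, the "ping" and "build" commands, the response shape) is a hypothetical illustration, not the game's actual protocol.

```python
import json

# Hypothetical registry mapping command names to handler functions.
HANDLERS = {}


def command(name):
    """Register a function as the handler for the given command name."""
    def register(func):
        HANDLERS[name] = func
        return func
    return register


@command("ping")
def handle_ping(payload):
    return {"status": "ok", "message": "pong"}


@command("build")
def handle_build(payload):
    what = payload.get("what", "something")
    return {"status": "ok", "message": f"building {what}"}


def dispatch(raw_text):
    """Parse a raw text frame and route it to the matching handler."""
    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError:
        return {"status": "error", "message": "invalid JSON"}
    if not isinstance(data, dict):
        return {"status": "error", "message": "expected a JSON object"}
    name = data.get("command")
    # Only string names can be registered, so anything else is unknown.
    handler = HANDLERS.get(name) if isinstance(name, str) else None
    if handler is None:
        return {"status": "error", "message": "unknown command"}
    return handler(data)
```

With a registry like this, the listener would only need to pass each frame to dispatch and send back whatever it returns.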

And the poster, with a POC for posting to the WebSocket asynchronously every 5 seconds:

async def poster(self, websocket: WebSocket):
    while True:
        await sleep(5.0) # TODO: await an Event from some Events Queue
        await websocket.send_json(
            ok_response(MessageType.Unknown, "Jamaica. Let's go to Jamaica.")
        )
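
The TODO in the poster mentions awaiting an event from some events queue instead of sleeping. A minimal sketch of that idea, assuming an in-process asyncio.Queue as the event source: FakeSocket, the None sentinel and the message shape are hypothetical and only there to make the example self-contained.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket; records what would be sent."""

    def __init__(self):
        self.sent = []

    async def send_json(self, message):
        self.sent.append(message)


async def poster(socket, events):
    # Block until an event arrives instead of waking up on a timer.
    while True:
        event = await events.get()
        if event is None:  # sentinel used in this sketch to stop the poster
            break
        await socket.send_json({"type": "notification", "message": event})


async def main():
    socket = FakeSocket()
    events = asyncio.Queue()
    task = asyncio.create_task(poster(socket, events))
    await events.put("construction complete")
    await events.put("new stars were discovered")
    await events.put(None)
    await task
    return socket.sent


def demo():
    return asyncio.run(main())
```

The poster stays idle while the queue is empty, so nothing is sent until some other part of the program puts an event on it.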

And in the browser, we can see pushes and commands sent independently:

[Screenshot: pushes and commands in the browser]

Nice!

As for my next steps, I plan to implement the following logic as I described in a previous post (details below):

[Diagram: workers and queue]

When I receive a WebSocket command, say... "Build Something", the Web Process will create a Celery task for it. A Worker Process will pick up the task, create a DB entry in some Build Table for later reference, and also create a delayed task with an ETA set to the time the build would be completed. When the delayed task is due, a Worker will pick it up and check whether the build is still valid in the Build Table (e.g. has not been cancelled). If it is valid, the Worker will publish a message to a PubSub saying that Build Something is complete. The Web Process, which is subscribed to that PubSub, will receive the message and send a notification to all connected clients.
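
To make the flow easier to follow, here is a toy, in-memory simulation of it. This is only a sketch under loud assumptions: a dict stands in for the Build Table, an asyncio task with a sleep stands in for the delayed Celery task, and an asyncio.Queue stands in for the PubSub. None of the names below come from the game code, and no Celery or Redis API is used.

```python
import asyncio


async def complete_build(build_table, build_id, delay, pubsub):
    # Stand-in for the delayed task that fires when the build is due.
    await asyncio.sleep(delay)
    row = build_table.get(build_id)
    if row is None or row["cancelled"]:
        return  # no longer valid: publish nothing
    row["done"] = True
    await pubsub.put(f"build {build_id} complete")


def start_build(build_table, build_id, delay, pubsub):
    # Stand-in for the worker handling "Build Something":
    # record the build, then schedule the delayed completion check.
    build_table[build_id] = {"cancelled": False, "done": False}
    return asyncio.create_task(
        complete_build(build_table, build_id, delay, pubsub)
    )


async def main():
    build_table = {}          # stand-in for the Build Table
    pubsub = asyncio.Queue()  # stand-in for the PubSub channel
    tasks = [
        start_build(build_table, "mine", 0.02, pubsub),
        start_build(build_table, "shipyard", 0.02, pubsub),
    ]
    # The player cancels one build before it is due.
    build_table["shipyard"]["cancelled"] = True
    await asyncio.gather(*tasks)

    # Stand-in for the subscriber draining the channel.
    notifications = []
    while not pubsub.empty():
        notifications.append(pubsub.get_nowait())
    return notifications


def demo():
    return asyncio.run(main())
```

Only the build that was still valid when its delayed task fired produces a notification; the cancelled one is dropped silently.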

Phew.

I hope that will work.

Some thoughts about game restarts/crashes pop up now, especially for delayed tasks. I lean towards cancelling all tasks when a worker dies rather than relying on Redis to be persistent enough for me. When the game starts, I can read the Build Table and set up the delayed tasks again. I think that would work better.
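
A small sketch of what reading the Build Table on startup could look like. The row fields (id, due_at, cancelled, completed) and the function name plan_reschedule are assumptions made for illustration; the real table layout is not decided here. The function only computes how long each pending build still has to wait, and leaves the actual scheduling to whatever task queue is used.

```python
from datetime import datetime, timedelta, timezone


def plan_reschedule(build_rows, now):
    """Return (build_id, seconds_until_due) for builds that still need a delayed task."""
    plan = []
    for row in build_rows:
        if row["cancelled"] or row["completed"]:
            continue  # nothing left to schedule for this build
        remaining = (row["due_at"] - now).total_seconds()
        # Builds that came due while the game was down are scheduled immediately.
        plan.append((row["id"], max(0.0, remaining)))
    return plan


def example_plan():
    now = datetime(2021, 6, 17, 12, 0, tzinfo=timezone.utc)
    rows = [
        {"id": "mine", "due_at": now + timedelta(hours=1),
         "cancelled": False, "completed": False},
        {"id": "shipyard", "due_at": now - timedelta(minutes=10),
         "cancelled": False, "completed": False},
        {"id": "lab", "due_at": now + timedelta(hours=2),
         "cancelled": True, "completed": False},
    ]
    return plan_reschedule(rows, now)
```

In this example the mine is rescheduled with an hour left, the overdue shipyard gets a zero delay, and the cancelled lab is skipped.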

That would be all for now. I'll keep on updating.

 