Introduction

When Bun was released last September, it shook the JavaScript world with its lightning-fast speed. Bun is written from the ground up in Zig, a new low-level programming language. Its published benchmarks show it to be much faster than Node and Deno (~5x faster HTTP web servers and ~6x faster websocket servers).

Bun is not yet a drop-in replacement for Node, since many Node-specific APIs are still not implemented. It ships its own npm-compatible package manager, which is reported to be 17x faster than pnpm, 29x faster than npm and 33x faster than yarn. Performance-wise, Bun seems to dominate the existing ecosystems.

For websockets, Bun uses the μWebSockets implementation, which is blazingly fast since it's written in C and C++. It's a bummer that Bun only provides a Bun-specific pub/sub API for its websockets. So, let's build our own pub/sub protocol on top of Bun's WebSocket.

Designed Pub-Sub Protocols

The Fun Part

First of all, you need to have Bun installed on your system. Once that's taken care of, create a new project with the command bun init. It will ask you a few questions; just answer them as you like. After that, open the newly created project in your favourite code editor.

Bun provides a global variable, Bun, to interact with native Bun APIs. In our case, we will use its serve method to create a fast HTTP web server. It takes an options object with various properties; we are primarily concerned with fetch and websocket. Bun allows you to upgrade an HTTP connection to a websocket, but you need to write your own upgrade logic, which is done inside the fetch function. The serve method also accepts an optional generic argument that types the data property, which acts as the context for a websocket connection. The data property is available on each websocket instance.

The heart and core of our pub/sub web server will be our PubSubManager class. It will manage subscribing to channels, publishing messages to channels, etc.

// pub-sub-manager.ts

import type { ServerWebSocket } from "bun";

export type WSData = {
  id: string;
  createdAt: Date;
};

type ChannelContext = {
  subscribers: ServerWebSocket<WSData>[];
  messages: string[];
};

export class PubSubManager {
  private brokerId: Timer;
  private channels: Record<string, ChannelContext>;

  constructor() {
    this.channels = {};
    this.brokerId = setInterval(this.broker, 50);
  }

  subscribe = (channel: string, ws: ServerWebSocket<WSData>) => {
    console.debug(`[Subscribe]: Channel name ${channel}`);
    if (!this.channels[channel]) {
      this.channels[channel] = {
        subscribers: [],
        messages: [],
      };
    }

    if (
      !this.channels[channel].subscribers.find((s) => s.data.id === ws.data.id)
    )
      this.channels[channel].subscribers.push(ws);
  };

  unSubscribe = (channel: string, ws: ServerWebSocket<WSData>) => {
    if (this.channels[channel])
      this.channels[channel].subscribers = this.channels[
        channel
      ].subscribers.filter((s) => s.data.id !== ws.data.id);
  };

  publish = (channel: string, message: string) => {
    this.channels[channel]?.messages.push(message);
  };

  onConnectionClose = (ws: ServerWebSocket<WSData>) => {
    Object.entries(this.channels).forEach(([_, context]) => {
      context.subscribers = context.subscribers.filter(
        (s) => s.data.id !== ws.data.id
      );
    });
  };

  broker = () => {
    Object.entries(this.channels).forEach(([channel, context]) => {
      if (context.messages.length > 0) {
        context.subscribers.forEach((subscriber) => {
          context.messages.forEach((message) => {
            subscriber.send(JSON.stringify({ message, channel }));
          });
        });
        context.messages = [];
      }
    });
  };

  close = () => {
    clearInterval(this.brokerId);
  };
}

Contents of pub-sub-manager.ts

Let us break down what's happening in the code. The PubSubManager class keeps track of all the channels, their subscribers and their message queues. For the sake of simplicity, all of this lives in memory; it could be moved to an external service such as Redis. Since the class only relies on each socket having a data.id and a send method, it can also be used in other runtimes (Node.js, Deno). The subscribe function adds a socket to the list of subscribers of a channel, creating the channel if it does not exist yet. The unSubscribe function does the opposite. The publish function adds the message to the message queue of the given channel; messages for a channel that nobody has subscribed to yet are dropped. The onConnectionClose method handles a websocket disconnect by removing the socket from every channel.
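To make the runtime-independence point concrete, here is a small sketch that exercises the same subscribe/publish/flush flow with fake sockets instead of real ones. It is a trimmed-down stand-in, not the PubSubManager above: SocketLike, MiniPubSub, fakeSocket and flush are names made up for this example, and flush plays the role of a single broker pass.

```typescript
// Hypothetical minimal socket shape: the only two things the manager touches.
interface SocketLike {
  data: { id: string };
  send(payload: string): void;
}

type Channel = { subscribers: SocketLike[]; messages: string[] };

// Trimmed-down stand-in for PubSubManager (no timer, flush is called by hand).
class MiniPubSub {
  private channels = new Map<string, Channel>();

  subscribe(channel: string, ws: SocketLike): void {
    let ctx = this.channels.get(channel);
    if (!ctx) {
      ctx = { subscribers: [], messages: [] };
      this.channels.set(channel, ctx);
    }
    // Ignore a second subscription from the same socket id.
    if (!ctx.subscribers.some((s) => s.data.id === ws.data.id)) {
      ctx.subscribers.push(ws);
    }
  }

  publish(channel: string, message: string): void {
    // Unknown channels have no queue, so the message is dropped.
    this.channels.get(channel)?.messages.push(message);
  }

  // One broker pass: deliver every queued message, then clear the queue.
  flush(): void {
    this.channels.forEach((ctx, channel) => {
      ctx.subscribers.forEach((subscriber) => {
        ctx.messages.forEach((message) => {
          subscriber.send(JSON.stringify({ message, channel }));
        });
      });
      ctx.messages = [];
    });
  }
}

// Fake socket that records every frame it is sent.
function fakeSocket(id: string) {
  const received: string[] = [];
  return { data: { id }, received, send: (p: string) => { received.push(p); } };
}

const demo = new MiniPubSub();
const alice = fakeSocket("alice");
const bob = fakeSocket("bob");

demo.subscribe("news", alice);
demo.subscribe("news", alice); // duplicate, ignored
demo.subscribe("news", bob);
demo.publish("news", "hello");
demo.publish("ghost", "dropped"); // nobody subscribed, so nothing is queued
demo.flush();

console.log(alice.received, bob.received);
```

Each fake socket ends up with exactly one frame for "news", which is the same behaviour we expect from the real sockets later on.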

The main thing to notice here is the broker function. This function runs periodically; in the example, it's called every 50ms. On every run, it serially checks for the messages in all the channels and, if present, broadcasts them and clears the message queue. It is somewhat similar to the event loop. This is done here for simplicity; we can use RxJS and observables to make this more performant.
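For comparison, here is a rough sketch of a push-based variant that skips the polling loop and fans a message out the moment it is published. This is not the RxJS approach mentioned above, just the simplest alternative I can think of; PushPubSub and Listener are hypothetical names for this example.

```typescript
// A listener stands in for a socket's send method.
type Listener = (payload: string) => void;

class PushPubSub {
  // channel name -> (subscriber id -> listener)
  private channels = new Map<string, Map<string, Listener>>();

  subscribe(channel: string, id: string, listener: Listener): void {
    let listeners = this.channels.get(channel);
    if (!listeners) {
      listeners = new Map<string, Listener>();
      this.channels.set(channel, listeners);
    }
    listeners.set(id, listener); // re-subscribing replaces the old listener
  }

  unsubscribe(channel: string, id: string): void {
    this.channels.get(channel)?.delete(id);
  }

  // Delivers immediately and returns how many listeners were called.
  publish(channel: string, message: string): number {
    const listeners = this.channels.get(channel);
    if (!listeners) return 0;
    const payload = JSON.stringify({ message, channel });
    listeners.forEach((listener) => listener(payload));
    return listeners.size;
  }
}

const bus = new PushPubSub();
const inbox: string[] = [];
bus.subscribe("channel1", "client-a", (payload) => { inbox.push(payload); });

const delivered = bus.publish("channel1", "Hello channel1");
console.log(delivered, inbox);
```

The trade-off is that delivery now happens synchronously inside publish, so there is no queue and no batching; a slow subscriber would hold up the publisher, which the timer-based broker avoids.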

Now, let's actually use the PubSubManager in our Bun web server. In the index.ts file, populate the following:

// index.ts

import { PubSubManager, WSData } from "./pub-sub-manager";

type WsMessage =
  | {
      type: "subscribe";
      channel: string;
    }
  | {
      type: "unsubscribe";
      channel: string;
    }
  | {
      type: "publish";
      channel: string;
      message: string;
    };

const host = "0.0.0.0";
const port = 1337;

const manager = new PubSubManager();

Bun.serve<WSData>({
  hostname: host,
  port: port,
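  fetch: (req, server) => {
    const clientId = crypto.randomUUID();
    // upgrade() returns true when the handshake succeeds. Bun then takes over
    // the connection, so we must not return a Response in that case.
    const upgraded = server.upgrade(req, {
      data: {
        id: clientId,
        createdAt: new Date(), // Date() without `new` would return a string
      },
    });
    if (upgraded) return;
    return new Response("Upgrade Failed", { status: 500 });
  },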
  websocket: {
    open(ws) {
      console.debug(`Client connected: ${ws.data.id}`);
    },
    message: (ws, message) => {
      const payload = JSON.parse(message as string) as WsMessage;

      switch (payload.type) {
        case "subscribe":
          manager.subscribe(payload.channel, ws);
          break;
        case "unsubscribe":
          manager.unSubscribe(payload.channel, ws);
          break;
        case "publish":
          manager.publish(payload.channel, payload.message);
          break;
      }
    },
    close(ws) {
      console.debug(`Closing connection with client: ${ws.data.id}`);
      manager.onConnectionClose(ws);
    },
  },
});

process.on("beforeExit", () => {
  manager.close();
});

console.debug(`Listening on ws://${host}:${port}`);

Contents of index.ts

Here, our server is listening on 0.0.0.0, port 1337. In the fetch property, we upgrade the connection from HTTP to a websocket. We also attach additional data to the socket context: the socket id (a random UUID) and the date and time at which the connection was established. This data is available as ws.data in every websocket handler. The fetch function is also the place to validate and authenticate the request before upgrading it.
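As an illustration of that last point, here is a minimal sketch of a check that could run before the upgrade. It assumes the client passes a token in the connection URL (for example ws://host:1337/?token=abc); the token parameter, the isAuthorized helper and the in-memory token set are all assumptions made for this example, not part of Bun or of the server above.

```typescript
// Hypothetical helper: accept the connection only if ?token=... is a known token.
function isAuthorized(requestUrl: string, validTokens: Set<string>): boolean {
  let token: string | null;
  try {
    token = new URL(requestUrl).searchParams.get("token");
  } catch {
    return false; // not a parseable absolute URL
  }
  return token !== null && validTokens.has(token);
}

// Hypothetical token store; a real server would look this up elsewhere.
const knownTokens = new Set<string>(["abc"]);

console.log(isAuthorized("http://localhost:1337/?token=abc", knownTokens));
console.log(isAuthorized("http://localhost:1337/", knownTokens));

// Inside the fetch handler this could be used roughly like so (not run here):
//   if (!isAuthorized(req.url, knownTokens)) {
//     return new Response("Unauthorized", { status: 401 });
//   }
```

Rejecting the request before the upgrade means an unauthorized client never gets a websocket at all, so the message handler does not have to re-check anything.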

It's Showtime

Let's start our API server with the following command in the project root:

bun src/index.ts

Now, let us check out our server in action. Open Postman/Insomnia or any of your favourite API testing clients and open a websocket connection to

ws://0.0.0.0:1337.

Voilà! We have established a websocket connection to our server.

Connected to Websocket Server

Now let's subscribe to the channel "channel1" by sending the following JSON data:

{
  "type": "subscribe",
  "channel": "channel1"
}

Message to subscribe to "channel1"

We can see that we are subscribed to "channel1".

Subscribing to "channel1"

Now let us try sending a message to "channel1". Send the following JSON:

{
  "type": "publish",
  "channel": "channel1",
  "message": "Hello channel1"
}

JSON to send message to "channel1"

As soon as we hit send, we receive the message back on channel1, since we subscribed to it earlier.

Sent message received from "channel1"
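One thing worth noting about the frames we just sent: the server reads them with a bare JSON.parse and a type cast, so a malformed frame would throw inside the message handler. Below is a small sketch of a stricter parser that returns null for anything that does not match the protocol. The parseWsMessage name is made up for this example; the WsMessage type is repeated from index.ts so the block stands on its own.

```typescript
type WsMessage =
  | { type: "subscribe"; channel: string }
  | { type: "unsubscribe"; channel: string }
  | { type: "publish"; channel: string; message: string };

// Hypothetical helper: returns a typed message, or null if the frame is invalid.
function parseWsMessage(raw: string): WsMessage | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null; // not JSON at all
  }
  if (typeof value !== "object" || value === null) return null;

  const fields = value as Record<string, unknown>;
  const type = fields.type;
  const channel = fields.channel;
  const message = fields.message;
  if (typeof channel !== "string") return null;

  switch (type) {
    case "subscribe":
      return { type: "subscribe", channel };
    case "unsubscribe":
      return { type: "unsubscribe", channel };
    case "publish":
      // A publish frame must carry a string message.
      return typeof message === "string"
        ? { type: "publish", channel, message }
        : null;
    default:
      return null; // unknown or missing type
  }
}

console.log(parseWsMessage('{"type":"subscribe","channel":"channel1"}'));
console.log(parseWsMessage("definitely not json"));
```

In the message handler, a null result could simply be ignored or answered with an error frame; which of the two is better depends on how chatty you want the protocol to be.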

Conclusion

We learnt how to implement a pub/sub websocket server in Bun. Since Bun is still in its early stages, there is much more to learn, and we shall cover more of it in future blog posts.

Thank you for reading this article. Please consider leaving a comment below if you liked it.