Server-Sent Events client using Generators

In the fast-paced world of real-time web applications, keeping users informed with the latest data without requiring them to refresh their browsers is crucial for an engaging user experience.

Server-Sent Events (SSE) offer a straightforward way to push updates from the server to the client, making them an excellent choice for delivering notifications, live feeds, and other real-time updates. However, managing these streams of data can sometimes introduce complexity into the application logic, particularly when dealing with asynchronous event handling. This is where Generators come into play, offering a powerful yet underutilised tool for simplifying asynchronous code.

In this blog post, we will dive into how to implement an SSE client using Generators, providing a detailed guide to harnessing their potential to streamline real-time data handling. By marrying SSE's simplicity with the controlled execution flow of Generators, we can (hopefully!) create more readable, maintainable, and efficient real-time applications.


Generators

Generators in JavaScript are a powerful feature introduced in ES6 that provide a way to pause and resume functions at specific points. They can produce a sequence of values over time and yield control back to the calling code, enabling efficient handling of iterative operations and asynchronous tasks. That makes generators particularly suited for scenarios such as lazy iteration of (potentially) infinite data, complex asynchronous flow control, stateful iterations, cooperative multitasking (co-routines), and data transformation pipelines.
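To make the "pause and resume" idea concrete, here is a minimal sketch of a generator that receives values from its caller. The `runningTotal` name is made up for this illustration; the mechanism it shows (a value passed to `next()` becomes the result of the paused `yield` expression) is what the SSE client later in this post relies on.

```typescript
// A generator that keeps a running total of the numbers sent to it.
function* runningTotal(): Generator<number, void, number> {
  let total = 0;
  while (true) {
    // Pause here, hand `total` to the caller, and resume with the value
    // that the caller passes to the next call of next().
    const amount = yield total;
    total += amount;
  }
}

const totals = runningTotal();
console.log(totals.next().value);   // 0 (runs up to the first yield)
console.log(totals.next(5).value);  // 5
console.log(totals.next(10).value); // 15
```

The first call to `next()` only advances the generator to its first `yield`, which is why it takes no argument.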

As an example, we can use Generators to compute the Fibonacci sequence. A more traditional implementation might look as follows:

function fibonacci(n: number): number[] {
  let [a, b] = [0, 1];
  let sequence = [a];

  while (sequence.length < n) {
    [a, b] = [b, a + b];
    sequence.push(a);
  }
  
  return sequence;
}

console.log(fibonacci(10)); // Generates the first 10 Fibonacci numbers

One of the main issues with this implementation is that we might start running out of memory for very large sequences (where n is very large) as we need to compute and store them in an array.

With generators, we can lazily compute the sequence, technically to infinity:

function* fibonacci(): Generator<number> {
    let [a, b] = [0, 1];
    while (true) {
        yield a;
        [a, b] = [b, a + b];
    }
}

const fib = fibonacci();
console.log(fib.next().value); // 0
console.log(fib.next().value); // 1
console.log(fib.next().value); // 1
console.log(fib.next().value); // 2
console.log(fib.next().value); // 3
// and so on...

Generators in JavaScript, as demonstrated through the Fibonacci sequence example, allow for lazy evaluation and on-demand value generation, making them ideal for processing infinite sequences, streaming data, and managing complex asynchronous tasks. Their ability to pause and resume execution streamlines the creation of maintainable and intuitive code for a variety of applications.
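As a small illustration of that laziness, here is a sketch of a helper that pulls only the first n values out of an infinite generator. The `take` helper is hypothetical (it is not a built-in), and the `fibonacci` generator is repeated so that the snippet stands on its own.

```typescript
// Same infinite generator as above, repeated so the snippet stands alone.
function* fibonacci(): Generator<number> {
  let [a, b] = [0, 1];
  while (true) {
    yield a;
    [a, b] = [b, a + b];
  }
}

// Hypothetical helper: lazily yields at most `n` values from any iterable.
function* take<T>(source: Iterable<T>, n: number): Generator<T> {
  if (n <= 0) return;
  let count = 0;
  for (const value of source) {
    yield value;
    count++;
    if (count >= n) return;
  }
}

const firstTen = [...take(fibonacci(), 10)];
console.log(firstTen); // [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
```

Only ten values are ever computed here, even though `fibonacci()` itself never terminates.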


Server-Sent Events

Server-Sent Events (SSE) provide a standard way for servers to push real-time updates to web clients using a single, persistent HTTP connection, making them highly efficient for scenarios where the server frequently sends data to the client, such as live feeds or updates. Unlike WebSockets, which support full-duplex communication, SSE is designed for unidirectional flow from server to client, simplifying use in applications that don't require client-initiated messages.

A basic setup for SSE in Node using Express looks like this:

import express from "express";

const app = express();
const PORT = 3000;

app.get("/events", (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  // Send a message every second
  const intervalId = setInterval(() => {
    const date = new Date();
    res.write(`data: ${JSON.stringify({ time: date.toISOString() })}\n\n`);
  }, 1000);

  // Clean up on close
  req.on("close", () => {
    clearInterval(intervalId);
    res.end();
  });
});

app.listen(PORT, () => {
  console.log(`Server running on http://localhost:${PORT}`);
});

The key here is to set appropriate headers to keep the connection open and periodically send messages in a specific format (data: <message>\n\n).
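The event stream format supports a few more fields than `data`. As a rough sketch, a small helper could serialise a message for us; the `formatSSE` name and the `SSEMessage` shape are assumptions made for this example, while the field names (`event`, `id`, `retry`, `data`) come from the event stream format itself. Multi-line payloads are split across several `data:` lines, as the format requires.

```typescript
// Hypothetical shape for a message we want to send; only `data` is required.
interface SSEMessage {
  data: string;
  event?: string; // custom event name (clients listen with addEventListener)
  id?: string;    // becomes the client's "last event ID"
  retry?: number; // reconnection time hint, in milliseconds
}

// Serialises a message into the text/event-stream format.
function formatSSE(message: SSEMessage): string {
  const lines: string[] = [];
  if (message.event !== undefined) lines.push(`event: ${message.event}`);
  if (message.id !== undefined) lines.push(`id: ${message.id}`);
  if (message.retry !== undefined) lines.push(`retry: ${message.retry}`);
  // Each line of the payload needs its own "data:" field.
  for (const line of message.data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the message.
  return lines.join("\n") + "\n\n";
}

console.log(JSON.stringify(formatSSE({ event: "tick", data: "a\nb" })));
// "event: tick\ndata: a\ndata: b\n\n"
```

With the Express handler above, this could be used as `res.write(formatSSE({ data: JSON.stringify({ time: date.toISOString() }) }))`.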

On the client side, we can consume those events as follows:

<!DOCTYPE html>
<html>
<head>
    <title>SSE Example</title>
</head>
<body>
    <h1>Server Time Updates</h1>
    <div id="serverTime"></div>
    <script>
        const eventSource = new EventSource('http://localhost:3000/events');

        eventSource.onmessage = function(event) {
            const data = JSON.parse(event.data);
            document.getElementById('serverTime').innerText = data.time;
        };

        eventSource.onerror = function(error) {
            console.error('EventSource failed:', error);
            eventSource.close();
        };
    </script>
</body>
</html>

If we wanted to implement this same functionality without SSE, the client would need to send an HTTP request to the server every second to get the current server time. Here, we can see how real-time data can be sent to the client from the server, and how we can update the UI based on that data.

Server-Sent Events are a powerful tool for real-time server-client communication, while being more lightweight than WebSockets (they run over plain HTTP). Consuming SSE from a client can be done with basic event handlers, but we've just seen a JavaScript feature that is quite good at handling streams of data and asynchronous flows. We can indeed leverage the features of Generators to build a more robust solution ... and we're going to do just that!


Implementing a Server-Sent Event client using Generators

The example above serves well for simple use cases, but if we'd like to add custom event handling and asynchronous handlers to those events, we can leverage more powerful tools. In our case, let's build a Server-Sent Event client using Generators.

Listening to messages using EventSource

At its core, we are leveraging EventSource to listen to Server-Sent Events. This is encapsulated inside an SSEClient class, which will allow us to build more features around those events.

class SSEClient {
  #url: string;
  #eventSource: EventSource | null = null;

  constructor(url: string) {
    this.#url = url;
    this.#eventSource = null;
  }

  connect(): void {
    this.#eventSource = new EventSource(this.#url);

    this.#eventSource.onmessage = (event) => {
      console.log(event.data);
    }

    this.#eventSource.onerror = (event) => {
      console.error(event);
    }
  }

  close(): void {
    if (this.#eventSource) {
      this.#eventSource.close();
    }
  }
}

To use this basic client, we create a new instance of the SSEClient class by passing in the URL from which events are sent, and we call the connect() method on that instance to instantiate the EventSource and start receiving messages. We can close the event source by calling the close() method.

const client = new SSEClient("https://example.com/events");
client.connect();
// Starts receiving messages
// ...
client.close();

This client isn't that useful right now, as all it does is log data and errors to the console. To allow for custom handler logic in those cases, we add catch() and on() methods to the SSEClient class.

export class SSEClient {
  #url: string;
  #eventSource: EventSource | null;

  constructor(url: string) {
    this.#url = url;
    this.#eventSource = null;
  }

  connect(): void {
    // ...
  }

  close(): void {
    // ...
  }

  catch(handleError: (event: Event) => void): void {
    if (!this.#eventSource) {
      throw new Error("EventSource is not initialized");
    }

    this.#eventSource.onerror = (event) => {
      handleError(event);
    };
  }

  on(handleEvent: (event: MessageEvent) => void): void {
    if (!this.#eventSource) {
      throw new Error("EventSource is not initialized");
    }

    this.#eventSource.onmessage = (event) => {
      handleEvent(event);
    };
  }
}

We can now register a handler for messages and a handler for errors. Note that, in this version, both methods throw if the EventSource has not been created yet, so connect() has to be called first:

const client = new SSEClient("https://example.com/events");

client.connect();
// Starts receiving messages

client.on((event) => {
  console.log(event.data);
});
client.catch((event) => {
  console.error(event);
});

// ...
client.close();

Handling events with a Generator

For most purposes, the client we just built would work just fine! But in cases where we expect to run asynchronous event handlers and we would like some guarantee that only a single asynchronous handler is running at any given time, we need to add some more complexity to our code.

But first, let's refactor the code we already have. The main change we are making is to attach the event handlers in the connect() method, and have the catch() and on() methods just store the functions that then get called from those event handlers. As a side effect, handlers can now be registered before connect() is called.

type EventHandlerMessage = (event: MessageEvent) => void;
type EventHandlerError = (event: Event) => void;

class SSEClient {
  protected _url: string;
  protected _eventSource: EventSource | null;

  private _handleMessage?: EventHandlerMessage;
  private _handleError?: EventHandlerError;

  get url(): string {
    return this._url;
  }

  constructor(url: string) {
    this._url = url;
    this._eventSource = null;
  }

  connect(): void {
    if (!this._eventSource) {
      this._eventSource = new EventSource(this._url);
    }

    this._eventSource.onmessage = (event) => {
      this._handleMessage?.(event);
    };

    this._eventSource.onerror = (event) => {
      this._handleError?.(event);
    };
  }

  close(): void {
    if (this._eventSource) {
      this._eventSource.close();
      this._eventSource = null;
    }
  }

  catch(handleError: EventHandlerError): void {
    this._handleError = handleError;
  }

  on(handleMessage: EventHandlerMessage): void {
    this._handleMessage = handleMessage;
  }
}

Now, we want to handle each message sequentially as it comes in. On top of that, we want to support asynchronous event handlers. To achieve that, we will leverage a Generator function to give us more control over the whole flow.

//...
type EventHandlerGenerator = AsyncGenerator<
  MessageEvent | undefined,
  unknown,
  MessageEvent
>;

class SSEClient {
  //...
  private _eventHandler: EventHandlerGenerator;

  constructor(url: string) {
    this._url = url;
    this._eventSource = null;
    this._eventHandler = this._createEventHandler();
    this._eventHandler.next();
  }

  connect(): void {
    //...
    this._eventSource.onmessage = async (event) => {
      await this._eventHandler.next(event);
    };
    //...
  }

  //...

  private async * _createEventHandler(): EventHandlerGenerator {
    while (true) {
      const event = yield;
      if (event) {
        await this._handleMessage?.(event);
      }
    }
  }
}

A few things to note here:

  • The _eventHandler generator is an infinite loop that pauses at yield until a new event arrives, then awaits the registered message handler (_handleMessage). Note that we first need to call _eventHandler.next() once, in the constructor, to advance the generator to its first yield.
  • The onmessage function on our EventSource is now async and passes each event it receives to the _eventHandler generator via next(). Async generators queue calls to next() and resume one at a time, so only one message is processed at a time, in order of arrival, even if the processing is asynchronous.
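To see this ordering guarantee in isolation, here is a small sketch that does not depend on EventSource. It feeds three "events" into an async generator without waiting in between, using a deliberately slow handler. The names `createSequentialHandler`, `sleep`, and `demo`, as well as the use of plain strings as events, are assumptions made for this example.

```typescript
const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical stand-in for _createEventHandler(), generic over the event type.
async function* createSequentialHandler<E>(
  handle: (event: E) => void | Promise<void>
): AsyncGenerator<undefined, void, E | undefined> {
  while (true) {
    const event = yield; // receives the value passed to next(event)
    if (event !== undefined) {
      await handle(event);
    }
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const handler = createSequentialHandler<string>(async (event) => {
    log.push(`start ${event}`);
    await sleep(event === "a" ? 30 : 5); // the first event is the slowest
    log.push(`end ${event}`);
  });

  await handler.next(); // run up to the first `yield`

  // Fire all three without awaiting in between, like rapid onmessage calls.
  await Promise.all([handler.next("a"), handler.next("b"), handler.next("c")]);
  return log;
}

demo().then((log) => console.log(log));
// ["start a", "end a", "start b", "end b", "start c", "end c"]
```

Even though "a" takes the longest to handle, "b" does not start until "a" has finished, because the generator only resumes for the next queued call once it reaches `yield` again.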

Further improvements

We have covered a decent amount of ground with our SSE client, but we can look at some further "quality of life" improvements!

Status updates

To allow for some observability into our client, we can implement some basic status update mechanism. The status will be tracked internally, and an optional onStatusUpdate callback can be passed to the SSEClient constructor. Note that there is already the possibility to hook a callback on errors using the catch() method.

We define the status as an enum with the same values as EventSource: connecting, open, and closed. Note that the status is closed by default.

//...
enum SSEClientStatus {
  CONNECTING = "CONNECTING",
  OPEN = "OPEN",
  CLOSED = "CLOSED",
}

class SSEClient<T> {
  //...
  protected _status: SSEClientStatus = SSEClientStatus.CLOSED;
  private _onStatusUpdate?: (status: SSEClientStatus) => void;

  //...
  get status(): SSEClientStatus {
    return this._status;
  }

  constructor(url: string, onStatusUpdate?: (status: SSEClientStatus) => void) {
    //...
    this._onStatusUpdate = onStatusUpdate;
  }

  connect(): void {
    if (!this._eventSource) {
      //...
      this._updateStatus(SSEClientStatus.CONNECTING);
    }

    this._eventSource.onopen = () => {
      this._updateStatus(SSEClientStatus.OPEN);
    };
    //...
  }

  close(): void {
    if (this._eventSource) {
      //...
      this._updateStatus(SSEClientStatus.CLOSED);
    }
  }

  //...
  
  private _updateStatus(status: SSEClientStatus): void {
    this._status = status;
    this._onStatusUpdate?.(status);
  }
}
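As a small sketch of how these statuses line up with EventSource, the helper below maps the numeric `readyState` of an EventSource (0, 1, or 2) to the status names used above, and records transitions with the kind of callback we could pass as onStatusUpdate. The `Status` union type (standing in for the enum), `statusFromReadyState`, and `statusLog` are all names invented for this example.

```typescript
// Status names used by the client, written as a union type for this sketch.
type Status = "CONNECTING" | "OPEN" | "CLOSED";

// EventSource.readyState is 0 (CONNECTING), 1 (OPEN) or 2 (CLOSED).
// Hypothetical helper mapping that number to the client's status names.
function statusFromReadyState(readyState: number): Status {
  switch (readyState) {
    case 0:
      return "CONNECTING";
    case 1:
      return "OPEN";
    default:
      return "CLOSED";
  }
}

// Hypothetical callback of the kind that could be passed as onStatusUpdate:
// it records every transition together with a timestamp.
const statusLog: { status: Status; at: number }[] = [];
const recordStatus = (status: Status): void => {
  statusLog.push({ status, at: Date.now() });
};

[0, 1, 2].map((state) => statusFromReadyState(state)).forEach(recordStatus);
console.log(statusLog.map((entry) => entry.status));
// ["CONNECTING", "OPEN", "CLOSED"]
```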

Automatic reconnections

As a further improvement, we can also add automatic reconnection logic to our SSEClient. The logic is deliberately basic: after an error, the client tries to reconnect up to a maximum number of attempts, waiting a fixed delay before each attempt.

We are also slightly changing our SSEClient constructor arguments, taking an optional options object as the second argument and placing the optional onStatusUpdate callback inside that options object. That allows us to add another optional reconnect parameter that controls the automatic reconnection logic.

class SSEClient<T> {
  //...
  protected _reconnect?: {
    attempts: number;
    maxAttempts: number;
    delay: number;
  };

  //...
  constructor(
    url: string,
    options?: {
      onStatusUpdate?: (status: SSEClientStatus) => void;
      reconnect?: { maxAttempts: number; delay: number };
    }
  ) {
    //...
    this._onStatusUpdate = options?.onStatusUpdate;

    if (options?.reconnect) {
      this._reconnect = {
        ...options.reconnect,
        attempts: 0,
      };
    }
  }

  connect(): void {
    //...
    this._eventSource.onerror = (event) => {
      this._handleError?.(event);
      this.close();
      this._handleReconnect();
    };
  }

  //...
  protected _handleReconnect(): void {
    if (this._reconnect) {
      if (this._reconnect.attempts < this._reconnect.maxAttempts) {
        this._reconnect.attempts++;
        setTimeout(() => {
          this.connect();
        }, this._reconnect.delay);
      }
    }
  }
}
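In the snippet as shown, the attempts counter is only ever incremented, so a long-lived client could eventually run out of attempts even if every reconnection succeeded. As a sketch of one possible refinement (the `ReconnectPolicy` class and its method names are hypothetical and not part of the code above), the bookkeeping could be isolated in a small helper and reset once the connection opens:

```typescript
// Hypothetical helper isolating the retry bookkeeping from SSEClient.
class ReconnectPolicy {
  private attempts = 0;
  private readonly maxAttempts: number;
  private readonly delay: number;

  constructor(maxAttempts: number, delay: number) {
    this.maxAttempts = maxAttempts;
    this.delay = delay;
  }

  // Returns the delay (in ms) before the next attempt, or null once exhausted.
  nextDelay(): number | null {
    if (this.attempts >= this.maxAttempts) return null;
    this.attempts++;
    return this.delay;
  }

  // Meant to be called when the connection opens successfully.
  reset(): void {
    this.attempts = 0;
  }
}

const policy = new ReconnectPolicy(2, 1000);
console.log(policy.nextDelay()); // 1000
console.log(policy.nextDelay()); // 1000
console.log(policy.nextDelay()); // null
policy.reset();
console.log(policy.nextDelay()); // 1000
```

In _handleReconnect(), a non-null result would be passed to setTimeout as the delay, and reset() would be called from the onopen handler.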

There are still a few improvements we can make, but this concludes this article. Please feel free to open a PR in the GitHub repository for this project if you want to contribute further!


GitHub code + npm package

All this code is available on GitHub and as an npm package!

GitHub: AntonioVdlC/sse-gen
npm: sse-gen (version 1.0.0 at the time of writing; install with `npm i sse-gen`)
Antonio Villagra De La Cruz

Multicultural software engineer passionate about building products that empower people.