- stream-watch
- Importing
- Basic usage
- API
This package was created with the specific goal of eliminating the noise of stream online/offline handling for Twitch bots. However, it carries no production dependencies and does not rely on any Twitch-specific behavior, and could conceivably be applied to any two-state system with potentially noisy inputs where the developer wants to consolidate event-driven behavior according to some configurable policy. Some examples may help, so here are some use-cases:
A streamer wants to automate the task of sending a notification to Discord when the stream goes live. The notification includes an @mention
which pings people. People are unusually feisty about getting pinged unnecessarily.
While it's relatively straightforward to set up an event listener for one of Twitch's go-live notifications, the downside is that if the stream is interrupted for any reason at all and goes offline then online again, an unwanted message is sent.
Conversely, if PubSub or EventSub fails to deliver a notification, no message will ever be sent, even though the stream is live.
A streamer has a chat bot that runs games for chatters to amuse themselves with while the stream is offline. However, with a large audience while live, the bot is too spammy and detracts from chat interaction with the streamer.
Again, it's relatively straightforward to set up an event listener to react to "offline" and enable the bot, or react to "online" and disable the bot. However, the same problems arise when something goes amiss: the bot can wind up being active while the stream is online or inactive when the stream is offline if notifications don't get delivered, and if there's a temporary stream outage, unruly chatters can create unnecessary noise in the brief period before the stream comes back online.
Twitch has two "push" style notification systems: PubSub and EventSub. They usually work as expected, but sometimes notifications are dropped or significantly delayed. There is also an API you can call to directly request the status of a stream.
One way you might choose to address the unreliable delivery is to add a second source of information, say by polling the API. Once you decide to do that, you now have to deal with combining the results you observe into something that behaves how you want. That's what this library does.
The Watcher
class exported by this library accepts any number of online or offline notifications, and maintains an awareness of the "canonical" state of the stream. You may then bind event listeners to be notified when the state changes. These listeners are only called once per transition, regardless of how many times the watcher was notified of a state.
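The "once per transition" behavior described above can be pictured with a minimal sketch. This is not the library's implementation: `TinyWatcher`, `onOnline`, and `onOffline` are hypothetical names used only for illustration, and the sketch ignores polling and policies entirely.

```typescript
// Hypothetical stand-in for illustration only; not stream-watch's actual code.
type TinyState = 'unknown' | 'online' | 'offline';

class TinyWatcher<S> {
  private state: TinyState = 'unknown';
  private onlineHandlers: Array<(s: S) => void> = [];
  private offlineHandlers: Array<() => void> = [];

  onOnline(handler: (s: S) => void): void {
    this.onlineHandlers.push(handler);
  }

  onOffline(handler: () => void): void {
    this.offlineHandlers.push(handler);
  }

  // Accepts any number of notifications; listeners fire only on a change.
  update(value: S | null): void {
    const next: TinyState = value === null ? 'offline' : 'online';
    if (next === this.state) return; // duplicate notification, ignore it
    this.state = next;
    if (value === null) {
      this.offlineHandlers.forEach((h) => h());
    } else {
      this.onlineHandlers.forEach((h) => h(value));
    }
  }
}

// Three "online" inputs and two "offline" inputs produce one call each.
const tiny = new TinyWatcher<{ title: string }>();
const seen: string[] = [];
tiny.onOnline((s) => seen.push(`online:${s.title}`));
tiny.onOffline(() => seen.push('offline'));

tiny.update({ title: 'a' });
tiny.update({ title: 'b' }); // ignored, already online
tiny.update({ title: 'c' }); // ignored, already online
tiny.update(null);
tiny.update(null); // ignored, already offline
console.log(seen); // [ 'online:a', 'offline' ]
```

The real Watcher layers polling and per-listener policies on top of this basic idea.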
Each listener may be further configured with policies defining how to behave. For example, adding a DelayPolicy
will cause the listener to be called only after a certain amount of time has passed without a state change: "Run this code only when the stream has been offline for at least an hour". Similarly, adding a ThrottlePolicy
will cause the listener to be called no more frequently than once per specified period: "Run this code a maximum of once per 8 hours".
When a Watcher is first initialized, it doesn't know if the stream is online or offline. It begins in the "Unknown" state until the first update it receives tells it what the current state is.
Both ThrottlePolicy
and DelayPolicy
have options that alter their behavior on the first transition (from unknown to online / offline) to allow you to configure whether you want to be notified about this initial transition. ThrottlePolicy
has notifyOnInitial
, which (when set to true
) causes an event to be emitted for transitions from "Unknown" to whatever the desired state is. DelayPolicy
has noDelayOnInitial
, which (when set to true
) causes the first event to be emitted immediately instead of after the configured delay.
This package is dual-published with both ESM and CJS; both include types. The ESM import path is stream-watch
, and the CJS import path is stream-watch/cjs
.
If you are using {"type": "module"}
, you should use:
import { Watcher } from 'stream-watch';
If you are using {"type": "commonjs"}
(or "type"
is not set), you should use:
const { Watcher } = require('stream-watch/cjs');
If you see an error to the effect of Error [ERR_PACKAGE_PATH_NOT_EXPORTED]
, you're likely using the wrong import path for your package setup.
The primary exported class is Watcher
. Its constructor takes an options object whose getStream property is a function returning a Promise that resolves to either null
or a generically-typed Stream
. Online listeners will receive this type when notified of an online
event.
// twitch user id to track
const STREAMER_ID = '241636';
// ----- Set up Twurple -----
import { ApiClient, type HelixStream } from '@twurple/api';
import { StaticAuthProvider } from '@twurple/auth';
import { EventSubWsListener } from '@twurple/eventsub-ws';
const clientId = process.env.CLIENT_ID!;
const authToken = process.env.ACCESS_TOKEN!;
// set up the auth provider (requires the client ID of a Twitch app and an access token)
// NOTE: you should use a RefreshingAuthProvider, but the code required
// to demonstrate loading and saving on refresh would complicate the example
// a bit too much...
const authProvider = new StaticAuthProvider(clientId, authToken);
// set up the API client
const apiClient = new ApiClient({ authProvider });
// set up an EventSub subscription
const listener = new EventSubWsListener({
  // NOTE: Twurple's API does not currently provide a way to use
  // websocket eventsub listeners to subscribe to a user id that
  // is not the same as the user id of the authProvider. see
  // the workaround at the bottom of this example
  // reuse the ApiClient created above instead of constructing a second one
  apiClient,
});
// ----- Set up stream-watch -----
import { Duration, Watcher } from 'stream-watch';
// define the function to retrieve the stream status
const getStream = (): Promise<HelixStream | null> => apiClient.streams.getStreamByUserId(STREAMER_ID);
// create the Watcher instance
const watcher = new Watcher({
  logger: true,
  getStream,
});
// without any policy configured, these handlers receive every online or
// offline state change, with duplicate notifications filtered out.
// once you've received an "online" event, you can expect that you will
// not receive another until there has first been an "offline" event
watcher.on('online', (stream: HelixStream) => {
  console.log('stream is now live', stream.gameName);
  // stream is live, do something
});
watcher.on('offline', () => {
  console.log('stream is now offline');
  // stream ended, do something
});
// start the watcher polling, as a backup. `immediately: true`
// performs a poll immediately, to get the initial state
watcher.poll({ every: Duration.minute(10), immediately: true });
// use Twurple to feed EventSub notifications into the watcher
listener.onStreamOnline(STREAMER_ID, async () => {
  // twitch does not provide the stream data, so we use
  // the eventsub notification to trigger an api call to
  // fetch it. Watcher is generic over the return type of
  // the `getStream` function we passed it initially, so
  // that subscribers can always have that value available.
  watcher.update(await getStream());
});
listener.onStreamOffline(STREAMER_ID, () => {
  // for offline notifications, we just send null.
  watcher.update(null);
});
// connect the EventSub websocket and start listening
listener.start();
This example shows both polling and integrating an external event source.
Online event with a throttle policy:
import { ThrottlePolicy } from 'stream-watch';
watcher.on(
  'online',
  (stream: HelixStream) => {
    // do some work, e.g. notify Discord
  },
  {
    throttle: new ThrottlePolicy({
      // do something when stream comes online, but only if it's been at least
      // 6 hours since the last time the event triggered
      atMostOncePer: Duration.hour(6),
      // if the stream is online when the code first runs, do not emit anything
      notifyOnInitial: false,
    }),
  }
);
Offline event with a delay policy:
import { DelayPolicy } from 'stream-watch';
watcher.on(
  'offline',
  () => {
    // do some work, e.g. enable offline bot commands
  },
  {
    delay: new DelayPolicy({
      // do something once stream has been offline for the last 30 minutes
      waitAtLeast: Duration.minute(30),
      // run immediately on initial startup if stream is offline
      noDelayOnInitial: true,
    }),
  }
);
Watcher has one generic argument, Stream
, which by default is inferred from the return type of the getStream
function:
const watcher = new Watcher({
  // required: a function returning a promise, not the promise itself
  getStream: () => apiClient.streams.getStreamByUserId(STREAMER_ID),
  // optional
  logger: defaultLogger,
  // optional
  humanizer: defaultHumanizer,
});
getStream is a function that returns a promise for either a stream object (when online) or null (when offline). It is used to get the initial status and for polling.
The logger option may be set to true
to use the default logger, undefined
to disable logging, or something matching the Logger
interface to use your own code. The Logger
interface just matches console.info
and console.error
:
export interface Logger {
  info(...args: any[]): void;
  error(...args: any[]): void;
}
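As an illustration of supplying your own logger, here is a sketch of an object that satisfies the Logger interface above by collecting lines in memory. The interface is repeated locally so the snippet stands alone; `makeMemoryLogger` and the `[watcher]` prefix are hypothetical and not part of the library.

```typescript
// Copied from the interface shown above so the snippet is self-contained.
interface Logger {
  info(...args: any[]): void;
  error(...args: any[]): void;
}

// Hypothetical helper: a logger that records lines instead of printing them.
function makeMemoryLogger(lines: string[]): Logger {
  const write = (level: string, args: any[]): void => {
    lines.push(`[watcher] ${level}: ${args.map(String).join(' ')}`);
  };
  return {
    info: (...args) => write('info', args),
    error: (...args) => write('error', args),
  };
}

const logLines: string[] = [];
const memoryLogger = makeMemoryLogger(logLines);
memoryLogger.info('polling every', 600000, 'ms');
memoryLogger.error('poll failed');

// `console` also satisfies the interface, since it has `info` and `error`.
const consoleLogger: Logger = console;
consoleLogger.info(logLines.join('\n'));
```

An object like `memoryLogger` could then be passed as the `logger` option in place of `true`.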
The humanizer option is used when logging is enabled to render durations into friendly strings such as "15 minutes". Defaults to defaultHumanizer
, but you can supply your own by meeting the interface:
export interface Humanizer {
  (ms: number, since?: number): string;
}
The first argument (ms
) is the value, in milliseconds, to represent. If the second argument (since
) is given, then ms
and since
are both timestamps (in milliseconds), and the function should return the relative time between them.
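A custom humanizer might look like the following sketch. The interface is repeated locally so the snippet stands alone. `shortHumanizer` and its compact output format are hypothetical, and treating the relative time as an absolute difference (with no "ago" or "in" wording) is an assumption.

```typescript
// Copied from the interface shown above so the snippet is self-contained.
interface Humanizer {
  (ms: number, since?: number): string;
}

// Hypothetical custom humanizer: picks the largest whole unit that fits.
const shortHumanizer: Humanizer = (ms, since) => {
  // With `since`, both arguments are timestamps; use the span between them.
  const span = since === undefined ? ms : Math.abs(ms - since);
  const units: Array<[string, number]> = [
    ['h', 60 * 60 * 1000],
    ['m', 60 * 1000],
    ['s', 1000],
  ];
  for (const [label, size] of units) {
    if (span >= size) return `${Math.floor(span / size)}${label}`;
  }
  return `${span}ms`;
};

// A plain duration of 15 minutes.
console.log(shortHumanizer(15 * 60 * 1000)); // 15m
// Two timestamps that are 30 seconds apart.
console.log(shortHumanizer(1700000030000, 1700000000000)); // 30s
```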
The poll method begins polling. It returns a function to disable polling. If polling is already enabled, calling poll again will replace the previous polling configuration with the new configuration.
import { Duration } from 'stream-watch';
const stopPolling = watcher.poll({
  every: Duration.minute(5),
  immediately: true,
});
// stop polling
stopPolling();
The every option specifies how frequently to poll for status (in milliseconds). You can use the Duration
helper for convenience, e.g. Duration.minute(15)
or Duration.second(30).
When the immediately option is true
, performs a poll immediately. When false
, the first poll will occur after every
milliseconds.
The update method is used to integrate external online/offline notifications. Just call update
with either null or a value of the same type as the getStream
function you passed to the constructor.
// provide a "stream offline" input
watcher.update(null);
// provide a "stream online" input
watcher.update(stream);
The on method registers a handler for stream online/offline events.
The policies
argument is the same for all events. If no policies are specified, the handler will be called once per transition. In other words, an offline handler will be called when the stream changes from Online
to Offline
. This includes the initial transition from Unknown
. To exclude the initial transition, you may specify a ThrottlePolicy
with {notifyOnInitial: false}
.
If the watcher's last-known state is Offline
it will not emit duplicate Offline
events when receiving an update / poll result that indicates the stream is (still) offline.
Register an online
handler with the given policy configuration. Returns a Subscription whose unsubscribe method removes the handler.
const subscription = watcher.on(
  'online',
  (stream: Stream) => {
    // called with the latest-known value of stream
  },
  // policy argument is optional, as are
  // all properties
  {
    throttle: throttlePolicy,
    delay: delayPolicy,
  }
);
// stop calling the event handler
subscription.unsubscribe();
Register an offline
handler with the given policy configuration. Returns a Subscription whose unsubscribe method removes the handler.
const subscription = watcher.on(
  'offline',
  () => {
    // called without any argument
  },
  // policy argument is optional, as are
  // all properties
  {
    throttle: throttlePolicy,
    delay: delayPolicy,
  }
);
// stop calling the event handler
subscription.unsubscribe();
The throttle property enables a throttle policy. Supply an instance of ThrottlePolicy
.
The delay property enables a delay policy. Supply an instance of DelayPolicy
.
getStatus returns the current / last-known status of the stream, as determined by the most recent poll result or call to update.
import { StreamState } from 'stream-watch';
const { status, stream } = watcher.getStatus();
switch (status) {
  case StreamState.Unknown:
    // initial status has not yet been determined
    // `stream` is undefined
    break;
  case StreamState.Online:
    // stream is currently online
    console.log(stream.gameName); // `stream` is a Stream
    break;
  case StreamState.Offline:
    // stream is currently offline
    // `stream` is undefined
    break;
}
Stops polling, ceases notifying any event listeners, and causes any further method calls to throw an error.
Subscription is a helper class allowing you to unregister a subscription or inspect the status of that subscription's notifications.
Stop sending events to the associated handler.
subscription.unsubscribe()
Return the current status of this Subscription:
export type SubscriptionStatus =
  | {
      state: StreamState.Unknown | StreamState.Online | StreamState.Offline;
      remaining: undefined;
    }
  | {
      state: StreamState.PendingOnline | StreamState.PendingOffline;
      remaining: number;
    };
PendingOnline
and PendingOffline
represent a subscription that's in the delay phase of a DelayPolicy
-- that is, in the PendingOnline case, the stream is known to be online, but it hasn't yet been online for long enough (and likewise for PendingOffline). For the Pending
states, an additional property (remaining
) contains the number of milliseconds until the event handler will be called, if the notification is not aborted by another stream status change.
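To show how the two variants of this type can be told apart, here is a sketch of a helper that renders a status as a string. The `StreamState` object below is a local stand-in whose string values are an assumption made so the snippet runs on its own, and `describeStatus` is a hypothetical helper, not part of the library.

```typescript
// Local stand-in for the library's StreamState; the string values are an
// assumption made only so this snippet is self-contained.
const StreamState = {
  Unknown: 'Unknown',
  Online: 'Online',
  Offline: 'Offline',
  PendingOnline: 'PendingOnline',
  PendingOffline: 'PendingOffline',
} as const;

// Same shape as the SubscriptionStatus type shown above.
type SubscriptionStatus =
  | {
      state:
        | typeof StreamState.Unknown
        | typeof StreamState.Online
        | typeof StreamState.Offline;
      remaining: undefined;
    }
  | {
      state: typeof StreamState.PendingOnline | typeof StreamState.PendingOffline;
      remaining: number;
    };

// Hypothetical helper: render a status as a short human-readable string.
function describeStatus(s: SubscriptionStatus): string {
  switch (s.state) {
    case StreamState.PendingOnline:
    case StreamState.PendingOffline:
      // Narrowed to the pending variant, so `remaining` is a number here.
      return `${s.state}: handler due in ${Math.ceil(s.remaining / 1000)}s`;
    default:
      return `${s.state}: nothing pending`;
  }
}

console.log(describeStatus({ state: StreamState.PendingOffline, remaining: 90500 }));
// PendingOffline: handler due in 91s
console.log(describeStatus({ state: StreamState.Online, remaining: undefined }));
// Online: nothing pending
```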
ThrottlePolicy defines a throttle policy for a subscription:
const throttlePolicy = new ThrottlePolicy({
  atMostOncePer: Duration.hour(6),
  notifyOnInitial: false,
});
// works with either event; 'online' is used here as an example
watcher.on('online', handler, {
  throttle: throttlePolicy,
});
atMostOncePer: the handler will be called at most once per this many milliseconds.
When notifyOnInitial is true
, the handler will be called when the stream state changes from Unknown
to Online
or Offline
. When false
, it will not be called on the initial transition.
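The interaction of the two options can be modelled as a single decision function. This is only a sketch of the documented behavior, not the library's code: `shouldCallHandler`, `ThrottleInput`, and the way the previous call time is tracked are all assumptions.

```typescript
// Hypothetical model of the two ThrottlePolicy options; not the library's code.
interface ThrottleOptions {
  atMostOncePer: number; // milliseconds
  notifyOnInitial: boolean;
}

interface ThrottleInput {
  now: number; // current timestamp in ms
  lastCalledAt: number | undefined; // when the handler last ran, if ever
  fromUnknown: boolean; // true for the initial transition
}

function shouldCallHandler(opts: ThrottleOptions, input: ThrottleInput): boolean {
  // The initial transition (Unknown -> Online/Offline) is governed by notifyOnInitial.
  if (input.fromUnknown) return opts.notifyOnInitial;
  // Never called before: there is nothing to throttle against.
  if (input.lastCalledAt === undefined) return true;
  // Otherwise require the full period to have elapsed since the last call.
  return input.now - input.lastCalledAt >= opts.atMostOncePer;
}

const SIX_HOURS = 6 * 60 * 60 * 1000;
const throttleOpts: ThrottleOptions = { atMostOncePer: SIX_HOURS, notifyOnInitial: false };

// Initial transition is suppressed because notifyOnInitial is false.
console.log(shouldCallHandler(throttleOpts, { now: 0, lastCalledAt: undefined, fromUnknown: true })); // false
// One hour after the previous call: throttled.
console.log(shouldCallHandler(throttleOpts, { now: 3600000, lastCalledAt: 0, fromUnknown: false })); // false
// Six hours after the previous call: allowed.
console.log(shouldCallHandler(throttleOpts, { now: SIX_HOURS, lastCalledAt: 0, fromUnknown: false })); // true
```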
DelayPolicy defines a delay policy for a subscription:
const delayPolicy = new DelayPolicy({
  waitAtLeast: Duration.minute(30),
  noDelayOnInitial: true,
});
// works with either event; 'offline' is used here as an example
watcher.on('offline', handler, {
  delay: delayPolicy,
});
waitAtLeast: the handler will not be called until at least this many milliseconds have passed without an online/offline state change.
When noDelayOnInitial is true
, the handler will be called without delay when the stream state changes from Unknown
to Online
or Offline
. When false
, the initial transition will also be subject to delay.
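The timing rules above can be modelled over a list of state changes. This is a sketch under stated assumptions, not the library's code: `offlineFireTimes` and `Transition` are hypothetical, the transitions are assumed to be already deduplicated and sorted by time, the first entry is assumed to be the transition from Unknown, and a state change landing exactly when the wait ends is treated as not aborting the call.

```typescript
// Hypothetical model of DelayPolicy timing; not the library's code.
interface DelayOptions {
  waitAtLeast: number; // milliseconds
  noDelayOnInitial: boolean;
}

interface Transition {
  at: number; // timestamp in ms
  to: 'online' | 'offline';
}

// Returns the timestamps at which an 'offline' handler would be called.
function offlineFireTimes(opts: DelayOptions, transitions: Transition[]): number[] {
  const fired: number[] = [];
  transitions.forEach((t, i) => {
    if (t.to !== 'offline') return;
    const isInitial = i === 0; // first transition comes from Unknown
    const wait = isInitial && opts.noDelayOnInitial ? 0 : opts.waitAtLeast;
    const next = transitions[i + 1];
    // A later state change before the wait elapses aborts the pending call.
    if (next === undefined || next.at - t.at >= wait) fired.push(t.at + wait);
  });
  return fired;
}

const MINUTE = 60 * 1000;
const delayOpts: DelayOptions = { waitAtLeast: 30 * MINUTE, noDelayOnInitial: true };

const fireTimes = offlineFireTimes(delayOpts, [
  { at: 0, to: 'offline' }, // initial: fires immediately
  { at: 10 * MINUTE, to: 'online' },
  { at: 100 * MINUTE, to: 'offline' }, // back online 5 minutes later: aborted
  { at: 105 * MINUTE, to: 'online' },
  { at: 200 * MINUTE, to: 'offline' }, // stays offline: fires 30 minutes later
]);
console.log(fireTimes.map((ms) => ms / MINUTE)); // [ 0, 230 ]
```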