import { Client } from 'pg';
import { PubSub, ClientProvider, Contract, encoder, decoder } from 'pg-events';
/**
 * Channel names used by this example when registering handlers,
 * subscribing and publishing on the PubSub instance.
 */
enum Channel {
myChannel = 'myChannel',
}
/**
 * Maps each channel to the shape of the payload carried on it.
 * Supplied to PubSub as `Contract<Protocol>` and referenced by the
 * payload type guard in this file.
 */
interface Protocol {
// Payload for Channel.myChannel: an object holding a numeric `value`.
[Channel.myChannel]: { value: number };
}
// Connection settings passed to `new Client(options)` further down.
// NOTE(review): credentials are hard-coded; presumably meant for a local
// example database only — confirm before reusing elsewhere.
const options = {
user: 'postgres',
password: 'postgres',
database: 'postgres',
};
// Handed to the PubSub constructor below; the connected pg client is
// supplied to it later through `provider.next(client)`.
const provider = new ClientProvider();
/**
 * Type guard for payloads of `Channel.myChannel`.
 *
 * Always returns a strict boolean: non-objects and `null` yield `false`
 * rather than leaking the falsy input value back to the caller.
 *
 * @param obj - Arbitrary value to inspect.
 * @returns `true` only when `obj` is a non-null object whose `value`
 *   property is a number greater than or equal to zero (`NaN` is rejected
 *   because `NaN >= 0` is `false`).
 */
function isMyChannelMessage(obj: unknown): obj is Protocol['myChannel'] {
  if (typeof obj !== 'object' || obj === null) {
    return false;
  }
  // Safe after the runtime check above: obj is a non-null object.
  const { value } = obj as { value?: unknown };
  return typeof value === 'number' && value >= 0;
}
/**
 * Validates a `Channel.myChannel` payload, throwing when it is malformed.
 *
 * @param payload - Value to validate (outgoing or freshly decoded).
 * @throws {Error} When `payload` does not pass `isMyChannelMessage`, i.e.
 *   it is not an object with a numeric `value` greater than or equal to 0.
 */
function checkMyChannelPayload(payload: unknown): void {
  if (!isMyChannelMessage(payload)) {
    // The guard accepts 0, so the accepted range is "non-negative".
    throw new Error('myChannel.value must be a non-negative number');
  }
}
/**
 * PubSub instance configured with a custom encoder and decoder that both
 * run `checkMyChannelPayload`, so an invalid payload makes encoding or
 * decoding reject with the validation error.
 */
const strictPubSub = new PubSub<Contract<Protocol>>({
  provider,
  encoder: {
    // Validate first, then delegate to the `encoder` imported from 'pg-events'.
    encode: async (payload: object): Promise<string> => {
      checkMyChannelPayload(payload);
      return encoder.encode(payload);
    },
  },
  decoder: {
    // Decode with the `decoder` imported from 'pg-events', then validate.
    decode: async (data: string): Promise<object> => {
      const decoded = await decoder.decode(data);
      checkMyChannelPayload(decoded);
      return decoded;
    },
  },
});
// Example entry point: connects a pg client, subscribes to
// Channel.myChannel, publishes one valid and one invalid payload, and
// then shuts the PubSub instance and the client down.
(async () => {
  const client = new Client(options);
  await client.connect();
  try {
    provider.next(client);
    try {
      strictPubSub.on(Channel.myChannel, async ({ value }) => {
        console.warn('Received ' + value);
      });
      // Log whatever the 'unprocessed' event delivers.
      strictPubSub.on('unprocessed', async (error) => {
        console.warn(error);
      });
      await strictPubSub.subscribe(Channel.myChannel);

      console.warn('First call sends 5');
      await strictPubSub.publish(Channel.myChannel, { value: 5 });

      console.warn('Second call sends -1');
      try {
        await strictPubSub.publish(Channel.myChannel, { value: -1 });
      } catch (error) {
        // Expected: the custom encoder rejects the payload. The error is
        // logged as well so an unrelated failure is not hidden.
        console.warn('It fails because -1 is a negative number', error);
      }
    } finally {
      // Always stop the PubSub instance, even if a step above threw.
      await strictPubSub.end();
    }
  } finally {
    // Always release the database connection once it has been opened.
    await client.end();
  }
})().catch((error: unknown) => {
  // Surface failures explicitly instead of leaving a floating promise.
  console.error(error);
  process.exitCode = 1;
});