pg-events

1.0.2 • Public • Published

TypeSafe PubSub with PostgreSQL

version coverage minsize license node

Install

npm i --save pg-events

Usage

import { Client } from 'pg';
import { PubSub, ClientProvider, Contract, encoder, decoder } from 'pg-events';

// 1. Define channels
enum Channel {
  myChannel = 'myChannel',
}

// 2. Define protocol
interface Protocol {
  [Channel.myChannel]: { message: string };
}

// 3. Define connection options
const options = {
  user: 'postgres',
  password: 'postgres',
  database: 'postgres',
};

// 4. Initialize connection provider
const provider = new ClientProvider();

// 5. Initialize pubsub instance
const pubSub = new PubSub<Contract<Protocol>>({
  decoder, // default JSON->object decoder (turns a received string back into a payload)
  encoder, // default object->JSON encoder (turns a payload into the string that is sent)
  provider, // connection emitter
});

// 6. Usage

(async () => {
  // 7. Initialize PG connection
  const client = new Client(options);
  // 8. Connect to PostgreSQL server
  await client.connect();

  try {
    // 9. Emit connection to PubSub
    provider.next(client);

    // 10. Add event listener
    pubSub.on(Channel.myChannel, async ({ message }) => {
      console.warn('Received ' + message);
    });

    // 11. Subscribe to channel
    await pubSub.subscribe(Channel.myChannel);

    // 12. Emit message
    await pubSub.publish(Channel.myChannel, { message: 'Hello!' });
  } finally {
    // 13. Close PubSub, then the PG connection. This runs even when a step
    // above rejected, so a failure does not leave the connection open.
    try {
      await pubSub.end();
    } finally {
      await client.end();
    }
  }
})();

Strict mode

Implement AsyncEncoder to validate payload before broadcasting.

Implement AsyncDecoder to ensure valid payloads are received.

import { Client } from 'pg';
import { PubSub, ClientProvider, Contract, encoder, decoder } from 'pg-events';

// 1. Define channels
enum Channel {
  myChannel = 'myChannel',
}

// 2. Define protocol
interface Protocol {
  [Channel.myChannel]: { value: number };
}

// 3. Define connection options
const options = {
  user: 'postgres',
  password: 'postgres',
  database: 'postgres',
};

// 4. Initialize connection provider
const provider = new ClientProvider();

// 4.1 Define validators
/**
 * Type guard for the `myChannel` payload.
 *
 * Accepts only non-null objects whose `value` is a number greater than or
 * equal to zero. `NaN` fails the `>= 0` comparison and is therefore rejected.
 * Always returns a real boolean, never the raw falsy input.
 */
function isMyChannelMessage(obj: any): obj is Protocol['myChannel'] {
  return (
    typeof obj === 'object' &&
    obj !== null &&
    typeof obj.value === 'number' &&
    obj.value >= 0
  );
}

/**
 * Throws unless `payload` is a valid `myChannel` message; on return the
 * compiler treats `payload` as `Protocol['myChannel']`.
 *
 * @throws Error when `payload` is not an object with a non-negative numeric `value`.
 */
function checkMyChannelPayload(payload: any): asserts payload is Protocol['myChannel'] {
  if (!isMyChannelMessage(payload)) {
    // Zero passes the guard, so the rule is "non-negative", not "positive".
    throw new Error('myChannel.value must be a non-negative number');
  }
}

// 5. Initialize strict pubsub instance
const strictPubSub = new PubSub<Contract<Protocol>>({
  // Async encoder: the payload is validated first, then handed to the default encoder
  encoder: {
    encode: async (payload: object): Promise<string> => {
      checkMyChannelPayload(payload);
      return encoder.encode(payload);
    },
  },
  // Async decoder: the default decoder runs first, then the result is validated before it is emitted
  decoder: {
    decode: async (data: string): Promise<object> => {
      const decoded = await decoder.decode(data);
      checkMyChannelPayload(decoded);
      return decoded;
    },
  },
  provider, // connection emitter
});

// 6. Usage

(async () => {
  // 7. Initialize PG connection
  const client = new Client(options);
  // 8. Connect to PostgreSQL server
  await client.connect();

  try {
    // 9. Emit connection to PubSub
    provider.next(client);

    // 10. Add event listener
    strictPubSub.on(Channel.myChannel, async ({ value }) => {
      console.warn('Received ' + value);
    });

    // NOTE(review): presumably 'unprocessed' fires when an incoming message
    // cannot be handled (e.g. the decoder rejects it) — confirm against the
    // pg-events API.
    strictPubSub.on('unprocessed', async (error) => {
      console.warn(error);
    });

    // 11. Subscribe to channel
    await strictPubSub.subscribe(Channel.myChannel);

    // 12. Emit message
    console.warn('First call sends 5');
    await strictPubSub.publish(Channel.myChannel, { value: 5 });

    // 13. Emit message with invalid payload
    console.warn('Second call sends -1');
    try {
      await strictPubSub.publish(Channel.myChannel, { value: -1 });
    } catch (error) {
      // The custom encoder rejects the payload before anything is sent.
      console.warn('It fails because -1 is a negative number');
    }
  } finally {
    // 14. Close PubSub, then the PG connection. This runs even when a step
    // above rejected, so a failure does not leave the connection open.
    try {
      await strictPubSub.end();
    } finally {
      await client.end();
    }
  }
})();

Dependents (0)

Package Sidebar

Install

npm i pg-events

Weekly Downloads

0

Version

1.0.2

License

MIT

Unpacked Size

55 kB

Total Files

87

Last publish

Collaborators

  • crabicode