Wondering what’s next for npm?Check out our public roadmap! »

    loopback-pubsub-component
    TypeScript icon, indicating that this package has built-in type declarations

    0.2.1 • Public • Published

    loopback-pubsub-component

    A PubSub component for LoopBack 4, trying to follow GraphQL specs and inspired by graphql-subscriptions

    This is a generic component that can wraps "any" PubSub client/broker in your own repository and strategy provider.

    Installation

    Run the following command to install loopback-pubsub-component:

    npm i -s loopback-pubsub-component

    Usage

    Import component

    When the loopback-pubsub-component package is installed, bind it to your application with app.component()

    import {RestApplication} from '@loopback/rest';
    import {PubSubBindings, PubSubComponent, PubSubStrategyProvider} from 'loopback-pubsub-component';
     
    const app = new RestApplication();
     
    app.bind(PubSubBindings.CONFIG).to({
      eventEmitter: new EventEmitter(),
      // client: mqttClient
    });
    app.component(PubSubComponent);
    app.bind(PubSubBindings.PUBSUB_STRATEGY).toProvider(PubSubStrategyProvider);
     

    Create a repository

    Create a repository that implements your client/broker logic, here an example for a simple EventEmitter. You could create several repositories, with MQTT client or other kinds of PubSub clients.

    import {inject} from '@loopback/core';
    import {
      PubSubEngine,
      PubSubBindings,
      PubSubConfig,
      PubSubAsyncIterator,
    } from 'loopback-pubsub-component';
    import {EventEmitter} from 'events';
     
    export class PubSubEERepository extends PubSubEngine {
      protected ee: EventEmitter;
      public subscriptions: {[subId: number]: [string, (...args: any[]) => void]};
      public subIdCounter: number;
     
      constructor(@inject(PubSubBindings.CONFIG) options: PubSubConfig) {
        super();
        if (options.eventEmitter) {
          this.ee = options.eventEmitter;
        } else {
          this.ee = new EventEmitter();
        }
        this.subscriptions = {};
        this.subIdCounter = 0;
      }
     
      public publish(triggerName: string, payload: any): Promise<void> {
        this.ee.emit(triggerName, payload);
        return Promise.resolve();
      }
     
      public subscribe(
        triggerName: string,
        onMessage: (...args: any[]) => void,
        options?: Object,
      ): Promise<number> {
        this.ee.addListener(triggerName, onMessage);
        this.subIdCounter = this.subIdCounter + 1;
        this.subscriptions[this.subIdCounter] = [triggerName, onMessage];
        return Promise.resolve(this.subIdCounter);
      }
     
      public unsubscribe(subIdOrTriggerName: number | string) {
        if (typeof subIdOrTriggerName === 'string') {
          return this.unsubscribeByName(subIdOrTriggerName);
        }
        return this.unsubscribeById(subIdOrTriggerName);
      }
     
      public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
        return new PubSubAsyncIterator<T>(this, triggers);
      }
     
      private unsubscribeById(subId: number) {
        const [triggerName, onMessage] = this.subscriptions[subId];
        delete this.subscriptions[subId];
        this.ee.removeListener(triggerName, onMessage);
        return Promise.resolve();
      }
     
      private unsubscribeByName(triggerName: string) {
        const subIds: number[] = Object.keys(this.subscriptions).map(Number);
        for (const subId of subIds) {
          if (this.subscriptions[subId][0] === triggerName) {
            const onMessage = this.subscriptions[subId][1];
            delete this.subscriptions[subId];
            this.ee.removeListener(triggerName, onMessage);
            break;
          }
        }
        return Promise.resolve();
      }
    }

    Strategy provider

    Create a strategy provider that implements your custom logic. If you have several repositories, inject them and create a function to switch between repositories with trigerName filtering.

    import {inject, Provider, ValueOrPromise} from '@loopback/core';
    import {repository} from '@loopback/repository';
    import {
      PubSubBindings,
      PubSubConfig,
      PubSubStrategy,
      PubSubMetadata,
    } from 'loopback-pubsub-component';
    import {PubSubEERepository, PubSubMQTTRepository} from '../repositories';
     
    export class PubSubStrategyProvider implements Provider<PubSubStrategy | undefined> {
      private engines: (PubSubEERepository | PubSubMQTTRepository)[];
     
      constructor(
        @inject(PubSubBindings.METADATA) private metadata: PubSubMetadata,
        @repository(PubSubEERepository) protected pubsubEERepo: PubSubEERepository,
        @repository(PubSubMQTTRepository) protected pubsubMQTTRepo: PubSubMQTTRepository,
      ) {
        this.engines = [this.pubsubEERepo, this.pubsubMQTTRepo];
      }
     
      selectRepository(triggerNames: string | string[]): PubSubEERepository | PubSubMQTTRepository {
        // filter triggerName or triggerNames[0]
        return this.engines[0];
      }
     
      value(): ValueOrPromise<PubSubStrategy | undefined> {
        const self = this;
     
        return {
          setConfig: async (config?: Object) => {
            const pubsubConf: PubSubConfig = {};
            return pubsubConf;
          },
     
          publish: async (triggerName: string, payload: any) => {
            const engine = this.selectRepository(triggerName);
            await engine.publish(triggerName, JSON.stringify(payload));
          },
     
          subscribe: (triggerName: string, onMessage: (...args: any[]) => void, options?: Object) => {
            const engine = this.selectRepository(triggerName);
            // return Promise.all([engines.map( engine => engine.subscribe(triggerName, onMessage, options))])
            return engine.subscribe(triggerName, onMessage, options);
          },
     
          unsubscribe: async (triggerName: string) => {
            const engine = this.selectRepository(triggerName);
            await engine.unsubscribe(triggerName);
          },
     
          asyncIterator(triggers: string | string[]) {
            const engine = self.selectRepository(triggers);
            return engine.asyncIterator(triggers);
          },
     
        };
      }
    }
     

    Use in a controller

    Inject the bindings, to make available PubSubStrategy provider functions.

    import {inject} from '@loopback/context';
    import {repository} from '@loopback/repository';
    import {
      post,
      param,
      get,
      patch,
      put,
      Request,
      requestBody,
      RestBindings,
    } from '@loopback/rest';
    import {PubSubBindings} from 'loopback-pubsub-component';
    import {Device} from '../models';
    import {DeviceApi, devicesApiEndPoint} from '../services';
    import {getToken} from '../utils';
     
    const security = [
      {
        Authorization: [],
      },
    ];
     
    export class DeviceController {
      constructor(
        @inject('services.DeviceApi') protected deviceApi: DeviceApi,
        @inject(RestBindings.Http.REQUEST) public request: Request,
        @inject(PubSubBindings.publish) public publish: PubSubPublishFn,
      ) {}
     
      @post(`/${devicesApiEndPoint}`, {
        operationId: 'createDevice',
        security,
        responses: {
          '200': {
            description: 'Device instance',
            content: {'application/json': {schema: {'x-ts-type': Device}}},
          },
        },
      })
      async create(@requestBody() device: Device): Promise<Device> {
        const token = getToken(this.request);
        const result = await this.deviceApi.create(token, device);
        await this.publish(this.request.path, result)
        return result;
      }
     
    }
     

    TODO

    • Adding decorator to use it like a router in a Controller ( @publish, @subscribe ... ) and control access ( when using broker )

    License

    LoopBack

    Install

    npm i loopback-pubsub-component

    DownloadsWeekly Downloads

    1

    Version

    0.2.1

    License

    ISC

    Unpacked Size

    91.6 kB

    Total Files

    65

    Last publish

    Collaborators

    • avatar