Naughty Program Manipulator

    @prats-tech/rx-channels
    TypeScript icon, indicating that this package has built-in type declarations

    1.0.3 • Public • Published

    rx-channels

    A library for manipulate external communications between processes using RxJS.


    Overview

    The purpose of the library is to create a standard interface between channels to facilitate the orchestration of messages between processes, by default the channels are observables and it is implemented in a way where that you can subscribe to an observable to read messages and dispatch messages using the same interface.

    Since messages between processes often use different providers that implement a certain infrastructure complexity (such as SQS, SNS, Redis, Kafka), the library does not provide any implementation beyond the base contract.

    So you can see in the examples below how easy it would be to adapt creating an implementation respecting the interface that uses a provider, for example the AWS SQS (Simple queue service).

    Project architecture

    The project's architecture basically consists of an orchestrator that controls a Channel hashmap that has a defined interface based on observables, each channel has the ability to publish (dispatch) messages and be subscribed to listen them.

    Installation

    # Using NPM
    $ npm install @prats-tech/rx-channels
    
    ---
    
    # Using Yarn
    $ yarn add @prats-tech/rx-channels

    Examples

    Creating a sync channel

    import { ChannelOrchestrator } from '@prats-tech/rx-channels';
    
    
    type SyncMessage {
      name: string;
      date: Date;
    }
    
    const orchestrator = ChannelOrchestrator.getInstance();
    
    // Create your channel
    orchestrator.addChannel({
      config: {
        name: 'channel-test',
      },
    });
    
    // Subscribe incoming messages
    const channel = orchestrator.getChannelObservable('channel-test');
    channel.subscribe<SyncMessage>({
      next: message => {
        console.log(message);
      },
    });
    
    // Dispatch your messages
    const data: SyncMessage = {
      name: 'test',
      date: new Date()
    };
    orchestrator
      .dispatch<SyncMessage>('channel-test', data)

    Creating a async channel

    import { Subject } from 'rxjs';
    
    import { ChannelOrchestrator, ChannelInterface, ChannelType } from '@prats-tech/rx-channels';
    
    
    type AyncMessage {
      name: string;
      date: Date;
    }
    
    class AsyncChannel implements ChannelInterface {
      private subject: Subject<any>;
    
      constructor() {
        this.subject = new Subject();
      }
    
      // implementation of send message for async provider
      private async sqsDispatch() {}
    
      // implementation of listen message from async provider
      private async sqsListen(message) {
        this.subject.next(message);
      }
    
      async dispatch<T = any>(message: T) {
        await this.sqsDispatch(message);
      }
    
      getObservable<T = any>() {
        return this.subject.asObservable();
      }
    
      getName() {
        return 'async-channel';
      };
    
      getType() {
        return ChannelType.Async;
      };
    }
    
    const orchestrator = ChannelOrchestrator.getInstance();
    
    // Create your channel
    orchestrator.addChannel({
      channel: new AsyncChannel()
    });
    
    // Subscribe incoming messages
    const channel = orchestrator.getChannelObservable('channel-test');
    channel.subscribe<AsyncMessage>({
      next: message => {
        console.log(message);
      },
    });
    
    // Dispatch your messages
    const data: AsyncMessage = {
      name: 'test',
      date: new Date()
    };
    orchestrator
      .dispatch<AsyncMessage>('channel-test', data)

    License

    MIT © Prats

    Install

    npm i @prats-tech/rx-channels

    DownloadsWeekly Downloads

    4

    Version

    1.0.3

    License

    MIT

    Unpacked Size

    89.2 kB

    Total Files

    32

    Last publish

    Collaborators

    • gustavobertoirosa
    • vertexportus