@eduardorothdev/rxjs-mqtt
TypeScript icon, indicating that this package has built-in type declarations

1.1.2 • Public • Published

@eduardorothdev/rxjs-mqtt

RxJS Wrapper for mqtt with TS support. Based on async-mqtt

Install

npm i @eduardorothdev/rxjs-mqtt

Using it

You first need to call and await the connect function, which will return a MqttClient object with updated methods.

The methods for this MqttClient object are the same as one generated with mqtt but has the following methods modified:

  • Promises
    • .publish()
    • .subscribe()
    • .unsubscribe()
    • .end()
  • Observables
    • .on()
      • This method returns an observable that emits any time it receives from the event being listened.
    • .onJsonMessage<T>() New method
      • This is a helper method that extends the on() one, getting only the events of message sent from the MQTT server, and parsing them to JSON
      • <T> is the interface or type that the method will try to parse the received message

In addition to those, I added two more Operator Function helpers that you can chain to the pipe method of the Observable returned, that will help you easily parse the received events.

  • Helpers
    • parsePayload<T>(parser: (buffer: Buffer) => T | string = (buffer: Buffer) => buffer.toString()
      • This Operator Function helper allows you to parse the bytes payload received from the MQTT event with a custom function that will convert the bytes to anything.
    • parsePayloadToJSON()
      • This Operator Function helper extends the previous function to easily convert the received bytes to JSON.

Example code

import { connect } from "@eduardorothdev/rxjs-mqtt";

try {
  const client = await connect("mqtt://test.mosquitto.org", {
    //username: 'user',
    //password: 'pass',
  });
  // subscribe to a topic
  await client.subscribe("some/topic/#");
  const sub = client
    .onJsonMessage<{
      some: string;
      property: string;
      mapping: boolean;
    }>()
    .pipe(
      catchError((err) => {
        // we have to catch the error so the
        // observable pipe doesn't stop sending messages
        return of(null);
      }),
    )
    .subscribe((jsonMessage) => {
      // { some: 'hello', property: 'from mqtt', mapping: true }
      console.log(jsonMessage);
    });

  // later you can unsubscribe when needed.
  // this will unsubscribe from the onJsonMessage observable pipe
  // not from the topic subscription
  // sub.unsubscribe();

  // Unsubscribe from the topic subscription
  // await client.unsubscribe('some/topic/#');
} catch (err) {
  // connection/subscription-to-topic errors
  console.log(err);
}

License

Licensed under MIT.

Package Sidebar

Install

npm i @eduardorothdev/rxjs-mqtt

Weekly Downloads

0

Version

1.1.2

License

MIT

Unpacked Size

23.2 kB

Total Files

13

Last publish

Collaborators

  • eduardoroth