TransportSubject
Message-based communication transport as an RxJS Subject
Installation
npm install @ceramicnetwork/transport-subject
Usage
import { TransportSubject } from '@ceramicnetwork/transport-subject'
import { Subscriber, interval } from 'rxjs'
import { map } from 'rxjs'
type Message = { type: string }
class MyTransport extends TransportSubject<Message> {
  constructor(time = 1000) {
    // Incoming messages: emit a ping on every tick
    const source = interval(time).pipe(map(() => ({ type: 'ping' })))
    // Outgoing messages: log whatever is pushed with next()
    const sink = new Subscriber((message) => {
      console.log('send message', message)
    })
    super(source, sink)
  }
}
const transport = new MyTransport()
transport.subscribe((message) => {
  console.log('received message', message)
})
transport.next({ type: 'pong' })
Types
Wrapped
type Wrapped<Message, Namespace extends string> = { __tw: true; msg: Message; ns: Namespace }
Wrapper
type Wrapper<MsgIn, MsgOut, WrappedOut> = {
  wrap: (msg: MsgOut) => WrappedOut
  unwrap: (input: any) => MsgIn
}
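A Wrapper does not have to produce a Wrapped object; any pair of functions with matching types fits. As a minimal sketch, the hypothetical jsonWrapper below (not part of the library) serializes outgoing messages to JSON strings and parses incoming ones. The Wrapper type is copied from above so the block stands alone.

```typescript
// Copied from the Types section so this sketch is self-contained.
type Wrapper<MsgIn, MsgOut, WrappedOut> = {
  wrap: (msg: MsgOut) => WrappedOut
  unwrap: (input: any) => MsgIn
}

type Message = { type: string }

// Hypothetical wrapper, for illustration only: it is not exported by the library.
const jsonWrapper: Wrapper<Message, Message, string> = {
  wrap: (msg) => JSON.stringify(msg),
  unwrap: (input) => {
    const parsed: unknown = JSON.parse(String(input))
    if (typeof parsed !== 'object' || parsed === null || typeof (parsed as Message).type !== 'string') {
      throw new Error('Input is not a valid message')
    }
    return parsed as Message
  },
}

const encoded = jsonWrapper.wrap({ type: 'ping' }) // '{"type":"ping"}'
const decoded = jsonWrapper.unwrap(encoded) // { type: 'ping' }
```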
UnwrapOperatorOptions
type UnwrapOperatorOptions = {
  onInvalidInput?: (input: unknown, error: Error) => void
  throwWhenInvalid?: boolean
}
API
TransportSubject class
Extends RxJS Subject class
Type parameters
- MsgIn: the type of the messages coming in from the source
- MsgOut = MsgIn: the type of the messages going out to the sink
new TransportSubject()
Arguments
source: Observable<MsgIn>
sink: Observer<MsgOut>
.next()
Arguments
message: MsgOut
Returns void
createWrap()
Type parameters
MsgOut
Namespace extends string = string
Arguments
namespace: Namespace
Returns (msg: MsgOut) => Wrapped<MsgOut, Namespace>, see Wrapped type
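Given the Wrapped type, the returned function can only produce objects of one shape. A minimal sketch of an equivalent function follows; sketchCreateWrap is a hypothetical stand-in written for illustration from the documented types, not the library's source.

```typescript
type Wrapped<Message, Namespace extends string> = { __tw: true; msg: Message; ns: Namespace }

// Hypothetical stand-in for createWrap(), derived only from the documented types.
function sketchCreateWrap<MsgOut, Namespace extends string = string>(namespace: Namespace) {
  return (msg: MsgOut): Wrapped<MsgOut, Namespace> => ({ __tw: true, msg, ns: namespace })
}

type Message = { type: string }
const wrap = sketchCreateWrap<Message, 'demo'>('demo')
const wrapped = wrap({ type: 'ping' }) // { __tw: true, msg: { type: 'ping' }, ns: 'demo' }
```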
createUnwrap()
Type parameters
MsgIn
Namespace extends string = string
Arguments
namespace: Namespace
Returns (input: any) => MsgIn
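The signature implies that unwrap takes untyped input and gives back the inner message. The sketch below assumes that input which is not a Wrapped object, or which carries a different namespace, is rejected by throwing. That behavior and the error messages are assumptions suggested by the error argument of onInvalidInput, not something this document confirms, and sketchCreateUnwrap is a hypothetical name.

```typescript
// Hypothetical stand-in for createUnwrap(); the validation rules are assumed.
function sketchCreateUnwrap<MsgIn, Namespace extends string = string>(namespace: Namespace) {
  return (input: any): MsgIn => {
    if (input == null || input.__tw !== true) {
      throw new Error('Input is not a wrapped message')
    }
    if (input.ns !== namespace) {
      throw new Error(`Unexpected namespace: ${String(input.ns)}`)
    }
    return input.msg as MsgIn
  }
}

type Message = { type: string }
const unwrap = sketchCreateUnwrap<Message>('demo')

const message = unwrap({ __tw: true, msg: { type: 'pong' }, ns: 'demo' }) // { type: 'pong' }

let rejected = false
try {
  unwrap({ __tw: true, msg: { type: 'pong' }, ns: 'other' })
} catch {
  rejected = true // in this sketch, messages from another namespace do not pass through
}
```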
createWrapper()
Combines createWrap() and createUnwrap()
Type parameters
- MsgIn: the type of the messages coming in from the returned transport
- MsgOut = MsgIn: the type of the messages pushed to the returned transport
- Namespace extends string = string
Arguments
namespace: Namespace
Returns Wrapper<MsgIn, MsgOut, Wrapped<MsgOut, Namespace>>, see Wrapper and Wrapped types
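Because MsgIn and MsgOut can differ, the two ends of a connection typically mirror the type parameters. The sketch below illustrates this with a hypothetical sketchCreateWrapper and made-up RpcRequest and RpcResponse types; the checks inside unwrap are assumed rather than taken from the library.

```typescript
type Wrapped<Message, Namespace extends string> = { __tw: true; msg: Message; ns: Namespace }
type Wrapper<MsgIn, MsgOut, WrappedOut> = {
  wrap: (msg: MsgOut) => WrappedOut
  unwrap: (input: any) => MsgIn
}

// Hypothetical stand-in for createWrapper(); the unwrap checks are assumed.
function sketchCreateWrapper<MsgIn, MsgOut = MsgIn, Namespace extends string = string>(
  namespace: Namespace
): Wrapper<MsgIn, MsgOut, Wrapped<MsgOut, Namespace>> {
  return {
    wrap: (msg) => ({ __tw: true, msg, ns: namespace }),
    unwrap: (input) => {
      if (input == null || input.__tw !== true || input.ns !== namespace) {
        throw new Error('Invalid wrapped message')
      }
      return input.msg as MsgIn
    },
  }
}

type RpcRequest = { method: string }
type RpcResponse = { result: number }

// Client side: pushes RpcRequest messages, receives RpcResponse messages.
const client = sketchCreateWrapper<RpcResponse, RpcRequest>('rpc')
// Server side: the type parameters are mirrored.
const server = sketchCreateWrapper<RpcRequest, RpcResponse>('rpc')

const received = server.unwrap(client.wrap({ method: 'add' })) // { method: 'add' }
const reply = client.unwrap(server.wrap({ result: 3 })) // { result: 3 }
```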
createUnwrapOperator()
Type parameters
- WrappedIn: the type of the messages coming in from the input source
- MsgIn: the type of the messages coming in from the returned observable
Arguments
unwrap: (input: any) => MsgIn
options?: UnwrapOperatorOptions = {}
Returns OperatorFunction<WrappedIn, MsgIn>
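This document does not spell out how the two options interact. One plausible reading, sketched below without any RxJS plumbing, is that a failing unwrap is either rethrown (throwWhenInvalid) or reported through onInvalidInput and then dropped. The helper sketchUnwrapStep and that behavior are assumptions for illustration only; it models a single input rather than an observable stream.

```typescript
type UnwrapOperatorOptions = {
  onInvalidInput?: (input: unknown, error: Error) => void
  throwWhenInvalid?: boolean
}

// Hypothetical helper modelling one step of the operator, without RxJS.
// Assumed behavior: rethrow when throwWhenInvalid is set, otherwise report and skip.
function sketchUnwrapStep<MsgIn>(
  unwrap: (input: any) => MsgIn,
  input: unknown,
  options: UnwrapOperatorOptions = {}
): MsgIn | undefined {
  try {
    return unwrap(input)
  } catch (caught) {
    const error = caught instanceof Error ? caught : new Error(String(caught))
    if (options.throwWhenInvalid) throw error
    options.onInvalidInput?.(input, error)
    return undefined // the invalid input is dropped
  }
}

// Example unwrap function that only accepts numbers.
const parseNumber = (input: any): number => {
  if (typeof input !== 'number') throw new Error('Not a number')
  return input
}

const invalid: unknown[] = []
const options: UnwrapOperatorOptions = { onInvalidInput: (input) => invalid.push(input) }

const results = [1, 'two', 3]
  .map((input) => sketchUnwrapStep(parseNumber, input, options))
  .filter((msg): msg is number => msg !== undefined)
// results is [1, 3]; invalid is ['two']
```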
createWrapObserver()
Type parameters
- MsgOut: the type of the messages pushed to the returned observer
- WrappedOut: the type of the messages going out to the input sink
Arguments
sink: Observer<WrappedOut>
wrap: (msg: MsgOut) => WrappedOut
Returns Observer<MsgOut>
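The returned observer sits in front of the sink and wraps each message on its way out. A minimal sketch of that idea follows, using a local Observer type in place of the RxJS interface so that no imports are needed. The name sketchCreateWrapObserver is hypothetical, and forwarding error and complete unchanged is an assumption.

```typescript
// Local stand-in for the RxJS Observer interface, so the sketch needs no imports.
type Observer<T> = {
  next: (value: T) => void
  error: (err: unknown) => void
  complete: () => void
}

// Hypothetical stand-in for createWrapObserver(): wraps each message before
// handing it to the sink. Forwarding error and complete unchanged is assumed.
function sketchCreateWrapObserver<MsgOut, WrappedOut>(
  sink: Observer<WrappedOut>,
  wrap: (msg: MsgOut) => WrappedOut
): Observer<MsgOut> {
  return {
    next: (msg) => sink.next(wrap(msg)),
    error: (err) => sink.error(err),
    complete: () => sink.complete(),
  }
}

type Message = { type: string }
const sent: string[] = []
const state = { completed: false }
const sink: Observer<string> = {
  next: (value) => {
    sent.push(value)
  },
  error: () => {},
  complete: () => {
    state.completed = true
  },
}

const observer = sketchCreateWrapObserver<Message, string>(sink, (msg) => `demo:${msg.type}`)
observer.next({ type: 'ping' })
observer.complete()
// sent is ['demo:ping'] and state.completed is true
```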
createWrappedTransport()
Combines createUnwrapOperator() and createWrapObserver() in a TransportSubject
Type parameters
- MsgIn: the type of the messages coming in from the returned transport
- MsgOut: the type of the messages pushed to the returned transport
- WrappedIn: the type of the messages coming in from the input transport source
- WrappedOut = WrappedIn: the type of the messages going out to the input transport sink
Arguments
transport: TransportSubject<WrappedIn, WrappedOut>
wrapper: Wrapper<MsgIn, MsgOut, WrappedOut>
options?: UnwrapOperatorOptions = {}
Returns TransportSubject<MsgIn, MsgOut>
createNamespacedTransport()
Combines createWrappedTransport() and createWrapper()
Type parameters
- MsgIn: the type of the messages coming in from the returned transport
- MsgOut = MsgIn: the type of the messages pushed to the returned transport
- Namespace extends string = string
Arguments
transport: TransportSubject<Wrapped<MsgIn, Namespace>, Wrapped<MsgOut, Namespace>>
namespace: Namespace
options?: UnwrapOperatorOptions = {}
Returns TransportSubject<MsgIn, MsgOut>
License
Apache-2.0 OR MIT