    10.6.2 • Public • Published


    What is it?

    A bunch of observables and operators for RxJS.

    Why might you need it?

    I created this package as a place to put additional RxJS observables, operators and methods. If you are looking for something that's not in the RxJS distribution, there might be something suitable in here - if you're lucky.


    Install the package using NPM:

    npm install rxjs-etc --save

    What's in it?

    Observable factories

    • combineLatestArray, concatArray, forkJoinArray, mergeArray, zipArray

      A bunch of static methods that behave in a predictable manner when passed empty arrays. Some of these are now redundant, but some aren't.

      To see how these methods behave, consult their tests.

    • combineLatestHigherOrderArray, combineLatestHigherOrderObject

      Higher-order variants of combineLatestArray (which takes Observable<Observable<T>[]> and returns Observable<T[]>) and of combineLatestObject.

    • combineLatestObject, forkJoinObject, zipObject

      Like the array versions, but these take objects. Observable properties are combined using either combineLatest, forkJoin or zip.

    • forkJoinConcurrent

      Like forkJoin but only runs the specified number of observables concurrently.

    • mergeHigherOrderArray

      A higher-order variant of mergeArray that takes Observable<Observable<T>[]> and returns Observable<T>.

    • toggle

      Splits a notifier into two or more states, between which notifications are toggled.

    • traverse

      Based on expand. Traverses a graph - with backpressure control - using either a notifier or a consumer.

    • zipPadded

      Works like zipArray, but if some sources complete whilst others continue to emit values, those that complete are 'padded' with the specified padValue (which defaults to undefined).

    • percolate

      Runs a sequence of observables in order until an observable completes successfully.
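The padding rule described for zipPadded can be illustrated without observables. The following is only a sketch of that rule using plain arrays in place of streams; zipPaddedArrays is a hypothetical helper written for this illustration and is not part of rxjs-etc, and it says nothing about timing or completion behaviour.

```typescript
// Hypothetical array-based model of the zipPadded padding rule.
// Each inner array stands in for the values emitted by one source.
function zipPaddedArrays<T>(sources: T[][], padValue?: T): (T | undefined)[][] {
  // The longest source determines how many rows are produced.
  const length = Math.max(0, ...sources.map((source) => source.length));
  const rows: (T | undefined)[][] = [];
  for (let index = 0; index < length; index++) {
    // Sources that have run out of values contribute padValue instead.
    rows.push(
      sources.map((source) => (index < source.length ? source[index] : padValue))
    );
  }
  return rows;
}

const padded = zipPaddedArrays([[1, 2, 3], [10]], 0);
// padded is [[1, 10], [2, 0], [3, 0]]
```

When padValue is omitted, the exhausted sources contribute undefined, which matches the default mentioned above.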

    Functions for use with pipe or let

    A bunch of functions that can be passed to the let operator. Use them like this:

    source.let(endWith("this is the end"))

    They can also be used with pipe, like this:

    source.pipe(endWith("this is the end"))

    • bucketBy

      Uses a hash function to put values from an observable stream into buckets - which are themselves observable streams. See splitBy.

    • bufferRecent

      Buffers the specified number of most-recent values.

    • concatIfEmpty

      Like defaultIfEmpty, but it takes a default observable instead of a default value.

    • concatMapEager

      Like the RxJava concatMapEager operator. It accepts a concurrency and eagerly subscribes to its inner observables, buffering their values and then emitting them in the concatMap order.

    • continueWith

      Mirrors the source, but sends the last received value to a project function and merges the ObservableInput that it returns.

    • debounceAfter

      Debounces the source observable, but only after the notifier emits a value.

    • debounceSync

      Debounces synchronously emitted values from a source.

    • debounceTimeSubsequent

      Debounces the source observable, but doesn't debounce the first count notifications - only the subsequent notifications.

    • debounceTimeWithinReason

      Like debounceTime, but with an additional duration to ensure some notifications are emitted for super-busy streams.

    • delayUntil

      Delays a source's value notifications until a signal is received from a notifier.

    • dispose

      Like finalize, but calls a child subscription's callback before its parent's.

    • endWith

      Like startWith, but for the other end.

    • equals

      Like filter, but takes a value - rather than a function - and performs a reference equality check.

    • guard

      Applies the specified TypeScript guard to change the source observable's type and perform a runtime check. Emits an error notification if the guard rejects a value.

    • hasCompleted

      Emits true when the source observable completes.

    • indexElements

      Like map((value, index) => index) when it's called without a selector. When called with a selector, it's just an alias for map.

    • inexorably

      Like finalize (which is also exported as an alias), but passes the callback the Notification that effected the teardown, or undefined if explicitly unsubscribed.

    • initial

      Applies the operator to the source observable, but only to the initial count notifications - not to the subsequent notifications.

    • pairwiseStartWith

      Like a combination of startWith and pairwise, but with more specific typings.

    • pluck

      Like pluck, but it's type-safe and only lets you use valid keys. And it returns the appropriate type.

    • prioritize

      When creating signals from a source observable - for use with operators that take a notifier, like buffer and window - the order in which subscriptions are made is important. prioritize can be used to ensure that the notifier subscribes to the source first.

    • rateLimit

      A rate limiter with pass through when waiting is not necessary.

    • refCountDelay

      Can be used with a ConnectableObservable instead of refCount. When the reference count drops to zero, it waits the specified duration and then unsubscribes if the reference count is still zero. If the reference count is incremented within the duration, no unsubscription occurs.

    • refCountForever

      Somewhat like the change that was made to shareReplay in 5.5.0.beta.4. When first subscribed to, a subscription is made to the source, but the source is never explicitly unsubscribed from. Unsubscription from the source only occurs if the source completes or errors.

    • refCountOn

      Like refCount, but performs connections and unsubscriptions on the specified scheduler.

    • reschedule

      Emits values using the specified scheduler.

    • skipSync

      Skips the initial, synchronously emitted values from a source.

    • splitBy

      Splits an observable stream into two streams. Values that satisfy a predicate are fed into the first stream and values that don't are fed into the second. It's a (better) replacement for partition - which did not multicast the source. See bucketBy for the general case of splitting a stream into a specific number of 'buckets'.

    • startWithTimeout

      Like startWith but only emits the starting value if the source does not emit within the specified duration.

    • subsequent

      Applies the operator to the source observable, but not to the first count notifications - only to the subsequent notifications.

    • takeSync

      Takes the initial, synchronously emitted values from a source and then completes.

    • takeWhileInclusive

      Like takeWhile, but the value that fails the predicate is taken.

    • tapSubscribe

      Like tap, but for subscriptions and unsubscriptions instead of notifications.

    • tapWithIndex

      Like tap, but it receives a tuple that includes the emitted value and the index.

    • throttleAfter

      Throttles the source observable, but only after the notifier emits a value.

    • unsubscribeOn

      Like subscribeOn, but for unsubscription.
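As an illustration of one of the simpler entries above, the difference between takeWhile and takeWhileInclusive can be sketched with a plain array standing in for the source. takeWhileInclusiveArray is a hypothetical helper written for this example and is not part of rxjs-etc; it only models which values are taken.

```typescript
// Hypothetical array-based model of takeWhileInclusive.
// Values are taken while the predicate passes, and the first value
// that fails the predicate is taken as well before stopping.
function takeWhileInclusiveArray<T>(
  values: T[],
  predicate: (value: T, index: number) => boolean
): T[] {
  const taken: T[] = [];
  for (let index = 0; index < values.length; index++) {
    taken.push(values[index]);
    if (!predicate(values[index], index)) {
      break; // the failing value has already been included
    }
  }
  return taken;
}

const taken = takeWhileInclusiveArray([1, 2, 3, 4, 5], (value) => value < 3);
// taken is [1, 2, 3]; a plain takeWhile would stop at [1, 2]
```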

    Utility functions

    A bunch of utility functions that do what their names suggest:


    Unpacked size: 859 kB

    Author: cartant