racjs

1.0.1 • Public • Published

RacJS

implementation of RAC(OC) on JavaScript and replacement of RxJS

Signal(类比RxJS的Observable)

包含一定数量的可订阅数据(值,完成,错误)的信号对象,订阅者再接受到error或者complete之后就意味着订阅已经结束,即使信号还有没发送的值.

//创建一个signal,传入一个基于subscriber(订阅者对象)参数的block,block的返回值是一个Disposbale(FP编程的side effects)
let signal = new Signal(subscriber => {
   subscriber.sendNext(2)
   subscriber.sendNext(3)
   subscriber.sendComplete() //subscriber.sendError(new Error())
   subscriber.sendNext(4) // 4 不会被订阅者接收
   return null 
})

Subcriber (类比RxJs的Subscriber)

维护当前订阅者的接收操作,以及该订阅者订阅信号产生的Disposable(额外消耗)的对象(从设计的角度,Subscriber更像是一个定义一些操作的接口),一般不自己创建,对使用方透明。

//具体的订阅
let next = v => console.log(v)
let complete = () => console.log('completed')
let error = err => console.log('error')
let disposable = signal.subscribe(next, complete, error)
//如果想取消这次订阅可以调用 disposable.dispose

//订阅的方法有4个,subscribe是最全的
-subscribe(next, complete, error)
-subscribeNext(next)
-subscribeComplete(complete)
-subscribeError(error)

Disposable

这个是RAC里面的概念,RxJs没有,作用同于类似于RxJs的subscription对象的unsubscribe。Disposable是管理当前订阅操作以及Signal各种链式,组合过程中出现的side effects。 Disposable中有Disposable和CompoundDisposable两种,采用的是组合嵌套的设计模式。每一个对应的disposable只负责当前订阅者。几乎对使用者透明

Subject(RxJs里面的Subject)

概括的描述: 1)继承于Signal,同时又实现了Subscriber接口的对象 2)OOP进入到FRP的桥梁 3)特别灵活

let subject = new Subject()
subject.subscribe(next, complete, error)
subject.sendNext(2)
subject.sendComplete()
subject.sendError()

Subject有Subject,CurrentSubject,ReplaySubject。其中Subject就是普通的;CurrentSubject在被订阅的时候如果当前有值了就会发送最新值;ReplaySubject在被订阅会完美的复现所有的值,包括complete和error信息

Connection

基于Signal和Subject的一种1对N的实现。之前说过Subject既是一个Signal,又是一个Subscriber。这样就让Subject订阅Signal对signal的值做透传,然后将subject暴露给使用方订阅。这么做的好处就是为了让signal创建时传入的block只执行一次,减少额外的side effects。

Channel

基于Subject和Subject实现N对N的实现。RxJs好像没有

Command

借鉴RAC,RxJs好像没有。但是和RAC有区别

//第一个参数block,是根据输入生成一个signal
/*
 * 第二个参数,传入一个<=0的值,代表当前的Command不支持concurrent,也不支持等待,也就是说如果当前command在被执行,在执行完成前其他输入会被忽略
 * 如果 == 1,串型,支持等待。后续的输入会在前一个执行完成后执行
 * 如果 > 1, 并行,支持等待,当当前执行的signal大于该数量后,会进入缓冲区等待,每当有空闲就会从缓冲区取出进行执行
 */
let command = new Command(v => Signal.of(3).delay(1000), 0)
command.execute(2).subscribe(next, complete)
...

核心--Signal常用操作

有些很复杂的操作在这边不做解释,想了解的可以看代码,主要是为了其他方法的封装操作

1)简单Signal创建操作

static of(value)

快速创建一个只有一个数据Value的signal

static empty()

快速创建一个空的signal

static error(err)

快速创建错误的signal

static never()

快速创建一个不会自己结束且空的signal

static fromArray(arr)

快速创建一个包含数组元素的signal

static fromRange(start, count, distance = 1)

快速创建一个signal,signal的数据依次从start开始,每次增加distance,叠加count次

static fromPromise(promise)

快速创建一个signal,promise then作为signal的数据+complete,catch作为signal的error

static interval(time, start = 0)

快速创建一个signal,从start开始,每time时间发送start+1

2)订阅流程附加操作

initially(block)

在订阅开始前注入操作

do(next,complete,error)

在订阅过程中对数据注入操作

finally(block)

在订阅结束后注入操作

Signal.of(4)
   .do((v) => console.log('do next + ' + v), () => console.log('do complete'))
   .initially(() => console.log('before subscribe'))
   .finally(() => console.log('after complete'))
   .subscribeNext()
/**
 *before subscribe
 *do next + 4
 *do complete
 *after error or complete
 */

3)数据流的衔接

concat(otherSignal)

当前的signal在结束之后,紧接着订阅otherSignal

static concat(signals)

concat的类方法

starWith(value)

Signal.of(value)放在当前流的前面

endWith(value)

Signal.of(value)放在当前流的后面

then(otherSignal)

当前的signal在结束之后(忽略当前signal的next发送的值),紧接着订阅otherSignal

takeUntil(otherSignal)

订阅当前Signal,直到otherSignal有值或者是complete消息过来

replaced(otherSignal)

订阅当前的signal,直到otherSignal有任意的数据(包括error/complete)过来,并且紧接着订阅otherSignal

replace(otherSignal)

订阅otherSignal,直到当前signal有任意的数据过来,并且紧接着订阅当前的signal

catchError(error => signal)

如果当前signal发生错误,就根据错误生成新的signal,并且紧接着订阅这signal

catchTo(otherSignal: Signal)

如果当前signal发生错误,并且紧接着订阅新的otherSignal

//依次打印-2,-1,-0,1,2,3,4;如果最后的4000ms变为比3000ms小的数值打印就会变成-2,-1,0,4
Signal.of(2)
   // .do((v) => console.log('do next + ' + v), () => console.log('do complete'))
   // .initially(() => console.log('before subscribe'))
   // .finally(() => console.log('after complete'))
   .startWith(1)
   .endWith(3)
   .delay(3000)
   .replace(Signal.fromArray([-2,-1,0]))
   .replaced(Signal.of(4).delay(4000))
   .subscribeNext(v => console.log('real v ' + v))

4)类数组操作

将数据流里面所有的数据想象成一个数组处理

map((v,index) => otherV)

将signal里面的数据都做了一个映射

mapTo(v)

将signal里面的数据都映射成一个值

filter((v,index) => boolean)

将signal中不符合条件的都过滤掉

ignoreValues()

过滤所有的值,只关心完成和错误

every((v,index) => boolean)

判断signal的所有数据是不是满足条件

some((v,index) => boolean)

判断signal有没有数据满足条件

find((v,index) => boolean)

找出signal中第一个满足条件的值

findIndex((v,index) => boolean)

找出signal中第一个满足条件的值的index

elementAt(index, defaultValue)

找出signal里面的第index的值,如果不存在就发送defaultValue

scan((sourceValue, nowValue, index) => desValue, seed, useFirst)

usefirst ? 使用signal的第一个值作为sourceValue :使用seed作为sourceValue signal每次发出的数据nowValue,都会经过第一个函数的处理变成新的数据发送出去,同时将最新的值作为下一次的sourceValue

reduce((sourceValue, nowValue, index) => desValue, seed, useFirst)

取scan中的最后一个值

isEmpty()

判断当前signal是不是没有数据

ifEmpty(defaultValue, throwError)

如果signal为空,是发送defaultValue,还是发送一个空的错误

take(count)

取前count的数据

takeLast(count)

取最后几个数据

takeWhile(v => boolean)

一直取到signal的数据中第一个不满足条件为止

fisrt()

取第一个

last()

取最后一个

skip(count)

从第count开始取

skipLast(count)

最后几个不取

skipWhile

从signal的数据中第一个满足条件开始取

count()

获取当前signal包含的数据总长度

5)其他工具类操作

bufferCount(count)

将源signal的数据每几个一组发送出去

distinct(v => key:String)

如果当前的值经过转换为key,发现key已经存在,该值就会被过滤掉

distinctUntilKeyChanged(v => key:String)

如果当前的值经过转换为key,和上一次值的key一样,该值就会被过滤掉

distinctUntilValueChanged()

如果当前的值和上一次一样,该值就会被过滤掉

materialize()

把signal的数据,完成,错误都包装成Notification对象发送出去

dematerialize()

将包装成的Notification恢复成值,完成,错误发送出去

repeat(count)

如果成功了,重复订阅当前的signal几次

retry(count)

如果错误,重复订阅当前的signal几次

6)多流操作

merge(signals:Array)

将当前流和传入的流数组合并成一个新的流,所有流的数据都会被当成新流的数据。只有当所有流都完成了,新流才会完成;只要有一个流发生错误,心新流就会发生错误

static merge(signals:Array)

merge的类方法

zip(signals:Array)

将当前流和传入的流数组进行打包形成一个新的流,每当打包内的所有流都有了新的数据,新的流就会将这些流的数据打包成一个数组发送,如果有一个流发生(错误)了,那么新的流也就完成了。任意一个流既没有新的数据且完成了,新的流就会发送完成

static zip(signals:Array)

zip的类方法

combine(signals:Array)

将当前流和传入的流数组进行联系起来,每当有一个流发送了一个新的数据且所有的流都有数据的时,新的流就会将这边流的最新的数组打包成一个数组发送。所有流完成,新流才会完成;任意一个流错误,新流就会错误

static combine(signals:Array)

combine的类方法

switchToLastest()

如果当前流发送的值都是signal类型,每发送新的值就会订阅该值,同时取消前一次的订阅

switchCase(signalMap,defaultSignal)

根据当前的值从signalMap中找到对应的signal,并订阅,同时取消上一次的订阅

ifelse(trueSignal, falseSignal)

根据当前的值是否,订阅对应的signal

7)一些时间类操作

delay(time)

将当前流的所有数据和完成错误信息都延迟发送

debounceTime(time)

防抖,一定时间内只接收第一次的数据

throttleTime(time)

防抖,在接收到最新的数据后且一定时间内没有数据过来后,发送这个最新值(先等待再发送)

auditTime(time)

防抖,在一定时间内没有数据过来后,发送下一次接收到的值(先发送再等待)

bufferTime(time, count=0)

每次有数据后,都会在固定的时间读取缓冲池中的数据,缓冲池的大小由count决定,0代表无限大

timeout(time)

在一定的时间内如果没有完成,就会发送超时错误

buffer(othersignal)

每当otherSignal有新的值/完成,就会发送当前signal的缓冲池

takeLatest(otherSignal)

每当otherSignal有新的值/完成,就会发送当前signal的最新值

8)线程操作

在RacJs中,弱化了RxJs里面的schedule概念,只提供了异步订阅/发送的两个操作

subscribeOn(schedule)

sync/async/asap,决定当前的订阅操作在哪个线程执行

observeOn(schedule)

sync/async/asap,决定当前的发送数据操作在哪个线程执行

未完,待补充。如果有问题,或者发现bug,请发邮件[472077629@qq.com]

Readme

Keywords

none

Package Sidebar

Install

npm i racjs

Weekly Downloads

0

Version

1.0.1

License

MIT

Unpacked Size

61.1 kB

Total Files

19

Last publish

Collaborators

  • chenyin