npm install @tanbo/stream
最基础的数据流类,每一次订阅产生一个新的数据流。
import { Observable } from '@tanbo/stream';
const stream = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
})
stream.subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 2
基础广播类,所有订阅者共用同一个数据流,且只会拿到订阅后广播的数据。
import { Subject } from '@tanbo/stream';
const subject = new Subject();
subject.next(1);
subject.subscribe(value => {
console.log(value);
})
subject.next(2);
// 输出:
// 2
有默认值的广播类,所有订阅者共用同一个数据流,且所有订阅者在订阅时会同步拿到数据流中的最后一次数据,如果还没有广播,则同步拿到默认数据。
import { BehaviorSubject } from '@tanbo/stream';
const behaviorSubject = new BehaviorSubject(1);
behaviorSubject.subscribe(value => {
console.log(value);
})
// 输出:
// 1
behaviorSubject.next(2);
// 输出:
// 2
Observable
、Subject
、BehaviorSubject
类都可以通过同样的方法取消订阅。以 Observable 为例:
const stream = new Observable(subscriber => {
setTimeout(() => {
subscriber.next(1);
}, 1000)
})
const subscription = stream.subscribe(value => {
console.log(value);
})
// 取消订阅
subscription.unsubscribe();
// 前面的 console.log 不会执行,因为在还没有发送数据时,已取消了订阅
所有的数据流发射器都返回一个 Observable 实例。
把 DOM 事件转换成数据流。
fromEvent(document.getElementById('button'), 'click').subscribe(event => {
console.log(event);
})
把 Promise 转换成数据流。
const promise = new Promise(resolve => {
setTimeout(() => {
resolve(1)
}, 1000)
})
fromPromise(promise).subscribe(value => {
// 在 1 秒后,会收到由 Promise 发来的值
console.log(value)
})
按固定间隔时间发送值,默认间隔 1 秒,从 0 开始。
interval().subscribe(value => {
console.log(value);
})
// 输出:
// 0
// 1
// 2
// 3
// ...
同时订阅多个数据流,当任意一个数据流有新值时,立即将该值发送出去。
merge(interval(), interval()).subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 1
// 2
// 2
// 3
// 3
// 4
// ...
将既定的值按顺序同步发送。
of(1, 2, 3).subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 2
// 3
同时订阅多个数据流,当任意一个数据流有新值时,立即将该值发送出去,同时忽略后面所有的值
race(interval(1000), of('a')).subscribe(value => {
console.log(value)
})
// 输出:
// 'a'
延迟一段时间发送值。默认延迟一秒。
timeout().subscribe(() => {
console.log('1 秒后打印此消息');
})
监听一组数据流,当所有数据到达时,将最新数据按输入顺序,以一个数组的形式发送并忽略后面的所有数据。可以理解为 Promise.all
。
zip(of(1), of(2), timeout(1000, 'timeout')).subscribe(value => {
console.log(value);
})
// 输出:
// [1, 2, 'timeout']
操作符是对既有数据流作进一步有流程控制、数据转换或添加副作用。
操作符均通过 pipe
方法添加。pipe
方法既可以传入多个操作符,也可以链式调用。以下两种方式是等价的:
// 链式调用
interval()
.pipe(take(4))
.pipe(delay(2000))
.subscribe(value => {
console.log(value)
})
// 多参数调用
interval().pipe(
take(4),
delay(2000)
).subscribe(value => {
console.log(value)
})
当有新值时,记录值,并延迟一段时间,发送记录的值。
interval(1000).pipe(auditTime(2000)).subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 3
// 5
// 7
// ...
按顺序依次发出数据流本身和传入源的值,需要注意的事,只有前一个数据流完成时,才会监听并发送后一个数据流的值。
timeout(1000, 1).pipe(
concat(
of('a', 'b'),
of('A', 'B')
)
).subscribe(value => {
console.log(value);
})
// 输出:
// 1
// 'a'
// 'b'
// 'A'
// 'B'
在一段时间内,没有新值时,才发送最新的值。
interval(1000).pipe(debounceTime(2000)).subscribe(value => {
// 永远也不会输出值,因为每一次新值的间隔都小于 2 秒
console.log(value);
})
将数据流延迟一段时间发送。
of('delay').pipe(delay(1000)).subscribe(value => {
console.log(value)
})
// 1 秒后输出:'dekay'
过滤连续重复的值。
of(1, 3, 3, 3, 5, 6, 6).pipe(distinctUntilChanged()).subscribe(value => {
console.log(value)
})
// 输出:
// 1
// 3
// 5
// 6
过滤源数据流,只发送返回为 true 时的数据。
of(1, 3, 3, 3, 5, 6, 6).pipe(filter(value => {
return value > 3;
})).subscribe(value => {
console.log(value)
})
// 输出:
// 5
// 6
// 6
将源数据转换成另外一种数据。
of('张三').pipe(map(value => {
return {
name: value
}
})).subscribe(value => {
console.log(value);
})
// 输出: {name: '张三'}
启动一个微任务,将数据缓存起来,并在微任务执行时,把缓存起来的数据一起发送出去。
console.log('start')
of(1, 2, 3, 4).pipe(microTask()).subscribe(values => {
console.log(values)
})
console.log('end')
// 输出:
// start
// end
// [1, 2, 3, 4]
忽略源值,并延迟一段时间,发送最新的值。
interval(1000).pipe(sampleTime(2000)).subscribe(value => {
console.log(value);
})
// 输出:
// 3
// 5
// 7
// ...
让多个订阅共享同一个数据源,而不是创建新的
const sharedObs = interval().pipe(share())
sharedObs.subscribe(value => {
console.log(value)
})
setTimeout(() => {
sharedObs.subscribe(value => {
console.log(value)
})
}, 2100)
// 输出:
// 0
// 1
// 2
// 2
// 3
// 3
跳过指定次数的数据,然后发送后面的值。
of('A', 'B', 'C', 'D').pipe(skip(2)).subscribe(value => {
console.log(value);
})
// 输出:
// 'C'
// 'D'
返回一个新的数据流,并以新数据流的订阅结果,发送出去。
of(1).pipe(switchMap(value => {
return new Observable(subscriber => {
subscriber.next(value + 1)
})
})).subscribe(value => {
console.log(value)
})
// 输出:2
指定源数据流最多发送几次。
of('a', 'b', 'c', 'd').pipe(take(2)).subscribe(value => {
console.log(value);
})
// 输出:
// 'c'
// 'd'
在数据流中添加副作用。
of(1, 2).pipe(tap(() => {
console.log('副作用');
})).subscribe(value => {
console.log(value);
})
// 输出:
// '副作用'
// '副作用'
// 1
// 2
发出最先到达的值,并忽略一段时间内的新值,然后再发送时间到达之后最新到达的值。
interval(1000).pipe(throttleTime(2000)).subscribe(value => {
console.log(value);
})
// 输出:
// 0
// 2
// 4
// 6
// ...