Stream
Stream is a simple construct that creates a subscription by subscribing to the provided Observable and adds the ability to re-subscribe.
Given an Observable, the Stream immediately subscribes to it. The Subscription instance is saved as Stream's property. The Stream keeps the reference to the provided Observable and uses it for re-subscription when asked.
See API reference for more details.

Class Signature

1
class Stream {
2
constructor(observable: Observable<any>){
3
}
4
}
Copied!

Explicit Usage Example

1
import {timer} from 'rxjs';
2
import {tap} from 'rxjs/operators';
3
import {Stream} from '@activejs/core';
4
​
5
// create a cold Observable that emits at a 1-second interval.
6
// RxJS timer emits incremental numbers starting from 0
7
const randomNumberLogger$ = timer(0, 1000).pipe(
8
tap(count => console.log(count))
9
);
10
​
11
// create a Stream using the randomNumberLogger$
12
const randomNumberStream = new Stream(randomNumberLogger$);
13
​
14
// each passing second we'll see a number logged in console
15
// 0, 1, 2, 3, and so on
16
​
17
// restart the timer
18
randomNumberStream.resubscribe();
19
​
20
// and the timer will start again
21
// again we'll see 0, 1, 2, 3, and so on
22
​
23
// you can also stop the stream similar to a simple Subscription
24
randomNumberStream.unsubscribe();
Copied!

Implicit Usage Example

Stream is implicitly used to create streams when you call createStream on an Action, Unit, System, or Cluster.
1
// initialize an Action, that accepts a value of type 'A' | 'B' | 'C'
2
const anAction = new Action<'A' | 'B' | 'C'>();
3
​
4
// create stream
5
const alertStream = anAction.createStream(value$ => {
6
return value$.pipe(
7
tap(value => {
8
alert(value); // or do something else entirely
9
})
10
);
11
})
12
​
13
// create another stream
14
const consoleStream = anAction.createStream(value$ => {
15
return value$.pipe(
16
tap(value => {
17
console.log(value); // or do something else entirely
18
})
19
);
20
})
21
​
22
// our setup is complete, now we can dispatch/trigger it from anywhere
23
anAction.dispatch('A');
24
​
25
// stop the consoleStream
26
consoleStream.unsubscribe();
27
​
28
// and so on
Copied!
Last modified 1yr ago