Stream is a simple construct that subscribes to the provided Observable and adds the ability to re-subscribe.
Given an Observable, the Stream subscribes to it immediately and saves the resulting Subscription instance as a property of the Stream. The Stream also keeps a reference to the provided Observable and uses it to re-subscribe when asked.
See the API reference for more details.
    class Stream {
      constructor(observable: Observable<any>) {}
    }
    import {timer} from 'rxjs';
    import {tap} from 'rxjs/operators';
    import {Stream} from '@activejs/core';

    // create a cold Observable that emits at a 1-second interval.
    // RxJS timer emits incremental numbers starting from 0
    const randomNumberLogger$ = timer(0, 1000).pipe(tap(count => console.log(count)));

    // create a Stream using the randomNumberLogger$
    const randomNumberStream = new Stream(randomNumberLogger$);
    // each passing second we'll see a number logged in console
    // 0, 1, 2, 3, and so on

    // restart the timer
    randomNumberStream.resubscribe();
    // and the timer will start again
    // again we'll see 0, 1, 2, 3, and so on

    // you can also stop the stream similar to a simple Subscription
    randomNumberStream.unsubscribe();
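To make the subscribe, save, and re-subscribe behavior described above more concrete, here is a rough sketch of how such a construct could be put together. It is not the ActiveJS source. `SketchStream`, `SubscribableLike`, `SubscriptionLike`, and `fakeSource` are hypothetical names introduced only for illustration, and the two interfaces are simplified stand-ins for the RxJS Observable and Subscription types so that the sketch runs without any dependencies.

```typescript
// Simplified stand-ins for the RxJS types (assumptions, not the real interfaces).
interface SubscriptionLike {
  closed: boolean;
  unsubscribe(): void;
}

interface SubscribableLike {
  subscribe(): SubscriptionLike;
}

// Hypothetical illustration of the idea; not the actual ActiveJS Stream class.
class SketchStream {
  // the current subscription, saved as a property
  subscription: SubscriptionLike;

  // keep a reference to the source and subscribe to it right away
  constructor(private readonly observable: SubscribableLike) {
    this.subscription = observable.subscribe();
  }

  unsubscribe(): void {
    this.subscription.unsubscribe();
  }

  // drop the current subscription, then subscribe to the saved source again
  resubscribe(): void {
    this.unsubscribe();
    this.subscription = this.observable.subscribe();
  }
}

// A fake cold source that counts how often it gets subscribed to.
let total = 0;  // subscriptions created so far
let active = 0; // subscriptions currently open
const fakeSource: SubscribableLike = {
  subscribe(): SubscriptionLike {
    total++;
    active++;
    const sub: SubscriptionLike = {
      closed: false,
      unsubscribe(): void {
        if (!sub.closed) {
          sub.closed = true;
          active--;
        }
      },
    };
    return sub;
  },
};

const stream = new SketchStream(fakeSource); // subscribes immediately
stream.resubscribe();                        // closes the first subscription, opens a second
stream.unsubscribe();                        // closes the second subscription
```

Because the source here is cold, every re-subscription starts it from scratch, which is why the timer in the example above begins again at 0 after `resubscribe()`.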
Stream is implicitly used to create streams when you call createStream
on an Action, Unit, System, or Cluster.
    import {tap} from 'rxjs/operators';
    import {Action} from '@activejs/core';

    // initialize an Action, that accepts a value of type 'A' | 'B' | 'C'
    const anAction = new Action<'A' | 'B' | 'C'>();

    // create stream
    const alertStream = anAction.createStream(value$ => {
      return value$.pipe(tap(value => {
        alert(value); // or do something else entirely
      }));
    });

    // create another stream
    const consoleStream = anAction.createStream(value$ => {
      return value$.pipe(tap(value => {
        console.log(value); // or do something else entirely
      }));
    });

    // our setup is complete, now we can dispatch/trigger it from anywhere
    anAction.dispatch('A');

    // stop the consoleStream
    consoleStream.unsubscribe();
    // and so on
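One way to picture what `createStream` does is the sketch below: the callback receives the source of dispatched values, returns a derived source, and the result is wrapped in a Stream-like object that subscribes right away. This is an assumption about the general shape, not the library's implementation. `MiniAction`, `MiniStream`, `Source`, `Sub`, and `tapLike` are hypothetical names, and `tapLike` is a simplified stand-in for `pipe(tap(...))` so the sketch runs without RxJS.

```typescript
type Listener<T> = (value: T) => void;

// Simplified stand-ins for RxJS Subscription and Observable (assumptions).
interface Sub {
  unsubscribe(): void;
}

interface Source<T> {
  subscribe(listener?: Listener<T>): Sub;
}

// Hypothetical Stream-like wrapper: subscribes immediately, can re-subscribe.
class MiniStream<T> {
  subscription: Sub;

  constructor(private readonly source: Source<T>) {
    this.subscription = source.subscribe();
  }

  unsubscribe(): void {
    this.subscription.unsubscribe();
  }

  resubscribe(): void {
    this.unsubscribe();
    this.subscription = this.source.subscribe();
  }
}

// Hypothetical Action-like source that pushes dispatched values to its listeners.
class MiniAction<T> implements Source<T> {
  private readonly listeners = new Set<Listener<T>>();

  subscribe(listener: Listener<T> = () => {}): Sub {
    this.listeners.add(listener);
    return {unsubscribe: () => { this.listeners.delete(listener); }};
  }

  dispatch(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }

  // hand the source to the callback and wrap whatever it returns in a stream
  createStream<R>(factory: (value$: Source<T>) => Source<R>): MiniStream<R> {
    return new MiniStream(factory(this));
  }
}

// Stand-in for pipe(tap(effect)): runs a side effect, then passes the value on.
function tapLike<T>(source: Source<T>, effect: Listener<T>): Source<T> {
  return {
    subscribe(listener: Listener<T> = () => {}): Sub {
      return source.subscribe(value => {
        effect(value);
        listener(value);
      });
    },
  };
}

const miniAction = new MiniAction<'A' | 'B' | 'C'>();
const seen: string[] = [];
const miniStream = miniAction.createStream(value$ =>
  tapLike(value$, value => { seen.push(value); })
);

miniAction.dispatch('A'); // the side effect runs, 'A' is recorded
miniStream.unsubscribe();
miniAction.dispatch('B'); // the stream is stopped, nothing is recorded
miniStream.resubscribe();
miniAction.dispatch('C'); // the stream is active again, 'C' is recorded
```

The point of the design is that the side effect lives inside the stream rather than in the code that dispatches, so stopping or restarting one stream leaves every other stream on the same Action untouched.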