Stream is a simple construct that creates a subscription by subscribing to the provided Observable and adds the ability to re-subscribe.
Given an Observable, the Stream immediately subscribes to it. The Subscription instance is saved as Stream's property. The Stream keeps the reference to the provided Observable and uses it for re-subscription when asked.
import {timer} from'rxjs';import {tap} from'rxjs/operators';import {Stream} from'@activejs/core';// create a cold Observable that emits at a 1-second interval.// RxJS timer emits incremental numbers starting from 0constrandomNumberLogger$=timer(0,1000).pipe(tap(count =>console.log(count)));// create a Stream using the randomNumberLogger$constrandomNumberStream=newStream(randomNumberLogger$);// each passing second we'll see a number logged in console// 0, 1, 2, 3, and so on// restart the timerrandomNumberStream.resubscribe();// and the timer will start again// again we'll see 0, 1, 2, 3, and so on// you can also stop the stream similar to a simple SubscriptionrandomNumberStream.unsubscribe();
Implicit Usage Example
Stream is implicitly used to create streams when you call createStream on an Action, Unit, System, or Cluster.
// initialize an Action, that accepts a value of type 'A' | 'B' | 'C'constanAction=newAction<'A'|'B'|'C'>();// create streamconstalertStream=anAction.createStream(value$ => {returnvalue$.pipe(tap(value => {alert(value); // or do something else entirely }) );})// create another streamconstconsoleStream=anAction.createStream(value$ => {returnvalue$.pipe(tap(value => {console.log(value); // or do something else entirely }) );})// our setup is complete, now we can dispatch/trigger it from anywhereanAction.dispatch('A');// stop the consoleStreamconsoleStream.unsubscribe();// and so on