Stream is a simple construct that subscribes to the provided Observable and adds the ability to re-subscribe to it.

Given an Observable, the Stream subscribes to it immediately and saves the resulting Subscription instance as a property. It also keeps a reference to the provided Observable and uses it to re-subscribe when asked.
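The lifecycle described above can be sketched as follows. This is a minimal illustration of the idea, not ActiveJS's actual source. To keep the block dependency-free it uses a hand-rolled `SimpleObservable` interface instead of RxJS; `SketchStream`, `SimpleObservable`, `SimpleSubscription`, and `countingSource` are hypothetical names introduced only for this example.

```typescript
// Hypothetical stand-ins for the RxJS types, so the sketch has no dependencies.
interface SimpleSubscription {
    unsubscribe(): void;
}

interface SimpleObservable<T> {
    subscribe(next: (value: T) => void): SimpleSubscription;
}

// A Stream-like wrapper: subscribes immediately, keeps the Subscription,
// and keeps the source so it can subscribe again on request.
class SketchStream {
    subscription: SimpleSubscription;
    private source: SimpleObservable<unknown>;

    constructor(source: SimpleObservable<unknown>) {
        this.source = source;
        this.subscription = this.source.subscribe(() => {});
    }

    resubscribe(): void {
        // drop the current subscription before creating a fresh one
        this.subscription.unsubscribe();
        this.subscription = this.source.subscribe(() => {});
    }

    unsubscribe(): void {
        this.subscription.unsubscribe();
    }
}

// A cold source that records how often it is subscribed to
// and how many subscriptions are still open.
let subscribeCalls = 0;
let activeSubscriptions = 0;
const countingSource: SimpleObservable<number> = {
    subscribe(next) {
        subscribeCalls++;
        activeSubscriptions++;
        next(0);
        let closed = false;
        return {
            unsubscribe() {
                if (!closed) {
                    closed = true;
                    activeSubscriptions--;
                }
            },
        };
    },
};

const sketch = new SketchStream(countingSource); // subscribes right away
sketch.resubscribe(); // closes the first subscription, opens a second one
```

Because the source is cold, every re-subscription restarts it from the beginning, which is why the timer in the example below counts from 0 again after a restart.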

See API reference for more details.

Class Signature

class Stream {
    constructor(observable: Observable<any>);
}

Explicit Usage Example

import {timer} from 'rxjs';
import {tap} from 'rxjs/operators';
import {Stream} from '@activejs/core';

// create a cold Observable that emits at a 1-second interval.
// RxJS timer emits incremental numbers starting from 0
const randomNumberLogger$ = timer(0, 1000).pipe(
    tap(count => console.log(count))
);

// create a Stream using the randomNumberLogger$
const randomNumberStream = new Stream(randomNumberLogger$);

// each passing second we'll see a number logged in console
// 0, 1, 2, 3, and so on

// restart the timer
randomNumberStream.resubscribe();

// and the timer will start again
// again we'll see 0, 1, 2, 3, and so on

// you can also stop the stream, similar to a simple Subscription
randomNumberStream.unsubscribe();

Implicit Usage Example

Stream is implicitly used to create streams when you call createStream on an Action, Unit, System, or Cluster.

import {tap} from 'rxjs/operators';
import {Action} from '@activejs/core';

// initialize an Action that accepts a value of type 'A' | 'B' | 'C'
const anAction = new Action<'A' | 'B' | 'C'>();

// create stream
const alertStream = anAction.createStream(value$ => {
    return value$.pipe(
        tap(value => {
            alert(value); // or do something else entirely
        })
    );
});

// create another stream
const consoleStream = anAction.createStream(value$ => {
    return value$.pipe(
        tap(value => {
            console.log(value); // or do something else entirely
        })
    );
});

// our setup is complete, now we can dispatch/trigger it from anywhere
anAction.dispatch('A');

// stop the consoleStream
consoleStream.unsubscribe();

// and so on
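
The two streams above are independent of each other, so stopping one leaves the other running. The sketch below illustrates that behavior under stated assumptions: it is not ActiveJS code, it assumes `createStream` simply passes the source to the callback and subscribes to whatever Observable comes back, and it returns a bare subscription handle rather than a full Stream. `SketchAction`, `withSideEffect` (a stand-in for RxJS `tap`), and the `Simple*` interfaces are hypothetical names.

```typescript
// Hypothetical stand-ins for the RxJS types, so the sketch has no dependencies.
interface SimpleSubscription {
    unsubscribe(): void;
}

interface SimpleObservable<T> {
    subscribe(next: (value: T) => void): SimpleSubscription;
}

// Hypothetical Action-like source: pushes every dispatched value to its listeners.
class SketchAction<T> implements SimpleObservable<T> {
    private listeners = new Set<(value: T) => void>();

    subscribe(next: (value: T) => void): SimpleSubscription {
        this.listeners.add(next);
        return { unsubscribe: () => { this.listeners.delete(next); } };
    }

    dispatch(value: T): void {
        this.listeners.forEach(listener => listener(value));
    }

    // Assumed shape of createStream: let the callback build an Observable
    // from this source, subscribe to it, and hand back the handle.
    createStream(
        producer: (source: SimpleObservable<T>) => SimpleObservable<unknown>
    ): SimpleSubscription {
        return producer(this).subscribe(() => {});
    }
}

// Hypothetical stand-in for RxJS `tap`: runs a side effect, then forwards the value.
function withSideEffect<T>(
    source: SimpleObservable<T>,
    effect: (value: T) => void
): SimpleObservable<T> {
    return {
        subscribe(next) {
            return source.subscribe(value => {
                effect(value);
                next(value);
            });
        },
    };
}

const firstLog: string[] = [];
const secondLog: string[] = [];
const sketchAction = new SketchAction<'A' | 'B' | 'C'>();

const firstStream = sketchAction.createStream(value$ =>
    withSideEffect(value$, value => { firstLog.push(value); })
);
const secondStream = sketchAction.createStream(value$ =>
    withSideEffect(value$, value => { secondLog.push(value); })
);

sketchAction.dispatch('A'); // reaches both streams
secondStream.unsubscribe(); // stop only the second stream
sketchAction.dispatch('B'); // reaches only the first stream
```

After this runs, `firstLog` holds both dispatched values while `secondLog` holds only the first, since the second stream was stopped before `'B'` was dispatched.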
