import {Injectable} from '@angular/core';
import {HttpClient} from '@angular/common/http';
import {EMPTY, Observable} from 'rxjs';
import {catchError, switchMap, tap} from 'rxjs/operators';
import {UserState} from './user.state';
// Create a separate service for Streams, it helps avoid cyclic dependencies.
// It'll do all the heavy lifting, it'll make http requests and
// dispatch the results to the userDataSystem.
@Injectable() // we'll instantiate it in user.module.ts so no "provideIn"
export class UserStreams {
// create a Stream using the userSystem and couple it with an http request
userDataStream = this.userState.userSystem.createStream(
queryUnit, dataUnit, errorUnit
// listen to the future values of queryUnit
return queryUnit.future$.pipe(
// you can also do queryUnit.pipe, but that'll start immediately,
// without even dispatching anything to queryUnit
// switch the stream to http request Observable
// to create a new request
switchMap(userId => this.httpClient
.get('https://example.com/username' + userId)
// dispatch the returned data to dataUnit
tap(data => dataUnit.dispatch(data)),
// dispatch the error to errorUnit
// don't rethrow, to keep the stream alive
private httpClient: HttpClient,
private userState: UserState