AsyncSystem

An AsyncSystem is a systematic combination of four Units, one each for every aspect of an asynchronous task or API, e.g.: XHR, fetch or a third party abstraction like Angular's HttpClient.

The four aspects of an async API are query, response, error, and pending-status. For each of these aspects, we have a Unit, wrapped inside the AsyncSystem.

AsyncSystem employs three GenericUnits for query, response, and error, namely queryUnit, dataUnit, and errorUnit, respectively; and a BoolUnit for the pending-status, named pendingUnit.

A BoolUnit is used for the pending-status because it can only be in two states, true or false

GenericUnit is used for the other three aspects because it's the most permissive of all the Units, and it saves us from creating multiple variations with different kinds of Units. However, you can substitute the GenericUnits with a different kind of Unit if you want and create your own custom AsyncSystem, very easily.

The below diagram shows where and how AsyncSystem helps with an asynchronous API call to the server, and how the data flow looks like, and how the AsyncSystem shares the API call's state with multiple parts of the App.

The data flow diagram of an async API call involving an AsyncSystem.

It might seem like a lot, but not everything is happening at the same time. When you break it down it's not difficult to understand.

  • The data flow starts with a dispatch to the queryUnit, which triggers an API call from the API Service.

  • The API Service contains the logic of listening to the queryUnit, making the actual API call, and then submitting its result back to the AsyncSystem, appropriately.

  • On a successful response, the API Service dispatches the response received, to the dataUnit.

  • On an error response, the API Service dispatches the error to the errorUnit.

  • The pendingUnit is updated automatically by the AsyncSystem, it's given a true value when queryUnit emits, and a false value when dataUnit or errorUnit emits a value.

That's pretty much it, this is how an AsyncSystem works.

There are other automatic things that an AsyncSystem can do apart from updating the value of pendingUnit. Such as, it can clear the errorUnit's value when dataUnit emits a value. It can freeze the queryUnit while the pendingUnit has a true value, etc.

See API reference for more details.

This is how the usage of AsyncSystem would look like in Angular and React apps.

Angular

user.state.ts
user.streams.ts
user.component.ts
user.component.html
user.module.ts
user.state.ts
import {Injectable} from '@angular/core';
import {HttpErrorResponse} from '@angular/common/http';
import {AsyncSystem} from '@activejs/core';
@Injectable({providedIn: 'root'})
export class UserState {
// initialize an AsyncSystem and declare the typings such that
// query would be a number,
// data would be a string,
// error would be an HttpErrorResponse object
readonly userSystem = new AsyncSystem<number, string, HttpErrorResponse>();
// extract the Units for ease of access, or don't if you prefer
readonly userQuery = this.userSystem.queryUnit;
readonly userData = this.userSystem.dataUnit;
readonly userError = this.userSystem.errorUnit;
readonly userPending = this.userSystem.pendingUnit;
}
user.streams.ts
import {Injectable} from '@angular/core';
import {HttpClient} from '@angular/common/http';
import {EMPTY, Observable} from 'rxjs';
import {catchError, switchMap, tap} from 'rxjs/operators';
import {UserState} from './user.state';
// Create a separate service for Streams, it helps avoid cyclic dependencies.
// It'll do all the heavy lifting, it'll make http requests and
// dispatch the results to the userDataSystem.
@Injectable() // we'll instantiate it in user.module.ts so no "provideIn"
export class UserStreams {
// create a Stream using the userSystem and couple it with an http request
userDataStream = this.userState.userSystem.createStream(
(
queryUnit, dataUnit, errorUnit
) => {
// listen to the future values of queryUnit
return queryUnit.future$.pipe(
// you can also do queryUnit.pipe, but that'll start immediately,
// without even dispatching anything to queryUnit
// switch the stream to http request Observable
// to create a new request
switchMap(userId => this.httpClient
.get('https://example.com/username' + userId)
.pipe(
// dispatch the returned data to dataUnit
tap(data => dataUnit.dispatch(data)),
// catch the error
catchError(err => {
// dispatch the error to errorUnit
errorUnit.dispatch(err);
// don't rethrow, to keep the stream alive
return EMPTY;
})
)
));
});
constructor(
private httpClient: HttpClient,
private userState: UserState
) {}
}
user.component.ts
import {Component} from '@angular/core';
import {UserState} from './user.state';
/*
An Angular component.
It will trigger the API request and show the response
along with loading-state.
*/
@Component({
selector: 'app-user',
templateUrl: 'user.component.html'
})
export class UserComponent {
userId: number;
constructor(public userState: UserState) {}
loadData() {
this.userState.userQuery.dispatch(this.userId);
}
reloadData() {
this.userState.userQuery.replay();
}
}
user.component.html
<!-->
The template for Angular component.
</-->
<h1>Load data from a REST API using AsyncSystem.</h1>
<input type="number" [(ngModel)]="userId">
<button (click)="loadData()">Load Data</button>
<button (click)="reloadData()">Reload Data</button>
<p *ngIf="userState.userPending | async">
Loading...
</p>
<p *ngIf="userState.userData | async as name">
My name is: {{name}}
</p>
<p *ngIf="userState.userError | async as error">
Error: {{error.message}}
</p>
user.module.ts
import {NgModule} from '@angular/core';
import {CommonModule} from '@angular/common';
import {UserStreams} from './user.streams';
import {UserComponent} from './user.component';
// here we are creating a Feature Module, so we're making the UserStreams part of
// this module only, so they start working when this Module is used.
// But if we had more generic Streams-service we can do the same
// in the AppModule so that the Streams start when the app starts.
@NgModule({
declarations: [UserComponent],
imports: [CommonModule],
providers: [
// make the UserStreams part of dependecy injection
UserStreams,
// instantiate the UserStreams service using APP_INITILIZER, or
// we can do it in old fashioned way by putting in the contructor of AppModule
{
provide: APP_INITIALIZER,
useFactory: () => () => {},
deps: [UserStreams], // we can put as many Streams services here as we want
multi: true,
},
]
})
export class UserModule {}

See this Typeahead example to for real-life usage.

React

This is how a typical usage would look like, using the React Hooks, and a custom Hook for using ActiveJS Units and Observables called useObservable.

user.state.ts
user.streams.ts
user.component.tsx
user.state.ts
import {AsyncSystem} from '@activejs/core';
// initialize an AsyncSystem and declare the typings such that
// query would be a number,
// data would be a string,
// error would be an HttpErrorResponse object
export const userSystem = new AsyncSystem<string, string, any>();
// extract the Units for ease of access, or don't if you prefer
export const userQueryUnit = userSystem.queryUnit;
export const userDataUnit = userSystem.dataUnit;
export const userErrorUnit = userSystem.errorUnit;
export const userPendingUnit = userSystem.pendingUnit;
user.streams.ts
// Create a separate file for Streams, it helps avoid cyclic dependencies.
// It'll do all the heavy lifting, it'll make http requests and
// dispatch the results to the userDataSystem.
import {EMPTY, Observable} from 'rxjs';
import {catchError, switchMap, tap} from 'rxjs/operators';
import {queryUnit, dataUnit, errorUnit} from './user.state';
// a function to fetch data and disptch the response appropriately
async function fetchAndShareData(query) {
try {
// fetch data using fetch API
const response = await fetch('https://xyz.com/u/' + query.userId);
// extract the JSON data
const data = await response.json();
// dispatch data to the dataUnit, it also sets the pendingUnit's value to false
// and it also clears the errorUnit's value
dataUnit.dispatch(data);
} catch (err) {
// dispatch error to errorUnit, it also sets the pendingUnit's value to false
errorUnit.dispatch(err);
}
}
// setup the stream by observing query values, that triggers fetchAndShareData
// whenever a value is dispatched to queryUnit
queryUnit.subscribe(query => fetchAndShareData(query));
user.component.tsx
/*
A React component.
It will trigger the API request and show the response
along with loading-state.
*/
function App() {
const [userId, setUserId] = useState('');
const data = useObservable(userDataUnit);
const isPending = useObservable(userPendingUnit);
const error = useObservable(userErrorUnit);
const loadData = () => userQueryUnit.dispatch(userId);
const reloadData = () => userQueryUnit.replay();
return (
<React.Fragment>
<h1>Load data from a REST API using AsyncSystem.</h1>
{userId}
<input type="number" onChange={e => setUserId(e.target.value)}/>
{isPending && <p>Loading...</p>}
{data && <p>My name is: {data}</p>}
{error && <p>Error: {error}</p>}
<button onClick={loadData}>Load Data</button>
<button onClick={reloadData}>Reload</button>
</React.Fragment>
);
}

See this Typeahead example to for real-life usage.

Configuration Options

The configuration options can be passed at the time of instantiation. All the configuration options are optional. Most of the options can also be set globally. See Configuration for more details.