Reproduced from serverfault.com.

Why is my forkJoin subscription not reached while the subscriptions work independently?

Posted on 2021-01-03 09:52:36

I have two observables that I want to combine with the forkJoin function from rxjs. Subscribing to each observable independently works, but with forkJoin neither the finalize callback in the pipe nor the subscribe callback is reached.

My.component.ts

....
const req1 = this.userService.getUser(this.loggedInUser.userId);
const req2 = this.boardGames$;
this.subscriptions$ = forkJoin([req1, req2])
  .pipe(
    finalize(() => {
      console.log('pipe'); // Is not reached
    })
  )
  .subscribe(([obj1, obj2]) => {
    console.log('subscribe'); // Is not reached
  }, err => console.log(err), ()=>console.log('compl'));
req1.subscribe((aa) => console.log(aa)); // This is logged
req2.subscribe((bb) => console.log(bb)); // This is logged
....

I am using AngularFire2 for the requests. I am not sure whether that is the issue, because the subscriptions work independently. The import I use is: import { AngularFirestore } from 'angularfire2/firestore';

What is it that I am missing here?

Questioner: Klyner
Answer by callOfCode, 2021-01-03 22:51:31

forkJoin emits only when all of its source observables complete. I can't see the rest of your code (for example, what the boardGames$ observable is). Most likely you are using observables that do not complete after their first emission, which is the intended behavior of AngularFirestore, since you most commonly subscribe to ongoing changes in the database (Firebase).
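To see the difference in isolation, here is a minimal sketch that needs only rxjs. The names user$ and boardGames$ are hypothetical stand-ins: of(...) plays the role of a request that completes after one value, and a Subject plays the role of a Firestore stream that stays open. This is an assumption about how the real sources behave, not the actual service code.

```typescript
import { forkJoin, of, Subject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-ins for the real sources:
// user$ emits one value and completes, boardGames$ stays open.
const user$ = of({ id: 1 });
const boardGames$ = new Subject<string[]>();

const log: string[] = [];

// Never emits: boardGames$ does not complete, so forkJoin keeps waiting.
forkJoin([user$, boardGames$]).subscribe(() => log.push('plain forkJoin'));

// Emits once: take(1) completes the open stream after its first value.
forkJoin([user$, boardGames$.pipe(take(1))]).subscribe(() =>
  log.push('forkJoin + take(1)')
);

boardGames$.next(['Catan']);

console.log(log); // [ 'forkJoin + take(1)' ]
```

Only the second subscription fires, because it is the only one whose sources all complete.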

Use combineLatest if you need the latest values whenever any of the observables emits. Keep in mind that it starts emitting only after each source observable has emitted at least once.

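import { combineLatest } from 'rxjs'; // at the top of the file

combineLatest([
    this.userService.getUser(this.loggedInUser.userId),
    this.boardGames$
]).subscribe(([user, boardGames]) => {
    // Runs on every emission once both sources have emitted at least once.
    // Don't forget to unsubscribe
});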
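The emission timing of combineLatest can be checked with a small standalone sketch. Both sources here are hypothetical Subjects standing in for the user request and the board games stream, so the values can be pushed by hand.

```typescript
import { combineLatest, Subject } from 'rxjs';

// Hypothetical stand-ins for the two sources.
const user$ = new Subject<string>();
const boardGames$ = new Subject<string[]>();

const seen: string[] = [];

const sub = combineLatest([user$, boardGames$]).subscribe(([user, games]) =>
  seen.push(`${user}:${games.length}`)
);

user$.next('alice');                 // nothing yet: boardGames$ has not emitted
boardGames$.next(['Catan']);         // first combined value: "alice:1"
boardGames$.next(['Catan', 'Risk']); // next combined value: "alice:2"

sub.unsubscribe();
boardGames$.next([]);                // ignored: we already unsubscribed

console.log(seen); // [ 'alice:1', 'alice:2' ]
```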

Use merge if you want to merge the observables into a single observable. It would work in your current case, like this:

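import { merge } from 'rxjs';         // at the top of the file
import { map } from 'rxjs/operators';

merge(
  // Tag each value so the subscriber can tell which source it came from.
  this.userService.getUser(this.loggedInUser.userId).pipe(map(entity => ({entity, type: 'user'}))),
  this.boardGames$.pipe(map(entity => ({entity, type: 'boardGames'})))
).subscribe(({entity, type}) => {
    // Don't forget to unsubscribe
});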

With forkJoin you can achieve it like this:

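import { forkJoin } from 'rxjs';      // at the top of the file
import { take } from 'rxjs/operators';

const req1 = this.userService.getUser(this.loggedInUser.userId).pipe(take(1));
const req2 = this.boardGames$.pipe(take(1));
this.subscriptions$ = forkJoin([req1, req2]).subscribe(() => {
    // Runs once, after both observables have emitted their first value; forkJoin then completes.
});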

Note that you still need to manage the subscription even if you use take(1): if one of the observables never emits and the component is destroyed, you would have a memory leak. There is an awesome library for handling subscriptions without boilerplate.
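If you would rather not add a dependency, one common pattern is takeUntil with a destroy Subject. The sketch below is an assumption about how that could look, not the library mentioned above: BoardGamesView is a hypothetical plain class whose init() and destroy() stand in for Angular's ngOnInit and ngOnDestroy, so it runs without Angular.

```typescript
import { Observable, Subject, forkJoin, of } from 'rxjs';
import { take, takeUntil } from 'rxjs/operators';

// Hypothetical plain class; in Angular, init/destroy would be ngOnInit/ngOnDestroy.
class BoardGamesView {
  private readonly destroy$ = new Subject<void>();
  private readonly user$: Observable<string>;
  private readonly games$: Observable<string[]>;
  results: string[] = [];

  constructor(user$: Observable<string>, games$: Observable<string[]>) {
    this.user$ = user$;
    this.games$ = games$;
  }

  init(): void {
    forkJoin([this.user$.pipe(take(1)), this.games$.pipe(take(1))])
      .pipe(takeUntil(this.destroy$)) // tear down when the view is destroyed
      .subscribe(([user, games]) => this.results.push(`${user}:${games.length}`));
  }

  destroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// games$ never emits before the view is destroyed.
const games$ = new Subject<string[]>();
const view = new BoardGamesView(of('alice'), games$);
view.init();
view.destroy();
games$.next(['Catan']); // no subscriber is left, so nothing is recorded

console.log(view.results); // []
```

Without takeUntil, the forkJoin subscription would stay alive waiting for games$ even after the view is gone.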