I have two observables which I want to merge within the forkJoin method from rxjs. Executing the observables independently works, but using forkJoin it does not reach the pipe finalize/subscribe method.
My.component.ts
....
const req1 = this.userService.getUser(this.loggedInUser.userId);
const req2 = this.boardGames$;
this.subscriptions$ = forkJoin([req1, req2])
  .pipe(
    finalize(() => {
      console.log('pipe'); // Is not reached
    })
  )
  .subscribe(([obj1, obj2]) => {
    console.log('subscribe'); // Is not reached
  }, err => console.log(err), () => console.log('compl'));
req1.subscribe((aa) => console.log(aa)); // This is logged
req2.subscribe((bb) => console.log(bb)); // This is logged
....
I am using Angularfire2 for requests. I am not sure if this can be an issue, because independently the subscriptions work.
import { AngularFirestore } from 'angularfire2/firestore';
What is it that I am missing here?
forkJoin emits only when all observables complete. I can't see the rest of your code (for example, what the boardGames$ observable is). Most likely you are using observables that don't complete after their first emission, which is the intended behavior of AngularFirestore, since you most commonly subscribe to changes in the database (Firebase).
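To make the completion rule concrete, here is a minimal sketch. The names `liveBoardGames$` and `user$` are hypothetical stand-ins, and a `Subject` is assumed to behave like a long-lived Firestore stream in that it stays open until it is completed explicitly.

```typescript
import { forkJoin, of, Subject } from 'rxjs';

// Hypothetical stand-in for a Firestore-backed stream: it stays open until completed explicitly.
const liveBoardGames$ = new Subject<string[]>();
// A source that emits once and completes immediately.
const user$ = of({ name: 'demo-user' });

const log: string[] = [];

forkJoin([user$, liveBoardGames$]).subscribe(([user, games]) => {
  log.push(`${user.name}: ${games.join(', ')}`);
});

liveBoardGames$.next(['Catan']);
// Nothing has been logged yet, because liveBoardGames$ is still open.
const emissionsWhileOpen = log.length;

liveBoardGames$.next(['Catan', 'Azul']);
liveBoardGames$.complete();
// Only now does forkJoin emit, using the last value of each source.

console.log(emissionsWhileOpen, log);
```

In the question's code the Firestore stream is never completed, so the equivalent of the `complete()` call never happens and forkJoin stays silent.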
Use combineLatest if you need the latest values whenever any of the observables emits. Keep in mind that it starts emitting only after every source observable has emitted at least once.
combineLatest([
  this.userService.getUser(this.loggedInUser.userId),
  this.boardGames$
]).subscribe(([user, boardGames]) => {
  // Don't forget to unsubscribe
});
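The emission timing of combineLatest can be seen in a small standalone sketch. The two `Subject`s here are hypothetical replacements for the user request and the board games stream, so that the order of emissions can be controlled by hand.

```typescript
import { combineLatest, Subject } from 'rxjs';

// Hypothetical sources, driven manually to show the timing.
const user$ = new Subject<string>();
const boardGames$ = new Subject<string[]>();

const seen: string[] = [];

const subscription = combineLatest([user$, boardGames$]).subscribe(([user, games]) => {
  seen.push(`${user}:${games.length}`);
});

user$.next('alice');                // no emission yet: boardGames$ has not emitted
boardGames$.next(['Catan']);        // first combined emission
boardGames$.next(['Catan', 'Azul']); // re-emits with the latest user
user$.next('bob');                  // re-emits with the latest board games

subscription.unsubscribe();
user$.next('carol');                // ignored after unsubscribe

console.log(seen);
```

Unlike forkJoin, neither source has to complete, which is why this fits streams that stay open.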
Use merge if you want to merge the observables into one observable. It would work in your current case, like this:
merge(
  this.userService.getUser(this.loggedInUser.userId).pipe(map(entity => ({entity, type: 'user'}))),
  this.boardGames$.pipe(map(entity => ({entity, type: 'boardGames'})))
).subscribe(({entity, type}) => {
  // Don't forget to unsubscribe
});
With forkJoin you can achieve it like this:
const req1 = this.userService.getUser(this.loggedInUser.userId).pipe(take(1));
const req2 = this.boardGames$.pipe(take(1));
this.subscriptions$ = forkJoin([req1, req2]).subscribe(() => {
  // Runs once, after both observables have emitted; forkJoin then completes.
});
Note that you still need to handle the subscription even if you use take(1): if one of the observables never emits and the component is destroyed, you would have a memory leak. There is an awesome library for handling subscriptions without boilerplate.
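Without any extra library, one common way to cover that case is the plain RxJS takeUntil pattern. The sketch below is an assumption about how it might look, not the library mentioned above: `destroy$` and `silent$` are hypothetical names, and the final two calls stand in for what a component would do in ngOnDestroy.

```typescript
import { forkJoin, Subject } from 'rxjs';
import { finalize, take, takeUntil } from 'rxjs/operators';

// Hypothetical notifier; in an Angular component it would fire from ngOnDestroy.
const destroy$ = new Subject<void>();
// Hypothetical source that never emits, so take(1) alone never completes it.
const silent$ = new Subject<string>();

let emitted = false;
let finalized = false;

forkJoin([silent$.pipe(take(1)), silent$.pipe(take(1))])
  .pipe(
    takeUntil(destroy$),                  // ends the stream when destroy$ fires
    finalize(() => { finalized = true; }) // runs on completion or unsubscribe
  )
  .subscribe(() => { emitted = true; });

// Simulates the component being destroyed before any value arrived.
destroy$.next();
destroy$.complete();

console.log(emitted, finalized);
```

Because takeUntil ends the stream, the subscription is torn down even though forkJoin never produced a value.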