Note: This article is reproduced from serverfault.com.

Dependent nested observables

Posted on 2020-12-23 08:42:07

I have two methods that each perform an asynchronous operation, but I don't want method 2 to execute unless the response from method 1 is positive, indicating that we can proceed.

So, this is what I tried:

Method 1:

private method1(): Observable<any> {
  return new Observable(() => {
    executingSomething();
    this.anotherSubscription.pipe(
      map(x => {
        console.log('Control is not reaching here');
        // Also, how do I indicate to the caller that method2 can be executed?
        // I can't return anything, since I'm already returning the new Observable above.
      })
    );
  });
}

Caller:

concat(this.method1(), this.method2()).subscribe();

Issue: anotherSubscription is not even getting executed, and I can't think of any way to pass a response from within anotherSubscription to the caller.

I feel that I'm not using observables correctly here, but I can't seem to find anything that helps.

Questioner
Suyash Gupta
Avius 2020-12-23 19:50:50

I've made some assumptions regarding your code prior to writing the solution. If the assumptions are wrong, then the answer may be as well.

  1. executingSomething() is a synchronous method with unrelated functionality, except that you want the whole thing to fail if it throws.
  2. anotherSubscription is not a Subscription but an Observable (these are different things).

Here's how I'd solve your problem:

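import { Observable, of, throwError } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

class SomeClass {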
  private method1(): Observable<any> {
    try {
      executingSomething();
    } catch (err) {
      // This would return an errored observable, which is great, since
      // you can still subscribe to it (no need to change the return type
      // of this method).
      return throwError(err);
    }

    return this.anotherSubscription.pipe(
      tap(() => {
        console.log('Control is reaching here!');
      }),
    );
  }

  private method2(): Observable<any> {
    // This can be any observable; using of(null) for demo purposes.
    return of(null);
  }

  private parentMethod() {
    this.method1()
      .pipe(
        switchMap(valueFromAnotherSubscription => {
          // UPD1 - Implement your custom check here. This check will determine
          // whether `method2` will be called.
          if (valueFromAnotherSubscription === targetValue) {
            return this.method2();
          } else {
            // UPD1 - If the check evaluates to `false`, re-emit the same value
            // using the `of` creation function.
            return of(valueFromAnotherSubscription);
          }
        }),
      )
      .subscribe(() => {
        console.log('Done!');
      });
  }
}

The key operator here, as correctly indicated by Kevin, is switchMap. Every time the source observable (anotherSubscription) emits, switchMap unsubscribes from the previous inner observable, if there is one, and subscribes to the new observable returned by its projection function (here, whatever is returned from method2).
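To make the ordering concrete, here is a minimal sketch (not the asker's actual code). The names first$, second$ and events are hypothetical stand-ins for method1, method2 and a log, and the import paths assume RxJS 6 or 7. It uses defer only so that the moment of subscription to the second observable can be recorded.

```typescript
import { Observable, defer, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

// Records the order in which things happen.
const events: string[] = [];

// Hypothetical stand-in for method1: emits a single value.
function first$(): Observable<string> {
  return of('ok').pipe(tap(v => events.push(`first emitted ${v}`)));
}

// Hypothetical stand-in for method2: defer runs its factory at
// subscription time, so we can see when switchMap subscribes to it.
function second$(): Observable<number> {
  return defer(() => {
    events.push('second subscribed');
    return of(42);
  });
}

first$()
  .pipe(switchMap(() => second$()))
  .subscribe(v => events.push(`result ${v}`));

console.log(events);
// → ['first emitted ok', 'second subscribed', 'result 42']
```

The second observable is only subscribed to once the first one has emitted, which is the dependency the question asks for.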

This happens sequentially, so method2 will only be subscribed to after method1 emits. If method1 errors, the whole pipeline will fail. You may also filter the results of method1 to decide whether or not you want to switch to method2.
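The filtering variant could look roughly like the sketch below. The 'positive' and 'negative' responses and the results array are made up for illustration, and the inline of(...) calls stand in for method1 and method2; the import paths assume RxJS 6 or 7.

```typescript
import { of } from 'rxjs';
import { filter, switchMap } from 'rxjs/operators';

const results: string[] = [];

// Hypothetical responses from method1; only 'positive' should lead to method2.
of('negative', 'positive')
  .pipe(
    // Drop every response that does not allow us to proceed.
    filter(response => response === 'positive'),
    // Only values that passed the filter reach switchMap.
    switchMap(response => of(`method2 ran after ${response}`)),
  )
  .subscribe(v => results.push(v));

console.log(results);
// → ['method2 ran after positive']
```

One difference from an if/else inside switchMap: with filter, a rejected value is dropped entirely, so the subscriber sees nothing for it rather than the re-emitted original value.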

Also, constructing observables using new Observable is probably not needed in most cases. I have a rather large RxJS application and have never used this so far.
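As an illustration of why a hand-built observable is easy to get wrong, the sketch below compares two hypothetical observables under concat. Like the method1 in the question, neverCompletes$ never calls complete(), so concat never moves on to the next observable. All names here are invented for the example, and the import path assumes RxJS 6 or 7.

```typescript
import { Observable, concat, of } from 'rxjs';

const seen: string[] = [];

// Like the question's method1: the subscribe function never calls
// next() or complete(), so this observable stays open forever.
const neverCompletes$ = new Observable<string>(() => {
  // side effects only, nothing is signalled to the subscriber
});

// A version that emits and then signals completion explicitly.
const completes$ = new Observable<string>(subscriber => {
  subscriber.next('method1 done');
  subscriber.complete();
});

// Hypothetical stand-in for method2.
const method2$ = of('method2 ran');

// concat subscribes to the next observable only after the previous one completes.
concat(neverCompletes$, method2$).subscribe(v => seen.push(`A: ${v}`));
concat(completes$, method2$).subscribe(v => seen.push(`B: ${v}`));

console.log(seen);
// → ['B: method1 done', 'B: method2 ran']
```

Pipeline A never reaches method2$. Returning an existing observable through pipe, as in the solution above, avoids having to manage next() and complete() by hand.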

Update 1

See code denoted with UPD1 comments.

Keep in mind that if an error is thrown by anotherSubscription, the function passed to switchMap will not be called at all.
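A small sketch of that behaviour, with a made-up 'boom' error standing in for a failure of anotherSubscription. It uses the same throwError(err) form as the code above, which works in RxJS 6 and 7; RxJS 7 prefers passing a factory function instead. The names log and projectionCalls are hypothetical.

```typescript
import { of, throwError } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const log: string[] = [];
let projectionCalls = 0;

// Hypothetical source that errors immediately instead of emitting.
throwError(new Error('boom'))
  .pipe(
    switchMap(() => {
      // Never reached: the source errors before emitting any value.
      projectionCalls++;
      return of('method2');
    }),
  )
  .subscribe({
    next: v => log.push(`next ${v}`),
    error: (err: Error) => log.push(`error ${err.message}`),
  });

console.log(log, projectionCalls);
// → ['error boom'] 0
```

The error goes straight to the subscriber's error handler, so any recovery logic belongs there or in an operator such as catchError.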