Mattia Bradascio

Reputation: 51

Use observable value to make request that returns another observable and push to Kafka

I am using NestJS and its built-in HttpModule in a Service, with the end goal of pushing the value emitted by an Observable<AxiosResponse<any>> to a Kafka topic.

My Observables come from HttpService (the provider exported by HttpModule), which I use to send JSON-RPC requests (HTTP POST requests), for example:

getInfo(): Observable<AxiosResponse<any>> {
  return this.httpService.post(this.url, { "method" : "somemethod" } )
}

I know it is not considered good practice to store values from an Observable since it is not in line with the functional programming paradigm.

However, what if I need to store a value from the AxiosResponse<any> and feed this into another httpService.post request? How should I go about doing this?

I have tried writing a separate function in my Service that is supposed to return an Observable. Inside it I call getInfo() and subscribe to the result, so that I can pass a value from the first response into the second request:

getMoreInfo(): Observable<AxiosResponse<any>> {
        this.getInfo().pipe(
            take(1))
                .subscribe(
                    (response) => {
                        return this.httpService.post(this.url, {
                            "method" : "othermethod",
                            "params": {
                                "myparam" : response.data.result.param
                            }
                        })
                    }
                )
    }

However, because the return statement sits inside the subscribe callback, getMoreInfo() itself returns nothing, so it does not satisfy its declared return type of Observable<AxiosResponse<any>>.
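To show the problem in isolation, here is a minimal sketch that runs without NestJS. It assumes only that rxjs is installed; fakePost is a hypothetical stand-in for this.httpService.post, not part of any library.

```typescript
import { Observable, Subscription, of } from 'rxjs';

// Hypothetical stand-in for this.httpService.post(...): emits one value, then completes.
function fakePost(method: string): Observable<string> {
  return of(`response to ${method}`);
}

// subscribe() evaluates to a Subscription, not to an Observable.
const result = fakePost('somemethod').subscribe((response) => {
  // This return value is handed back to RxJS, which ignores it.
  // It never reaches whoever called the enclosing function.
  return fakePost('othermethod');
});

const isSubscription = result instanceof Subscription;
const isObservable = result instanceof Observable;
console.log({ isSubscription, isObservable });
```

So even if the subscribe call itself were returned, the caller would receive a Subscription rather than the second request's Observable.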

I wanted to check whether the nesting logic itself was correct, and it is, since the following works:

getMoreInfo() {
        this.getInfo().pipe(
            take(1))
                .subscribe(
                    (response) => {
                        this.httpService.post(this.url, {
                            "method" : "othermethod",
                            "params": {
                                "stringparam" : response.data.result.param.toString()
                            }
                        }).subscribe((secondResponse) => { console.log(secondResponse) })
                    }
                )
    }

Because I want my NestJS Controller to subscribe to my Observables and the Service only to create them, getMoreInfo() has to return an Observable<AxiosResponse<any>>. How can I do this?

In effect, I only want to take the value emitted by one Observable and pass it into another function. Once I know how to do that, I can pass the value both to the function responsible for pushing to Kafka and to getMoreInfo().

Upvotes: 0

Views: 510

Answers (1)

Mattia Bradascio

Reputation: 51

Thanks to @Drenai for pointing me in the right direction!

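I was able to solve it using the mergeMap operator. mergeMap subscribes to the Observable returned by its callback and forwards that Observable's emissions downstream, so the whole pipe is itself a single Observable and no manual subscribe is needed inside the Service.

The solution looks like this: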

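getMoreInfo(): Observable<AxiosResponse<any>> {
        // Return the piped Observable so the Controller can subscribe to it.
        return this.getInfo().pipe(
            mergeMap((res) => this.httpService.post(this.url, {
                "method" : "somemethod",
                "params" : {
                    "stringparam" : res.data.result.myparam.toString()
                }
            })
        ))
    }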
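
For anyone who wants to try the pattern without NestJS or a network, below is a minimal runnable sketch, assuming only that rxjs is installed. Everything in it apart from the rxjs imports is hypothetical: RpcResponse mimics the response shape used above, fakePost stands in for this.httpService.post, the URL is a placeholder, and publishToKafka is a placeholder for whatever actually produces to Kafka. The second request uses "othermethod" as in the question.

```typescript
import { Observable, defer, of } from 'rxjs';
import { mergeMap, tap } from 'rxjs/operators';

// Hypothetical response shape, mirroring res.data.result.myparam above.
interface RpcResponse {
  data: { result: { myparam: number | string } };
}

type RpcBody = { method: string; params?: { stringparam: string } };

const calls: string[] = [];     // records which methods were actually requested
const published: string[] = []; // records what was handed to the Kafka placeholder

// Hypothetical stand-in for this.httpService.post. defer() keeps it cold:
// the "request" is only issued when something subscribes.
function fakePost(url: string, body: RpcBody): Observable<RpcResponse> {
  return defer(() => {
    calls.push(body.method);
    const value = body.params ? `echo:${body.params.stringparam}` : 42;
    return of({ data: { result: { myparam: value } } });
  });
}

// Hypothetical placeholder for the function that pushes to Kafka.
function publishToKafka(payload: string): void {
  published.push(payload);
}

class InfoService {
  private readonly url = 'http://example.invalid/rpc'; // placeholder endpoint

  getInfo(): Observable<RpcResponse> {
    return fakePost(this.url, { method: 'somemethod' });
  }

  // Chains the second request onto the first and returns the resulting Observable.
  getMoreInfo(): Observable<RpcResponse> {
    return this.getInfo().pipe(
      mergeMap((res) =>
        fakePost(this.url, {
          method: 'othermethod',
          params: { stringparam: res.data.result.myparam.toString() },
        })
      ),
      // Side effect on the second response; the value still flows to the subscriber.
      tap((res) => publishToKafka(JSON.stringify(res.data.result)))
    );
  }
}

// The "Controller" side: the only place that subscribes.
const received: RpcResponse[] = [];
const moreInfo$ = new InfoService().getMoreInfo();
const callsBeforeSubscribe = calls.length; // still 0: nothing has been requested yet
moreInfo$.subscribe((res) => received.push(res));
```

Since each request here emits a single value, switchMap or concatMap would behave the same way in this sketch; mergeMap is used only because it is what solved the problem above.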

Upvotes: 1
