vidola

Reputation: 334

How to prioritize goroutines

I want to call two endpoints (A and B) at the same time. If both return a 200 response, I need to use the response from A; otherwise I use the response from B. If B returns first, I still need to wait for A. In other words, I must use A whenever A returns 200.

Can you guys help me with the pattern?

Thank you

Upvotes: 0

Views: 628

Answers (4)

user12258482

Wait for a result from A. If the result is not good, then wait for a result from B. Use a buffered channel for the B result so that the sender does not block when A is good.

In the following snippet, fnA() and fnB() are functions that issue requests to the endpoints, consume the responses and clean up. I assume that the result is a []byte, but it could be the result of decoding JSON or something else. Here's an example for fnA:

func fnA() ([]byte, error) {
    r, err := http.Get("http://example.com/a")
    if err != nil {
        return nil, err
    }
    defer r.Body.Close() // <-- Important: close the response body!
    if r.StatusCode != 200 {
        return nil, errors.New("bad response")
    }
    return ioutil.ReadAll(r.Body)
}

Define a type to hold the result and error.

type response struct {
    result []byte
    err    error
}

With those preliminaries done, here's how to prioritize A over B.

a := make(chan response)
go func() {
    result, err := fnA()
    a <- response{result, err}
}()

b := make(chan response, 1) // Size > 0 is important!
go func() {
    result, err := fnB()
    b <- response{result, err}
}()

resp := <-a
if resp.err != nil {
    resp = <-b
    if resp.err != nil {
        // handle error.  A and B both failed.
    }
}
result := resp.result


 

If the application does not execute code concurrently with A and B, then there's no need to use a goroutine for A:

b := make(chan response, 1) // Size > 0 is important!
go func() {
    result, err := fnB()
    b <- response{result, err}
}()

result, err := fnA()
if err != nil {
    resp := <-b // declare resp here; it does not exist yet in this variant
    if resp.err != nil {
        // handle error.  A and B both failed.
    }
    result = resp.result
}
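
To try the pattern without real endpoints, here is a minimal, self-contained sketch that wraps the logic above in a helper. The names preferA, okA, failA, okB and failB are hypothetical and exist only for illustration; real fnA/fnB functions would issue the HTTP requests as in the earlier example.

```go
package main

import (
	"errors"
	"fmt"
)

// response pairs a result with the error from fetching it.
type response struct {
	result []byte
	err    error
}

// preferA (hypothetical helper) starts fnB in the background, runs fnA in the
// calling goroutine, and returns A's result whenever A succeeds. B's result
// is used only when A fails.
func preferA(fnA, fnB func() ([]byte, error)) ([]byte, error) {
	b := make(chan response, 1) // buffered so B's goroutine never blocks on send
	go func() {
		result, err := fnB()
		b <- response{result, err}
	}()

	result, errA := fnA()
	if errA == nil {
		return result, nil
	}
	resp := <-b
	if resp.err != nil {
		return nil, fmt.Errorf("A failed: %v; B failed: %w", errA, resp.err)
	}
	return resp.result, nil
}

// Stub endpoints standing in for real HTTP calls (assumptions for the demo).
func okA() ([]byte, error)   { return []byte("from A"), nil }
func failA() ([]byte, error) { return nil, errors.New("A: bad response") }
func okB() ([]byte, error)   { return []byte("from B"), nil }
func failB() ([]byte, error) { return nil, errors.New("B: bad response") }

func main() {
	got, err := preferA(failA, okB)
	fmt.Printf("%s %v\n", got, err)
}
```

Swapping okA in for failA should make the helper return A's bytes even though B also succeeds.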

Upvotes: 4

ashishmaurya

Reputation: 1196

In Go, channels are normally used for communication between goroutines. You can orchestrate your scenario with the following sample code. Basically, you pass a channel into callB, and that channel will hold B's response. You don't need to run callA in a goroutine, because you always need the result from that endpoint/service.

package main

import (
    "fmt"
    "time"
)

func main() {
    resB := make(chan int)
    go callB(resB)
    res := callA()
    if res == 200 {
        fmt.Print("No Need for B")
    } else {
        res = <-resB
        fmt.Printf("Response from B : %d", res)
    }
}

func callA() int {
    time.Sleep(1000 * time.Millisecond) // a bare 1000 would mean 1000 nanoseconds
    return 200
}

func callB(res chan int) {
    time.Sleep(500 * time.Millisecond)
    res <- 200
}

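Update: As suggested in a comment, the code above leaks the callB goroutine. When callA returns 200, nothing ever receives from resB, so callB blocks forever on its send. Making the channel buffered (size 1) lets callB send and exit: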

package main

import (
    "fmt"
    "time"
)

func main() {
    resB := make(chan int, 1)
    go callB(resB)
    res := callA()
    if res == 200 {
        fmt.Print("No Need for B")
    } else {
        res = <-resB
        fmt.Printf("Response from B : %d", res)
    }
}

func callA() int {
    time.Sleep(1000 * time.Millisecond)
    return 200
}

func callB(res chan int) {
    time.Sleep(500 * time.Millisecond)
    res <- 200
}
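
To see why the buffer matters, here is a small sketch that checks whether a send would block when nobody is receiving. sendWouldBlock is a hypothetical helper written just for this demonstration.

```go
package main

import "fmt"

// sendWouldBlock reports whether a send on ch would block right now, that is,
// when no receiver is waiting and there is no free buffer slot.
func sendWouldBlock(ch chan int) bool {
	select {
	case ch <- 200:
		return false // the value was accepted immediately
	default:
		return true // a plain `ch <- 200` would have parked the goroutine
	}
}

func main() {
	fmt.Println(sendWouldBlock(make(chan int)))    // unbuffered, no receiver: true
	fmt.Println(sendWouldBlock(make(chan int, 1))) // one free slot: false
}
```

With the unbuffered channel, callB stays parked on its send for the rest of the program if main never receives. With a buffer of 1, the send completes and the goroutine can return.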

Upvotes: 0

colm.anseo

Reputation: 22037

@Zombo's answer has the correct logic flow. Piggybacking off this, I would suggest one addition: leveraging the context package.

Basically, any potentially blocking tasks should use context.Context to allow the call-chain to perform more efficient clean-up in the event of early cancelation.

context.Context also can be leveraged, in your case, to abort the B call early if the A call succeeds:

func failoverResult(ctx context.Context) *http.Response {

    // wrap the (parent) context
    ctx, cancel := context.WithCancel(ctx)
 
    // if we return early i.e. if `fnA()` completes first
    // this will "cancel" `fnB()`'s request.
    defer cancel()

    b := make(chan *http.Response, 1)
    go func() {
        b <- fnB(ctx)
    }()

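    resp := fnA(ctx)
    // fnA returns nil when the request itself fails, so guard before reading StatusCode.
    if resp == nil || resp.StatusCode != 200 {
        if resp != nil {
            resp.Body.Close() // discard A's unusable response
        }
        resp = <-b
    }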

    return resp
}

fnA (and fnB) would look something like this:

func fnA(ctx context.Context) (resp *http.Response) {
    req, _ := http.NewRequestWithContext(ctx, "GET", aUrl, nil) // nil body for a GET
    
    resp, _ = http.DefaultClient.Do(req)  // TODO: check errors
    return
}
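
One caveat worth sketching: as far as I understand the net/http documentation, a request's context also governs reading the response body, so a *http.Response handed back after the deferred cancel() runs may fail when the caller reads it. A hedged alternative is to have each fetcher read its body fully and return the bytes. The sketch below assumes that shape and uses stub fetchers in place of real HTTP calls; fetchFunc, failover and runDemo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetchFunc is assumed to perform the request with ctx and return the fully
// read body, so nothing still depends on ctx after it returns.
type fetchFunc func(ctx context.Context) ([]byte, error)

// failover prefers A. B runs concurrently and is canceled when failover returns.
func failover(ctx context.Context, fnA, fnB fetchFunc) ([]byte, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // aborts B if A already succeeded

	type result struct {
		body []byte
		err  error
	}
	b := make(chan result, 1) // buffered so B's goroutine can always exit
	go func() {
		body, err := fnB(ctx)
		b <- result{body, err}
	}()

	if body, err := fnA(ctx); err == nil {
		return body, nil
	}
	r := <-b
	return r.body, r.err
}

// runDemo wires failover to stub fetchers. It returns the chosen body and
// whether the slow B stub observed cancellation.
func runDemo(aOK bool) (string, bool) {
	canceled := make(chan bool, 1)
	fnA := func(ctx context.Context) ([]byte, error) {
		if aOK {
			return []byte("from A"), nil
		}
		return nil, errors.New("A: status not 200")
	}
	fnB := func(ctx context.Context) ([]byte, error) {
		select {
		case <-ctx.Done(): // A won and failover returned
			canceled <- true
			return nil, ctx.Err()
		case <-time.After(50 * time.Millisecond): // simulated slow response
			canceled <- false
			return []byte("from B"), nil
		}
	}
	body, _ := failover(context.Background(), fnA, fnB)
	return string(body), <-canceled
}

func main() {
	body, canceled := runDemo(true)
	fmt.Printf("result=%q, B canceled=%v\n", body, canceled)
}
```

In a real fnA or fnB, the stub body would be replaced by the request built with http.NewRequestWithContext, followed by reading and closing resp.Body before returning.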

Upvotes: 0

Abel

Reputation: 11

I suggest something like this. It is a bulkier solution, but it lets you query more than two endpoints if you need to.

func endpointPriorityTest() {

const (
    sourceA = "a"
    sourceB = "b"
    sourceC = "c"
)

type endpointResponse struct {
    source   string
    response *http.Response
    error
}

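// Buffered with one slot per endpoint, so every goroutine can send and exit
// even after the reader has stopped receiving.
epResponseChan := make(chan *endpointResponse, 3)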

endpointsMap := map[string]string{
    sourceA: "https://jsonplaceholder.typicode.com/posts/1",
    sourceB: "https://jsonplaceholder.typicode.com/posts/10",
    sourceC: "https://jsonplaceholder.typicode.com/posts/100",
}

for source, endpointURL := range endpointsMap {
    source := source
    endpointURL := endpointURL
    go func(respChan chan<- *endpointResponse) {
        // You can add a delay so that the response from A takes longer than from B
        // and look to the result map
        // if source == sourceA {
        //  time.Sleep(time.Second)
        // }
        resp, err := http.Get(endpointURL)

        respChan <- &endpointResponse{
            source:   source,
            response: resp,
            error:    err,
        }
    }(epResponseChan)
}

respCache := make(map[string]*http.Response)

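// Read at most one response per endpoint. The channel is never closed, so
// ranging over it would block forever once every goroutine has sent.
for i := 0; i < len(endpointsMap); i++ {
    epResp := <-epResponseChan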
    // Skips failed requests
    if epResp.error != nil {
        continue
    }

    // Save successful response to cache map
    respCache[epResp.source] = epResp.response

    // Interrupt reading channel if we've got an response from source A
    if epResp.source == sourceA {
        break
    }
}

fmt.Println("result map: ", respCache)

// Now we can use data from cache map
// resp, ok :=respCache[sourceA]
// if ok{
//  ...
// }
}
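
If the endpoints have a strict priority order, a simpler variation is to give each one its own buffered channel and read the channels in that order. This is only a sketch under that assumption; firstByPriority, ok and fail are hypothetical names, and the stub fetchers stand in for real http.Get calls.

```go
package main

import (
	"errors"
	"fmt"
)

// result carries one endpoint's outcome.
type result struct {
	body string
	err  error
}

// firstByPriority starts every fetcher concurrently, then walks them in
// priority order (index 0 first) and returns the first success. A lower
// priority result is used only after all higher priority fetchers failed.
func firstByPriority(fetchers []func() (string, error)) (string, error) {
	chans := make([]chan result, len(fetchers))
	for i, fetch := range fetchers {
		ch := make(chan result, 1) // buffered so no goroutine blocks on send
		chans[i] = ch
		go func(fetch func() (string, error)) {
			body, err := fetch()
			ch <- result{body, err}
		}(fetch)
	}
	for _, ch := range chans {
		if r := <-ch; r.err == nil {
			return r.body, nil
		}
	}
	return "", errors.New("all endpoints failed")
}

// ok and fail build stub fetchers for the demonstration.
func ok(name string) func() (string, error) {
	return func() (string, error) { return "from " + name, nil }
}

func fail(name string) func() (string, error) {
	return func() (string, error) { return "", errors.New(name + ": bad response") }
}

func main() {
	got, err := firstByPriority([]func() (string, error){fail("a"), ok("b"), ok("c")})
	fmt.Println(got, err)
}
```

The trade-off is that the function waits for each higher-priority endpoint to finish before looking at the next, which matches the requirement that A must win whenever it succeeds.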

Upvotes: 1
