sekula87

Reputation: 357

Periodically crawl API in Golang

I need to asynchronously and periodically (every 10 seconds) scrape/invoke a specific URL (e.g. http://dummy.com/{address}) for a list of addresses.
Based on the result received from the URL, an event needs to be published.
The crawler needs to be started in one goroutine, and each API call needs to be made in a separate goroutine.
Another goroutine will be started that listens for events.
The crawler is initialized with a list of addresses, but it needs to expose methods to add a new address to scrape, or to remove an existing one, at any point in time.

Please see below my solution, which has a race issue.
It happens because the observables field of the crawler struct is not thread-safe for concurrent access.
I am aware of the "don't communicate by sharing memory; share memory by communicating" rule, but I couldn't figure out how I would use a channel (for the observables field) instead of a slice, and how it would then be possible to add/remove an address to watch if a channel is used.

How can I modify the solution below to fix the race condition?

package crawler

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "strconv"
    "testing"
    "time"

    log "github.com/sirupsen/logrus"
)

type Service interface {
    Start()
    Stop()
    AddObservable(observable Observable)
    RemoveObservable(observable Observable)
    GetEventChannel() chan event
}

type event struct {
    EventType int
    Result    Result
}

type Result struct {
    resp []byte
}

type Observable struct {
    AccountType int
    Address     string
}

type crawler struct {
    explorerApiUrl string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
}

func NewService(
    observables []Observable,
    errorHandler func(err error),
) Service {

    interval := time.NewTicker(10 * time.Second)

    return &crawler{
        explorerApiUrl: "http://dummy.com",
        interval:       interval,
        errChan:        make(chan error),
        quitChan:       make(chan int),
        eventChan:      make(chan event),
        observables:    observables,
        errorHandler:   errorHandler,
    }
}

func (u *crawler) Start() {
    log.Debug("start observe")
    for {
        select {
        case <-u.interval.C:
            log.Debug("observe interval")
            u.observeAll(u.observables)
        case err := <-u.errChan:
            u.errorHandler(err)
        case <-u.quitChan:
            log.Debug("stop observe")
            u.interval.Stop()
            time.Sleep(time.Second)
            close(u.eventChan)
            return
        }
    }
}

func (u *crawler) Stop() {
    u.quitChan <- 1
}

func (u *crawler) AddObservable(observable Observable) {
    u.observables = append(u.observables, observable)
}

func (u *crawler) RemoveObservable(observable Observable) {
    newObservableList := make([]Observable, 0)
    for _, o := range u.observables {
        if o.Address != observable.Address {
            newObservableList = append(newObservableList, o)
        }
    }
    u.observables = newObservableList
}

func (u *crawler) GetEventChannel() chan event {
    return u.eventChan
}

func (u *crawler) observeAll(observables []Observable) {
    for _, a := range observables {
        go u.observe(a)
    }
}

func (u *crawler) observe(observe Observable) {

    resp, err := http.Get(
        fmt.Sprintf("%v/%v", u.explorerApiUrl, observe.Address),
    )
    if err != nil {
        log.Error(err)
        return // resp is nil here; continuing would panic on resp.Body.Close()
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Error(err)
        return // do not publish an event with a nil body
    }
    e := event{
        EventType: 0,
        Result: Result{
            resp: body,
        },
    }

    u.eventChan <- e
}

//// TEST ////

func TestCrawler(t *testing.T) {
    observables := make([]Observable, 0)
    for i := 0; i < 100; i++ {
        observable := Observable{
            AccountType: 1,
            Address:     strconv.Itoa(i),
        }
        observables = append(observables, observable)
    }

    crawlSvc := NewService(observables, nil)

    go crawlSvc.Start()

    go removeObservableAfterTimeout(crawlSvc)

    go addObservableAfterTimeout(crawlSvc)

    go stopCrawlerAfterTimeout(crawlSvc)

    for event := range crawlSvc.GetEventChannel() {
        t.Log(event)
    }
}

func stopCrawlerAfterTimeout(crawler Service) {
    time.Sleep(7 * time.Second)
    crawler.Stop()
}

func removeObservableAfterTimeout(crawler Service) {
    time.Sleep(2 * time.Second)
    crawler.RemoveObservable(Observable{
        AccountType: 0,
        Address:     "2",
    })
}

func addObservableAfterTimeout(crawler Service) {
    time.Sleep(5 * time.Second)
    crawler.AddObservable(Observable{
        AccountType: 0,
        Address:     "101",
    })
}

Upvotes: 0

Views: 506

Answers (1)

Emin Laletovic

Reputation: 4324

The easiest thing to do here, without modifying the solution much, is to introduce a sync.RWMutex in the crawler struct. It locks the critical section while the slice is being handled. See the changes below:

type crawler struct {
    explorerAPIURL string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
    mtx            sync.RWMutex
}

...

func (u *crawler) AddObservable(observable Observable) {
    u.mtx.Lock()
    u.observables = append(u.observables, observable)
    u.mtx.Unlock()
}

func (u *crawler) RemoveObservable(observable Observable) {
    u.mtx.Lock()
    newObservableList := make([]Observable, 0)
    for _, o := range u.observables {
        if o.Address != observable.Address {
            newObservableList = append(newObservableList, o)
        }
    }
    u.observables = newObservableList
    u.mtx.Unlock()
}

However, while this resolves the race issue, it does not guarantee that you won't eventually run into leaks or stale requests, for example when you remove an observable whose request hasn't finished executing yet.

My suggestion would be to either hold off on the slice operations (add or remove) until all in-flight executions have finished, or to introduce a check that cancels the execution of an observable when it is removed.
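
For the cancellation route, here is a minimal sketch. It assumes the add/remove operations are handled in the Start goroutine (as in the channel-based version below), so the cancels map never needs locking; the cancels field and the context-aware observe signature are sketch additions, not part of the solution above.

type crawler struct {
    // ...same fields as above, plus:
    cancels map[string]context.CancelFunc // one cancel func per watched address; make() it in NewService
}

func (u *crawler) observeAll(observables []Observable) {
    for _, a := range observables {
        ctx, cancel := context.WithCancel(context.Background())
        u.cancels[a.Address] = cancel // only ever touched from the Start goroutine
        go u.observe(ctx, a)
    }
}

// In Start's removeChan case, cancel before filtering the slice:
//
//  case o := <-u.removeChan:
//      if cancel, ok := u.cancels[o.Address]; ok {
//          cancel()
//          delete(u.cancels, o.Address)
//      }
//      // ...filter u.observables as before...

func (u *crawler) observe(ctx context.Context, o Observable) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet,
        fmt.Sprintf("%v/%v", u.explorerAPIURL, o.Address), nil)
    if err != nil {
        log.Error(err)
        return
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Error(err) // a removed address shows up here as context.Canceled
        return
    }
    defer resp.Body.Close()
    // ...read the body and publish the event as before...
}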

One solution would be to introduce additional channels for slice operations and handle the operations in the Start function.

type crawler struct {
    explorerAPIURL string
    interval       *time.Ticker
    errChan        chan error
    quitChan       chan int
    addChan        chan Observable
    removeChan     chan Observable
    eventChan      chan event
    observables    []Observable
    errorHandler   func(err error)
}

// NewService --
func NewService(
    observables []Observable,
    errorHandler func(err error),
) Service {

    interval := time.NewTicker(10 * time.Second)

    return &crawler{
        explorerAPIURL: "http://dummy.com",
        interval:       interval,
        errChan:        make(chan error),
        quitChan:       make(chan int),
        addChan:        make(chan Observable),
        removeChan:     make(chan Observable),
        eventChan:      make(chan event),
        observables:    observables,
        errorHandler:   errorHandler,
    }
}

func (u *crawler) Start() {
    log.Debug("start observe")
    for {
        select {
        case <-u.interval.C:
            log.Debug("observe interval")
            u.observeAll(u.observables)
        case err := <-u.errChan:
            u.errorHandler(err)
        case <-u.quitChan:
            log.Debug("stop observe")
            u.interval.Stop()
            time.Sleep(time.Second)
            close(u.eventChan)
            return
        case o := <-u.addChan:
            u.observables = append(u.observables, o)
        case o := <-u.removeChan:
            newObservableList := make([]Observable, 0)
            for _, observable := range u.observables {
                if o.Address != observable.Address {
                    newObservableList = append(newObservableList, observable)
                }
            }
            u.observables = newObservableList
        }
    }
}

...

func (u *crawler) AddObservable(observable Observable) {
    u.addChan <- observable
}

func (u *crawler) RemoveObservable(observable Observable) {
    u.removeChan <- observable
}
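
Note that addChan and removeChan are unbuffered, so these methods block until the Start loop is ready to receive: Start must already be running when they are called, and while observeAll is waiting in g.Wait() (see the edit below) they will block until that tick's requests finish. A minimal usage sketch, with an arbitrary example address:

// The Start loop must be receiving before the blocking send below is made.
crawlSvc := NewService(observables, errorHandler)
go crawlSvc.Start()
crawlSvc.AddObservable(Observable{AccountType: 0, Address: "102"})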

...

// EDIT - I've added the modified versions of these functions as well.
// They need two extra imports: context and golang.org/x/sync/errgroup.
func (u *crawler) observeAll(observables []Observable) {
    g, _ := errgroup.WithContext(context.Background())
    for _, a := range observables {
        a := a // shadow the loop variable so each goroutine gets its own copy (needed before Go 1.22)
        g.Go(func() error {
            return u.observe(a)
        })
    }

    if err := g.Wait(); err != nil {
        log.Error(err)
    }
}

func (u *crawler) observe(observe Observable) error {

    resp, err := http.Get(
        fmt.Sprintf("%v/%v", u.explorerAPIURL, observe.Address),
    )
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    e := event{
        EventType: 0,
        Result: Result{
            resp: body,
        },
    }

    u.eventChan <- e
    return nil
}
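
One last note: http.Get uses http.DefaultClient, which has no timeout, so a hung endpoint can block an observe goroutine indefinitely. A minimal sketch of a safer setup, assuming a 5-second timeout suits your endpoints:

// A dedicated client with a timeout instead of http.DefaultClient,
// which never times out on its own.
var httpClient = &http.Client{Timeout: 5 * time.Second}

// ...and in observe, replace http.Get with:
resp, err := httpClient.Get(
    fmt.Sprintf("%v/%v", u.explorerAPIURL, observe.Address),
)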

Upvotes: 1
