RIL

Different results for N>1 goroutines (on N>1 CPUs). Why?

I have a test program that gives different results when it runs more than one goroutine on more than one CPU (goroutines = CPUs). The "test" is about synchronizing goroutines using channels, and the program itself counts occurrences of characters in strings. It produces consistent results on one CPU / one goroutine.

See the code example on the playground (note: run it on a local machine to execute on multiple cores, and watch the resulting numbers vary): http://play.golang.org/p/PT5jeCKgBv

Code summary: The program counts occurrences of 4 different characters (A, T, G, C) in (DNA) strings.

Problem: The result (number of occurrences of the characters) varies when executed on multiple CPUs (goroutines). Why?

Description:

  1. A goroutine (SpawnWork) sends work to the Workers as strings. It sets up artificial input data (hardcoded strings copied n times).
  2. Worker goroutines (Worker) are created, as many as there are CPUs.
  3. Each Worker checks every character in a string, counts the A's and T's and sends that sum on one channel, and sends the G and C count on another channel.
  4. SpawnWork closes the work-string channel to control the Workers (they consume strings using range, which ends when SpawnWork closes the input channel).
  5. When a Worker has drained the input channel, it sends a quit signal on the quit channel (quit <- true). These "pulses" occur once per CPU (CPU count = goroutine count).
  6. The main (select) loop quits when it has received as many quit signals as there are CPUs.
  7. The main func prints a summary of the occurrences of the characters (A's and T's, G's and C's).

Simplified code:

1. "Worker" (goroutines) counting chars in lines:

func Worker(inCh chan *[]byte, resA chan<- *int, resB chan<- *int, quit chan bool) {
    //for p_ch := range inCh {
    for {
        p_ch, ok := <-inCh // similar to range
        if ok {
            ch := *p_ch
            for i := 0; i < len(ch); i++ {
                if ch[i] == 'A' || ch[i] == 'T' {        // Count A:s and T:s
                    at++
                } else if ch[i] == 'G' || ch[i] == 'C' { // Count G:s and C:s
                    gc++
                }
            }
            resA <- &at  // Send line results on separate channels
            resB <- &gc  // Send line results on separate channels
        } else {
            quit <- true // Indicate that we're all done
            break
        }
    }
}

2. Spawn work (strings) to workers:

func SpawnWork(inStr chan<- *[]byte, quit chan bool) {
    // Artificial input data
    StringData :=
        "NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
        "NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" +
        "... etc\n" +
    // ...
    for scanner.Scan() {
        s := scanner.Bytes()
        if len(s) == 0 || s[0] == '>' {
            continue
        } else {
            i++
            inStr <- &s
        }
    }
    close(inStr) // Indicate (to Workers) that there's no more strings coming.
}

3. Main routine:

func main() {
    // Count Cpus, and count down in final select clause
    CpuCnt := runtime.NumCPU() 
    runtime.GOMAXPROCS(CpuCnt)
    // Make channels
    resChA := make(chan *int)
    resChB := make(chan *int)
    quit := make(chan bool)
    inStr := make(chan *[]byte)

    // Set up Workers ( n = Cpu )
    for i := 0; i < CpuCnt; i++ {
        go Worker(inStr, resChA, resChB, quit)
    }
    // Send lines to Workers
    go SpawnWork(inStr, quit)

    // Count the number of "A","T" & "G","C" per line 
    // (arrives here as *int per line, on separate channels (at and gc))
    for {
        select {
        case tmp_at := <-resChA:
            tmp_gc := <-resChB // Ch A and B go in pairs anyway
            A += *tmp_at       // sum of A's and T's
            B += *tmp_gc       // sum of G's and C's
        case <-quit:
            // Each goroutine sends "quit" signals when it's done. Since 
            // the number of goroutines equals the Cpu counter, we count 
            // down each time a goroutine tells us it's done (quit at 0):
            CpuCnt--
            if CpuCnt == 0 { // When all goroutines are done then we're done.
                goto out     
            }
        }
    }
out:
    // Print report to screen
}

Why does this code count consistently only when executed on a single CPU/goroutine? That is, are the channels not syncing, or does the main loop quit before all goroutines are done? Scratching my head.

(Again: See/run the full code at the playground: http://play.golang.org/p/PT5jeCKgBv )

// Rolf Lampa


Answers (1)

Nick Craig-Wood

Here is a working version which consistently produces the same results no matter how many CPUs are used.

Here is what I did:

  • remove the passing of *int: sending a pointer to a counter that the worker keeps modifying is very racy!
  • remove the passing of *[]byte: pointless, since a slice already refers to its underlying array
  • copy the slice before putting it in the channel: scanner.Bytes() returns a slice into the scanner's buffer, which the next Scan may overwrite, so sending it uncopied causes a race
  • fix the initialisation of at and gc in Worker: they were in the wrong place and must be reset for every line. This was the major cause of the difference in results
  • use sync.WaitGroup for synchronisation, and close() the result channels when all workers are done

I used the -race flag of go build to find and fix the data races.

package main

import (
    "bufio"
    "fmt"
    "runtime"
    "strings"
    "sync"
)

func Worker(inCh chan []byte, resA chan<- int, resB chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Println("Worker started...")
    for ch := range inCh {
        at := 0
        gc := 0
        for i := 0; i < len(ch); i++ {
            if ch[i] == 'A' || ch[i] == 'T' {
                at++
            } else if ch[i] == 'G' || ch[i] == 'C' {
                gc++
            }
        }
        resA <- at
        resB <- gc
    }

}

func SpawnWork(inStr chan<- []byte) {
    fmt.Println("Spawning work:")
    // An artificial input source.
    StringData :=
        "NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
            "NTGAGAAATATGCTTTCTACTTTTTTGTTTAATTTGAACTTGAAAACAAAACACACACAA\n" +
            "CTTCCCAATTGGATTAGACTATTAACATTTCAGAAAGGATGTAAGAAAGGACTAGAGAGA\n" +
            "TATACTTAATGTTTTTAGTTTTTTAAACTTTACAAACTTAATACTGTCATTCTGTTGTTC\n" +
            "AGTTAACATCCCTGAATCCTAAATTTCTTCAGATTCTAAAACAAAAAGTTCCAGATGATT\n" +
            "TTATATTACACTATTTACTTAATGGTACTTAAATCCTCATTNNNNNNNNCAGTACGGTTG\n" +
            "TTAAATANNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN\n" +
            "NNNNNNNCTTCAGAAATAAGTATACTGCAATCTGATTCCGGGAAATATTTAGGTTCATAA\n"
    // Expand data n times
    tmp := StringData
    for n := 0; n < 1000; n++ {
        StringData = StringData + tmp
    }
    scanner := bufio.NewScanner(strings.NewReader(StringData))
    scanner.Split(bufio.ScanLines)

    var i int
    for scanner.Scan() {
        s := scanner.Bytes()
        if len(s) == 0 || s[0] == '>' {
            continue
        } else {
            i++
            s_copy := append([]byte(nil), s...)
            inStr <- s_copy
        }
    }
    close(inStr)
}

func main() {
    CpuCnt := runtime.NumCPU() // Number of workers to start
    CpuOut := CpuCnt           // Save for print report
    runtime.GOMAXPROCS(CpuCnt)
    fmt.Printf("Processors: %d\n", CpuCnt)

    resChA := make(chan int)
    resChB := make(chan int)
    inStr := make(chan []byte)

    fmt.Println("Spawning workers:")
    var wg sync.WaitGroup
    for i := 0; i < CpuCnt; i++ {
        wg.Add(1)
        go Worker(inStr, resChA, resChB, &wg)
    }
    fmt.Println("Spawning work:")
    go func() {
        SpawnWork(inStr)
        wg.Wait()
        close(resChA)
        close(resChB)
    }()

    A := 0
    B := 0
    LineCnt := 0
    for tmp_at := range resChA {
        tmp_gc := <-resChB // These go together: each worker sends on resChA, then resChB
        A += tmp_at
        B += tmp_gc
        LineCnt++
    }

    if A+B == 0 {
        fmt.Println("No A/B was found!")
    } else {
        ABFraction := float32(B) / float32(A+B)
        fmt.Println("\n----------------------------")
        fmt.Printf("Cpu's  : %d\n", CpuOut)
        fmt.Printf("Lines  : %d\n", LineCnt)
        fmt.Printf("A+B    : %d\n", A+B)
        fmt.Printf("A      : %d\n", A)
        fmt.Printf("B      : %d\n", B)
        fmt.Printf("AB frac: %v\n", ABFraction*100)
        fmt.Println("----------------------------")
    }
}
