gpanda
gpanda

Reputation: 875

Concurrently write multiple csv files from one, splitting on a partition column in Golang

My objective is to read one or multiple csv files that share a common format, and write to separate files based on a partition column in the csv data. Please allow that the last column is the partition, that data is un-sorted, and a given partition can be found in multiple files. Example of one file:

fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02

If this approach smells like the dreaded XY Problem, I'm happy to adjust.

What I've tried so far:

This obviously doesn't work (yet), as I'm not aware of how to send a line to the correct worker based on the partition value seen on a given line.

I've given each worker an id string for each partition value, but I don't know how to select the right worker to send to. Should I create a separate chan []string for each worker and send to that channel with a select, or should a struct hold each worker and provide some sort of pool and routing functionality?

TL;DR: I'm lost as to how to conditionally send data to a given goroutine or channel based on some categorical string value, where the number of unique values can be arbitrary but is unlikely to exceed 24 unique partition values.

I will caveat by stating I've noticed questions like this do get down-voted, so if you feel this is counter-constructive or incomplete enough to down-vote, please comment with why so I can avoid repeating the offense.

Thanks for any help in advance!

Playground

Snippet:

  package main

    import (
        "encoding/csv"
        "fmt"
        "log"
        "strings"
        "time"
    )

    // main reads every record from csvFile1 and hands each one to the worker
    // goroutine that owns the record's partition, i.e. the value of its last
    // column. A worker (and its private channel) is created the first time a
    // partition value is seen; main then waits for all workers to drain their
    // channels before returning.
    func main() {

        // CSV
        r := csv.NewReader(csvFile1)
        lines, err := r.ReadAll()
        if err != nil {
            log.Fatalf("error reading all lines: %v", err)
        }

        // CHANNELS
        // One channel per partition value. Only main touches this map, so it
        // needs no locking.
        workers := make(map[string]chan []string)

        // Each worker goroutine signals on done once its channel is drained.
        done := make(chan struct{})

        for _, line := range lines {
            if len(line) == 0 {
                // No columns means no partition to route on.
                continue
            }

            // The partition is the last column, not a fixed index.
            hour := line[len(line)-1]

            ch, seen := workers[hour]
            if !seen {
                ch = make(chan []string)
                workers[hour] = ch
                go func(id string, in <-chan []string) {
                    worker(id, in)
                    done <- struct{}{}
                }(hour, ch)
            }

            // Send to the channel owned by this partition's worker.
            ch <- line
        }

        // Closing a channel ends the range loop in the worker reading from it.
        for _, ch := range workers {
            close(ch)
        }

        // Without this wait main would return, and the program would exit,
        // while workers are still processing their last record.
        for range workers {
            <-done
        }
    }

    // worker receives records from lineChan until the channel is closed. For
    // each record it prints a "started" line, sleeps for one second as a
    // stand-in for the real file write, and prints a "finished" line. id only
    // labels the output.
    func worker(id string, lineChan <-chan []string) {
        for {
            job, open := <-lineChan
            if !open {
                // Channel closed and drained: nothing more to do.
                return
            }
            fmt.Println("worker", id, "started  job", job)
            // Write to a new file here and wait for input over the channel
            time.Sleep(time.Second)
            fmt.Println("worker", id, "finished job", job)
        }
    }

    // stringInSlice reports whether str is equal to at least one element of
    // list. A nil or empty list never contains anything.
    func stringInSlice(str string, list []string) bool {
        found := false
        for i := 0; i < len(list) && !found; i++ {
            found = list[i] == str
        }
        return found
    }

    // DUMMY
// csvFile1 is in-memory sample input standing in for a real CSV file. Every
// record has seven comma-separated fields and the last field is the partition.
// No record carries trailing whitespace: a stray space after the last field
// would become part of the partition value and create a separate partition
// (for example "04 " next to "04").
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)

Upvotes: 2

Views: 3013

Answers (1)

k1m190r
k1m190r

Reputation: 1313

First, a synchronous version with no Go concurrency magic (see the concurrent version below).

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strings"
)

// main reads csvFile1 one record at a time, groups the records by partition
// via process, and prints the groups with save_partitions once the input is
// exhausted. Any read error other than io.EOF terminates the program.
func main() {

	// CSV
	reader := csv.NewReader(csvFile1)
	partitions := make(map[string][][]string)

	for {
		rec, err := reader.Read()
		if err == io.EOF {
			// End of input: every record has been grouped.
			break
		}
		if err != nil {
			log.Fatal(err)
		}

		process(rec, partitions)
	}

	save_partitions(partitions)
}

// save_partitions prints each partition key followed by the records collected
// under it, one per line. It only prints; nothing is written to disk. The
// partitions are visited in map iteration order.
func save_partitions(partitions map[string][][]string) {
	for part := range partitions {
		fmt.Println(part)
		recs := partitions[part]
		for i := range recs {
			fmt.Println(recs[i])
		}
	}
}

// process files rec under its partition, which is the value of its last
// field, appending it to partitions[partition]. Records are kept in arrival
// order within a partition. Instead of accumulating in memory, this is also
// the place where a record could be written/appended directly to a file.
//
// A record with no fields has no partition and is ignored rather than causing
// an index-out-of-range panic.
func process(rec []string, partitions map[string][][]string) {
	if len(rec) == 0 {
		return
	}
	part := rec[len(rec)-1]
	// append on a missing key starts from a nil slice, so a first record needs
	// no special case.
	partitions[part] = append(partitions[part], rec)
}

// csvFile1 is dummy in-memory input standing in for a real CSV file. Each
// record has seven comma-separated fields; process uses the last field as the
// partition. The rows here happen to be grouped by that field already. The
// literal begins with a newline, so the first line of the data is empty.
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)

https://play.golang.org/p/--iqZGzxCF

And the concurrent version:

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strings"
    "sync"
)

var (
	// workers maps a partition value to the channel feeding that partition's
	// worker goroutine. It is read and written only from the goroutine running
	// main (via process and savePartitions), so no mutex is required.
	workers = map[string]chan []string{}

	// wg lets main wait until every worker has finished before exiting.
	wg sync.WaitGroup

	// mu serialises the workers' printing so partitions are not interleaved
	// on stdout; it is not relevant to the program logic.
	mu sync.Mutex
)

// main streams csvFile1 record by record, passing each record to process. At
// end of input it calls savePartitions to close the workers' channels, and the
// deferred wg.Wait keeps main alive until every worker has finished.
func main() {

	// wait for all workers to finish up before exit
	defer wg.Wait()

	reader := csv.NewReader(csvFile1)

	for {
		rec, err := reader.Read()
		switch {
		case err == io.EOF:
			savePartitions()
			return
		case err != nil:
			// log.Fatal exits the process; the deferred Wait does not run.
			log.Fatal(err)
		}
		process(rec)
	}

}

// process sends rec to the worker goroutine that owns its partition, which is
// the value of the record's last field. The first record of a partition
// creates that partition's channel, registers it in workers and starts the
// worker. A record with no fields has no partition and is ignored instead of
// causing an index-out-of-range panic.
//
// process must only be called from one goroutine because it mutates workers
// without locking.
func process(rec []string) {
	if len(rec) == 0 {
		return
	}
	part := rec[len(rec)-1]

	c, ok := workers[part]
	if !ok {
		// no worker for the partition yet: make a chan and start one
		c = make(chan []string)
		workers[part] = c

		// Register the worker before starting it. Calling Add from inside the
		// new goroutine is only safe if something else orders it before
		// wg.Wait; doing it here does not depend on the channel being
		// unbuffered.
		wg.Add(1)
		go worker(c)
	}

	// send rec to worker via chan
	c <- rec
}

// worker collects every record received on c until c is closed, then prints
// the collected records one per line. Instead of accumulating in memory, the
// records could be written to a file as they arrive.
//
// The caller must call wg.Add(1) before starting worker; worker calls wg.Done
// when it returns.
func worker(c chan []string) {
	// wg.Done signals worker completion to main.
	defer wg.Done()

	var part [][]string
	// The loop ends when the channel is closed, which savePartitions does on
	// EOF.
	for rec := range c {
		part = append(part, rec)
	}

	// dump partition
	// The lock keeps each partition's lines together on stdout; it would not
	// be required when writing to independent files.
	mu.Lock()
	defer mu.Unlock()
	for _, p := range part {
		fmt.Printf("%+v\n", p)
	}
}

// savePartitions closes every channel registered in workers. A closed channel
// is the signal for the worker reading from it to dump its partition and
// exit; savePartitions itself saves nothing.
func savePartitions() {
	for part := range workers {
		// signal this partition's worker to exit
		close(workers[part])
	}
}

// csvFile1 is dummy in-memory input standing in for a real CSV file. Each
// record has seven comma-separated fields; process uses the last field as the
// partition. The rows here happen to be grouped by that field already. The
// literal begins with a newline, so the first line of the data is empty.
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)

https://play.golang.org/p/oBTPosy0yT

Have fun!

Upvotes: 2

Related Questions