Reputation: 875
My objective is to read one or more CSV files that share a common format, and write to separate files based on a partition column in the CSV data. Please assume that the last column is the partition, that the data is unsorted, and that a given partition can be found in multiple files. Example of one file:
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02
If this approach smells like the dreaded XY Problem, I'm happy to adjust.
What I've tried so far:
chan []string
This obviously doesn't work (yet), as I'm not aware of how to send a line to the correct worker based on the partition value seen on a given line.
I've given each worker an id string
for each partition value, but am not aware how to select that worker to send to, if I should be creating a separate chan []string
for each worker and send to that channel with a select
, or if perhaps a struct should hold each worker with some sort of pool and routing functionality.
TL;DR: I'm lost as to how to conditionally send data to a given goroutine or channel based on some categorical string
value, where the number of unique values can be arbitrary, but likely does not exceed 24 unique partition values.
I will caveat by stating I've noticed questions like this do get down-voted, so if you feel this is counter-constructive or incomplete enough to down-vote, please comment with why so I can avoid repeating the offense.
Thanks for any help in advance!
Snippet:
package main
import (
"encoding/csv"
"fmt"
"log"
"strings"
"time"
)
// main reads every record from csvFile1 and starts one worker goroutine the
// first time each partition value is seen.
// NOTE(review): every worker receives from the same lineChan, so a record may
// be picked up by any worker rather than the one created for its partition.
func main() {
// CSV: load all records into memory up front.
r := csv.NewReader(csvFile1)
lines, err := r.ReadAll()
if err != nil {
log.Fatalf("error reading all lines: %v", err)
}
// CHANNELS: a single unbuffered channel shared by all workers.
lineChan := make(chan []string)
// TRACKER: partition values that already have a worker.
var seenPartitions []string
for _, line := range lines {
// The partition value is read from the seventh column (index 6).
hour := line[6]
if !stringInSlice(hour, seenPartitions) {
seenPartitions = append(seenPartitions, hour)
go worker(hour, lineChan)
}
// How to send to the correct worker/channel?
lineChan <- line
}
// Closing the channel ends each worker's range loop; main does not wait
// for workers that are still processing when it returns.
close(lineChan)
}
// worker receives jobs from lineChan until the channel is closed. For each
// job it prints a start message, sleeps one second to simulate work, and
// prints a finish message, tagging both messages with id.
func worker(id string, lineChan <-chan []string) {
	for {
		job, open := <-lineChan
		if !open {
			return
		}
		fmt.Println("worker", id, "started job", job)
		// Write to a new file here and wait for input over the channel
		time.Sleep(time.Second)
		fmt.Println("worker", id, "finished job", job)
	}
}
// stringInSlice reports whether str is equal to any element of list.
// A nil or empty list never contains str.
func stringInSlice(str string, list []string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == str {
			return true
		}
	}
	return false
}
// csvFile1 is in-memory sample CSV data standing in for a real input file;
// main reads it through csv.NewReader. The rows are not sorted by the last
// column, which holds the partition value.
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)
Upvotes: 2
Views: 3013
Reputation: 1313
First, a synchronous version with no Go concurrency magic (see the concurrent version below).
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"strings"
)
// main streams csvFile1 one record at a time, groups the records in memory by
// partition via process, and hands the groups to save_partitions once the
// input is exhausted. Any read error other than io.EOF aborts the program.
func main() {
	reader := csv.NewReader(csvFile1)
	grouped := make(map[string][][]string)
	for {
		rec, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		process(rec, grouped)
	}
	save_partitions(grouped)
}
// save_partitions prints each partition key followed by that partition's
// records, one per line. It only prints; nothing is written to disk.
// Partitions are visited in map iteration order, which Go leaves unspecified,
// so the order of partitions can differ between runs.
func save_partitions(partitions map[string][][]string) {
	for key := range partitions {
		fmt.Println(key)
		rows := partitions[key]
		for i := range rows {
			fmt.Println(rows[i])
		}
	}
}
// process files rec under the partition named by its last column.
//
// partitions maps a partition value to the records seen for it so far and is
// updated in place. A record with no fields has no partition column and is
// ignored rather than panicking on the index. Instead of accumulating in
// memory, the same routing could write or append directly to a file.
func process(rec []string, partitions map[string][][]string) {
	if len(rec) == 0 {
		return
	}
	part := rec[len(rec)-1]
	// append on the nil slice of a missing key allocates it, so no separate
	// existence check is needed.
	partitions[part] = append(partitions[part], rec)
}
// csvFile1 is in-memory sample CSV data standing in for a real input file;
// main reads it through csv.NewReader. The last column holds the partition
// value.
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)
https://play.golang.org/p/--iqZGzxCF
And the concurrent version:
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"strings"
"sync"
)
var (
// workers maps each partition value to the channel used to send records to
// that partition's worker. It is only read and written from the main
// goroutine (via process and savePartitions), so no mutex is required.
workers = make(map[string]chan []string)
// wg lets main wait until all workers are done before exiting.
wg = sync.WaitGroup{}
// mu is used only to keep each worker's printed output contiguous; it is
// not relevant to the program logic.
mu = sync.Mutex{}
)
// main streams csvFile1 record by record and hands each record to process.
// At end of input it closes the worker channels via savePartitions and then,
// through the deferred wg.Wait, blocks until every worker has finished.
func main() {
	// Runs on normal return only; log.Fatal below exits the process
	// immediately without running deferred calls.
	defer wg.Wait()

	reader := csv.NewReader(csvFile1)
	for {
		rec, err := reader.Read()
		switch {
		case err == io.EOF:
			savePartitions()
			return
		case err != nil:
			log.Fatal(err)
		}
		process(rec)
	}
}
// process routes rec to the worker that owns the partition named by the
// record's last column, starting that worker the first time the partition is
// seen. A record with no fields has no partition column and is ignored.
//
// It must only be called from a single goroutine: the workers map is not
// guarded by a lock.
func process(rec []string) {
	if len(rec) == 0 {
		return
	}
	part := rec[len(rec)-1]
	c, ok := workers[part]
	if !ok {
		// First record for this partition: create its channel and worker.
		c = make(chan []string)
		workers[part] = c
		// Register the worker before the goroutine starts so wg.Wait can
		// never observe a zero counter while a worker is about to run.
		wg.Add(1)
		go worker(c)
	}
	// Unbuffered send: blocks until the worker has taken the record.
	c <- rec
}

// worker collects every record received on c and, once c is closed, prints
// them and marks itself done on wg. The caller must call wg.Add(1) before
// starting worker in a goroutine.
func worker(c chan []string) {
	// wg.Done signals this worker's completion to main.
	defer wg.Done()

	var part [][]string
	// Accumulate until the channel is closed on EOF. Instead of accumulating
	// in memory, each record could be written to a file as it arrives.
	for rec := range c {
		part = append(part, rec)
	}

	// The lock keeps one partition's lines contiguous on stdout; it is not
	// required when each worker writes to its own file.
	mu.Lock()
	defer mu.Unlock()
	for _, p := range part {
		fmt.Printf("%+v\n", p)
	}
}
// savePartitions closes every worker channel. A closed channel is the signal
// for a worker to stop receiving and exit.
func savePartitions() {
	for part := range workers {
		close(workers[part])
	}
}
// csvFile1 is in-memory sample CSV data standing in for a real input file;
// main reads it through csv.NewReader. The last column holds the partition
// value.
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)
https://play.golang.org/p/oBTPosy0yT
Have fun!
Upvotes: 2