Reputation: 296
I'm writing pipeline code that will be used in both batch and streaming mode with Dataflow, and I'm having OOM issues when using GroupByKey in batch mode. The code below shows the issue: when I have a large file, GroupByKey appears to hold everything in memory, only emitting values after the input finishes. I tried to use triggers to force output to be emitted earlier, but failed. I can't find any way of using this transform on big files.
How can I implement a pipeline in Beam Go that includes grouping and that can work efficiently on large files?
package sisubqio_test

import (
	"context"
	"flag"
	"fmt"
	"io"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func TestWriter(t *testing.T) {
	mustNotFail := func(err error) {
		if err != nil {
			t.Fatal(err)
		}
	}

	// test file with a few lines of text
	fName := "in.tmp.txt"
	f, err := os.OpenFile(fName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	mustNotFail(err)
	defer func() {
		mustNotFail(f.Close())
		mustNotFail(os.Remove(fName))
	}()
	for i := 0; i < 10; i++ {
		_, err = fmt.Fprintf(f, "line %d\n", i)
		mustNotFail(err)
	}
	_, err = f.Seek(0, io.SeekStart)
	mustNotFail(err)

	flag.Parse()
	beam.Init()
	pipeline, s := beam.NewPipelineWithRoot()
	col := textio.Read(s, fName)

	// add a timestamp to each message: each message has a timestamp 20s
	// after the previous one
	now := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
	var counter int32
	col = beam.ParDo(s, func(line beam.X) (typex.EventTime, beam.X) {
		i := atomic.AddInt32(&counter, 1) - 1
		evTime := mtime.Time(mtime.FromTime(now.Add(20 * time.Duration(i) * time.Second)).Milliseconds())
		t.Logf("[0] input event, time=%v", evTime)
		return evTime, line
	}, col)

	// add a window and inspect events when they are emitted
	col = beam.WindowInto(s,
		window.NewFixedWindows(time.Minute),
		col,
		beam.Trigger(window.TriggerAlways()), // I tried all triggers here; makes no difference
	)
	col = beam.ParDo(s, func(w typex.Window, e string) string {
		t.Logf("[1] window: %v", w)
		return e
	}, col)

	// add a key and group by it; inspect events when they are emitted
	col = beam.AddFixedKey(s, col)
	col = beam.ParDo(s, func(et typex.EventTime, group int, x beam.X) (int, beam.X) {
		t.Logf("[2] at %v got (group %d)",
			time.UnixMilli(int64(et)),
			group)
		return group, x
	}, col)

	// ISSUE IS HERE
	// No matter which trigger I use, GroupByKey seems to hold
	// everything in memory and only then emit its outputs.
	// With large files it always OOMs.
	col = beam.GroupByKey(s, col)
	beam.ParDo0(s, func(w typex.Window, group int, valIter func(*string) bool) {
		sb := strings.Builder{}
		fmt.Fprintf(&sb, "[3] win=%v out group=%d", w, group)
		var elm string
		for valIter(&elm) {
			fmt.Fprintf(&sb, " %s;", elm)
		}
		t.Log(sb.String())
	}, col)

	mustNotFail(beamx.Run(context.Background(), pipeline))
}
Output:
writer_test.go:58: [0] input event, time=1577836800000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836820000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836840000
writer_test.go:69: [1] window: [1577836800000:1577836860000)
writer_test.go:79: [2] at 2020-01-01 01:00:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836860000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836880000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836900000
writer_test.go:69: [1] window: [1577836860000:1577836920000)
writer_test.go:79: [2] at 2020-01-01 01:01:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836920000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:00 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836940000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:20 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836960000
writer_test.go:69: [1] window: [1577836920000:1577836980000)
writer_test.go:79: [2] at 2020-01-01 01:02:40 +0100 CET got (group 0)
writer_test.go:58: [0] input event, time=1577836980000
writer_test.go:69: [1] window: [1577836980000:1577837040000)
writer_test.go:79: [2] at 2020-01-01 01:03:00 +0100 CET got (group 0)
writer_test.go:95: [3] win=[1577836920000:1577836980000) out group=0 line 6; line 7; line 8;
writer_test.go:95: [3] win=[1577836980000:1577837040000) out group=0 line 9;
writer_test.go:95: [3] win=[1577836800000:1577836860000) out group=0 line 0; line 1; line 2;
writer_test.go:95: [3] win=[1577836860000:1577836920000) out group=0 line 3; line 4; line 5;
EDIT: I found Jira tickets related to triggers and windows that, at the time of writing, make me believe that triggers, and especially trigger propagation, are still a work in progress.
Upvotes: 0
Views: 825
Reputation: 75860
Beam uses map and reduce operations. Map operations (transforms) can run in parallel and on different workers/VMs. A reduce operation needs to know all the elements it applies to, so it loads all of them into memory and only then performs the reduce (groupBy) operation.
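The difference between the two kinds of step can be sketched in plain Go, without the Beam SDK. This is only an illustration of the idea: `keyed`, `mapOne` and `groupAll` are hypothetical names, and the sketch does not show how any particular runner actually stores grouped data.

```go
package main

import (
	"fmt"
	"strings"
)

// keyed is a hypothetical key/value pair used only for this illustration.
type keyed struct {
	Key   int
	Value string
}

// mapOne handles a single element in isolation, so elements can be
// processed one at a time (or on different workers) and then forgotten.
func mapOne(line string) keyed {
	return keyed{Key: 0, Value: strings.ToUpper(line)}
}

// groupAll cannot hand back a complete group until it has seen the whole
// input, because a later element may still belong to an earlier key.
func groupAll(in []keyed) map[int][]string {
	groups := make(map[int][]string)
	for _, kv := range in {
		groups[kv.Key] = append(groups[kv.Key], kv.Value)
	}
	return groups
}

func main() {
	lines := []string{"line 0", "line 1", "line 2"}

	// Map step: each element is transformed independently.
	var mapped []keyed
	for _, l := range lines {
		mapped = append(mapped, mapOne(l))
	}

	// Group step: every value of a key is collected before it is used.
	groups := groupAll(mapped)
	fmt.Println(len(groups[0]), groups[0]) // prints "3 [LINE 0 LINE 1 LINE 2]"
}
```

With a single fixed key, as in the question, every element ends up in the same group, which is the worst case for this kind of buffering.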
You have 2 solutions:
You can create windows to process only chunks of your large file. However, your groupBy won't be global, but per window.
You can also try the new Dataflow Prime option. It's serverless and fully scalable. The promise is to remove all OOM errors (which I only got in Java; I have never used the Beam Go SDK).
You can also increase the memory of your workers, but that's not a scalable solution (and it costs more!). The Prime option is the better one (but it's still in preview).
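To make the windowing option concrete, here is a minimal plain-Go sketch of what "per window" grouping means. It does not use the Beam SDK; `event`, `windowStart` and `groupPerWindow` are hypothetical helpers, and the window arithmetic assumes non-negative millisecond timestamps and fixed windows aligned to the epoch.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a hypothetical timestamped element (timestamp in milliseconds).
type event struct {
	TsMillis int64
	Value    string
}

// windowStart returns the start of the fixed window that contains ts.
// Assumption: ts >= 0 and windows are aligned to the epoch.
func windowStart(tsMillis, sizeMillis int64) int64 {
	return tsMillis - tsMillis%sizeMillis
}

// groupPerWindow groups values by the window they fall into, so each
// group holds one window's worth of data instead of the whole input.
func groupPerWindow(events []event, sizeMillis int64) map[int64][]string {
	groups := make(map[int64][]string)
	for _, e := range events {
		w := windowStart(e.TsMillis, sizeMillis)
		groups[w] = append(groups[w], e.Value)
	}
	return groups
}

func main() {
	const size = int64(60_000)       // one-minute windows
	base := int64(1_577_836_800_000) // 2020-01-01 00:00:00 UTC

	// Ten lines, 20 seconds apart, as in the question.
	var events []event
	for i := int64(0); i < 10; i++ {
		events = append(events, event{base + i*20_000, fmt.Sprintf("line %d", i)})
	}

	groups := groupPerWindow(events, size)

	// Print the windows in chronological order.
	starts := make([]int64, 0, len(groups))
	for w := range groups {
		starts = append(starts, w)
	}
	sort.Slice(starts, func(i, j int) bool { return starts[i] < starts[j] })
	for _, w := range starts {
		fmt.Printf("[%d:%d) %v\n", w, w+size, groups[w])
	}
}
```

Run on the question's ten timestamps, this yields four groups holding 3, 3, 3 and 1 lines, the same windows that appear in the `[3]` log lines. No single group ever contains the whole input, but no group is global either.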
Upvotes: 0