GroupByKey always holds everything in RAM, causing OOM

I'm writing pipeline code that will be used in both batch and streaming mode on Dataflow, and I'm hitting OOM issues with GroupByKey when running in batch mode. The code below shows the issue: with a large file, GroupByKey appears to hold everything in memory and only emits values after the input finishes. I tried using triggers to force earlier emission, but it made no difference. I can't find any way to use this transform on big files.

How can I implement a pipeline in the Beam Go SDK that includes grouping and works efficiently on large files?

package sisubqio_test

import (
    "context"
    "flag"
    "fmt"
    "io"
    "os"
    "strings"
    "sync/atomic"
    "testing"
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func TestWriter(t *testing.T) {
    mustNotFail := func(err error) {
        if err != nil {
            t.Fatal(err)
        }
    }

    // test file with a few lines of text
    fName := "in.tmp.txt"
    f, err := os.OpenFile(fName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
    mustNotFail(err)
    defer func() {
        mustNotFail(f.Close())
        mustNotFail(os.Remove(fName))
    }()
    for i := 0; i < 10; i++ {
        _, err = fmt.Fprintf(f, "line %d\n", i)
        mustNotFail(err)
    }

    _, err = f.Seek(0, io.SeekStart)
    mustNotFail(err)

    flag.Parse()
    beam.Init()

    pipeline, s := beam.NewPipelineWithRoot()
    col := textio.Read(s, fName)

    // add timestamp to messages: each message has a timestamp 20s after
    // the previous one
    now := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
    var counter int32
    col = beam.ParDo(s, func(line beam.X) (typex.EventTime, beam.X) {
        i := atomic.AddInt32(&counter, 1) - 1
        evTime := mtime.FromTime(now.Add(20 * time.Duration(i) * time.Second))
        t.Logf("[0] input event, time=%v", evTime)
        return evTime, line
    }, col)

    // add a window and inspect events, when emitted
    col = beam.WindowInto(s,
        window.NewFixedWindows(time.Minute),
        col,
        beam.Trigger(window.TriggerAlways()), // I tried all triggers here; makes no difference
    )
    col = beam.ParDo(s, func(w typex.Window, e string) string {
        t.Logf("[1] window: %v", w)
        return e
    }, col)

    // add a key and group by it; inspect events, when emitted
    col = beam.AddFixedKey(s, col)
    col = beam.ParDo(s, func(et typex.EventTime, group int, x beam.X) (int, beam.X) {
        t.Logf("[2] at %v got (group %d)",
            time.UnixMilli(int64(et)),
            group)
        return group, x
    }, col)

    // ISSUE IS HERE
    // No matter which trigger I use, it looks like GroupByKey
    // always wants to hold everything in memory and only then
    // emit its outputs. With large files it always OOMs.
    col = beam.GroupByKey(s, col)
    beam.ParDo0(s, func(w typex.Window, group int, valIter func(*string) bool) {
        sb := strings.Builder{}
        fmt.Fprintf(&sb, "[3] win=%v out group=%d", w, group)
        var elm string
        for valIter(&elm) {
            fmt.Fprintf(&sb, " %s;", elm)
        }
        t.Log(sb.String())
    }, col)

    mustNotFail(beamx.Run(context.Background(), pipeline))
}

Output:

    writer_test.go:58: [0] input event, time=1577836800000
    writer_test.go:69: [1] window: [1577836800000:1577836860000)
    writer_test.go:79: [2] at 2020-01-01 01:00:00 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836820000
    writer_test.go:69: [1] window: [1577836800000:1577836860000)
    writer_test.go:79: [2] at 2020-01-01 01:00:20 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836840000
    writer_test.go:69: [1] window: [1577836800000:1577836860000)
    writer_test.go:79: [2] at 2020-01-01 01:00:40 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836860000
    writer_test.go:69: [1] window: [1577836860000:1577836920000)
    writer_test.go:79: [2] at 2020-01-01 01:01:00 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836880000
    writer_test.go:69: [1] window: [1577836860000:1577836920000)
    writer_test.go:79: [2] at 2020-01-01 01:01:20 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836900000
    writer_test.go:69: [1] window: [1577836860000:1577836920000)
    writer_test.go:79: [2] at 2020-01-01 01:01:40 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836920000
    writer_test.go:69: [1] window: [1577836920000:1577836980000)
    writer_test.go:79: [2] at 2020-01-01 01:02:00 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836940000
    writer_test.go:69: [1] window: [1577836920000:1577836980000)
    writer_test.go:79: [2] at 2020-01-01 01:02:20 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836960000
    writer_test.go:69: [1] window: [1577836920000:1577836980000)
    writer_test.go:79: [2] at 2020-01-01 01:02:40 +0100 CET got (group 0)
    writer_test.go:58: [0] input event, time=1577836980000
    writer_test.go:69: [1] window: [1577836980000:1577837040000)
    writer_test.go:79: [2] at 2020-01-01 01:03:00 +0100 CET got (group 0)
    writer_test.go:95: [3] win=[1577836920000:1577836980000) out group=0 line 6; line 7; line 8;
    writer_test.go:95: [3] win=[1577836980000:1577837040000) out group=0 line 9;
    writer_test.go:95: [3] win=[1577836800000:1577836860000) out group=0 line 0; line 1; line 2;
    writer_test.go:95: [3] win=[1577836860000:1577836920000) out group=0 line 3; line 4; line 5;

EDIT: I found Jira tickets related to triggers and windows that, at the time of writing, make me believe that triggers, and especially trigger propagation, are still a work in progress.

Upvotes: 0

Views: 825

Answers (1)

guillaume blaquiere

Reputation: 75860

Beam uses map and reduce operations. Map steps (transforms) can run in parallel across different workers/VMs. A reduce step needs to see all of the elements for a key before it can run, so it loads them all into memory and only then performs the groupBy operation.
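
As a side note (not one of the suggestions below), a minimal Go sketch of how to avoid materializing the group at all when the end goal is an aggregate rather than the full list of values: beam.CombinePerKey takes an associative combine function that runners can apply incrementally and in parallel, so no single worker ever has to hold a whole group in memory. The file name and the fixed key are borrowed from the question; the counting logic is only an illustration.

    package main

    import (
        "context"
        "flag"

        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    )

    func main() {
        flag.Parse()
        beam.Init()

        p, s := beam.NewPipelineWithRoot()

        lines := textio.Read(s, "in.tmp.txt") // file name borrowed from the question
        keyed := beam.AddFixedKey(s, lines)   // (0, line) pairs, as in the question

        // Map each (key, line) to (key, 1), then sum per key. The combine
        // function only ever sees two values at a time, so memory use stays
        // roughly constant no matter how large the input file is.
        ones := beam.ParDo(s, func(k int, _ string) (int, int) { return k, 1 }, keyed)
        counts := beam.CombinePerKey(s, func(a, b int) int { return a + b }, ones)

        beam.ParDo0(s, func(k, n int) {
            // consume the per-key aggregate here
        }, counts)

        if err := beamx.Run(context.Background(), p); err != nil {
            panic(err)
        }
    }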

You have two solutions:

  • You can create windows and process your large file in chunks. However, your groupBy won't be global, but per window (see the sketch after this list).

  • You can also try the new Dataflow Prime option. It's serverless and fully scalable, and its promise is to remove the OOM errors (which I only ever hit in Java; I have never used the Beam Go SDK).
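
A minimal Go sketch of the per-window grouping in the first option (this mirrors what the question's code already does; groupPerWindow is a hypothetical helper name). After WindowInto, GroupByKey groups per (key, window) pair, so each emitted iterator only spans one window's worth of input rather than the whole file:

    package main

    import (
        "time"

        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
    )

    // groupPerWindow windows a keyed PCollection into one-minute fixed
    // windows and groups it; the grouping below is per key *and* window,
    // not global.
    func groupPerWindow(s beam.Scope, keyed beam.PCollection) beam.PCollection {
        windowed := beam.WindowInto(s, window.NewFixedWindows(time.Minute), keyed)
        return beam.GroupByKey(s, windowed)
    }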

You can also increase the memory of your workers, but that's not a scalable solution (and it costs more!). The Prime option is the better one (but it is still in preview).
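
For concreteness, a hedged example of both knobs when launching on the Dataflow runner. Flag names are the runner options as I know them at the time of writing, and the project, region, bucket, and binary path are placeholders; check them against your SDK version before relying on this:

    go run ./mypipeline \
        --runner=dataflow \
        --project=my-project \
        --region=us-central1 \
        --staging_location=gs://my-bucket/staging \
        --worker_machine_type=n1-highmem-8   # bigger VM = more RAM per worker

    # or, instead of larger workers, opt in to Dataflow Prime (preview):
    #     --dataflow_service_options=enable_prime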

Upvotes: 0
