SimonVonDerGoltz

Reputation: 163

PySpark: LogisticRegressionWithLBFGS gets slower with each iteration

I have a loop that calls LogisticRegressionWithLBFGS x times.

The problem is that each iteration gets slower, until the job finally hangs forever.

I have tried a lot of different approaches, but no luck so far.

The code looks like this:

import datetime

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

def getBootsrapedAttribution(iNumberOfSamples, df):

    def parsePoint(line):
        # column 2 holds the label, columns 3+ hold the features
        return LabeledPoint(line[2], line[3:])

    aResults = {}
    x = 1
    while x <= iNumberOfSamples:
        print("## Sample: " + str(x))
        a = datetime.datetime.now()
        dfSample = sampleData(df)
        dfSample = dfSample.repartition(700)
        parsedData = dfSample.rdd.map(parsePoint)
        parsedData = parsedData.repartition(700)
        parsedData.persist()
        model = LogisticRegressionWithLBFGS.train(parsedData)
        parsedData.unpersist()
        b = datetime.datetime.now()
        print(b - a)
        x += 1

def sampleData(df):
    df = df.repartition(500)
    # downsample the majority class, keep the minority class in full
    dfFalse = df.filter('col == 0').sample(False, 0.00035)
    dfTrue = df.filter('col == 1')
    dfSample = dfTrue.unionAll(dfFalse)
    return dfSample


getBootsrapedAttribution(50, df)

The output looks like this:

## Sample: 1
0:00:44.393886

## Sample: 2
0:00:28.403687

## Sample: 3
0:00:30.884087

## Sample: 4
0:00:33.523481

## Sample: 5
0:00:36.107836

## Sample: 6
0:00:37.077169

## Sample: 7
0:00:41.160941

## Sample: 8
0:00:54.768870

## Sample: 9
0:01:01.31139

## Sample: 10
0:00:59.326750

## Sample: 11
0:01:37.222967

## Sample: 12

...hangs forever

Without the line model = LogisticRegressionWithLBFGS.train(parsedData), the loop runs with no performance issues.

My cluster looks like this:

spark.default.parallelism   500
spark.driver.maxResultSize  20G
spark.driver.memory 200G
spark.executor.cores    32
spark.executor.instances    2
spark.executor.memory   124G

Has anyone seen this issue?

Upvotes: 1

Views: 462

Answers (1)

SimonVonDerGoltz

Reputation: 163

I'm answering my own question.

The problem lies in the method LogisticRegressionWithLBFGS. Replacing it with the DataFrame-based LogisticRegression in Spark 2.1+ resolved the issue; the iterations no longer slow down.
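As a rough sketch of what that replacement could look like with pyspark.ml.classification.LogisticRegression (the column names f1, f2, f3 and label are placeholders for the actual schema):

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# the DataFrame-based API expects a single vector column of features
assembler = VectorAssembler(inputCols=['f1', 'f2', 'f3'], outputCol='features')
dfTrain = assembler.transform(dfSample).select('label', 'features')

# train directly on the DataFrame; no RDD of LabeledPoints needed
lr = LogisticRegression(maxIter=100)
model = lr.fit(dfTrain)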

Furthermore, there are a few more improvements you can make to the code above. The sample call can be replaced with the DataFrame method sampleBy, which also avoids the unnecessary union:

.sampleBy('col', fractions={0: 0.00035, 1: 1}, seed=1234)
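With that, sampleData() shrinks to a single stratified sample over the label column (a sketch, assuming the label column really is called col as above):

def sampleData(df):
    # keep ~0.035% of the negatives and all of the positives in one pass,
    # replacing the filter/sample/unionAll combination
    return df.sampleBy('col', fractions={0: 0.00035, 1: 1.0}, seed=1234)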

Furthermore, all the repartitions in the code above are unnecessary. What matters is that the df passed to getBootsrapedAttribution() is well partitioned and cached.
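For example, a minimal sketch of that setup (the partition count of 500 is simply taken from the original code):

# partition and cache the input once, up front, instead of
# repartitioning inside every loop iteration
df = df.repartition(500).cache()
df.count()  # force materialization of the cache before the loop
getBootsrapedAttribution(50, df)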

Upvotes: 1
