Reputation: 163
I have a loop that calls LogisticRegressionWithLBFGS x times.
The problem is that each iteration gets slower, and eventually the job hangs forever.
I have tried a lot of different approaches, but no luck so far.
The code looks like this:
import datetime

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

def getBootsrapedAttribution(iNumberOfSamples, df):
    def parsePoint(line):
        # column index 2 is the label, columns 3+ are the features
        return LabeledPoint(line[2], line[3:])

    aResults = {}
    x = 1
    while x <= iNumberOfSamples:
        print("## Sample: " + str(x))
        a = datetime.datetime.now()
        dfSample = sampleData(df)
        dfSample = dfSample.repartition(700)
        parsedData = dfSample.rdd.map(parsePoint)
        parsedData = parsedData.repartition(700)
        parsedData.persist()
        model = LogisticRegressionWithLBFGS.train(parsedData)
        parsedData.unpersist()
        b = datetime.datetime.now()
        print(b - a)
        x += 1

def sampleData(df):
    df = df.repartition(500)
    dfFalse = df.filter('col == 0').sample(False, 0.00035)
    dfTrue = df.filter('col == 1')
    dfSample = dfTrue.unionAll(dfFalse)
    return dfSample

getBootsrapedAttribution(50, df)
And the output looks like this:
## Sample: 1
0:00:44.393886
## Sample: 2
0:00:28.403687
## Sample: 3
0:00:30.884087
## Sample: 4
0:00:33.523481
## Sample: 5
0:00:36.107836
## Sample: 6
0:00:37.077169
## Sample: 7
0:00:41.160941
## Sample: 8
0:00:54.768870
## Sample: 9
0:01:01.31139
## Sample: 10
0:00:59.326750
## Sample: 11
0:01:37.222967
## Sample: 12
...hangs forever
Without the line model = LogisticRegressionWithLBFGS.train(parsedData),
it runs without performance issues.
My cluster looks like this:
spark.default.parallelism 500
spark.driver.maxResultSize 20G
spark.driver.memory 200G
spark.executor.cores 32
spark.executor.instances 2
spark.executor.memory 124G
Does anyone know what causes this issue?
Upvotes: 1
Views: 462
Reputation: 163
I'm answering my own question.
The problem lies in the method LogisticRegressionWithLBFGS. Replacing it with the DataFrame-based LogisticRegression from pyspark.ml (available in Spark 2.1+) resolved the issue: the iterations no longer slow down.
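A minimal sketch of the switch, mirroring parsePoint above (label in column index 2, features in columns 3 onward); the assembled column name 'features' is illustrative:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Mirroring parsePoint: column index 2 is the label, columns 3+ are the features.
labelCol = dfSample.columns[2]
featureCols = dfSample.columns[3:]

# pyspark.ml expects all features assembled into a single vector column.
assembler = VectorAssembler(inputCols=featureCols, outputCol='features')
dfTrain = assembler.transform(dfSample)

# DataFrame-based replacement for LogisticRegressionWithLBFGS.train(parsedData);
# the label column must be numeric (cast it first if necessary).
lr = LogisticRegression(featuresCol='features', labelCol=labelCol)
model = lr.fit(dfTrain)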
Furthermore, there are some more improvements you can make to the code above. The filter/sample calls can be replaced with the DataFrame method sampleBy, which also avoids the unnecessary unionAll:
.sampleBy('col', fractions={0: 0.00035, 1: 1}, seed=1234)
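With this, the sampleData helper from the question collapses to a single stratified sample (a sketch, keeping the column name 'col' and the fractions from above):

def sampleData(df):
    # Keep every row with col == 1 and ~0.035% of rows with col == 0,
    # stratified in one pass; no filter/unionAll needed.
    return df.sampleBy('col', fractions={0: 0.00035, 1: 1}, seed=1234)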
Furthermore, all the repartition calls in the code above are unnecessary. What matters is that the df passed to getBootsrapedAttribution() is well partitioned and cached.
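For example (a sketch; the 500 partitions are taken from the question's configuration):

# Partition and cache the input once, before the sampling loop.
df = df.repartition(500).cache()
df.count()  # an action, to actually materialize the cache
getBootsrapedAttribution(50, df)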
Upvotes: 1