Reputation: 81
I'm trying to create a new column with three possible values: DEF, FWD, and MID.
DEF= ['LB','LWB','RB','LCB','RCB','CB','RWB']
FWD= ['RF','LF','LW','RS','RW','LS','CF','ST']
MID= ['LCM','LM','RDM','CAM','RAM','RCM','CM','CDM','RM','LAM','LDM']
I'm using the FIFA dataset of soccer players, and I want to create a new column that tells us whether a player is a DEF, FWD, or MID based on the position they play.
My current code is:
df_kmeans = dffiltereds.withColumn("Position_Group", when(col('Position')==DEF,'DEF').when(col('Position')==FWD,'FWD').when(col('Position')==MID,'MID'))
But I keep getting a very long error message. Any help would be much appreciated. Here is the full traceback:
Py4JJavaError Traceback (most recent call last)
<ipython-input-13-dff2b83d143f> in <module>
9 #
10
---> 11 df_kmeans_new = dffiltereds.withColumn("Position_Group", when(col('Position')==DEF,'DEF').when(col('Position')==FWD,'FWD').when(col('Position')==MID,'MID'))
C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\column.py in _(self, other)
113 def _(self, other):
114 jc = other._jc if isinstance(other, Column) else other
--> 115 njc = getattr(self._jc, name)(jc)
116 return Column(njc)
117 _.__doc__ = doc
C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
1284 answer = self.gateway_client.send_command(command)
1285 return_value = get_return_value(
-> 1286 answer, self.gateway_client, self.target_id, self.name)
1287
1288 for temp_arg in temp_args:
C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
96 def deco(*a, **kw):
97 try:
---> 98 return f(*a, **kw)
99 except py4j.protocol.Py4JJavaError as e:
100 converted = convert_exception(e.java_exception)
C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o222.equalTo.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [LB, LWB, RB, LCB, RCB, CB, RWB]
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:85)
at org.apache.spark.sql.catalyst.expressions.Literal$.$anonfun$create$2(literals.scala:145)
at scala.util.Failure.getOrElse(Try.scala:222)
at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:145)
at org.apache.spark.sql.functions$.typedLit(functions.scala:132)
at org.apache.spark.sql.functions$.lit(functions.scala:115)
at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:280)
at org.apache.spark.sql.Column.equalTo(Column.scala:303)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:832)
Upvotes: 2
Views: 119
Reputation: 8410
For Spark 1.5+, you can use array_contains. The error you're seeing (Unsupported literal type class java.util.ArrayList) happens because == on a Column tries to convert the right-hand side into a literal, and a plain Python list is not a supported literal type. The only catch with array_contains is that you need to create arrays of literals from your lists, and use expr to pass a column as the value argument of array_contains (see array_contains in the Spark docs). In this case you don't need a check for MID, or an array for it, because you can just use otherwise to catch everything the other two branches don't match.
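If you want to run the snippets below, here is a minimal sketch of the sample DataFrame they assume (the SparkSession setup is my assumption, not part of the question):
from pyspark.sql import SparkSession

# Build a one-column DataFrame matching the sample data shown below.
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("RF",), ("RDM",), ("CB",)], ["Position"])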
#sample data
#df.show()
#+--------+
#|Position|
#+--------+
#| RF|
#| RDM|
#| CB|
#+--------+
from pyspark.sql import functions as F

DEF = ['LB','LWB','RB','LCB','RCB','CB','RWB']
FWD = ['RF','LF','LW','RS','RW','LS','CF','ST']
MID = ['LCM','LM','RDM','CAM','RAM','RCM','CM','CDM','RM','LAM','LDM']

# Build helper columns of literal arrays, test membership with
# array_contains (via expr, so Position is passed as a column),
# then drop the helper columns.
df.withColumn("DEF", F.array(*[F.lit(x) for x in DEF]))\
  .withColumn("FWD", F.array(*[F.lit(x) for x in FWD]))\
  .withColumn("Position_2", F.when(F.expr("array_contains(DEF, Position)"), F.lit('DEF'))\
                             .when(F.expr("array_contains(FWD, Position)"), F.lit('FWD'))\
                             .otherwise(F.lit('MID')))\
  .drop("DEF", "FWD").show()
#+--------+----------+
#|Position|Position_2|
#+--------+----------+
#| RF| FWD|
#| RDM| MID|
#| CB| DEF|
#+--------+----------+
For Spark 2.4+, you could use arrays_overlap, which returns true when two arrays share at least one non-null element.
from pyspark.sql import functions as F

# Wrap Position in a single-element array and check it for overlap
# with the array of literals for each position group.
df.withColumn("Position_2", F.when(F.arrays_overlap(F.array(*[F.lit(x) for x in DEF]), F.array("Position")), F.lit('DEF'))\
                             .when(F.arrays_overlap(F.array(*[F.lit(x) for x in FWD]), F.array("Position")), F.lit('FWD'))\
                             .otherwise(F.lit('MID'))).show()
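Since arrays_overlap accepts Column arguments directly, this avoids the expr workaround and the temporary DEF/FWD columns; with the same sample data it returns the same result as the first approach.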
I overlooked a rather simple method: using isin (Spark 1.5+).
df.withColumn("Position_2", F.when(F.col("Position").isin(DEF),F.lit('DEF'))\
.when(F.col("Position").isin(FWD),F.lit('FWD'))\
.otherwise(F.lit('MID'))).show()
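With the same sample data, the output is the same as above:
#+--------+----------+
#|Position|Position_2|
#+--------+----------+
#| RF| FWD|
#| RDM| MID|
#| CB| DEF|
#+--------+----------+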
Upvotes: 2