Clock Slave
Clock Slave

Reputation: 7967

PySpark SQL: more than one row returned by a subquery used as an expression:

I a trying to create a new column in my test dataframe using values from another dataframe called train. Below is a snapshot. In the train dataframe, The value in the first row under the column aml_freq_a is the number of times v appears in column a. Similarly, the 42 under aml_freq_b is the number of times l appears in b. The ['aml_freq_a', 'aml_freq_b', 'aml_freq_c'] are basically frequency columns.

>>> train.show(5)

+---+---+---+----------+----------+----------+                                  
|  a|  b|  c|aml_freq_a|aml_freq_b|aml_freq_c|
+---+---+---+----------+----------+----------+
|  v|  l|  l|        56|        42|        29|
|  u|  g|  l|        47|        46|        29|
|  s|  g|  l|        28|        46|        29|
|  v|  m|  l|        56|        33|        29|
|  h|  m|  l|        44|        33|        29|
+---+---+---+----------+----------+----------+

In the test dataset has columns ['a', 'b', 'c']. Here, I need to add the frequency columns - ['aml_freq_a', 'aml_freq_b', 'aml_freq_c'].

>>> test.show(5)
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  w|  j|  c|
|  a|  g|  w|
|  s|  d|  i|
|  g|  j|  r|
|  r|  b|  u|
+---+---+---+

To do this, I wrote subqueries that join train and test on a,b and,c.

query = "select test.*,
  (select aml_freq_a from test left join train on test.a = train.a),
  (select aml_freq_b from test left join train on test.b = train.b),
  (select aml_freq_c from ten left join train on test.c = train.c)
from test"

train.createTempView('train')
test.createTempView('test')

spark.sql(query) runs fine but when I call show() on it it returns me the following error

java.lang.RuntimeException: more than one row returned by a subquery used as an expression:

What does this mean? Initially I thought there was something wrong with my query but I validated my query here and there's nothing wrong here. What am I not seeing here?

Upvotes: 0

Views: 2922

Answers (2)

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41957

In your attempt, (select aml_freq_a from test left join train on test.a = train.a) would return a dataframe i.e. multiple rows which can't be used as a select argument as "select test.*, (select aml_freq_a from test left join train on test.a = train.a),...

Correct query would be as below

query = "select test.* from " \
            "(select test.*, aml_freq_a from " \
                "(select test.*, aml_freq_b from " \
                    "(select test.*, aml_freq_c from test " \
                "left join train on test.c = train.c) as test " \
            "left join train on test.b = train.b)  as test " \
        "left join train on test.a = train.a) as test"

If the header is required in following format

+---+---+---+----------+----------+----------+
|a  |b  |c  |aml_freq_a|aml_freq_b|aml_freq_c|
+---+---+---+----------+----------+----------+

then

query = "select test.* from " \
            "(select test.*, aml_freq_c from " \
                "(select test.*, aml_freq_b from " \
                    "(select test.*, aml_freq_a from test " \
                "left join train on test.a = train.a) as test " \
            "left join train on test.b = train.b)  as test " \
        "left join train on test.c = train.c) as test"

You can do it in much simpler and safer way using dataframe api

test.join(train.select('a', 'aml_freq_a'), ['a'], 'left') \
    .join(train.select('b', 'aml_freq_b'), ['b'], 'left') \
    .join(train.select('c', 'aml_freq_c'), ['c'], 'left')

I hope the answer is helpful

Upvotes: 1

user9822761
user9822761

Reputation: 11

It means that

  • At least one of the correlated subqueries you use returns more than one match.
  • While Spark supports only one value returned for each row (i.e. correlated subqueries have to be aggregated).

Upvotes: 1

Related Questions