pmdaly

Reputation: 1212

How to convert a bytearray in one row of a PySpark DataFrame to a column of bytes?

My data currently looks something like this:

import pandas as pd
df = pd.DataFrame({'content': [bytearray(b'\x01%\xeb\x8cH\x89')]})
spark.createDataFrame(df).show()

+-------------------+
|            content|
+-------------------+
|[01 25 EB 8C 48 89]|
+-------------------+

How do I get a column that has a row for each value in the array?

+-------+
|content|
+-------+
|      1|
|     37|
|    235|
|    140|
|     72|
|    137|
+-------+

I've tried explode, but it does not work on a bytearray.

Edit: for additional context, the df is the result of reading a binary file with spark.read.format('binaryfile').load(...).

Upvotes: 0

Views: 1798

Answers (3)

Kapil

Reputation: 166

Converting the bytearray to an array of integers with a UDF might help:

import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType, ArrayType

# Iterating over a bytearray yields one int (0-255) per byte
byte_to_int_udf = f.udf(lambda x: [int(y) for y in x], ArrayType(IntegerType()))

df = pd.DataFrame({'content': [bytearray(b'\x01%\xeb\x8cH\x89')]})
df1 = spark.createDataFrame(df)
# Build the integer array, then explode it so each byte becomes its own row
(df1.withColumn("content_array", byte_to_int_udf(f.col('content')))
    .select(f.explode(f.col('content_array')).alias('content'))
    .show())

Upvotes: 0

Mike Holcomb

Reputation: 413

You need to use flatMap on your column's data. In PySpark, flatMap is an RDD method, so you reach it through df.rdd. You pass in a function that parses each data element; the function should emit a sequence, and each element of that sequence becomes a new row.

A longer explanation with more examples is here: https://koalatea.io/python-pyspark-flatmap/
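
As a minimal sketch of the flatMap approach described above (not taken from the linked page): the helper names byte_rows and explode_binary_column are hypothetical, and the sketch assumes an active SparkSession and a binary column named content, as in the question.

```python
def byte_rows(value):
    """Turn one binary value into a list of 1-tuples, one per byte.

    Iterating over bytes/bytearray in Python 3 yields ints in 0..255.
    """
    if value is None:
        return []  # a null cell contributes no rows
    return [(b,) for b in value]


def explode_binary_column(df, column="content"):
    """Hypothetical helper: one output row per byte of `column`.

    Assumes `df` is a PySpark DataFrame and a SparkSession is active
    (RDD.toDF needs one). flatMap is an RDD method, hence df.rdd.
    """
    return df.rdd.flatMap(lambda row: byte_rows(row[column])).toDF([column])


# Pure-Python check of the per-row function, no Spark required:
print(byte_rows(bytearray(b'\x01%\xeb\x8cH\x89')))
# → [(1,), (37,), (235,), (140,), (72,), (137,)]
```

With the question's data, explode_binary_column(spark.createDataFrame(df)).show() should then give one row per byte. Because toDF is given only column names here, it infers the types from the data, so it is likely to fail if the result is empty; passing an explicit schema would avoid that.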

Upvotes: 0

pltc

Reputation: 6082

I applied a chain of transformations here with comments. It's a bit "hacky" though.

from pyspark.sql import functions as F

(df
    .withColumn('content', F.hex('content')) # convert bytes to hex: 0125EB8C4889
    .withColumn('content', F.regexp_replace('content', r'(\w{2})', '$1,')) # split hex to chunks: 01,25,EB,8C,48,89,
    .withColumn('content', F.expr('substring(content, 0, length(content) - 1)')) # remove redundant comma: 01,25,EB,8C,48,89
    .withColumn('content', F.split('content', ',')) # split hex values by comma: [01, 25, EB, 8C, 48, 89]
    .withColumn('content', F.explode('content')) # explode hex values to multiple rows
    .withColumn('content', F.conv('content', 16, 10)) # convert hex to dec
    .show(10, False)
)

# Output
# +-------+
# |content|
# +-------+
# |1      |
# |37     |
# |235    |
# |140    |
# |72     |
# |137    |
# +-------+
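
To make the intermediate values of that chain easier to follow, here is a rough plain-Python mirror of the same steps. It is only an illustration, not Spark code, and hex_chain is a hypothetical name.

```python
def hex_chain(value):
    """Mirror the Spark steps in plain Python to show each intermediate value."""
    # Like F.hex: bytes -> uppercase hex string, e.g. '0125EB8C4889'
    hex_str = value.hex().upper()
    # Like regexp_replace + split: cut the string into two-character chunks
    chunks = [hex_str[i:i + 2] for i in range(0, len(hex_str), 2)]
    # Like F.conv(..., 16, 10): read each chunk as a base-16 number
    return [int(chunk, 16) for chunk in chunks]


print(hex_chain(bytearray(b'\x01%\xeb\x8cH\x89')))
# → [1, 37, 235, 140, 72, 137]
```

One difference from the Spark version: F.conv returns a string column, so a cast such as .cast('int') would be needed there to get numeric values, whereas this sketch returns Python ints directly.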

Upvotes: 1
