Vitrion

Reputation: 415

When I write a DataFrame to a Parquet file, no errors are shown and no file is created

Hi everybody, I have a problem while saving a DataFrame. I found a similar unanswered question: Saving Spark dataFrames as parquet files - no errors, but data is not being saved. My problem is that when I run the following code:

scala> import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vectors

scala> val dataset = spark.createDataFrame(
     |   Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
     | ).toDF("id", "hour", "mobile", "userFeatures", "clicked")
dataset: org.apache.spark.sql.DataFrame = [id: int, hour: int ... 3 more fields]

scala> dataset.show
+---+----+------+--------------+-------+
| id|hour|mobile|  userFeatures|clicked|
+---+----+------+--------------+-------+
|  0|  18|   1.0|[0.0,10.0,0.5]|    1.0|
+---+----+------+--------------+-------+

scala> dataset.write.parquet("/home/vitrion/out")

No errors are shown, and it seems that the DataFrame has been saved as a Parquet file. Surprisingly, though, no file has been created in the output directory.

[screenshot: empty output directory]
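
A quick sanity check is to try reading the same path back from the spark-shell (a minimal sketch; the path is the one used above):

// Minimal check: read the freshly written path back from the driver.
// Note: on a multi-node cluster writing to a plain local path, each
// executor writes to its own local filesystem, so this read may fail
// on the driver even though the executor log reports a successful commit.
val readBack = spark.read.parquet("/home/vitrion/out")
readBack.show()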

This is my cluster configuration

[screenshot: cluster configuration]

The logfile says:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/03/01 12:56:53 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 51016@t630-0
18/03/01 12:56:53 INFO SignalUtils: Registered signal handler for TERM
18/03/01 12:56:53 INFO SignalUtils: Registered signal handler for HUP
18/03/01 12:56:53 INFO SignalUtils: Registered signal handler for INT
18/03/01 12:56:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/03/01 12:56:54 WARN Utils: Your hostname, t630-0 resolves to a loopback address: 127.0.1.1; using 192.168.239.218 instead (on interface eno1)
18/03/01 12:56:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/03/01 12:56:54 INFO SecurityManager: Changing view acls to: vitrion
18/03/01 12:56:54 INFO SecurityManager: Changing modify acls to: vitrion
18/03/01 12:56:54 INFO SecurityManager: Changing view acls groups to: 
18/03/01 12:56:54 INFO SecurityManager: Changing modify acls groups to: 
18/03/01 12:56:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(vitrion); groups with view permissions: Set(); users  with modify permissions: Set(vitrion); groups with modify permissions: Set()
18/03/01 12:56:54 INFO TransportClientFactory: Successfully created connection to /192.168.239.54:42629 after 80 ms (0 ms spent in bootstraps)
18/03/01 12:56:54 INFO SecurityManager: Changing view acls to: vitrion
18/03/01 12:56:54 INFO SecurityManager: Changing modify acls to: vitrion
18/03/01 12:56:54 INFO SecurityManager: Changing view acls groups to: 
18/03/01 12:56:54 INFO SecurityManager: Changing modify acls groups to: 
18/03/01 12:56:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(vitrion); groups with view permissions: Set(); users  with modify permissions: Set(vitrion); groups with modify permissions: Set()
18/03/01 12:56:54 INFO TransportClientFactory: Successfully created connection to /192.168.239.54:42629 after 2 ms (0 ms spent in bootstraps)
18/03/01 12:56:54 INFO DiskBlockManager: Created local directory at /tmp/spark-d749d72b-6db2-4f02-8dae-481c0ea1f68f/executor-f379929a-3a6a-4366-8983-b38e19fb9cfc/blockmgr-c6d89ef4-b22a-4344-8816-23306722d40c
18/03/01 12:56:54 INFO MemoryStore: MemoryStore started with capacity 8.4 GB
18/03/01 12:56:54 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://[email protected]:42629
18/03/01 12:56:54 INFO WorkerWatcher: Connecting to worker spark://[email protected]:45532
18/03/01 12:56:54 INFO TransportClientFactory: Successfully created connection to /192.168.239.218:45532 after 1 ms (0 ms spent in bootstraps)
18/03/01 12:56:54 INFO WorkerWatcher: Successfully connected to spark://[email protected]:45532
18/03/01 12:56:54 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
18/03/01 12:56:54 INFO Executor: Starting executor ID 2 on host 192.168.239.218
18/03/01 12:56:54 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37178.
18/03/01 12:56:54 INFO NettyBlockTransferService: Server created on 192.168.239.218:37178
18/03/01 12:56:54 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/03/01 12:56:54 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, 192.168.239.218, 37178, None)
18/03/01 12:56:54 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, 192.168.239.218, 37178, None)
18/03/01 12:56:54 INFO BlockManager: Initialized BlockManager: BlockManagerId(2, 192.168.239.218, 37178, None)
18/03/01 12:56:54 INFO Executor: Using REPL class URI: spark://192.168.239.54:42629/classes
18/03/01 12:57:54 INFO CoarseGrainedExecutorBackend: Got assigned task 0
18/03/01 12:57:54 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/03/01 12:57:54 INFO TorrentBroadcast: Started reading broadcast variable 0
18/03/01 12:57:55 INFO TransportClientFactory: Successfully created connection to /192.168.239.54:35081 after 1 ms (0 ms spent in bootstraps)
18/03/01 12:57:55 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.1 KB, free 8.4 GB)
18/03/01 12:57:55 INFO TorrentBroadcast: Reading broadcast variable 0 took 103 ms
18/03/01 12:57:55 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 76.6 KB, free 8.4 GB)
18/03/01 12:57:55 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
18/03/01 12:57:55 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
18/03/01 12:57:55 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
18/03/01 12:57:55 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
18/03/01 12:57:55 INFO CodecConfig: Compression: SNAPPY
18/03/01 12:57:55 INFO CodecConfig: Compression: SNAPPY
18/03/01 12:57:55 INFO ParquetOutputFormat: Parquet block size to 134217728
18/03/01 12:57:55 INFO ParquetOutputFormat: Parquet page size to 1048576
18/03/01 12:57:55 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576
18/03/01 12:57:55 INFO ParquetOutputFormat: Dictionary is on
18/03/01 12:57:55 INFO ParquetOutputFormat: Validation is off
18/03/01 12:57:55 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
18/03/01 12:57:55 INFO ParquetOutputFormat: Maximum row group padding size is 0 bytes
18/03/01 12:57:55 INFO ParquetOutputFormat: Page size checking is: estimated
18/03/01 12:57:55 INFO ParquetOutputFormat: Min row count for page size check is: 100
18/03/01 12:57:55 INFO ParquetOutputFormat: Max row count for page size check is: 10000
18/03/01 12:57:55 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "id",
    "type" : "integer",
    "nullable" : false,
    "metadata" : { }
  }, {
    "name" : "hour",
    "type" : "integer",
    "nullable" : false,
    "metadata" : { }
  }, {
    "name" : "mobile",
    "type" : "double",
    "nullable" : false,
    "metadata" : { }
  }, {
    "name" : "userFeatures",
    "type" : {
      "type" : "udt",
      "class" : "org.apache.spark.ml.linalg.VectorUDT",
      "pyClass" : "pyspark.ml.linalg.VectorUDT",
      "sqlType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "type",
          "type" : "byte",
          "nullable" : false,
          "metadata" : { }
        }, {
          "name" : "size",
          "type" : "integer",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "indices",
          "type" : {
            "type" : "array",
            "elementType" : "integer",
            "containsNull" : false
          },
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "values",
          "type" : {
            "type" : "array",
            "elementType" : "double",
            "containsNull" : false
          },
          "nullable" : true,
          "metadata" : { }
        } ]
      }
    },
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "clicked",
    "type" : "double",
    "nullable" : false,
    "metadata" : { }
  } ]
}
and corresponding Parquet message type:
message spark_schema {
  required int32 id;
  required int32 hour;
  required double mobile;
  optional group userFeatures {
    required int32 type (INT_8);
    optional int32 size;
    optional group indices (LIST) {
      repeated group list {
        required int32 element;
      }
    }
    optional group values (LIST) {
      repeated group list {
        required double element;
      }
    }
  }
  required double clicked;
}

18/03/01 12:57:55 INFO CodecPool: Got brand-new compressor [.snappy]
18/03/01 12:57:55 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 84
18/03/01 12:57:55 INFO FileOutputCommitter: Saved output of task 'attempt_20180301125755_0000_m_000000_0' to file:/home/vitrion/out/_temporary/0/task_20180301125755_0000_m_000000
18/03/01 12:57:55 INFO SparkHadoopMapRedUtil: attempt_20180301125755_0000_m_000000_0: Committed
18/03/01 12:57:55 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1967 bytes result sent to driver
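
Note the commit line above: the task saved its output to file:/home/vitrion/out/_temporary/..., i.e. with the file: scheme, so the executor wrote to its own local filesystem. As a sketch (standard Hadoop FileSystem API; nothing beyond the question is assumed), this shows which default filesystem an unqualified path resolves to:

import org.apache.hadoop.fs.FileSystem

// Print the default filesystem for this Spark session; "file:///"
// means unqualified paths resolve to each machine's local disk.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
println(fs.getUri)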

Can you please help me solve this?

Thank you

Upvotes: 2

Views: 1844

Answers (1)

user9382513

Reputation: 11

Have you tried writing without the Vector column? I have seen cases in the past where complex data structures caused writing issues.
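
A minimal way to test this (assuming the dataset variable from the question; the output paths here are made up for illustration):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// 1) Drop the Vector column entirely and retry the write:
dataset.drop("userFeatures").write.parquet("/home/vitrion/out_plain")

// 2) Or convert the Vector to a plain array<double>, which Parquet
//    supports natively, and keep the column:
val vecToArray = udf((v: Vector) => v.toArray)
dataset
  .withColumn("userFeatures", vecToArray(col("userFeatures")))
  .write.parquet("/home/vitrion/out_array")

If the write succeeds without the Vector, the UDT is the likely culprit; if it still produces no visible output, the problem lies elsewhere.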

Upvotes: 1
