dadadima

Reputation: 938

Cannot resolve 'columnname' given input columns: Spark-SQL

I am running this code daily and saving the output to a chosen location:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.{lit, max, min}
  import spark.implicits._

  def aggConnections()(input: DataFrame): DataFrame = {
    input
      // spid and cxid are array columns, so take their first element
      .groupBy(
        $"spid"(0).as("domain_userid"),
        $"cxid"(0).as("key_id"))
      .agg(
        min($"Time").as("first_seen"),
        max($"Time").as("last_seen"))
      .select($"domain_userid", $"key_id",
        lit("cxense_id").as("key_source"),
        lit(1).as("times_seen"),
        $"first_seen", $"last_seen")
      .filter($"domain_userid".isNotNull && $"key_id".isNotNull)
  }

  val requests = spark.read.parquet("day1").transform(aggConnections())

spid is an array and it looks like this:

spid:array
     element:string

That's why I had to access it as $"spid"(0).as("domain_userid").

Unfortunately, on some days I get the following error when running this job on AWS EMR with Spark 3.0.1:

diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve '`spid`' given input columns: [AdUnitId, AudienceSegmentIds, BandWidth, BandwidthGroupId, BandwidthId, Browser, BrowserId, City, CityId, CmsMetadata, Country, CountryId, DeviceCategory, Domain, GfpContentId, IsCompanion, IsFilledRequest, IsInterstitial, IsVideoFallbackRequest, KeyPart, Metro, MetroId, MobileAppId, MobileCapability, MobileCarrier, MobileDevice, OS, OSId, OSVersion, PodPosition, PostalCode, PostalCodeId, PublisherProvidedID, RefererURL, Region, RegionId, RequestLanguage, RequestedAdUnitSizes, Time, TimeUsec2, UserId, VideoPosition, addefend, ads_enabled, app_name, app_version, bereidingstijd, brand, careerlevel, cat, category, categorycode, categoryname, channel_id, channel_title, cid, cmssource, companyname, compid, content_url, contentid, contentsubstype, crt_cpm, crt_size, cxid, cxsg, dc_yt, deviceid, dos, dossier, educationlevel, floor, fsr, gang, gdpr_applies, gdpr_consentstring, gelegenheid, hb_pb, hb_size, hour, ifa, industry, ingredient, iom, itemid, ix_apnx_om, ix_cdb_om, ix_imdi_cpm, jobtitle, k21, kage, kar, kauth, kembed, keuken, kgender, klg, ko, kpid, kvlg, kvz, kw, lat, long, mc_asset_type, mediation, model, moeilijkheid, origin, pag, pagetype, path, pay, pos, positie, production_id, productname, retailercode, retailername, rpfl_12108, screen, sector, shopid, show_id, show_title, soort, src, stad, starttype, station, subforum, tag, theme, top, video_duration, video_label, yt_vrallowed, ytdevice];; 'Aggregate ['spid[0], cxid#136[0]], ['spid[0] AS domain_userid#276, cxid#136[0] AS key_id#277, min(Time#0) AS first_seen#417, max(Time#0) AS last_seen#419]

I thought the problem was that the spid column was not present, however it is in the data when I check the schema. Therefore, I decided to test it more thoroughly on Databricks. The weird thing is that if I run this code on Databricks with the same Spark version, 3.0.1, everything works fine on every day of data, including the days that fail on EMR. I really can't explain what is happening.

Upvotes: 2

Views: 1667

Answers (1)

dadadima

Reputation: 938

I discovered that my daily partitions were also split into hourly partitions, and the schemas of those hourly partitions can differ slightly: in some hours the spid column is missing entirely. Hence, on some days my production job failed with the error reported above.
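A quick way to confirm this is to read each hourly partition's schema separately and check whether spid is present. A minimal sketch, assuming a per-hour path layout like day1/hour=00 ... day1/hour=23 (that layout is an assumption about my setup, not something Spark requires):

  // Check every hourly partition of one day and report the hours whose
  // Parquet schema does not contain the `spid` column.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().getOrCreate()

  (0 to 23)
    .map(h => f"day1/hour=$h%02d")          // hypothetical hourly layout
    .foreach { path =>
      val schema = spark.read.parquet(path).schema
      if (!schema.fieldNames.contains("spid"))
        println(s"$path has no spid column")
    }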

The bug was difficult to spot because my test instance had option("mergeSchema", "true") set as a default Spark option, so it merged the schemas of the hourly partitions and never hit the error.
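To make the behaviour consistent across environments, the merge can be requested explicitly on the read instead of relying on a cluster default. A minimal sketch, reusing the same "day1" path as in the question (the equivalent session-level setting is spark.sql.parquet.mergeSchema):

  // Ask Spark to merge the schemas of all part-files instead of sampling one,
  // so hours without `spid` simply yield null values for that column.
  val requests = spark.read
    .option("mergeSchema", "true")
    .parquet("day1")
    .transform(aggConnections())

Rows coming from hours without spid then end up with a null domain_userid and are dropped by the isNotNull filter, which appears to be what was effectively happening on Databricks.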

Upvotes: 2
