Reputation: 938
I am running this code daily and saving the output to a chosen location:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{lit, max, min}
import spark.implicits._

def aggConnections()(input: DataFrame): DataFrame = {
  input
    // group on the first element of the spid and cxid array columns
    .groupBy(
      $"spid"(0).as("domain_userid"),
      $"cxid"(0).as("key_id"))
    .agg(
      min($"Time").as("first_seen"),
      max($"Time").as("last_seen"))
    .select($"domain_userid", $"key_id",
      lit("cxense_id").as("key_source"),
      lit(1).as("times_seen"),
      $"first_seen", $"last_seen")
    .filter($"domain_userid".isNotNull && $"key_id".isNotNull)
}
val requests = spark.read.parquet("day1").transform(aggConnections())
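The save step afterwards is just a plain Parquet write to the chosen location; roughly like the sketch below, where the path and write mode are placeholders rather than my actual configuration:

// Write the aggregated result for the day; the destination path is a placeholder.
requests
  .write
  .mode("overwrite")
  .parquet("s3://my-bucket/connections/day1")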
spid is an array and looks like this:

spid: array
    element: string

That's why I have to access it as $"spid"(0).as("domain_userid").
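For what it's worth, indexing the column directly should be equivalent to the other ways Spark offers to pick the first array element, so the access pattern itself shouldn't be the issue:

// Equivalent ways of taking the first element of the spid array column:
$"spid"(0).as("domain_userid")               // Column.apply(0)
$"spid".getItem(0).as("domain_userid")       // Column.getItem(0)
element_at($"spid", 1).as("domain_userid")   // functions.element_at uses 1-based indexing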
Unfortunately, on some days the job fails with the following error when run on AWS EMR with Spark 3.0.1:
diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve '`spid`' given input columns: [AdUnitId, AudienceSegmentIds, BandWidth, BandwidthGroupId, BandwidthId, Browser, BrowserId, City, CityId, CmsMetadata, Country, CountryId, DeviceCategory, Domain, GfpContentId, IsCompanion, IsFilledRequest, IsInterstitial, IsVideoFallbackRequest, KeyPart, Metro, MetroId, MobileAppId, MobileCapability, MobileCarrier, MobileDevice, OS, OSId, OSVersion, PodPosition, PostalCode, PostalCodeId, PublisherProvidedID, RefererURL, Region, RegionId, RequestLanguage, RequestedAdUnitSizes, Time, TimeUsec2, UserId, VideoPosition, addefend, ads_enabled, app_name, app_version, bereidingstijd, brand, careerlevel, cat, category, categorycode, categoryname, channel_id, channel_title, cid, cmssource, companyname, compid, content_url, contentid, contentsubstype, crt_cpm, crt_size, cxid, cxsg, dc_yt, deviceid, dos, dossier, educationlevel, floor, fsr, gang, gdpr_applies, gdpr_consentstring, gelegenheid, hb_pb, hb_size, hour, ifa, industry, ingredient, iom, itemid, ix_apnx_om, ix_cdb_om, ix_imdi_cpm, jobtitle, k21, kage, kar, kauth, kembed, keuken, kgender, klg, ko, kpid, kvlg, kvz, kw, lat, long, mc_asset_type, mediation, model, moeilijkheid, origin, pag, pagetype, path, pay, pos, positie, production_id, productname, retailercode, retailername, rpfl_12108, screen, sector, shopid, show_id, show_title, soort, src, stad, starttype, station, subforum, tag, theme, top, video_duration, video_label, yt_vrallowed, ytdevice];; 'Aggregate ['spid[0], cxid#136[0]], ['spid[0] AS domain_userid#276, cxid#136[0] AS key_id#277, min(Time#0) AS first_seen#417, max(Time#0) AS last_seen#419]
I thought the problem was that the spid column was not present, however it is in the data when I check the schema. Therefore, I decided to test it more carefully on Databricks. The weird thing is that if I run this code on Databricks with the same Spark version 3.0.1, everything works fine on every day of data, including the days that fail on EMR. I really can't explain what is happening.
Upvotes: 2
Views: 1667
Reputation: 938
I discovered that my daily partitions were themselves split into hourly partitions. The schemas of these hourly partitions can differ slightly: some hours are missing the spid column entirely. Hence, on some days my production environment failed with the error reported above.
The bug was difficult to spot because my test instance had option("mergeSchema", "true") set as a default Spark option, so it was merging the schemas without ever hitting the error.
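If anyone runs into the same thing, enabling schema merging explicitly makes both environments behave the same way; a minimal sketch (the input path is a placeholder):

// Merge the differing hourly Parquet schemas on read,
// so a missing spid column in some hours no longer breaks the aggregation.
val requests = spark.read
  .option("mergeSchema", "true")
  .parquet("day1")
  .transform(aggConnections())

// ...or turn it on session-wide, which is effectively what my test instance had:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")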
Upvotes: 2