Istvan
Istvan

Reputation: 8572

How to get Avro serialised message from a F# record?

I have the following in F#:

type HitType = {
  hitHostname                                   : string;
  hitMemoryUsed                                 : float;
  hitPage                                       : string;
}

I try to serialize it in Avro:

  let specificDatumWriter : SpecificDatumWriter<HitType> =
    SpecificDatumWriter(avroSchema)

  let getAvroMsg msg =
    let memStream =
      new MemoryStream(256)
    let encoder =
      BinaryEncoder(memStream)
    specificDatumWriter.Write(msg, encoder)
    encoder.Flush()
    memStream.ToArray()

When I try to get the Avro byte the following way:

let hit : HitType = { hitHostname = "abc"; hitMemoryUsed = 0.123; hitPage = "None" }
getAvroMsg hit

I get the following exception:

Unable to cast object of type 'TrckFsharp.HitType' to type 'Avro.Specific.ISpecificRecord'.: InvalidCastException
   at Avro.Specific.SpecificDatumWriter`1.WriteRecordFields(Object recordObj, RecordFieldWriter[] writers, Encoder encoder)
   at TrckFsharp.Avro.getAvroMsg(HitType msg) in project-dev/Avro.fs:line 40
   at TrckFsharp.Handler.trckGet(APIGatewayProxyRequest request) in project/Handler.fs:line 77
   at TrckFsharp.Handler.handler(APIGatewayProxyRequest request) in project/Handler.fs:line 137
   at lambda_method(Closure , Stream , Stream , LambdaContextInternal )

Not sure why. Isn't this the way to produce a message with Avro?

Update1:

Avro schema:

{
  "namespace": "com.lambdainsight.hit",
  "type": "record",
  "name": "Hit",
  "fields": [
     {"name": "hitHostname",                                     "type": "string" },
     {"name": "hitMemoryUsed",                                   "type": "float"  },
     {"name": "hitPage",                                         "type": "string" }
  ]
 }

I have uploaded everything into a single repo:

https://github.com/LambdaInsight/avro-test/

Upvotes: 0

Views: 254

Answers (1)

Stuart
Stuart

Reputation: 5506

The Avro SDK wants your HitType needs to implement ISpecificRecord, which is unfortunate cause it would be nice to use a record type.

For now you can do it with this HitType:

type HitType() =
    let schema = Schema.Parse("""{
  "namespace": "com.lambdainsight.hit",
  "type": "record",
  "name": "Hit",
  "fields": [
     {"name": "hitHostname",   "type": "string" },
     {"name": "hitMemoryUsed", "type": "float"  },
     {"name": "hitPage",       "type": "string" }
  ]
 }""")

    member val HitHostname = "" with get, set
    member val HitMemoryUsed = 0.f with get, set
    member val HitPage = "" with get, set

    interface ISpecificRecord with
        member this.Schema with get() = schema
        member this.Get(fieldPos : int) = 
            match fieldPos with
            | 0 -> this.HitHostname :> obj
            | 1 -> this.HitMemoryUsed :> obj
            | 2 -> this.HitPage :> obj
            | _ -> raise (AvroRuntimeException(sprintf "Bad index %i in Get()" fieldPos))
        member this.Put(fieldPos : int, fieldValue : obj) = 
            match fieldPos with
            | 0 -> this.HitHostname <- string fieldValue
            | 1 -> this.HitMemoryUsed <- fieldValue :?> single
            | 2 -> this.HitPage <- string fieldValue
            | _ -> raise (AvroRuntimeException(sprintf "Bad index %i in Put()" fieldPos))

Upvotes: 1

Related Questions