Reputation: 153
Optimize memory usage
msg, err := pulsarConsumer.Receive(ctx)
and send the message to a channel:
dataWriteChan <- msg
msg := <-dataWriteChan
dataPayload := msg.Payload()
var avroData interface{}
err := avro.Unmarshal(avroCodec, dataPayload, &avroData)
and then append avroData to a slice that serves as a cache:
dataCache = append(dataCache, avroData)
tmpBuf := make([]byte, 0)
bf := bytes.NewBuffer(tmpBuf)
config := goavro.OCFConfig{
W: bf,
Codec: goavroCodec,
}
ocfWriter, _ := goavro.NewOCFWriter(config)
ocfWriter.Append(dataCache)
Then I use the buffer bf to build the SQL statement:
sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro %v", Database, TableName, cols, bf)
and execute it:
conn.Exec(ctx, sql)
sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)
conn.Exec(ctx, sql + "%s", data.String())
I don't know whether this saves memory, but the bigger problem is that the insert fails with an error:
write to storage err: %!(NOVERB)%!(EXTRA string=code: 1001, message: avro::Exception: EOF reached)
1. avro.Unmarshal(avroCodec, dataPayload, &avroData)
2. ocfWriter.Append(dataCache)
3. fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)
Whether or not you can help solve it, I appreciate that you are willing to spend time thinking it through with me. This is very important to me.
Upvotes: 0
Views: 665
Reputation: 23
The driver you appear to be using does not itself support the Avro format. You will need to marshal the data into a struct; you can then use the driver's AppendStruct method.
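A minimal sketch of the "marshal into a struct" step, using only the standard library. It assumes the generically decoded avroData is a map[string]interface{} keyed by field name, which is what Avro decoders typically produce for a record decoded into interface{}. The Event type, its fields, and the `ch` tags are hypothetical stand-ins for the real table columns.

```go
package main

import (
	"fmt"
)

// Event is a hypothetical row type; the field names and `ch` tags are
// assumptions standing in for the real table columns.
type Event struct {
	ID   int64  `ch:"id"`
	Name string `ch:"name"`
}

// toEvent converts a generically decoded record into a typed Event and
// reports an error if the record or a field has an unexpected type.
func toEvent(decoded interface{}) (Event, error) {
	rec, ok := decoded.(map[string]interface{})
	if !ok {
		return Event{}, fmt.Errorf("unexpected record type %T", decoded)
	}
	id, ok := rec["id"].(int64)
	if !ok {
		return Event{}, fmt.Errorf("field id: unexpected type %T", rec["id"])
	}
	name, ok := rec["name"].(string)
	if !ok {
		return Event{}, fmt.Errorf("field name: unexpected type %T", rec["name"])
	}
	return Event{ID: id, Name: name}, nil
}

func main() {
	// Stand-in for the value produced by decoding one message payload.
	var avroData interface{} = map[string]interface{}{
		"id":   int64(1),
		"name": "example",
	}
	ev, err := toEvent(avroData)
	if err != nil {
		fmt.Println("convert:", err)
		return
	}
	fmt.Printf("%+v\n", ev)
}
```

Assuming the driver is clickhouse-go v2, each converted value would then be passed to AppendStruct on a batch obtained from the connection's PrepareBatch, followed by Send once the batch is full. Decoding straight into the struct instead of into interface{} would also avoid building the intermediate map for every message.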
Upvotes: 0