Reputation: 1461
I've set up a Flume service that can monitor Netcat or tail a log with Exec as a source, that sort of thing. I'm using Memory as the channel and Avro as the sink (Thrift is specified in the docs, but doesn't seem to work in Flume 1.3 or 1.4).
I've set up a C# socket server to receive the messages, and I get a stream of bytes. If I decode them using Encoding.UTF8.GetString(buffer), I can see something like:
"\0\0\0\0\0\0\0\0\00�����Tt������5\ne\0�����Tt������5\ne\0\0appendBatch\0\0�\0�127.0.0.1 - - [12/Nov/2013:22:42:50 +0000] \"GET /docs/appdev/index.html HTTP/1.1\" 200 7645\0�127.0.0.1 - - [12/Nov/2013:22:44:07 +0000] \"GET /docs/appdev/introduction.html HTTP/1.1\" 200 8619\0�127.0.0.1 - - [12/Nov/2013:22:44:09 +0000] \"GET /docs/appdev/installation.html HTTP/1.1\" 200 9045\0�127.0.0.1 - - [12/Nov/2013:22:44:12 +0000] \"GET /docs/appdev/deployment.html HTTP/1.1\" 200 18800\0�127.0.0.1 - - [12/Nov/2013:22:49:07 +0000] \"GET /docs/appdev/source.html HTTP/1.1\" 200 24554\0�127.0.0.1 - - [12/Nov/2013:22:50:38 +0000] \"GET /docs/appdev/processes.html HTTP/1.1\" 200 30743\0�127.0.0.1 - - [12/Nov/2013:22:51:39 +0000] \"GET /docs/appdev/sample/ HTTP/1.1\" 200 1852\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /sample HTTP/1.1\" 404 963\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:22:51:48 +0000] \"GET /favicon.ico HTTP/1.1\" 200 21630\0�0:0:0:0:0:0:0:1 - - [12/Nov/2013:23:02:13 +0000] \"GET /sample HTTP/1.1\" 404 963\0"
So obviously I'm getting the data across, but I'd like to deserialize it properly rather than doing some kind of regex extractions. I can see there's an official Avro C# library, and there's a Microsoft Hadoop library that has deserialization libraries. I created a local object to deserialize to:
[DataContract]
public class AvroEvent
{
[DataMember]
public byte[] Body { get; set; }
}
and tried deserializing with this:
client = serverSocket.EndAccept(result);
var myNetworkStream = new NetworkStream(client);
myNetworkStream.Read(buffer, 0, size);
var avro = new AvroSerializer(typeof(AvroEvent));
var deser = avro.Deserialize(myNetworkStream);
then I get this error:
System.InvalidOperationException was unhandled
HResult=-2146233079
Message=Unexpected number of bytes.
Source=Microsoft.Hadoop.Avro
I'm almost certainly going about all this in the wrong way, and I'm sure people are going to tell me not to use C#, but I've pretty much run out of sources on Google, so if anyone else has actually done this and can point me in the right direction, I'd be very grateful.
Toby
Upvotes: 0
Views: 941
Reputation: 2846
Flume uses an RPC mechanism to transmit events. If Avro is chosen, Flume relies on Avro RPC, which is not supported by Microsoft's Avro library (as mentioned in What's New), because that library is intended to be used only as a serialisation framework.
Technically speaking, the Deserialize() method expects the stream to contain the following data (in bits):
11[size of the byte array, encoded as a variable-length zig-zag integer][the actual bytes] (*)
The error you receive is probably because the received data uses a different wire format: it is an Avro RPC message, not a bare serialised AvroEvent.
* The leading 1s are necessary because version 0.8.4951.5418 of the library wraps each type in a union of null (0) and the type (1), so the first 1 is for the record AvroEvent and the second 1 is for the field Body. This behaviour is configurable in the recent version 1.1.0.5.
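To make that layout concrete, here is a minimal sketch that builds the bytes such a stream would contain. It is written in Python only for brevity; the same logic ports directly to C#. It assumes the standard Avro binary encoding, in which a union branch index and a byte-array length are both written as zig-zag variable-length longs, so branch 1 appears on the wire as the single byte 0x02. The function names are hypothetical and not part of any library, and I have not checked the output against the Microsoft library itself.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode a signed 64-bit integer as an Avro long (zig-zag, then base-128 varint)."""
    z = (n << 1) ^ (n >> 63)  # zig-zag mapping: 0→0, -1→1, 1→2, -2→3, ...
    out = bytearray()
    while True:
        low7 = z & 0x7F
        z >>= 7
        if z:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)         # high bit clear: last byte
            return bytes(out)


def encode_wrapped_event(body: bytes) -> bytes:
    """Hypothetical helper: the layout described above for an AvroEvent
    whose record and Body field are each wrapped in a [null, type] union."""
    branch_1 = zigzag_varint(1)      # union branch index 1 (the non-null branch)
    return (
        branch_1                     # record AvroEvent is present
        + branch_1                   # field Body is present
        + zigzag_varint(len(body))   # length of the byte array
        + body                       # the raw bytes
    )


if __name__ == "__main__":
    print(encode_wrapped_event(b"hi").hex())
```

Comparing this with the dump in the question shows the mismatch: the captured stream begins with framing bytes, hash-like binary data and the message name appendBatch before any event body, which is what an RPC call looks like rather than a single serialised record.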
Upvotes: 1