I am trying to use this example to connect NiFi to Flink:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .url("http://localhost:8090/nifi")
        .portName("Data for Flink")
        .requestBatchCount(5)
        .buildConfig();
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> streamSource = env.addSource(nifiSource).setParallelism(2);
DataStream<String> dataStream = streamSource.map(new MapFunction<NiFiDataPacket, String>() {
    @Override
    public String map(NiFiDataPacket value) throws Exception {
        return new String(value.getContent(), Charset.defaultCharset());
    }
});
dataStream.print();
env.execute();
I am running NiFi as a standalone server with the default properties, except for these:
nifi.remote.input.host=localhost
nifi.remote.input.secure=false
nifi.remote.input.socket.port=8090
nifi.remote.input.http.enabled=true
The call fails every time, with the following log in NiFi:
[Site-to-Site Worker Thread-24] o.a.nifi.remote.SocketRemoteSiteListener
Unable to communicate with remote instance null due to
org.apache.nifi.remote.exception.HandshakeException: Handshake
with nifi://localhost:61680 failed because the Magic Header
was not present; closing connection
Nifi version: 1.7.1, Flink version: 1.7.1
Upvotes: 2
Views: 770
After using the nifi-toolkit, I removed the custom value of nifi.remote.input.socket.port, added transportProtocol(SiteToSiteTransportProtocol.HTTP) to my SiteToSiteClientConfig, and used http://localhost:8080/nifi as the URL.
The reason I changed the port in the first place is that, unless the HTTP protocol is specified, the client uses RAW by default. With the RAW protocol on the Flink side, the client cannot create a Transaction and prints the following warning:
Unable to refresh Remote Group's peers due to Remote instance of NiFi
is not configured to allow RAW Socket site-to-site communications
That's why I thought it was a port issue.
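My understanding of the original "Magic Header was not present" error, as a rough sketch: nifi.remote.input.socket.port is the RAW socket listener, and pointing an http:// URL at it means the listener receives the start of an HTTP request instead of the bytes it expects. The snippet below only illustrates that idea with the JDK. It is not NiFi's actual code; the class and method names are made up, and the assumption that the RAW handshake opens with the ASCII bytes "NiFi" is mine.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: mimics the kind of check a RAW site-to-site listener
// could perform on the first bytes of a new connection. Hypothetical names.
class MagicHeaderCheck {
    // Assumption: the RAW protocol expects a connection to open with "NiFi".
    static final byte[] MAGIC = "NiFi".getBytes(StandardCharsets.US_ASCII);

    // Returns true only if the first bytes received equal the magic header.
    static boolean startsWithMagic(byte[] firstBytes) {
        if (firstBytes.length < MAGIC.length) {
            return false;
        }
        return Arrays.equals(Arrays.copyOf(firstBytes, MAGIC.length), MAGIC);
    }

    public static void main(String[] args) {
        // What an HTTP client sends first when pointed at the socket port.
        byte[] http = "GET /nifi-api/site-to-site HTTP/1.1\r\n"
                .getBytes(StandardCharsets.US_ASCII);
        // What a RAW client is assumed to send first.
        byte[] raw = "NiFi".getBytes(StandardCharsets.US_ASCII);

        System.out.println(startsWithMagic(http)); // false
        System.out.println(startsWithMagic(raw));  // true
    }
}
```

So the port itself was reachable; the protocol spoken on it was the mismatch.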
So now, with the default NiFi config, this works as expected:
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")
        .portName("portNameAsInNifi")
        .transportProtocol(SiteToSiteTransportProtocol.HTTP)
        .requestBatchCount(1)
        .buildConfig();
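A possible way to see why HTTP works with the default config while RAW does not: as far as I can tell, a stock nifi.properties leaves nifi.remote.input.socket.port blank (RAW listener off) and sets nifi.remote.input.http.enabled=true. I have not verified those defaults against 1.7.1, so treat them as an assumption. The sketch below uses only the JDK to read the two properties and report which transports a client could use; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: inspects site-to-site settings from nifi.properties text.
class SiteToSiteSettings {
    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // RAW is treated as enabled only when a socket port is actually set.
    static boolean rawEnabled(Properties props) {
        return !props.getProperty("nifi.remote.input.socket.port", "").trim().isEmpty();
    }

    // HTTP is treated as enabled only when the flag is explicitly "true".
    static boolean httpEnabled(Properties props) {
        return Boolean.parseBoolean(
                props.getProperty("nifi.remote.input.http.enabled", "false").trim());
    }

    public static void main(String[] args) {
        // Assumed defaults: blank socket port, HTTP enabled.
        Properties defaults = parse(
                "nifi.remote.input.socket.port=\n"
              + "nifi.remote.input.http.enabled=true\n");
        System.out.println("RAW:  " + rawEnabled(defaults));  // RAW:  false
        System.out.println("HTTP: " + httpEnabled(defaults)); // HTTP: true
    }
}
```

With settings like these, a client left on its default RAW transport has nothing to connect to, which matches the "not configured to allow RAW Socket" warning, while transportProtocol(SiteToSiteTransportProtocol.HTTP) goes through the web port.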
Upvotes: 2