User456898

Flume agents are not connecting on different machines

Flume agent 1 does not connect to Flume agent 2. What could be the reason?

I am using Flume with two agents to stream a log file to HDFS. The first agent runs on the source machine where the log file exists; the second runs on the machine where Hadoop is installed (IP address 10.10.201.40).

The configuration file of the first agent (flume-src-agent.conf) is as follows:

source_agent.sources = weblogic_server
source_agent.sources.weblogic_server.type = exec
source_agent.sources.weblogic_server.command = tail -f AdminServer.log
source_agent.sources.weblogic_server.batchSize = 1
source_agent.sources.weblogic_server.channels = memoryChannel
source_agent.sources.weblogic_server.interceptors = itime ihost itype

source_agent.sources.weblogic_server.interceptors.itime.type = timestamp

source_agent.sources.weblogic_server.interceptors.ihost.type = host
source_agent.sources.weblogic_server.interceptors.ihost.useIP = false
source_agent.sources.weblogic_server.interceptors.ihost.hostHeader = host

source_agent.sources.weblogic_server.interceptors.itype.type = static
source_agent.sources.weblogic_server.interceptors.itype.key = log_type
source_agent.sources.weblogic_server.interceptors.itype.value = apache_access_combined

source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100

source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = 10.10.201.40
source_agent.sinks.avro_sink.port = 4545

The configuration file of the second agent (flume-trg-agent.conf) is as follows:

collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1 mc2

collector.channels = mc1 mc2
collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 100
collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 100

collector.sinks = HadoopOut
collector.sinks.HadoopOut.type = hdfs
collector.sinks.HadoopOut.channel = mc2
collector.sinks.HadoopOut.hdfs.path = hdfs://localhost:54310/user/root
collector.sinks.HadoopOut.hdfs.callTimeout = 150000
collector.sinks.HadoopOut.hdfs.fileType = DataStream
collector.sinks.HadoopOut.hdfs.writeFormat = Text
collector.sinks.HadoopOut.hdfs.rollSize = 0
collector.sinks.HadoopOut.hdfs.rollCount = 10000
collector.sinks.HadoopOut.hdfs.rollInterval = 600
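Both files have to agree on the Avro port: the first agent's avro_sink connects to hostname:port, and the collector's AvroIn source listens on bind:port. In the two configurations above both sides use 4545, so a port mismatch is not the cause here. For larger setups the cross-check can be scripted. The following is a minimal Python sketch under stated assumptions: the helper names are hypothetical, and the parser only understands simple "key = value" lines (not the ":" separators, line continuations, or escapes that full Java properties syntax allows).

```python
# Hypothetical helpers: compare the Avro port used by the source agent's sink
# with the port the collector's Avro source binds to.


def parse_flume_properties(text):
    """Parse simple 'key = value' lines; blank lines and '#' comments are skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def avro_ports_match(src_text, trg_text,
                     sink_key="source_agent.sinks.avro_sink.port",
                     source_key="collector.sources.AvroIn.port"):
    """True if both configs define the port and the values are identical."""
    src_port = parse_flume_properties(src_text).get(sink_key)
    trg_port = parse_flume_properties(trg_text).get(source_key)
    return src_port is not None and src_port == trg_port
```

Called with the contents of flume-src-agent.conf and flume-trg-agent.conf, this should return True for the files shown in the question.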

When I run the first agent, I get the following error:

2015-04-08 15:14:10,251 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient {host:10.10.201.40, port:4545}: RPC connection error
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
    at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
    at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
    at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
    at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
    at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:269)
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:339)
    ... 3 more
Caused by: java.io.IOException: Error connecting to /10.10.201.40:4545
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
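The innermost cause is a plain IOException: the TCP connection from the source machine to 10.10.201.40:4545 failed, so the Avro RPC client never got as far as sending events. One way to separate network or firewall problems from Flume problems is to test the raw TCP connection from the source machine. A minimal Python sketch, assuming Python 3 is available there (the helper name can_connect is hypothetical):

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and opens a TCP socket;
        # the with-block closes it again immediately.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False
```

Run on the source machine, can_connect("10.10.201.40", 4545) should return False while nothing is listening on the collector side, and True once the collector's Avro source has bound the port, unless a firewall is blocking it.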

When I run the second agent, I get the following error:

2015-04-08 15:53:31,649 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2015-04-08 15:53:31,844 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start EventDrivenSourceRunner: { source:Avro source AvroIn: {bindAddress: 0.0.0.0, port: 4545 } } - Exception follows.
org.jboss.netty.channel.ChannelException: Failed to bind to: /0.0.0.0:4545
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:298)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
    at org.apache.flume.source.AvroSource.start(AvroSource.java:225)
    at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind(Native Method)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:126)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.bind(NioServerSocketPipelineSink.java:138)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleServerSocket(NioServerSocketPipelineSink.java:90)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:64)
    at org.jboss.netty.channel.Channels.bind(Channels.java:569)
    at org.jboss.netty.channel.AbstractChannel.bind(AbstractChannel.java:187)
    at org.jboss.netty.bootstrap.ServerBootstrap$Binder.channelOpen(ServerBootstrap.java:343)
    at org.jboss.netty.channel.Channels.fireChannelOpen(Channels.java:170)
    at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:158)
    at org.jboss.netty.channel.socket.nio.NioServerSocketChannel.<init>(NioServerSocketChannel.java:80)
    at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:86)
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:277)
    ... 13 more
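The collector's Avro source fails at startup with java.net.BindException: Address already in use, meaning some other socket on 10.10.201.40 already holds port 4545. Since the collector then never listens on that port, this would also explain the first agent's connection error. A minimal Python sketch for checking, on the collector machine, whether a port can be bound (the helper name port_in_use is hypothetical; run it while the collector is stopped, otherwise the collector itself would hold the port):

```python
import errno
import socket


def port_in_use(port, host="0.0.0.0"):
    """Return True if binding (host, port) fails with EADDRINUSE.

    The Avro source binds to bind:port at startup; Java reports the
    EADDRINUSE case as java.net.BindException: Address already in use.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, port))
    except OSError as exc:
        if exc.errno == errno.EADDRINUSE:
            return True
        raise  # other errors, e.g. permission denied for ports below 1024
    finally:
        sock.close()
    return False
```

If port_in_use(4545) returns True with the collector stopped, another process owns the port and has to be stopped, or the agents have to move to a different port.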

Answers (2)

pradsav

For binding issues, run netstat -plten, find the PID of the process that is listening on the port, and kill that process. The agent should then be able to bind the port when you run it again.
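The netstat step can be scripted. netstat -plten lists listening TCP sockets with numeric addresses plus a "PID/Program name" column; without root, sockets owned by other users show "-" there. Below is a minimal Python sketch that pulls the PIDs listening on a given port out of that output. The function name is hypothetical, and it assumes the Linux net-tools column layout (Proto, Recv-Q, Send-Q, Local Address, Foreign Address, State, User, Inode, PID/Program name).

```python
def pids_listening_on(netstat_output, port):
    """Return sorted PIDs whose Local Address ends in :port in `netstat -plten` output."""
    suffix = ":" + str(port)
    pids = set()
    for line in netstat_output.splitlines():
        fields = line.split()
        # Data rows start with tcp/tcp6; title and header lines are skipped.
        if len(fields) < 9 or not fields[0].startswith("tcp"):
            continue
        local_addr, pid_prog = fields[3], fields[8]
        # "-" means the PID is hidden (socket owned by another user, not root).
        if local_addr.endswith(suffix) and pid_prog != "-":
            pids.add(int(pid_prog.split("/", 1)[0]))
    return sorted(pids)
```

Live output can be fed in with subprocess.run(["netstat", "-plten"], capture_output=True, text=True).stdout. Before killing a PID, check what it is: if it turns out to be an earlier collector instance that is still running, stopping that instance cleanly is the better fix.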

Erik Schmiegelow

The answer to your question is in the second log:

Address already in use

The reason is that another process is already using port 4545. Reconfigure both agents to use another port, say 41414, and it should work.
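Changing the port means editing two properties that must stay equal: source_agent.sinks.avro_sink.port in flume-src-agent.conf and collector.sources.AvroIn.port in flume-trg-agent.conf. A minimal Python sketch of that edit, assuming single-line "key = value" entries (set_property is a hypothetical helper, not part of Flume):

```python
import re


def set_property(config_text, key, value):
    """Replace the value of `key` in properties text; raise KeyError if the key is absent."""
    # Match the whole line for this exact key, keeping the "key = " prefix.
    # [ \t]* (not \s*) so the match never crosses a line break.
    pattern = re.compile(
        r"^([ \t]*" + re.escape(key) + r"[ \t]*=[ \t]*).*$", re.MULTILINE
    )
    new_text, count = pattern.subn(lambda m: m.group(1) + str(value), config_text)
    if count == 0:
        raise KeyError(key)
    return new_text
```

Apply it to both files with the same value, for example 41414, then restart the collector so its Avro source is listening on the new port when the first agent's sink connects.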
