fcat
fcat

Reputation: 1251

Allow single connection with Akka stream as tcp server

I am trying to build a simple tcp server using Akka streams.

 Tcp()
  .bind(props.host, props.port)
  .to(Sink.foreach(_.handleWith(handler)))
  .run()
  .onComplete {
    case Success(i) => logger.info(s"Server is bound at ${props.host}:${props.port}")
    case Failure(e) => logger.error("Server binding failure", e)
  }

I want to allow maximum one connection at a time. To achieve this I have added the following line into my application.conf file.

akka.io.tcp.max-channels = 2

With this configuration, akka allows only one connection at a time. However, as soon as the second connection is attempted it rejects the request and fails itself with the following message:

Could not register incoming connection since selector capacity limit is reached, closing connection

From this point, it is not possible to establish any connection since the Tcp server is down.

Question: What is the proper way of enabling only one connection at a time? The main purpose is answering the first connection request and rejecting others while it is still in progress. It should be again possible to make another connection, after the previous connection is closed. As I mentioned, only one connection should be allowed at any time.

BONUS: Is it possible to provide a whitelist to make akka stream accept connections only from this list? I am planning to allow only known ip addresses to connect my server. To achieve this, I think it is enough to know the proper way of rejecting a request. So I can compare the incoming connection's ip address with a given list and reject if it is not in there. But any better solution is also appreciated.

Upvotes: 2

Views: 583

Answers (1)

maks
maks

Reputation: 6006

Bind method of Tcp has a parameter options which accepts a Traversable of Socket options. You can pass smth like this to that paramter:

case class AllowedAddresses(addresses: Seq[InetAddress]) extends SocketOption {
    override def beforeConnect(s: Socket): Unit = {
      if (!addresses.contains(s.getInetAddress)) s.close()
    }
  }

so your code will look like this:

Tcp()
  .bind(props.host, props.port, options = List(AllowedAddresses(listOfAddresses)))
  .to(Sink.foreach(_.handleWith(handler)))
  .run()
  .onComplete {
    case Success(i) => logger.info(s"Server is bound at ${props.host}:${props.port}")
    case Failure(e) => logger.error("Server binding failure", e)
  }

The approach of limiting the number of request is the same, investigate methods in the SocketOptions trait

PS. Have not tried this to run, just concluded after investigation of stream API, so please check for correctness.

Upvotes: 1

Related Questions