Reputation: 6094
Imagine a scenario the connection IP becomes unreachable. In this case, QuickFIX/J will try to reconnect automagically every 30s or so, as configured by parameter ReconnectInterval
. How do I avoid such behavior?
Upvotes: 3
Views: 2115
Reputation: 6094
The idea consists on retrieving parameter ReconnectInterval
from QuickFix/J and create a separate thread which is going to kill the Session
if and only if the Session
is not logged on yet.
In order for this to work, our thread must obviously be fired before the QuickFix/J thread tries to reconnect. In other words: if you configured ReconnectInterval=30
... you have to fire the aforementioned thread before that and close all initiators. This way, QuickFix/J will end up not really retrying to reconnect.
import java.io.InputStream
import java.util.Locale
import scala.util.control.NonFatal
import quickfix._
import quickfix.field._
class SimpleConnection(val configInputStream: InputStream,
val messageFactory: quickfix.MessageFactory)
extends MessageCracker
with quickfix.Application {
private val locale = Locale.getDefault.getCountry
private val settings = new SessionSettings(configInputStream)
private val storeFactory = new FileStoreFactory(settings)
private val loggerFactory = new QuickfixLoggerFactory(settings)
private var initiatorOption: Option[SocketInitiator] = None
private var sessionOption : Option[SessionID] = None
private var senderSeqOption: Option[Int] = None
private var targetSeqOption: Option[Int] = None
override def onLogout(sessionId: SessionID): Unit = {
log.info("onLogout called for %s".format(sessionId))
initiatorOption.foreach(initiator => initiator.stop(true))
fireDisconnectedState // inform listeners
initiatorOption = None
sessionOption = None
}
override def onCreate(sessionId: SessionID): Unit = {
log.info("onCreate called for %s".format(sessionId))
val session = Session.lookupSession(sessionId)
val interval = settings.getLong(session.getSessionID, "ReconnectInterval")
if(interval <= 10)
log.error("ReconnectInterval should be at least 10secs.")
else {
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val executor = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val monitor = new Runnable {
override def run(): Unit = {
val sleep = (interval-5)*1000
Thread.sleep(sleep)
if(!session.isLoggedOn) {
log.warn("Killing QuickFix/J session before reconnection.")
onLogout(session.getSessionID)
}
}
}
executor.execute(monitor)
}
senderSeqOption.foreach(session.setNextSenderMsgSeqNum(_))
targetSeqOption.foreach(session.setNextTargetMsgSeqNum(_))
senderSeqOption = None
targetSeqOption = None
}
}
Upvotes: 0