Reputation: 2923
I have this piece of code in RxSwift that wraps UDPBroadcastConnection (https://github.com/gunterhager/UDPBroadcastConnection) My question is, what do I have to do to call closeConnection() when the observable is disposed?
struct UDPBroadcastResponse {
let ip: String
let port: Int
let message: [UInt8]
}
final class RxBroadcaster: UDPBroadcastConnection {
let response = PublishSubject<UDPBroadcastResponse>()
private let disposeBag = DisposeBag()
}
extension RxBroadcaster: ReactiveCompatible {}
extension Reactive where Base: RxBroadcaster {
func send(message: String) -> Observable<UDPBroadcastResponse> {
DDLogVerbose("Sending UDP broadcast message: \(message)")
self.base.sendBroadcast(message)
return self.base.response
}
}
extension RxBroadcaster {
convenience init(port: UInt16) {
let rely = ReplaySubject<UDPBroadcastResponse>.createUnbounded()
self.init(port: port) { ip, port, message in
let response = UDPBroadcastResponse(ip: ip, port: port, message: message)
// DDLogVerbose("Receiving UDP message: IP: \(response.ip). Port: \(response.port). Message: \(response.message.map{ String(format: "%02X", $0) }.joined())")
DDLogVerbose("Receiving UDP message: IP: \(response.ip). Port: \(response.port)")
rely.onNext(response)
}
rely.bind(to: self.response).disposed(by: disposeBag)
}
}
Here I see an example: http://adamborek.com/practical-introduction-rxswift/ But I am not sure about the lifecycle of the objects involved. Can you help me? Thanks for suggestions.
Upvotes: 0
Views: 290
Reputation: 33967
I'd be more inclined to implement the RxBroadcaster like this:
final class RxBroadcaster {
let response: Observable<UDPBroadcastResponse>
init(port: UInt16) {
let _response = PublishSubject<UDPBroadcastResponse>()
let _connection = UDPBroadcastConnection(port: port, handler: { ipAddress, port, response in
_response.onNext(UDPBroadcastResponse(ip: ipAddress, port: port, message: response))
})
response = _response.asObservable()
completion = _response.asObserver()
connection = _connection
}
deinit {
connection.closeConnection()
completion.onCompleted()
}
private let connection: UDPBroadcastConnection
private let completion: AnyObserver<UDPBroadcastResponse>
}
extension RxBroadcaster: ObserverType {
public func on(_ event: RxSwift.Event<Data>) {
guard case let .next(data) = event else { return }
connection.sendBroadcast(data)
}
}
Now you can send data by just binding to the RxBroadcaster myData.bind(to: myBroadcaster)
and receive data by subscribing to the broadcaster's response. The connection would naturally close when the Broadcaster goes out of scope.
It feels a bit odd to me to do a call/response operator like you did. The point of a UDP socket is that you will get messages independent of the ones you sent.
Upvotes: 0
Reputation: 7171
You can use the do
operator to hook into lifecycle events from a subscription and perform side effects. You can call do
at any point in an Observable chain before you subscribe or bind. Example:
rely.do(onDispose: {
// called as your subscription is being disposed
}).bind(to: self.response).disposed(by: disposeBag)
Upvotes: 1