kennyc
kennyc

Reputation: 5710

Skeleton Example for Swift Combine Publisher-Subscriber

As I port some Objective-C code to Swift, I'm trying to better understand the new Combine framework and how I can use it to re-create a common design pattern.

In this case, the design pattern is a single object (Manager, Service, etc) that any number of "clients" can register with as a delegate to receive callbacks. It's a basic 1:Many pattern using delegates.

Combine looks ideal for this, but the sample code is a bit thin. Below is a working example but I'm not sure if it's correct or being used as intended. In particular, I'm curious about reference cycles between the objects.

class Service {

  let tweets = PassthroughSubject<String, Never>()

  func start() {
    // Simulate the need send to send updates.
    DispatchQueue.global(qos: .utility).async {
      while true {
        self.sendTweet()
        usleep(100000)
      }
    }
  }

  func sendTweet() {
    tweets.send("Message \(Date().timeIntervalSince1970)")
  }
}

class Client : Subscriber {
  typealias Input = String
  typealias Failure = Never

  let service:Service
  var subscription:Subscription?

  init(service:Service) {
    self.service = service

   // Is this a retain cycle?
   // Is this thread-safe? 
    self.service.tweets.subscribe(self) 
  }

  func receive(subscription: Subscription) {
    print("Received subscription: \(subscription)")

    self.subscription = subscription
    self.subscription?.request(.unlimited)
  }

  func receive(_ input: String) -> Subscribers.Demand {
    print("Received tweet: \(input)")
    return .unlimited
  }

  func receive(completion: Subscribers.Completion<Never>) {
    print("Received completion")
  }
}

// Dependency injection is used a lot throughout the 
// application in a similar fashion to this:

let service = Service()
let client = Client(service:service)

// In the real world, the service is started when
// the application is launched and clients come-and-go.

service.start()

Output is:

Received subscription: PassthroughSubject
Received tweet: Message 1560371698.300811
Received tweet: Message 1560371698.4087949
Received tweet: Message 1560371698.578027
...

Is this even remotely close to how Combine was intended to be used?

Upvotes: 7

Views: 6682

Answers (2)

user3441734
user3441734

Reputation: 17544

lets check it! the simplest way is add deinit to both classes and limit the live of service

class Service {
    
    let tweets = PassthroughSubject<String, Never>()
    
    func start() {
        // Simulate the need send to send updates.
        DispatchQueue.global(qos: .utility).async {
            (0 ... 3).forEach { _ in
                self.sendTweet()
                usleep(100000)
            }
        }
    }
    
    func sendTweet() {
        tweets.send("Message \(Date().timeIntervalSince1970)")
    }
    deinit {
        print("server deinit")
    }
}

now it is easy to check that

do {
    let service = Service()
    //_ = Client(service:service)
    
    // In the real world, the service is started when
    // the application is launched and clients come-and-go.
    
    service.start()
}

will finished as expected

server deinit

modify it with subscribing client

do {
    let service = Service()
    _ = Client(service:service)
    service.start()
}

and you immediately know the result

Received subscription: PassthroughSubject
Received tweet: Message 1580816649.7355099
Received tweet: Message 1580816649.8548698
Received tweet: Message 1580816650.001649
Received tweet: Message 1580816650.102639

there is a memory cycle, as you expected :-)

Generally, there is very low probability, that you need your own subscriber implementation.

First modify the service, so the client will know when no more messages will arrive

func start() {
        // Simulate the need send to send updates.
        DispatchQueue.global(qos: .utility).async {
            // send some tweets
            (0 ... 3).forEach { _ in
                self.sendTweet()
                usleep(100000)
            }
            // and send "finished"
            self.tweets.send(completion: .finished)
        }
    }

and next use "build-in" subcriber in your publisher by invoking his .sink method. .sink return AnyCancelable (it is a reference type) which you have to store somewhere.

var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    
    // client
    cancelable = service.tweets.sink { (s) in
        print(s)
    }
}

now, everythig works, es expected ...

Message 1580818277.2908669
Message 1580818277.4674711
Message 1580818277.641886
server deinit

But what about cancelable? Let check it!

var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    
    // client
    cancelable = service.tweets.sink { (s) in
        print(s)
    }
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print(cancelable)
}

it prints

Message 1580819227.5750608
Message 1580819227.763901
Message 1580819227.9366078
Message 1580819228.072041
server deinit
Optional(Combine.AnyCancellable)

so you have to release it "manualy", if you don't need it anymore. .sink is there again!

var cancelable: AnyCancellable?
do {
    let service = Service()
    service.start()
    
    // client
    cancelable = service.tweets.sink(receiveCompletion: { (completion) in
        print(completion)
        // this inform publisher to "unsubscribe" (not necessery in this scenario)
        cancelable?.cancel()
        // and we can release our canceleble
        cancelable = nil
    }, receiveValue: { (message) in
        print(message)
    })
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    print(cancelable)
}

and result

Message 1580819683.462331
Message 1580819683.638145
Message 1580819683.74383
finished
server deinit
nil

Combine has almost everything you need in real word application, the trouble is a lack of documentation, but a lot of sources are available on the internet.

Upvotes: 3

dwiniarczuk
dwiniarczuk

Reputation: 19

Custom Combine Subscriber should also conform to Cancellable protocol that provides a method to forward cancellation to the received subscription object from Publisher. That way you do not have to expose Subscription property. According to doc:

If you create a custom Subscriber, the publisher sends a Subscription object when you first subscribe to it. Store this subscription, and then call its cancel() method when you want to cancel publishing. When you create a custom subscriber, you should implement the Cancellable protocol, and have your cancel() implementation forward the call to the stored subscription. https://developer.apple.com/documentation/combine/receiving_and_handling_events_with_combine

Upvotes: 1

Related Questions