Reputation: 3005
I have a main app and a Privileged Helper in macOS. I'm trying to implement a system where the Privileged Helper runs a root command in the terminal in interactive mode every 1 second, with the goal of streaming the data back to the main app after every update. Here's the basic structure of my setup:
Main App
: Initiates the connection to the Privileged Helper and should receive continuous updates.
Privileged Helper
: Runs the root command and should stream data back to the main app.
The issue I'm facing is that my main app only receives the first value and then no updates afterwards. After examining the macOS Console
, I can see that the helper is outputting data every 1 second as expected, but this data is not reaching the main app. I would appreciate your patience with me as this is my first time implementing something of the sort.
Here's a simplified version of my code:
ContentView.swift
import SwiftUI
struct ContentView: View {
@StateObject private var viewModel = ContentViewModel()
var body: some View {
VStack {
ScrollView {
Text(viewModel.scriptOutput)
}
Button("Stop Streaming") {
if viewModel.isStreaming {
viewModel.stopStreaming()
}
}
}
.padding()
.onAppear {
viewModel.startStreaming()
}
}
}
class ContentViewModel: ObservableObject {
@Published var scriptOutput = ""
@Published var isStreaming = false
func startStreaming() {
Task {
do {
try await ExecutionService.startStreamingData { [weak self] output in
DispatchQueue.main.async {
print(output)
self?.scriptOutput = output
}
}
await MainActor.run { self.isStreaming = true }
} catch {
await MainActor.run { self.scriptOutput = error.localizedDescription }
}
}
}
func stopStreaming() {
Task {
do {
try await ExecutionService.stopStreamingData()
await MainActor.run { self.isStreaming = false }
} catch {
await MainActor.run { self.scriptOutput = error.localizedDescription }
}
}
}
}
ExecutionService.swift
enum ExecutionService {
// MARK: Execute
static func startStreamingData(updateHandler: @escaping (String) -> Void) async throws {
let helper = try await HelperRemoteProvider.remote()
helper.startStreamingData { output in
print("Received: \(output)")
DispatchQueue.main.async {
updateHandler(output)
}
}
}
static func stopStreamingData() async throws {
let helper = try await HelperRemoteProvider.remote()
helper.stopStreamingData()
}
}
HelperRemoteProvider.swift
import Foundation
import ServiceManagement
// MARK: - HelperRemoteProvider
/// Provide a `HelperProtocol` object to request the helper.
enum HelperRemoteProvider {
// MARK: Computed
private static var isHelperInstalled: Bool { FileManager.default.fileExists(atPath: HelperConstants.helperPath) }
}
// MARK: - Remote
extension HelperRemoteProvider {
static func remote() async throws -> some HelperProtocol {
let connection = try connection()
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<any HelperProtocol, Error>) in
let continuationResume = ContinuationResume()
let helper = connection.remoteObjectProxyWithErrorHandler { error in
guard continuationResume.shouldResume() else { return }
// 1st error to arrive, it will be the one thrown
continuation.resume(throwing: error)
}
if let unwrappedHelper = helper as? HelperProtocol {
continuation.resume(returning: unwrappedHelper)
} else {
if continuationResume.shouldResume() {
// 1st error to arrive, it will be the one thrown
let error = EchoError.helperConnection("Unable to get a valid 'HelperProtocol' object for an unknown reason")
continuation.resume(throwing: error)
}
}
}
}
}
// MARK: - Install helper
extension HelperRemoteProvider {
/// Install the Helper in the privileged helper tools folder and load the daemon
private static func installHelper() throws {
// try to get a valid empty authorization
var authRef: AuthorizationRef?
try AuthorizationCreate(nil, nil, [.preAuthorize], &authRef).checkError("AuthorizationCreate")
defer {
if let authRef {
AuthorizationFree(authRef, [])
}
}
// create an AuthorizationItem to specify we want to bless a privileged Helper
let authStatus = kSMRightBlessPrivilegedHelper.withCString { authorizationString in
var authItem = AuthorizationItem(name: authorizationString, valueLength: 0, value: nil, flags: 0)
return withUnsafeMutablePointer(to: &authItem) { pointer in
var authRights = AuthorizationRights(count: 1, items: pointer)
let flags: AuthorizationFlags = [.interactionAllowed, .extendRights, .preAuthorize]
return AuthorizationCreate(&authRights, nil, flags, &authRef)
}
}
guard authStatus == errAuthorizationSuccess else {
throw EchoError.helperInstallation("Unable to get a valid loading authorization reference to load Helper daemon")
}
var blessErrorPointer: Unmanaged<CFError>?
let wasBlessed = SMJobBless(kSMDomainSystemLaunchd, HelperConstants.domain as CFString, authRef, &blessErrorPointer)
guard !wasBlessed else { return }
// throw error since authorization was not blessed
let blessError: Error = if let blessErrorPointer {
blessErrorPointer.takeRetainedValue() as Error
} else {
EchoError.unknown
}
throw EchoError.helperInstallation("Error while installing the Helper: \(blessError.localizedDescription)")
}
}
// MARK: - Connection
extension HelperRemoteProvider {
static private func connection() throws -> NSXPCConnection {
if !isHelperInstalled {
try installHelper()
}
return createConnection()
}
private static func createConnection() -> NSXPCConnection {
let connection = NSXPCConnection(machServiceName: HelperConstants.domain, options: .privileged)
connection.remoteObjectInterface = NSXPCInterface(with: HelperProtocol.self)
connection.exportedInterface = NSXPCInterface(with: RemoteApplicationProtocol.self)
connection.exportedObject = self
connection.invalidationHandler = {
if isHelperInstalled {
print("Unable to connect to Helper although it is installed")
} else {
print("Helper is not installed")
}
}
connection.resume()
return connection
}
}
// MARK: - ContinuationResume
extension HelperRemoteProvider {
/// Helper class to safely access a boolean when using a continuation to get the remote.
private final class ContinuationResume: @unchecked Sendable {
// MARK: Properties
private let unfairLockPointer: UnsafeMutablePointer<os_unfair_lock_s>
private var alreadyResumed = false
// MARK: Computed
/// `true` if the continuation should resume.
func shouldResume() -> Bool {
os_unfair_lock_lock(unfairLockPointer)
defer { os_unfair_lock_unlock(unfairLockPointer) }
if alreadyResumed {
return false
} else {
alreadyResumed = true
return true
}
}
// MARK: Init
init() {
unfairLockPointer = UnsafeMutablePointer<os_unfair_lock_s>.allocate(capacity: 1)
unfairLockPointer.initialize(to: os_unfair_lock())
}
deinit {
unfairLockPointer.deallocate()
}
}
}
HelperProtocol.swift
@objc public protocol HelperProtocol {
@objc func startStreamingData(updateHandler: @escaping (String) -> Void)
@objc func stopStreamingData()
}
Helper.swift
import Foundation
// MARK: - Helper
final class Helper: NSObject {
let listener: NSXPCListener
private var streamingProcess: Process?
private var updateHandler: ((String) -> Void)?
override init() {
listener = NSXPCListener(machServiceName: HelperConstants.domain)
super.init()
listener.delegate = self
}
}
// MARK: - HelperProtocol
extension Helper: HelperProtocol {
func startStreamingData(updateHandler: @escaping (String) -> Void) {
self.updateHandler = updateHandler
streamingProcess = ExecutionService.streamData { [weak self] output in
NSLog("Received output in Helper")
self?.updateHandler?(output)
}
}
func stopStreamingData() {
streamingProcess?.terminate()
streamingProcess = nil
updateHandler = nil
}
}
// MARK: - Run
extension Helper {
func run() {
// start listening on new connections
listener.resume()
// prevent the terminal application to exit
RunLoop.current.run()
}
}
// MARK: - NSXPCListenerDelegate
extension Helper: NSXPCListenerDelegate {
func listener(_ listener: NSXPCListener, shouldAcceptNewConnection newConnection: NSXPCConnection) -> Bool {
do {
try ConnectionIdentityService.checkConnectionIsValid(connection: newConnection)
} catch {
NSLog("Connection \(newConnection) has not been validated. \(error.localizedDescription)")
return false
}
newConnection.exportedInterface = NSXPCInterface(with: HelperProtocol.self)
newConnection.remoteObjectInterface = NSXPCInterface(with: RemoteApplicationProtocol.self)
newConnection.exportedObject = self
newConnection.resume()
return true
}
}
ExecutionService.swift
enum ExecutionService {
static func streamData(updateHandler: @escaping (String) -> Void) -> Process {
let process = Process()
process.executableURL = URL(fileURLWithPath: "/path/to/root/command")
process.arguments = [
"-i", "1000" // update every 1 second
]
let outputPipe = Pipe()
process.standardOutput = outputPipe
process.standardError = outputPipe
let outHandle = outputPipe.fileHandleForReading
outHandle.readabilityHandler = { fileHandle in
autoreleasepool {
let data = fileHandle.availableData
if data.count > 0 {
if let output = String(data: data, encoding: .utf8) {
NSLog("🟢 Received output: %@", output)
updateHandler(output)
} else {
NSLog("🔴 Failed to convert output to string")
}
} else {
NSLog("🔴 Received empty data from data, ending read")
fileHandle.readabilityHandler = nil
}
}
}
do {
try process.run()
NSLog("🟠process started")
} catch {
NSLog("🔴 Error starting process: %@", error.localizedDescription)
}
return process
}
}
Update 1: No progress yet but I've updated some classes based on comments:
ExecutionService.swift
enum ExecutionService {
static func startStreamingData() async throws -> AsyncThrowingStream<String, Error> {
let helper = try await HelperRemoteProvider.remote()
return AsyncThrowingStream { continuation in
helper.startStreamingData { output in
continuation.yield(output)
}
continuation.onTermination = { @Sendable _ in
Task {
await helper.stopStreamingData()
}
}
}
}
static func stopStreamingData() async throws {
let helper = try await HelperRemoteProvider.remote()
helper.stopStreamingData()
}
}
ContentView.swift
class ContentViewModel: ObservableObject {
@Published var scriptOutput = ""
@Published var isStreaming = false
private var streamingTask: Task<Void, Never>?
func startStreaming() {
streamingTask = Task {
do {
let stream = try await ExecutionService.startStreamingData()
for try await output in stream {
await MainActor.run {
self.scriptOutput = output
self.isStreaming = true
}
}
} catch {
await MainActor.run { self.scriptOutput = error.localizedDescription }
}
}
}
func stopStreaming() {
Task {
do {
try await ExecutionService.stopStreamingData()
streamingTask?.cancel()
await MainActor.run { self.isStreaming = false }
} catch {
await MainActor.run { self.scriptOutput = error.localizedDescription }
}
}
}
}
Update 2:
Data.swift
import Foundation
struct DataSequence: AsyncSequence {
typealias Element = String
let helper: HelperProtocol
func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(helper: helper)
}
class AsyncIterator: AsyncIteratorProtocol {
private let helper: HelperProtocol
private var continuations: [CheckedContinuation<String?, Error>] = []
private var isStreaming = false
private var isFinished = false
init(helper: HelperProtocol) {
self.helper = helper
}
func next() async throws -> String? {
print("NEXT")
if isFinished {
return nil
}
if !isStreaming {
print("isStreaming")
isStreaming = true
helper.startStreamingData { [weak self] output in
guard let self = self else { return }
print("my output: \(output)")
if let continuation = self.continuations.first {
self.continuations.removeFirst()
continuation.resume(returning: output)
}
}
}
return try await withCheckedThrowingContinuation { continuation in
continuations.append(continuation)
}
}
func cancel() {
isFinished = true
helper.stopStreamingData()
for continuation in continuations {
continuation.resume(returning: nil)
}
continuations.removeAll()
}
}
}
Upvotes: 1
Views: 102