cyril
cyril

Reputation: 3005

Main app only receiving first update from Privileged Helper (macOS)

I have a main app and a Privileged Helper on macOS. I'm trying to implement a system where the Privileged Helper runs a root command-line tool in interactive mode, in which the tool emits output every second, with the goal of streaming the data back to the main app after every update. Here's the basic structure of my setup:

Main App: Initiates the connection to the Privileged Helper and should receive continuous updates.

Privileged Helper: Runs the root command and should stream data back to the main app.

The issue I'm facing is that my main app only receives the first value and then no updates afterwards. After examining the macOS Console, I can see that the helper is outputting data every 1 second as expected, but this data is not reaching the main app. I would appreciate your patience with me as this is my first time implementing something of the sort.

Here's a simplified version of my code:

Main

ContentView.swift

import SwiftUI

/// Root view: shows the view model's latest output and a button that stops streaming.
/// Streaming is started when the view appears.
struct ContentView: View {
    @StateObject private var viewModel = ContentViewModel()

    var body: some View {
        VStack {
            ScrollView {
                Text(viewModel.scriptOutput)
            }
            Button("Stop Streaming", action: stopIfStreaming)
        }
        .padding()
        .onAppear {
            viewModel.startStreaming()
        }
    }

    /// Forwards the stop request to the view model, but only while it reports an active stream.
    private func stopIfStreaming() {
        guard viewModel.isStreaming else { return }
        viewModel.stopStreaming()
    }
}

/// Observable state behind `ContentView`: the most recent text to display and a streaming flag.
class ContentViewModel: ObservableObject {
    @Published var scriptOutput = ""
    @Published var isStreaming = false

    /// Asks `ExecutionService` to start streaming. Each value handed to the callback replaces
    /// `scriptOutput`; a thrown error is shown in `scriptOutput` instead.
    func startStreaming() {
        Task {
            do {
                try await ExecutionService.startStreamingData { [weak self] output in
                    DispatchQueue.main.async {
                        print(output)
                        self?.scriptOutput = output
                    }
                }
                await setStreaming(true)
            } catch {
                await show(error)
            }
        }
    }

    /// Asks `ExecutionService` to stop streaming and clears `isStreaming` on success;
    /// a thrown error is shown in `scriptOutput` instead.
    func stopStreaming() {
        Task {
            do {
                try await ExecutionService.stopStreamingData()
                await setStreaming(false)
            } catch {
                await show(error)
            }
        }
    }

    /// Writes `isStreaming` on the main actor.
    private func setStreaming(_ flag: Bool) async {
        await MainActor.run { self.isStreaming = flag }
    }

    /// Publishes the error's localized description as the visible output, on the main actor.
    private func show(_ error: Error) async {
        await MainActor.run { self.scriptOutput = error.localizedDescription }
    }
}

ExecutionService.swift

/// App-side entry points for the helper's streaming calls.
enum ExecutionService {

    // MARK: Execute

    /// Obtains a helper proxy and asks it to start streaming. Every value the proxy hands back
    /// is printed and then forwarded to `updateHandler` on the main queue.
    ///
    /// NOTE(review): the closure given to the proxy is presumably treated by NSXPCConnection as
    /// the message's reply block — confirm whether it may be invoked more than once.
    static func startStreamingData(updateHandler: @escaping (String) -> Void) async throws {
        let forwardToMain: (String) -> Void = { output in
            print("Received: \(output)")
            DispatchQueue.main.async {
                updateHandler(output)
            }
        }

        let proxy = try await HelperRemoteProvider.remote()
        proxy.startStreamingData(updateHandler: forwardToMain)
    }

    /// Obtains a helper proxy and asks it to stop streaming.
    static func stopStreamingData() async throws {
        let proxy = try await HelperRemoteProvider.remote()
        proxy.stopStreamingData()
    }
}

HelperRemoteProvider.swift

import Foundation
import ServiceManagement

// MARK: - HelperRemoteProvider

/// Provide a `HelperProtocol` object to request the helper.
enum HelperRemoteProvider {

    // MARK: Computed

    /// `true` when a file exists at `HelperConstants.helperPath`.
    private static var isHelperInstalled: Bool {
        let helperPath = HelperConstants.helperPath
        return FileManager.default.fileExists(atPath: helperPath)
    }
}

// MARK: - Remote

extension HelperRemoteProvider {

    /// Opens a connection to the helper and returns its remote object proxy.
    ///
    /// - Returns: A proxy conforming to `HelperProtocol`.
    /// - Throws: Whatever `connection()` throws, the first proxy error that arrives before a
    ///   proxy was returned, or `EchoError.helperConnection` when the proxy cannot be cast.
    static func remote() async throws -> some HelperProtocol {
        let connection = try connection()

        return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<any HelperProtocol, Error>) in
            let continuationResume = ContinuationResume()
            let helper = connection.remoteObjectProxyWithErrorHandler { error in
                guard continuationResume.shouldResume() else {
                    // The continuation was already resumed (normally with the proxy itself);
                    // resuming it again would trap, so the late error is only reported.
                    print("Helper proxy error after remote() already returned: \(error.localizedDescription)")
                    return
                }
                // 1st outcome to arrive, it will be the one thrown
                continuation.resume(throwing: error)
            }

            // Claim the single resume for BOTH outcomes below. Without claiming it on the
            // success path, a proxy error delivered later would resume the continuation twice.
            guard continuationResume.shouldResume() else { return }

            if let unwrappedHelper = helper as? HelperProtocol {
                continuation.resume(returning: unwrappedHelper)
            } else {
                let error = EchoError.helperConnection("Unable to get a valid 'HelperProtocol' object for an unknown reason")
                continuation.resume(throwing: error)
            }
        }
    }
}

// MARK: - Install helper

extension HelperRemoteProvider {

    /// Install the Helper in the privileged helper tools folder and load the daemon.
    ///
    /// Creates one empty authorization, extends it with the right needed to bless a privileged
    /// helper, then hands it to `SMJobBless`.
    ///
    /// - Throws: The error produced by `checkError` when the authorization cannot be created,
    ///   or `EchoError.helperInstallation` when the right is not granted or blessing fails.
    private static func installHelper() throws {

        // try to get a valid empty authorization
        var authRef: AuthorizationRef?
        try AuthorizationCreate(nil, nil, [.preAuthorize], &authRef).checkError("AuthorizationCreate")
        guard let authorization = authRef else {
            throw EchoError.helperInstallation("Unable to get a valid loading authorization reference to load Helper daemon")
        }
        defer { AuthorizationFree(authorization, []) }

        // Request the bless right on the authorization created above. Calling
        // AuthorizationCreate a second time would overwrite the reference and leave the
        // first authorization unreleased.
        let authStatus = kSMRightBlessPrivilegedHelper.withCString { rightName in
            var authItem = AuthorizationItem(name: rightName, valueLength: 0, value: nil, flags: 0)

            return withUnsafeMutablePointer(to: &authItem) { itemPointer in
                var authRights = AuthorizationRights(count: 1, items: itemPointer)
                let flags: AuthorizationFlags = [.interactionAllowed, .extendRights, .preAuthorize]
                return AuthorizationCopyRights(authorization, &authRights, nil, flags, nil)
            }
        }

        guard authStatus == errAuthorizationSuccess else {
           throw EchoError.helperInstallation("Unable to get a valid loading authorization reference to load Helper daemon")
        }

        var blessErrorPointer: Unmanaged<CFError>?
        let wasBlessed = SMJobBless(kSMDomainSystemLaunchd, HelperConstants.domain as CFString, authorization, &blessErrorPointer)

        guard !wasBlessed else { return }
        // throw error since the helper was not blessed
        let blessError: Error = if let blessErrorPointer {
            blessErrorPointer.takeRetainedValue() as Error
        } else {
            EchoError.unknown
        }
        throw EchoError.helperInstallation("Error while installing the Helper: \(blessError.localizedDescription)")
    }
}

// MARK: - Connection

extension HelperRemoteProvider {

    /// Returns a resumed connection to the helper, installing the helper first when
    /// `isHelperInstalled` is `false`.
    static private func connection() throws -> NSXPCConnection {
        let needsInstall = !isHelperInstalled
        if needsInstall {
            try installHelper()
        }
        return createConnection()
    }

    /// Builds, configures and resumes a privileged connection to the helper's Mach service.
    ///
    /// NOTE(review): in this static context `self` is the `HelperRemoteProvider` type itself,
    /// so that is what gets exported for `RemoteApplicationProtocol` — confirm this is intended.
    private static func createConnection() -> NSXPCConnection {
        let xpc = NSXPCConnection(machServiceName: HelperConstants.domain, options: .privileged)

        // What the app exposes to the helper.
        xpc.exportedInterface = NSXPCInterface(with: RemoteApplicationProtocol.self)
        xpc.exportedObject = self

        // What the app expects from the helper.
        xpc.remoteObjectInterface = NSXPCInterface(with: HelperProtocol.self)

        xpc.invalidationHandler = {
            print(isHelperInstalled
                  ? "Unable to connect to Helper although it is installed"
                  : "Helper is not installed")
        }

        xpc.resume()
        return xpc
    }
}

// MARK: - ContinuationResume

extension HelperRemoteProvider {

    /// Lock-protected one-shot flag used to decide which outcome may resume a continuation.
    private final class ContinuationResume: @unchecked Sendable {

        // MARK: Properties

        private let unfairLockPointer: UnsafeMutablePointer<os_unfair_lock_s>
        private var alreadyResumed = false

        // MARK: Init

        init() {
            let storage = UnsafeMutablePointer<os_unfair_lock_s>.allocate(capacity: 1)
            storage.initialize(to: os_unfair_lock())
            unfairLockPointer = storage
        }

        deinit {
            unfairLockPointer.deallocate()
        }

        // MARK: Access

        /// Returns `true` for the first caller only; every later call returns `false`.
        func shouldResume() -> Bool {
            os_unfair_lock_lock(unfairLockPointer)
            defer { os_unfair_lock_unlock(unfairLockPointer) }

            guard !alreadyResumed else { return false }
            alreadyResumed = true
            return true
        }
    }
}

Helper

HelperProtocol.swift

/// Requests the app can send to the helper.
///
/// Requirements of an `@objc` protocol are exposed to Objective-C implicitly, so the members
/// carry no attribute of their own.
@objc public protocol HelperProtocol {
    /// Starts streaming; `updateHandler` receives the output as text.
    ///
    /// NOTE(review): when this protocol backs an `NSXPCInterface`, the closure is presumably
    /// treated as the message's reply block and delivered across the connection only once —
    /// confirm against the NSXPCConnection documentation.
    func startStreamingData(updateHandler: @escaping (String) -> Void)

    /// Stops streaming.
    func stopStreamingData()
}

Helper.swift

import Foundation

// MARK: - Helper

/// The helper's service object: owns the Mach-service listener and the state of the
/// current stream.
final class Helper: NSObject {
    /// Listener for `HelperConstants.domain`; this object is installed as its delegate.
    let listener: NSXPCListener

    /// Process whose output is being streamed, if any.
    private var streamingProcess: Process? = nil
    /// Callback that receives each chunk of streamed output, if any.
    private var updateHandler: ((String) -> Void)? = nil

    override init() {
        self.listener = NSXPCListener(machServiceName: HelperConstants.domain)
        super.init()
        self.listener.delegate = self
    }
}

// MARK: - HelperProtocol

extension Helper: HelperProtocol {

    /// Starts the streaming process and forwards each chunk of its output to `updateHandler`.
    ///
    /// Any stream that is already active is stopped first, so a repeated call cannot leave
    /// an earlier child process running with no reference left to terminate it.
    ///
    /// - Parameter updateHandler: Receives each chunk of output as text.
    func startStreamingData(updateHandler: @escaping (String) -> Void) {
        stopStreamingData()

        self.updateHandler = updateHandler
        streamingProcess = ExecutionService.streamData { [weak self] output in
            NSLog("Received output in Helper")
            self?.updateHandler?(output)
        }
    }

    /// Terminates the streaming process, if one is running, and drops the stored handler.
    func stopStreamingData() {
        // `streamData` returns the process even when launching it failed, so only signal a
        // process that is actually running.
        if let process = streamingProcess, process.isRunning {
            process.terminate()
        }
        streamingProcess = nil
        updateHandler = nil
    }
}

// MARK: - Run

extension Helper {

    /// Starts accepting connections and then runs the current run loop.
    ///
    /// The listener is resumed before the run loop is entered; statements placed after
    /// `RunLoop.current.run()` would only execute if that call returned.
    func run() {
        // start listening for new connections
        listener.resume()

        // keep the helper process alive by running the current run loop
        RunLoop.current.run()
    }
}


// MARK: - NSXPCListenerDelegate

extension Helper: NSXPCListenerDelegate {

    /// Accepts `newConnection` only when `ConnectionIdentityService` validates it; an accepted
    /// connection is configured with this object as its exported object and resumed.
    func listener(_ listener: NSXPCListener, shouldAcceptNewConnection newConnection: NSXPCConnection) -> Bool {
        do {
            try ConnectionIdentityService.checkConnectionIsValid(connection: newConnection)
            activate(newConnection)
            return true
        } catch {
            NSLog("Connection \(newConnection) has not been validated. \(error.localizedDescription)")
            return false
        }
    }

    /// Wires up both interfaces on a validated connection and resumes it.
    private func activate(_ peer: NSXPCConnection) {
        peer.exportedInterface = NSXPCInterface(with: HelperProtocol.self)
        peer.remoteObjectInterface = NSXPCInterface(with: RemoteApplicationProtocol.self)
        peer.exportedObject = self

        peer.resume()
    }
}

ExecutionService.swift

/// Helper-side process launcher.
enum ExecutionService {

    /// Launches the root command and forwards everything it writes to stdout or stderr.
    ///
    /// - Parameter updateHandler: Called from the pipe's readability handler with each chunk
    ///   of output that decodes as UTF-8.
    /// - Returns: The configured process. It is returned even when launching failed; the
    ///   failure is only logged.
    static func streamData(updateHandler: @escaping (String) -> Void) -> Process {
        let process = Process()
        process.executableURL = URL(fileURLWithPath: "/path/to/root/command")
        process.arguments = [
            "-i", "1000" // update every 1 second
        ]

        // stdout and stderr share one pipe, so both are delivered through the same handler
        let outputPipe = Pipe()
        process.standardOutput = outputPipe
        process.standardError = outputPipe

        let outHandle = outputPipe.fileHandleForReading

        outHandle.readabilityHandler = { fileHandle in
            autoreleasepool {
                let data = fileHandle.availableData

                // empty data is treated as end of output: stop observing the handle
                guard !data.isEmpty else {
                    NSLog("🔴 Received empty data from data, ending read")
                    fileHandle.readabilityHandler = nil
                    return
                }

                guard let output = String(data: data, encoding: .utf8) else {
                    NSLog("🔴 Failed to convert output to string")
                    return
                }

                NSLog("🟢 Received output: %@", output)
                updateHandler(output)
            }
        }

        do {
            try process.run()
            NSLog("🟠 process started")
        } catch {
            NSLog("🔴 Error starting process: %@", error.localizedDescription)
            // Nothing will ever write to the pipe, so remove the handler instead of leaving
            // it (and the closure it captures) installed.
            outHandle.readabilityHandler = nil
        }

        return process
    }
}

Update 1: No progress yet but I've updated some classes based on comments:

Main

ExecutionService.swift

/// App-side entry points for the helper's streaming calls.
enum ExecutionService {

    /// Obtains a helper proxy and wraps its output callback in an `AsyncThrowingStream`.
    ///
    /// When the stream terminates, the helper is asked to stop streaming.
    ///
    /// NOTE(review): the closure given to the proxy is presumably treated by NSXPCConnection as
    /// the message's reply block — confirm whether it may be invoked more than once.
    ///
    /// - Returns: A stream that yields each value the helper hands back.
    /// - Throws: Whatever `HelperRemoteProvider.remote()` throws.
    static func startStreamingData() async throws -> AsyncThrowingStream<String, Error> {
        let helper = try await HelperRemoteProvider.remote()

        return AsyncThrowingStream { continuation in
            // Install the termination handler before any value can be produced.
            // `stopStreamingData()` is synchronous, so it needs neither `Task` nor `await`.
            continuation.onTermination = { @Sendable _ in
                helper.stopStreamingData()
            }

            helper.startStreamingData { output in
                continuation.yield(output)
            }
        }
    }

    /// Obtains a helper proxy and asks it to stop streaming.
    static func stopStreamingData() async throws {
        let helper = try await HelperRemoteProvider.remote()
        helper.stopStreamingData()
    }
}

ContentView.swift

/// Observable state behind `ContentView`: the most recent text to display and a streaming flag.
class ContentViewModel: ObservableObject {
    @Published var scriptOutput = ""
    @Published var isStreaming = false

    /// Task consuming the current stream, if any.
    private var streamingTask: Task<Void, Never>?

    /// Starts consuming the helper's stream; each value replaces `scriptOutput`.
    ///
    /// A consumer that is already running is cancelled first so that a repeated call does not
    /// leave an earlier task running with no reference left to cancel it.
    func startStreaming() {
        streamingTask?.cancel()
        streamingTask = Task {
            do {
                let stream = try await ExecutionService.startStreamingData()
                for try await output in stream {
                    await MainActor.run {
                        self.scriptOutput = output
                        self.isStreaming = true
                    }
                }
            } catch {
                await MainActor.run { self.scriptOutput = error.localizedDescription }
            }
        }
    }

    /// Cancels the local consumer and asks the helper to stop streaming.
    ///
    /// The consumer is cancelled up front, on the caller's thread, so it also stops when the
    /// remote stop request throws. `isStreaming` is cleared only when that request succeeds.
    func stopStreaming() {
        streamingTask?.cancel()
        streamingTask = nil

        Task {
            do {
                try await ExecutionService.stopStreamingData()
                await MainActor.run { self.isStreaming = false }
            } catch {
                await MainActor.run { self.scriptOutput = error.localizedDescription }
            }
        }
    }
}

Update 2: Data.swift

import Foundation

/// Async sequence of the strings delivered by a helper's streaming callback.
///
/// NOTE(review): the closure given to `helper.startStreamingData` is presumably treated by
/// NSXPCConnection as a one-shot reply block — confirm whether it can fire more than once.
struct DataSequence: AsyncSequence {
    typealias Element = String

    let helper: HelperProtocol

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(helper: helper)
    }

    /// Iterator that starts streaming on the first `next()` and pairs each delivered output
    /// with a waiting caller.
    ///
    /// The helper callback and `next()` can run on different threads, so all mutable state is
    /// guarded by `lock`. Outputs that arrive while no caller is waiting are buffered instead
    /// of dropped; the buffer is unbounded.
    final class AsyncIterator: AsyncIteratorProtocol {
        private let helper: HelperProtocol
        private let lock = NSLock()
        private var continuations: [CheckedContinuation<String?, Error>] = []
        private var pendingOutputs: [String] = []
        private var isStreaming = false
        private var isFinished = false

        init(helper: HelperProtocol) {
            self.helper = helper
        }

        /// Returns the next output, or `nil` once `cancel()` has been called.
        func next() async throws -> String? {
            print("NEXT")

            if claimStreamStart() {
                print("isStreaming")
                helper.startStreamingData { [weak self] output in
                    guard let self else { return }
                    print("my output: \(output)")
                    self.deliver(output)
                }
            }

            return try await withCheckedThrowingContinuation { continuation in
                enqueue(continuation)
            }
        }

        /// Ends the sequence: asks the helper to stop and resumes every waiting caller with `nil`.
        func cancel() {
            lock.lock()
            isFinished = true
            let waiting = continuations
            continuations.removeAll()
            pendingOutputs.removeAll()
            lock.unlock()

            helper.stopStreamingData()
            // Resume outside the lock so resumed callers never contend with it.
            for continuation in waiting {
                continuation.resume(returning: nil)
            }
        }

        /// Returns `true` exactly once, for the call that should start the helper stream.
        private func claimStreamStart() -> Bool {
            lock.lock()
            defer { lock.unlock() }

            guard !isFinished, !isStreaming else { return false }
            isStreaming = true
            return true
        }

        /// Hands `output` to the oldest waiting caller, or buffers it when nobody is waiting.
        private func deliver(_ output: String) {
            lock.lock()
            if isFinished {
                lock.unlock()
                return
            }
            guard !continuations.isEmpty else {
                pendingOutputs.append(output)
                lock.unlock()
                return
            }
            let waiter = continuations.removeFirst()
            lock.unlock()

            waiter.resume(returning: output)
        }

        /// Resumes `continuation` immediately when the sequence has finished or an output is
        /// already buffered; otherwise parks it until the next output arrives.
        private func enqueue(_ continuation: CheckedContinuation<String?, Error>) {
            lock.lock()
            if isFinished {
                lock.unlock()
                continuation.resume(returning: nil)
            } else if !pendingOutputs.isEmpty {
                let output = pendingOutputs.removeFirst()
                lock.unlock()
                continuation.resume(returning: output)
            } else {
                continuations.append(continuation)
                lock.unlock()
            }
        }
    }
}

Upvotes: 1

Views: 102

Answers (0)

Related Questions