Brendon

Reputation: 882

Subscribing to ReactiveCocoa signals in parallel, on a limited number of threads

I subscribe to a signal created like this:

RACSignal *signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    for (int i = 0; i < 100; i++) {
        // Each inner signal is cold: the slow work runs only when it is subscribed to.
        [subscriber sendNext:[[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber2) {
            NSString *string = someFunctionThatTakesALongTime(i);
            [subscriber2 sendNext:string];
            [subscriber2 sendCompleted];
            return nil;
        }] setNameWithFormat:@"inside signal"]];
    }

    [subscriber sendCompleted];
    return nil;
}] setNameWithFormat:@"outside signal"];

int n = 4;
[[signal flatten:n] subscribeNext:^(NSString *string) { ... }];

I want -flatten: to subscribe to n of the inner signals in parallel. I tried +startLazilyWithScheduler:block: with [RACScheduler scheduler] for the "inside signal"s, but that grinds my computer to a halt. In Instruments, it looks like a new thread is being created for each signal.

The previous version of this code added the work as NSOperations to an NSOperationQueue, which was set to run up to n operations in parallel. It worked, but I think I could make it easier to follow using RAC.
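For comparison, here is a minimal sketch of what that NSOperationQueue approach might look like. It is not the original code: the helper name runJobsWithLimit and the sleep-based stub for someFunctionThatTakesALongTime are assumptions made for illustration, and ARC is assumed.

```objc
#import <Foundation/Foundation.h>

// Hypothetical stand-in for the slow function in the question.
static NSString *someFunctionThatTakesALongTime(int i) {
    [NSThread sleepForTimeInterval:0.001];
    return [NSString stringWithFormat:@"result %d", i];
}

// Hypothetical helper: runs `count` jobs with at most `n` executing at once
// and returns the results in the order the jobs appended them.
static NSArray *runJobsWithLimit(int count, NSInteger n) {
    NSOperationQueue *queue = [[NSOperationQueue alloc] init];
    queue.maxConcurrentOperationCount = n;

    NSMutableArray *results = [NSMutableArray array];
    for (int i = 0; i < count; i++) {
        [queue addOperationWithBlock:^{
            NSString *string = someFunctionThatTakesALongTime(i);
            // NSMutableArray is not thread-safe, so guard the shared array.
            @synchronized (results) {
                [results addObject:string];
            }
        }];
    }

    // Block the caller until every operation has finished.
    [queue waitUntilAllOperationsAreFinished];
    return [results copy];
}
```

Calling runJobsWithLimit(100, 4) would correspond to the n = 4 case in the question. The concurrency limit lives in one property on the queue, which is the behavior the -flatten: version is trying to reproduce.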

How do I -flatten: n signals at a time from my signal of signals, so that the inner signals are each run on the same n threads?

=====================================

Update: I was barking up the wrong tree; my performance problem was due to running out of physical RAM. I would guess that some objects lived too long, leading to my memory issue. I incidentally resolved my memory usage issue at some point, while refactoring to use RAC more heavily. I don't know if people will benefit from seeing my code, but here it is:

I went from consuming the outer signal with this code:

[[[self drawRects] flatten:self.maxProcesses] subscribeNext:^(NSDictionary *result) {
    @strongify(self);

    NSString *keyString = result[kDrawRectsResultsKeyKey];
    self.imagesByLocation[keyString] = result[kDrawRectsResultsImageKey];
    self.repsByLocation[keyString] = result[kDrawRectsResultsRepKey];

    [self setNeedsDisplayInRect:[result[kDrawRectsResultsRectKey] rectValue]];
}];

To using many more RAC operations instead (replacing other imperative code in the same class, as well):

// Track the latest zoomed drawing bounds and the latest imagesByLocation of the current imageProvider
// Skip one of each signal to avoid firing immediately
RACSignal *zoomedDrawingBounds = [RACChannelTo(self, zoomedDrawingBounds) skip:1];
RACSignal *imagesFromImageProvider = [[[RACChannelTo(self, imageProvider) skip:1]
                                       map:^(id<PTWImageProvider> imageProvider) {
                                           return RACChannelTo(imageProvider, imagesByLocation);
                                       }]
                                      switchToLatest];

// Lift the drawing method, getting a signal of signals on each call
RACSignal *drawingSignals = [[self rac_liftSelector:@selector(drawingSignalsForRect:givenImagesByLocations:)
                               withSignalsFromArray:@[ zoomedDrawingBounds, imagesFromImageProvider, ]]
                             switchToLatest];

@weakify(self);

// Lift flatten: using maxProcesses so that if maxProcesses changes, the number of signals being
// flatten:ed can change almost immediately.
RACSignal *drawnRectanglesZoomed = [[[[drawingSignals
                                       rac_liftSelector:@selector(flatten:) withSignalsFromArray:@[ RACChannelTo(self, maxProcesses) ]]
                                      switchToLatest]
                                     doNext:^(NSDictionary *result) {
                                         @strongify(self);

                                         // side effects! store the rendered image and its associated image rep
                                         NSString *keyString = result[kDrawRectsResultsKeyKey];
                                         self.imagesByLocation[keyString] = result[kDrawRectsResultsImageKey];
                                         self.repsByLocation[keyString] = result[kDrawRectsResultsRepKey];
                                     }]
                                    map:^(NSDictionary *result) {
                                        // Extract the drawn rect from the results
                                        return result[kDrawRectsResultsRectKey];
                                    }];

RACSignal *drawnRectangles = [[drawnRectanglesZoomed
                               combineLatestWith:RACChannelTo(self, zoomLevel)]
                              map:^(RACTuple *tuple) {
                                  // Convert between zoomed and unzoomed coordinates
                                  CGRect zoomedRect = [[tuple first] rectValue];
                                  CGFloat zoomLevel = [[tuple second] floatValue];
                                  CGAffineTransform zoomTransform = CGAffineTransformMakeScale(zoomLevel, zoomLevel);
                                  return [NSValue valueWithRect:CGRectApplyAffineTransform(zoomedRect, zoomTransform)];
                              }];

// Lift setNeedsDisplayInRect: with results from the drawing signals, so setNeedsDisplayInRect: is called
// as tiles are rendered.
[self rac_liftSelector:@selector(setNeedsDisplayInRect:)
  withSignalsFromArray:@[ [drawnRectangles deliverOn:[RACScheduler mainThreadScheduler]] ]];

Now if I update my work method to return cold signals on background schedulers, flatten: causes multiple signals to run at once, without issue:

RACSignal *signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    for (int i = 0; i < 100; i++) {
        // The block runs on a background scheduler, and only once the inner signal is subscribed to.
        [subscriber sendNext:[[RACSignal startLazilyWithScheduler:[RACScheduler scheduler] block:^(id<RACSubscriber> subscriber2) {
            NSString *string = someFunctionThatTakesALongTime(i);
            [subscriber2 sendNext:string];
            [subscriber2 sendCompleted];
        }] setNameWithFormat:@"inside signal"]];
    }

    [subscriber sendCompleted];
    return nil;
}] setNameWithFormat:@"outside signal"];

Upvotes: 4

Views: 2929

Answers (1)

Justin Spahr-Summers

Reputation: 16973

+[RACScheduler scheduler] creates a new serial GCD queue each time it's invoked, but that alone shouldn't cause any problems, since GCD queues don't have a direct relationship with OS threads.

Instead, the issue is probably that -flatten: is subscribing to new signals before the previous ones have completely finished (i.e., the new subscription is triggered from within an event delivered by the old signal).

You could solve this by delaying the subscription to the inner signals:

RACSignal *workSignal = [[[[RACSignal
    // Wait for one scheduler iteration,
    return:RACUnit.defaultUnit]
    delay:0]
    // then actually do the work.
    then:^{
        return [[RACSignal
            createSignal:^RACDisposable *(id<RACSubscriber> subscriber2) {
                NSString *string = someFunctionThatTakesALongTime(i);
                [subscriber2 sendNext:string];
                [subscriber2 sendCompleted];
                return nil;
            }]
            // Invokes the above block on a new background scheduler.
            subscribeOn:[RACScheduler scheduler]];
    }]
    setNameWithFormat:@"inside signal"];

[subscriber sendNext:workSignal];

However, this seems unnecessarily complicated. GCD will automatically bring your thread count back down as the queues finish up, so I'd question whether this change is actually worth it.

Upvotes: 5
