Reputation: 4984
I try to test a function Stream transform(Stream input)
. How can I test if the returned stream emits elements at a certain time?
In RxJS (JavaScript) I can use a TestScheduler to emit elements on the input stream at a certain time and test if they are emitted on the output stream at a certain time. In this example, the transform function is passed to scheduler.startWithCreate
:
var scheduler = new Rx.TestScheduler();
// Create hot observable which will start firing
var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onCompleted(230)
);
// Note we'll start at 200 for subscribe, hence missing the 150 mark
var res = scheduler.startWithCreate(function () {
return xs.map(function (x) { return x * x });
});
// Implement collection assertion
collectionAssert.assertEqual(res.messages, [
onNext(210, 4),
onNext(220, 9),
onCompleted(230)
]);
// Check for subscribe/unsubscribe
collectionAssert.assertEqual(xs.subscriptions, [
subscribe(200, 230)
]);
Upvotes: 3
Views: 1488
Reputation: 4984
Update: Released my code as a package called stream_test_scheduler.
This code works similar to TestScheduler in RxJS, but it uses real time (milliseconds) instead of virtual time, since you can't fake time in Dart (see Irn's comment). You can pass a maximum deviation to the matcher. I use 20 milliseconds in this example. But deviation varies. You might have to use a different maximum deviation value for another test or on another (faster/slower) system.
Edit: I changed the example to a delay transform function, which is a shorter (less configurable/parameters) version of the delay function of stream_ext package. The test checks if elements are delayed by one second.
import 'dart:async';
import 'package:test/test.dart';
// To test:
/// Modified version of <https://github.com/theburningmonk/stream_ext/wiki/delay>
Stream delay(Stream input, Duration duration) {
var controller = new StreamController.broadcast(sync : true);
delayCall(Function f, [Iterable args]) => args == null
? new Timer(duration, f)
: new Timer(duration, () => Function.apply(f, args));
input.listen(
(x) => delayCall(_tryAdd, [controller, x]),
onError : (ex) => delayCall(_tryAddError, [ex]),
onDone : () => delayCall(_tryClose, [controller])
);
return controller.stream;
}
_tryAdd(StreamController controller, event) {
if (!controller.isClosed) controller.add(event);
}
_tryAddError(StreamController controller, err) {
if (!controller.isClosed) controller.addError(err);
}
_tryClose(StreamController controller) {
if (!controller.isClosed) controller.close();
}
main() async {
test('delay preserves relative time intervals between the values', () async {
var scheduler = new TestScheduler();
var source = scheduler.createStream([
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onCompleted(230)
]);
var result = await scheduler.startWithCreate(() => delay(source, ms(1000)));
expect(result, equalsRecords([
onNext(1150, 1),
onNext(1210, 2),
onNext(1220, 3),
onCompleted(1230)
], maxDeviation: 20));
});
}
equalsRecords(List<Record> records, {int maxDeviation: 0}) {
return pairwiseCompare(records, (Record r1, Record r2) {
var deviation = (r1.ticks.inMilliseconds - r2.ticks.inMilliseconds).abs();
if (deviation > maxDeviation) {
return false;
}
if (r1 is OnNextRecord && r2 is OnNextRecord) {
return r1.value == r2.value;
}
if (r1 is OnErrorRecord && r2 is OnErrorRecord) {
return r1.exception == r2.exception;
}
return (r1 is OnCompletedRecord && r2 is OnCompletedRecord);
}, 'equal with deviation of ${maxDeviation}ms to');
}
class TestScheduler {
final SchedulerTasks _tasks;
TestScheduler() : _tasks = new SchedulerTasks();
Stream createStream(List<Record> records) {
final controller = new StreamController(sync: true);
_tasks.add(controller, records);
return controller.stream;
}
Future<List<Record>> startWithCreate(Stream createStream()) {
final completer = new Completer<List<Record>>();
final records = <Record>[];
final start = new DateTime.now();
int timeStamp() {
final current = new DateTime.now();
return current.difference(start).inMilliseconds;
}
createStream().listen(
(event) => records.add(onNext(timeStamp(), event)),
onError: (exception) => records.add(onError(timeStamp(), exception)),
onDone: () {
records.add(onCompleted(timeStamp()));
completer.complete(records);
}
);
_tasks.run();
return completer.future;
}
}
class SchedulerTasks {
Map<Record, StreamController> _controllers = {};
List<Record> _records = [];
void add(StreamController controller, List<Record> records) {
for (var record in records) {
_controllers[record] = controller;
}
_records.addAll(records);
}
void run() {
_records.sort();
for (var record in _records) {
final controller = _controllers[record];
new Future.delayed(record.ticks, () {
if (record is OnNextRecord) {
controller.add(record.value);
} else if (record is OnErrorRecord) {
controller.addError(record.exception);
} else if (record is OnCompletedRecord) {
controller.close();
}
});
}
}
}
onNext(int ticks, int value) => new OnNextRecord(ms(ticks), value);
onCompleted(int ticks) => new OnCompletedRecord(ms(ticks));
onError(int ticks, exception) => new OnErrorRecord(ms(ticks), exception);
Duration ms(int milliseconds) => new Duration(milliseconds: milliseconds);
abstract class Record implements Comparable {
final Duration ticks;
Record(this.ticks);
@override
int compareTo(other) => ticks.compareTo(other.ticks);
}
class OnNextRecord extends Record {
final value;
OnNextRecord(Duration ticks, this.value) : super (ticks);
@override
String toString() => 'onNext($value)@${ticks.inMilliseconds}';
}
class OnErrorRecord extends Record {
final exception;
OnErrorRecord(Duration ticks, this.exception) : super (ticks);
@override
String toString() => 'onError($exception)@${ticks.inMilliseconds}';
}
class OnCompletedRecord extends Record {
OnCompletedRecord(Duration ticks) : super (ticks);
@override
String toString() => 'onCompleted()@${ticks.inMilliseconds}';
}
Upvotes: 2