Reputation: 3443
I am trying to solve a Nodejs stream challenge. I have read the node documentation on streams multiple times, and implemented different attempts to solve the challenge. Trying with both duplex, transform, readable and writable :)
I have multiple HTTP readable streams, and the objective is to send data to a single pipeline, with backpressure working. I think this picture helps explain the challenge:
Update (13. september 2017). After reading the documentation again, I am implementing a custom written duplex stream.
Upvotes: 3
Views: 780
Reputation: 3443
This represents a great usecase for a duplex stream, combined with manuel flow control of the HTTP stream.
I have written a custom duplex stream, where the readable and writable part, is structured like this:
If you are interested in the specific code for the duplex stream, please send me a PM.
The code could look something like this (but it's pretty old, and could probably be simplified even more):
import 'rxjs/add/operator/skip';
import 'rxjs/add/operator/take';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import * as stream from 'stream';
import { logger, streamInspector } from '../shared';
export class DuplexStreamLinker extends stream.Duplex {
public readCount: number = 0;
public acceptDataCount: number = 0;
public acceptData$: BehaviorSubject<boolean>;
public streamName: string;
constructor(options) {
super(options);
this.streamName = this.constructor.name;
this.acceptData$ = new BehaviorSubject(false);
streamInspector(this, this.constructor.name);
}
public _read(size) {
this.readCount++;
this.acceptData$.next(true);
}
public _write(chunk, encoding, cb) {
const acceptData = this.acceptData$.getValue();
if (acceptData) {
cb(this.pushData(chunk));
} else {
this.acceptData$.skip(1).take(1).subscribe(() => {
logger.silly('I dont fire...');
this.acceptDataCount++;
cb(this.pushData(chunk));
});
}
}
public endReadableStream() {
logger.debug('DuplexStreamLinker@endReadableStream was called!');
this.end();
this.push(null);
}
public _final(cb) {
logger.debug('DuplexStreamLinker@_final was called!');
cb(null);
}
private pushData(chunk): null | Error {
const ok = this.push(chunk);
if (ok === false) { this.acceptData$.next(false); }
return null;
}
}
Upvotes: 2