Reputation: 100190
I have this code:
import * as stream from 'stream';
export class JSONParser extends stream.Transform {
lastLineData = '';
objectMode = true;
constructor() {
super();
}
transform(chunk, encoding, cb) {
let data = String(chunk);
if (this.lastLineData) {
data = this.lastLineData + data;
}
let lines = data.split('\n');
this.lastLineData = lines.splice(lines.length - 1, 1)[0];
lines.forEach(l => {
try {
// l might be an empty string; ignore if so
l && this.push(JSON.parse(l));
}
catch (err) {
// noop
}
});
cb();
}
flush(cb) {
if (this.lastLineData) {
try {
this.push(JSON.parse(this.lastLineData));
}
catch (err) {
// noop
}
}
this.lastLineData = '';
cb();
}
}
the problem is that the TS typings don't recognize the (prototype) methods. Am I extending the Transform class incorrectly?
Here is the problem:
Note that this is correct:
import * as stream from 'stream';
//////////////////////////////////////////////////
export interface IParsedObject {
[index: string]: any
}
export const createParser = function () {
let lastLineData = '';
return new stream.Transform({
objectMode: true,
transform(chunk: any, encoding: string, cb: Function) {
let data = String(chunk);
if (lastLineData) {
data = lastLineData + data;
}
let lines = data.split('\n');
lastLineData = lines.splice(lines.length - 1, 1)[0];
lines.forEach(l => {
try {
// l might be an empty string; ignore if so
l && this.push(JSON.parse(l));
}
catch (err) {
// noop
}
});
cb();
},
flush(cb: Function) {
if (lastLineData) {
try {
this.push(JSON.parse(lastLineData));
}
catch (err) {
// noop
}
}
lastLineData = '';
cb();
}
});
};
but that above class does not seem to work the same.
Upvotes: 3
Views: 10654
Reputation: 6063
In your screenshot you are attempting to extend the Transform
class, but you are not implementing the correct method, you should implement transform._transform(chunk, encoding, callback)
, then typescript will automatically infer the required types. Since transform.transform(chunk, encoding, callback)
does not exist on the Transform
type typescript has no types to use for inference so the compiler generates a warning.
In your code example you choose to use the "simplified" Transform
constructor. The constructor takes a set of options that allows you to define the necessary transformation methods without explicitly extending the class. These options opt for a non-underscore prefixed naming convention, but they are equivalent to the underscore prefixed methods when extending the class. Since you used the correct name for the method here the types were able to be inferred.
The Transform
API calls for the implementation of three methods, those methods are outlined here.
There are two ways to implement a Transform
stream:
Transform
classTransform
constructor
options.The two methods are outlined here (an example for extending the class in pre ES6 environments is included).
When implementing a Transform
stream only one method must be implemented:
transform._transform(chunk, encoding, callback)
The other two methods are optional and can be implemented if the use case calls for it:
transform._flush(callback)
writable._final(callback)
I've gone ahead and outlined the documented Transform
methods below, highlighting some areas that may be of interest.
transform._flush(callback)
This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Readable class methods only.
In some cases, a transform operation may need to emit an additional bit of data at the end of the stream. For example, a zlib compression stream will store an amount of internal state used to optimally compress the output. When the stream ends, however, that additional data needs to be flushed so that the compressed data will be complete.
Custom Transform implementations may implement the transform._flush() method. This will be called when there is no more written data to be consumed, but before the 'end' event is emitted signaling the end of the Readable stream.
Within the transform._flush() implementation, the readable.push() method may be called zero or more times, as appropriate. The callback function must be called when the flush operation is complete.
The transform._flush() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.
transform._transform(chunk, encoding, callback)
This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Readable class methods only.
All Transform stream implementations must provide a _transform() method to accept input and produce output. The transform._transform() implementation handles the bytes being written, computes an output, then passes that output off to the readable portion using the readable.push() method.
The transform.push() method may be called zero or more times to generate output from a single input chunk, depending on how much is to be output as a result of the chunk.
It is possible that no output is generated from any given chunk of input data.
The callback function must be called only when the current chunk is completely consumed. The first argument passed to the callback must be an Error object if an error occurred while processing the input or null otherwise. If a second argument is passed to the callback, it will be forwarded on to the readable.push() method. In other words the following are equivalent:
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
The transform._transform() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.
transform._transform() is never called in parallel; streams implement a queue mechanism, and to receive the next chunk, callback must be called, either synchronously or asynchronously.
writable._final(callback)
The _final() method must not be called directly. It may be implemented by child classes, and if so, will be called by the internal Writable class methods only.
This optional function will be called before the stream closes, delaying the 'finish' event until callback is called. This is useful to close resources or write buffered data before a stream ends.
Upvotes: 6
Reputation: 100190
Ok so it seems to work now, I had to pass the constructor options to super()
, so it becomes super({objectMode:true})
,
export class JSONParser extends stream.Transform {
lastLineData = '';
constructor() {
super({objectMode: true});
}
_transform(chunk: any, encoding: string, cb: Function) {
let data = String(chunk);
if (this.lastLineData) {
data = this.lastLineData + data;
}
let lines = data.split('\n');
this.lastLineData = lines.splice(lines.length - 1, 1)[0];
lines.forEach(l => {
try {
// l might be an empty string; ignore if so
l && this.push(JSON.parse(l));
}
catch (err) {
// noop
}
});
cb();
}
flush(cb: Function) {
if (this.lastLineData) {
try {
this.push(JSON.parse(this.lastLineData));
}
catch (err) {
// noop
}
}
this.lastLineData = '';
cb();
}
_flush(cb: Function) {
if (this.lastLineData) {
try {
this.push(JSON.parse(this.lastLineData));
}
catch (err) {
// noop
}
}
this.lastLineData = '';
cb();
}
}
the only question remaining is whether I should be implementing _flush()
or flush()
, I am not sure...I implemented both for now...
Upvotes: 1