Jo Liss

Reputation: 32935

Concatenate two (or n) streams

Upvotes: 39

Views: 34267

Answers (12)

John Kristian

Reputation: 61

Here's one way. This works with node versions as old as 4.9.1.

"use strict";
const Stream = require('stream');

/** Pipe several Readable streams into a single Writable stream, in sequence.
    That is, pipe the first source stream into the target until the source ends
    or closes, then pipe the second source stream into the target, and so on.
    The sources may be an iterable (for example, an array), an async iterable,
    an iterator or an async iterator. In any case, it must produce Readable streams.
*/
function concatenateStreams(sources, target) {
    const iterator =
          ((typeof sources[Symbol.asyncIterator]) == 'function')
          ? sources[Symbol.asyncIterator]()
          : ((typeof sources[Symbol.iterator]) == 'function')
          ? sources[Symbol.iterator]()
          : sources;
    const nextSource = function() {
        Promise.resolve(
            iterator.next()
        ).then(function(iteratorResult) {
            if ((typeof iteratorResult) != 'object') {
                throw new TypeError("iterator.next() returned a non-object value");
            } else if (iteratorResult.done) {
                target.end();
            } else if (!iteratorResult.value) {
                throw new Error(`iterator.next().value is ${JSON.stringify(iteratorResult.value)}`);
            } else {
                const source = iteratorResult.value;
                if (source.closed || source.destroyed) {
                    nextSource();
                } else {
                    var sourceEnded = false;
                    const sourceEnd = function() {
                        if (!sourceEnded) {
                            sourceEnded = true;
                            nextSource();
                        }
                    };
                    source.on('end', sourceEnd);
                    source.on('close', sourceEnd);
                    source.pipe(target, {end: false});
                }
            }
        }).catch(function(err) {
            if ((typeof target.destroy) == 'function') {
                target.destroy(err);
            } else { // node.js version older than 8.0.0
                if (err) target.emit('error', err);
                target.end();
                target.emit('close');
            }
            if (iterator && (typeof iterator.return) == 'function') {
                iterator.return();
            }
        });
    };
    nextSource();
}

/** Concatenate several Readable streams into a single Readable stream, in sequence. */
class ConcatenatedStream extends Stream.Transform {
    constructor(sources, options) {
        super(Object.assign({
            decodeStrings: false, // don't convert strings to Buffers
        }, options || {}));
        this._pushEncoding = !(options && (options.objectMode || options.readableObjectMode));
        concatenateStreams(sources, this);
    }
    _transform(chunk, encoding, callback) {
        this.push(chunk, this._pushEncoding ? encoding : undefined);
        if (callback) callback();
    }
}
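
For example, a usage sketch (the file names here are hypothetical) that concatenates two files into a single readable stream:

const fs = require('fs');

const combined = new ConcatenatedStream([
    fs.createReadStream('file1.txt'),
    fs.createReadStream('file2.txt')
]);
combined.pipe(fs.createWriteStream('combined.txt'));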

Upvotes: 0

Feng

Reputation: 2910

This can be done with vanilla Node.js

import { PassThrough } from 'stream'
const merge = (...streams) => {
    let pass = new PassThrough()
    for (let stream of streams) {
        const end = stream == streams.at(-1);
        pass = stream.pipe(pass, { end })
    }
    return pass
}

Use streams.slice(-1)[0] if your version of Node.js doesn't have .at().
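
For example, a usage sketch (file names are hypothetical) that merges two file streams into one output file:

import { createReadStream, createWriteStream } from 'fs'

const merged = merge(
    createReadStream('file1.txt'),
    createReadStream('file2.txt')
)
merged.pipe(createWriteStream('combined.txt'))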

Upvotes: 35

Kaller

Reputation: 1

Nisha provided my favourite solution to this problem. Some of the other solutions didn't remove the 'end' event, which caused issues when merging audio streams. However, it doesn't handle the obvious case of a single stream. Thank you so much for the well-thought-out solution, Nisha!

import { PassThrough, Stream } from 'stream'

const pipeStreams = (streams: Stream[]): Stream => {
    //If there is only one stream, return that stream
    if (streams.length == 1) return streams[0];
    const out = new PassThrough()
    // Piping the first stream to the out stream
    // Also prevent the automated 'end' event of out stream from firing
    streams[0].pipe(out, { end: false })
    for (let i = 0; i < streams.length - 2; i++) {
        // On the end of each stream (until the second last) pipe the next stream to the out stream
        // Prevent the automated 'end' event of out stream from firing
        streams[i].on('end', () => {
            streams[i + 1].pipe(out, { end: false })
        })
    }
    // On the end of second last stream pipe the last stream to the out stream.
    // Don't prevent the 'end' event from firing
    streams[streams.length - 2].on('end', () => {
        streams[streams.length - 1].pipe(out)
    })
    return out
}

Upvotes: 0

ivoputzer

Reputation: 6469

If you don't care about the ordering of data across the streams (the sources are piped concurrently, so chunks may interleave), a simple reduce operation works fine in Node.js!

const {PassThrough} = require('stream')

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
  s.pipe(pt, {end: false})
  // once every source has ended, emit 'end' on the PassThrough
  // (readableEnded requires Node.js 12.9+)
  s.once('end', () => a.every(s => s.readableEnded) && pt.emit('end'))
  return pt
}, new PassThrough())
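
Once built, joined can be consumed like any other readable stream, for instance (the output path is hypothetical):

const fs = require('fs')

joined.pipe(fs.createWriteStream('joined.txt'))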

Cheers ;)

Upvotes: 8

ducaale

Reputation: 645

This can now be easily done using async iterators

async function* concatStreams(readables) {
  for (const readable of readables) {
    for await (const chunk of readable) { yield chunk }
  }
} 

And you can use it like this

const fs = require('fs')
const stream = require('stream')

const files = ['file1.txt', 'file2.txt', 'file3.txt'] 
// calling the async generator returns an async iterable directly (no await needed)
const iterable = concatStreams(files.map(f => fs.createReadStream(f)))

// convert the async iterable to a readable stream
const mergedStream = stream.Readable.from(iterable)
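
The merged stream can then be piped to any writable destination, for example (the output path here is hypothetical):

mergedStream.pipe(fs.createWriteStream('merged.txt'))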

More info regarding async iterators: https://2ality.com/2019/11/nodejs-streams-async-iteration.html

Upvotes: 16

Mr. S

Reputation: 104

Both of the most upvoted answers here don't work with asynchronous streams, because they pipe every source immediately, regardless of whether it is ready to produce data. I had to combine in-memory string streams with data fed from a database, and the database content always ended up at the end of the resulting stream because it takes a second to get a db response. Here's what I ended up writing for my purposes.

import { PassThrough, Readable } from 'stream';

export function joinedStream(...streams: Readable[]): Readable {
  function pipeNext(): void {
    const nextStream = streams.shift();
    if (nextStream) {
      nextStream.pipe(out, { end: false });
      nextStream.on('end', function() {
        pipeNext();
      });
    } else {
      out.end();
    }
  }
  const out = new PassThrough();
  pipeNext();
  return out;
}

Upvotes: 3

Nisha

Reputation: 1

The code below worked for me :). I have taken inputs from all the answers given earlier.

const { PassThrough } = require('stream')

const pipeStreams = (streams) => {
  const out = new PassThrough()
  // Piping the first stream to the out stream
  // Also prevent the automated 'end' event of out stream from firing
  streams[0].pipe(out, { end: false })
  for (let i = 0; i < streams.length - 2; i++) {
    // On the end of each stream (until the second last) pipe the next stream to the out stream
    // Prevent the automated 'end' event of out stream from firing
    streams[i].on('end', () => {
      streams[i + 1].pipe(out, { end: false })
    })
  }
  // On the end of second last stream pipe the last stream to the out stream.
  // Don't prevent the 'end' event from firing
  streams[streams.length - 2].on('end', () => {
    streams[streams.length - 1].pipe(out)
  })
  return out
} 

Upvotes: 0

GTPV

Reputation: 402

In vanilla Node.js, using ES2015+ and combining the good answers of Ivo and Feng.

The PassThrough class is a trivial Transform stream which does not modify the stream in any way.

const { PassThrough } = require('stream');

const concatStreams = (streamArray, streamCounter = streamArray.length) => streamArray
  .reduce((mergedStream, stream) => {
    // pipe each stream of the array into the merged stream
    // prevent the automated 'end' event from firing
    mergedStream = stream.pipe(mergedStream, { end: false });
    // rewrite the 'end' event handler
    // Every time one of the streams ends, the counter is decremented.
    // Once the counter reaches 0, the mergedStream can emit its 'end' event.
    stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
    return mergedStream;
  }, new PassThrough());

Can be used like this:

const mergedStreams = concatStreams([stream1, stream2, stream3]);

Upvotes: 4

pob

Reputation: 381

https://github.com/joepie91/node-combined-stream2 is a drop-in Streams2-compatible replacement for the combined-stream module (described in another answer). It automatically wraps Streams1 streams.

Example code for combined-stream2:

var CombinedStream = require('combined-stream2');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));

Upvotes: 2

atamborrino

Reputation: 478

streamee.js is a set of stream transformers and composers based on Node 1.0+ streams, and it includes a concatenate method:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);

Upvotes: 1

Jo Liss

Reputation: 32935

The combined-stream package concatenates streams. Example from the README:

var CombinedStream = require('combined-stream');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));

I believe you have to append all streams at once. If the queue runs empty, the combinedStream automatically ends. See issue #5.

The stream-stream library is an alternative that has an explicit .end, but it's much less popular and presumably not as well-tested. It uses the streams2 API of Node 0.10 (see this discussion).

Upvotes: 20

Aaron Dufour

Reputation: 17505

You might be able to make it more concise, but here's one that works:

var util = require('util');
var EventEmitter = require('events').EventEmitter;

function ConcatStream(streamStream) {
  EventEmitter.call(this);
  var isStreaming = false,
    streamsEnded = false,
    that = this;

  var streams = [];
  streamStream.on('stream', function(stream){
    stream.pause();
    streams.push(stream);
    ensureState();
  });

  streamStream.on('end', function() {
    streamsEnded = true;
    ensureState();
  });

  var ensureState = function() {
    if(isStreaming) return;
    if(streams.length == 0) {
      if(streamsEnded)
        that.emit('end');
      return;
    }
    isStreaming = true;
    streams[0].on('data', onData);
    streams[0].on('end', onEnd);
    streams[0].resume();
  };

  var onData = function(data) {
    that.emit('data', data);
  };

  var onEnd = function() {
    isStreaming = false;
    streams[0].removeAllListeners('data');
    streams[0].removeAllListeners('end');
    streams.shift();
    ensureState();
  };
}

util.inherits(ConcatStream, EventEmitter);

We keep track of state with streams (the queue of streams; push to the back and shift from the front), isStreaming, and streamsEnded. When we get a new stream, we push it, and when a stream ends, we stop listening and shift it. When the stream of streams ends, we set streamsEnded.

On each of these events, we check the state we're in. If we're already streaming (piping a stream), we do nothing. If the queue is empty and streamsEnded is set, we emit the end event. If there is something in the queue, we resume it and listen to its events.

*Note that pause and resume are advisory, so some streams may not behave correctly, and would require buffering. This exercise is left to the reader.

Having done all of this, I would do the n=2 case by constructing an EventEmitter, creating a ConcatStream with it, and emitting two stream events followed by an end event. I'm sure it could be done more concisely, but we may as well use what we've got.
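
For instance, a minimal sketch of that n=2 usage (the file names are hypothetical, and any streams supporting data/end/pause/resume should work):

var fs = require('fs');
var EventEmitter = require('events').EventEmitter;

var streamOfStreams = new EventEmitter();
var concatenated = new ConcatStream(streamOfStreams);

concatenated.on('data', function(chunk) { process.stdout.write(chunk); });
concatenated.on('end', function() { console.log('done'); });

// emit the two source streams, then signal that no more are coming
streamOfStreams.emit('stream', fs.createReadStream('file1.txt'));
streamOfStreams.emit('stream', fs.createReadStream('file2.txt'));
streamOfStreams.emit('end');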

Upvotes: 3
