imfarhad

Reputation: 172

Reading files in parallel with observables

I am writing an application that reads the files selected by the user and converts them to base64. I want to be notified when all the files have been read into memory. For this purpose I am using an Observable in which I handle the onload event of FileReader and send a complete notification. I am using forkJoin to run the operations in parallel.

Below is the code where I create the Observables and subscribe to them.

onChange($event: any) {
  console.log('No of files selected: ' + $event.target.files.length);
  var observableBatch : any = [];

  var rawFiles = $event.target.files;
  for (var i = rawFiles.length - 1; i >= 0; i--) {

      var reader = new FileReader(); 
      var file = rawFiles[i];
      var myobservable = Observable.create((observer: any) => {
        reader.onload = function (e: any) {
          var data = e.target;
          var imageSrc = data.result;
          console.log('File loaded successfully.');
          observer.next("File loaded");
          observer.complete();
        };
       });

      observableBatch.push(myobservable);
      reader.readAsArrayBuffer(file);

  }

  Observable.forkJoin(observableBatch)
  .subscribe(
      (m) => {
        console.log(m);
      },
      (e) => {
        console.log(e);
      },
      () => {
        console.log("All file(s) loading completed!!!");
      }
    ); 
}

Complete sample code is available in plunkr

When I select a single file, onload function is executed and I get the following console logs

[Screenshot: console output]

However, when I select multiple files, onload gets executed only once and the batch operation is not completed. Please see the following console logs

[Screenshot: console output]

Can somebody help me to understand where I am making the mistake?

Upvotes: 3

Views: 7302

Answers (3)

Lievno

Reputation: 1041

I want to propose this solution. Feel free to tell me if you run into any trouble with it.

const files = Array.from(event.srcElement.files);

Observable.from(files)
  .map((file: File) => {
    const reader = new FileReader();
    const load$ = Observable.fromEvent(reader, 'load').take(1);
    const read$ = Observable.of(file).do(reader.readAsDataURL.bind(reader));
    return [load$, read$];
  })
  .toArray()
  .switchMap((values: any) => {
    const arrayObservables = values.reduce((acc, value) => acc.concat(value), []);
    return Observable.forkJoin(...arrayObservables);
  })
  .subscribe({
    next: console.log
  });

Cheers.

Upvotes: 3

progressdll

Reputation: 374

I am using this code, which uses flatMap to make sure everything is loaded:

import {Injectable} from '@angular/core'
import {Attachment} from './attachments.component'
import {Inject} from '@angular/core'
import {BehaviorSubject} from 'rxjs/BehaviorSubject'
import {Observable} from "rxjs/Observable";
import {AttachmentBackendService} from './attachment.backend.service'
import 'rxjs/add/observable/from'
import 'rxjs/add/operator/mergeMap'

@Injectable()
export class AttachmentStore {
  private _attachments: BehaviorSubject<Attachment[]> = new BehaviorSubject<Attachment[]>([])
  private dataStore : {
    attachments : Attachment[]
  }
  private storeId : string = ''
  private attachmentId : number = 0

  constructor(private attachmentBackendService: AttachmentBackendService) {
    this.dataStore = { attachments : [] }
  }

  get attachments() {
    return this._attachments.asObservable()
  }

  // public
  addFiles(files: FileList) {
    let fileArray = Array.from(files)
    this.processFiles(
         fileArray[0],
         fileArray.slice(1))
       .subscribe(
         (attachment) => {
           this.storeAndSaveAttachment(attachment)
           this._attachments.next(this.dataStore.attachments)
         },
         (e) => {
           console.log(e)
         },
         () => {
           console.log("file loading completed!!!")
         })
   return this.storeId
 }

 removeFile(index: number) {
   let attachment = this.dataStore.attachments[index]
   this.attachmentBackendService.deleteAttachment(this.storeId, attachment.id)
   this.dataStore.attachments.splice(index, 1)
   this._attachments.next(this.dataStore.attachments)
 }

 // private
 private processFiles(file : File, fileArray : File[]) {
   if (fileArray.length > 0) {
     return this.processFiles(
                   fileArray.slice(0,1)[0],
                   fileArray.slice(1))
                .flatMap( (attachment) => {
                  this.storeAndSaveAttachment(attachment)
                  return this.fileReaderObs(file,this.attachmentId++)
                })
   } else {
     if (this.storeId == '')
     {
       this.storeId = this.attachmentBackendService.storeId
     }
     return this.fileReaderObs(file,this.attachmentId++)
   }
 }

 private storeAndSaveAttachment(attachment : Attachment) {
   this.dataStore.attachments.push(attachment)
   this.attachmentBackendService.saveAttachment(this.storeId, attachment)
 }

 private fileReaderObs(file : File, attachmentId : number)  {
   let reader = new FileReader()
   let fileReaderObs = Observable.create((observer: any) => {
     reader.onload = function() {
       let attachment : Attachment = {
         id : attachmentId,
         name : file.name,
         data : btoa(reader.result as string) // readAsBinaryString produces a string
       }
       observer.next(attachment)
       observer.complete()
     }
   })
   reader.readAsBinaryString(file)
   return fileReaderObs
 }

}
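
As a side note, the base64 step can be sketched on its own, independent of RxJS. The code above pairs readAsBinaryString with btoa. If the files are read with readAsArrayBuffer instead, as in the question, the bytes first have to be turned into a binary string. The helper name arrayBufferToBase64 is hypothetical and not part of AttachmentStore, and the sketch assumes a global btoa (browsers and recent Node versions).

```typescript
// Hypothetical helper: converts the bytes of an ArrayBuffer to a base64 string.
function arrayBufferToBase64(buffer: ArrayBuffer): string {
  const bytes = new Uint8Array(buffer);
  let binary = "";
  // Build a "binary string": one character per byte, code points 0-255.
  bytes.forEach((byte) => {
    binary += String.fromCharCode(byte);
  });
  // btoa is assumed to be available globally.
  return btoa(binary);
}

// Usage: a two-byte buffer holding the ASCII text "Hi".
const sample = new ArrayBuffer(2);
new Uint8Array(sample).set([0x48, 0x69]);
console.log(arrayBufferToBase64(sample)); // "SGk="
```

Appending one character at a time is fine for small files. For large files, reading with readAsDataURL and stripping the prefix is probably the simpler route.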

Upvotes: 3

ABabin

Reputation: 2947

I found this answer from a similar question. Apparently it has to do with how the loop and the callbacks interact. As far as I can tell, the function passed to Observable.create() does not run until .forkJoin() subscribes, which happens after the loop has finished. Because reader is declared with var, it is function-scoped, so by then every callback sees the last FileReader and assigns its onload handler to that one reader. Only one onload ever fires, only one Observable completes, and .forkJoin() keeps waiting for the rest.

Anyway, you can solve the issue by putting the code where you set up the FileReader, Observable, and onload callback into its own function. Here is the plunkr showing that it works.

export class AppComponent {
  title = 'file reader';
  observableBatch : any = [];

  onChange($event: any) {
    console.log('No of files selected: ' + $event.target.files.length);
    //Make sure to clear the observableBatch array before restarting the whole process.
    this.observableBatch = [];

    var rawFiles = $event.target.files;
    for (var i = rawFiles.length - 1; i >= 0; i--) {
      this.setUpFile(rawFiles[i]);
    }

    Observable.forkJoin(this.observableBatch)
    .subscribe(
      (m) => {
        console.log(m);
      },
      (e) => {
        console.log(e);
      },
      () => {
        console.log("All file(s) loading completed!!!");
      }
    ); 
  }


  setUpFile(file) {
    var reader = new FileReader(); // the FileReader constructor takes no arguments
    var myobservable = Observable.create((observer: any) => {
      reader.onload = function (e: any) {
        var data = e.target;
        var imageSrc = data.result;
        console.log('File loaded successfully.');
        observer.next("File loaded");
        observer.complete();
      };
    });

    this.observableBatch.push(myobservable);
    reader.readAsArrayBuffer(file);
  } 
}
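
One way to see why the separate function helps is a small model that needs neither RxJS nor the DOM. This is only a sketch under assumptions: FakeReader stands in for FileReader, and a plain deferred function stands in for the callback given to Observable.create(), which runs only on subscription. All names here are hypothetical.

```typescript
// Hypothetical stand-in for FileReader; it only carries a name for identification.
class FakeReader {
  constructor(public readonly name: string) {}
}

// A deferred task, like the function passed to Observable.create():
// its body runs later, not when it is defined.
type Lazy = () => string;

// Mirrors the original loop: `var reader` is one function-scoped binding.
function withVarInLoop(names: string[]): string[] {
  const tasks: Lazy[] = [];
  for (var i = 0; i < names.length; i++) {
    var reader = new FakeReader(names[i]);
    tasks.push(() => reader.name); // reads `reader` when run, not now
  }
  return tasks.map((task) => task()); // "subscribe" after the loop
}

// Mirrors setUpFile(): each call gets its own `reader` binding.
function withHelper(names: string[]): string[] {
  const tasks: Lazy[] = [];
  const setUp = (name: string): void => {
    const reader = new FakeReader(name);
    tasks.push(() => reader.name);
  };
  for (var i = 0; i < names.length; i++) {
    setUp(names[i]);
  }
  return tasks.map((task) => task());
}

console.log(withVarInLoop(["a.png", "b.png", "c.png"])); // every task sees the last reader
console.log(withHelper(["a.png", "b.png", "c.png"]));    // each task sees its own reader
```

Under the same assumption, declaring the reader with let or const inside the loop body should have the same effect as the helper function, since each iteration then gets a fresh binding.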

Upvotes: 2
