yeppe
yeppe

Reputation: 689

Read and Write to files using Multi-threading (producer consumer model) in JAVA

I am stuck here. Can someone explain why the consumer thread appears to run before the producer thread in the code below? How can the consumer thread run when the producer has not yet put any content? Is the program wrong?

Goal: run a producer/consumer thread pair for each file that is picked up from the given folder.

For instance, if the specified folder has 3 files, then 2 threads (one producer and one consumer) must be started per file, which makes the thread count 6.

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

/**
 * One-slot hand-off between a producer thread and a consumer thread.
 *
 * <p>The slot holds a single {@code int}. {@link #setContents(int)} blocks while the
 * slot is full and {@link #getContents()} blocks while it is empty, so values are
 * handed over one at a time. Each instance also carries the file it is associated
 * with ({@link #getProcessingFile()}).
 *
 * <p>Note that the file list is a {@code static} field: every constructor call
 * replaces the list seen by all instances.
 */
class sharedInt {
    /** The value currently stored in the slot; only meaningful while {@code available} is true. */
    private int syncUponInt;
    /** True while the slot holds a value that has not been consumed yet. */
    private boolean available = false;
    private File processingFile;
    private static File[] listOfFile;

    /**
     * Creates an empty slot and replaces the class-wide file list.
     *
     * @param totalList the files to expose through {@link #getListOfFile()}
     */
    sharedInt(File[] totalList) {
        listOfFile = totalList;
    }

    /** @return the number of entries in the class-wide file list */
    public int getTotalCount() {
        return listOfFile.length;
    }

    public static File[] getListOfFile() {
        return listOfFile;
    }

    public static void setListOfFile(File[] listOfFile) {
        sharedInt.listOfFile = listOfFile;
    }

    public File getProcessingFile() {
        return processingFile;
    }

    public void setProcessingFile(File processingFile) {
        this.processingFile = processingFile;
    }

    /**
     * Takes the value out of the slot, blocking until a producer has stored one.
     *
     * <p>The call is not cancellable: an interrupt received while waiting does not
     * abort the wait, but the thread's interrupt status is restored before returning
     * so that callers can still observe it.
     *
     * @return the value stored by the most recent {@link #setContents(int)}
     */
    public synchronized int getContents() {
        boolean interrupted = false;
        try {
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and keep waiting for a value.
                    interrupted = true;
                }
            }
            available = false;
            // notifyAll so that a waiting producer is woken even if other consumers wait too.
            notifyAll();
            return syncUponInt;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores a value in the slot, blocking until the previous value has been consumed.
     *
     * <p>As with {@link #getContents()}, an interrupt does not abort the wait; the
     * interrupt status is restored before returning.
     *
     * @param value the value to hand over to the consumer
     */
    public synchronized void setContents(int value) {
        boolean interrupted = false;
        try {
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and keep waiting for a free slot.
                    interrupted = true;
                }
            }
            syncUponInt = value;
            available = true;
            // notifyAll so that a waiting consumer is woken even if other producers wait too.
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

/**
 * Producer thread: publishes its number into the shared slot, then reads the
 * slot's associated file and prints its contents.
 */
class Producer1 extends Thread {
    private final sharedInt cubbyhole;
    private final int number;

    /**
     * @param c      the slot shared with the matching consumer
     * @param number the value this producer hands over
     */
    public Producer1(sharedInt c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    /**
     * Hands {@code number} to the consumer, logs the hand-off, then prints the
     * file's lines concatenated without separators.
     *
     * <p>Because the hand-off happens before the log line is printed, the consumer's
     * "got" message may appear on the console before this producer's "put" message.
     */
    @Override
    public void run() {
        cubbyhole.setContents(number);
        File source = cubbyhole.getProcessingFile();
        System.out.println("Producer <current thread>" + Thread.currentThread() + "put: " + number
                + "processing file is" + source);

        // try-with-resources guarantees the file handle is released even on failure.
        try (RandomAccessFile raf = new RandomAccessFile(source, "r")) {
            StringBuilder sb = new StringBuilder();
            String line;
            while ((line = raf.readLine()) != null) {
                sb.append(line);
            }
            System.out.println(sb);
        } catch (IOException e) {
            // Covers FileNotFoundException too; report instead of failing silently.
            System.err.println("Producer could not read " + source + ": " + e);
        }
    }
}

/**
 * Consumer thread: takes one value out of the shared slot and prints it together
 * with the file associated with that slot.
 */
class Consumer1 extends Thread {
    private sharedInt cubbyhole;

    /**
     * @param c the slot shared with the matching producer
     */
    public Consumer1(sharedInt c) {
        this.cubbyhole = c;
    }

    /**
     * Builds the log line piece by piece; the call to {@code getContents()} blocks
     * until the producer has stored a value.
     */
    public void run() {
        StringBuilder message = new StringBuilder("Consumer <current thread>");
        message.append(Thread.currentThread());
        message.append("got: ");
        message.append(cubbyhole.getContents());
        message.append("processing file is");
        message.append(cubbyhole.getProcessingFile());
        System.out.println(message.toString());
    }
}

/**
 * Entry point: starts one producer thread and one consumer thread for every
 * entry found in the input folder.
 */
public class FileManagementApp {
    /** Folder scanned when no folder is given on the command line. */
    private static final String DEFAULT_FOLDER = "C:\\file\\output";

    /**
     * @param args optional; {@code args[0]} overrides the default input folder
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws InterruptedException {

        System.out.println("1. Please enter the path of the <Directory/Folder>...");
        String folderPath = (args != null && args.length > 0) ? args[0] : DEFAULT_FOLDER;
        File folder = new File(folderPath);

        // listFiles() returns null when the path is not a directory or cannot be read.
        File[] fileList = folder.listFiles();
        if (fileList == null) {
            System.err.println("Cannot list files in " + folder + " (not a directory or not readable)");
            return;
        }

        for (int i = 0; i < fileList.length; i++) {
            // Each producer/consumer pair gets its own slot bound to one file.
            sharedInt slot = new sharedInt(fileList);
            slot.setProcessingFile(fileList[i]);

            Producer1 producer = new Producer1(slot, i);
            producer.setName("Producer--" + i);
            Consumer1 consumer = new Consumer1(slot);
            consumer.setName("Consumer--" + i);

            producer.start();
            consumer.start();
        }

    }
}

Output:-

1. Please enter the path of the <Directory/Folder>...
Consumer <current thread>Thread[Consumer--0,5,main]got: 0processing file isC:\file\output\0.A.txt
Producer <current thread>Thread[Producer--0,5,main]put: 0processing file isC:\file\output\0.A.txt
Producer <current thread>Thread[Producer--1,5,main]put: 1processing file isC:\file\output\1.A.txt
Producer <current thread>Thread[Producer--2,5,main]put: 2processing file isC:\file\output\2.A.txt
Consumer <current thread>Thread[Consumer--1,5,main]got: 1processing file isC:\file\output\1.A.txt
fg
abc
Consumer <current thread>Thread[Consumer--2,5,main]got: 2processing file isC:\file\output\2.A.txt
de

EDIT:-

I modified the code to something like this and was able to achieve concurrency/multithreading, reading and writing files simultaneously using the producer-consumer model.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

/**
 * One-slot hand-off of a byte vector between a producer thread and a consumer thread.
 *
 * <p>{@link #put(Vector)} blocks while the slot is full and {@link #get()} blocks
 * while it is empty. The instance also carries the file being processed.
 */
class SharedInteger {
    /** True while the slot holds a value that has not been consumed yet. */
    private boolean available = false;
    public File processingFile;
    public long totalNoOfSplits;
    public Vector<Byte> vectorBytes;
    private File[] listOfFiles;

    /**
     * @param totalList the full list of files; stored but not otherwise used by this class
     */
    SharedInteger(File[] totalList) {
        listOfFiles = totalList;
    }

    /**
     * Takes the vector out of the slot, blocking until a producer has stored one.
     *
     * <p>The call is not cancellable: an interrupt received while waiting does not
     * abort the wait, but the thread's interrupt status is restored before returning.
     *
     * @return the vector passed to the most recent {@link #put(Vector)}; may be null
     *         if null was put
     */
    public synchronized Vector<Byte> get() {
        boolean interrupted = false;
        try {
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and keep waiting for a value.
                    interrupted = true;
                }
            }
            available = false;
            // notifyAll so that a waiting producer is woken even if other consumers wait too.
            notifyAll();
            return vectorBytes;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores a vector in the slot, blocking until the previous one has been consumed.
     *
     * <p>As with {@link #get()}, an interrupt does not abort the wait; the interrupt
     * status is restored before returning.
     *
     * @param value the vector to hand over to the consumer
     */
    public synchronized void put(Vector<Byte> value) {
        boolean interrupted = false;
        try {
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and keep waiting for a free slot.
                    interrupted = true;
                }
            }
            vectorBytes = value;
            available = true;
            // notifyAll so that a waiting consumer is woken even if other producers wait too.
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

/**
 * Producer thread: reads every byte of the slot's file and hands the bytes to the
 * consumer through the shared slot.
 */
class Producer extends Thread {
    private final SharedInteger sharedInteger;

    /**
     * @param c the slot shared with the matching consumer
     */
    public Producer(SharedInteger c) {
        sharedInteger = c;
    }

    /**
     * Reads the file and always publishes a result.
     *
     * <p>If the file is missing or cannot be read, {@code null} is published instead
     * of the bytes so that the consumer blocked in {@code get()} is released rather
     * than waiting forever.
     */
    @Override
    public void run() {
        File source = sharedInteger.processingFile;
        Vector<Byte> contents = null;
        if (source == null) {
            System.err.println("Producer has no file to read");
        } else {
            try {
                contents = readAllBytes(source);
            } catch (IOException e) {
                System.err.println("Producer could not read " + source + ": " + e);
            }
        }
        sharedInteger.put(contents);
    }

    /** Reads the whole file into a byte vector, closing the stream when done. */
    private static Vector<Byte> readAllBytes(File source) throws IOException {
        Vector<Byte> bytes = new Vector<Byte>();
        try (FileInputStream fis = new FileInputStream(source)) {
            byte[] buffer = new byte[8192];
            int count;
            // read() returning -1 is the end-of-file signal; available() is only an estimate.
            while ((count = fis.read(buffer)) != -1) {
                for (int i = 0; i < count; i++) {
                    bytes.add(buffer[i]);
                }
            }
        }
        return bytes;
    }
}

/**
 * Consumer thread: waits for the producer's bytes and writes them to a file with
 * the same name in a sibling directory.
 *
 * <p>The output path is the parent directory's path with {@code "1\\"} appended,
 * followed by the file name (for example {@code C:\file\output1\name.txt}).
 */
class Consumer extends Thread {
    private SharedInteger sharedInteger;
    private FileOutputStream fos;

    /**
     * @param c the slot shared with the matching producer
     */
    public Consumer(SharedInteger c) {
        sharedInteger = c;
    }

    /**
     * Opens the output file, waits for the producer's bytes, writes them, and
     * always closes the output stream afterwards.
     */
    @Override
    public void run() {
        File newFile = sharedInteger.processingFile;
        try {
            fos = new FileOutputStream(newFile.getParentFile() + "1\\" + newFile.getName());
        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
            // Without an output stream there is nothing to write to; stop here
            // instead of failing later on a null stream.
            return;
        }

        try {
            Vector<Byte> v = sharedInteger.get();
            if (null != v) {
                writeToAFile(v);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                fos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Writes every byte of {@code contents} to the output stream opened by {@link #run()}.
     *
     * @param contents the bytes to write, in order
     * @throws IOException if the stream has not been opened or a write fails
     */
    public void writeToAFile(Vector<Byte> contents) throws IOException {
        if (fos == null) {
            throw new IOException("Output stream has not been opened");
        }
        for (int i = 0; i < contents.size(); i++) {
            System.out.println(Thread.currentThread());
            fos.write(contents.get(i));
        }
        // One flush after the loop is enough; flushing per byte adds nothing.
        fos.flush();
    }
}

/**
 * Entry point: starts one producer thread and one consumer thread for every
 * entry found in the input folder.
 */
public class ProducerConsumerTest {
    /**
     * @param args unused
     * @throws FileNotFoundException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws FileNotFoundException {
        File folder = new File("C:\\file\\output");

        // listFiles() returns null when the path is not a directory or cannot be read.
        File[] fileList = folder.listFiles();
        if (fileList == null) {
            System.err.println("Cannot list files in " + folder + " (not a directory or not readable)");
            return;
        }

        for (int i = 0; i < fileList.length; i++) {
            // Each producer/consumer pair gets its own slot bound to one file.
            SharedInteger slot = new SharedInteger(fileList);
            slot.processingFile = fileList[i];

            Producer producer = new Producer(slot);
            producer.setName("Producer--" + i);
            producer.start();

            Consumer consumer = new Consumer(slot);
            consumer.setName("Consumer--" + i);
            consumer.start();
        }

    }
}

Upvotes: 2

Views: 2503

Answers (2)

beatngu13
beatngu13

Reputation: 9443

Well, there're a few things which are quite fishy. However, have a look at your run() methods:

// Producer1
public void run() {
    cubbyhole.setContents(this.number);
    Vector vectorList = new Vector();
    System.out.println("Producer <current thread>" + this.currentThread()
            + "put: " + this.number
            + "processing file is" + cubbyhole.getProcessingFile());
    RandomAccessFile raf = null;

    try {
        // ...
    }

// Consumer1
// Excerpt of Consumer1.run(): prints the value taken from the shared slot.
public void run() {
    // Never read afterwards.
    int value = 0;

    // getContents() is evaluated while the message is being built; the line is
    // printed only after that call has returned.
    System.out.println("Consumer <current thread>" + this.currentThread()
            + "got: " + cubbyhole.getContents()
            + "processing file is" + cubbyhole.getProcessingFile());
}

As soon as your producer invokes setContents(int) (and, therefore, notify() as well), your consumer is free to continue. Just because you see the console output from your consumer first doesn't mean anything. Printing is done concurrently without synchronization, so you can't rely on the console output to reflect the order of execution.

EDIT: here is a version according to your requirements (using Vector, wait(), notify(), and two threads per file), but please bear in mind that there are far superior ways to implement this (see comments):

/**
 * Merges the files of a directory into a single {@code merge.txt} using one reader
 * thread and one writer thread per file.
 *
 * <p>Readers run concurrently. Writers append to the merge file strictly in the
 * order of the file array, coordinated with {@code wait()}/{@code notifyAll()} on
 * this object. An instance supports one {@link #merge} call at a time.
 */
public class FileMerger {

    /** Index of the writer whose turn it is; modified only while holding this object's monitor. */
    private volatile int currentWriterId = 0;

    /**
     * @param args {@code args[0]} is the target directory, {@code args[1]} the
     *             suffix (without the dot) of the files to merge
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: FileMerger <directory> <suffix>");
            return;
        }
        // 1st argument: target directory.
        File directory = new File(args[0]);
        // 2nd argument: merge files suffix.
        FilenameFilter filter = (dir, name) -> name.endsWith("." + args[1]);

        new FileMerger().merge(directory, filter);
    }

    /**
     * Creates {@code merge.txt} in {@code directory} and appends the content of every
     * file accepted by {@code filter}, each followed by a newline. Returns once all
     * reader and writer threads have finished.
     *
     * <p>A file that cannot be read is reported on standard error and skipped; it
     * does not block the remaining writers.
     *
     * @param directory the directory to scan and to create {@code merge.txt} in
     * @param filter    selects the files to merge
     * @throws IOException          if the directory cannot be listed or
     *                              {@code merge.txt} cannot be created (for example
     *                              because it already exists)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *                              for the worker threads
     */
    public void merge(File directory, FilenameFilter filter) throws IOException, InterruptedException {
        File[] files = directory.listFiles(filter);
        if (files == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            throw new IOException("Cannot list files in " + directory);
        }
        int numberOfFiles = files.length;
        Path mergeFilePath = Paths.get(directory.getPath() + FileSystems.getDefault().getSeparator() + "merge.txt");
        Vector<String> fileContents = new Vector<>(Collections.nCopies(numberOfFiles, null));

        Files.createFile(mergeFilePath);

        // Start the turn counter afresh so the instance can be reused for another merge.
        synchronized (this) {
            currentWriterId = 0;
        }

        Thread[] workers = new Thread[2 * numberOfFiles];

        for (int i = 0; i < numberOfFiles; i++) {
            final int writerId = i;
            File file = files[i];
            CountDownLatch readWriteLatch = new CountDownLatch(1);

            // Reader.
            Thread reader = new Thread(() -> {
                try {
                    List<String> lines = Files.readAllLines(Paths.get(file.getPath()));
                    String content = String.join("\n", lines);

                    fileContents.set(writerId, content);
                } catch (IOException e) {
                    System.err.println("Skipping " + file + ": " + e);
                } finally {
                    // Always release the writer, even on failure; otherwise it and
                    // every writer after it would wait forever.
                    readWriteLatch.countDown();
                }
            });

            // Writer.
            Thread writer = new Thread(() -> {
                try {
                    // Wait for corresponding reader to finish.
                    readWriteLatch.await();

                    // Wait for writer ID.
                    synchronized (this) {
                        while (currentWriterId != writerId) {
                            wait();
                        }
                        try {
                            String content = fileContents.get(writerId);
                            // A null entry means the reader failed; skip it.
                            if (content != null) {
                                Files.write(mergeFilePath,
                                        (content + "\n").getBytes(java.nio.charset.StandardCharsets.UTF_8),
                                        StandardOpenOption.APPEND);
                            }
                        } catch (IOException e) {
                            System.err.println("Could not append " + file + " to " + mergeFilePath + ": " + e);
                        } finally {
                            // Pass the turn on even if this write failed.
                            currentWriterId++;
                            notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            workers[2 * i] = reader;
            workers[2 * i + 1] = writer;
            reader.start();
            writer.start();
        }

        // Wait for all workers so the merge file is complete when this method returns.
        for (Thread worker : workers) {
            worker.join();
        }
    }

}

Upvotes: 2

yeppe
yeppe

Reputation: 689

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

/**
 * One-slot hand-off of a byte vector between a producer thread and a consumer thread.
 *
 * <p>{@link #put(Vector)} blocks while the slot is full and {@link #get()} blocks
 * while it is empty. The instance also carries the file being processed.
 */
class SharedInteger {
    /** True while the slot holds a value that has not been consumed yet. */
    private boolean available = false;
    public File processingFile;
    public long totalNoOfSplits;
    public Vector<Byte> vectorBytes;
    private File[] listOfFiles;

    /**
     * @param totalList the full list of files; stored but not otherwise used by this class
     */
    SharedInteger(File[] totalList) {
        listOfFiles = totalList;
    }

    /**
     * Takes the vector out of the slot, blocking until a producer has stored one.
     *
     * <p>The call is not cancellable: an interrupt received while waiting does not
     * abort the wait, but the thread's interrupt status is restored before returning.
     *
     * @return the vector passed to the most recent {@link #put(Vector)}; may be null
     *         if null was put
     */
    public synchronized Vector<Byte> get() {
        boolean interrupted = false;
        try {
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and keep waiting for a value.
                    interrupted = true;
                }
            }
            available = false;
            // notifyAll so that a waiting producer is woken even if other consumers wait too.
            notifyAll();
            return vectorBytes;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores a vector in the slot, blocking until the previous one has been consumed.
     *
     * <p>As with {@link #get()}, an interrupt does not abort the wait; the interrupt
     * status is restored before returning.
     *
     * @param value the vector to hand over to the consumer
     */
    public synchronized void put(Vector<Byte> value) {
        boolean interrupted = false;
        try {
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and keep waiting for a free slot.
                    interrupted = true;
                }
            }
            vectorBytes = value;
            available = true;
            // notifyAll so that a waiting consumer is woken even if other producers wait too.
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

/**
 * Producer thread: reads every byte of the slot's file and hands the bytes to the
 * consumer through the shared slot.
 */
class Producer extends Thread {
    private final SharedInteger sharedInteger;

    /**
     * @param c the slot shared with the matching consumer
     */
    public Producer(SharedInteger c) {
        sharedInteger = c;
    }

    /**
     * Reads the file and always publishes a result.
     *
     * <p>If the file is missing or cannot be read, {@code null} is published instead
     * of the bytes so that the consumer blocked in {@code get()} is released rather
     * than waiting forever.
     */
    @Override
    public void run() {
        File source = sharedInteger.processingFile;
        Vector<Byte> contents = null;
        if (source == null) {
            System.err.println("Producer has no file to read");
        } else {
            try {
                contents = readAllBytes(source);
            } catch (IOException e) {
                System.err.println("Producer could not read " + source + ": " + e);
            }
        }
        sharedInteger.put(contents);
    }

    /** Reads the whole file into a byte vector, closing the stream when done. */
    private static Vector<Byte> readAllBytes(File source) throws IOException {
        Vector<Byte> bytes = new Vector<Byte>();
        try (FileInputStream fis = new FileInputStream(source)) {
            byte[] buffer = new byte[8192];
            int count;
            // read() returning -1 is the end-of-file signal; available() is only an estimate.
            while ((count = fis.read(buffer)) != -1) {
                for (int i = 0; i < count; i++) {
                    bytes.add(buffer[i]);
                }
            }
        }
        return bytes;
    }
}

/**
 * Consumer thread: waits for the producer's bytes and writes them to a file with
 * the same name in a sibling directory.
 *
 * <p>The output path is the parent directory's path with {@code "1\\"} appended,
 * followed by the file name (for example {@code C:\file\output1\name.txt}).
 */
class Consumer extends Thread {
    private SharedInteger sharedInteger;
    private FileOutputStream fos;

    /**
     * @param c the slot shared with the matching producer
     */
    public Consumer(SharedInteger c) {
        sharedInteger = c;
    }

    /**
     * Opens the output file, waits for the producer's bytes, writes them, and
     * always closes the output stream afterwards.
     */
    @Override
    public void run() {
        File newFile = sharedInteger.processingFile;
        try {
            fos = new FileOutputStream(newFile.getParentFile() + "1\\" + newFile.getName());
        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
            // Without an output stream there is nothing to write to; stop here
            // instead of failing later on a null stream.
            return;
        }

        try {
            Vector<Byte> v = sharedInteger.get();
            if (null != v) {
                writeToAFile(v);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                fos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Writes every byte of {@code contents} to the output stream opened by {@link #run()}.
     *
     * @param contents the bytes to write, in order
     * @throws IOException if the stream has not been opened or a write fails
     */
    public void writeToAFile(Vector<Byte> contents) throws IOException {
        if (fos == null) {
            throw new IOException("Output stream has not been opened");
        }
        for (int i = 0; i < contents.size(); i++) {
            System.out.println(Thread.currentThread());
            fos.write(contents.get(i));
        }
        // One flush after the loop is enough; flushing per byte adds nothing.
        fos.flush();
    }
}

/**
 * Entry point: starts one producer thread and one consumer thread for every
 * entry found in the input folder.
 */
public class ProducerConsumerTest {
    /**
     * @param args unused
     * @throws FileNotFoundException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws FileNotFoundException {
        File folder = new File("C:\\file\\output");

        // listFiles() returns null when the path is not a directory or cannot be read.
        File[] fileList = folder.listFiles();
        if (fileList == null) {
            System.err.println("Cannot list files in " + folder + " (not a directory or not readable)");
            return;
        }

        for (int i = 0; i < fileList.length; i++) {
            // Each producer/consumer pair gets its own slot bound to one file.
            SharedInteger slot = new SharedInteger(fileList);
            slot.processingFile = fileList[i];

            Producer producer = new Producer(slot);
            producer.setName("Producer--" + i);
            producer.start();

            Consumer consumer = new Consumer(slot);
            consumer.setName("Consumer--" + i);
            consumer.start();
        }

    }
}

Upvotes: 0

Related Questions