Reputation: 12692
I have a class that takes a local file, transforms it, and stores it in GCS:
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._
class GcsService(gcsStorage: Storage) {
def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
if (destination.unzipGzip) {
for (input ← managed(new ZipInputStream(Files.newInputStream(localPath)));
output ← managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
ByteStreams.copy(input, output)
}
} else if (destination.decompressBzip2) {
for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
ByteStreams.copy(input, output)
}
} else {
for (input <- managed(Files.newInputStream(localPath));
output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
IOUtils.copy(input, output)
}
}
}
}
case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)
I am trying to remove some code duplication, in particular the creation of the fileInputStream
and gcsOutputStream
. But I cannot simply extract those variables at the top of the method, because it would create resources outside of the scala-arm managed
block:
import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._
class GcsService(gcsStorage: Storage) {
def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
// FIXME: creates a resource outside of the ARM block
val fileInputStream = Files.newInputStream(localPath)
val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))
if (destination.unzipGzip) {
unzipGzip(fileInputStream, gcsOutputStream)
} else if (destination.decompressBzip2) {
decompressBzip2(fileInputStream, gcsOutputStream)
} else {
copy(fileInputStream, gcsOutputStream)
}
}
private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input ← managed(new ZipInputStream(inputStream));
output ← managed(new GZIPOutputStream(outputStream))) {
ByteStreams.copy(input, output)
}
}
private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input <- managed(new BZip2CompressorInputStream(inputStream));
output <- managed(outputStream)) {
ByteStreams.copy(input, output)
}
}
private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input <- managed(inputStream);
output <- managed(outputStream)) {
IOUtils.copy(input, output)
}
}
}
case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)
As you can see, the code is a lot clearer and more testable, but resources are not handled correctly since they are not "managed". As an example, if an exception is thrown when creating gcsOutputStream
, fileInputStream
won't be closed.
I could probably solve this using Google Guava sources and sinks, but I am wondering if there is a better way of doing this in Scala, without introducing Guava. Ideally using the standard library, or a scala-arm feature, or maybe even in Cats
?
fileInputStream
and gcsOutputStream
as functions that take nothing and return the stream? It seems the code will be more verbose with () => InputStream
and () => OutputStream
everywhere?fileInputStream
and gcsOutputStream
, and another one inside each sub-function)? If I do that, isn't it a problem that the "inner" inputstream will be closed twice?Upvotes: 1
Views: 382
Reputation: 22625
You could refactor it like this:
First, declare managed resources:
val fileInputStream: ManagedResource[InputStream] = managed(Files.newInputStream(localPath))
val gcsOutputStream: ManagedResource[OutputStream] = managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))
It doesn't open these resources, it's just declaration, that you want these resources to be managed.
Then you could use map
to wrap them in desired decorators (like ZipInputStream
):
if (destination.unzipGzip) {
for (input ← fileInputStream.map(s => new ZipInputStream(s));
output ← gcsOutputStream.map(s => new GZIPOutputStream(s))) {
ByteStreams.copy(input, output)
}
} else if (destination.decompressBzip2) {
for (input <- fileInputStream.map(s => new BZip2CompressorInputStream(s));
output <- gcsOutputStream) {
ByteStreams.copy(input, output)
}
} else {
for (input <- fileInputStream;
output <- gcsOutputStream) {
IOUtils.copy(input, output)
}
}
Of course ManagedResource[A]
is just value, so you can even pass it to a method as parameter:
private def unzipGzip(inputStream: Managed[InputStream], outputStream: Managed[OutputStream]): Unit = {
for (input ← inputStream.map(s => new ZipInputStream(s));
output ← outputStream.map(s => new GZIPOutputStream(s))) {
ByteStreams.copy(input, output)
}
}
Upvotes: 1