我有一个获取本地文件,对其进行转换并将其存储在GCS中的类:
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._
/** Uploads local files to Google Cloud Storage, optionally transforming them on the way.
  *
  * @param gcsStorage client used to open a write channel for the destination blob
  */
class GcsService(gcsStorage: Storage) {

  /** Copies the file at `localPath` into the blob identified by `destination.bucket` and
    * `destination.path`.
    *
    * The transformation is chosen from the flags of `destination`, checked in this order:
    *  - `unzipGzip`: the file is read as a ZIP archive and the content of its first entry
    *    is written gzip-compressed;
    *  - `decompressBzip2`: the file is read through a bzip2 decompressor and written as-is;
    *  - otherwise the bytes are copied unchanged.
    *
    * Every stream is acquired through a scala-arm `managed` block.
    *
    * @param localPath   file to upload
    * @param destination target blob and transformation flags
    */
  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
    if (destination.unzipGzip) {
      for (input <- managed(new ZipInputStream(Files.newInputStream(localPath)));
           output <- managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
        // A ZipInputStream reports end-of-stream until it is positioned on an entry, so
        // without this call nothing would be copied. Only the first entry is uploaded; an
        // archive without entries still produces an empty gzip stream.
        input.getNextEntry()
        ByteStreams.copy(input, output)
      }
    } else if (destination.decompressBzip2) {
      for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        ByteStreams.copy(input, output)
      }
    } else {
      for (input <- managed(Files.newInputStream(localPath));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        IOUtils.copy(input, output)
      }
    }
  }
}
/** Describes the target blob of an upload and how the local file is transformed first.
  *
  * @param unzipGzip       selects the ZIP-to-gzip path in `GcsService.storeFileInGcs`;
  *                        checked before `decompressBzip2`
  * @param decompressBzip2 selects the bzip2-decompression path when `unzipGzip` is false
  * @param bucket          name of the destination bucket
  * @param path            name of the blob inside the bucket
  */
case class FileDestination(
  unzipGzip: Boolean,
  decompressBzip2: Boolean,
  bucket: String,
  path: String
)
我正在尝试删除一些重复的代码,特别是`fileInputStream`和`gcsOutputStream`的创建。但是我不能简单地在方法顶部提取这些变量,因为这样会在scala-arm的`managed`块之外创建资源:
import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._
/** Uploads local files to Google Cloud Storage, optionally transforming them on the way.
  *
  * @param gcsStorage client used to open a write channel for the destination blob
  */
class GcsService(gcsStorage: Storage) {

  /** Copies the file at `localPath` into the blob identified by `destination.bucket` and
    * `destination.path`, dispatching on the flags of `destination`: `unzipGzip` first,
    * then `decompressBzip2`, otherwise a plain copy.
    *
    * @param localPath   file to upload
    * @param destination target blob and transformation flags
    */
  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
    // FIXME: creates a resource outside of the ARM block
    // Both streams are opened eagerly here, before any `managed` wrapper exists, so a
    // failure while opening the second one leaves the first one open.
    val fileInputStream = Files.newInputStream(localPath)
    val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))
    if (destination.unzipGzip) {
      unzipGzip(fileInputStream, gcsOutputStream)
    } else if (destination.decompressBzip2) {
      decompressBzip2(fileInputStream, gcsOutputStream)
    } else {
      copy(fileInputStream, gcsOutputStream)
    }
  }

  /** Reads `inputStream` as a ZIP archive and writes the content of its first entry,
    * gzip-compressed, to `outputStream`. Both wrapping streams are closed on exit.
    */
  private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(new ZipInputStream(inputStream));
         output <- managed(new GZIPOutputStream(outputStream))) {
      // A ZipInputStream reports end-of-stream until it is positioned on an entry, so
      // without this call nothing would be copied. An archive without entries still
      // produces an empty gzip stream.
      input.getNextEntry()
      ByteStreams.copy(input, output)
    }
  }

  /** Writes the bzip2-decompressed content of `inputStream` to `outputStream`. */
  private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(new BZip2CompressorInputStream(inputStream));
         output <- managed(outputStream)) {
      ByteStreams.copy(input, output)
    }
  }

  /** Copies `inputStream` to `outputStream` unchanged, closing both on exit. */
  private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(inputStream);
         output <- managed(outputStream)) {
      IOUtils.copy(input, output)
    }
  }
}
/** Describes the target blob of an upload and how the local file is transformed first.
  *
  * @param unzipGzip       selects the ZIP-to-gzip path in `GcsService.storeFileInGcs`;
  *                        checked before `decompressBzip2`
  * @param decompressBzip2 selects the bzip2-decompression path when `unzipGzip` is false
  * @param bucket          name of the destination bucket
  * @param path            name of the blob inside the bucket
  */
case class FileDestination(
  unzipGzip: Boolean,
  decompressBzip2: Boolean,
  bucket: String,
  path: String
)
如您所见,该代码更加清晰,也更易于测试,但是由于资源不是“托管”的,因此无法被正确释放。例如,如果在创建`gcsOutputStream`时抛出异常,则`fileInputStream`不会被关闭。
我也许可以使用Google Guava的sources和sinks解决此问题,但是我想知道在Scala中是否有更好的方法可以实现这一点,而无需引入Guava。理想情况下,使用标准库或scala-arm的功能,甚至使用Cats?
- 是否应该将`fileInputStream`和`gcsOutputStream`定义为不带任何参数并返回流的函数?这样到处都是`() => InputStream`和`() => OutputStream`,代码似乎会更冗长?
- 是否应该使用两个scala-arm的for推导式(一个用于定义`fileInputStream`和`gcsOutputStream`,另一个用于定义每个子功能)?如果我这样做,那么“内部”输入流被关闭两次会不会有问题?

您可以这样重构:
首先,声明托管资源:
// Describe both endpoints as managed resources. Nothing is opened at this point: each
// stream is acquired only when the resource is actually used (e.g. in a `for` block).
val fileInputStream: ManagedResource[InputStream] =
  managed(Files.newInputStream(localPath))
val gcsOutputStream: ManagedResource[OutputStream] =
  managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))
它不会打开这些资源,而只是声明您要管理这些资源。
然后您可以使用`map`将它们包装在所需的装饰器中(例如`ZipInputStream`):
// Pick the transformation from the destination flags; `unzipGzip` is checked first.
if (destination.unzipGzip) {
  for (input <- fileInputStream.map(s => new ZipInputStream(s));
       output <- gcsOutputStream.map(s => new GZIPOutputStream(s))) {
    // A ZipInputStream reports end-of-stream until it is positioned on an entry, so
    // without this call nothing would be copied. Only the first entry is uploaded.
    input.getNextEntry()
    ByteStreams.copy(input, output)
    // `map` keeps only the underlying stream managed, so the gzip wrapper itself is
    // never closed. finish() writes the remaining compressed data and the gzip trailer
    // to the underlying stream without closing it.
    output.finish()
  }
} else if (destination.decompressBzip2) {
  for (input <- fileInputStream.map(s => new BZip2CompressorInputStream(s));
       output <- gcsOutputStream) {
    ByteStreams.copy(input, output)
  }
} else {
  for (input <- fileInputStream;
       output <- gcsOutputStream) {
    IOUtils.copy(input, output)
  }
}
当然,`ManagedResource[A]`只是值,因此您甚至可以将其作为参数传递给方法:
/** Reads the managed input as a ZIP archive and writes the content of its first entry,
  * gzip-compressed, to the managed output.
  *
  * The underlying streams are acquired and released by the two managed resources; the
  * ZIP and gzip wrappers created here are not closed separately.
  *
  * @param inputStream  managed source holding the ZIP archive
  * @param outputStream managed sink receiving the gzip data
  */
private def unzipGzip(inputStream: ManagedResource[InputStream], outputStream: ManagedResource[OutputStream]): Unit = {
  for (input <- inputStream.map(s => new ZipInputStream(s));
       output <- outputStream.map(s => new GZIPOutputStream(s))) {
    // A ZipInputStream reports end-of-stream until it is positioned on an entry, so
    // without this call nothing would be copied.
    input.getNextEntry()
    ByteStreams.copy(input, output)
    // The gzip wrapper is not closed by the managed resource (only the underlying stream
    // is), so flush the remaining compressed data and the trailer explicitly.
    output.finish()
  }
}