如何直接与星火编辑HBase的HFILE没有HBase的API

问题描述 投票:0回答:1

我需要大批量编辑HBase的数据,编辑于各行的特定单元格的内容。通过HBase的传递PUT / GET的API是不是一种选择,因为这将是极其缓慢。我想成立一​​个星火任务,HBase的HFILE装入正确定义的话语结构,让我编辑在特定列中的数据,然后将数据保存回HDFS,保持HFILE格式。

我发现如何从散装星火写HFILE到HDFS一些指南,但是,我不知道如何从HDFS获取数据。哪一种数据帧/ RDD的最适合这样的任务?

谢谢

apache-spark hbase
1个回答
0
投票

在回答自己的情况下,其他人都需要这个。

它可以从HBase的快照加载HFiles。执行以下过程:(在HBase的壳)1.禁用“名称空间:表” 2.快照“名称空间:表”“your_snapshot”

这将创建一个访问快照可以在/[HBase_path]/.snapshot/[your_snapshot访问]

加载一个快照作为RDD [ImmutableBytesWritable,结果]

  def loadFromSnapshot(sc: SparkContext): RDD[ImmutableBytesWritable, Result] = {

val restorePath =
  new Path(s"hdfs://$storageDirectory/$restoreDirectory/$snapshotName")
val restorePathString = restorePath.toString

// create hbase conf starting from spark's hadoop conf
val hConf = HBaseConfiguration.create()
val hadoopConf = sc.hadoopConfiguration
HBaseConfiguration.merge(hConf, hadoopConf)

// point HBase root dir to snapshot dir
hConf.set("hbase.rootdir",
  s"hdfs://$storageDirectory/$snapshotDirectory/$snapshotName/")

// point Hadoop to the bucket as default fs
hConf.set("fs.default.name", s"hdfs://$storageDirectory/")

// configure serializations
hConf.setStrings("io.serializations",
  hadoopConf.get("io.serializations"),
  classOf[MutationSerialization].getName,
  classOf[ResultSerialization].getName,
  classOf[KeyValueSerialization].getName)

// disable caches
hConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT)
hConf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f)
hConf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY)

// configure TableSnapshotInputFormat
hConf.set("hbase.TableSnapshotInputFormat.snapshot.name", settingsAccessor.settings.snapshotName)
hConf.set("hbase.TableSnapshotInputFormat.restore.dir", restorePathString)

val scan = new Scan()     // Fake scan which is applied by spark on HFile. Bypass RPC
val scanString = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}
hConf.set(TableInputFormat.SCAN, scanString)

val job = Job.getInstance(hConf)

TableSnapshotInputFormat.setInput(job, settingsAccessor.settings.snapshotName, restorePath)

// create RDD
sc.newAPIHadoopRDD(job.getConfiguration,
  classOf[TableSnapshotInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
}

这将从快照目录加载HFILE并将适用于他们的“假”全面扫描,从而避免了缓慢的远程过程调用,但允许有一个扫描相同的输出。

当你做,你可以重新启用自己的表

  • 启用“nasmespace:表”您也可以选择删除快照(不会将数据实际被删除)
  • delete_snapshot 'your_snapshot'
© www.soinside.com 2019 - 2024. All rights reserved.