Warm tip: This article is reproduced from serverfault.com, please click

apache spark-如何在没有** dbutils的情况下在Databricks dbfs中列出文件密钥

(apache spark - How to list file keys in Databricks dbfs **without** dbutils)

发布于 2020-11-09 18:13:40

显然dbutils不能在cmd行的spark-submit中使用,你必须为此使用Jar Jobs,但是由于其他要求,我必须使用spark-submit样式的工作,但是仍然需要列出和遍历dbfs中的文件密钥以决定将哪些文件用作流程的输入...

使用scala,可以使用spark或hadoop中的哪个lib检索dbfs:/filekeys特定模式的列表

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

def ls(sparkSession: SparkSession, inputDir: String): Seq[String] = {
  println(s"FileUtils.ls path: $inputDir")
  val path = new Path(inputDir)
  val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
  val fileStatuses = fs.listStatus(path)
  fileStatuses.filter(_.isFile).map(_.getPath).map(_.getName).toSeq
}

使用上面的代码,如果我输入了部分密钥前缀,例如dbfs:/mnt/path/to/folder在“文件夹”中存在以下密钥:

  • /mnt/path/to/folder/file1.csv
  • /mnt/path/to/folder/file2.csv

dbfs:/mnt/path/to/folder is not a directory击中时我明白val path = new Path(inputDir)

Questioner
Rimer
Viewed
11
Rimer 2020-11-30 23:51:19

需要使用SparkSession来做到这一点。

这是我们的操作方式:

import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

def getFileSystem(sparkSession: SparkSession): FileSystem =
    FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)

def listContents(sparkSession: SparkSession, dir: String): Seq[String] = {
  getFileSystem(sparkSession).listStatus(new path(dir)).toSeq.map(_.getPath).map(_.getName)
}