@@ -40,14 +40,14 @@ import org.apache.spark.util.ThreadUtils
/**
 * Strategy for turning a Parquet [[FileStatus]] into the [[FileSplit]]s that
 * should be read, given a set of pushed-down data-source [[Filter]]s.
 */
abstract class ParquetFileSplitter {

  /** Builds a function mapping each file's status to the splits to scan for it. */
  def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit])

  /**
   * Fallback: a single split covering the entire file.
   *
   * @param path   path of the file
   * @param length total length of the file in bytes
   * @return a one-element sequence spanning `[0, length)` with no host hints
   */
  def singleFileSplit(path: Path, length: Long): Seq[FileSplit] = {
    val wholeFile = new FileSplit(path, 0, length, Array.empty)
    Seq(wholeFile)
  }
}
4747
/**
 * Default splitter: ignores the pushed-down filters and always reads each
 * file as one whole-file split.
 */
object ParquetDefaultFileSplitter extends ParquetFileSplitter {
  override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
    // Filters cannot prune anything without metadata, so retain the full file.
    status => singleFileSplit(status.getPath, status.getLen)
  }
}
5353
@@ -82,18 +82,20 @@ class ParquetMetadataFileSplitter(
8282 (applied, unapplied, filteredBlocks)
8383 }
8484
85+ // Group eligible splits by file Path.
8586 val eligible = applyParquetFilter(unapplied, filteredBlocks).map { bmd =>
8687 val blockPath = new Path (root, bmd.getPath)
8788 new FileSplit (blockPath, bmd.getStartingPos, bmd.getCompressedSize, Array .empty)
88- }
89+ }.groupBy(_.getPath)
8990
9091 val statFilter : (FileStatus => Seq [FileSplit ]) = { stat =>
91- if (referencedFiles.contains(stat.getPath)) {
92- eligible.filter(_.getPath == stat.getPath)
92+ val filePath = stat.getPath
93+ if (referencedFiles.contains(filePath)) {
94+ eligible.getOrElse(filePath, Nil )
9395 } else {
9496 log.warn(s " Found _metadata file for $root, " +
95- s " but no entries for blocks in ${stat.getPath }. Retaining whole file. " )
96- singleFileSplit(stat)
97+ s " but no entries for blocks in ${filePath }. Retaining whole file. " )
98+ singleFileSplit(filePath, stat.getLen )
9799 }
98100 }
99101 statFilter
0 commit comments