[AURON #2366] fix: Handle Paimon metadata columns in V2 native scan#2367
[AURON #2366] fix: Handle Paimon metadata columns in V2 native scan#2367lyne7-sc wants to merge 3 commits into
Conversation
There was a problem hiding this comment.
@lyne7-sc, thanks for the fix! The overall approach is sound: materialize __paimon_file_path/__paimon_bucket as per-file constants via partitionSchema, and fall back to Spark for unsupported metadata columns. The functional Test Paimon 1.2 CI job (which runs the new integration tests) is green.
|
|
||
| private def isPaimonMetadataColumn(name: String): Boolean = { | ||
| containsName(PaimonMetadataColumns, name) || | ||
| name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix) |
There was a problem hiding this comment.
Style / CI blocker. spotless scalafmt rejects this line — the || continuation should be indented 4 spaces, not 6. This is one of the two violations turning the Style job red (-······name.toLowerCase → +····name.toLowerCase). mvn spotless:apply fixes it:
private def isPaimonMetadataColumn(name: String): Boolean = {
containsName(PaimonMetadataColumns, name) ||
name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix)
}There was a problem hiding this comment.
Thanks for pointing this out. Fixed by running Spotless.
| assert(df.collect().length === 1) | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Style / CI blocker. This blank line has trailing whitespace (8 spaces), which spotless rejects (-········ → +). It is the second cause of the failing Style job. mvn spotless:apply removes it.
There was a problem hiding this comment.
Removed the trailing whitespace, and the ci is green now.
| } | ||
| split.dataFiles().asScala.map { dataFile => | ||
| val filePath = s"${split.bucketPath()}/${dataFile.fileName()}" | ||
| val partitionValues = if (partitionSchema.isEmpty) { |
There was a problem hiding this comment.
Efficiency: partitionValues is now computed inside split.dataFiles().map, so partitionConverter.convert(split.partition()), indexByName, and the per-field DataConverter.fromPaimon conversions all run once per data file — even though everything except __paimon_file_path is constant across the files of a split (split.partition() and split.bucket() are split-level). For a split with N data files this rebuilds the whole partition row N times. Consider computing the split-invariant portion once per split and only filling the per-file file_path slot inside the loop.
There was a problem hiding this comment.
Makes sense. Addressed by computing the split-invariant partition/metadata values once per split, and only filling the per-file __paimon_file_path inside the data-file loop.
| def isPartitionValueField(name: String): Boolean = | ||
| containsName(partitionKeys, name) || isSupportedMetadataColumn(name) | ||
| val partitionFields = readSchema.fields.filter(f => isPartitionValueField(f.name)) | ||
| val fileFields = readSchema.fields.filterNot(f => isPartitionValueField(f.name)) |
There was a problem hiding this comment.
Coverage gap worth a test: when only metadata columns are projected from a non-partitioned table (e.g. select __paimon_file_path from t), every field is classified as a partition/metadata constant, so fileFields is empty and fileSchema is empty. The native Parquet/ORC scan is then asked to read zero data columns but must still emit one row per record so the constant columns get the right cardinality. All three new tests also select id, so the empty-fileSchema path is never exercised (and there's no existing partition-only/count(*) test either). Please add a metadata-only projection test on a multi-row non-partitioned table to confirm the row count is correct on this path.
There was a problem hiding this comment.
Good point. Added a regression test for metadata-only projection on a multi-row non-partitioned table.
| val partitionFields = readSchema.fields.filter(f => containsName(partitionKeys, f.name)) | ||
| val fileFields = readSchema.fields.filterNot(f => containsName(partitionKeys, f.name)) | ||
| def isPartitionValueField(name: String): Boolean = | ||
| containsName(partitionKeys, name) || isSupportedMetadataColumn(name) |
There was a problem hiding this comment.
Minor / edge case: classification here is purely by name (resolver against __paimon_file_path/__paimon_bucket). Paimon's schema validation reserves only the _KEY_ prefix and the core system field names — not the __paimon_ prefix — so a user could in principle define a real physical column named __paimon_bucket. It would then be treated as a per-file constant and return split.bucket() instead of the stored value (a silent wrong result rather than a fallback). Very unlikely in practice, but flagging it.
There was a problem hiding this comment.
Fixed by making physical table columns take precedence over metadata name matching. A real column named
__paimon_bucket now stays in fileSchema and is read from the data file. Added a regression test for this case.
| s"plan should use native paimon scan:\n$plan") | ||
| } | ||
|
|
||
| private def checkSparkAnswerAndNativePaimonScan(sqlText: String): DataFrame = { |
There was a problem hiding this comment.
Minor test cleanup: the DataFrame return value is ignored by both callers (and the third metadata test doesn't use this helper), so Unit would be clearer. Also var expected: Seq[Row] = Nil reassigned inside withSQLConf can be a val, since withSQLConf returns its block value:
val expected = withSQLConf("spark.auron.enable.paimon.scan" -> "false") {
sql(sqlText).collect().toSeq
}There was a problem hiding this comment.
Updated the helper to return Unit.
But I kept the existing var expected pattern because val expected = withSQLConf { ... } does not compile on Spark 3.x: withSQLConf returns Unit there.
|
@SteNicholas Thanks for the careful review! Addressed the comments in the latest update, and the relevant ci is green now. |
Which issue does this PR close?
Closes #2366
Rationale for this change
Paimon metadata columns are produced by the Paimon scan layer rather than stored as physical columns in data files. The Paimon V2 native scan was passing these columns to the native Parquet/ORC reader as file columns, which can return incorrect values.
For example:
The native path returned null for
__paimon_file_path, while Spark/Paimon's scan path returns the actual file path.What changes are included in this PR?
Are there any user-facing changes?
No API changes. This is a correctness fix for Paimon V2 native scan.
How was this patch tested?
Adds Paimon V2 integration tests