Skip to content

[AURON #2366] fix: Handle Paimon metadata columns in V2 native scan#2367

Open
lyne7-sc wants to merge 3 commits into
apache:masterfrom
lyne7-sc:fix/paimon_meta
Open

[AURON #2366] fix: Handle Paimon metadata columns in V2 native scan#2367
lyne7-sc wants to merge 3 commits into
apache:masterfrom
lyne7-sc:fix/paimon_meta

Conversation

@lyne7-sc

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #2366

Rationale for this change

Paimon metadata columns are produced by the Paimon scan layer rather than stored as physical columns in data files. The Paimon V2 native scan was passing these columns to the native Parquet/ORC reader as file columns, which can return incorrect values.

For example:

create table paimon.db.t_metadata (id int, v string) using paimon;
insert into paimon.db.t_metadata values (1, 'a');
select id, __paimon_file_path from paimon.db.t_metadata;

The native path returned null for __paimon_file_path, while Spark/Paimon's scan path returns the actual file path.

What changes are included in this PR?

  • Recognize Paimon metadata columns using PaimonMetadataColumn.
  • Materialize supported file-level metadata columns (__paimon_file_path, __paimon_bucket) as per-file constants.
  • Keep unsupported Paimon metadata columns on Spark/Paimon's scan path instead of reading them from Parquet/ORC files.
  • Cover metadata columns both with and without table partition columns.

Are there any user-facing changes?

No API changes. This is a correctness fix for Paimon V2 native scan.

How was this patch tested?

Adds Paimon V2 integration tests

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lyne7-sc, thanks for the fix! The overall approach is sound: materialize __paimon_file_path/__paimon_bucket as per-file constants via partitionSchema, and fall back to Spark for unsupported metadata columns. The functional Test Paimon 1.2 CI job (which runs the new integration tests) is green.


private def isPaimonMetadataColumn(name: String): Boolean = {
containsName(PaimonMetadataColumns, name) ||
name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style / CI blocker. spotless scalafmt rejects this line — the || continuation should be indented 4 spaces, not 6. This is one of the two violations turning the Style job red (-······name.toLowerCase+····name.toLowerCase). mvn spotless:apply fixes it:

  private def isPaimonMetadataColumn(name: String): Boolean = {
    containsName(PaimonMetadataColumns, name) ||
    name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix)
  }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. Fixed by running Spotless.

assert(df.collect().length === 1)
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style / CI blocker. This blank line has trailing whitespace (8 spaces), which spotless rejects (-········+). It is the second cause of the failing Style job. mvn spotless:apply removes it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the trailing whitespace, and the ci is green now.

}
split.dataFiles().asScala.map { dataFile =>
val filePath = s"${split.bucketPath()}/${dataFile.fileName()}"
val partitionValues = if (partitionSchema.isEmpty) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Efficiency: partitionValues is now computed inside split.dataFiles().map, so partitionConverter.convert(split.partition()), indexByName, and the per-field DataConverter.fromPaimon conversions all run once per data file — even though everything except __paimon_file_path is constant across the files of a split (split.partition() and split.bucket() are split-level). For a split with N data files this rebuilds the whole partition row N times. Consider computing the split-invariant portion once per split and only filling the per-file file_path slot inside the loop.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Addressed by computing the split-invariant partition/metadata values once per split, and only filling the per-file __paimon_file_path inside the data-file loop.

def isPartitionValueField(name: String): Boolean =
containsName(partitionKeys, name) || isSupportedMetadataColumn(name)
val partitionFields = readSchema.fields.filter(f => isPartitionValueField(f.name))
val fileFields = readSchema.fields.filterNot(f => isPartitionValueField(f.name))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage gap worth a test: when only metadata columns are projected from a non-partitioned table (e.g. select __paimon_file_path from t), every field is classified as a partition/metadata constant, so fileFields is empty and fileSchema is empty. The native Parquet/ORC scan is then asked to read zero data columns but must still emit one row per record so the constant columns get the right cardinality. All three new tests also select id, so the empty-fileSchema path is never exercised (and there's no existing partition-only/count(*) test either). Please add a metadata-only projection test on a multi-row non-partitioned table to confirm the row count is correct on this path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Added a regression test for metadata-only projection on a multi-row non-partitioned table.

val partitionFields = readSchema.fields.filter(f => containsName(partitionKeys, f.name))
val fileFields = readSchema.fields.filterNot(f => containsName(partitionKeys, f.name))
def isPartitionValueField(name: String): Boolean =
containsName(partitionKeys, name) || isSupportedMetadataColumn(name)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor / edge case: classification here is purely by name (resolver against __paimon_file_path/__paimon_bucket). Paimon's schema validation reserves only the _KEY_ prefix and the core system field names — not the __paimon_ prefix — so a user could in principle define a real physical column named __paimon_bucket. It would then be treated as a per-file constant and return split.bucket() instead of the stored value (a silent wrong result rather than a fallback). Very unlikely in practice, but flagging it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by making physical table columns take precedence over metadata name matching. A real column named
__paimon_bucket now stays in fileSchema and is read from the data file. Added a regression test for this case.

s"plan should use native paimon scan:\n$plan")
}

private def checkSparkAnswerAndNativePaimonScan(sqlText: String): DataFrame = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor test cleanup: the DataFrame return value is ignored by both callers (and the third metadata test doesn't use this helper), so Unit would be clearer. Also var expected: Seq[Row] = Nil reassigned inside withSQLConf can be a val, since withSQLConf returns its block value:

val expected = withSQLConf("spark.auron.enable.paimon.scan" -> "false") {
  sql(sqlText).collect().toSeq
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the helper to return Unit.

But I kept the existing var expected pattern because val expected = withSQLConf { ... } does not compile on Spark 3.x: withSQLConf returns Unit there.

@SteNicholas SteNicholas self-assigned this Jun 28, 2026
@lyne7-sc

Copy link
Copy Markdown
Contributor Author

@SteNicholas Thanks for the careful review! Addressed the comments in the latest update, and the relevant ci is green now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Paimon V2 native scan does not handle metadata columns correctly

3 participants