Skip to content

[AURON #2320] Fix PB deserialization bug & improve PB parsing performance#2339

Open
Tartarus0zm wants to merge 4 commits into
apache:masterfrom
Tartarus0zm:pb-deserializer-opts
Open

[AURON #2320] Fix PB deserialization bug & improve PB parsing performance#2339
Tartarus0zm wants to merge 4 commits into
apache:masterfrom
Tartarus0zm:pb-deserializer-opts

Conversation

@Tartarus0zm

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #2320

Rationale for this change

  • Boolean type was not given a default value, causing incorrect data results.
  • Complex nested PB structures can encounter parsing errors.

What changes are included in this PR?

  • Fix boolean type was not given a default value, causing incorrect data results.
  • FIx complex nested PB structures can encounter parsing errors.
  • improve PB parsing performance

Are there any user-facing changes?

  • No

How was this patch tested?

  • No

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 critical correctness bugs that will cause panics in production with common protobuf schemas, plus 2 moderate issues. Cannot approve until the critical ones are addressed.


Critical 1: O3 bitmap optimization skips ensure_size, breaking List/Map/Struct builders

Location: Main deserialization loop, the if seen_tags.count_ones() < total_handlers guard

Problem: ensure_size(row_idx + 1) is the only mechanism that calls SharedListArrayBuilder::append(true) / SharedMapArrayBuilder::append(true) / SharedStructArrayBuilder::append(false) to finalize these builders' per-row offset/null-buffer entries. The individual value handlers (impl_for_repeated_builder, impl_for_message_builder) only append to the child values builder -- they never call the parent's append().

When all tags are present in a row (seen_tags.count_ones() == total_handlers), ensure_size is skipped entirely. List/Map/Struct builders never get their slot finalized for that row.

Reproduction: Any schema with a repeated field where every message populates all fields. After processing N rows, scalar builders have len = N, but SharedListArrayBuilder has len = 0 (no append(true) was ever called). At finish() time, the offsets array is empty while values exist -- corrupt ListArray or panic.

Fix suggestion: The optimization assumption ("if all handlers fired, all builders are correctly sized") is incorrect for composite builders. Either:

  • Remove the optimization, or
  • After each handler fires, have composite-type handlers also call their own append() internally (i.e., move the list/struct finalization INTO the handler instead of relying on ensure_size)

Critical 2: Sub-message struct handler missing sub_ensure_size in empty-buf branch

Location: Struct handler's if buf.is_empty() branch where it only calls struct_builder.get_mut().append(false)

Problem: When a struct field is present on the wire with zero-length content, the handler calls struct_builder.append(false) (advancing the struct's null buffer to len N) but does NOT call sub_ensure_size to pad the struct's child builders. Combined with Critical 1 (top-level ensure_size may be skipped), the struct builder's null_buffer_builder.len() diverges from its children's lengths.

Reproduction: Messages where a struct field is always present but empty (tag + length 0), with all other fields also present. StructArray::new() panics: "all arrays in a StructArray must have the same length."

Fix: Call sub_ensure_size before struct_builder.append(false) in the empty branch, same as the non-empty branch.


Moderate: log::info with DEBUG prefix left in production code

Location: transfer_output_schema_to_pb_schema function

A debug log statement at INFO level will pollute production logs on every schema initialization. Should be log::debug! or removed.


Moderate: unsafe from_utf8_unchecked on raw wire bytes

Location: String field handlers (impl_for_bytes_builder usage)

While proto3 specifies string fields must be UTF-8, producers may be non-conformant. The unsafe trades a validity check for throughput, but creates undefined behavior (not just wrong output) on malformed input. Consider adding at minimum a debug_assert for UTF-8 validity.


The rest of the PR (error propagation replacing .expect(), skip_pb_value extraction, RefCell buffer reuse, ValueHandlers enum) looks good. The bug fixes (C1 unknown-tag skip, C3 error propagation) are valuable improvements. Just the O3 ensure_size interaction needs rethinking.

@github-actions github-actions Bot removed the flink label Jun 24, 2026
@Tartarus0zm

Copy link
Copy Markdown
Contributor Author

@SteNicholas thanks for your review! I've fixed the issue you mentioned. PTAL

@Tartarus0zm Tartarus0zm requested a review from SteNicholas June 24, 2026 11:20

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Tartarus0zm, thanks for updates — the boolean-default fix and the unknown-tag/skip_pb_value cursor-sync fix (C1) are real correctness improvements, and the C2 empty-struct child padding is a good catch. CI is green, but note the new unit tests always populate every field, so they don't exercise the input shape that breaks below.

The one must-fix before merge is the O10 optimization (inline comment on the short-circuit): it makes a whole-batch-absent column come out all-NULL, which silently re-introduces the exact #2320 boolean bug this PR fixes (and regresses int/string/float/binary/list/map the same way). Everything else is non-blocking — a cross-file side effect on the JSON deserializer worth confirming, a couple of behavior changes worth a conscious decision, the expect() panics the PR's own O7/C3 goal meant to remove, and minor cleanups.

Verified sound (did not flag): O3's ensure_size skip (with the C1 ensure_size_every_row guard), nested List/Map inside a top-level Struct, and C2 — all hold. from_utf8_unchecked is pre-existing (master already had it).

// O10: if the (top-level) column was never written, the entire
// resulting array is null — skip the lazy bitmap scan entirely.
let top_idx = mapping[0];
if mapping.len() == 1 && !top_builders_touched[top_idx] {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Blocker] O10 returns an all-NULL column for any top-level field absent from the entire batch — this re-introduces the #2320 bug.

top_builders_touched[idx] is set only when a field's tag is seen in some row. But absent rows are padded by ensure_size with non-null defaults (ensure_output_array_builders_size: false/0/""/0.0/b"", and empty non-null list/map). So when a field is absent in every message of a batch:

  • without O10 → builder is all-default, null_count() != len(), cast path emits the defaults (correct);
  • with O10 → touched == falsenew_null_arrayall null (wrong).

Concretely: a top-level Boolean field never set in a batch yields all null instead of all false — exactly the bug this PR fixes, just for the all-absent case. Same regression for Int/UInt/Float/String/Binary (→ should be 0/""/0.0/b"") and List/Map (→ should be empty non-null). It's also internally inconsistent: present-in-≥1-row → default, absent-in-all → null; and a nested field (mapping.len() > 1, O10 skipped) → default while the top-level field → null.

Suggest dropping O10 entirely: the existing array_ref.null_count() == array_ref.len() path (line ~209) already handles genuinely-all-null columns, so O10 adds no correctness-safe benefit. Removing it also lets you delete tag_to_top_idx_vec/tag_to_top_idx_map (lines 94-95) and the per-batch top_builders_touched allocation (line 152), which exist only to feed this short-circuit.

Ok(match builder_type {
BuilderType::Boolean => {
impl_for_builders!(BooleanBuilder, builders, |b| b.append_null())
impl_for_builders!(BooleanBuilder, builders, |b| b.append_value(false))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This boolean padding change (append_null()append_value(false)) is correct for the proto path, but ensure_output_array_builders_size is pub(crate) and also used by the JSON deserializer (json_deserializer.rs:133). JSON's own boolean handler appends null for an explicit JSON null (json_deserializer.rs:335), so after this change a JSON record that omits a boolean field → false, while an explicit nullnull — a new absent-vs-null inconsistency in the JSON path that isn't mentioned or tested. Please confirm this side effect on the JSON deserializer is intended (and ideally add a JSON test pinning the behavior).

})?;
if let Some(sub_value_handler) = sub_value_handlers.get(&sub_tag) {
// O7/C3 fix: propagate error instead of expect()
(*sub_value_handler)(&mut sub_cursor, sub_tag, sub_wire_type)?;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavior change worth a conscious decision: the old sub-message loops used if let Ok((tag, wt)) = decode_key { ... }, silently tolerating a malformed/truncated nested message. Propagating with ? here (and in the list/map twins at ~1584 and ~1660) removes a possible spin on a non-advancing decode error (good), but now a single bad nested record fails the entire parse_messages_with_kafka_meta batch instead of being skipped — which could stall a stream on one corrupt message. If best-effort tolerance was intentional upstream, consider skipping the offending sub-message instead of failing the batch.

nested_msg_mapping,
&skip_fields,
)
.expect("Failed to transfer output schema to pb schema");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This .expect(...) (and the recursive one at line 465) still panics, which contradicts this PR's own O7/C3 goal of removing expect() so JNI callers don't abort the JVM via SIGABRT. Both the enclosing fn and the callee return Result, so this can just be ?. (Init-path, so lower severity than the per-row paths you already fixed — but it's the same crash mode.)

// upstream message could surface invalid UTF-8 in the
// resulting StringArray (downstream Arrow consumers
// typically tolerate this).
debug_assert!(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note (pre-existing, not introduced here): str::from_utf8_unchecked on the proto string bytes a few lines down is UB if the payload isn't valid UTF-8, and this debug_assert! is compiled out in release builds — so corrupt/untrusted Kafka input can construct an invalid &str and a StringArray that violates Arrow's UTF-8 invariant. Since you're touching these sites, it'd be worth replacing the unchecked with a checked from_utf8 (error or lossy) on the release path rather than only asserting in debug. The added comment rationalizes the unchecked call as safe, which isn't true for non-conformant producers.

nested_msg_mapping: &HashMap<String, String>,
skip_fields: &[String],
) -> Result<SchemaRef> {
log::debug!(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover dev-tracing log (was log::info!("[DEBUG] …"), now log::debug!). It dumps internal mapping state and re-derives a Vec of all field names per recursive nested level. Suggest removing it rather than just downgrading the level.

}
}

fn len(&self) -> usize {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor efficiency: the Vec variant of len() is an O(max_tag) scan (v.iter().filter(|h| h.is_some()).count()), and it's called once per batch as total_handlers (line 147) to recompute a value that's constant for the deserializer's lifetime. Precompute the handler count once in from_map/new() and store it as a field for an O(1) read.

get_content_after_last_dot(enum_value_descriptor.name()).to_string(),
);
}
let enum_string_mapping = Arc::new(enum_string_mapping);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The O8 comment claims this Arc<HashMap> is shared across multiple handlers, but enum_string_mapping is built fresh inside each create_value_handler call (per field) and the Arc is only cloned into that one field's closure — never shared across handlers. So the Arc adds an atomic refcount for no sharing benefit (a plain owned HashMap moved into the closure is equivalent), and the three let mapping = enum_string_mapping; rebindings in the mutually-exclusive branches are redundant. Either drop the Arc + rebindings, or actually hoist/share the map across fields of the same enum type if that was the intent.

struct_builder.get_mut().append(false);
} else {
let mut sub_cursor = Cursor::new(buf);
while sub_cursor.has_remaining() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maintainability: this sub-message decode loop is duplicated almost verbatim three times (struct here, list-of-struct ~1583, map ~1660), as is the preceding for field in sub_message_descriptor.fields() { create_value_handler(...) } builder loop. The C1 "skip unknown sub-tags" and O7 error-propagation fixes each had to be applied in all three; a shared helper (e.g. decode_sub_message(buf, &sub_value_handlers)) would collapse them and prevent future drift.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Fix row-column misalignment with complex structures in PB deserializer

3 participants