[AURON #2320] Fix PB deserialization bug & improve PB parsing performance #2339
Tartarus0zm wants to merge 4 commits into
There are 2 critical correctness bugs that will cause panics in production with common protobuf schemas, plus 2 moderate issues. Cannot approve until the critical ones are addressed.
Critical 1: O3 bitmap optimization skips ensure_size, breaking List/Map/Struct builders
Location: Main deserialization loop, the if seen_tags.count_ones() < total_handlers guard
Problem: ensure_size(row_idx + 1) is the only mechanism that calls SharedListArrayBuilder::append(true) / SharedMapArrayBuilder::append(true) / SharedStructArrayBuilder::append(false) to finalize these builders' per-row offset/null-buffer entries. The individual value handlers (impl_for_repeated_builder, impl_for_message_builder) only append to the child values builder -- they never call the parent's append().
When all tags are present in a row (seen_tags.count_ones() == total_handlers), ensure_size is skipped entirely. List/Map/Struct builders never get their slot finalized for that row.
Reproduction: Any schema with a repeated field where every message populates all fields. After processing N rows, scalar builders have len = N, but SharedListArrayBuilder has len = 0 (no append(true) was ever called). At finish() time, the offsets array is empty while values exist -- corrupt ListArray or panic.
Fix suggestion: The optimization assumption ("if all handlers fired, all builders are correctly sized") is incorrect for composite builders. Either:
- Remove the optimization, or
- After each handler fires, have composite-type handlers also call their own append() internally (i.e., move the list/struct finalization INTO the handler instead of relying on ensure_size)
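A minimal sketch of the failure mode and of the second option, using a toy builder rather than the real `SharedListArrayBuilder`. `ToyListBuilder`, `push_value`, `finish_row`, and `build` are hypothetical names introduced only for illustration; the point is that nothing advances the offsets unless some code path finalizes each row.

```rust
/// Toy stand-in for a list builder: child values plus one offset per finalized row.
/// (Hypothetical type; not the real SharedListArrayBuilder.)
#[derive(Default)]
struct ToyListBuilder {
    values: Vec<i64>,
    offsets: Vec<usize>, // offsets[i] = end of row i inside `values`
}

impl ToyListBuilder {
    /// What a value handler does: it only touches the child values.
    fn push_value(&mut self, v: i64) {
        self.values.push(v);
    }

    /// Finalizes the current row, playing the role of `append(true)`.
    fn finish_row(&mut self) {
        self.offsets.push(self.values.len());
    }

    /// Number of finalized rows.
    fn len(&self) -> usize {
        self.offsets.len()
    }
}

/// Decodes `rows` into a builder. With `finalize_in_handler == false` nothing
/// finalizes the row, which imitates a skipped `ensure_size`.
fn build(rows: &[Vec<i64>], finalize_in_handler: bool) -> ToyListBuilder {
    let mut b = ToyListBuilder::default();
    for row in rows {
        for &v in row {
            b.push_value(v);
        }
        if finalize_in_handler {
            b.finish_row();
        }
    }
    b
}
```

With two input rows, the non-finalizing variant ends up with three child values and zero finalized rows, which is the shape described in the reproduction above.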
Critical 2: Sub-message struct handler missing sub_ensure_size in empty-buf branch
Location: Struct handler's if buf.is_empty() branch where it only calls struct_builder.get_mut().append(false)
Problem: When a struct field is present on the wire with zero-length content, the handler calls struct_builder.append(false) (advancing the struct's null buffer to len N) but does NOT call sub_ensure_size to pad the struct's child builders. Combined with Critical 1 (top-level ensure_size may be skipped), the struct builder's null_buffer_builder.len() diverges from its children's lengths.
Reproduction: Messages where a struct field is always present but empty (tag + length 0), with all other fields also present. StructArray::new() panics: "all arrays in a StructArray must have the same length."
Fix: Call sub_ensure_size before struct_builder.append(false) in the empty branch, same as the non-empty branch.
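A toy illustration of why the ordering matters, assuming a struct builder is just a validity vector plus child columns. `ToyStructBuilder`, `pad_children`, and `append_empty` are hypothetical names, and the padding target (current length plus one) is an assumption about what `sub_ensure_size` would be asked for here.

```rust
/// Toy struct builder: a validity vector plus a single child column.
/// (Hypothetical type; the real one is SharedStructArrayBuilder.)
#[derive(Default)]
struct ToyStructBuilder {
    validity: Vec<bool>,
    child: Vec<Option<i32>>,
}

impl ToyStructBuilder {
    /// Pads the child column up to `len` rows, in the spirit of `sub_ensure_size`.
    fn pad_children(&mut self, len: usize) {
        while self.child.len() < len {
            self.child.push(None);
        }
    }

    /// Handles a struct field that is present on the wire with zero-length content.
    /// `pad_first` selects between the buggy and the suggested behavior.
    fn append_empty(&mut self, pad_first: bool) {
        if pad_first {
            let target = self.validity.len() + 1;
            self.pad_children(target);
        }
        self.validity.push(false);
    }

    /// True when parent and child agree on the row count.
    fn is_consistent(&self) -> bool {
        self.validity.len() == self.child.len()
    }
}
```

Without the padding step the parent reports one row while the child reports zero, which is the length mismatch that a struct array constructor rejects.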
Moderate: log::info with DEBUG prefix left in production code
Location: transfer_output_schema_to_pb_schema function
A debug log statement at INFO level will pollute production logs on every schema initialization. Should be log::debug! or removed.
Moderate: unsafe from_utf8_unchecked on raw wire bytes
Location: String field handlers (impl_for_bytes_builder usage)
While proto3 specifies string fields must be UTF-8, producers may be non-conformant. The unsafe trades a validity check for throughput, but creates undefined behavior (not just wrong output) on malformed input. Consider adding at minimum a debug_assert for UTF-8 validity.
The rest of the PR (error propagation replacing .expect(), skip_pb_value extraction, RefCell buffer reuse, ValueHandlers enum) looks good. The bug fixes (C1 unknown-tag skip, C3 error propagation) are valuable improvements. Just the O3 ensure_size interaction needs rethinking.
@SteNicholas thanks for your review! I've fixed the issue you mentioned. PTAL
@Tartarus0zm, thanks for the updates. The boolean-default fix and the unknown-tag/skip_pb_value cursor-sync fix (C1) are real correctness improvements, and the C2 empty-struct child padding is a good catch. CI is green, but note that the new unit tests always populate every field, so they don't exercise the input shape that breaks below.
The one must-fix before merge is the O10 optimization (inline comment on the short-circuit): it makes a whole-batch-absent column come out all-NULL, which silently re-introduces the exact #2320 boolean bug this PR fixes (and regresses int/string/float/binary/list/map the same way). Everything else is non-blocking — a cross-file side effect on the JSON deserializer worth confirming, a couple of behavior changes worth a conscious decision, the expect() panics the PR's own O7/C3 goal meant to remove, and minor cleanups.
Verified sound (did not flag): O3's ensure_size skip (with the C1 ensure_size_every_row guard), nested List/Map inside a top-level Struct, and C2 — all hold. from_utf8_unchecked is pre-existing (master already had it).
| // O10: if the (top-level) column was never written, the entire | ||
| // resulting array is null — skip the lazy bitmap scan entirely. | ||
| let top_idx = mapping[0]; | ||
| if mapping.len() == 1 && !top_builders_touched[top_idx] { |
[Blocker] O10 returns an all-NULL column for any top-level field absent from the entire batch — this re-introduces the #2320 bug.
top_builders_touched[idx] is set only when a field's tag is seen in some row. But absent rows are padded by ensure_size with non-null defaults (ensure_output_array_builders_size: false/0/""/0.0/b"", and empty non-null list/map). So when a field is absent in every message of a batch:
- without O10 → builder is all-default, null_count() != len(), cast path emits the defaults (correct);
- with O10 → touched == false → new_null_array → all null (wrong).
Concretely: a top-level Boolean field never set in a batch yields all null instead of all false — exactly the bug this PR fixes, just for the all-absent case. Same regression for Int/UInt/Float/String/Binary (→ should be 0/""/0.0/b"") and List/Map (→ should be empty non-null). It's also internally inconsistent: present-in-≥1-row → default, absent-in-all → null; and a nested field (mapping.len() > 1, O10 skipped) → default while the top-level field → null.
Suggest dropping O10 entirely: the existing array_ref.null_count() == array_ref.len() path (line ~209) already handles genuinely-all-null columns, so O10 adds no correctness-safe benefit. Removing it also lets you delete tag_to_top_idx_vec/tag_to_top_idx_map (lines 94-95) and the per-batch top_builders_touched allocation (line 152), which exist only to feed this short-circuit.
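To make the regression concrete, here is a toy model that uses a plain `Vec<Option<bool>>` in place of an Arrow array. `pad_with_defaults` and `finish_column` are hypothetical names, and the `short_circuit` flag only imitates the O10 branch; this is a sketch of the observable difference, not the PR's code.

```rust
/// Pads a boolean column to `rows` entries with the non-null default `false`,
/// the way absent fields are described as being padded above.
fn pad_with_defaults(col: &mut Vec<Option<bool>>, rows: usize) {
    while col.len() < rows {
        col.push(Some(false));
    }
}

/// Finishes a column. `short_circuit` imitates O10: an untouched column is
/// replaced by an all-null array instead of keeping its padded defaults.
fn finish_column(
    mut col: Vec<Option<bool>>,
    rows: usize,
    touched: bool,
    short_circuit: bool,
) -> Vec<Option<bool>> {
    pad_with_defaults(&mut col, rows);
    if short_circuit && !touched {
        return vec![None; rows];
    }
    col
}
```

For a field absent from all three rows of a batch, the version without the short-circuit yields three `Some(false)` entries and the version with it yields three `None` entries.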
| Ok(match builder_type { | ||
| BuilderType::Boolean => { | ||
| impl_for_builders!(BooleanBuilder, builders, |b| b.append_null()) | ||
| impl_for_builders!(BooleanBuilder, builders, |b| b.append_value(false)) |
This boolean padding change (append_null() → append_value(false)) is correct for the proto path, but ensure_output_array_builders_size is pub(crate) and also used by the JSON deserializer (json_deserializer.rs:133). JSON's own boolean handler appends null for an explicit JSON null (json_deserializer.rs:335), so after this change a JSON record that omits a boolean field → false, while an explicit null → null — a new absent-vs-null inconsistency in the JSON path that isn't mentioned or tested. Please confirm this side effect on the JSON deserializer is intended (and ideally add a JSON test pinning the behavior).
| })?; | ||
| if let Some(sub_value_handler) = sub_value_handlers.get(&sub_tag) { | ||
| // O7/C3 fix: propagate error instead of expect() | ||
| (*sub_value_handler)(&mut sub_cursor, sub_tag, sub_wire_type)?; |
Behavior change worth a conscious decision: the old sub-message loops used if let Ok((tag, wt)) = decode_key { ... }, silently tolerating a malformed/truncated nested message. Propagating with ? here (and in the list/map twins at ~1584 and ~1660) removes a possible spin on a non-advancing decode error (good), but now a single bad nested record fails the entire parse_messages_with_kafka_meta batch instead of being skipped — which could stall a stream on one corrupt message. If best-effort tolerance was intentional upstream, consider skipping the offending sub-message instead of failing the batch.
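The two policies can be contrasted with a small sketch. Everything here is hypothetical (`decode_record`, `decode_batch_strict`, `decode_batch_tolerant`), and it works at record granularity for simplicity; skipping inside a columnar builder would additionally need to roll back or pad any partially written row.

```rust
/// Hypothetical per-record decoder: fails on an empty payload.
fn decode_record(buf: &[u8]) -> Result<u8, String> {
    buf.first()
        .copied()
        .ok_or_else(|| "truncated record".to_string())
}

/// Strict policy: the first bad record fails the whole batch (what `?` does).
fn decode_batch_strict(batch: &[&[u8]]) -> Result<Vec<u8>, String> {
    let mut out = Vec::with_capacity(batch.len());
    for rec in batch {
        out.push(decode_record(rec)?);
    }
    Ok(out)
}

/// Tolerant policy: bad records are counted and skipped, the rest are kept.
fn decode_batch_tolerant(batch: &[&[u8]]) -> (Vec<u8>, usize) {
    let mut out = Vec::with_capacity(batch.len());
    let mut skipped = 0;
    for rec in batch {
        match decode_record(rec) {
            Ok(v) => out.push(v),
            Err(_) => skipped += 1,
        }
    }
    (out, skipped)
}
```

Returning the skip count keeps the tolerant path observable, so a caller could log or meter corrupt input instead of dropping it silently.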
| nested_msg_mapping, | ||
| &skip_fields, | ||
| ) | ||
| .expect("Failed to transfer output schema to pb schema"); |
This .expect(...) (and the recursive one at line 465) still panics, which contradicts this PR's own O7/C3 goal of removing expect() so JNI callers don't abort the JVM via SIGABRT. Both the enclosing fn and the callee return Result, so this can just be ?. (Init-path, so lower severity than the per-row paths you already fixed — but it's the same crash mode.)
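A minimal sketch of the suggested change, assuming both functions already return `Result` as stated above. `SchemaError`, `transfer_schema`, and `init_deserializer` are hypothetical stand-ins, not the PR's actual types or signatures.

```rust
use std::fmt;

/// Hypothetical error type standing in for the crate's own error.
#[derive(Debug)]
struct SchemaError(String);

impl fmt::Display for SchemaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "schema error: {}", self.0)
    }
}

impl std::error::Error for SchemaError {}

/// Hypothetical stand-in for the schema transfer: fails on an empty field list.
fn transfer_schema(fields: &[&str]) -> Result<Vec<String>, SchemaError> {
    if fields.is_empty() {
        return Err(SchemaError("no fields in output schema".into()));
    }
    Ok(fields.iter().map(|f| f.to_string()).collect())
}

/// Init path that propagates with `?` instead of panicking via `.expect(...)`.
fn init_deserializer(fields: &[&str]) -> Result<usize, SchemaError> {
    let schema = transfer_schema(fields)?;
    Ok(schema.len())
}
```

The caller then receives an `Err` it can surface as an exception, rather than the process aborting on a panic.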
| // upstream message could surface invalid UTF-8 in the | ||
| // resulting StringArray (downstream Arrow consumers | ||
| // typically tolerate this). | ||
| debug_assert!( |
Note (pre-existing, not introduced here): str::from_utf8_unchecked on the proto string bytes a few lines down is UB if the payload isn't valid UTF-8, and this debug_assert! is compiled out in release builds — so corrupt/untrusted Kafka input can construct an invalid &str and a StringArray that violates Arrow's UTF-8 invariant. Since you're touching these sites, it'd be worth replacing the unchecked with a checked from_utf8 (error or lossy) on the release path rather than only asserting in debug. The added comment rationalizes the unchecked call as safe, which isn't true for non-conformant producers.
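The two checked alternatives mentioned above could look roughly like this. `decode_string_strict` and `decode_string_lossy` are hypothetical helper names; `std::str::from_utf8` and `String::from_utf8_lossy` are the standard-library calls they wrap.

```rust
use std::borrow::Cow;

/// Strict variant: invalid UTF-8 becomes an error the caller can propagate.
fn decode_string_strict(bytes: &[u8]) -> Result<&str, std::str::Utf8Error> {
    std::str::from_utf8(bytes)
}

/// Lossy variant: invalid sequences are replaced with U+FFFD, and valid input
/// is borrowed without copying.
fn decode_string_lossy(bytes: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(bytes)
}
```

Both variants pay for a validation pass over the bytes, which is the throughput cost the unchecked call was avoiding; whether that cost is acceptable on this hot path is a measurement question for the PR author.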
| nested_msg_mapping: &HashMap<String, String>, | ||
| skip_fields: &[String], | ||
| ) -> Result<SchemaRef> { | ||
| log::debug!( |
Leftover dev-tracing log (was log::info!("[DEBUG] …"), now log::debug!). It dumps internal mapping state and re-derives a Vec of all field names per recursive nested level. Suggest removing it rather than just downgrading the level.
| } | ||
| } | ||
| fn len(&self) -> usize { |
Minor efficiency: the Vec variant of len() is an O(max_tag) scan (v.iter().filter(|h| h.is_some()).count()), and it's called once per batch as total_handlers (line 147) to recompute a value that's constant for the deserializer's lifetime. Precompute the handler count once in from_map/new() and store it as a field for an O(1) read.
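A sketch of the precomputed count, assuming the Vec variant is a tag-indexed vector of optional handlers. `HandlerTable` and its field names are hypothetical and do not mirror the PR's `ValueHandlers` enum.

```rust
/// Hypothetical handler table indexed by tag, with the populated-slot count
/// computed once at construction.
struct HandlerTable<H> {
    slots: Vec<Option<H>>,
    count: usize,
}

impl<H> HandlerTable<H> {
    /// Scans the slots once and caches how many are populated.
    fn new(slots: Vec<Option<H>>) -> Self {
        let count = slots.iter().filter(|h| h.is_some()).count();
        Self { slots, count }
    }

    /// O(1): returns the cached count instead of rescanning.
    fn len(&self) -> usize {
        self.count
    }

    /// Looks up the handler for `tag`, if one is registered.
    fn get(&self, tag: usize) -> Option<&H> {
        self.slots.get(tag).and_then(|h| h.as_ref())
    }
}
```

This stays correct only as long as the table is immutable after construction, which matches the observation that the value is constant for the deserializer's lifetime.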
| get_content_after_last_dot(enum_value_descriptor.name()).to_string(), | ||
| ); | ||
| } | ||
| let enum_string_mapping = Arc::new(enum_string_mapping); |
The O8 comment claims this Arc<HashMap> is shared across multiple handlers, but enum_string_mapping is built fresh inside each create_value_handler call (per field) and the Arc is only cloned into that one field's closure — never shared across handlers. So the Arc adds an atomic refcount for no sharing benefit (a plain owned HashMap moved into the closure is equivalent), and the three let mapping = enum_string_mapping; rebindings in the mutually-exclusive branches are redundant. Either drop the Arc + rebindings, or actually hoist/share the map across fields of the same enum type if that was the intent.
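For the first option, a plain owned map moved into the closure is enough. The sketch below uses a simplified, hypothetical handler signature (`make_enum_handler` taking an enum number and returning the name); the real handlers take a cursor and wire type.

```rust
use std::collections::HashMap;

/// Builds a per-field enum handler that owns its number-to-name map.
/// (Hypothetical signature, simplified for illustration.)
fn make_enum_handler(mapping: HashMap<i32, String>) -> impl Fn(i32) -> Option<String> {
    // `move` transfers ownership of `mapping` into the closure; no Arc is
    // needed when this one closure is the only reader.
    move |number| mapping.get(&number).cloned()
}
```

If the map were genuinely shared between handlers of the same enum type, an `Arc` cloned into each closure would be the natural choice instead.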
| struct_builder.get_mut().append(false); | ||
| } else { | ||
| let mut sub_cursor = Cursor::new(buf); | ||
| while sub_cursor.has_remaining() { |
Maintainability: this sub-message decode loop is duplicated almost verbatim three times (struct here, list-of-struct ~1583, map ~1660), as is the preceding for field in sub_message_descriptor.fields() { create_value_handler(...) } builder loop. The C1 "skip unknown sub-tags" and O7 error-propagation fixes each had to be applied in all three; a shared helper (e.g. decode_sub_message(buf, &sub_value_handlers)) would collapse them and prevent future drift.
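One possible shape for such a helper, written against plain byte slices instead of the PR's `Cursor` so that it stays self-contained. The `Handler` alias, `read_varint`, and `skip_value` are hypothetical, and only wire types 0, 1, 2, and 5 are handled; treat it as a sketch of the control flow (decode key, dispatch or skip, propagate errors), not a drop-in replacement.

```rust
use std::collections::HashMap;

/// Hypothetical handler shape: consumes one field's payload from the buffer.
type Handler<'h> = Box<dyn FnMut(&mut &[u8], u8) -> Result<(), String> + 'h>;

/// Reads one base-128 varint from the front of `buf`, advancing it.
fn read_varint(buf: &mut &[u8]) -> Result<u64, String> {
    let mut value = 0u64;
    for i in 0..10 {
        let (&byte, rest) = buf.split_first().ok_or("truncated varint")?;
        *buf = rest;
        value |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Ok(value);
        }
    }
    Err("varint too long".to_string())
}

/// Skips a field's payload according to its wire type (0, 1, 2, 5 only).
fn skip_value(buf: &mut &[u8], wire_type: u8) -> Result<(), String> {
    let n = match wire_type {
        0 => {
            read_varint(buf)?;
            0
        }
        1 => 8,
        2 => read_varint(buf)? as usize,
        5 => 4,
        other => return Err(format!("unsupported wire type {other}")),
    };
    if buf.len() < n {
        return Err("truncated field".to_string());
    }
    *buf = &buf[n..];
    Ok(())
}

/// Shared loop: decode each key, dispatch to a handler, skip unknown tags,
/// and propagate any error to the caller.
fn decode_sub_message(
    mut buf: &[u8],
    handlers: &mut HashMap<u32, Handler<'_>>,
) -> Result<(), String> {
    while !buf.is_empty() {
        let key = read_varint(&mut buf)?;
        let (tag, wire_type) = ((key >> 3) as u32, (key & 0x7) as u8);
        match handlers.get_mut(&tag) {
            Some(handler) => (*handler)(&mut buf, wire_type)?,
            None => skip_value(&mut buf, wire_type)?,
        }
    }
    Ok(())
}
```

With the loop in one place, the struct, list-of-struct, and map handlers would differ only in how they build their handler map and finalize their builder afterwards.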
Which issue does this PR close?
Closes #2320
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?