From 8c7db31c47bce0701c047c7be22543f6a7e1dee9 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Sun, 28 Jun 2026 19:56:09 +0900 Subject: [PATCH] Support zstd blob decompression in Puffin --- pyiceberg/table/puffin.py | 15 +++- .../puffin/v1/empty-puffin-uncompressed.bin | Bin 0 -> 32 bytes .../v1/sample-metric-data-compressed-zstd.bin | Bin 0 -> 417 bytes .../v1/sample-metric-data-uncompressed.bin | Bin 0 -> 355 bytes tests/table/test_puffin.py | 81 ++++++++++++++++++ 5 files changed, 92 insertions(+), 4 deletions(-) create mode 100644 tests/table/puffin/v1/empty-puffin-uncompressed.bin create mode 100644 tests/table/puffin/v1/sample-metric-data-compressed-zstd.bin create mode 100644 tests/table/puffin/v1/sample-metric-data-uncompressed.bin create mode 100644 tests/table/test_puffin.py diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 571687bb3f..56fa2f13b1 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -14,8 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import TYPE_CHECKING, Literal +import io +from typing import TYPE_CHECKING +import zstandard from pydantic import Field from pyiceberg.typedef import IcebergBaseModel @@ -29,7 +31,7 @@ class PuffinBlobMetadata(IcebergBaseModel): - type: Literal["deletion-vector-v1"] = Field() + type: str = Field() fields: list[int] = Field() snapshot_id: int = Field(alias="snapshot-id") sequence_number: int = Field(alias="sequence-number") @@ -65,10 +67,15 @@ def __init__(self, puffin: bytes) -> None: footer_payload_size_int = int.from_bytes(puffin[-12:-8], byteorder="little") self.footer = Footer.model_validate_json(puffin[-(footer_payload_size_int + 12) : -12]) - self._payload = puffin[8:] + self._payload = puffin def get_blob_payload(self, blob: PuffinBlobMetadata) -> bytes: - return self._payload[blob.offset : blob.offset + blob.length] + raw = self._payload[blob.offset : blob.offset + blob.length] + if blob.compression_codec is None: + return raw + if blob.compression_codec == "zstd": + return zstandard.ZstdDecompressor().stream_reader(io.BytesIO(raw)).read() + raise ValueError(f"Unsupported compression codec: {blob.compression_codec!r}") @deprecated(deprecated_in="0.12.0", removed_in="0.13.0", help_message="Use deletion_vectors_from_puffin_file(...) instead") def to_vector(self) -> dict[str, "pa.ChunkedArray"]: diff --git a/tests/table/puffin/v1/empty-puffin-uncompressed.bin b/tests/table/puffin/v1/empty-puffin-uncompressed.bin new file mode 100644 index 0000000000000000000000000000000000000000..142b45bd4ebe0b865064ef874325ff1c94399bb1 GIT binary patch literal 32 fcmWG=b2JP9;%cR&ocyF>C9CMzS{?=n0Eq(tjVlLb literal 0 HcmV?d00001 diff --git a/tests/table/puffin/v1/sample-metric-data-compressed-zstd.bin b/tests/table/puffin/v1/sample-metric-data-compressed-zstd.bin new file mode 100644 index 0000000000000000000000000000000000000000..ac8b69c76e5770823137232c7b84d8a9e4903977 GIT binary patch literal 417 zcmb7=u}Z{15Qg^@3u|9s87)K-B=PVdjo1j+9cU*fY<4HfLb4lo=Y$xpw6(IgvA43g zSMU||A$$SB%{3x|;B@oP&;QNL?Cdnze>wcz+nzEea;dN=E4_2IdRwdKTN~_Q)7u7l zUfY~Ao@*mq$CV(#KOUc+IaPwV_S{(FH|V`riUTILw4B% zbuB^$LvTE(5J91_R>L%zN8pWUePt=u3bHixc)dU)F*b`PM+aFFfh`J;1lc%(8cj)6 za0aWiP3zEmZA~n#LK!%>o)j#jpIl27x?lA4pY-(j8$X%+g%824vn@K(81C#rIuwK| X&bzSU1$yF1dNhXR@?gZ)HGJ(0r0$BX literal 0 HcmV?d00001 diff --git a/tests/table/puffin/v1/sample-metric-data-uncompressed.bin b/tests/table/puffin/v1/sample-metric-data-uncompressed.bin new file mode 100644 index 0000000000000000000000000000000000000000..ab8da13822c55c573b8ba925eb9ebbc33cd87f28 GIT binary patch literal 355 zcmb7PY+KhqS$DC(-` zheOd{v1(>i#}#>*2^CgSQ@bc|@HE*vl`jHw&~tW?8*fpyrKZ<2g`S#lJ{d}=q`)`~ znHbex;6!0$hw6S4KeZz}O1}v0KMAt?M%;B#;DY2*W@QQsR&14(16i?5T8D!h- Maoaw2Us3IU0?}=DoB#j- literal 0 HcmV?d00001 diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py new file mode 100644 index 0000000000..93b16158bb --- /dev/null +++ b/tests/table/test_puffin.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from os import path + +from pyiceberg.table.puffin import PuffinFile + + +def _open_file(file: str) -> bytes: + cur_dir = path.dirname(path.realpath(__file__)) + with open(f"{cur_dir}/puffin/{file}", "rb") as f: + return f.read() + + +def test_read_empty_uncompressed() -> None: + puffin_bytes = _open_file("v1/empty-puffin-uncompressed.bin") + pf = PuffinFile(puffin_bytes) + + assert pf.footer.blobs == [] + assert pf.footer.properties == {} + + +def test_read_compressed_zstd() -> None: + puffin_bytes = _open_file("v1/sample-metric-data-compressed-zstd.bin") + pf = PuffinFile(puffin_bytes) + + assert pf.footer.properties == {"created-by": "Test 1234"} + assert len(pf.footer.blobs) == 2 + + blob1 = pf.footer.blobs[0] + assert blob1.type == "some-blob" + assert blob1.fields == [1] + assert blob1.snapshot_id == 2 + assert blob1.sequence_number == 1 + assert blob1.compression_codec == "zstd" + assert pf.get_blob_payload(blob1) == b"abcdefghi" + + blob2 = pf.footer.blobs[1] + assert blob2.type == "some-other-blob" + assert blob2.fields == [2] + assert blob2.compression_codec == "zstd" + assert pf.get_blob_payload(blob2) == ( + b"some blob \x00 binary data \xf0\x9f\xa4\xaf that is not very very very very very very long, is it?" + ) + + +def test_read_two_blobs_uncompressed() -> None: + puffin_bytes = _open_file("v1/sample-metric-data-uncompressed.bin") + pf = PuffinFile(puffin_bytes) + + assert pf.footer.properties == {"created-by": "Test 1234"} + assert len(pf.footer.blobs) == 2 + + blob1 = pf.footer.blobs[0] + assert blob1.type == "some-blob" + assert blob1.fields == [1] + assert blob1.snapshot_id == 2 + assert blob1.sequence_number == 1 + assert blob1.compression_codec is None + assert pf.get_blob_payload(blob1) == b"abcdefghi" + + blob2 = pf.footer.blobs[1] + assert blob2.type == "some-other-blob" + assert blob2.fields == [2] + assert blob2.compression_codec is None + assert pf.get_blob_payload(blob2) == ( + b"some blob \x00 binary data \xf0\x9f\xa4\xaf that is not very very very very very very long, is it?" + )