-
Notifications
You must be signed in to change notification settings - Fork 1.5k
GH-3601: Avoid repeated created_by parsing in footer metadata conversion #3607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,20 +47,25 @@ public class CorruptStatistics { | |
| private static final SemanticVersion CDH_5_PARQUET_251_FIXED_END = new SemanticVersion(1, 5, 0); | ||
|
|
||
| /** | ||
| * Decides if the statistics from a file created by createdBy (the created_by field from parquet format) | ||
| * should be ignored because they are potentially corrupt. | ||
| * Returns whether the given column type is one of the types affected by the PARQUET-251 bug | ||
| * (BINARY or FIXED_LEN_BYTE_ARRAY). | ||
| * | ||
| * @param createdBy the created-by string from a file footer | ||
| * @param columnType the type of the column that this is checking | ||
| * @return true if the statistics may be invalid and should be ignored, false otherwise | ||
| * @param columnType the primitive type of the column | ||
| * @return true if this column type could have corrupt statistics | ||
| */ | ||
| public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName columnType) { | ||
|
|
||
| if (columnType != PrimitiveTypeName.BINARY && columnType != PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { | ||
| // the bug only applies to binary columns | ||
| return false; | ||
| } | ||
| public static boolean isCorruptStatisticsColumnType(PrimitiveTypeName columnType) { | ||
| return columnType == PrimitiveTypeName.BINARY || columnType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; | ||
| } | ||
|
|
||
| /** | ||
| * Determines whether a file (identified by its created_by string) was written by a version of | ||
| * parquet-mr that had the PARQUET-251 statistics bug. This is a file-level check that does not | ||
| * consider column type. | ||
| * | ||
| * @param createdBy the created-by string from a file footer | ||
| * @return true if the file was written by a version with the corrupt statistics bug | ||
| */ | ||
| public static boolean fileHasCorruptStatistics(String createdBy) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rename it to |
||
| if (Strings.isNullOrEmpty(createdBy)) { | ||
| // created_by is not populated, which could have been caused by | ||
| // parquet-mr during the same time as PARQUET-251, see PARQUET-297 | ||
|
|
@@ -103,6 +108,22 @@ public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Decides if the statistics from a file created by createdBy (the created_by field from parquet format) | ||
| * should be ignored because they are potentially corrupt. | ||
| * <p> | ||
| * The column type is checked first: for any type other than BINARY or FIXED_LEN_BYTE_ARRAY this returns | ||
| * false without calling fileHasCorruptStatistics, so createdBy is only inspected for affected column types. | ||
| * For an affected column type the result is whatever fileHasCorruptStatistics(createdBy) returns. | ||
| * | ||
| * @param createdBy the created-by string from a file footer | ||
| * @param columnType the type of the column that this is checking | ||
| * @return true if the statistics may be invalid and should be ignored, false otherwise | ||
| */ | ||
| public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName columnType) { | ||
| if (!isCorruptStatisticsColumnType(columnType)) { | ||
| // the bug only applies to binary columns | ||
| return false; | ||
| } | ||
| return fileHasCorruptStatistics(createdBy); | ||
| } | ||
|
|
||
| private static void warnParseErrorOnce(String createdBy, Throwable e) { | ||
| if (!alreadyLogged.getAndSet(true)) { | ||
| LOG.warn("Ignoring statistics because created_by could not be parsed (see PARQUET-251): " + createdBy, e); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,6 +47,7 @@ | |
| import org.apache.parquet.CorruptStatistics; | ||
| import org.apache.parquet.ParquetReadOptions; | ||
| import org.apache.parquet.Preconditions; | ||
| import org.apache.parquet.column.ColumnDescriptor; | ||
| import org.apache.parquet.column.EncodingStats; | ||
| import org.apache.parquet.column.ParquetProperties; | ||
| import org.apache.parquet.column.statistics.BinaryStatistics; | ||
|
|
@@ -926,6 +927,13 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist | |
| // Visible for testing | ||
| // Resolves the PARQUET-251 flag for this column from createdBy, then delegates to the flag-based overload. | ||
| static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal( | ||
| String createdBy, Statistics formatStats, PrimitiveType type, SortOrder typeSortOrder) { | ||
| // Only BINARY and FIXED_LEN_BYTE_ARRAY columns are affected by PARQUET-251, so skip inspecting | ||
| // created_by (and any parse-failure warning it may log) for every other column type. The result is | ||
| // unchanged because the flag-based overload applies the same column-type check to the flag. | ||
| boolean fileHasCorruptStats = CorruptStatistics.isCorruptStatisticsColumnType(type.getPrimitiveTypeName()) | ||
| && CorruptStatistics.fileHasCorruptStatistics(createdBy); | ||
| return fromParquetStatisticsInternal(formatStats, type, typeSortOrder, fileHasCorruptStats); | ||
| } | ||
|
|
||
| // Core implementation using pre-computed file-level corrupt stats flag | ||
| static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal( | ||
| Statistics formatStats, PrimitiveType type, SortOrder typeSortOrder, boolean fileHasCorruptStats) { | ||
| // create stats object based on the column type | ||
| org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = | ||
| org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type); | ||
|
|
@@ -948,8 +956,10 @@ static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInte | |
| // valid with the type's sort order. In previous releases, all stats were | ||
| // aggregated using a signed byte-wise ordering, which isn't valid for all the | ||
| // types (e.g. strings, decimals etc.). | ||
| if (!CorruptStatistics.shouldIgnoreStatistics(createdBy, type.getPrimitiveTypeName()) | ||
| && (sortOrdersMatch || maxEqualsMin)) { | ||
| // The fileHasCorruptStats flag applies only to BINARY and FIXED_LEN_BYTE_ARRAY columns. | ||
| boolean ignoreForThisColumn = | ||
| fileHasCorruptStats && CorruptStatistics.isCorruptStatisticsColumnType(type.getPrimitiveTypeName()); | ||
| if (!ignoreForThisColumn && (sortOrdersMatch || maxEqualsMin)) { | ||
| if (isSet) { | ||
| statsBuilder.withMin(formatStats.min.array()); | ||
| statsBuilder.withMax(formatStats.max.array()); | ||
|
|
@@ -1794,13 +1804,23 @@ public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throw | |
|
|
||
| /** | ||
| * Builds the column chunk metadata for a single column, deriving the PARQUET-251 corrupt-statistics | ||
| * flag from the file's created_by string and delegating to the flag-based overload. | ||
| * | ||
| * @param metaData the format-level column metadata to convert | ||
| * @param columnPath the path of the column within the schema | ||
| * @param type the primitive type of the column | ||
| * @param createdBy the created-by string from the file footer | ||
| * @return the converted column chunk metadata | ||
| */ | ||
| public ColumnChunkMetaData buildColumnChunkMetaData( | ||
| ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, String createdBy) { | ||
| // Only BINARY and FIXED_LEN_BYTE_ARRAY columns are affected by PARQUET-251, so skip inspecting | ||
| // created_by for every other column type. The flag only takes effect for affected column types | ||
| // when statistics are converted, so the result is unchanged. | ||
| boolean fileHasCorruptStats = CorruptStatistics.isCorruptStatisticsColumnType(type.getPrimitiveTypeName()) | ||
| && CorruptStatistics.fileHasCorruptStatistics(createdBy); | ||
| return buildColumnChunkMetaData(metaData, columnPath, type, fileHasCorruptStats); | ||
| } | ||
|
|
||
| ColumnChunkMetaData buildColumnChunkMetaData( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No need to pass `createdBy` downstream; the boolean is all the internal overload needs. The `SortOrder` computation moves here since we bypass `fromParquetStatistics`. Also notice how the new public methods we extracted in `CorruptStatistics` …
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| ColumnMetaData metaData, | ||
| ColumnPath columnPath, | ||
| PrimitiveType type, | ||
| boolean fileHasCorruptStats) { | ||
| SortOrder typeSortOrder = overrideSortOrderToSigned(type) ? SortOrder.SIGNED : sortOrder(type); | ||
| return ColumnChunkMetaData.get( | ||
| columnPath, | ||
| type, | ||
| fromFormatCodec(metaData.codec), | ||
| convertEncodingStats(metaData.getEncoding_stats()), | ||
| fromFormatEncodings(metaData.encodings), | ||
| fromParquetStatistics(createdBy, metaData.statistics, type), | ||
| fromParquetStatisticsInternal(metaData.statistics, type, typeSortOrder, fileHasCorruptStats), | ||
| metaData.data_page_offset, | ||
| metaData.dictionary_page_offset, | ||
| metaData.num_values, | ||
|
|
@@ -1829,6 +1849,11 @@ public ParquetMetadata fromParquetMetadata( | |
| MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders()); | ||
| List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); | ||
| List<RowGroup> row_groups = parquetMetadata.getRow_groups(); | ||
| // Compute once per file: whether this file was written by a version with the PARQUET-251 bug. | ||
| // Only parse created_by if the schema has columns affected by the bug (BINARY/FIXED_LEN_BYTE_ARRAY). | ||
| // The per-column type check is applied later when statistics are actually read. | ||
| boolean fileHasCorruptStats = schemaHasCorruptStatisticsColumnType(messageType) | ||
| && CorruptStatistics.fileHasCorruptStatistics(parquetMetadata.getCreated_by()); | ||
|
|
||
| if (row_groups != null) { | ||
| for (RowGroup rowGroup : row_groups) { | ||
|
|
@@ -1909,7 +1934,7 @@ public ParquetMetadata fromParquetMetadata( | |
| metaData, | ||
| columnPath, | ||
| messageType.getType(columnPath.toArray()).asPrimitiveType(), | ||
| createdBy); | ||
| fileHasCorruptStats); | ||
| column.setRowGroupOrdinal(rowGroup.getOrdinal()); | ||
| if (metaData.isSetBloom_filter_offset()) { | ||
| column.setBloomFilterOffset(metaData.getBloom_filter_offset()); | ||
|
|
@@ -1988,6 +2013,18 @@ private static ColumnPath getPath(ColumnMetaData metaData) { | |
| return ColumnPath.get(path); | ||
| } | ||
|
|
||
| /** | ||
| * Returns true if the schema contains at least one column with a type affected by the PARQUET-251 bug. | ||
| * <p> | ||
| * Each column returned by schema.getColumns() is tested with CorruptStatistics.isCorruptStatisticsColumnType; | ||
| * the scan stops at the first match. | ||
| * | ||
| * @param schema the file schema whose columns are inspected | ||
| * @return true if at least one column has an affected primitive type, false if none does | ||
| */ | ||
| private static boolean schemaHasCorruptStatisticsColumnType(MessageType schema) { | ||
| for (ColumnDescriptor column : schema.getColumns()) { | ||
| if (CorruptStatistics.isCorruptStatisticsColumnType(column.getPrimitiveType().getPrimitiveTypeName())) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| // Visible for testing | ||
| MessageType fromParquetSchema(List<SchemaElement> schema, List<ColumnOrder> columnOrders) { | ||
| Iterator<SchemaElement> iterator = schema.iterator(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO, we need to be cautious when adding a new public method because we have to maintain it forever. Can we remove it from here and have the downstream caller use a private method instead?