PARQUET-2363: ParquetRewriter encrypts the V2 page header (#1169)
ConeyLiu authored Oct 15, 2023
1 parent 04246b9 commit 5ee5133
Showing 3 changed files with 163 additions and 72 deletions.
ParquetFileWriter.java
@@ -480,7 +480,7 @@ public void startColumn(ColumnDescriptor descriptor,
}

/**
* writes a dictionary page page
* writes a dictionary page
* @param dictionaryPage the dictionary page
* @throws IOException if there is an error while writing
*/
@@ -677,14 +677,14 @@ private void innerWriteDataPage(
* @throws IOException if there is an error while writing
*/
public void writeDataPage(
int valueCount, int uncompressedPageSize,
BytesInput bytes,
Statistics<?> statistics,
Encoding rlEncoding,
Encoding dlEncoding,
Encoding valuesEncoding,
BlockCipher.Encryptor metadataBlockEncryptor,
byte[] pageHeaderAAD) throws IOException {
int valueCount, int uncompressedPageSize,
BytesInput bytes,
Statistics<?> statistics,
Encoding rlEncoding,
Encoding dlEncoding,
Encoding valuesEncoding,
BlockCipher.Encryptor metadataBlockEncryptor,
byte[] pageHeaderAAD) throws IOException {
state = state.write();
long beforeHeader = out.getPos();
if (currentChunkFirstDataPage < 0) {
@@ -749,6 +749,7 @@ public void addBloomFilter(String column, BloomFilter bloomFilter) {

/**
* Writes a single v2 data page
*
* @param rowCount count of rows
* @param nullCount count of nulls
* @param valueCount count of values
@@ -760,13 +761,58 @@ public void addBloomFilter(String column, BloomFilter bloomFilter) {
* @param statistics the statistics of the page
* @throws IOException if any I/O error occurs during writing the file
*/
public void writeDataPageV2(int rowCount, int nullCount, int valueCount,
BytesInput repetitionLevels,
BytesInput definitionLevels,
Encoding dataEncoding,
BytesInput compressedData,
int uncompressedDataSize,
Statistics<?> statistics) throws IOException {
public void writeDataPageV2(
int rowCount,
int nullCount,
int valueCount,
BytesInput repetitionLevels,
BytesInput definitionLevels,
Encoding dataEncoding,
BytesInput compressedData,
int uncompressedDataSize,
Statistics<?> statistics) throws IOException {
writeDataPageV2(
rowCount,
nullCount,
valueCount,
repetitionLevels,
definitionLevels,
dataEncoding,
compressedData,
uncompressedDataSize,
statistics,
null,
null);
}

/**
* Writes a single v2 data page
*
* @param rowCount count of rows
* @param nullCount count of nulls
* @param valueCount count of values
* @param repetitionLevels repetition level bytes
* @param definitionLevels definition level bytes
* @param dataEncoding encoding for data
* @param compressedData compressed data bytes
* @param uncompressedDataSize the size of uncompressed data
* @param statistics the statistics of the page
* @param metadataBlockEncryptor encryptor for block data
* @param pageHeaderAAD pageHeader AAD
* @throws IOException if any I/O error occurs during writing the file
*/
public void writeDataPageV2(
int rowCount,
int nullCount,
int valueCount,
BytesInput repetitionLevels,
BytesInput definitionLevels,
Encoding dataEncoding,
BytesInput compressedData,
int uncompressedDataSize,
Statistics<?> statistics,
BlockCipher.Encryptor metadataBlockEncryptor,
byte[] pageHeaderAAD) throws IOException {
state = state.write();
int rlByteLength = toIntWithCheck(repetitionLevels.size());
int dlByteLength = toIntWithCheck(definitionLevels.size());
@@ -784,13 +830,38 @@ public void writeDataPageV2(int rowCount, int nullCount, int valueCount,
currentChunkFirstDataPage = beforeHeader;
}

metadataConverter.writeDataPageV2Header(
uncompressedSize, compressedSize,
valueCount, nullCount, rowCount,
dataEncoding,
rlByteLength,
dlByteLength,
out);
if (pageWriteChecksumEnabled) {
crc.reset();
if (repetitionLevels.size() > 0) {
crc.update(repetitionLevels.toByteArray());
}
if (definitionLevels.size() > 0) {
crc.update(definitionLevels.toByteArray());
}
if (compressedData.size() > 0) {
crc.update(compressedData.toByteArray());
}
metadataConverter.writeDataPageV2Header(
uncompressedSize, compressedSize,
valueCount, nullCount, rowCount,
dataEncoding,
rlByteLength,
dlByteLength,
(int) crc.getValue(),
out,
metadataBlockEncryptor,
pageHeaderAAD);
} else {
metadataConverter.writeDataPageV2Header(
uncompressedSize, compressedSize,
valueCount, nullCount, rowCount,
dataEncoding,
rlByteLength,
dlByteLength,
out,
metadataBlockEncryptor,
pageHeaderAAD);
}

long headersSize = out.getPos() - beforeHeader;
this.uncompressedLength += uncompressedSize + headersSize;
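For reference, a minimal sketch of calling the new encrypted V2 overload above. It assumes the caller already holds the column's header encryptor, the file AAD, and the row-group/column/page ordinals; the helper name and parameters below are illustrative, not part of this commit.

    import java.io.IOException;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.column.statistics.Statistics;
    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
    import org.apache.parquet.format.BlockCipher;
    import org.apache.parquet.hadoop.ParquetFileWriter;

    final class EncryptedV2PageSketch {
      // Build the AAD that binds this page header to its position in the file
      // (module type, row group, column, page), then write the page through the
      // new overload so the V2 page header is encrypted under that AAD.
      static void writeEncryptedV2Page(
          ParquetFileWriter writer,
          int rowCount, int nullCount, int valueCount,
          BytesInput repetitionLevels, BytesInput definitionLevels,
          Encoding dataEncoding,
          BytesInput compressedData, int uncompressedDataSize,
          Statistics<?> statistics,
          BlockCipher.Encryptor headerEncryptor, // assumed: page-header (metadata) encryptor
          byte[] fileAAD,                        // assumed: file-level AAD prefix
          int rowGroupOrdinal, int columnOrdinal, int pageOrdinal) throws IOException {
        byte[] pageHeaderAAD = AesCipher.createModuleAAD(
            fileAAD, ModuleType.DataPageHeader, rowGroupOrdinal, columnOrdinal, pageOrdinal);
        writer.writeDataPageV2(
            rowCount, nullCount, valueCount,
            repetitionLevels, definitionLevels,
            dataEncoding,
            compressedData, uncompressedDataSize,
            statistics,
            headerEncryptor,
            pageHeaderAAD);
      }
    }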
ParquetRewriter.java
@@ -309,7 +309,7 @@ private void processBlocksFromReader() throws IOException {
ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
if (encryptMode) {
columnChunkEncryptorRunTime =
new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, numBlocksRewritten, columnId);
new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, numBlocksRewritten, columnId);
}

// Translate compression and/or encryption
@@ -374,7 +374,7 @@ private void processChunk(ColumnChunkMetaData chunk,
reader.setStreamPosition(chunk.getStartingPos());
DictionaryPage dictionaryPage = null;
long readValues = 0;
Statistics statistics = null;
Statistics<?> statistics = null;
ParquetMetadataConverter converter = new ParquetMetadataConverter();
int pageOrdinal = 0;
long totalChunkValues = chunk.getValueCount();
@@ -385,7 +385,7 @@ private void processChunk(ColumnChunkMetaData chunk,
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dictionaryPage != null) {
throw new IOException("has more than one dictionary page in column chunk");
throw new IOException("has more than one dictionary page in column chunk: " + chunk);
}
//No quickUpdatePageAAD needed for dictionary page
DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
@@ -398,12 +398,12 @@ private void processChunk(ColumnChunkMetaData chunk,
encryptColumn,
dataEncryptor,
dictPageAAD);
writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
pageHeader.getUncompressed_page_size(),
dictPageHeader.getNum_values(),
converter.getEncoding(dictPageHeader.getEncoding())),
metaEncryptor,
dictPageHeaderAAD);
dictionaryPage = new DictionaryPage(
BytesInput.from(pageLoad),
pageHeader.getUncompressed_page_size(),
dictPageHeader.getNum_values(),
converter.getEncoding(dictPageHeader.getEncoding()));
writer.writeDictionaryPage(dictionaryPage, metaEncryptor, dictPageHeaderAAD);
break;
case DATA_PAGE:
if (encryptColumn) {
@@ -482,7 +482,9 @@ private void processChunk(ColumnChunkMetaData chunk,
converter.getEncoding(headerV2.getEncoding()),
BytesInput.from(pageLoad),
rawDataLength,
statistics);
statistics,
metaEncryptor,
dataPageHeaderAAD);
pageOrdinal++;
break;
default:
@@ -492,12 +494,12 @@ private void processChunk(ColumnChunkMetaData chunk,
}
}

private Statistics convertStatistics(String createdBy,
PrimitiveType type,
org.apache.parquet.format.Statistics pageStatistics,
ColumnIndex columnIndex,
int pageIndex,
ParquetMetadataConverter converter) throws IOException {
private Statistics<?> convertStatistics(String createdBy,
PrimitiveType type,
org.apache.parquet.format.Statistics pageStatistics,
ColumnIndex columnIndex,
int pageIndex,
ParquetMetadataConverter converter) throws IOException {
if (columnIndex != null) {
if (columnIndex.getNullPages() == null) {
throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " +
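For reference, the per-page AADs used above embed the page ordinal, so a rewrite loop such as processChunk keeps one AAD buffer per column chunk and refreshes it before each page. A minimal sketch of that pattern, assuming the caller supplies the file AAD and ordinals (the loop and names below are illustrative, not from the commit):

    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

    final class PageHeaderAADSketch {
      static void writePagesOfOneChunk(byte[] fileAAD, int rowGroupOrdinal, int columnOrdinal, int pageCount) {
        // One AAD buffer for the whole column chunk, initially bound to page ordinal 0.
        byte[] dataPageHeaderAAD = AesCipher.createModuleAAD(
            fileAAD, ModuleType.DataPageHeader, rowGroupOrdinal, columnOrdinal, 0);
        for (int pageOrdinal = 0; pageOrdinal < pageCount; pageOrdinal++) {
          // Overwrites the ordinal bytes inside the existing AAD buffer in place,
          // so each header is bound to its own page ordinal.
          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
          // ... pass dataPageHeaderAAD (together with the metadata encryptor) to writeDataPageV2 here ...
        }
      }
    }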
ParquetRewriterTest.java
@@ -64,6 +64,8 @@
import org.apache.parquet.schema.Type;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.ArrayList;
@@ -89,14 +91,26 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class ParquetRewriterTest {

private final int numRecord = 100000;
private Configuration conf = new Configuration();
private final Configuration conf = new Configuration();
private final ParquetProperties.WriterVersion writerVersion;

private List<EncryptionTestFile> inputFiles = null;
private String outputFile = null;
private ParquetRewriter rewriter = null;

@Parameterized.Parameters(name = "WriterVersion = {0}")
public static Object[] parameters() {
return new Object[] {"v1", "v2"};
}

public ParquetRewriterTest(String writerVersion) {
this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion);
}

private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {
Path outputPath = new Path(outputFile);
List<String> pruneColumns = Arrays.asList("Gender");
@@ -498,27 +512,29 @@ public void testMergeTwoFilesOnly() throws Exception {
@Test(expected = InvalidSchemaException.class)
public void testMergeTwoFilesWithDifferentSchema() throws Exception {
MessageType schema1 = new MessageType("schema",
new PrimitiveType(OPTIONAL, INT64, "DocId"),
new PrimitiveType(REQUIRED, BINARY, "Name"),
new PrimitiveType(OPTIONAL, BINARY, "Gender"),
new GroupType(OPTIONAL, "Links",
new PrimitiveType(REPEATED, BINARY, "Backward"),
new PrimitiveType(REPEATED, BINARY, "Forward")));
new PrimitiveType(OPTIONAL, INT64, "DocId"),
new PrimitiveType(REQUIRED, BINARY, "Name"),
new PrimitiveType(OPTIONAL, BINARY, "Gender"),
new GroupType(OPTIONAL, "Links",
new PrimitiveType(REPEATED, BINARY, "Backward"),
new PrimitiveType(REPEATED, BINARY, "Forward")));
MessageType schema2 = new MessageType("schema",
new PrimitiveType(OPTIONAL, INT64, "DocId"),
new PrimitiveType(REQUIRED, BINARY, "Name"),
new PrimitiveType(OPTIONAL, BINARY, "Gender"));
new PrimitiveType(OPTIONAL, INT64, "DocId"),
new PrimitiveType(REQUIRED, BINARY, "Name"),
new PrimitiveType(OPTIONAL, BINARY, "Gender"));
inputFiles = Lists.newArrayList();
inputFiles.add(new TestFileBuilder(conf, schema1)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.build());
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
inputFiles.add(new TestFileBuilder(conf, schema2)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.build());
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());

List<Path> inputPaths = new ArrayList<>();
for (EncryptionTestFile inputFile : inputFiles) {
@@ -617,33 +633,36 @@ private void testSingleInputFileSetup(String compression,
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withRowGroupSize(rowGroupSize)
.withBloomFilterEnabled(bloomFilterEnabledColumns)
.withWriterVersion(writerVersion)
.build());
}

private void testMultipleInputFilesSetup() throws IOException {
MessageType schema = createSchema();
inputFiles = Lists.newArrayList();
inputFiles.add(new TestFileBuilder(conf, schema)
.withNumRecord(numRecord)
.withCodec("GZIP")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.build());
.withNumRecord(numRecord)
.withCodec("GZIP")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
inputFiles.add(new TestFileBuilder(conf, schema)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.build());
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());

}

private MessageType createSchema() {
return new MessageType("schema",
new PrimitiveType(OPTIONAL, INT64, "DocId"),
new PrimitiveType(REQUIRED, BINARY, "Name"),
new PrimitiveType(OPTIONAL, BINARY, "Gender"),
new GroupType(OPTIONAL, "Links",
new PrimitiveType(REPEATED, BINARY, "Backward"),
new PrimitiveType(REPEATED, BINARY, "Forward")));
new PrimitiveType(OPTIONAL, INT64, "DocId"),
new PrimitiveType(REQUIRED, BINARY, "Name"),
new PrimitiveType(OPTIONAL, BINARY, "Gender"),
new GroupType(OPTIONAL, "Links",
new PrimitiveType(REPEATED, BINARY, "Backward"),
new PrimitiveType(REPEATED, BINARY, "Forward")));
}

private void validateColumnData(Set<String> prunePaths,
@@ -849,7 +868,6 @@ private void validateCreatedBy() throws Exception {
assertNotNull(createdBy);
assertEquals(createdBy, Version.FULL_VERSION);


// Verify original.created.by has been set
String inputCreatedBy = (String) inputCreatedBys[0];
String originalCreatedBy = outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
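For reference, the "v1"/"v2" parameters above resolve to ParquetProperties.WriterVersion values; the v2 run is what exercises the V2 data-page path touched by this commit. A minimal sketch of that mapping (class name illustrative):

    import org.apache.parquet.column.ParquetProperties.WriterVersion;

    final class WriterVersionSketch {
      public static void main(String[] args) {
        WriterVersion v1 = WriterVersion.fromString("v1"); // PARQUET_1_0 -> V1 data pages
        WriterVersion v2 = WriterVersion.fromString("v2"); // PARQUET_2_0 -> V2 data pages
        System.out.println(v1 + " / " + v2);
      }
    }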
