Commit 750e082

Revert "[HUDI-9794] Convert the avro into file bytes in LegacyArchivedMetaEnt…" (#13854)
This reverts commit e7b360d.
1 parent e7b360d commit 750e082

File tree

5 files changed: +6 / -117 lines changed


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java

Lines changed: 3 additions & 9 deletions
@@ -71,7 +71,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -443,14 +442,9 @@ private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) thro
       Map<HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
       final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
-      List<HoodieRecord> indexRecords = records.stream()
-          .filter(Objects::nonNull)
-          .map(HoodieAvroIndexedRecord::new)
-          .collect(Collectors.toList());
-      if (!indexRecords.isEmpty()) {
-        HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
-        writer.appendBlock(block);
-      }
+      List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
+      HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
+      writer.appendBlock(block);
       records.clear();
     }
   }
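
Behaviorally, the reverted change had filtered out null records and skipped appending an empty data block; this revert restores the unconditional wrap-and-append path. Below is a minimal, standalone illustration of the two pipelines; List<String> is only a stand-in for the Hudi record and block types, which are not used here.

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Standalone illustration only; not the Hudi archiver code.
public class ArchiveAppendSketch {

  // Pre-revert shape: drop nulls and only "append a block" when records remain.
  static void appendGuarded(List<String> records) {
    List<String> filtered = records.stream()
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
    if (!filtered.isEmpty()) {
      System.out.println("append block with " + filtered.size() + " records");
    }
  }

  // Post-revert shape (restored by this commit): wrap everything and always append.
  static void appendUnconditional(List<String> records) {
    List<String> wrapped = records.stream().collect(Collectors.toList());
    System.out.println("append block with " + wrapped.size() + " records");
  }

  public static void main(String[] args) {
    List<String> batch = Arrays.asList("instant-1", null, "instant-2");
    appendGuarded(batch);        // append block with 2 records
    appendUnconditional(batch);  // append block with 3 records
  }
}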

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ private Pair<HoodieInstant, Option<byte[]>> readInstant(GenericRecord record) {
     Object actionData = record.get(key);
     if (actionData != null) {
       if (actionData instanceof IndexedRecord) {
-        return HoodieAvroUtils.avroToFileBytes((IndexedRecord) actionData);
+        return HoodieAvroUtils.avroToBytes((IndexedRecord) actionData);
       } else {
         // should be json bytes.
         try {

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 0 additions & 14 deletions
@@ -40,7 +40,6 @@
 import org.apache.avro.LogicalTypes.Decimal;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
@@ -162,19 +161,6 @@ public static <T extends IndexedRecord> ByteArrayOutputStream indexedRecordToByt
     }
   }
 
-  public static byte[] avroToFileBytes(IndexedRecord record) {
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      try (DataFileWriter<IndexedRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(record.getSchema()))) {
-        writer.create(record.getSchema(), baos);
-        writer.append(record);
-        writer.flush();
-        return baos.toByteArray();
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to convert IndexedRecord to Avro file format", e);
-    }
-  }
-
   /**
    * Convert a given avro record to json and return the string
    *
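
For orientation, the method removed above wrote a record in the Avro object container file format (magic bytes, embedded schema, sync markers), whereas the avroToBytes call restored in LegacyArchivedMetaEntryReader produces a plain binary-encoded datum. The contrast can be reproduced with the standard Avro APIs alone; the sketch below is illustrative, it is not the Hudi implementation of avroToBytes, and the Demo schema is made up for the example.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroBytesSketch {

  // Hypothetical schema, used only for this example.
  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

  // Raw binary datum encoding: just the record body, no schema, no container header.
  static byte[] datumBytes(GenericRecord record) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
    new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
    encoder.flush();
    return baos.toByteArray();
  }

  // Avro container-file encoding, mirroring the removed avroToFileBytes helper above.
  static byte[] fileBytes(GenericRecord record) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (DataFileWriter<GenericRecord> writer =
             new DataFileWriter<>(new GenericDatumWriter<>(record.getSchema()))) {
      writer.create(record.getSchema(), baos);
      writer.append(record);
      writer.flush();
    }
    return baos.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    GenericRecord record = new GenericData.Record(SCHEMA);
    record.put("id", "commit-0001");
    System.out.println("datum bytes: " + datumBytes(record).length); // a handful of bytes
    System.out.println("file bytes:  " + fileBytes(record).length);  // larger: header + schema + sync marker
  }
}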

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java

Lines changed: 1 addition & 3 deletions
@@ -183,9 +183,7 @@ public static HoodieArchivedMetaEntry createMetaWrapper(
     switch (actionType) {
       case HoodieTimeline.CLEAN_ACTION: {
         archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, new ByteArrayInputStream(instantDetails.get())));
-        if (planBytes.isPresent()) {
-          archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, new ByteArrayInputStream(planBytes.get())));
-        }
+        archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, new ByteArrayInputStream(planBytes.get())));
         archivedMetaWrapper.setActionType(ActionType.clean.name());
         break;
       }

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala

Lines changed: 1 addition & 90 deletions
@@ -25,9 +25,8 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode, Typ
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload, TableServiceType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.common.table.timeline.InstantComparison.{compareTimestamps, GREATER_THAN_OR_EQUALS}
-import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
 import org.apache.hudi.common.util.{Option, StringUtils}
-import org.apache.hudi.config.{HoodieArchivalConfig, HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
@@ -286,94 +285,6 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
     assertEquals(1, df2.filter("price = 35").count())
   }
 
-  @Test
-  def testV6TableUpgradeToV9ToV6(): Unit = {
-    val partitionFields = "partition:simple"
-
-    val hudiOptsV6 = commonOpts ++ Map(
-      TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
-      KEYGENERATOR_CLASS_NAME.key -> KeyGeneratorType.CUSTOM.getClassName,
-      PARTITIONPATH_FIELD.key -> partitionFields,
-      "hoodie.metadata.enable" -> "true",
-      PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
-      RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name,
-      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> "6",
-      HoodieWriteConfig.TIMELINE_LAYOUT_VERSION_NUM.key() -> Integer.toString(TimelineLayoutVersion.VERSION_1),
-      HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> "false",
-      HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15",
-      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key -> "3",
-      HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key -> "4",
-      HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key -> "5"
-    )
-
-    doWriteAndValidateDataAndRecordIndex(hudiOptsV6,
-      operation = INSERT_OPERATION_OPT_VAL,
-      saveMode = SaveMode.Overwrite,
-      validate = false)
-
-    for (i <- 1 to 10) {
-      doWriteAndValidateDataAndRecordIndex(hudiOptsV6,
-        operation = INSERT_OPERATION_OPT_VAL,
-        saveMode = SaveMode.Append,
-        validate = false)
-    }
-    metaClient = getLatestMetaClient(true)
-
-    assertEquals(HoodieTableVersion.SIX, metaClient.getTableConfig.getTableVersion)
-    assertEquals("partition", HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
-
-    val archivePath = new org.apache.hudi.storage.StoragePath(metaClient.getArchivePath, ".commits_.archive*")
-    val archivedFiles = metaClient.getStorage.globEntries(archivePath)
-    println(s"Archived files found ${archivedFiles.size()}")
-
-    metaClient = HoodieTableMetaClient.builder()
-      .setBasePath(basePath)
-      .setConf(storage.getConf())
-      .build()
-
-    val hudiOptsUpgrade = hudiOptsV6 ++ Map(
-      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> HoodieTableVersion.current().versionCode().toString
-    ) - HoodieWriteConfig.AUTO_UPGRADE_VERSION.key
-
-    doWriteAndValidateDataAndRecordIndex(hudiOptsUpgrade,
-      operation = UPSERT_OPERATION_OPT_VAL,
-      saveMode = SaveMode.Append,
-      validate = false)
-
-    metaClient = HoodieTableMetaClient.builder()
-      .setBasePath(basePath)
-      .setConf(storage.getConf())
-      .build()
-
-    assertEquals(HoodieTableVersion.current(), metaClient.getTableConfig.getTableVersion)
-    assertEquals(partitionFields, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
-
-    val archivedFilesAfterUpgrade = metaClient.getStorage.globEntries(archivePath)
-
-    assertTrue(archivedFilesAfterUpgrade.size() > 0,
-      "Even after upgrade, fresh table with ~12 commits should have archived files")
-
-    val hudiOptsDowngrade = hudiOptsV6 ++ Map(
-      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> HoodieTableVersion.SIX.versionCode().toString
-    )
-
-    new UpgradeDowngrade(metaClient, getWriteConfig(hudiOptsDowngrade, basePath), context, SparkUpgradeDowngradeHelper.getInstance)
-      .run(HoodieTableVersion.SIX, null)
-
-    metaClient = HoodieTableMetaClient.builder()
-      .setBasePath(basePath)
-      .setConf(storage.getConf())
-      .build()
-
-    assertEquals(HoodieTableVersion.SIX, metaClient.getTableConfig.getTableVersion)
-    assertEquals("partition", HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
-
-    val v6ArchivePath = new org.apache.hudi.storage.StoragePath(metaClient.getArchivePath, ".commits_.archive*")
-    val v6ArchivedFiles = metaClient.getStorage.globEntries(v6ArchivePath)
-
-    assertTrue(v6ArchivedFiles.size() > 0, "Downgrade should have archived files in V6 format")
-  }
-
   private def getWriteConfig(hudiOpts: Map[String, String], basePath: String): HoodieWriteConfig = {
     val props = TypedProperties.fromMap(hudiOpts.asJava)
     HoodieWriteConfig.newBuilder()
