Improve performance of DataBlock serde #13303
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #13303 +/- ##
============================================
- Coverage 61.75% 57.84% -3.91%
- Complexity 207 219 +12
============================================
Files 2436 2615 +179
Lines 133233 143536 +10303
Branches 20636 22053 +1417
============================================
+ Hits 82274 83028 +754
- Misses 44911 54020 +9109
- Partials 6048 6488 +440
Force-pushed from 270f679 to 2152e36
public void testSerdeCorrectness(BaseDataBlock dataBlock)
    throws IOException {
  byte[] bytes = dataBlock.toBytes();
  ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
  int versionType = DataBlockUtils.readVersionType(byteBuffer);
  BaseDataBlock deserialize = deserialize(byteBuffer, versionType);

  assertEquals(byteBuffer.position(), bytes.length, "Buffer position should be at the end of the buffer");
  assertEquals(deserialize, dataBlock, "Deserialized data block should be the same as the original data block");
}
Serde correctness is now verified in the DataBlockSerde tests.
rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
}

CompoundDataBuffer.Builder varBufferBuilder = new CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, true)
    .addPagedOutputStream(varSize);
This is probably the biggest performance improvement when creating the block. The older version allocated a large array (which is expensive, as it may be allocated outside the TLAB) and then copied it into the byte-array output stream, which probably ends up allocating that many bytes again.
Now we just reuse the whole byte buffer, adding it to the builder, which is basically a list of byte buffers that can later be used to send the data through the network or to read the data directly in another local stage.
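As a minimal, self-contained illustration of the idea (the class and method names here are hypothetical, not Pinot's actual CompoundDataBuffer API):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a "compound buffer" that keeps a list of buffer
// views instead of copying their contents into one big array.
final class BufferChain {
  private final List<ByteBuffer> _buffers = new ArrayList<>();

  // Instead of byteArrayOut.write(buffer.array(), 0, buffer.position()),
  // which copies the bytes, keep a read-only view of the buffer. The bytes
  // are only traversed later, when the block is sent over the network or
  // read by another local stage.
  void add(ByteBuffer buffer) {
    ByteBuffer view = buffer.duplicate();
    view.flip(); // expose [0, position) for reading, without copying
    _buffers.add(view.asReadOnlyBuffer());
  }

  long totalBytes() {
    long total = 0;
    for (ByteBuffer b : _buffers) {
      total += b.remaining();
    }
    return total;
  }
}

With this shape the copy happens (if at all) only at the network boundary, which is what removes the large intermediate allocation.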
}
columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position());
Here again we were copying bytes. In this case the allocation is smaller, but still problematic.
// @Param(value = {"0", "10", "90"}) | ||
int _nullPerCent = 10; | ||
|
||
// @Param(value = {"direct_small", "heap_small"}) |
This can be used to study the performance of different allocators. heap_small is the fastest by a large margin.
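For reference, a hedged sketch of how such an allocator toggle is typically wired up in JMH (the value names mirror the commented-out @Param above; the size handling is an assumption, not the benchmark's actual code):

import java.nio.ByteBuffer;

import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class AllocatorParamSketch {
  @Param(value = {"direct_small", "heap_small"})
  public String _allocator;

  ByteBuffer allocate(int size) {
    // Direct buffers live outside the Java heap; heap buffers are plain
    // byte[]-backed buffers, reported above as the fastest by a large margin.
    return _allocator.startsWith("direct")
        ? ByteBuffer.allocateDirect(size)
        : ByteBuffer.allocate(size);
  }
}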
Methods have been moved to PinotInputStream and PinotOutputStream
Force-pushed from 5033ea4 to 5332e3f
…tablock deserialization
I think I've applied most, if not all, of the suggestions. Please take a look at this PR again, as it would be great to be able to merge it soon.
Thanks for this improvement @gortiz, the numbers look super impressive! I've left some minor comments and questions (many of which are simply to improve my understanding of these areas in Pinot 😄).
} catch (AssertionError e) {
  throw new AssertionError(
      "Error comparing Row/Column Block at (" + rowId + "," + colId + ") of Type: " + columnDataType + "!", e);
What's the purpose of this catch block?
TBH I don't remember. Probably to catch errors produced by assertions in DataBlockTestUtils.getElement. I can change the code to make it clearer.
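For illustration, one clearer shape could be a small helper that attaches the positional context in a single obvious place (a sketch assuming the wrapped comparison throws AssertionError, not the committed code):

// Hypothetical helper: attaches row/column context to any assertion
// failure thrown by the wrapped comparison, e.g. from
// DataBlockTestUtils.getElement-style assertions.
private static void compareAt(int rowId, int colId, Object columnDataType, Runnable comparison) {
  try {
    comparison.run();
  } catch (AssertionError e) {
    throw new AssertionError(
        "Error comparing Row/Column Block at (" + rowId + "," + colId + ") of Type: " + columnDataType + "!", e);
  }
}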
}

@Test(dataProvider = "blocks")
void testSerde(String desc, DataBlock block) {
Could we also add tests for RowDataBlock / ColumnarDataBlock, and if possible include some edge cases like a null dictionary, null data schema, etc.?
It would be great, but it is not easy to create actual row and column blocks here, given that DataBlockBuilder is in pinot-core. I remember I tried to move it to pinot-common, but I guess I didn't commit the change because there was some issue (maybe some other dependency in pinot-core).
Instead, we are testing these serde properties in DataBlockSerdeTest. That is not great, but it is easier to implement right now.
> including some edge cases like null dictionary, null data schema etc.?

I don't think these are valid cases for row/column blocks.
Force-pushed from ef88e9d to 83b6790
…dation.constraints.NotNull
static {
  SERDES = new EnumMap<>(DataBlockSerde.Version.class);
  SERDES.put(DataBlockSerde.Version.V1_V2, new ZeroCopyDataBlockSerde());
V1_V2 is hard-coded here and in ZeroCopyDataBlockSerde.getVersion. Change to SERDES.put(ZeroCopyDataBlockSerde.VERSION, new ZeroCopyDataBlockSerde())?
Also, do you plan to make this configurable in the future, or punt on it until the next version is added?
In case we end up having more serdes, we will probably add more entries to the map and/or open up the ability to change them through configuration.
For example, we could end up writing a better implementation of V1_V2. Then we could have both serdes and switch from one to the other using configuration. In case we create new protocol versions (like a V3), we would be able to add them here, although we would need a way to decide which version should be used (as explained in some doc, probably just setting the version to use as a config while we have a heterogeneous cluster).
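A self-contained sketch of that direction (all types here are stand-ins for Pinot's DataBlockSerde.Version, DataBlockSerde and ZeroCopyDataBlockSerde, and the config hook is hypothetical):

import java.util.EnumMap;
import java.util.Map;

final class SerdeRegistry {
  enum Version { V1_V2 }

  interface Serde { /* serialize/deserialize elided */ }

  static final class ZeroCopySerde implements Serde {
    // Letting the serde own its version constant avoids the duplicated
    // hard-coded V1_V2 the review comment points out.
    static final Version VERSION = Version.V1_V2;
  }

  private static final Map<Version, Serde> SERDES = new EnumMap<>(Version.class);

  static {
    SERDES.put(ZeroCopySerde.VERSION, new ZeroCopySerde());
  }

  // Reads dispatch on the version decoded from the incoming buffer; writes
  // could pick the version from cluster configuration while a heterogeneous
  // cluster is being upgraded.
  static Serde forVersion(Version version) {
    Serde serde = SERDES.get(version);
    if (serde == null) {
      throw new IllegalArgumentException("Unsupported serde version: " + version);
    }
    return serde;
  }
}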
@@ -74,22 +77,26 @@ public static void main(String[] args)
        .addProfiler(GCProfiler.class));
  }

-  @Param(value = {"INT", "LONG", "STRING", "BYTES", "BIG_DECIMAL", "LONG_ARRAY", "STRING_ARRAY"})
+  // @Param(value = {"INT", "LONG", "STRING", "BYTES", "BIG_DECIMAL", "LONG_ARRAY", "STRING_ARRAY"})
nit: Should this be removed or reinstated?
Why are STRING_ARRAY and BIG_DECIMAL removed?
We had the same discussion below:

> I think it is useful to have them there to have easy access to other common configurations. The current configuration (always using 10k rows and only the {"INT", "LONG", "STRING", "BYTES", "LONG_ARRAY"} types) is useful, but keeping the larger list of params and the other row counts may also be useful in more extensive tests.
…xplicitly add the ByteBuffer wrapper
This PR includes several changes to the code that builds, serializes and deserializes DataBlocks, in order to improve performance.
The changes here should not alter the binary format (the included tests verify that). Instead, I've changed the code to reduce allocations and copies. I'm sure more can be done to improve performance without breaking the binary format, and even more could be done if we decide to break the format.
The PR includes 4 benchmarks: one that creates a datablock from a given List<Object[]>, one that serializes that datablock, one that deserializes it, and one that does all three in a row (sketched below).
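A hedged sketch of the shape of that fourth benchmark (the three steps are stand-ins for the real DataBlockBuilder/serde calls, not the code actually committed in pinot-perf):

import java.util.ArrayList;
import java.util.List;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class SerdeRoundTripSketch {
  private List<Object[]> _rows;

  @Setup
  public void setUp() {
    _rows = new ArrayList<>();
    for (int i = 0; i < 10_000; i++) {
      _rows.add(new Object[] {i, "row-" + i});
    }
  }

  @Benchmark
  public Object buildSerializeDeserialize() {
    Object block = buildBlock(_rows);   // 1. create the data block
    byte[] bytes = serialize(block);    // 2. serialize it
    return deserialize(bytes);          // 3. deserialize it again
  }

  // Hypothetical stand-ins so the sketch compiles on its own.
  private Object buildBlock(List<Object[]> rows) { return rows; }
  private byte[] serialize(Object block) { return new byte[] {0}; }
  private Object deserialize(byte[] bytes) { return bytes; }
}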
The old version of this benchmark is in dbeeaaf.
The results of the benchmark that creates, serializes and deserializes show throughput between 1x and 3x of the old version, and the difference in allocation is even better. This is especially important because, while this benchmark is executed on a machine with enough memory, a large allocation rate can be problematic in production: a single query allocating too much may heavily affect the latency of other queries due to GC (even when using non-blocking GCs!).
One of the key techniques used to achieve the performance gain was using special types of buffers and streams. In order to make this PR smaller, I created #13304, which contains only the buffer and stream changes. We can merge that PR before merging this one.
TODO: