Conversation

Member

@yaauie yaauie commented Sep 4, 2025

Release notes

Adds support for event compression in the persisted queue, controlled by the per-pipeline queue.compression setting, which defaults to none.

What does this PR do?

Adds non-breaking support for event compression to the persisted queue, as
configured by a new per-pipeline setting queue.compression, which supports:

  • none (default): no compression is performed, but if compressed events are encountered in the queue they will be decompressed
  • speed: compression optimized for speed (minimal overhead, but less compression)
  • balanced: compression balancing speed against result size
  • size: compression optimized for maximum reduction of size (minimal size, but more resource-intensive)
  • disabled: compression support entirely disabled; if a pipeline is run in this configuration against a PQ that already contains unacked compressed events, the pipeline WILL crash.

This PR performs the necessary refactors as no-op, stand-alone commits to make reviewing more straightforward. It is best reviewed in commit order.

Why is it important/What is the impact to the user?

Disk IO is often a performance bottleneck when using the PQ. This feature allows users to spend available CPU to reduce the size of events on disk, and therefore also the disk IO.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

  • [ ]

How to test this PR locally

  • Add an ndjson file named example-input.ndjson with one JSON event per line
  • Run Logstash with trace-logging enabled, using -S to set queue.type=persisted, queue.drain=true, and queue.compression=size:
    bin/logstash --log.level=trace \
    -Squeue.type=persisted \
    -Squeue.drain=true \
    -Squeue.compression=size \
    --config.string 'input { stdin { codec => json_lines } } output { sink {} }' < example-input.ndjson
    
  • Observe trace logs showing compression and decompression:
    [2025-09-04T22:36:07,055][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 9000->3645
    [2025-09-04T22:36:07,056][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 9448->3838
    [2025-09-04T22:36:07,057][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 7160->2666
    [2025-09-04T22:36:07,059][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 8642->3572
    [2025-09-04T22:36:07,060][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 8714->3739
    [2025-09-04T22:36:07,061][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 8048->3500
    [2025-09-04T22:36:07,063][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 10007->3871
    

    ...

    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3594->8060
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3571->8622
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3673->9051
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3538->8343
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3729->8943
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 2683->7460
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3928->10059
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3563->8329
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 2708->7285
    
  • Inspect the page(s) left behind with lsq-pagedump:
    2099    3851    0EB311A1        page.0  ZSTD(9552)
    2100    3961    D497496F        page.0  ZSTD(10416)
    2101    2667    59F1903D        page.0  ZSTD(6978)
    2102    3677    B442D62D        page.0  ZSTD(9006)
    2103    3748    3EEF1737        page.0  ZSTD(8791)
    2104    2746    DE697FE9        page.0  ZSTD(7903)
    

Related issues

Use cases

  • Constrained or metered disk IO
  • Limited disk capacity

@yaauie yaauie added the enhancement, persistent queues, and backport-skip (Skip automated backport with mergify) labels on Sep 4, 2025
Contributor

github-actions bot commented Sep 4, 2025

🤖 GitHub comments

Just comment with:

  • run docs-build: Re-trigger the docs validation (use unformatted text in the comment!)

Contributor

github-actions bot commented Sep 4, 2025

🔍 Preview links for changed docs

Member

@jsvd jsvd left a comment


First pass, minor annotations; going to test this manually now.

settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxWrites(), settings.getCheckpointRetry()
);
return new BuilderImpl(settings);
Member


Just a suggestion, this Builder refactoring could have been a separate PR as it doesn't require the compression settings at all and is still a significant part of the changeset in this PR.

Member


Another reason to introduce this change ASAP: yet another parameter is coming in https://github.com/elastic/logstash/pull/18000/files

Member Author


pared off as #18180

@jsvd
Member

jsvd commented Sep 5, 2025

Looking at the profiling, it seems that with Zstd.compress/decompress the instance spends nearly 9% of its time doing context initializations:

[screenshot: profiler results, 2025-09-05 18:49]

profile: profile.html

Something worth investigating is the use of thread locals for the contexts.
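
A minimal sketch of that idea, assuming the PQ compresses each serialized event via zstd-jni (the class and method names below are hypothetical, not this PR's actual code): each worker thread lazily creates and reuses one long-lived ZstdCompressCtx rather than paying the context-initialization cost on every compress call.

    import com.github.luben.zstd.ZstdCompressCtx;

    // Hypothetical sketch: reuse one zstd compression context per thread.
    final class ThreadLocalZstdCompressor {
        private final ThreadLocal<ZstdCompressCtx> contexts;

        ThreadLocalZstdCompressor(final int level) {
            // Each thread creates its own context on first use and keeps it.
            this.contexts = ThreadLocal.withInitial(() -> {
                final ZstdCompressCtx ctx = new ZstdCompressCtx();
                ctx.setLevel(level);
                return ctx;
            });
        }

        byte[] compress(final byte[] serializedEvent) {
            return contexts.get().compress(serializedEvent);
        }
    }

One caveat: ZstdCompressCtx wraps native memory, so a real implementation would also need to close the per-thread contexts when the pipeline shuts down.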

yaauie and others added 7 commits September 22, 2025 17:26
Adds non-breaking support for event compression to the persisted queue, as
configured by a new per-pipeline setting `queue.compression`, which supports:

 - `none` (default): no compression is performed, but if compressed events
                     are encountered in the queue they will be decompressed
 - `speed`: compression optimized for speed
 - `balanced`: compression balancing speed against result size
 - `size`: compression optimized for maximum reduction of size
 - `disabled`: compression support entirely disabled; if a pipeline is run
               in this configuration against a PQ that already contains
               unacked compressed events, the pipeline WILL crash.

To accomplish this, we then provide an abstract base implementation of the
CompressionCodec whose decode method is capable of _detecting_ and decoding
zstd-encoded payload while letting other payloads through unmodified.
The detection is done with an operation on the first four bytes of the
payload, so no additional context is needed.

An instance of this zstd-aware compression codec is provided with a
pass-through encode operation when configured with `queue.compression: none`,
which is the default, ensuring that by default logstash is able to decode any
event that had previously been written.

We provide an additional implementation that is capable of _encoding_ events
with a configurable goal: speed, size, or a balance of the two.
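
To make the detection described above concrete, here is an illustrative sketch of such a decode path, assuming zstd-jni; the class name and structure are invented for illustration and are not the PR's actual CompressionCodec. Every zstd frame begins with the magic number 0xFD2FB528 stored little-endian, so inspecting the first four bytes is enough to decide whether to decompress or pass the payload through unmodified.

    import com.github.luben.zstd.Zstd;

    // Illustrative only: detect a zstd frame by its magic number and let any
    // other payload through untouched.
    final class ZstdAwareDecoder {

        // First four bytes of every zstd frame: 0x28 0xB5 0x2F 0xFD
        // (the magic number 0xFD2FB528 in little-endian byte order).
        private static boolean isZstdFrame(final byte[] payload) {
            return payload.length >= 4
                && payload[0] == (byte) 0x28
                && payload[1] == (byte) 0xB5
                && payload[2] == (byte) 0x2F
                && payload[3] == (byte) 0xFD;
        }

        byte[] decode(final byte[] payload) {
            if (!isZstdFrame(payload)) {
                return payload; // an event serialized without compression
            }
            // For event-sized payloads the frame header records the content size.
            final int originalSize = (int) Zstd.decompressedSize(payload);
            return Zstd.decompress(payload, originalSize);
        }
    }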

@elasticmachine
Collaborator

💛 Build succeeded, but was flaky

Failed CI Steps

History

cc @yaauie

Member

@jsvd jsvd left a comment


Two minor doc changes, otherwise LGTM!


`queue.compression` {applies_to}`stack: ga 9.2`
: Sets the event compression level for use with the Persisted Queue. Default is `none`. Possible values are:
* `speed`: optimize for fastest compression operation
Member


`none` should be part of this list.

Member Author


Suggested change
* `speed`: optimize for fastest compression operation
* `none`: does not perform compression, but reads compressed events
* `speed`: optimize for fastest compression operation

| `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows platform, filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` |
| `queue.compression` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` |
Member

@jsvd jsvd Sep 25, 2025


Suggested change
| `queue.compression` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` |
| `queue.compression` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `none`, `speed`, `balanced`, and `size`. | `none` |

Member

@robbavey robbavey left a comment


Couple of doc comments

| `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows platform, filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` |
| `queue.compression` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` |
Member


Let's add applies_to tags

Member Author


Suggested change
| `queue.compression` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` |
| `queue.compression` | {applies_to}`stack: ga 9.2` Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, `size`, and `none`. | `none` |

Member


Looks like applies_to tags should go at the end of the first column

Suggested change
| `queue.compression` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` |
| `queue.compression` {applies_to}`stack: ga 9.2` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` |

* `size`: optimize for smallest possible size on disk, spending more CPU
* `balanced`: a balance between the `speed` and `size` settings
:::{important}
Enabling compression will make the PQ incompatible with previous Logstash releases that did not support compression.
Member


Do we need to provide additional guidance on the nature of the incompatibility? For example:

  • That this is a read-only incompatibility - versions of Logstash without compression support are unable to read from PQs written with compression.
  • However, when upgrading from a version of Logstash that already has a PQ, it is possible to enable compression support for that PQ going forward, and versions of Logstash with compression support will be able to read and write to the PQ.

Member Author


@robbavey how is this?

Suggested change
Enabling compression will make the PQ incompatible with previous Logstash releases that did not support compression.
Compression can be enabled for an existing PQ, but once compressed elements have been added to a PQ, that PQ cannot be read by previous Logstash releases that did not support compression.
If you need to downgrade Logstash after enabling compression, you will need to either delete the PQ or run the pipeline with `queue.drain: true` first to ensure that no compressed elements remain.

Member


👍
