Skip to content

Conversation

florissmit10
Copy link
Contributor

@florissmit10 florissmit10 commented Aug 27, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

The MQTTSplitEnumerator used to make a client for each source. We had some issues with that since it uses the same client id. The better approach here is to use a shared client for each broker, as the done with the kafka connector

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • I have checked the Release Timeline and Currently Supported Versions to determine which release branches I need to cherry-pick this PR into.

Documentation

  • My PR needs documentation updates.
Release note

@hzxa21 hzxa21 requested review from Copilot and tabVersion September 1, 2025 13:09
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR refactors the MQTT split enumerator to use a single shared client per MQTT broker instead of creating individual clients for each source, resolving client ID conflicts and improving resource efficiency.

Key Changes

  • Introduced a shared client cache using MokaCache to store weak references to connection objects per broker
  • Extracted connection management logic into a dedicated MqttConnectionCheck struct
  • Modified the enumerator to reuse existing connections when available or create new ones as needed

Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for your contribution.

Comment on lines 43 to 44
connected: Arc<AtomicBool>,
stopped: Arc<AtomicBool>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to remove state for stopped? seems connected is enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stopped is used to stop the loop running in the background. (see line 65)
When MqttConnectionCheck is dropped(see line 91), this is set to false, which stops the loop.
Happy to hear any other way in which this can be achieved.

let start = std::time::Instant::now();
loop {
if self.connected.load(std::sync::atomic::Ordering::Relaxed) {
if self.connection_check.is_connected() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we add some tokio sleep inside the loop?

Copy link
Contributor Author

@florissmit10 florissmit10 Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a 500 ms sleep on line 154 that gets executed when the client is not connected.
When the client is connected the entire loop stops, and is not started again until the next time list_splits is called.

@tabVersion tabVersion added this pull request to the merge queue Sep 12, 2025
Merged via the queue into risingwavelabs:main with commit f57d74c Sep 12, 2025
30 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants