@Arkatufus Arkatufus commented May 15, 2025

Fixes #7627

DistributedPubSub wait-for-subscriber feature flag

HOCON settings added

  • akka.cluster.pub-sub.buffered-messages.max-per-topic = 1000
  • akka.cluster.pub-sub.buffered-messages.timeout-check-interval = 1s
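For reference, the same two keys written in HOCON's nested form, as they might appear in an application config. This is only a sketch: the values are the ones listed above, and treating them as the defaults is an assumption.

```hocon
akka.cluster.pub-sub {
  buffered-messages {
    # Upper bound on buffered messages per topic (see "Buffer overflow")
    max-per-topic = 1000
    # How often the mediator scans the buffer for timed-out messages
    timeout-check-interval = 1s
  }
}
```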

Settings added to DistributedPubSubSettings

  • MaxBufferedMessagePerTopic
  • BufferedMessageTimeoutCheckInterval

Buffer

  • If the mediator receives a PublishWithAck message and is able to deliver it immediately, it sends a PublishSucceeded message back to the original sender as a signal.
  • If the mediator cannot deliver a PublishWithAck message, it stores the message as a BufferedMessage in the buffer.
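
The two branches above can be sketched with a small dependency-free model. This is not the mediator's actual code: `AckingMediatorSketch`, `PublishOutcome`, `Subscribe` and `HandlePublishWithAck` are hypothetical names, and "able to deliver" is assumed to mean "the topic has at least one subscriber".

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model, not the Akka.NET mediator. Only the names PublishWithAck,
// PublishSucceeded and BufferedMessage come from the PR description.
public enum PublishOutcome { PublishSucceeded, Buffered }

public sealed class BufferedMessage
{
    public BufferedMessage(string topic, object payload, TimeSpan timeout)
    {
        Topic = topic; Payload = payload; Timeout = timeout;
    }
    public string Topic { get; }
    public object Payload { get; }
    public TimeSpan Timeout { get; }
}

public sealed class AckingMediatorSketch
{
    private readonly HashSet<string> _topicsWithSubscribers = new HashSet<string>();
    private readonly List<BufferedMessage> _buffer = new List<BufferedMessage>();

    public int BufferedCount { get { return _buffer.Count; } }

    public void Subscribe(string topic) { _topicsWithSubscribers.Add(topic); }

    // Deliverable now: acknowledge immediately. Otherwise: hold the message.
    public PublishOutcome HandlePublishWithAck(string topic, object payload, TimeSpan timeout)
    {
        if (_topicsWithSubscribers.Contains(topic))
            return PublishOutcome.PublishSucceeded;

        _buffer.Add(new BufferedMessage(topic, payload, timeout));
        return PublishOutcome.Buffered;
    }
}
```

Calling `HandlePublishWithAck` before any `Subscribe` call returns `Buffered` and grows the buffer; calling it after `Subscribe` for the same topic returns `PublishSucceeded`.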

Buffer timeout

  • The PublishWithAck message contains the timeout value for each message.
  • Every timeout-check-interval, the mediator scans all buffered messages and checks whether each one has been sitting in the buffer for longer than its timeout period.
  • Timed-out buffered messages are removed from the buffer and either silently discarded or sent to dead letters.
  • A PublishFailed message is also sent to the original sender to signal the publish failure.
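
A minimal sketch of the periodic scan, assuming each buffered entry records when it was buffered and carries its own timeout. `TimedEntry` and `TimeoutScanSketch.PruneExpired` are hypothetical names, and the scheduling of the scan itself is left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical buffered entry: the PR only states that each message carries a timeout.
public sealed class TimedEntry
{
    public TimedEntry(string topic, DateTime bufferedAt, TimeSpan timeout)
    {
        Topic = topic; BufferedAt = bufferedAt; Timeout = timeout;
    }
    public string Topic { get; }
    public DateTime BufferedAt { get; }
    public TimeSpan Timeout { get; }
}

public static class TimeoutScanSketch
{
    // Removes every entry that has waited longer than its own timeout and returns
    // the removed entries, so the caller can send one PublishFailed per entry.
    public static List<TimedEntry> PruneExpired(List<TimedEntry> buffer, DateTime now)
    {
        var expired = buffer.FindAll(e => now - e.BufferedAt > e.Timeout);
        buffer.RemoveAll(e => now - e.BufferedAt > e.Timeout);
        return expired;
    }
}
```

Because each entry has its own timeout, the check interval only bounds how late a failure is reported. With a 1s interval, a message may stay buffered for up to roughly one second past its timeout.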

Buffer delivery

  • We're now tracking every new key insertion into the bucket registry.
  • When keys are inserted, the mediator sends a NewBucketKeysAdded message to itself.
  • When a NewBucketKeysAdded message is received, the mediator checks the buffer and re-sends all waiting messages that match the keys to itself.
  • For each delivered message, the mediator will send a PublishSucceeded message to the original sender as a signal.
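
The matching step can be sketched as follows, assuming a buffered message is matched to a registry key by simple string equality. `WaitingMessage` and `BufferDeliverySketch.TakeDeliverable` are hypothetical names; the real mediator re-sends the messages to itself rather than returning them.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical waiting message, identified by the registry key it is waiting for.
public sealed class WaitingMessage
{
    public WaitingMessage(string key, string payload) { Key = key; Payload = payload; }
    public string Key { get; }
    public string Payload { get; }
}

public static class BufferDeliverySketch
{
    // Models the reaction to NewBucketKeysAdded: every waiting message whose key
    // is among the new keys is taken out of the buffer and returned for re-sending.
    public static List<WaitingMessage> TakeDeliverable(List<WaitingMessage> buffer, ISet<string> newKeys)
    {
        var ready = buffer.FindAll(m => newKeys.Contains(m.Key));
        buffer.RemoveAll(m => newKeys.Contains(m.Key));
        return ready;
    }
}
```

Messages whose keys are still missing stay in the buffer and remain subject to the timeout scan.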

Buffer overflow

When a new message is inserted and the topic's buffer count exceeds buffered-messages.max-per-topic, the mediator discards the oldest message in that topic's buffer and sends a PublishFailed to the original sender of the discarded message.
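
A minimal sketch of the drop-oldest policy, assuming one FIFO queue per topic. `TopicBufferSketch` and its members are hypothetical names, and messages are plain strings to keep the example short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-topic buffer with a fixed capacity and drop-oldest eviction.
public sealed class TopicBufferSketch
{
    private readonly int _maxPerTopic;
    private readonly Dictionary<string, Queue<string>> _byTopic =
        new Dictionary<string, Queue<string>>();

    public TopicBufferSketch(int maxPerTopic) { _maxPerTopic = maxPerTopic; }

    public int Count(string topic)
    {
        Queue<string> queue;
        return _byTopic.TryGetValue(topic, out queue) ? queue.Count : 0;
    }

    // Enqueues the message. If the topic then holds more than the limit, the oldest
    // entry is dropped and returned so the caller can reply PublishFailed to its
    // sender. Returns null when nothing was evicted.
    public string Add(string topic, string message)
    {
        Queue<string> queue;
        if (!_byTopic.TryGetValue(topic, out queue))
        {
            queue = new Queue<string>();
            _byTopic[topic] = queue;
        }
        queue.Enqueue(message);
        return queue.Count > _maxPerTopic ? queue.Dequeue() : null;
    }
}
```

Dropping the oldest entry favors the most recent publishes; the sender of the evicted message still receives a failure signal, so no message disappears silently.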

@Arkatufus Arkatufus left a comment

Self-review

.ToImmutableDictionary(kv => kv.Key, kv => kv.Value);
}
}
=> _registry
Contributor Author

No logic change, just modernization

/// <param name="maxBufferedMessagePerTopic">Maximum message buffer size for each topic</param>
/// <param name="bufferedMessageTimeoutCheckInterval">Buffered message timeout condition check interval</param>
/// <exception cref="ArgumentException">Thrown if a user tries to use a <see cref="ConsistentHashingRoutingLogic"/> with routingLogic.</exception>
public DistributedPubSubSettings(
Contributor Author

New backward compatible .ctor

@Aaronontheweb Aaronontheweb left a comment

Made some suggestions

/// <param name="maxDeltaElements">The maximum number of delta elements that can be propagated in a single gossip tick.</param>
/// <param name="sendToDeadLettersWhenNoSubscribers">When a message is published to a topic with no subscribers send it to the dead letters.</param>
/// <exception cref="ArgumentException">Thrown if a user tries to use a <see cref="ConsistentHashingRoutingLogic"/> with routingLogic.</exception>
[Obsolete("Use .ctor that supports WaitForSubscribers instead. Since 1.4.42")]
Member

A thing we did on #7637 that we should do here:

  1. Don't even make the CTORs on these settings classes public. Force users to use the static SettingsClass.Create method. Eliminates a ton of binary compatibility issues long-term.
  2. Make all of these settings classes into record types - this eliminates the .WithValueName( method spam.

Maybe we do that across the board in v1.6 - I'm doing it in v1.5 for the TcpSettings type on #7637. I'm fine doing that for DistributedPubSub too.

Member

Implication of this design change is that users have to call DistributedPubSubSettings.Create(ActorSystem) and then they can manipulate it from there.

Member

Changing this on v1.5 will probably blow a bunch of stuff up immediately, so we should probably save that for v1.6

@Aaronontheweb Aaronontheweb left a comment

Left you a suggestion on the pruning check

@Aaronontheweb Aaronontheweb left a comment

More comments

@Aaronontheweb Aaronontheweb left a comment

Left some more suggestions but I think overall the design looks good - what's the testing plan?

{
if (message is PublishWithAck needAck)
{
sender.Tell(new PublishFailed(needAck, PublishFailReason.Timeout));
Member

Do we want to do an early return here and skip logging the dead letter or do we want to do both? The way we have it does both and I actually think that's ok - since the DeadLetter monitoring will usually get universal attention.

Contributor Author

I don't think it's wise to change old behavior

}

- private void PublishToEachGroup(string path, Publish publish)
+ private void PublishToEachGroup(string path, IWrappedMessage publish)
Member

IMHO, do a common interface for both of the Publish message types and handle that instead.

Contributor Author

This can't be done because PublishMessage is also used to send SendAll

@Aaronontheweb Aaronontheweb merged commit 3bc4457 into akkadotnet:dev May 19, 2025
11 checks passed
@Arkatufus Arkatufus added this to the 1.5.42 milestone May 21, 2025
Successfully merging this pull request may close these issues.

DistributedPubSub: add a wait-for-subscribers value to Publish commands to delay publishing / discarding until subscribers are available