@@ -20,8 +20,16 @@ namespace Akka.Cluster.Sharding.Delivery.Internal;
20
20
/// INTERNAL API
21
21
/// </summary>
22
22
/// <typeparam name="T">The types of messages handled by the ConsumerController</typeparam>
23
- internal class ShardingConsumerController < T > : ReceiveActor , IWithStash
23
+ internal class ShardingConsumerController < T > : ReceiveActor , IWithStash , IWithTimers
24
24
{
25
+ private const string ShutdownTimeoutTimerKey = nameof ( ShutdownTimeoutTimerKey ) ;
26
+
27
+ private sealed class ShutdownTimeout
28
+ {
29
+ public static readonly ShutdownTimeout Instance = new ( ) ;
30
+ private ShutdownTimeout ( ) { }
31
+ }
32
+
25
33
public ShardingConsumerController ( Func < IActorRef , Props > consumerProps ,
26
34
ShardingConsumerController . Settings settings )
27
35
{
@@ -115,7 +123,17 @@ private void Active()
115
123
Receive < Terminated > ( t => t . ActorRef . Equals ( _consumer ) , _ =>
116
124
{
117
125
_log . Debug ( "Consumer terminated." ) ;
118
- Context . Stop ( Self ) ;
126
+
127
+ // Short-circuit shutdown process, just shut down immediately if there's nothing to clean.
128
+ if ( ProducerControllers . Count == 0 && ConsumerControllers . Count == 0 )
129
+ {
130
+ _log . Debug ( "ShardingConsumerController terminated." ) ;
131
+ Context . Stop ( Self ) ;
132
+ }
133
+ else
134
+ {
135
+ Become ( ShuttingDown ( ) ) ;
136
+ }
119
137
} ) ;
120
138
121
139
Receive < Terminated > ( t =>
@@ -166,6 +184,62 @@ private void Active()
166
184
} ) ;
167
185
}
168
186
187
+ // Shutdown state after `_consumer` actor is downed.
188
+ private Action ShuttingDown ( )
189
+ {
190
+ // start a 3-seconds shutdown timeout timer
191
+ Timers . StartSingleTimer ( ShutdownTimeoutTimerKey , ShutdownTimeout . Instance , TimeSpan . FromSeconds ( 3 ) , Self ) ;
192
+
193
+ _log . Debug ( "Shutting down child controllers" ) ;
194
+
195
+ foreach ( var p in ProducerControllers . Keys )
196
+ Context . Unwatch ( p ) ;
197
+ ProducerControllers = ImmutableDictionary < IActorRef , string > . Empty ;
198
+
199
+ foreach ( var c in ConsumerControllers . Values . Distinct ( ) )
200
+ Context . Stop ( c ) ;
201
+
202
+ return ( ) =>
203
+ {
204
+ Receive < ConsumerController . SequencedMessage < T > > ( seqMsg =>
205
+ {
206
+ var messageType = seqMsg . Message . Chunk . HasValue
207
+ ? $ "Manifest: { seqMsg . Message . Chunk . Value . Manifest } , SerializerId: { seqMsg . Message . Chunk . Value . SerializerId } "
208
+ : seqMsg . Message . Message ? . GetType ( ) . FullName ?? "Unknown type" ;
209
+ _log . Warning ( "Message [{0}] from [{1}] is being ignored because ShardingConsumerController is shutting down." , messageType , seqMsg . ProducerId ) ;
210
+ } ) ;
211
+
212
+ Receive < ShutdownTimeout > ( _ =>
213
+ {
214
+ // We somehow could not terminate cleanly within 3 seconds, shutdown immediately
215
+ _log . Warning ( "ShardingConsumerController cleanup timed out, force terminating." ) ;
216
+ Context . Stop ( Self ) ;
217
+ } ) ;
218
+
219
+ Receive < Terminated > ( t =>
220
+ {
221
+ var removeList = ConsumerControllers
222
+ . Where ( kv => kv . Value . Equals ( t . ActorRef ) )
223
+ . Select ( kv => kv . Key )
224
+ . ToArray ( ) ;
225
+
226
+ if ( removeList . Length > 0 )
227
+ {
228
+ foreach ( var key in removeList )
229
+ _log . Debug ( "ConsumerController for producerId [{0}] terminated." , key ) ;
230
+
231
+ ConsumerControllers = ConsumerControllers . RemoveRange ( removeList ) ;
232
+ }
233
+
234
+ if ( ProducerControllers . Count > 0 || ConsumerControllers . Count > 0 )
235
+ return ;
236
+
237
+ _log . Debug ( "ShardingConsumerController terminated." ) ;
238
+ Context . Stop ( Self ) ;
239
+ } ) ;
240
+ } ;
241
+ }
242
+
169
243
private ImmutableDictionary < IActorRef , string > UpdatedProducerControllers ( IActorRef producerController ,
170
244
string producer )
171
245
{
@@ -183,4 +257,5 @@ protected override void PreStart()
183
257
}
184
258
185
259
public IStash Stash { get ; set ; } = null ! ;
260
+ public ITimerScheduler Timers { get ; set ; } = null ! ;
186
261
}
0 commit comments