Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 48 additions & 16 deletions src/Aspire.Hosting/Dcp/DcpExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ async Task CreateResourceExecutablesAsyncCore(IResource resource, IEnumerable<Ap
// available if the resource isn't immediately started because it's waiting or is configured for explicit start.
foreach (var er in executables)
{
await _executorEvents.PublishAsync(new OnResourceChangedContext(_shutdownCancellation.Token, resourceType, resource, er.DcpResourceName, new ResourceStatus(null, null, null), s => _snapshotBuilder.ToSnapshot((Executable) er.DcpResource, s))).ConfigureAwait(false);
await _executorEvents.PublishAsync(new OnResourceChangedContext(_shutdownCancellation.Token, resourceType, resource, er.DcpResourceName, new ResourceStatus(null, null, null), s => _snapshotBuilder.ToSnapshot((Executable)er.DcpResource, s))).ConfigureAwait(false);
}

await _executorEvents.PublishAsync(new OnResourceStartingContext(cancellationToken, resourceType, resource, DcpResourceName: null)).ConfigureAwait(false);
Expand Down Expand Up @@ -1176,7 +1176,7 @@ async Task CreateContainerAsyncCore(AppResource cr, CancellationToken cancellati
{
// Publish snapshot built from DCP resource. Do this now to populate more values from DCP (URLs, source) to ensure they're
// available if the resource isn't immediately started because it's waiting or is configured for explicit start.
await _executorEvents.PublishAsync(new OnResourceChangedContext(_shutdownCancellation.Token, KnownResourceTypes.Container, cr.ModelResource, cr.DcpResourceName, new ResourceStatus(null, null, null), s => _snapshotBuilder.ToSnapshot((Container) cr.DcpResource, s))).ConfigureAwait(false);
await _executorEvents.PublishAsync(new OnResourceChangedContext(_shutdownCancellation.Token, KnownResourceTypes.Container, cr.ModelResource, cr.DcpResourceName, new ResourceStatus(null, null, null), s => _snapshotBuilder.ToSnapshot((Container)cr.DcpResource, s))).ConfigureAwait(false);

if (cr.ModelResource.TryGetLastAnnotation<ExplicitStartupAnnotation>(out _))
{
Expand Down Expand Up @@ -1430,21 +1430,53 @@ private static V1Patch CreatePatch<T>(T obj, Action<T> change) where T : CustomR

public async Task StopResourceAsync(IResourceReference resourceReference, CancellationToken cancellationToken)
{
var appResource = (AppResource)resourceReference;
_logger.LogDebug("Stopping resource '{ResourceName}'...", resourceReference.DcpResourceName);

V1Patch patch;
switch (appResource.DcpResource)
var result = await DeleteResourceRetryPipeline.ExecuteAsync(async (resourceName, attemptCancellationToken) =>
{
case Container c:
patch = CreatePatch(c, obj => obj.Spec.Stop = true);
await _kubernetesService.PatchAsync(c, patch, cancellationToken).ConfigureAwait(false);
break;
case Executable e:
patch = CreatePatch(e, obj => obj.Spec.Stop = true);
await _kubernetesService.PatchAsync(e, patch, cancellationToken).ConfigureAwait(false);
break;
default:
throw new InvalidOperationException($"Unexpected resource type: {appResource.DcpResource.GetType().FullName}");
var appResource = (AppResource)resourceReference;

V1Patch patch;
switch (appResource.DcpResource)
{
case Container c:
patch = CreatePatch(c, obj => obj.Spec.Stop = true);
await _kubernetesService.PatchAsync(c, patch, attemptCancellationToken).ConfigureAwait(false);
var cu = await _kubernetesService.GetAsync<Container>(c.Metadata.Name, cancellationToken: attemptCancellationToken).ConfigureAwait(false);
if (cu.Status?.State == ContainerState.Exited)
{
_logger.LogDebug("Container '{ResourceName}' was stopped.", resourceReference.DcpResourceName);
return true;
}
else
{
_logger.LogDebug("Container '{ResourceName}' is still running; trying again to stop it...", resourceReference.DcpResourceName);
return false;
}

case Executable e:
patch = CreatePatch(e, obj => obj.Spec.Stop = true);
await _kubernetesService.PatchAsync(e, patch, attemptCancellationToken).ConfigureAwait(false);
var eu = await _kubernetesService.GetAsync<Executable>(e.Metadata.Name, cancellationToken: attemptCancellationToken).ConfigureAwait(false);
if (eu.Status?.State == ExecutableState.Finished || eu.Status?.State == ExecutableState.Terminated)
{
_logger.LogDebug("Executable '{ResourceName}' was stopped.", resourceReference.DcpResourceName);
return true;
}
else
{
_logger.LogDebug("Executable '{ResourceName}' is still running; trying again to stop it...", resourceReference.DcpResourceName);
return false;
}

default:
throw new InvalidOperationException($"Unexpected resource type: {appResource.DcpResource.GetType().FullName}");
}
}, resourceReference.DcpResourceName, cancellationToken).ConfigureAwait(false);

if (!result)
{
throw new InvalidOperationException($"Failed to stop resource '{resourceReference.DcpResourceName}'.");
}
}

Expand Down Expand Up @@ -1509,7 +1541,7 @@ async Task EnsureResourceDeletedAsync<T>(string resourceName) where T : CustomRe
{
_logger.LogDebug("Ensuring '{ResourceName}' is deleted.", resourceName);

var result = await DeleteResourceRetryPipeline.ExecuteAsync<bool, string>(async (resourceName, attemptCancellationToken) =>
var result = await DeleteResourceRetryPipeline.ExecuteAsync(async (resourceName, attemptCancellationToken) =>
{
string? uid = null;

Expand Down
49 changes: 49 additions & 0 deletions tests/Aspire.Hosting.Tests/Dcp/TestKubernetesService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Text;
using k8s.Models;
using k8s.Autorest;
using Json.Patch;

namespace Aspire.Hosting.Tests.Dcp;

Expand Down Expand Up @@ -168,6 +169,54 @@ public Task<Stream> GetLogStreamAsync<T>(T obj, string logStreamType, bool? foll

public Task<T> PatchAsync<T>(T obj, V1Patch patch, CancellationToken cancellationToken = default) where T : CustomResource
{
// Not a complete implementation, but Aspire is using patching only to stop resources,
// so this is good enough.

if (patch.Type == V1Patch.PatchType.JsonPatch)
{
Json.Patch.JsonPatch jsonPatch = (Json.Patch.JsonPatch)patch.Content;

var res = CreatedResources.OfType<T>().FirstOrDefault(r =>
r.Metadata.Name == obj.Metadata.Name &&
string.Equals(r.Metadata.NamespaceProperty, obj.Metadata.NamespaceProperty)
);
if (res == null)
{
throw new ArgumentException($"Resource '{obj.Metadata.NamespaceProperty}/{obj.Metadata.Name}' not found");
}

var result = jsonPatch.Apply<T, T>(res);

if (res is Executable exe && result is Executable eu)
{
if (eu.Spec.Stop == true)
{
exe.Spec.Stop = true;
if (exe.Status is null)
{
exe.Status = new ExecutableStatus();
}
exe.Status.State = ExecutableState.Finished;
}
}

if (res is Container ctr && result is Container cu)
{
if (cu.Spec.Stop == true)
{
ctr.Spec.Stop = true;
if (ctr.Status is null)
{
ctr.Status = new ContainerStatus();
}
ctr.Status.State = ContainerState.Exited;
}
}

return Task.FromResult(res);
}

// Fall back to doing noting.
return Task.FromResult(obj);
}

Expand Down
Loading