Skip to content

Conversation

Khongchai
Copy link

Hi, folks! Thanks for the great lib. I had this use case for cancelling multiple subscribing streams -- here's my implementation:

When cancelling, we publish a cancel event that calls reader.cancel. This will cause

reader.read().then(async ({ done, value }) => {

done to be true here and follows the normal done flow.

I also released the reader lock because I needed to assert the state of the original stream in the new test case. This is probably useful for library user in the future too if they wanna do something with it after.

@tjazsilovsek
Copy link

I think this is missing the ability to cancel the original stream producer. Correct me if I am wrong please, but I think this is the most crucial part of cancellation.

@Khongchai
Copy link
Author

I think this is missing the ability to cancel the original stream producer. Correct me if I am wrong please, but I think this is the most crucial part of cancellation.

Hi! If you mean the original stream that publishes to everyone else, it is cancelled here (it then causes the cancellation of everything else)

image

@tjazsilovsek
Copy link

I think this is missing the ability to cancel the original stream producer. Correct me if I am wrong please, but I think this is the most crucial part of cancellation.

Hi! If you mean the original stream that publishes to everyone else, it is cancelled here (it then causes the cancellation of everything else)

image

Lets say I have a long running agent, that does a bunch of llm calls and is streaming a bunch of things to the client. On cancellation I would like to abort the agent, so no additional work is being done. I don't think that is possible with current implementation?

@Khongchai
Copy link
Author

@tjazsilovsek

We're actually using this in production right now and it's working fine.

Untitled.mov

@tjazsilovsek
Copy link

@Khongchai yes, I know the regular flow works fine. But what happens if you cancel. Will the original stream producer terminate? How do you signal termination?

@Khongchai
Copy link
Author

@Khongchai yes, I know the regular flow works fine. But what happens if you cancel. Will the original stream producer terminate? How do you signal termination?

Yes. In the video, I cancelled the orignal stream producer (i clicked on the stop button bottom right) and both stream stops.

I created a new endpoint that when called with the streamId (in our case the id of the chat), invokes my method, which then invokes the originating ReadableStream's cancel method

@tjazsilovsek
Copy link

tjazsilovsek commented Sep 1, 2025

@Khongchai yes, I know the regular flow works fine. But what happens if you cancel. Will the original stream producer terminate? How do you signal termination?

Yes. In the video, I cancelled the orignal stream producer (i clicked on the stop button bottom right) and both stream stops.

I created a new endpoint that when called with the streamId (in our case the id of the chat), invokes my method, which then invokes the originating ReadableStream's cancel method

I think you are not understading me. Your code DOES stop the stream. So the client will not be getting any new data. But the underlying producer is NOT stopped, it is still producing new data, it just cannot be enqued due to closed reader. So your approach works on surface, but I don't think is the correct one.

There is also a race condition, if the stream is cancelled before you start listening for the signal.

@Khongchai
Copy link
Author

I think I misunderstood what you meant, I thought by "original stream producer", you meant the stream returned from makeStream. So let me just show you roughly how our makeStream is implemented

 let originalStream = await obtainSomehow();
const makeStream = () => {
      return new ReadableStream({
        start(controller) {
             // ... publish from originalStream to controller, etc
       }, 

        cancel() {
             originalStream.cancel()   // this?
        },
      });
    }
    const myStream = await resumableStream.createNewResumableStream(
      streamId,
      makeStream,
    );

originalStream.cancel() is this part what you were asking about? I didn't think of this because, this PR is only about cancelling the result of makeStream, not the one producing stuff to it.

Essentially people can just react to cancel and do whatever they want -- in our case, cancelling the producing stream.

There is also a race condition, if the stream is cancelled before you start listening for the signal.

I won't handle this because like the original author, it is unlikely to happen (at least for us).

Hope this clears things up for you.

Cheers

@tjazsilovsek
Copy link

I think I misunderstood what you meant, I thought by "original stream producer", you meant the stream returned from makeStream. So let me just show you roughly how our makeStream is implemented

 let originalStream = await obtainSomehow();
const makeStream = () => {
      return new ReadableStream({
        start(controller) {
             // ... publish from originalStream to controller, etc
       }, 

        cancel() {
             originalStream.cancel()   // this?
        },
      });
    }
    const myStream = await resumableStream.createNewResumableStream(
      streamId,
      makeStream,
    );

originalStream.cancel() is this part what you were asking about? I didn't think of this because, this PR is only about cancelling the result of makeStream, not the one producing stuff to it.

Essentially people can just react to cancel and do whatever they want -- in our case, cancelling the producing stream.

There is also a race condition, if the stream is cancelled before you start listening for the signal.

I won't handle this because like the original author, it is unlikely to happen (at least for us).

Hope this clears things up for you.

Cheers

This makes sense, thanks!

I think it would be great to write some minimal documentation in regards to the discussion we had since many people will be wondering the same thing.

@Khongchai
Copy link
Author

@tjazsilovsek yeah you're right, most people will probably want to react to the cancellation. I added a short section for the stream cancellation. Do you think that could have answered your question?

@tjazsilovsek
Copy link

@tjazsilovsek yeah you're right, most people will probably want to react to the cancellation. I added a short section for the stream cancellation. Do you think that could have answered your question?

Great, I would just add a sentence or two about the mechanism. How this stream cancel method only closes the reader, so users should also consider aborting the original stream.

Maybe it would make sense, to expose onCancelled callback directly on createNewResumableStream?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants