Skip to content

Commit 93865aa

Browse files
authored
fix(gensupport): update chunk upload logic for robust timeout handling. (#3208)
The ChunkTransferTimeout timer was previously started before reading the media chunk via Media.Chunk(). This caused the timeout to incorrectly include the media read time, leading to premature timeouts for uploads with slow media sources (e.g., slow disk I/O or slow writes from the application). This change refactors the upload logic so that the ChunkRetryDeadline timer is started only after the chunk has been read from Media.Chunk(), and so that ChunkTransferTimeout applies only to the network request portion of the chunk transfer. The context.WithTimeout is now created within the transferChunk function, immediately before the doUploadRequest call, ensuring its scope is limited to the network round trip. Additionally, this commit adds unit tests to validate the fix and prevent regressions: (1) a slow media read, which should now succeed; (2) a slow network response, which correctly continues to fail with a timeout; and (3) retries, which work as expected within ChunkRetryDeadline. Note: Testing of storage (unit tests, emulator tests, integration tests) is done and the results are attached in the bug. Internal Bug: 427490995
1 parent ceceb79 commit 93865aa

File tree

2 files changed

+407
-157
lines changed

2 files changed

+407
-157
lines changed

internal/gensupport/resumable.go

Lines changed: 121 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,21 @@ func (rx *ResumableUpload) reportProgress(old, updated int64) {
126126
}
127127
}
128128

129-
// transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
130-
func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
131-
chunk, off, size, err := rx.Media.Chunk()
132-
133-
done := err == io.EOF
134-
if !done && err != nil {
135-
return nil, err
129+
// transferChunk performs a single HTTP request to upload a single chunk.
130+
func (rx *ResumableUpload) transferChunk(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
131+
// rCtx is derived from a context with a defined ChunkTransferTimeout with non-zero value.
132+
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
133+
// triggering a retry of the request.
134+
var rCtx context.Context
135+
var cancel context.CancelFunc
136+
137+
rCtx = ctx
138+
if rx.ChunkTransferTimeout != 0 {
139+
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
140+
defer cancel()
136141
}
137142

138-
res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
143+
res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
139144
if err != nil {
140145
return res, err
141146
}
@@ -149,161 +154,146 @@ func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, e
149154
if res.StatusCode == http.StatusOK {
150155
rx.reportProgress(off, off+int64(size))
151156
}
157+
return res, nil
158+
}
152159

153-
if statusResumeIncomplete(res) {
154-
rx.Media.Next()
160+
// uploadChunkWithRetries attempts to upload a single chunk, with retries
161+
// within ChunkRetryDeadline if ChunkTransferTimeout is non-zero.
162+
func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
163+
// Configure error retryable criteria.
164+
shouldRetry := rx.Retry.errorFunc()
165+
166+
// Configure single chunk retry deadline.
167+
retryDeadline := defaultRetryDeadline
168+
if rx.ChunkRetryDeadline != 0 {
169+
retryDeadline = rx.ChunkRetryDeadline
170+
}
171+
172+
// Each chunk gets its own initialized-at-zero backoff and invocation ID.
173+
bo := rx.Retry.backoff()
174+
quitAfterTimer := time.NewTimer(retryDeadline)
175+
defer quitAfterTimer.Stop()
176+
rx.attempts = 1
177+
rx.invocationID = uuid.New().String()
178+
179+
var pause time.Duration
180+
var resp *http.Response
181+
var err error
182+
183+
// Retry loop for a single chunk.
184+
for {
185+
// Wait for the backoff period, unless the context is canceled or the
186+
// retry deadline is hit.
187+
pauseTimer := time.NewTimer(pause)
188+
select {
189+
case <-ctx.Done():
190+
pauseTimer.Stop()
191+
if err == nil {
192+
err = ctx.Err()
193+
}
194+
return resp, err
195+
case <-pauseTimer.C:
196+
case <-quitAfterTimer.C:
197+
pauseTimer.Stop()
198+
return resp, err
199+
}
200+
pauseTimer.Stop()
201+
202+
// Check for context cancellation or timeout once more. If more than one
203+
// case in the select statement above was satisfied at the same time, Go
204+
// will choose one arbitrarily.
205+
// That can cause an operation to go through even if the context was
206+
// canceled before or the timeout was reached.
207+
select {
208+
case <-ctx.Done():
209+
if err == nil {
210+
err = ctx.Err()
211+
}
212+
return resp, err
213+
case <-quitAfterTimer.C:
214+
return resp, err
215+
default:
216+
}
217+
218+
// We close the response's body here, since we definitely will not
219+
// return `resp` now. If we close it before the select case above, a
220+
// timer may fire and cause us to return a response with a closed body
221+
// (in which case, the caller will not get the error message in the body).
222+
if resp != nil && resp.Body != nil {
223+
// Read the body to EOF - if the Body is not both read to EOF and closed,
224+
// the Client's underlying RoundTripper may not be able to re-use the
225+
// persistent TCP connection to the server for a subsequent "keep-alive" request.
226+
// See https://pkg.go.dev/net/http#Client.Do
227+
io.Copy(io.Discard, resp.Body)
228+
resp.Body.Close()
229+
}
230+
231+
resp, err = rx.transferChunk(ctx, chunk, off, size, done)
232+
status := 0
233+
if resp != nil {
234+
status = resp.StatusCode
235+
}
236+
// Chunk upload should be retried if the ChunkTransferTimeout is non-zero and err is context deadline exceeded
237+
// or we encounter a retryable error.
238+
if (rx.ChunkTransferTimeout != 0 && errors.Is(err, context.DeadlineExceeded)) || shouldRetry(status, err) {
239+
rx.attempts++
240+
pause = bo.Pause()
241+
chunk, _, _, _ = rx.Media.Chunk()
242+
continue
243+
}
244+
return resp, err
155245
}
156-
return res, nil
157246
}
158247

159248
// Upload starts the process of a resumable upload with a cancellable context.
160-
// It retries using the provided back off strategy until cancelled or the
161-
// strategy indicates to stop retrying.
162249
// It is called from the auto-generated API code and is not visible to the user.
163250
// Before sending an HTTP request, Upload calls any registered hook functions,
164251
// and calls the returned functions after the request returns (see send.go).
165252
// rx is private to the auto-generated API code.
166253
// Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
167254
// Upload does not parse the response into the error on a non 200 response;
168255
// it is the caller's responsibility to call resp.Body.Close.
169-
func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
256+
func (rx *ResumableUpload) Upload(ctx context.Context) (*http.Response, error) {
257+
for {
258+
chunk, off, size, err := rx.Media.Chunk()
259+
done := err == io.EOF
260+
if !done && err != nil {
261+
return nil, err
262+
}
170263

171-
// There are a couple of cases where it's possible for err and resp to both
172-
// be non-nil. However, we expose a simpler contract to our callers: exactly
173-
// one of resp and err will be non-nil. This means that any response body
174-
// must be closed here before returning a non-nil error.
175-
var prepareReturn = func(resp *http.Response, err error) (*http.Response, error) {
264+
resp, err := rx.uploadChunkWithRetries(ctx, chunk, off, int64(size), done)
265+
// There are a couple of cases where it's possible for err and resp to both
266+
// be non-nil. However, we expose a simpler contract to our callers: exactly
267+
// one of resp and err will be non-nil. This means that any response body
268+
// must be closed here before returning a non-nil error.
176269
if err != nil {
177270
if resp != nil && resp.Body != nil {
178271
resp.Body.Close()
179272
}
180273
// If there were retries, indicate this in the error message and wrap the final error.
181274
if rx.attempts > 1 {
182-
return nil, fmt.Errorf("chunk upload failed after %d attempts;, final error: %w", rx.attempts, err)
275+
return nil, fmt.Errorf("chunk upload failed after %d attempts, final error: %w", rx.attempts, err)
183276
}
184277
return nil, err
185278
}
279+
186280
// This case is very unlikely but possible only if rx.ChunkRetryDeadline is
187281
// set to a very small value, in which case no requests will be sent before
188282
// the deadline. Return an error to avoid causing a panic.
189283
if resp == nil {
190-
return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDealine", rx.URI)
284+
return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDeadline", rx.URI)
191285
}
192-
return resp, nil
193-
}
194-
// Configure retryable error criteria.
195-
errorFunc := rx.Retry.errorFunc()
196-
197-
// Configure per-chunk retry deadline.
198-
var retryDeadline time.Duration
199-
if rx.ChunkRetryDeadline != 0 {
200-
retryDeadline = rx.ChunkRetryDeadline
201-
} else {
202-
retryDeadline = defaultRetryDeadline
203-
}
204-
205-
// Send all chunks.
206-
for {
207-
var pause time.Duration
208-
209-
// Each chunk gets its own initialized-at-zero backoff and invocation ID.
210-
bo := rx.Retry.backoff()
211-
quitAfterTimer := time.NewTimer(retryDeadline)
212-
rx.attempts = 1
213-
rx.invocationID = uuid.New().String()
214-
215-
// Retry loop for a single chunk.
216-
for {
217-
pauseTimer := time.NewTimer(pause)
218-
select {
219-
case <-ctx.Done():
220-
quitAfterTimer.Stop()
221-
pauseTimer.Stop()
222-
if err == nil {
223-
err = ctx.Err()
224-
}
225-
return prepareReturn(resp, err)
226-
case <-pauseTimer.C:
227-
case <-quitAfterTimer.C:
228-
pauseTimer.Stop()
229-
return prepareReturn(resp, err)
230-
}
231-
pauseTimer.Stop()
232-
233-
// Check for context cancellation or timeout once more. If more than one
234-
// case in the select statement above was satisfied at the same time, Go
235-
// will choose one arbitrarily.
236-
// That can cause an operation to go through even if the context was
237-
// canceled before or the timeout was reached.
238-
select {
239-
case <-ctx.Done():
240-
quitAfterTimer.Stop()
241-
if err == nil {
242-
err = ctx.Err()
243-
}
244-
return prepareReturn(resp, err)
245-
case <-quitAfterTimer.C:
246-
return prepareReturn(resp, err)
247-
default:
248-
}
249286

250-
// rCtx is derived from a context with a defined transferTimeout with non-zero value.
251-
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
252-
// triggering a retry of the request.
253-
var rCtx context.Context
254-
var cancel context.CancelFunc
255-
256-
rCtx = ctx
257-
if rx.ChunkTransferTimeout != 0 {
258-
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
259-
}
260-
261-
// We close the response's body here, since we definitely will not
262-
// return `resp` now. If we close it before the select case above, a
263-
// timer may fire and cause us to return a response with a closed body
264-
// (in which case, the caller will not get the error message in the body).
265-
if resp != nil && resp.Body != nil {
266-
// Read the body to EOF - if the Body is not both read to EOF and closed,
267-
// the Client's underlying RoundTripper may not be able to re-use the
268-
// persistent TCP connection to the server for a subsequent "keep-alive" request.
269-
// See https://pkg.go.dev/net/http#Client.Do
287+
if statusResumeIncomplete(resp) {
288+
// The upload is not yet complete, but the server has acknowledged this chunk.
289+
// We don't have anything to do with the response body.
290+
if resp.Body != nil {
270291
io.Copy(io.Discard, resp.Body)
271292
resp.Body.Close()
272293
}
273-
resp, err = rx.transferChunk(rCtx)
274-
275-
var status int
276-
if resp != nil {
277-
status = resp.StatusCode
278-
}
279-
280-
// The upload should be retried if the rCtx is canceled due to a timeout.
281-
select {
282-
case <-rCtx.Done():
283-
if rx.ChunkTransferTimeout != 0 && errors.Is(rCtx.Err(), context.DeadlineExceeded) {
284-
// Cancel the context for rCtx
285-
cancel()
286-
continue
287-
}
288-
default:
289-
}
290-
291-
// Check if we should retry the request.
292-
if !errorFunc(status, err) {
293-
quitAfterTimer.Stop()
294-
break
295-
}
296-
297-
rx.attempts++
298-
pause = bo.Pause()
299-
}
300-
301-
// If the chunk was uploaded successfully, but there's still
302-
// more to go, upload the next chunk without any delay.
303-
if statusResumeIncomplete(resp) {
294+
rx.Media.Next()
304295
continue
305296
}
306-
307-
return prepareReturn(resp, err)
297+
return resp, nil
308298
}
309299
}

0 commit comments

Comments
 (0)