Skip to content

Commit bbf8eed

Browse files
committed
Fix overall chunk retry mechanism and add unit tests.
1 parent 76ea0d8 commit bbf8eed

File tree

2 files changed

+322
-170
lines changed

2 files changed

+322
-170
lines changed

internal/gensupport/resumable.go

Lines changed: 123 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -126,51 +126,136 @@ func (rx *ResumableUpload) reportProgress(old, updated int64) {
126126
}
127127
}
128128

129-
// transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
130-
func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
131-
chunk, off, size, err := rx.Media.Chunk()
129+
// transferChunk performs the transfer of a single chunk of media from rx.Media.
130+
// It handles retries with backoff for failed attempts and respects several
131+
// timeout and cancellation mechanisms:
132+
// 1. It respects cancellation from the parent context `ctx`.
133+
// 2. It enforces a per-chunk deadline, `rx.ChunkRetryDeadline`, for all
134+
// retries of a chunk.
135+
// 3. It applies a per-attempt timeout, `rx.ChunkTransferTimeout`, to each HTTP
136+
// request to prevent stalls.
137+
//
138+
// Upon successful upload of a chunk, it reports the progress and advances the
139+
// media buffer to the next chunk.
140+
func (rx *ResumableUpload) transferChunk(ctx context.Context) (resp *http.Response, err error) {
141+
select {
142+
case <-ctx.Done():
143+
err = ctx.Err()
144+
return
145+
default:
146+
}
132147

148+
chunk, off, size, err := rx.Media.Chunk()
133149
done := err == io.EOF
134150
if !done && err != nil {
135151
return nil, err
136152
}
137153

138-
// rCtx is derived from a context with a defined transferTimeout with non-zero value.
139-
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
140-
// triggering a retry of the request.
141-
var rCtx context.Context
142-
var cancel context.CancelFunc
154+
// Configure retryable error criteria.
155+
errorFunc := rx.Retry.errorFunc()
143156

144-
rCtx = ctx
145-
if rx.ChunkTransferTimeout != 0 {
146-
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
147-
defer cancel()
148-
}
157+
// Each chunk gets its own initialized-at-zero backoff and invocation ID.
158+
bo := rx.Retry.backoff()
159+
var pause time.Duration
160+
rx.invocationID = uuid.New().String()
161+
rx.attempts = 1
149162

150-
res, err := rx.doUploadRequest(rCtx, chunk, off, int64(size), done)
151-
if err != nil {
152-
return res, err
163+
// Configure per-chunk retry deadline.
164+
var retryDeadline time.Duration
165+
if rx.ChunkRetryDeadline != 0 {
166+
retryDeadline = rx.ChunkRetryDeadline
167+
} else {
168+
retryDeadline = defaultRetryDeadline
153169
}
170+
quitAfterTimer := time.NewTimer(retryDeadline)
171+
defer quitAfterTimer.Stop()
154172

155-
// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
156-
// this file), so we don't expect to get a 308.
157-
if res.StatusCode == 308 {
158-
return nil, errors.New("unexpected 308 response status code")
159-
}
173+
for {
174+
pauseTimer := time.NewTimer(pause)
175+
select {
176+
case <-ctx.Done():
177+
pauseTimer.Stop()
178+
if err == nil {
179+
err = ctx.Err()
180+
}
181+
return
182+
case <-pauseTimer.C:
183+
case <-quitAfterTimer.C:
184+
pauseTimer.Stop()
185+
return
186+
}
187+
pauseTimer.Stop()
188+
189+
// Check for context cancellation or timeout once more after backoff time.
190+
// If more than one case in the select statement above was satisfied at the same time,
191+
// Go will choose one arbitrarily.
192+
// That can cause an operation to go through even if the context was
193+
// canceled before or the timeout was reached.
194+
select {
195+
case <-ctx.Done():
196+
if err == nil {
197+
err = ctx.Err()
198+
}
199+
return
200+
case <-quitAfterTimer.C:
201+
return
202+
default:
203+
}
160204

161-
if res.StatusCode == http.StatusOK {
162-
rx.reportProgress(off, off+int64(size))
163-
}
205+
// We close the response's body here, since we definitely will not
206+
// return `resp` now. If we close it before the select case above, a
207+
// timer may fire and cause us to return a response with a closed body
208+
// (in which case, the caller will not get the error message in the body).
209+
if resp != nil && resp.Body != nil {
210+
// Read the body to EOF - if the Body is not both read to EOF and closed,
211+
// the Client's underlying RoundTripper may not be able to re-use the
212+
// persistent TCP connection to the server for a subsequent "keep-alive" request.
213+
// See https://pkg.go.dev/net/http#Client.Do
214+
io.Copy(io.Discard, resp.Body)
215+
resp.Body.Close()
216+
}
164217

165-
if statusResumeIncomplete(res) {
166-
rx.Media.Next()
218+
// rCtx is derived from a context with a defined transferTimeout with non-zero value.
219+
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
220+
// triggering a retry of the request.
221+
var rCtx context.Context
222+
var cancel context.CancelFunc
223+
rCtx = ctx
224+
if rx.ChunkTransferTimeout != 0 {
225+
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
226+
}
227+
228+
resp, err = rx.doUploadRequest(rCtx, chunk, off, int64(size), done)
229+
// Cancel context right after the operation is done.
230+
if cancel != nil {
231+
cancel()
232+
}
233+
var status int
234+
if resp != nil {
235+
status = resp.StatusCode
236+
}
237+
// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
238+
// this file), so we don't expect to get a 308.
239+
if status == 308 {
240+
return nil, errors.New("unexpected 308 response status code")
241+
}
242+
if status == http.StatusOK {
243+
break
244+
}
245+
// Check if we should retry the request.
246+
if !errorFunc(status, err) {
247+
return
248+
}
249+
rx.attempts++
250+
pause = bo.Pause()
167251
}
168-
return res, nil
252+
253+
rx.reportProgress(off, off+int64(size))
254+
rx.Media.Next()
255+
return resp, nil
169256
}
170257

171258
// Upload starts the process of a resumable upload with a cancellable context.
172-
// It retries using the provided back off strategy until cancelled or the
173-
// strategy indicates to stop retrying.
174259
// It is called from the auto-generated API code and is not visible to the user.
175260
// Before sending an HTTP request, Upload calls any registered hook functions,
176261
// and calls the returned functions after the request returns (see send.go).
@@ -203,99 +288,25 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err
203288
}
204289
return resp, nil
205290
}
206-
// Configure retryable error criteria.
207-
errorFunc := rx.Retry.errorFunc()
208-
209-
// Configure per-chunk retry deadline.
210-
var retryDeadline time.Duration
211-
if rx.ChunkRetryDeadline != 0 {
212-
retryDeadline = rx.ChunkRetryDeadline
213-
} else {
214-
retryDeadline = defaultRetryDeadline
215-
}
216291

217292
// Send all chunks.
218293
for {
219-
var pause time.Duration
220-
221-
// Each chunk gets its own initialized-at-zero backoff and invocation ID.
222-
bo := rx.Retry.backoff()
223-
quitAfterTimer := time.NewTimer(retryDeadline)
224-
rx.attempts = 1
225-
rx.invocationID = uuid.New().String()
226-
227-
// Retry loop for a single chunk.
228-
for {
229-
pauseTimer := time.NewTimer(pause)
230-
select {
231-
case <-ctx.Done():
232-
quitAfterTimer.Stop()
233-
pauseTimer.Stop()
234-
if err == nil {
235-
err = ctx.Err()
236-
}
237-
return prepareReturn(resp, err)
238-
case <-pauseTimer.C:
239-
case <-quitAfterTimer.C:
240-
pauseTimer.Stop()
241-
return prepareReturn(resp, err)
242-
}
243-
pauseTimer.Stop()
244-
245-
// Check for context cancellation or timeout once more. If more than one
246-
// case in the select statement above was satisfied at the same time, Go
247-
// will choose one arbitrarily.
248-
// That can cause an operation to go through even if the context was
249-
// canceled before or the timeout was reached.
250-
select {
251-
case <-ctx.Done():
252-
quitAfterTimer.Stop()
253-
if err == nil {
254-
err = ctx.Err()
255-
}
256-
return prepareReturn(resp, err)
257-
case <-quitAfterTimer.C:
258-
return prepareReturn(resp, err)
259-
default:
260-
}
261-
262-
// We close the response's body here, since we definitely will not
263-
// return `resp` now. If we close it before the select case above, a
264-
// timer may fire and cause us to return a response with a closed body
265-
// (in which case, the caller will not get the error message in the body).
266-
if resp != nil && resp.Body != nil {
267-
// Read the body to EOF - if the Body is not both read to EOF and closed,
268-
// the Client's underlying RoundTripper may not be able to re-use the
269-
// persistent TCP connection to the server for a subsequent "keep-alive" request.
270-
// See https://pkg.go.dev/net/http#Client.Do
271-
io.Copy(io.Discard, resp.Body)
272-
resp.Body.Close()
273-
}
274-
resp, err = rx.transferChunk(ctx)
275-
276-
var status int
277-
if resp != nil {
278-
status = resp.StatusCode
279-
}
280294

281-
// The upload should be retried if the err indicates the context timed out.
282-
if rx.ChunkTransferTimeout != 0 && errors.Is(err, context.DeadlineExceeded) {
283-
continue
284-
}
295+
// Transfer a single chunk.
296+
resp, err = rx.transferChunk(ctx)
285297

286-
// Check if we should retry the request.
287-
if !errorFunc(status, err) {
288-
quitAfterTimer.Stop()
289-
break
290-
}
291-
292-
rx.attempts++
293-
pause = bo.Pause()
298+
// If an error occurred, the upload has failed. Return immediately.
299+
if err != nil {
300+
return prepareReturn(resp, err)
294301
}
295302

296303
// If the chunk was uploaded successfully, but there's still
297304
// more to go, upload the next chunk without any delay.
298305
if statusResumeIncomplete(resp) {
306+
// Read the body to EOF and close it to allow the underlying
307+
// transport to reuse the connection for next chunk upload.
308+
io.Copy(io.Discard, resp.Body)
309+
resp.Body.Close()
299310
continue
300311
}
301312

0 commit comments

Comments
 (0)