252 changes: 121 additions & 131 deletions internal/gensupport/resumable.go
@@ -126,16 +126,21 @@ func (rx *ResumableUpload) reportProgress(old, updated int64) {
}
}

// transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
chunk, off, size, err := rx.Media.Chunk()

done := err == io.EOF
if !done && err != nil {
return nil, err
// transferChunk performs a single HTTP request to upload a single chunk.
func (rx *ResumableUpload) transferChunk(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
// rCtx is derived from ctx with a deadline when ChunkTransferTimeout is non-zero.
// If a single request takes longer than ChunkTransferTimeout to return a response, the rCtx deadline
// is exceeded, triggering a retry of the request.
var rCtx context.Context
var cancel context.CancelFunc

rCtx = ctx
if rx.ChunkTransferTimeout != 0 {
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
defer cancel()
}

res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
if err != nil {
return res, err
}
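The per-request timeout introduced above follows a common Go pattern: derive a child context with context.WithTimeout only when a timeout is configured, and otherwise pass the parent context through unchanged. The standalone sketch below illustrates that pattern outside this package; doRequest, attempt, and the durations are illustrative stand-ins, not part of the library.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doRequest stands in for a single upload attempt: it waits for either the
// simulated work to finish or the context to be cancelled.
func doRequest(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// attempt mirrors the ChunkTransferTimeout handling above: a per-attempt
// context is derived only when a timeout is configured.
func attempt(ctx context.Context, timeout, work time.Duration) error {
	rCtx := ctx
	if timeout != 0 {
		var cancel context.CancelFunc
		rCtx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	return doRequest(rCtx, work)
}

func main() {
	err := attempt(context.Background(), 50*time.Millisecond, 200*time.Millisecond)
	// A context.DeadlineExceeded error is the signal the retry loop looks for.
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}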
@@ -149,161 +154,146 @@ func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
if res.StatusCode == http.StatusOK {
rx.reportProgress(off, off+int64(size))
}
return res, nil
}

if statusResumeIncomplete(res) {
rx.Media.Next()
// uploadChunkWithRetries attempts to upload a single chunk, retrying within the chunk retry deadline.
// If ChunkTransferTimeout is non-zero, each attempt is additionally bounded by that per-request timeout.
func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
// Configure retryable error criteria.
shouldRetry := rx.Retry.errorFunc()

// Configure single chunk retry deadline.
retryDeadline := defaultRetryDeadline
if rx.ChunkRetryDeadline != 0 {
retryDeadline = rx.ChunkRetryDeadline
}

// Each chunk gets its own initialized-at-zero backoff and invocation ID.
bo := rx.Retry.backoff()
quitAfterTimer := time.NewTimer(retryDeadline)
defer quitAfterTimer.Stop()
rx.attempts = 1
rx.invocationID = uuid.New().String()

var pause time.Duration
var resp *http.Response
var err error

// Retry loop for a single chunk.
for {
// Wait for the backoff period, unless the context is canceled or the
// retry deadline is hit.
pauseTimer := time.NewTimer(pause)
select {
case <-ctx.Done():
pauseTimer.Stop()
if err == nil {
err = ctx.Err()
}
return resp, err
case <-pauseTimer.C:
case <-quitAfterTimer.C:
pauseTimer.Stop()
return resp, err
}
pauseTimer.Stop()

// Check for context cancellation or timeout once more. If more than one
// case in the select statement above was satisfied at the same time, Go
// will choose one arbitrarily.
// That can cause an operation to go through even if the context was
// canceled before or the timeout was reached.
select {
case <-ctx.Done():
if err == nil {
err = ctx.Err()
}
return resp, err
case <-quitAfterTimer.C:
return resp, err
default:
}

// We close the response's body here, since we definitely will not
// return `resp` now. If we close it before the select case above, a
// timer may fire and cause us to return a response with a closed body
// (in which case, the caller will not get the error message in the body).
if resp != nil && resp.Body != nil {
// Read the body to EOF - if the Body is not both read to EOF and closed,
// the Client's underlying RoundTripper may not be able to re-use the
// persistent TCP connection to the server for a subsequent "keep-alive" request.
// See https://pkg.go.dev/net/http#Client.Do
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}

resp, err = rx.transferChunk(ctx, chunk, off, size, done)
status := 0
if resp != nil {
status = resp.StatusCode
}
// Retry the chunk upload if ChunkTransferTimeout is non-zero and err is context.DeadlineExceeded,
// or if the error is otherwise retryable.
if (rx.ChunkTransferTimeout != 0 && errors.Is(err, context.DeadlineExceeded)) || shouldRetry(status, err) {
rx.attempts++
pause = bo.Pause()
chunk, _, _, _ = rx.Media.Chunk()
continue
}
return resp, err
}
return res, nil
}
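The retry loop above combines three timing concerns: a growing pause between attempts, an overall per-chunk retry deadline, and cancellation of the caller's context. A simplified standalone sketch of that structure follows; the fixed doubling backoff and the attempt callback are illustrative stand-ins for rx.Retry.backoff() and transferChunk, and the second confirmation select and response-body handling are omitted for brevity.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithDeadline retries attempt with a growing pause until it succeeds,
// the overall deadline elapses, or ctx is cancelled. The structure mirrors
// the pauseTimer/quitAfterTimer pair above.
func retryWithDeadline(ctx context.Context, deadline time.Duration, attempt func(context.Context) error) error {
	quitAfter := time.NewTimer(deadline)
	defer quitAfter.Stop()

	var pause time.Duration
	var err error
	for {
		// Wait out the backoff pause unless the context is cancelled or the
		// overall retry deadline fires first.
		pauseTimer := time.NewTimer(pause)
		select {
		case <-ctx.Done():
			pauseTimer.Stop()
			return ctx.Err()
		case <-pauseTimer.C:
		case <-quitAfter.C:
			pauseTimer.Stop()
			return err
		}
		pauseTimer.Stop()

		if err = attempt(ctx); err == nil {
			return nil
		}

		// Grow the pause before the next attempt (simple doubling backoff).
		if pause == 0 {
			pause = 50 * time.Millisecond
		} else {
			pause *= 2
		}
	}
}

func main() {
	calls := 0
	err := retryWithDeadline(context.Background(), 2*time.Second, func(ctx context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}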

// Upload starts the process of a resumable upload with a cancellable context.
// It retries using the provided back off strategy until cancelled or the
// strategy indicates to stop retrying.
// It is called from the auto-generated API code and is not visible to the user.
// Before sending an HTTP request, Upload calls any registered hook functions,
// and calls the returned functions after the request returns (see send.go).
// rx is private to the auto-generated API code.
// Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
// Upload does not parse the response into the error on a non 200 response;
// it is the caller's responsibility to call resp.Body.Close.
func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
func (rx *ResumableUpload) Upload(ctx context.Context) (*http.Response, error) {
for {
chunk, off, size, err := rx.Media.Chunk()
done := err == io.EOF
if !done && err != nil {
return nil, err
}

// There are a couple of cases where it's possible for err and resp to both
// be non-nil. However, we expose a simpler contract to our callers: exactly
// one of resp and err will be non-nil. This means that any response body
// must be closed here before returning a non-nil error.
var prepareReturn = func(resp *http.Response, err error) (*http.Response, error) {
resp, err := rx.uploadChunkWithRetries(ctx, chunk, off, int64(size), done)
// There are a couple of cases where it's possible for err and resp to both
// be non-nil. However, we expose a simpler contract to our callers: exactly
// one of resp and err will be non-nil. This means that any response body
// must be closed here before returning a non-nil error.
if err != nil {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
// If there were retries, indicate this in the error message and wrap the final error.
if rx.attempts > 1 {
return nil, fmt.Errorf("chunk upload failed after %d attempts;, final error: %w", rx.attempts, err)
return nil, fmt.Errorf("chunk upload failed after %d attempts, final error: %w", rx.attempts, err)
}
return nil, err
}

// This case is very unlikely but possible only if rx.ChunkRetryDeadline is
// set to a very small value, in which case no requests will be sent before
// the deadline. Return an error to avoid causing a panic.
if resp == nil {
return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDealine", rx.URI)
return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDeadline", rx.URI)
}
return resp, nil
}
// Configure retryable error criteria.
errorFunc := rx.Retry.errorFunc()

// Configure per-chunk retry deadline.
var retryDeadline time.Duration
if rx.ChunkRetryDeadline != 0 {
retryDeadline = rx.ChunkRetryDeadline
} else {
retryDeadline = defaultRetryDeadline
}

// Send all chunks.
for {
var pause time.Duration

// Each chunk gets its own initialized-at-zero backoff and invocation ID.
bo := rx.Retry.backoff()
quitAfterTimer := time.NewTimer(retryDeadline)
rx.attempts = 1
rx.invocationID = uuid.New().String()

// Retry loop for a single chunk.
for {
pauseTimer := time.NewTimer(pause)
select {
case <-ctx.Done():
quitAfterTimer.Stop()
pauseTimer.Stop()
if err == nil {
err = ctx.Err()
}
return prepareReturn(resp, err)
case <-pauseTimer.C:
case <-quitAfterTimer.C:
pauseTimer.Stop()
return prepareReturn(resp, err)
}
pauseTimer.Stop()

// Check for context cancellation or timeout once more. If more than one
// case in the select statement above was satisfied at the same time, Go
// will choose one arbitrarily.
// That can cause an operation to go through even if the context was
// canceled before or the timeout was reached.
select {
case <-ctx.Done():
quitAfterTimer.Stop()
if err == nil {
err = ctx.Err()
}
return prepareReturn(resp, err)
case <-quitAfterTimer.C:
return prepareReturn(resp, err)
default:
}

// rCtx is derived from a context with a defined transferTimeout with non-zero value.
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
// triggering a retry of the request.
var rCtx context.Context
var cancel context.CancelFunc

rCtx = ctx
if rx.ChunkTransferTimeout != 0 {
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
}

// We close the response's body here, since we definitely will not
// return `resp` now. If we close it before the select case above, a
// timer may fire and cause us to return a response with a closed body
// (in which case, the caller will not get the error message in the body).
if resp != nil && resp.Body != nil {
// Read the body to EOF - if the Body is not both read to EOF and closed,
// the Client's underlying RoundTripper may not be able to re-use the
// persistent TCP connection to the server for a subsequent "keep-alive" request.
// See https://pkg.go.dev/net/http#Client.Do
if statusResumeIncomplete(resp) {
// The upload is not yet complete, but the server has acknowledged this chunk.
// We don't have anything to do with the response body.
if resp.Body != nil {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
resp, err = rx.transferChunk(rCtx)

var status int
if resp != nil {
status = resp.StatusCode
}

// The upload should be retried if the rCtx is canceled due to a timeout.
select {
case <-rCtx.Done():
if rx.ChunkTransferTimeout != 0 && errors.Is(rCtx.Err(), context.DeadlineExceeded) {
// Cancel the context for rCtx
cancel()
continue
}
default:
}

// Check if we should retry the request.
if !errorFunc(status, err) {
quitAfterTimer.Stop()
break
}

rx.attempts++
pause = bo.Pause()
}

// If the chunk was uploaded successfully, but there's still
// more to go, upload the next chunk without any delay.
if statusResumeIncomplete(resp) {
rx.Media.Next()
continue
}

return prepareReturn(resp, err)
return resp, nil
}
}
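Since ResumableUpload is constructed by the auto-generated API code, a direct call is not shown here; the sketch below only illustrates the caller-side contract documented above (exactly one of resp and err is non-nil, and a non-nil response body must be drained and closed so the connection can be reused). handleUpload and the stub in main are hypothetical helpers, not part of this package.

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// handleUpload accepts any function with the same contract as
// (*ResumableUpload).Upload and takes care of closing the response body.
func handleUpload(ctx context.Context, upload func(context.Context) (*http.Response, error)) error {
	resp, err := upload(ctx)
	if err != nil {
		// By the contract above, resp is nil here; there is nothing to close.
		return err
	}
	defer func() {
		// Drain and close so the underlying connection can be reused.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("upload failed: %s: %s", resp.Status, body)
	}
	return nil
}

func main() {
	// A stub standing in for Upload: it returns a ready-made 200 response.
	stub := func(ctx context.Context) (*http.Response, error) {
		return &http.Response{
			StatusCode: http.StatusOK,
			Status:     "200 OK",
			Body:       io.NopCloser(strings.NewReader("ok")),
		}, nil
	}
	fmt.Println(handleUpload(context.Background(), stub)) // <nil>
}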