@@ -17,37 +17,36 @@ import (
17
17
18
18
const (
19
19
defaultBatchSize = 100
20
+ baseRetryDelayMs = 250
20
21
)
21
22
22
23
type retryCounter struct {
23
- MaxRetries int `git:"lfs.transfer.maxretries"`
24
+ MaxRetries int
25
+ MaxRetryDelay int
24
26
25
27
// cmu guards count
26
28
cmu sync.Mutex
27
29
// count maps OIDs to number of retry attempts
28
30
count map [string ]int
29
31
}
30
32
31
- // newRetryCounter instantiates a new *retryCounter. It parses the gitconfig
32
- // value: `lfs.transfer.maxretries`, and falls back to defaultMaxRetries if none
33
- // was provided.
34
- //
35
- // If it encountered an error in Unmarshaling the *config.Configuration, it will
36
- // be returned, otherwise nil.
33
+ // newRetryCounter instantiates a new *retryCounter.
37
34
func newRetryCounter () * retryCounter {
38
35
return & retryCounter {
39
- MaxRetries : defaultMaxRetries ,
40
- count : make (map [string ]int ),
36
+ MaxRetries : defaultMaxRetries ,
37
+ MaxRetryDelay : defaultMaxRetryDelay ,
38
+ count : make (map [string ]int ),
41
39
}
42
40
}
43
41
44
- // Increment increments the number of retries for a given OID. It is safe to
45
- // call across multiple goroutines.
46
- func (r * retryCounter ) Increment (oid string ) {
42
+ // Increment increments the number of retries for a given OID and returns the
43
+ // new value. It is safe to call across multiple goroutines.
44
+ func (r * retryCounter ) Increment (oid string ) int {
47
45
r .cmu .Lock ()
48
46
defer r .cmu .Unlock ()
49
47
50
48
r .count [oid ]++
49
+ return r .count [oid ]
51
50
}
52
51
53
52
// CountFor returns the current number of retries for a given OID. It is safe to
@@ -66,6 +65,22 @@ func (r *retryCounter) CanRetry(oid string) (int, bool) {
66
65
return count , count < r .MaxRetries
67
66
}
68
67
68
+ // ReadyTime returns the time from now when the current retry can occur or the
69
+ // zero time if the retry can occur immediately.
70
+ func (r * retryCounter ) ReadyTime (oid string ) time.Time {
71
+ count := r .CountFor (oid )
72
+ if count < 1 {
73
+ return time.Time {}
74
+ }
75
+
76
+ maxDelayMs := 1000 * uint64 (r .MaxRetryDelay )
77
+ delay := uint64 (baseRetryDelayMs ) * (1 << uint (count - 1 ))
78
+ if delay == 0 || delay > maxDelayMs {
79
+ delay = maxDelayMs
80
+ }
81
+ return time .Now ().Add (time .Duration (delay ) * time .Millisecond )
82
+ }
83
+
69
84
// batch implements the sort.Interface interface and enables sorting on a slice
70
85
// of `*Transfer`s by object size.
71
86
//
@@ -295,6 +310,7 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
295
310
}
296
311
297
312
q .rc .MaxRetries = q .manifest .maxRetries
313
+ q .rc .MaxRetryDelay = q .manifest .maxRetryDelay
298
314
q .client .MaxRetries = q .manifest .maxRetries
299
315
300
316
if q .batchSize <= 0 {
@@ -506,6 +522,24 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
506
522
next := q .makeBatch ()
507
523
tracerx .Printf ("tq: sending batch of size %d" , len (batch ))
508
524
525
+ enqueueRetry := func (t * objectTuple , err error , readyTime * time.Time ) {
526
+ count := q .rc .Increment (t .Oid )
527
+
528
+ if readyTime == nil {
529
+ t .ReadyTime = q .rc .ReadyTime (t .Oid )
530
+ } else {
531
+ t .ReadyTime = * readyTime
532
+ }
533
+ delay := time .Until (t .ReadyTime ).Seconds ()
534
+
535
+ var errMsg string
536
+ if err != nil {
537
+ errMsg = fmt .Sprintf (": %s" , err )
538
+ }
539
+ tracerx .Printf ("tq: enqueue retry #%d after %.2fs for %q (size: %d)%s" , count , delay , t .Oid , t .Size , errMsg )
540
+ next = append (next , t )
541
+ }
542
+
509
543
q .meter .Pause ()
510
544
var bRes * BatchResponse
511
545
if q .manifest .standaloneTransferAgent != "" {
@@ -530,14 +564,10 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
530
564
// retried, they will be marked as failed.
531
565
for _ , t := range batch {
532
566
if q .canRetryObject (t .Oid , err ) {
533
- q .rc .Increment (t .Oid )
534
-
535
- next = append (next , t )
567
+ enqueueRetry (t , err , nil )
536
568
} else if readyTime , canRetry := q .canRetryObjectLater (t .Oid , err ); canRetry {
537
- tracerx .Printf ("tq: retrying object %s after %s seconds." , t .Oid , time .Until (readyTime ).Seconds ())
538
569
err = nil
539
- t .ReadyTime = readyTime
540
- next = append (next , t )
570
+ enqueueRetry (t , err , & readyTime )
541
571
} else {
542
572
q .wait .Done ()
543
573
}
@@ -599,13 +629,8 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
599
629
tr := newTransfer (o , objects .First ().Name , objects .First ().Path )
600
630
601
631
if a , err := tr .Rel (q .direction .String ()); err != nil {
602
- // XXX(taylor): duplication
603
632
if q .canRetryObject (tr .Oid , err ) {
604
- q .rc .Increment (tr .Oid )
605
- count := q .rc .CountFor (tr .Oid )
606
-
607
- tracerx .Printf ("tq: enqueue retry #%d for %q (size: %d): %s" , count , tr .Oid , tr .Size , err )
608
- next = append (next , objects .First ())
633
+ enqueueRetry (objects .First (), err , nil )
609
634
} else {
610
635
q .errorc <- errors .Errorf ("[%v] %v" , tr .Name , err )
611
636
@@ -624,12 +649,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
624
649
625
650
retries := q .addToAdapter (bRes .endpoint , toTransfer )
626
651
for t := range retries {
627
- q .rc .Increment (t .Oid )
628
- count := q .rc .CountFor (t .Oid )
629
-
630
- tracerx .Printf ("tq: enqueue retry #%d for %q (size: %d)" , count , t .Oid , t .Size )
631
-
632
- next = append (next , t )
652
+ enqueueRetry (t , nil , nil )
633
653
}
634
654
635
655
return next , nil
0 commit comments