Skip to content

Commit 338ab40

Browse files
committed
commands,tq: defer local verification to *tq.TransferQueue
1 parent a7d3585 commit 338ab40

File tree

5 files changed

+70
-90
lines changed

5 files changed

+70
-90
lines changed

commands/command_push.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"os"
66

7+
"github.com/git-lfs/git-lfs/errors"
78
"github.com/git-lfs/git-lfs/git"
89
"github.com/git-lfs/git-lfs/lfs"
910
"github.com/rubyist/tracerx"
@@ -73,8 +74,22 @@ func uploadLeftOrAll(g *lfs.GitScanner, ctx *uploadContext, ref string) error {
7374

7475
func uploadsWithObjectIDs(ctx *uploadContext, oids []string) {
7576
for _, oid := range oids {
77+
mp, err := lfs.LocalMediaPath(oid)
78+
if err != nil {
79+
ExitWithError(errors.Wrap(err, "Unable to find local media path:"))
80+
}
81+
82+
stat, err := os.Stat(mp)
83+
if err != nil {
84+
ExitWithError(errors.Wrap(err, "Unable to stat local media path"))
85+
}
86+
7687
uploadPointers(ctx, &lfs.WrappedPointer{
77-
Pointer: &lfs.Pointer{Oid: oid},
88+
Name: mp,
89+
Pointer: &lfs.Pointer{
90+
Oid: oid,
91+
Size: stat.Size(),
92+
},
7893
})
7994
}
8095

commands/commands.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -114,28 +114,26 @@ func downloadTransfer(p *lfs.WrappedPointer) (name, path, oid string, size int64
114114
return p.Name, path, p.Oid, p.Size
115115
}
116116

117-
func uploadTransfer(oid, filename string) (*tq.Transfer, error) {
117+
func uploadTransfer(p *lfs.WrappedPointer) (*tq.Transfer, error) {
118+
filename := p.Name
119+
oid := p.Oid
120+
118121
localMediaPath, err := lfs.LocalMediaPath(oid)
119122
if err != nil {
120123
return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
121124
}
122125

123126
if len(filename) > 0 {
124-
if err = ensureFile(filename, localMediaPath); err != nil {
127+
if err = ensureFile(filename, localMediaPath); err != nil && !errors.IsCleanPointerError(err) {
125128
return nil, err
126129
}
127130
}
128131

129-
fi, err := os.Stat(localMediaPath)
130-
if err != nil {
131-
return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
132-
}
133-
134132
return &tq.Transfer{
135133
Name: filename,
136134
Path: localMediaPath,
137135
Oid: oid,
138-
Size: fi.Size(),
136+
Size: p.Size,
139137
}, nil
140138
}
141139

@@ -146,7 +144,6 @@ func ensureFile(smudgePath, cleanPath string) error {
146144
return nil
147145
}
148146

149-
expectedOid := filepath.Base(cleanPath)
150147
localPath := filepath.Join(config.LocalWorkingDir, smudgePath)
151148
file, err := os.Open(localPath)
152149
if err != nil {
@@ -168,11 +165,6 @@ func ensureFile(smudgePath, cleanPath string) error {
168165
if err != nil {
169166
return err
170167
}
171-
172-
if expectedOid != cleaned.Oid {
173-
return fmt.Errorf("Trying to push %q with OID %s.\nNot found in %s.", smudgePath, expectedOid, filepath.Dir(cleanPath))
174-
}
175-
176168
return nil
177169
}
178170

commands/uploader.go

Lines changed: 6 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package commands
22

33
import (
44
"os"
5-
"sync"
65

76
"github.com/git-lfs/git-lfs/errors"
87
"github.com/git-lfs/git-lfs/lfs"
@@ -11,8 +10,6 @@ import (
1110
"github.com/git-lfs/git-lfs/tq"
1211
)
1312

14-
var uploadMissingErr = "%s does not exist in .git/lfs/objects. Tried %s, which matches %s."
15-
1613
type uploadContext struct {
1714
Remote string
1815
DryRun bool
@@ -21,41 +18,20 @@ type uploadContext struct {
2118

2219
meter progress.Meter
2320
tq *tq.TransferQueue
24-
25-
cwg *sync.WaitGroup
26-
cq *tq.TransferQueue
2721
}
2822

2923
func newUploadContext(remote string, dryRun bool) *uploadContext {
3024
cfg.CurrentRemote = remote
3125

32-
return &uploadContext{
26+
ctx := &uploadContext{
3327
Remote: remote,
3428
Manifest: getTransferManifest(),
3529
DryRun: dryRun,
3630
uploadedOids: tools.NewStringSet(),
37-
38-
cwg: new(sync.WaitGroup),
39-
// TODO(taylor): single item batches are needed to enqueue each
40-
// item immediately to avoid waiting an infinite amount of time
41-
// on an underfilled batch.
42-
cq: newDownloadCheckQueue(c.Manifest, c.Remote, tq.WithBatchSize(1)),
4331
}
4432

45-
ctx.cq.Notify(func(oid string, ok bool) {
46-
if ok {
47-
// If the object was "ok", the server already has it,
48-
// and can be marked as uploaded.
49-
ctx.SetUploaded(oid)
50-
}
51-
52-
// No matter whether or not the sever has the object, mark this
53-
// OID as checked.
54-
ctx.cwg.Done()
55-
})
56-
5733
ctx.meter = buildProgressMeter(ctx.DryRun)
58-
ctx.tq = newUploadQueue(c.Manifest, c.Remote, tq.WithProgress(ctx.meter), tq.DryRun(ctx.DryRun))
34+
ctx.tq = newUploadQueue(ctx.Manifest, ctx.Remote, tq.WithProgress(ctx.meter), tq.DryRun(ctx.DryRun))
5935

6036
return ctx
6137
}
@@ -75,8 +51,6 @@ func (c *uploadContext) HasUploaded(oid string) bool {
7551
func (c *uploadContext) prepareUpload(unfiltered ...*lfs.WrappedPointer) (*tq.TransferQueue, []*lfs.WrappedPointer) {
7652
numUnfiltered := len(unfiltered)
7753
uploadables := make([]*lfs.WrappedPointer, 0, numUnfiltered)
78-
missingLocalObjects := make([]*lfs.WrappedPointer, 0, numUnfiltered)
79-
missingSize := int64(0)
8054

8155
// XXX(taylor): temporary measure to fix duplicate (broken) results from
8256
// scanner
@@ -96,47 +70,12 @@ func (c *uploadContext) prepareUpload(unfiltered ...*lfs.WrappedPointer) (*tq.Tr
9670
// we will call Skip() based on the results of the download check queue.
9771
c.meter.Add(p.Size)
9872

99-
if lfs.ObjectExistsOfSize(p.Oid, p.Size) {
100-
uploadables = append(uploadables, p)
101-
} else {
102-
// We think we need to push this but we don't have it
103-
// Store for server checking later
104-
missingLocalObjects = append(missingLocalObjects, p)
105-
missingSize += p.Size
106-
}
107-
}
108-
109-
// check to see if the server has the missing objects.
110-
c.checkMissing(missingLocalObjects, missingSize)
111-
112-
// use the context's TransferQueue, automatically skipping any missing
113-
// objects that the server already has.
114-
for _, p := range missingLocalObjects {
115-
if c.HasUploaded(p.Oid) {
116-
// if the server already has this object, call Skip() on
117-
// the progressmeter to decrement the number of files by
118-
// 1 and the number of bytes by `p.Size`.
119-
c.tq.Skip(p.Size)
120-
} else {
121-
uploadables = append(uploadables, p)
122-
}
73+
uploadables = append(uploadables, p)
12374
}
12475

12576
return c.tq, uploadables
12677
}
12778

128-
// This checks the given slice of pointers that don't exist in .git/lfs/objects
129-
// against the server. Anything the server already has does not need to be
130-
// uploaded again.
131-
func (c *uploadContext) checkMissing(missing []*lfs.WrappedPointer, missingSize int64) {
132-
c.cwg.Add(len(missing))
133-
for _, p := range missing {
134-
c.cq.Add(downloadTransfer(p))
135-
}
136-
137-
c.cwg.Wait()
138-
}
139-
14079
func uploadPointers(c *uploadContext, unfiltered ...*lfs.WrappedPointer) {
14180
if c.DryRun {
14281
for _, p := range unfiltered {
@@ -153,13 +92,9 @@ func uploadPointers(c *uploadContext, unfiltered ...*lfs.WrappedPointer) {
15392

15493
q, pointers := c.prepareUpload(unfiltered...)
15594
for _, p := range pointers {
156-
t, err := uploadTransfer(p.Oid, p.Name)
157-
if err != nil {
158-
if errors.IsCleanPointerError(err) {
159-
Exit(uploadMissingErr, p.Oid, p.Name, errors.GetContext(err, "pointer").(*lfs.Pointer).Oid)
160-
} else {
161-
ExitWithError(err)
162-
}
95+
t, err := uploadTransfer(p)
96+
if err != nil && !errors.IsCleanPointerError(err) {
97+
ExitWithError(err)
16398
}
16499

165100
q.Add(t.Name, t.Path, t.Oid, t.Size)

test/test-pre-push.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ begin_test "pre-push with missing pointer not on server"
189189
git lfs pre-push origin "$GITSERVER/$reponame" 2>&1 |
190190
tee push.log
191191
set -e
192-
grep "7aa7a5359173d05b63cfd682e3c38487f3cb4f7f1d60659fe59fab1505977d4c does not exist in .git/lfs/objects. Tried new.dat, which matches 7aa7a5359173d05b63cfd682e3c38487f3cb4f7f1d60659fe59fab1505977d4c." push.log
192+
grep "Unable to find object (7aa7a5359173d05b63cfd682e3c38487f3cb4f7f1d60659fe59fab1505977d4c) locally." push.log
193193
)
194194
end_test
195195

tq/transfer_queue.go

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tq
22

33
import (
4+
"os"
45
"sort"
56
"sync"
67

@@ -298,7 +299,6 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
298299
next = append(next, t)
299300
} else {
300301
q.sendNotifications(t.Oid, false)
301-
q.wait.Done()
302302
}
303303
}
304304

@@ -404,16 +404,21 @@ func (q *TransferQueue) addToAdapter(e lfsapi.Endpoint, pending []*Transfer) <-c
404404
return retries
405405
}
406406

407+
present, missingResults := q.partitionTransfers(pending)
408+
407409
go func() {
408410
defer close(retries)
409411

410412
var results <-chan TransferResult
411413
if q.dryRun {
412-
results = q.makeDryRunResults(pending)
414+
results = q.makeDryRunResults(present)
413415
} else {
414-
results = q.adapter.Add(pending...)
416+
results = q.adapter.Add(present...)
415417
}
416418

419+
for _, res := range missingResults {
420+
q.handleTransferResult(res, retries)
421+
}
417422
for res := range results {
418423
q.handleTransferResult(res, retries)
419424
}
@@ -422,6 +427,39 @@ func (q *TransferQueue) addToAdapter(e lfsapi.Endpoint, pending []*Transfer) <-c
422427
return retries
423428
}
424429

430+
func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) {
431+
if q.direction != Upload {
432+
return transfers, nil
433+
}
434+
435+
present = make([]*Transfer, 0, len(transfers))
436+
results = make([]TransferResult, 0, len(transfers))
437+
438+
for _, t := range transfers {
439+
var err error
440+
441+
if t.Size < 0 {
442+
err = errors.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Oid, t.Size)
443+
} else {
444+
fd, serr := os.Stat(t.Path)
445+
if serr != nil || fd.Size() != t.Size {
446+
err = errors.Errorf("Unable to find object (%s) locally.", t.Oid)
447+
}
448+
}
449+
450+
if err != nil {
451+
results = append(results, TransferResult{
452+
Transfer: t,
453+
Error: err,
454+
})
455+
} else {
456+
present = append(present, t)
457+
}
458+
}
459+
460+
return
461+
}
462+
425463
// makeDryRunResults returns a channel populated immediately with "successful"
426464
// results for all of the given transfers in "ts".
427465
func (q *TransferQueue) makeDryRunResults(ts []*Transfer) <-chan TransferResult {

0 commit comments

Comments
 (0)