Projects STRLCPY dolt Commits 45acdc0c
🤬
Revision indexing in progress... (symbol navigation in revisions will be accurate after indexing completes)
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/store.go
    skipped 151 lines
    152 152   }
    153 153   
    154 154   ranges := make(map[hash.Hash]map[hash.Hash]Range)
    155  - f := func(css chunkSources) error {
     155 + f := func(css chunkSourceSet) error {
    156 156   for _, cs := range css {
    157 157   switch tr := cs.(type) {
    158 158   case *fileTableReader:
    skipped 227 lines
    386 386   contents, upstreamAppendixSpecs := upstream.removeAppendixSpecs()
    387 387   switch option {
    388 388   case ManifestAppendixOption_Append:
    389  - // prepend all appendix specs to contents.specs
     389 + // append all appendix specs to contents.specs
    390 390   specs := append([]tableSpec{}, appendixSpecs...)
    391 391   specs = append(specs, upstreamAppendixSpecs...)
    392 392   contents.specs = append(specs, contents.specs...)
    skipped 9 lines
    402 402   return contents, nil
    403 403   }
    404 404   
    405  - // prepend new appendix specs to contents.specs
     405 + // append new appendix specs to contents.specs
    406 406   // dropping all upstream appendix specs
    407 407   specs := append([]tableSpec{}, appendixSpecs...)
    408 408   contents.specs = append(specs, contents.specs...)
    skipped 234 lines
    643 643   nbs.mt = newMemTable(nbs.mtSize)
    644 644   }
    645 645   if !nbs.mt.addChunk(h, data) {
    646  - ts, err := nbs.tables.prepend(ctx, nbs.mt, nbs.stats)
     646 + ts, err := nbs.tables.append(ctx, nbs.mt, nbs.stats)
    647 647   if err != nil {
    648 648   return false, err
    649 649   }
    skipped 334 lines
    984 984   }
    985 985   
    986 986   if cnt > preflushChunkCount {
    987  - ts, err := nbs.tables.prepend(ctx, nbs.mt, nbs.stats)
     987 + ts, err := nbs.tables.append(ctx, nbs.mt, nbs.stats)
    988 988   if err != nil {
    989 989   return err
    990 990   }
    skipped 79 lines
    1070 1070   }
    1071 1071   
    1072 1072   if cnt > 0 {
    1073  - ts, err := nbs.tables.prepend(ctx, nbs.mt, nbs.stats)
     1073 + ts, err := nbs.tables.append(ctx, nbs.mt, nbs.stats)
    1074 1074   if err != nil {
    1075 1075   return err
    1076 1076   }
    skipped 606 lines
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/table.go
    skipped 261 lines
    262 262   
    263 263  type chunkSources []chunkSource
    264 264   
     265 +type chunkSourceSet map[addr]chunkSource
     266 + 
     267 +func copyChunkSourceSet(s chunkSourceSet) (cp chunkSourceSet) {
     268 + cp = make(chunkSourceSet, len(s))
     269 + for k, v := range s {
     270 + cp[k] = v
     271 + }
     272 + return
     273 +}
     274 + 
    265 275  // TableFile is an interface for working with an existing table file
    266 276  type TableFile interface {
    267 277   // FileID gets the id of the file
    skipped 47 lines
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/table_set.go
    skipped 41 lines
    42 42   
    43 43  // tableSet is an immutable set of persistable chunkSources.
    44 44  type tableSet struct {
    45  - novel, upstream chunkSources
     45 + novel, upstream chunkSourceSet
    46 46   p tablePersister
    47 47   q MemoryQuotaProvider
    48 48   rl chan struct{}
    49 49  }
    50 50   
    51 51  func (ts tableSet) has(h addr) (bool, error) {
    52  - f := func(css chunkSources) (bool, error) {
     52 + f := func(css chunkSourceSet) (bool, error) {
    53 53   for _, haver := range css {
    54 54   has, err := haver.has(h)
    55 55   
    skipped 22 lines
    78 78  }
    79 79   
    80 80  func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) {
    81  - f := func(css chunkSources) (bool, error) {
     81 + f := func(css chunkSourceSet) (bool, error) {
    82 82   for _, haver := range css {
    83 83   has, err := haver.hasMany(addrs)
    84 84   
    skipped 21 lines
    106 106  }
    107 107   
    108 108  func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
    109  - f := func(css chunkSources) ([]byte, error) {
     109 + f := func(css chunkSourceSet) ([]byte, error) {
    110 110   for _, haver := range css {
    111 111   data, err := haver.get(ctx, h, stats)
    112 112   
    skipped 23 lines
    136 136  }
    137 137   
    138 138  func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (remaining bool, err error) {
    139  - f := func(css chunkSources) bool {
     139 + f := func(css chunkSourceSet) bool {
    140 140   for _, haver := range css {
    141 141   remaining, err = haver.getMany(ctx, eg, reqs, found, stats)
    142 142   if err != nil {
    skipped 10 lines
    153 153  }
    154 154   
    155 155  func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (remaining bool, err error) {
    156  - f := func(css chunkSources) bool {
     156 + f := func(css chunkSourceSet) bool {
    157 157   for _, haver := range css {
    158 158   remaining, err = haver.getManyCompressed(ctx, eg, reqs, found, stats)
    159 159   if err != nil {
    skipped 11 lines
    171 171  }
    172 172   
    173 173  func (ts tableSet) count() (uint32, error) {
    174  - f := func(css chunkSources) (count uint32, err error) {
     174 + f := func(css chunkSourceSet) (count uint32, err error) {
    175 175   for _, haver := range css {
    176 176   thisCount, err := haver.count()
    177 177   
    skipped 22 lines
    200 200  }
    201 201   
    202 202  func (ts tableSet) uncompressedLen() (uint64, error) {
    203  - f := func(css chunkSources) (data uint64, err error) {
     203 + f := func(css chunkSourceSet) (data uint64, err error) {
    204 204   for _, haver := range css {
    205 205   uncmpLen, err := haver.uncompressedLen()
    206 206   
    skipped 22 lines
    229 229  }
    230 230   
    231 231  func (ts tableSet) physicalLen() (uint64, error) {
    232  - f := func(css chunkSources) (data uint64, err error) {
     232 + f := func(css chunkSourceSet) (data uint64, err error) {
    233 233   for _, haver := range css {
    234 234   index, err := haver.index()
    235 235   if err != nil {
    skipped 41 lines
    277 277   return len(ts.novel) + len(ts.upstream)
    278 278  }
    279 279   
    280  -// prepend adds a memTable to an existing tableSet, compacting |mt| and
     280 +// append adds a memTable to an existing tableSet, compacting |mt| and
    281 281  // returning a new tableSet with newly compacted table added.
    282  -func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) (tableSet, error) {
     282 +func (ts tableSet) append(ctx context.Context, mt *memTable, stats *Stats) (tableSet, error) {
    283 283   cs, err := ts.p.Persist(ctx, mt, ts, stats)
    284 284   if err != nil {
    285 285   return tableSet{}, err
    286 286   }
    287 287   
    288 288   newTs := tableSet{
    289  - novel: make(chunkSources, len(ts.novel)+1),
    290  - upstream: make(chunkSources, len(ts.upstream)),
     289 + novel: copyChunkSourceSet(ts.novel),
     290 + upstream: copyChunkSourceSet(ts.upstream),
    291 291   p: ts.p,
    292 292   q: ts.q,
    293 293   rl: ts.rl,
    294 294   }
    295  - newTs.novel[0] = cs
    296  - copy(newTs.novel[1:], ts.novel)
    297  - copy(newTs.upstream, ts.upstream)
     295 + newTs.novel[cs.hash()] = cs
    298 296   return newTs, nil
    299 297  }
    300 298   
    skipped 1 lines
    302 300  // and ts.upstream.
    303 301  func (ts tableSet) flatten(ctx context.Context) (tableSet, error) {
    304 302   flattened := tableSet{
    305  - upstream: make(chunkSources, 0, ts.Size()),
     303 + upstream: copyChunkSourceSet(ts.upstream),
    306 304   p: ts.p,
    307 305   q: ts.q,
    308 306   rl: ts.rl,
    skipped 3 lines
    312 310   cnt, err := src.count()
    313 311   if err != nil {
    314 312   return tableSet{}, err
    315  - }
    316  - if cnt > 0 {
    317  - flattened.upstream = append(flattened.upstream, src)
     313 + } else if cnt > 0 {
     314 + flattened.upstream[src.hash()] = src
    318 315   }
    319 316   }
    320  - 
    321  - flattened.upstream = append(flattened.upstream, ts.upstream...)
    322 317   return flattened, nil
    323 318  }
    324 319   
    skipped 29 lines
    354 349   
    355 350   // copy |ts.novel|, skipping empty chunkSources
    356 351   // (usually due to de-duping during table compaction)
    357  - novel := make(chunkSources, 0, len(ts.novel))
     352 + novel := make(chunkSourceSet, len(ts.novel))
    358 353   for _, t := range ts.novel {
    359 354   cnt, err := t.count()
    360 355   if err != nil {
    skipped 5 lines
    366 361   if err != nil {
    367 362   return tableSet{}, err
    368 363   }
    369  - novel = append(novel, t2)
    370  - }
    371  - 
    372  - existing := make(map[addr]chunkSource, len(ts.upstream))
    373  - for _, cs := range ts.upstream {
    374  - existing[cs.hash()] = cs
     364 + novel[t2.hash()] = t2
    375 365   }
    376 366   
    377 367   // newly opened tables are unowned, we must
    378 368   // close them if the rebase operation fails
    379  - opened := new(sync.Map)
     369 + opened := make(chunkSourceSet, len(specs))
    380 370   
    381 371   eg, ctx := errgroup.WithContext(ctx)
    382  - upstream := make([]chunkSource, len(specs))
    383  - for i, s := range specs {
     372 + mu := new(sync.Mutex)
     373 + upstream := make(chunkSourceSet, len(specs))
     374 + for _, s := range specs {
    384 375   // clone tables that we have already opened
    385  - if cs, ok := existing[s.name]; ok {
    386  - c, err := cs.clone()
     376 + if cs, ok := ts.upstream[s.name]; ok {
     377 + cl, err := cs.clone()
    387 378   if err != nil {
    388 379   return tableSet{}, err
    389 380   }
    390  - upstream[i] = c
     381 + upstream[cl.hash()] = cl
    391 382   continue
    392 383   }
    393 384   // open missing tables in parallel
    394  - idx, spec := i, s
     385 + spec := s
    395 386   eg.Go(func() error {
    396 387   cs, err := ts.p.Open(ctx, spec.name, spec.chunkCount, stats)
    397 388   if err != nil {
    398 389   return err
    399 390   }
    400  - upstream[idx] = cs
    401  - opened.Store(spec.name, cs)
     391 + mu.Lock()
     392 + defer mu.Unlock()
     393 + upstream[cs.hash()] = cs
     394 + opened[cs.hash()] = cs
    402 395   return nil
    403 396   })
    404 397   }
    405 398   
    406 399   if err := eg.Wait(); err != nil {
    407  - opened.Range(func(_, v any) bool {
     400 + for _, cs := range opened {
    408 401   // close any opened chunkSources
    409  - _ = v.(chunkSource).close()
    410  - return true
    411  - })
     402 + _ = cs.close()
     403 + }
    412 404   return tableSet{}, err
    413 405   }
    414 406   
    skipped 41 lines
    456 448  }
    457 449   
    458 450  func tableSetCalcReads(ts tableSet, reqs []getRecord, blockSize uint64) (reads int, split, remaining bool, err error) {
    459  - all := append(ts.novel, ts.upstream...)
     451 + all := copyChunkSourceSet(ts.upstream)
     452 + for a, cs := range ts.novel {
     453 + all[a] = cs
     454 + }
    460 455   for _, tbl := range all {
    461 456   rdr, ok := tbl.(*fileTableReader)
    462 457   if !ok {
    skipped 20 lines
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/table_set_test.go
    skipped 31 lines
    32 32  var testChunks = [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")}
    33 33   
    34 34  func TestTableSetPrependEmpty(t *testing.T) {
    35  - ts, err := newFakeTableSet(&UnlimitedQuotaProvider{}).prepend(context.Background(), newMemTable(testMemTableSize), &Stats{})
     35 + ts, err := newFakeTableSet(&UnlimitedQuotaProvider{}).append(context.Background(), newMemTable(testMemTableSize), &Stats{})
    36 36   require.NoError(t, err)
    37 37   specs, err := ts.toSpecs()
    38 38   require.NoError(t, err)
    skipped 8 lines
    47 47   assert.Empty(specs)
    48 48   mt := newMemTable(testMemTableSize)
    49 49   mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
    50  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     50 + ts, err = ts.append(context.Background(), mt, &Stats{})
    51 51   require.NoError(t, err)
    52 52   
    53 53   firstSpecs, err := ts.toSpecs()
    skipped 3 lines
    57 57   mt = newMemTable(testMemTableSize)
    58 58   mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
    59 59   mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
    60  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     60 + ts, err = ts.append(context.Background(), mt, &Stats{})
    61 61   require.NoError(t, err)
    62 62   
    63 63   secondSpecs, err := ts.toSpecs()
    skipped 10 lines
    74 74   assert.Empty(specs)
    75 75   mt := newMemTable(testMemTableSize)
    76 76   mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
    77  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     77 + ts, err = ts.append(context.Background(), mt, &Stats{})
    78 78   require.NoError(t, err)
    79 79   
    80 80   mt = newMemTable(testMemTableSize)
    81  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     81 + ts, err = ts.append(context.Background(), mt, &Stats{})
    82 82   require.NoError(t, err)
    83 83   
    84 84   mt = newMemTable(testMemTableSize)
    85 85   mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
    86 86   mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
    87  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     87 + ts, err = ts.append(context.Background(), mt, &Stats{})
    88 88   require.NoError(t, err)
    89 89   
    90 90   specs, err = ts.toSpecs()
    skipped 9 lines
    100 100   assert.Empty(specs)
    101 101   mt := newMemTable(testMemTableSize)
    102 102   mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
    103  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     103 + ts, err = ts.append(context.Background(), mt, &Stats{})
    104 104   require.NoError(t, err)
    105 105   
    106 106   mt = newMemTable(testMemTableSize)
    107  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     107 + ts, err = ts.append(context.Background(), mt, &Stats{})
    108 108   require.NoError(t, err)
    109 109   
    110 110   mt = newMemTable(testMemTableSize)
    111 111   mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
    112 112   mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
    113  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     113 + ts, err = ts.append(context.Background(), mt, &Stats{})
    114 114   require.NoError(t, err)
    115 115   
    116 116   ts, err = ts.flatten(context.Background())
    skipped 21 lines
    138 138   for _, c := range chunks {
    139 139   mt := newMemTable(testMemTableSize)
    140 140   mt.addChunk(computeAddr(c), c)
    141  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     141 + ts, err = ts.append(context.Background(), mt, &Stats{})
    142 142   require.NoError(t, err)
    143 143   }
    144 144   return ts
    skipped 37 lines
    182 182   assert.Empty(specs)
    183 183   mt := newMemTable(testMemTableSize)
    184 184   mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
    185  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     185 + ts, err = ts.append(context.Background(), mt, &Stats{})
    186 186   require.NoError(t, err)
    187 187   
    188 188   mt = newMemTable(testMemTableSize)
    189 189   mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
    190 190   mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
    191  - ts, err = ts.prepend(context.Background(), mt, &Stats{})
     191 + ts, err = ts.append(context.Background(), mt, &Stats{})
    192 192   require.NoError(t, err)
    193 193   
    194 194   assert.True(mustUint64(ts.physicalLen()) > indexSize(mustUint32(ts.count())))
    skipped 27 lines
Please wait...
The page encountered an error; reload to recover.