Projects STRLCPY dolt Commits 98120eb3
🤬
Revision indexing in progress... (symbol navigation in revisions will be accurate after indexed)
  • ■ ■ ■ ■ ■ ■
    go/libraries/doltcore/doltdb/doltdb.go
    skipped 1199 lines
    1200 1200   return datas.PruneTableFiles(ctx, ddb.db)
    1201 1201  }
    1202 1202   
     1203 +func (ddb *DoltDB) OnlineShallowGC(ctx context.Context) error {
     1204 + return datas.OnlinePruneTableFiles(ctx, ddb.db)
     1205 +}
     1206 + 
    1203 1207  func (ddb *DoltDB) pruneUnreferencedDatasets(ctx context.Context) error {
    1204 1208   dd, err := ddb.db.Datasets(ctx)
    1205 1209   if err != nil {
    skipped 119 lines
  • ■ ■ ■ ■ ■
    go/libraries/doltcore/remotestorage/chunk_store.go
    skipped 1135 lines
    1136 1136   return concurrentExec(work[0:largeCutoff+1], dcs.concurrency.ConcurrentLargeFetches)
    1137 1137   })
    1138 1138   eg.Go(func() error {
    1139  - return concurrentExec(work[largeCutoff+1:len(work)], dcs.concurrency.ConcurrentSmallFetches)
     1139 + return concurrentExec(work[largeCutoff+1:], dcs.concurrency.ConcurrentSmallFetches)
    1140 1140   })
    1141 1141   
    1142 1142   defer func() {
    skipped 158 lines
    1301 1301   
    1302 1302  // PruneTableFiles deletes old table files that are no longer referenced in the manifest.
    1303 1303  func (dcs *DoltChunkStore) PruneTableFiles(ctx context.Context) error {
     1304 + return chunks.ErrUnsupportedOperation
     1305 +}
     1306 + 
     1307 +// OnlinePruneTableFiles deletes old table files that are no longer referenced in the manifest.
     1308 +func (dcs *DoltChunkStore) OnlinePruneTableFiles(ctx context.Context) error {
    1304 1309   return chunks.ErrUnsupportedOperation
    1305 1310  }
    1306 1311   
    skipped 95 lines
  • ■ ■ ■ ■ ■ ■
    go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
    skipped 26 lines
    27 27   cmdSuccess = 1
    28 28  )
    29 29   
    30  -// doltGC is the stored procedure version of the functions `gc` and `dolt_gc`.
     30 +// doltGC is the stored procedure to run online garbage collection on a database.
    31 31  func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {
    32 32   res, err := doDoltGC(ctx, args)
    33 33   if err != nil {
    skipped 18 lines
    52 52   return cmdFailure, fmt.Errorf("Could not load database %s", dbName)
    53 53   }
    54 54   
    55  - err := ddb.ShallowGC(ctx)
     55 + err := ddb.OnlineShallowGC(ctx)
    56 56   if err != nil {
    57 57   return cmdFailure, err
    58 58   }
    skipped 4 lines
  • ■ ■ ■ ■ ■ ■
    go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go
    skipped 972 lines
    973 973   }
    974 974  }
    975 975   
     976 +func TestDoltGC(t *testing.T) {
     977 + for _, script := range DoltGC {
     978 + enginetest.TestScript(t, newDoltHarness(t), script)
     979 + }
     980 +}
     981 + 
    976 982  func TestDoltBranch(t *testing.T) {
    977 983   for _, script := range DoltBranchScripts {
    978 984   enginetest.TestScript(t, newDoltHarness(t), script)
    skipped 720 lines
  • ■ ■ ■ ■ ■ ■
    go/store/datas/garbage_collection.go
    skipped 30 lines
    31 31   return tfs.PruneTableFiles(ctx)
    32 32  }
    33 33   
     34 +func OnlinePruneTableFiles(ctx context.Context, db Database) error {
     35 + tfs, ok := db.chunkStore().(nbs.TableFileStore)
     36 + 
     37 + if !ok {
     38 + return chunks.ErrUnsupportedOperation
     39 + }
     40 + 
     41 + return tfs.OnlinePruneTableFiles(ctx)
     42 +}
     43 + 
  • ■ ■ ■ ■ ■ ■
    go/store/datas/pull/pull_test.go
    skipped 541 lines
    542 542   return chunks.ErrUnsupportedOperation
    543 543  }
    544 544   
     545 +func (ttfs *TestTableFileStore) OnlinePruneTableFiles(ctx context.Context) error {
     546 + return chunks.ErrUnsupportedOperation
     547 +}
     548 + 
    545 549  func TestClone(t *testing.T) {
    546 550   hashBytes := [hash.ByteLen]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13}
    547 551   src := &TestTableFileStore{
    548 552   root: hash.Of(hashBytes[:]),
    549 553   tableFiles: map[string]*TestTableFile{
    550  - "file1": &TestTableFile{
     554 + "file1": {
    551 555   fileID: "file1",
    552 556   numChunks: 1,
    553 557   data: []byte("Call me Ishmael. Some years ago—never mind how long precisely—having little or no money in my purse, "),
    554 558   },
    555  - "file2": &TestTableFile{
     559 + "file2": {
    556 560   fileID: "file2",
    557 561   numChunks: 2,
    558 562   data: []byte("and nothing particular to interest me on shore, I thought I would sail about a little and see the watery "),
    559 563   },
    560  - "file3": &TestTableFile{
     564 + "file3": {
    561 565   fileID: "file3",
    562 566   numChunks: 3,
    563 567   data: []byte("part of the world. It is a way I have of driving off the spleen and regulating the "),
    564 568   },
    565  - "file4": &TestTableFile{
     569 + "file4": {
    566 570   fileID: "file4",
    567 571   numChunks: 4,
    568 572   data: []byte("circulation. Whenever I find myself growing grim about the mouth; whenever it is a damp, drizzly "),
    569 573   },
    570  - "file5": &TestTableFile{
     574 + "file5": {
    571 575   fileID: "file5",
    572 576   numChunks: 5,
    573 577   data: []byte("November in my soul; whenever I find myself involuntarily pausing before coffin warehouses, and bringing "),
    skipped 73 lines
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/aws_table_persister.go
    skipped 575 lines
    576 576   return chunks.ErrUnsupportedOperation
    577 577  }
    578 578   
     579 +func (s3p awsTablePersister) OnlinePruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error {
     580 + return chunks.ErrUnsupportedOperation
     581 +}
     582 + 
  • ■ ■ ■ ■ ■
    go/store/nbs/bs_persister.go
    skipped 16 lines
    17 17  import (
    18 18   "context"
    19 19   "io"
     20 + "time"
    20 21   
    21 22   "github.com/dolthub/dolt/go/store/blobstore"
    22 23   "github.com/dolthub/dolt/go/store/chunks"
    skipped 104 lines
    127 128   return chunks.ErrUnsupportedOperation
    128 129  }
    129 130   
     131 +func (bsp *blobstorePersister) OnlinePruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error {
     132 + return chunks.ErrUnsupportedOperation
     133 +}
     134 + 
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/file_table_persister.go
    skipped 241 lines
    242 242   return nil
    243 243  }
    244 244   
     245 +func (ftp *fsTablePersister) OnlinePruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error {
     246 + ss := contents.getSpecSet()
     247 + 
     248 + fileInfos, err := os.ReadDir(ftp.dir)
     249 + 
     250 + if err != nil {
     251 + return err
     252 + }
     253 + 
     254 + err = ftp.fc.ShrinkCache()
     255 + 
     256 + if err != nil {
     257 + return err
     258 + }
     259 + 
     260 + ea := make(gcErrAccum)
     261 + for _, info := range fileInfos {
     262 + if info.IsDir() {
     263 + continue
     264 + }
     265 + 
     266 + filePath := path.Join(ftp.dir, info.Name())
     267 + 
     268 + if strings.HasPrefix(info.Name(), tempTablePrefix) {
     269 + err = file.Remove(filePath)
     270 + if err != nil {
     271 + ea.add(filePath, err)
     272 + }
     273 + continue
     274 + }
     275 + 
     276 + if len(info.Name()) != 32 {
     277 + continue // not a table file
     278 + }
     279 + 
     280 + addy, err := parseAddr(info.Name())
     281 + if err != nil {
     282 + continue // not a table file
     283 + }
     284 + 
     285 + if _, ok := ss[addy]; ok {
     286 + continue // file is referenced in the manifest
     287 + }
     288 + 
     289 + i, err := info.Info()
     290 + 
     291 + if err != nil {
     292 + ea.add(filePath, err)
     293 + }
     294 + 
     295 + ctime := i.ModTime()
     296 + 
     297 + if ctime.After(mtime) {
     298 + continue // file has been updated more recently than manifest
     299 + }
     300 + 
     301 + err = file.Remove(filePath)
     302 + if err != nil {
     303 + ea.add(filePath, err)
     304 + }
     305 + }
     306 + 
     307 + if !ea.isEmpty() {
     308 + return ea
     309 + }
     310 + 
     311 + return nil
     312 +}
     313 + 
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/generational_chunk_store.go
    skipped 314 lines
    315 315   return gcs.newGen.PruneTableFiles(ctx)
    316 316  }
    317 317   
     318 +// OnlinePruneTableFiles deletes old table files that are no longer referenced in the manifest of the new or old gen chunkstores
     319 +func (gcs *GenerationalNBS) OnlinePruneTableFiles(ctx context.Context) error {
     320 + err := gcs.oldGen.OnlinePruneTableFiles(ctx)
     321 + 
     322 + if err != nil {
     323 + return err
     324 + }
     325 + 
     326 + return gcs.newGen.OnlinePruneTableFiles(ctx)
     327 +}
     328 + 
    318 329  // SetRootChunk changes the root chunk hash from the previous value to the new root for the newgen cs
    319 330  func (gcs *GenerationalNBS) SetRootChunk(ctx context.Context, root, previous hash.Hash) error {
    320 331   return gcs.newGen.SetRootChunk(ctx, root, previous)
    skipped 61 lines
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/manifest.go
    skipped 341 lines
    342 342   return
    343 343  }
    344 344   
     345 +func (mm manifestManager) FetchWithTime(ctx context.Context, stats *Stats) (exists bool, contents manifestContents, t time.Time, err error) {
     346 + entryTime := time.Now()
     347 + 
     348 + mm.lockOutFetch()
     349 + defer func() {
     350 + afErr := mm.allowFetch()
     351 + 
     352 + if err == nil {
     353 + err = afErr
     354 + }
     355 + }()
     356 + 
     357 + f := func() (bool, manifestContents, time.Time, error) {
     358 + cached, t, hit := mm.cache.Get(mm.Name())
     359 + 
     360 + if hit && t.After(entryTime) {
     361 + // Cache contains a manifest which is newer than entry time.
     362 + return true, cached, t, nil
     363 + }
     364 + 
     365 + t = time.Now()
     366 + 
     367 + exists, contents, err := mm.m.ParseIfExists(ctx, stats, nil)
     368 + 
     369 + if err != nil {
     370 + return false, manifestContents{}, t, err
     371 + }
     372 + 
     373 + err = mm.cache.Put(mm.Name(), contents, t)
     374 + 
     375 + if err != nil {
     376 + return false, manifestContents{}, t, err
     377 + }
     378 + 
     379 + return exists, contents, t, nil
     380 + }
     381 + 
     382 + exists, contents, t, err = f()
     383 + return
     384 +}
     385 + 
    345 386  // Update attempts to write a new manifest.
    346 387  // Callers MUST protect uses of Update with Lock/UnlockForUpdate.
    347 388  // Update does not call Lock/UnlockForUpdate() on its own because it is
    skipped 166 lines
  • ■ ■ ■ ■ ■
    go/store/nbs/nbs_metrics_wrapper.go
    skipped 79 lines
    80 80   return nbsMW.nbs.PruneTableFiles(ctx)
    81 81  }
    82 82   
     83 +// OnlinePruneTableFiles deletes old table files that are no longer referenced in the manifest.
     84 +func (nbsMW *NBSMetricWrapper) OnlinePruneTableFiles(ctx context.Context) error {
     85 + return nbsMW.nbs.OnlinePruneTableFiles(ctx)
     86 +}
     87 + 
    83 88  // GetManyCompressed gets the compressed Chunks with |hashes| from the store. On return,
    84 89  // |found| will have been fully sent all chunks which have been
    85 90  // found. Any non-present chunks will silently be ignored.
    skipped 5 lines
  • ■ ■ ■ ■ ■
    go/store/nbs/root_tracker_test.go
    skipped 26 lines
    27 27   "fmt"
    28 28   "sync"
    29 29   "testing"
     30 + "time"
    30 31   
    31 32   "github.com/stretchr/testify/assert"
    32 33   "github.com/stretchr/testify/require"
    skipped 591 lines
    624 625  }
    625 626   
    626 627  func (ftp fakeTablePersister) PruneTableFiles(_ context.Context, _ manifestContents) error {
     628 + return chunks.ErrUnsupportedOperation
     629 +}
     630 + 
     631 +func (ftp fakeTablePersister) OnlinePruneTableFiles(_ context.Context, _ manifestContents, _ time.Time) error {
    627 632   return chunks.ErrUnsupportedOperation
    628 633  }
    629 634   
    skipped 22 lines
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/store.go
    skipped 1088 lines
    1089 1089   return err
    1090 1090   }
    1091 1091   
    1092  - // ensure we dont drop appendices on commit
     1092 + // ensure we don't drop appendices on commit
    1093 1093   var appendixSpecs []tableSpec
    1094 1094   if nbs.upstream.appendix != nil && len(nbs.upstream.appendix) > 0 {
    1095 1095   appendixSet := nbs.upstream.getAppendixSet()
    skipped 341 lines
    1437 1437   }
    1438 1438   
    1439 1439   return nbs.p.PruneTableFiles(ctx, contents)
     1440 +}
     1441 + 
     1442 +// OnlinePruneTableFiles deletes old table files that are no longer referenced in the manifest.
     1443 +func (nbs *NomsBlockStore) OnlinePruneTableFiles(ctx context.Context) (err error) {
     1444 + nbs.mu.Lock()
     1445 + defer nbs.mu.Unlock()
     1446 + 
     1447 + nbs.mm.LockForUpdate()
     1448 + defer func() {
     1449 + unlockErr := nbs.mm.UnlockForUpdate()
     1450 + 
     1451 + if err == nil {
     1452 + err = unlockErr
     1453 + }
     1454 + }()
     1455 + 
     1456 + for {
     1457 + // flush all tables and update manifest
     1458 + err = nbs.updateManifest(ctx, nbs.upstream.root, nbs.upstream.root)
     1459 + 
     1460 + if err == nil {
     1461 + break
     1462 + } else if err == errOptimisticLockFailedTables {
     1463 + continue
     1464 + } else {
     1465 + return err
     1466 + }
     1467 + 
     1468 + // Same behavior as Commit
     1469 + // infinitely retries without backoff in the case off errOptimisticLockFailedTables
     1470 + }
     1471 + 
     1472 + // nbs.mm.cache.Get(nbs.mm.Name())
     1473 + 
     1474 + ok, contents, t, err := nbs.mm.FetchWithTime(ctx, &Stats{})
     1475 + if err != nil {
     1476 + return err
     1477 + }
     1478 + if !ok {
     1479 + return nil // no manifest exists
     1480 + }
     1481 + 
     1482 + return nbs.p.OnlinePruneTableFiles(ctx, contents, t)
    1440 1483  }
    1441 1484   
    1442 1485  func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, last hash.Hash, keepChunks <-chan []hash.Hash, dest chunks.ChunkStore) error {
    skipped 227 lines
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/store_test.go
    skipped 250 lines
    251 251   require.Greater(t, size, uint64(0))
    252 252  }
    253 253   
     254 +func TestNBSOnlinePruneTableFiles(t *testing.T) {
     255 + ctx := context.Background()
     256 + 
     257 + // over populate table files
     258 + numTableFiles := 64
     259 + maxTableFiles := 16
     260 + st, nomsDir, _ := makeTestLocalStore(t, maxTableFiles)
     261 + fileToData := populateLocalStore(t, st, numTableFiles)
     262 + 
     263 + // add a chunk and flush to trigger a conjoin
     264 + c := []byte("it's a boy!")
     265 + ok := st.addChunk(ctx, computeAddr(c), c)
     266 + require.True(t, ok)
     267 + ok, err := st.Commit(ctx, st.upstream.root, st.upstream.root)
     268 + require.True(t, ok)
     269 + require.NoError(t, err)
     270 + 
     271 + _, sources, _, err := st.Sources(ctx)
     272 + require.NoError(t, err)
     273 + assert.Greater(t, numTableFiles, len(sources))
     274 + 
     275 + // find which input table files were conjoined
     276 + tfSet := tableFileSetFromSources(sources)
     277 + absent := tfSet.findAbsent(fileToData)
     278 + // assert some input table files were conjoined
     279 + assert.NotEmpty(t, absent)
     280 + 
     281 + currTableFiles := func(dirName string) *set.StrSet {
     282 + infos, err := os.ReadDir(dirName)
     283 + require.NoError(t, err)
     284 + curr := set.NewStrSet(nil)
     285 + for _, fi := range infos {
     286 + if fi.Name() != manifestFileName && fi.Name() != lockFileName {
     287 + curr.Add(fi.Name())
     288 + }
     289 + }
     290 + return curr
     291 + }
     292 + 
     293 + preGC := currTableFiles(nomsDir)
     294 + for _, tf := range sources {
     295 + assert.True(t, preGC.Contains(tf.FileID()))
     296 + }
     297 + for _, fileName := range absent {
     298 + assert.True(t, preGC.Contains(fileName))
     299 + }
     300 + 
     301 + err = st.OnlinePruneTableFiles(ctx)
     302 + require.NoError(t, err)
     303 + 
     304 + postGC := currTableFiles(nomsDir)
     305 + for _, tf := range sources {
     306 + assert.True(t, preGC.Contains(tf.FileID()))
     307 + }
     308 + for _, fileName := range absent {
     309 + assert.False(t, postGC.Contains(fileName))
     310 + }
     311 + infos, err := os.ReadDir(nomsDir)
     312 + require.NoError(t, err)
     313 + 
     314 + // assert that we only have files for current sources,
     315 + // the manifest, and the lock file
     316 + assert.Equal(t, len(sources)+2, len(infos))
     317 + 
     318 + size, err := st.Size(ctx)
     319 + require.NoError(t, err)
     320 + require.Greater(t, size, uint64(0))
     321 +}
     322 + 
    254 323  func makeChunkSet(N, size int) (s map[hash.Hash]chunks.Chunk) {
    255 324   bb := make([]byte, size*N)
    256 325   time.Sleep(10)
    skipped 371 lines
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/table.go
    skipped 304 lines
    305 305   // PruneTableFiles deletes old table files that are no longer referenced in the manifest.
    306 306   PruneTableFiles(ctx context.Context) error
    307 307   
     308 + // OnlinePruneTableFiles deletes old table files that are no longer referenced in the manifest.
     309 + OnlinePruneTableFiles(ctx context.Context) error
     310 + 
    308 311   // SetRootChunk changes the root chunk hash from the previous value to the new root.
    309 312   SetRootChunk(ctx context.Context, root, previous hash.Hash) error
    310 313   
    skipped 4 lines
  • ■ ■ ■ ■ ■ ■
    go/store/nbs/table_persister.go
    skipped 27 lines
    28 28   "encoding/binary"
    29 29   "errors"
    30 30   "sort"
     31 + "time"
    31 32  )
    32 33   
    33 34  var errCacheMiss = errors.New("index cache miss")
    skipped 17 lines
    51 52   
    52 53   // PruneTableFiles deletes old table files that are no longer referenced in the manifest.
    53 54   PruneTableFiles(ctx context.Context, contents manifestContents) error
     55 + 
     56 + // OnlinePruneTableFiles deletes old table files that are no longer referenced in the manifest.
     57 + OnlinePruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error
    54 58  }
    55 59   
    56 60  type chunkSourcesByAscendingCount struct {
    skipped 227 lines
Please wait...
Page is in error, reload to recover