| skipped 41 lines |
42 | 42 | | |
// tableSet is an immutable set of persistable chunkSources.
type tableSet struct {
	// novel holds chunkSources created locally (e.g. a memTable persisted
	// via append); upstream holds chunkSources opened from persisted specs.
	// Both map a source's hash (addr) to the chunkSource.
	novel, upstream chunkSourceSet
	// p persists memTables and opens chunkSources (see Persist/Open calls below).
	p tablePersister
	// q is the memory quota provider; its use is not visible in this view —
	// NOTE(review): presumably bounds table-index memory, confirm at call sites.
	q MemoryQuotaProvider
	// rl is a signaling channel; not referenced in this view —
	// NOTE(review): looks like a rate-limit semaphore, confirm elsewhere in file.
	rl chan struct{}
}
50 | 50 | | |
51 | 51 | | func (ts tableSet) has(h addr) (bool, error) { |
52 | | - | f := func(css chunkSources) (bool, error) { |
| 52 | + | f := func(css chunkSourceSet) (bool, error) { |
53 | 53 | | for _, haver := range css { |
54 | 54 | | has, err := haver.has(h) |
55 | 55 | | |
| skipped 22 lines |
78 | 78 | | } |
79 | 79 | | |
80 | 80 | | func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) { |
81 | | - | f := func(css chunkSources) (bool, error) { |
| 81 | + | f := func(css chunkSourceSet) (bool, error) { |
82 | 82 | | for _, haver := range css { |
83 | 83 | | has, err := haver.hasMany(addrs) |
84 | 84 | | |
| skipped 21 lines |
106 | 106 | | } |
107 | 107 | | |
108 | 108 | | func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { |
109 | | - | f := func(css chunkSources) ([]byte, error) { |
| 109 | + | f := func(css chunkSourceSet) ([]byte, error) { |
110 | 110 | | for _, haver := range css { |
111 | 111 | | data, err := haver.get(ctx, h, stats) |
112 | 112 | | |
| skipped 23 lines |
136 | 136 | | } |
137 | 137 | | |
138 | 138 | | func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (remaining bool, err error) { |
139 | | - | f := func(css chunkSources) bool { |
| 139 | + | f := func(css chunkSourceSet) bool { |
140 | 140 | | for _, haver := range css { |
141 | 141 | | remaining, err = haver.getMany(ctx, eg, reqs, found, stats) |
142 | 142 | | if err != nil { |
| skipped 10 lines |
153 | 153 | | } |
154 | 154 | | |
155 | 155 | | func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (remaining bool, err error) { |
156 | | - | f := func(css chunkSources) bool { |
| 156 | + | f := func(css chunkSourceSet) bool { |
157 | 157 | | for _, haver := range css { |
158 | 158 | | remaining, err = haver.getManyCompressed(ctx, eg, reqs, found, stats) |
159 | 159 | | if err != nil { |
| skipped 11 lines |
171 | 171 | | } |
172 | 172 | | |
173 | 173 | | func (ts tableSet) count() (uint32, error) { |
174 | | - | f := func(css chunkSources) (count uint32, err error) { |
| 174 | + | f := func(css chunkSourceSet) (count uint32, err error) { |
175 | 175 | | for _, haver := range css { |
176 | 176 | | thisCount, err := haver.count() |
177 | 177 | | |
| skipped 22 lines |
200 | 200 | | } |
201 | 201 | | |
202 | 202 | | func (ts tableSet) uncompressedLen() (uint64, error) { |
203 | | - | f := func(css chunkSources) (data uint64, err error) { |
| 203 | + | f := func(css chunkSourceSet) (data uint64, err error) { |
204 | 204 | | for _, haver := range css { |
205 | 205 | | uncmpLen, err := haver.uncompressedLen() |
206 | 206 | | |
| skipped 22 lines |
229 | 229 | | } |
230 | 230 | | |
231 | 231 | | func (ts tableSet) physicalLen() (uint64, error) { |
232 | | - | f := func(css chunkSources) (data uint64, err error) { |
| 232 | + | f := func(css chunkSourceSet) (data uint64, err error) { |
233 | 233 | | for _, haver := range css { |
234 | 234 | | index, err := haver.index() |
235 | 235 | | if err != nil { |
| skipped 41 lines |
277 | 277 | | return len(ts.novel) + len(ts.upstream) |
278 | 278 | | } |
279 | 279 | | |
280 | | - | // prepend adds a memTable to an existing tableSet, compacting |mt| and |
| 280 | + | // append adds a memTable to an existing tableSet, compacting |mt| and |
281 | 281 | | // returning a new tableSet with newly compacted table added. |
282 | | - | func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) (tableSet, error) { |
| 282 | + | func (ts tableSet) append(ctx context.Context, mt *memTable, stats *Stats) (tableSet, error) { |
283 | 283 | | cs, err := ts.p.Persist(ctx, mt, ts, stats) |
284 | 284 | | if err != nil { |
285 | 285 | | return tableSet{}, err |
286 | 286 | | } |
287 | 287 | | |
288 | 288 | | newTs := tableSet{ |
289 | | - | novel: make(chunkSources, len(ts.novel)+1), |
290 | | - | upstream: make(chunkSources, len(ts.upstream)), |
| 289 | + | novel: copyChunkSourceSet(ts.novel), |
| 290 | + | upstream: copyChunkSourceSet(ts.upstream), |
291 | 291 | | p: ts.p, |
292 | 292 | | q: ts.q, |
293 | 293 | | rl: ts.rl, |
294 | 294 | | } |
295 | | - | newTs.novel[0] = cs |
296 | | - | copy(newTs.novel[1:], ts.novel) |
297 | | - | copy(newTs.upstream, ts.upstream) |
| 295 | + | newTs.novel[cs.hash()] = cs |
298 | 296 | | return newTs, nil |
299 | 297 | | } |
300 | 298 | | |
| skipped 1 lines |
302 | 300 | | // and ts.upstream. |
303 | 301 | | func (ts tableSet) flatten(ctx context.Context) (tableSet, error) { |
304 | 302 | | flattened := tableSet{ |
305 | | - | upstream: make(chunkSources, 0, ts.Size()), |
| 303 | + | upstream: copyChunkSourceSet(ts.upstream), |
306 | 304 | | p: ts.p, |
307 | 305 | | q: ts.q, |
308 | 306 | | rl: ts.rl, |
| skipped 3 lines |
312 | 310 | | cnt, err := src.count() |
313 | 311 | | if err != nil { |
314 | 312 | | return tableSet{}, err |
315 | | - | } |
316 | | - | if cnt > 0 { |
317 | | - | flattened.upstream = append(flattened.upstream, src) |
| 313 | + | } else if cnt > 0 { |
| 314 | + | flattened.upstream[src.hash()] = src |
318 | 315 | | } |
319 | 316 | | } |
320 | | - | |
321 | | - | flattened.upstream = append(flattened.upstream, ts.upstream...) |
322 | 317 | | return flattened, nil |
323 | 318 | | } |
324 | 319 | | |
| skipped 29 lines |
354 | 349 | | |
355 | 350 | | // copy |ts.novel|, skipping empty chunkSources |
356 | 351 | | // (usually due to de-duping during table compaction) |
357 | | - | novel := make(chunkSources, 0, len(ts.novel)) |
| 352 | + | novel := make(chunkSourceSet, len(ts.novel)) |
358 | 353 | | for _, t := range ts.novel { |
359 | 354 | | cnt, err := t.count() |
360 | 355 | | if err != nil { |
| skipped 5 lines |
366 | 361 | | if err != nil { |
367 | 362 | | return tableSet{}, err |
368 | 363 | | } |
369 | | - | novel = append(novel, t2) |
370 | | - | } |
371 | | - | |
372 | | - | existing := make(map[addr]chunkSource, len(ts.upstream)) |
373 | | - | for _, cs := range ts.upstream { |
374 | | - | existing[cs.hash()] = cs |
| 364 | + | novel[t2.hash()] = t2 |
375 | 365 | | } |
376 | 366 | | |
377 | 367 | | // newly opened tables are unowned, we must |
378 | 368 | | // close them if the rebase operation fails |
379 | | - | opened := new(sync.Map) |
| 369 | + | opened := make(chunkSourceSet, len(specs)) |
380 | 370 | | |
381 | 371 | | eg, ctx := errgroup.WithContext(ctx) |
382 | | - | upstream := make([]chunkSource, len(specs)) |
383 | | - | for i, s := range specs { |
| 372 | + | mu := new(sync.Mutex) |
| 373 | + | upstream := make(chunkSourceSet, len(specs)) |
| 374 | + | for _, s := range specs { |
384 | 375 | | // clone tables that we have already opened |
385 | | - | if cs, ok := existing[s.name]; ok { |
386 | | - | c, err := cs.clone() |
| 376 | + | if cs, ok := ts.upstream[s.name]; ok { |
| 377 | + | cl, err := cs.clone() |
387 | 378 | | if err != nil { |
388 | 379 | | return tableSet{}, err |
389 | 380 | | } |
390 | | - | upstream[i] = c |
| 381 | + | upstream[cl.hash()] = cl |
391 | 382 | | continue |
392 | 383 | | } |
393 | 384 | | // open missing tables in parallel |
394 | | - | idx, spec := i, s |
| 385 | + | spec := s |
395 | 386 | | eg.Go(func() error { |
396 | 387 | | cs, err := ts.p.Open(ctx, spec.name, spec.chunkCount, stats) |
397 | 388 | | if err != nil { |
398 | 389 | | return err |
399 | 390 | | } |
400 | | - | upstream[idx] = cs |
401 | | - | opened.Store(spec.name, cs) |
| 391 | + | mu.Lock() |
| 392 | + | defer mu.Unlock() |
| 393 | + | upstream[cs.hash()] = cs |
| 394 | + | opened[cs.hash()] = cs |
402 | 395 | | return nil |
403 | 396 | | }) |
404 | 397 | | } |
405 | 398 | | |
406 | 399 | | if err := eg.Wait(); err != nil { |
407 | | - | opened.Range(func(_, v any) bool { |
| 400 | + | for _, cs := range opened { |
408 | 401 | | // close any opened chunkSources |
409 | | - | _ = v.(chunkSource).close() |
410 | | - | return true |
411 | | - | }) |
| 402 | + | _ = cs.close() |
| 403 | + | } |
412 | 404 | | return tableSet{}, err |
413 | 405 | | } |
414 | 406 | | |
| skipped 41 lines |
456 | 448 | | } |
457 | 449 | | |
458 | 450 | | func tableSetCalcReads(ts tableSet, reqs []getRecord, blockSize uint64) (reads int, split, remaining bool, err error) { |
459 | | - | all := append(ts.novel, ts.upstream...) |
| 451 | + | all := copyChunkSourceSet(ts.upstream) |
| 452 | + | for a, cs := range ts.novel { |
| 453 | + | all[a] = cs |
| 454 | + | } |
460 | 455 | | for _, tbl := range all { |
461 | 456 | | rdr, ok := tbl.(*fileTableReader) |
462 | 457 | | if !ok { |
| skipped 20 lines |