diff --git a/pkg/iter/iter.go b/pkg/iter/iter.go
index 47fbe6f4e..34a16f7db 100644
--- a/pkg/iter/iter.go
+++ b/pkg/iter/iter.go
@@ -101,6 +101,20 @@ func NewSliceSeekIterator[A constraints.Ordered](s []A) SeekIterator[A, A] {
 	}
 }
 
+type slicePositionIterator[T constraints.Integer, M any] struct {
+	i Iterator[T]
+	s []M
+}
+
+func NewSliceIndexIterator[T constraints.Integer, M any](s []M, i Iterator[T]) Iterator[M] {
+	return slicePositionIterator[T, M]{s: s, i: i}
+}
+
+func (i slicePositionIterator[T, M]) Next() bool   { return i.i.Next() }
+func (i slicePositionIterator[T, M]) At() M        { return i.s[i.i.At()] }
+func (i slicePositionIterator[T, M]) Err() error   { return i.i.Err() }
+func (i slicePositionIterator[T, M]) Close() error { return i.i.Close() }
+
 type sliceSeekIterator[A constraints.Ordered] struct {
 	*sliceIterator[A]
 }
diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go
index e431f14f5..ad0c9ab75 100644
--- a/pkg/phlaredb/block_querier.go
+++ b/pkg/phlaredb/block_querier.go
@@ -305,7 +305,7 @@ type singleBlockQuerier struct {
 type StacktraceDB interface {
 	Open(ctx context.Context) error
 	Close() error
-	Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error
+	Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error
 }
 
 type stacktraceResolverV1 struct {
@@ -321,14 +321,17 @@ func (r *stacktraceResolverV1) Close() error {
 	return r.stacktraces.Close()
 }
 
-func (r *stacktraceResolverV1) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error {
+func (r *stacktraceResolverV1) Resolve(ctx context.Context, _ uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error {
 	stacktraces := repeatedColumnIter(ctx, r.stacktraces.file, "LocationIDs.list.element", iter.NewSliceIterator(stacktraceIDs))
 	defer stacktraces.Close()
-
+	t := make([]int32, 0, 64)
 	for stacktraces.Next() {
+		t = t[:0]
 		s := stacktraces.At()
-		locs.addFromParquet(int64(s.Row), s.Values)
-
+		for _, v := range s.Values {
+			t = append(t, v.Int32())
+		}
+		locs.InsertStacktrace(s.Row, t)
 	}
 	return stacktraces.Err()
 }
@@ -351,19 +354,14 @@ func (r *stacktraceResolverV2) Close() error {
 	return nil
 }
 
-func (r *stacktraceResolverV2) Resolve(ctx context.Context, mapping uint64, locs locationsIdsByStacktraceID, stacktraceIDs []uint32) error {
+func (r *stacktraceResolverV2) Resolve(ctx context.Context, mapping uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error {
 	mr, ok := r.reader.MappingReader(mapping)
 	if !ok {
 		return nil
 	}
 	resolver := mr.StacktraceResolver()
 	defer resolver.Release()
-
-	return resolver.ResolveStacktraces(ctx, symdb.StacktraceInserterFn(
-		func(stacktraceID uint32, locations []int32) {
-			locs.add(int64(stacktraceID), locations)
-		},
-	), stacktraceIDs)
+	return resolver.ResolveStacktraces(ctx, locs, stacktraceIDs)
 }
 
 func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier {
@@ -435,6 +433,11 @@ func (b *singleBlockQuerier) Index() IndexReader {
 	return b.index
 }
 
+func (b *singleBlockQuerier) Symbols() SymbolsReader {
+	// TODO(kolesnikovae)
+	return nil
+}
+
 func (b *singleBlockQuerier) Meta() block.Meta {
 	if b.meta == nil {
 		return block.Meta{}
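A quick usage sketch of the new `NewSliceIndexIterator` adapter (the data and the `main` wrapper are illustrative only; it assumes `iter.NewSliceIterator` yields an `Iterator[uint32]`, as its use in `block_querier.go` above suggests):

```go
package main

import (
	"fmt"

	"github.com/grafana/phlare/pkg/iter"
)

func main() {
	// Visit slice elements in the order given by an iterator of indices.
	indices := iter.NewSliceIterator([]uint32{2, 0, 1})
	letters := iter.NewSliceIndexIterator([]string{"a", "b", "c"}, indices)
	for letters.Next() {
		fmt.Println(letters.At()) // prints: c, a, b
	}
}
```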
diff --git a/pkg/phlaredb/block_symbols_appender.go b/pkg/phlaredb/block_symbols_appender.go
new file mode 100644
index 000000000..b39adb9e2
--- /dev/null
+++ b/pkg/phlaredb/block_symbols_appender.go
@@ -0,0 +1,19 @@
+package phlaredb
+
+import schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1"
+
+// TODO(kolesnikovae): Refactor to symdb.
+
+type SymbolsWriter interface {
+	SymbolsAppender(partition uint64) (SymbolsAppender, error)
+}
+
+type SymbolsAppender interface {
+	AppendStacktrace([]int32) uint32
+	AppendLocation(*schemav1.InMemoryLocation) uint32
+	AppendMapping(*schemav1.InMemoryMapping) uint32
+	AppendFunction(*schemav1.InMemoryFunction) uint32
+	AppendString(string) uint32
+
+	Flush() error
+}
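A sketch of the write path these interfaces are meant to support (hypothetical helper in a consuming package; note the concrete on-disk `symbolsWriter` at the end of `compact.go` is still a stub in this change):

```go
package example

import "github.com/grafana/phlare/pkg/phlaredb"

// AppendStrings copies a batch of strings into the destination block's
// per-partition appender and returns the rewritten IDs.
func AppendStrings(w phlaredb.SymbolsWriter, partition uint64, names []string) ([]uint32, error) {
	a, err := w.SymbolsAppender(partition)
	if err != nil {
		return nil, err
	}
	ids := make([]uint32, len(names))
	for i, n := range names {
		ids[i] = a.AppendString(n) // deduplication is the appender's job
	}
	return ids, a.Flush()
}
```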
diff --git a/pkg/phlaredb/block_symbols_reader.go b/pkg/phlaredb/block_symbols_reader.go
new file mode 100644
index 000000000..6c6df7a52
--- /dev/null
+++ b/pkg/phlaredb/block_symbols_reader.go
@@ -0,0 +1,92 @@
+package phlaredb
+
+import (
+	"context"
+
+	"github.com/grafana/phlare/pkg/iter"
+	schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1"
+	"github.com/grafana/phlare/pkg/phlaredb/symdb"
+)
+
+// TODO(kolesnikovae): Refactor to symdb.
+
+type SymbolsReader interface {
+	SymbolsResolver(partition uint64) (SymbolsResolver, error)
+}
+
+type SymbolsResolver interface {
+	ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error
+
+	Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation]
+	Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping]
+	Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction]
+	Strings(iter.Iterator[uint32]) iter.Iterator[string]
+
+	WriteStats(*SymbolStats)
+}
+
+type SymbolStats struct {
+	StacktracesTotal int
+	LocationsTotal   int
+	MappingsTotal    int
+	FunctionsTotal   int
+	StringsTotal     int
+}
+
+type inMemorySymbolsReader struct {
+	partitions map[uint64]*inMemorySymbolsResolver
+
+	// TODO(kolesnikovae): Split into partitions.
+	strings     inMemoryparquetReader[string, *schemav1.StringPersister]
+	functions   inMemoryparquetReader[*schemav1.InMemoryFunction, *schemav1.FunctionPersister]
+	locations   inMemoryparquetReader[*schemav1.InMemoryLocation, *schemav1.LocationPersister]
+	mappings    inMemoryparquetReader[*schemav1.InMemoryMapping, *schemav1.MappingPersister]
+	stacktraces StacktraceDB
+}
+
+func (r *inMemorySymbolsReader) SymbolsResolver(partition uint64) (SymbolsResolver, error) {
+	p, ok := r.partitions[partition]
+	if !ok {
+		p = &inMemorySymbolsResolver{
+			partition: partition,
+			reader:    r,
+		}
+		r.partitions[partition] = p
+	}
+	return p, nil
+}
+
+type inMemorySymbolsResolver struct {
+	partition uint64
+	ctx       context.Context
+	reader    *inMemorySymbolsReader
+}
+
+func (s inMemorySymbolsResolver) ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error {
+	return s.reader.stacktraces.Resolve(ctx, s.partition, dst, stacktraces)
+}
+
+func (s inMemorySymbolsResolver) Locations(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation] {
+	return iter.NewSliceIndexIterator(s.reader.locations.cache, i)
+}
+
+func (s inMemorySymbolsResolver) Mappings(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping] {
+	return iter.NewSliceIndexIterator(s.reader.mappings.cache, i)
+}
+
+func (s inMemorySymbolsResolver) Functions(i iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction] {
+	return iter.NewSliceIndexIterator(s.reader.functions.cache, i)
+}
+
+func (s inMemorySymbolsResolver) Strings(i iter.Iterator[uint32]) iter.Iterator[string] {
+	return iter.NewSliceIndexIterator(s.reader.strings.cache, i)
+}
+
+func (s inMemorySymbolsResolver) WriteStats(stats *SymbolStats) {
+	stats.StacktracesTotal = 0 // TODO
+	stats.LocationsTotal = int(s.reader.locations.NumRows())
+	stats.MappingsTotal = int(s.reader.mappings.NumRows())
+	stats.FunctionsTotal = int(s.reader.functions.NumRows())
+	stats.StringsTotal = int(s.reader.strings.NumRows())
+}
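And the corresponding read path: how a consumer might stream symbols out of one partition (hypothetical example; `MappingId` is a field of `schemav1.InMemoryLocation` referenced elsewhere in this change):

```go
package example

import (
	"fmt"

	"github.com/grafana/phlare/pkg/iter"
	"github.com/grafana/phlare/pkg/phlaredb"
)

// PrintMappingIDs streams the locations referenced by the given IDs
// from a single symbols partition of a block.
func PrintMappingIDs(r phlaredb.SymbolsReader, partition uint64, ids []uint32) error {
	res, err := r.SymbolsResolver(partition)
	if err != nil {
		return err
	}
	locs := res.Locations(iter.NewSliceIterator(ids))
	for locs.Next() {
		fmt.Println(locs.At().MappingId)
	}
	return locs.Err()
}
```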
diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go
index 7d1f57370..bae2fadc7 100644
--- a/pkg/phlaredb/compact.go
+++ b/pkg/phlaredb/compact.go
@@ -5,6 +5,7 @@ import (
 	"math"
 	"os"
 	"path/filepath"
+	"sort"
 
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
@@ -27,7 +28,7 @@ type BlockReader interface {
 	Meta() block.Meta
 	Profiles() []parquet.RowGroup
 	Index() IndexReader
-	// todo symbdb
+	Symbols() SymbolsReader
 }
 
 func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) {
@@ -50,16 +51,18 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er
 		return block.Meta{}, err
 	}
 	profileWriter := newProfileWriter(profileFile)
-
-	// todo new symbdb
+	symw, err := newSymbolsWriter(dst)
+	if err != nil {
+		return block.Meta{}, err
+	}
 
 	rowsIt, err := newMergeRowProfileIterator(src)
 	if err != nil {
 		return block.Meta{}, err
 	}
 	seriesRewriter := newSeriesRewriter(rowsIt, indexw)
-	symbolsRewriter := newSymbolsRewriter(seriesRewriter)
-	reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symbolsRewriter))
+	symRewriter := newSymbolsRewriter(seriesRewriter, src, symw)
+	reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symRewriter))
 
 	total, _, err := phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount)
 	if err != nil {
@@ -77,7 +80,7 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er
 	// todo: block meta files.
 	meta.Stats.NumProfiles = total
 	meta.Stats.NumSeries = seriesRewriter.NumSeries()
-	meta.Stats.NumSamples = symbolsRewriter.NumSamples()
+	meta.Stats.NumSamples = symRewriter.NumSamples()
 
 	if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil {
 		return block.Meta{}, err
@@ -138,10 +141,13 @@ type profileRow struct {
 	labels phlaremodel.Labels
 	fp     model.Fingerprint
 	row    schemav1.ProfileRow
+
+	blockReader BlockReader
 }
 
 type profileRowIterator struct {
 	profiles    iter.Iterator[parquet.Row]
+	blockReader BlockReader
 	index       IndexReader
 	allPostings index.Postings
 	err         error
@@ -150,15 +156,16 @@ type profileRowIterator struct {
 	chunks      []index.ChunkMeta
 }
 
-func newProfileRowIterator(reader parquet.RowReader, idx IndexReader) (*profileRowIterator, error) {
+func newProfileRowIterator(reader parquet.RowReader, s BlockReader) (*profileRowIterator, error) {
 	k, v := index.AllPostingsKey()
-	allPostings, err := idx.Postings(k, nil, v)
+	allPostings, err := s.Index().Postings(k, nil, v)
 	if err != nil {
 		return nil, err
 	}
 	return &profileRowIterator{
 		profiles:    phlareparquet.NewBufferedRowReaderIterator(reader, 1024),
-		index:       idx,
+		blockReader: s,
+		index:       s.Index(),
 		allPostings: allPostings,
 		currentRow: profileRow{
 			seriesRef: math.MaxUint32,
@@ -175,6 +182,7 @@ func (p *profileRowIterator) Next() bool {
 	if !p.profiles.Next() {
 		return false
 	}
+	p.currentRow.blockReader = p.blockReader
 	p.currentRow.row = schemav1.ProfileRow(p.profiles.At())
 	seriesIndex := p.currentRow.row.SeriesIndex()
 	p.currentRow.timeNanos = p.currentRow.row.TimeNanos()
@@ -217,10 +225,7 @@ func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], e
 	for i, s := range src {
 		// todo: may be we could merge rowgroups in parallel but that requires locking.
 		reader := parquet.MultiRowGroup(s.Profiles()...).Rows()
-		it, err := newProfileRowIterator(
-			reader,
-			s.Index(),
-		)
+		it, err := newProfileRowIterator(reader, s)
 		if err != nil {
 			return nil, err
 		}
@@ -252,80 +257,6 @@ func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], e
 	}, nil
 }
 
-type noopStacktraceRewriter struct{}
-
-func (noopStacktraceRewriter) RewriteStacktraces(src, dst []uint32) error {
-	copy(dst, src)
-	return nil
-}
-
-type StacktraceRewriter interface {
-	RewriteStacktraces(src, dst []uint32) error
-}
-
-type symbolsRewriter struct {
-	iter.Iterator[profileRow]
-	err error
-
-	rewriter StacktraceRewriter
-	src, dst []uint32
-	numSamples uint64
-}
-
-// todo remap symbols & ingest symbols
-func newSymbolsRewriter(it iter.Iterator[profileRow]) *symbolsRewriter {
-	return &symbolsRewriter{
-		Iterator: it,
-		rewriter: noopStacktraceRewriter{},
-	}
-}
-
-func (s *symbolsRewriter) NumSamples() uint64 {
-	return s.numSamples
-}
-
-func (s *symbolsRewriter) Next() bool {
-	if !s.Iterator.Next() {
-		return false
-	}
-	var err error
-	s.Iterator.At().row.ForStacktraceIDsValues(func(values []parquet.Value) {
-		s.numSamples += uint64(len(values))
-		s.loadStacktracesID(values)
-		err = s.rewriter.RewriteStacktraces(s.src, s.dst)
-		if err != nil {
-			return
-		}
-		for i, v := range values {
-			values[i] = parquet.Int64Value(int64(s.dst[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column())
-		}
-	})
-	if err != nil {
-		s.err = err
-		return false
-	}
-	return true
-}
-
-func (s *symbolsRewriter) Err() error {
-	if s.err != nil {
-		return s.err
-	}
-	return s.Iterator.Err()
-}
-
-func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) {
-	if cap(s.src) < len(values) {
-		s.src = make([]uint32, len(values)*2)
-		s.dst = make([]uint32, len(values)*2)
-	}
-	s.src = s.src[:len(values)]
-	s.dst = s.dst[:len(values)]
-	for i := range values {
-		s.src[i] = values[i].Uint32()
-	}
-}
-
 type seriesRewriter struct {
 	iter.Iterator[profileRow]
 
@@ -443,3 +374,457 @@ func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader)
 	return indexw, nil
 }
+
+type symbolsRewriter struct {
+	profiles    iter.Iterator[profileRow]
+	rewriters   map[BlockReader]*stacktraceRewriter
+	stacktraces []uint32
+	err         error
+
+	numSamples uint64
+}
+
+func newSymbolsRewriter(it iter.Iterator[profileRow], blocks []BlockReader, w SymbolsWriter) *symbolsRewriter {
+	sr := symbolsRewriter{
+		profiles:  it,
+		rewriters: make(map[BlockReader]*stacktraceRewriter, len(blocks)),
+	}
+	for _, r := range blocks {
+		sr.rewriters[r] = newStacktraceRewriter(r.Symbols(), w)
+	}
+	return &sr
+}
+
+func (s *symbolsRewriter) NumSamples() uint64 { return s.numSamples }
+
+func (s *symbolsRewriter) At() profileRow { return s.profiles.At() }
+
+func (s *symbolsRewriter) Close() error { return s.profiles.Close() }
+
+func (s *symbolsRewriter) Err() error {
+	if s.err != nil {
+		return s.err
+	}
+	return s.profiles.Err()
+}
+
+func (s *symbolsRewriter) Next() bool {
+	if !s.profiles.Next() {
+		return false
+	}
+	var err error
+	profile := s.profiles.At()
+	profile.row.ForStacktraceIDsValues(func(values []parquet.Value) {
+		s.loadStacktracesID(values)
+		r := s.rewriters[profile.blockReader]
+		if err = r.rewriteStacktraces(profile.row.StacktracePartitionID(), s.stacktraces); err != nil {
+			return
+		}
+		s.numSamples += uint64(len(values))
+		for i, v := range values {
+			// FIXME: the original order is not preserved, which will affect encoding.
+			values[i] = parquet.Int64Value(int64(s.stacktraces[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column())
+		}
+	})
+	if err != nil {
+		s.err = err
+		return false
+	}
+	return true
+}
+
+func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) {
+	if cap(s.stacktraces) < len(values) {
+		s.stacktraces = make([]uint32, len(values)*2)
+	}
+	s.stacktraces = s.stacktraces[:len(values)]
+	for i := range values {
+		s.stacktraces[i] = values[i].Uint32()
+	}
+}
+
+type stacktraceRewriter struct {
+	reader SymbolsReader
+	writer SymbolsWriter
+
+	// Stack trace identifiers are only valid within the partition.
+	stacktraces map[uint64]*lookupTable[[]int32]
+	inserter    *stacktraceInserter
+
+	// Objects below have global addressing.
+	locations *lookupTable[*schemav1.InMemoryLocation]
+	mappings  *lookupTable[*schemav1.InMemoryMapping]
+	functions *lookupTable[*schemav1.InMemoryFunction]
+	strings   *lookupTable[string]
+
+	partition uint64
+	resolver  SymbolsResolver
+	appender  SymbolsAppender
+	stats     SymbolStats
+}
+
+func newStacktraceRewriter(r SymbolsReader, w SymbolsWriter) *stacktraceRewriter {
+	return &stacktraceRewriter{
+		reader: r,
+		writer: w,
+	}
+}
+
+func (r *stacktraceRewriter) init(partition uint64) (err error) {
+	r.partition = partition
+	if r.appender, err = r.writer.SymbolsAppender(partition); err != nil {
+		return err
+	}
+	if r.resolver, err = r.reader.SymbolsResolver(partition); err != nil {
+		return err
+	}
+	r.resolver.WriteStats(&r.stats)
+
+	// Only stack traces are partitioned yet.
+	if r.stacktraces == nil {
+		r.stacktraces = make(map[uint64]*lookupTable[[]int32])
+		r.inserter = new(stacktraceInserter)
+	}
+	p, ok := r.stacktraces[partition]
+	if !ok {
+		p = newLookupTable[[]int32](r.stats.StacktracesTotal)
+		r.stacktraces[partition] = p
+	}
+	p.reset()
+
+	if r.locations == nil {
+		r.locations = newLookupTable[*schemav1.InMemoryLocation](r.stats.LocationsTotal)
+		r.mappings = newLookupTable[*schemav1.InMemoryMapping](r.stats.MappingsTotal)
+		r.functions = newLookupTable[*schemav1.InMemoryFunction](r.stats.FunctionsTotal)
+		r.strings = newLookupTable[string](r.stats.StringsTotal)
+	} else {
+		r.locations.reset()
+		r.mappings.reset()
+		r.functions.reset()
+		r.strings.reset()
+	}
+
+	// Rebuild the inserter so that it points at the lookup tables of
+	// the current partition; the scratch buffer is carried over.
+	r.inserter = &stacktraceInserter{
+		slt: p,
+		llt: r.locations,
+		s:   r.inserter.s,
+	}
+
+	return nil
+}
+
+func (r *stacktraceRewriter) hasUnresolved() bool {
+	return len(r.stacktraces[r.partition].unresolved)+
+		len(r.locations.unresolved)+
+		len(r.mappings.unresolved)+
+		len(r.functions.unresolved)+
+		len(r.strings.unresolved) > 0
+}
+
+func (r *stacktraceRewriter) rewriteStacktraces(partition uint64, stacktraces []uint32) (err error) {
+	if err = r.init(partition); err != nil {
+		return err
+	}
+	if err = r.populateUnresolved(stacktraces); err != nil {
+		return err
+	}
+	if r.hasUnresolved() {
+		if err = r.appendRewrite(stacktraces); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (r *stacktraceRewriter) populateUnresolved(stacktraceIDs []uint32) error {
+	// Filter out all stack traces that have already been
+	// resolved and populate the locations lookup table.
+	if err := r.resolveStacktraces(stacktraceIDs); err != nil {
+		return err
+	}
+	if len(r.locations.unresolved) == 0 {
+		return nil
+	}
+
+	// Resolve functions and mappings for new locations.
+	unresolvedLocs := r.locations.iter()
+	locations := r.resolver.Locations(unresolvedLocs)
+	for locations.Err() == nil && locations.Next() {
+		location := locations.At()
+		location.MappingId = r.mappings.tryLookup(location.MappingId)
+		for j, line := range location.Line {
+			location.Line[j].FunctionId = r.functions.tryLookup(line.FunctionId)
+		}
+		unresolvedLocs.setValue(location)
+	}
+	if err := locations.Err(); err != nil {
+		return err
+	}
+
+	// Resolve strings.
+	unresolvedMappings := r.mappings.iter()
+	mappings := r.resolver.Mappings(unresolvedMappings)
+	for mappings.Err() == nil && mappings.Next() {
+		mapping := mappings.At()
+		mapping.BuildId = r.strings.tryLookup(mapping.BuildId)
+		mapping.Filename = r.strings.tryLookup(mapping.Filename)
+		unresolvedMappings.setValue(mapping)
+	}
+	if err := mappings.Err(); err != nil {
+		return err
+	}
+
+	unresolvedFunctions := r.functions.iter()
+	functions := r.resolver.Functions(unresolvedFunctions)
+	for functions.Err() == nil && functions.Next() {
+		function := functions.At()
+		function.Name = r.strings.tryLookup(function.Name)
+		function.Filename = r.strings.tryLookup(function.Filename)
+		function.SystemName = r.strings.tryLookup(function.SystemName)
+		unresolvedFunctions.setValue(function)
+	}
+	if err := functions.Err(); err != nil {
+		return err
+	}
+
+	unresolvedStrings := r.strings.iter()
+	strings := r.resolver.Strings(unresolvedStrings)
+	for strings.Err() == nil && strings.Next() {
+		unresolvedStrings.setValue(strings.At())
+	}
+	return strings.Err()
+}
+
+func (r *stacktraceRewriter) appendRewrite(stacktraces []uint32) error {
+	for _, v := range r.strings.unresolved {
+		r.strings.storeResolved(v.rid, r.appender.AppendString(v.val))
+	}
+
+	for _, v := range r.functions.unresolved {
+		function := v.val
+		function.Name = r.strings.lookupResolved(function.Name)
+		function.Filename = r.strings.lookupResolved(function.Filename)
+		function.SystemName = r.strings.lookupResolved(function.SystemName)
+		r.functions.storeResolved(v.rid, r.appender.AppendFunction(function))
+	}
+
+	for _, v := range r.mappings.unresolved {
+		mapping := v.val
+		mapping.BuildId = r.strings.lookupResolved(mapping.BuildId)
+		mapping.Filename = r.strings.lookupResolved(mapping.Filename)
+		r.mappings.storeResolved(v.rid, r.appender.AppendMapping(mapping))
+	}
+
+	for _, v := range r.locations.unresolved {
+		location := v.val
+		location.MappingId = r.mappings.lookupResolved(location.MappingId)
+		for j, line := range location.Line {
+			location.Line[j].FunctionId = r.functions.lookupResolved(line.FunctionId)
+		}
+		r.locations.storeResolved(v.rid, r.appender.AppendLocation(location))
+	}
+
+	src := r.stacktraces[r.partition]
+	for _, v := range src.unresolved {
+		stacktrace := v.val
+		for j, lid := range stacktrace {
+			stacktrace[j] = int32(r.locations.lookupResolved(uint32(lid)))
+		}
+		src.storeResolved(v.rid, r.appender.AppendStacktrace(stacktrace))
+	}
+	for i, v := range stacktraces {
+		stacktraces[i] = src.lookupResolved(v)
+	}
+
+	return r.appender.Flush()
+}
+
+func (r *stacktraceRewriter) resolveStacktraces(stacktraceIDs []uint32) error {
+	stacktraces := r.stacktraces[r.partition]
+	for i, v := range stacktraceIDs {
+		stacktraceIDs[i] = stacktraces.tryLookup(v)
+	}
+	if len(stacktraces.unresolved) == 0 {
+		return nil
+	}
+
+	// Gather and sort references to unresolved stack traces.
+	stacktraces.initRefs()
+	sort.Sort(stacktraces)
+	r.inserter.s = grow(r.inserter.s, len(stacktraces.refs))
+	for j, u := range stacktraces.refs {
+		r.inserter.s[j] = u.rid
+	}
+
+	return r.resolver.ResolveStacktraces(context.TODO(), r.inserter, r.inserter.s)
+}
+
+type stacktraceInserter struct {
+	slt *lookupTable[[]int32]
+	llt *lookupTable[*schemav1.InMemoryLocation]
+	s   []uint32
+	c   int
+}
+
+func (i *stacktraceInserter) InsertStacktrace(stacktrace uint32, locations []int32) {
+	// Resolve locations for new stack traces.
+	for j, loc := range locations {
+		locations[j] = int32(i.llt.tryLookup(uint32(loc)))
+	}
+	// Update the unresolved value.
+	v := i.slt.referenceAt(i.c)
+	if v.rid != stacktrace {
+		panic("unexpected stack trace")
+	}
+	v.val = grow(v.val, len(locations))
+	copy(v.val, locations)
+	i.c++
+}
+
+const (
+	marker     = 1 << 31
+	markerMask = math.MaxUint32 >> 1
+)
+
+type lookupTable[T any] struct {
+	// The index is the source ID, and the value is the destination ID.
+	// If the destination ID is not known yet, the element is a marked
+	// reference into 'unresolved'.
+	resolved   []uint32
+	unresolved []lookupTableValue[T]
+	refs       []lookupTableRef
+}
+
+type lookupTableValue[T any] struct {
+	rid uint32 // Index to resolved.
+	val T
+}
+
+type lookupTableRef struct {
+	rid uint32 // Index to resolved.
+	uid uint32 // Original index (unresolved).
+}
+
+func newLookupTable[T any](size int) *lookupTable[T] {
+	var t lookupTable[T]
+	t.init(size)
+	return &t
+}
+
+func (t *lookupTable[T]) init(size int) {
+	if cap(t.resolved) < size {
+		t.resolved = make([]uint32, size)
+		return
+	}
+	t.resolved = t.resolved[:size]
+	for i := range t.resolved {
+		t.resolved[i] = 0
+	}
+}
+
+func (t *lookupTable[T]) reset() {
+	t.unresolved = t.unresolved[:0]
+	t.refs = t.refs[:0]
+}
+
+// tryLookup looks up the value at x in resolved.
+// If x has not been resolved yet, x is memorized for a
+// future resolve, and the returned value is the marked
+// reference to the unresolved entry.
+func (t *lookupTable[T]) tryLookup(x uint32) uint32 {
+	if v := t.resolved[x]; v != 0 {
+		if v&marker > 0 {
+			return v // Already marked for resolve.
+		}
+		return v - 1 // Already resolved.
+	}
+	u := t.newUnresolvedValue(x) | marker
+	t.resolved[x] = u
+	return u
+}
+
+func (t *lookupTable[T]) newUnresolvedValue(rid uint32) uint32 {
+	x := len(t.unresolved)
+	if x < cap(t.unresolved) {
+		// Reuse the previously allocated value.
+		t.unresolved = t.unresolved[:x+1]
+		t.unresolved[x].rid = rid
+	} else {
+		t.unresolved = append(t.unresolved, lookupTableValue[T]{rid: rid})
+	}
+	// References are 1-based: zero in 'resolved' is reserved
+	// as the "unknown" sentinel.
+	return uint32(x + 1)
+}
+
+func (t *lookupTable[T]) referenceAt(x int) *lookupTableValue[T] {
+	u := t.refs[x].uid
+	return &t.unresolved[u]
+}
+
+func (t *lookupTable[T]) storeResolved(rid, v uint32) { t.resolved[rid] = v + 1 }
+
+func (t *lookupTable[T]) lookupResolved(x uint32) uint32 {
+	if x&marker > 0 {
+		// x is a marked 1-based reference to an unresolved entry:
+		// follow it to the destination ID, which storeResolved has
+		// recorded against the source ID with a +1 offset.
+		return t.resolved[t.unresolved[(x&markerMask)-1].rid] - 1
+	}
+	return x // Already resolved.
+}
+
+func (t *lookupTable[T]) iter() *lookupTableIterator[T] {
+	t.initRefs()
+	sort.Sort(t)
+	return &lookupTableIterator[T]{table: t}
+}
+
+func (t *lookupTable[T]) initRefs() {
+	t.refs = grow(t.refs, len(t.unresolved))
+	for i, v := range t.unresolved {
+		t.refs[i] = lookupTableRef{rid: v.rid, uid: uint32(i)}
+	}
+}
+
+func (t *lookupTable[T]) Len() int           { return len(t.refs) }
+func (t *lookupTable[T]) Less(i, j int) bool { return t.refs[i].rid < t.refs[j].rid }
+func (t *lookupTable[T]) Swap(i, j int)      { t.refs[i], t.refs[j] = t.refs[j], t.refs[i] }
+
+type lookupTableIterator[T any] struct {
+	table *lookupTable[T]
+	cur   uint32
+}
+
+func (t *lookupTableIterator[T]) Next() bool {
+	return t.cur < uint32(len(t.table.refs))
+}
+
+func (t *lookupTableIterator[T]) At() uint32 {
+	x := t.table.refs[t.cur].rid
+	t.cur++
+	return x
+}
+
+func (t *lookupTableIterator[T]) setValue(v T) {
+	uid := t.table.refs[t.cur-1].uid
+	t.table.unresolved[uid].val = v
+}
+
+func (t *lookupTableIterator[T]) Close() error { return nil }
+
+func (t *lookupTableIterator[T]) Err() error { return nil }
+
+func grow[T any](s []T, n int) []T {
+	if cap(s) < n {
+		return make([]T, n, 2*n)
+	}
+	return s[:n]
+}
+
+// TODO(kolesnikovae):
+
+type symbolsWriter struct{}
+
+func newSymbolsWriter(dst string) (*symbolsWriter, error) {
+	return &symbolsWriter{}, nil
+}
+
+func (w *symbolsWriter) SymbolsAppender(partition uint64) (SymbolsAppender, error) {
+	return nil, nil
+}
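The lookup-table contract in miniature (a sketch that would live in package `phlaredb`, e.g. in a test file importing `fmt`; the flow mirrors `Test_lookupTable` below):

```go
func ExampleLookupTableFlow() {
	// Pass 1: IDs that are not in the destination yet come back as
	// marked, 1-based references into the 'unresolved' queue.
	lt := newLookupTable[string](2)
	ref := lt.tryLookup(1) // marker|1; source ID 1 is enqueued
	_ = lt.tryLookup(1)    // a repeated hit returns the same marked ref

	// The caller appends the value to the destination and records the
	// new ID (here 0) against source ID 1.
	lt.storeResolved(1, 0)

	// Pass 2: marked references now resolve to the destination ID.
	fmt.Println(lt.lookupResolved(ref))
	// Output: 0
}
```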
diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go
index 213ddac24..2d277411a 100644
--- a/pkg/phlaredb/compact_test.go
+++ b/pkg/phlaredb/compact_test.go
@@ -3,13 +3,12 @@ package phlaredb
 import (
 	"context"
 	"net/http"
+	_ "net/http/pprof"
 	"sort"
 	"sync"
 	"testing"
 	"time"
 
-	_ "net/http/pprof"
-
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/stretchr/testify/assert"
@@ -132,6 +131,15 @@ func testCompact(t *testing.T, metas []*block.Meta, bkt phlareobj.Bucket, dst st
 		"numSamples", new.Stats.NumSamples)
 }
 
+type blockReaderMock struct {
+	BlockReader
+	idxr IndexReader
+}
+
+func (m *blockReaderMock) Index() IndexReader {
+	return m.idxr
+}
+
 func TestProfileRowIterator(t *testing.T) {
 	filePath := t.TempDir() + "/index.tsdb"
 	idxw, err := index.NewWriter(context.Background(), filePath)
@@ -158,7 +166,7 @@ func TestProfileRowIterator(t *testing.T) {
 		{SeriesIndex: 1, TimeNanos: 2},
 		{SeriesIndex: 2, TimeNanos: 3},
 	},
-	), idxr)
+	), &blockReaderMock{idxr: idxr})
 	require.NoError(t, err)
 
 	assert.True(t, it.Next())
@@ -191,3 +199,96 @@ func addSeries(t *testing.T, idxw *index.Writer, idx int, labels phlaremodel.Lab
 	t.Helper()
 	require.NoError(t, idxw.AddSeries(storage.SeriesRef(idx), labels, model.Fingerprint(labels.Hash()), index.ChunkMeta{SeriesIndex: uint32(idx)}))
 }
+
+func Test_lookupTable(t *testing.T) {
+	// Given the source data set below, copy
+	// arbitrary subsets of its items to dst.
+	var dst []string
+	src := []string{
+		"zero",
+		"one",
+		"two",
+		"three",
+		"four",
+		"five",
+		"six",
+		"seven",
+	}
+
+	type testCase struct {
+		description string
+		input       []uint32
+		expected    []string
+	}
+
+	testCases := []testCase{
+		{
+			description: "empty table",
+			input:       []uint32{5, 0, 3, 1, 2, 2, 4},
+			expected:    []string{"five", "zero", "three", "one", "two", "two", "four"},
+		},
+		{
+			description: "no new values",
+			input:       []uint32{2, 1, 2, 3},
+			expected:    []string{"two", "one", "two", "three"},
+		},
+		{
+			description: "new value mixed",
+			input:       []uint32{2, 1, 6, 2, 3},
+			expected:    []string{"two", "one", "six", "two", "three"},
+		},
+	}
+
+	// Look up values in src lazily.
+	// The table size must be greater than or
+	// equal to the size of the source data set.
+	l := newLookupTable[string](10)
+
+	populate := func(t *testing.T, x []uint32) {
+		for i, v := range x {
+			x[i] = l.tryLookup(v)
+		}
+		// Resolve values that are not yet known.
+		// Mind the order and deduplication.
+		p := -1
+		for it := l.iter(); it.Err() == nil && it.Next(); {
+			m := int(it.At())
+			if m <= p {
+				t.Fatal("iterator order invalid")
+			}
+			p = m
+			it.setValue(src[m])
+		}
+	}
+
+	resolveAppend := func() {
+		// Populate dst with the newly resolved values.
+		// Note that the order in dst does not have to match src.
+		for _, n := range l.unresolved {
+			l.storeResolved(n.rid, uint32(len(dst)))
+			dst = append(dst, n.val)
+		}
+	}
+
+	resolve := func(x []uint32) []string {
+		// Look up resolved values.
+		var resolved []string
+		for _, v := range x {
+			resolved = append(resolved, dst[l.lookupResolved(v)])
+		}
+		return resolved
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(tc.description, func(t *testing.T) {
+			l.reset()
+			populate(t, tc.input)
+			resolveAppend()
+			assert.Equal(t, tc.expected, resolve(tc.input))
+		})
+	}
+
+	assert.Len(t, dst, 7)
+	assert.NotContains(t, dst, "seven")
+}
diff --git a/pkg/phlaredb/sample_merge.go b/pkg/phlaredb/sample_merge.go
index 14065e8a3..6d2389f25 100644
--- a/pkg/phlaredb/sample_merge.go
+++ b/pkg/phlaredb/sample_merge.go
@@ -56,20 +56,12 @@ func newLocationsIdsByStacktraceID(size int) locationsIdsByStacktraceID {
 	}
 }
 
-func (l locationsIdsByStacktraceID) addFromParquet(stacktraceID int64, locs []parquet.Value) {
-	l.byStacktraceID[stacktraceID] = make([]int32, len(locs))
-	for i, locationID := range locs {
-		locID := locationID.Uint64()
-		l.ids[int64(locID)] = struct{}{}
-		l.byStacktraceID[stacktraceID][i] = int32(locID)
-	}
-}
-
-func (l locationsIdsByStacktraceID) add(stacktraceID int64, locs []int32) {
-	l.byStacktraceID[stacktraceID] = make([]int32, len(locs))
+func (l locationsIdsByStacktraceID) InsertStacktrace(stacktraceID uint32, locs []int32) {
+	s := make([]int32, len(locs))
+	l.byStacktraceID[int64(stacktraceID)] = s
 	for i, locationID := range locs {
 		l.ids[int64(locationID)] = struct{}{}
-		l.byStacktraceID[stacktraceID][i] = locationID
+		s[i] = locationID
 	}
 }
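With the rename, `locationsIdsByStacktraceID` satisfies `symdb.StacktraceInserter` directly, which is what lets both resolver versions in `block_querier.go` write into it without the `StacktraceInserterFn` closure. A compile-time check of that contract (illustrative; it would live in package `phlaredb`):

```go
var _ symdb.StacktraceInserter = locationsIdsByStacktraceID{}
```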
diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go
index c3806ced9..a39381d58 100644
--- a/pkg/phlaredb/schemas/v1/profiles.go
+++ b/pkg/phlaredb/schemas/v1/profiles.go
@@ -42,10 +42,11 @@ var (
 		phlareparquet.NewGroupField("DefaultSampleType", parquet.Optional(parquet.Int(64))),
 	})
 
-	maxProfileRow        parquet.Row
-	seriesIndexColIndex  int
-	stacktraceIDColIndex int
-	timeNanoColIndex     int
+	maxProfileRow               parquet.Row
+	seriesIndexColIndex         int
+	stacktraceIDColIndex        int
+	timeNanoColIndex            int
+	stacktracePartitionColIndex int
 )
 
 func init() {
@@ -68,6 +69,11 @@ func init() {
 		panic(fmt.Errorf("StacktraceID column not found"))
 	}
 	stacktraceIDColIndex = stacktraceIDCol.ColumnIndex
+	stacktracePartitionCol, ok := profilesSchema.Lookup("StacktracePartition")
+	if !ok {
+		panic(fmt.Errorf("StacktracePartition column not found"))
+	}
+	stacktracePartitionColIndex = stacktracePartitionCol.ColumnIndex
 }
 
 type Sample struct {
@@ -471,6 +477,10 @@ func (p ProfileRow) SeriesIndex() uint32 {
 	return p[seriesIndexColIndex].Uint32()
 }
 
+func (p ProfileRow) StacktracePartitionID() uint64 {
+	return p[stacktracePartitionColIndex].Uint64()
+}
+
 func (p ProfileRow) TimeNanos() int64 {
 	var ts int64
 	for i := len(p) - 1; i >= 0; i-- {
diff --git a/pkg/phlaredb/symdb/interfaces.go b/pkg/phlaredb/symdb/interfaces.go
index fe7c03b20..cd07358d1 100644
--- a/pkg/phlaredb/symdb/interfaces.go
+++ b/pkg/phlaredb/symdb/interfaces.go
@@ -10,6 +10,7 @@ import (
 // collection. https://github.com/google/pprof/blob/main/proto/README.md
 //
 // In the package, Mapping represents all the version of a binary.
+// TODO(kolesnikovae): Rename Mapping to Partition.
 type MappingWriter interface {
 	// StacktraceAppender provides exclusive write access