| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977 |
- // Copyright (c) Tailscale Inc & AUTHORS
- // SPDX-License-Identifier: BSD-3-Clause
- //go:build !ts_omit_tailnetlock
- package tka
- import (
- "bytes"
- "errors"
- "fmt"
- "log"
- "maps"
- "os"
- "path/filepath"
- "slices"
- "sync"
- "time"
- "github.com/fxamacker/cbor/v2"
- "tailscale.com/atomicfile"
- "tailscale.com/tstime"
- "tailscale.com/util/testenv"
- )
- // Chonk implementations provide durable storage for AUMs and other
- // TKA state.
- //
- // All methods must be thread-safe.
- //
- // The name 'tailchonk' was coined by @catzkorn.
- type Chonk interface {
- // AUM returns the AUM with the specified digest.
- //
- // If the AUM does not exist, then os.ErrNotExist is returned.
- AUM(hash AUMHash) (AUM, error)
- // ChildAUMs returns all AUMs with a specified previous
- // AUM hash.
- ChildAUMs(prevAUMHash AUMHash) ([]AUM, error)
- // CommitVerifiedAUMs durably stores the provided AUMs.
- // Callers MUST ONLY provide AUMs which are verified (specifically,
- // a call to aumVerify() must return a nil error).
- // as the implementation assumes that only verified AUMs are stored.
- CommitVerifiedAUMs(updates []AUM) error
- // Heads returns AUMs for which there are no children. In other
- // words, the latest AUM in all possible chains (the 'leaves').
- Heads() ([]AUM, error)
- // SetLastActiveAncestor is called to record the oldest-known AUM
- // that contributed to the current state. This value is used as
- // a hint on next startup to determine which chain to pick when computing
- // the current state, if there are multiple distinct chains.
- SetLastActiveAncestor(hash AUMHash) error
- // LastActiveAncestor returns the oldest-known AUM that was (in a
- // previous run) an ancestor of the current state. This is used
- // as a hint to pick the correct chain in the event that the Chonk stores
- // multiple distinct chains.
- LastActiveAncestor() (*AUMHash, error)
- }
- // CompactableChonk implementation are extensions of Chonk, which are
- // able to be operated by compaction logic to deleted old AUMs.
- type CompactableChonk interface {
- Chonk
- // AllAUMs returns all AUMs stored in the chonk.
- AllAUMs() ([]AUMHash, error)
- // CommitTime returns the time at which the AUM was committed.
- //
- // If the AUM does not exist, then os.ErrNotExist is returned.
- CommitTime(hash AUMHash) (time.Time, error)
- // PurgeAUMs permanently and irrevocably deletes the specified
- // AUMs from storage.
- PurgeAUMs(hashes []AUMHash) error
- // RemoveAll permanently and completely clears the TKA state. This should
- // be called when the user disables Tailnet Lock.
- RemoveAll() error
- }
// Mem implements in-memory storage of TKA state, suitable for
// tests or cases where filesystem storage is unavailable.
//
// Mem implements the Chonk and CompactableChonk interfaces. Nothing is
// persisted: the state lives only as long as the Mem value.
//
// Mem is thread-safe.
type Mem struct {
	// mu guards the AUM maps and lastActiveAncestor.
	mu sync.RWMutex

	// aums holds every stored AUM, keyed by its hash.
	aums map[AUMHash]AUM

	// commitTimes records when each AUM in aums was (last) committed,
	// as reported by now().
	commitTimes map[AUMHash]time.Time

	// clock supplies commit times. If nil, time.Now is used.
	clock tstime.Clock

	// parentIndex is a map of AUMs to the AUMs for which they are
	// the parent.
	//
	// For example, if parent index is {1 -> {2, 3, 4}}, that means
	// that AUMs 2, 3, 4 all have aum.PrevAUMHash = 1.
	//
	// Entries keyed by a purged AUM are kept (see PurgeAUMs) so that
	// ChildAUMs still finds the children of purged AUMs.
	parentIndex map[AUMHash][]AUMHash

	// lastActiveAncestor is the value recorded by SetLastActiveAncestor,
	// or nil if none has been set.
	lastActiveAncestor *AUMHash
}
- // ChonkMem returns an implementation of Chonk which stores TKA state
- // in-memory.
- func ChonkMem() *Mem {
- return &Mem{
- clock: tstime.DefaultClock{},
- }
- }
- // SetClock sets the clock used by [Mem]. This is only for use in tests,
- // and will panic if called from non-test code.
- func (c *Mem) SetClock(clock tstime.Clock) {
- if !testenv.InTest() {
- panic("used SetClock in non-test code")
- }
- c.clock = clock
- }
- func (c *Mem) SetLastActiveAncestor(hash AUMHash) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- c.lastActiveAncestor = &hash
- return nil
- }
- func (c *Mem) LastActiveAncestor() (*AUMHash, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- return c.lastActiveAncestor, nil
- }
- // Heads returns AUMs for which there are no children. In other
- // words, the latest AUM in all chains (the 'leaf').
- func (c *Mem) Heads() ([]AUM, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- out := make([]AUM, 0, 6)
- // An AUM is a 'head' if there are no nodes for which it is the parent.
- for _, a := range c.aums {
- if len(c.parentIndex[a.Hash()]) == 0 {
- out = append(out, a)
- }
- }
- return out, nil
- }
- // AUM returns the AUM with the specified digest.
- func (c *Mem) AUM(hash AUMHash) (AUM, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- aum, ok := c.aums[hash]
- if !ok {
- return AUM{}, os.ErrNotExist
- }
- return aum, nil
- }
- // ChildAUMs returns all AUMs with a specified previous
- // AUM hash.
- func (c *Mem) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- out := make([]AUM, 0, 6)
- for _, entry := range c.parentIndex[prevAUMHash] {
- out = append(out, c.aums[entry])
- }
- return out, nil
- }
- // CommitVerifiedAUMs durably stores the provided AUMs.
- // Callers MUST ONLY provide well-formed and verified AUMs,
- // as the rest of the TKA implementation assumes that only
- // verified AUMs are stored.
- func (c *Mem) CommitVerifiedAUMs(updates []AUM) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.aums == nil {
- c.parentIndex = make(map[AUMHash][]AUMHash, 64)
- c.aums = make(map[AUMHash]AUM, 64)
- c.commitTimes = make(map[AUMHash]time.Time, 64)
- }
- updateLoop:
- for _, aum := range updates {
- aumHash := aum.Hash()
- c.aums[aumHash] = aum
- c.commitTimes[aumHash] = c.now()
- parent, ok := aum.Parent()
- if ok {
- for _, exists := range c.parentIndex[parent] {
- if exists == aumHash {
- continue updateLoop
- }
- }
- c.parentIndex[parent] = append(c.parentIndex[parent], aumHash)
- }
- }
- return nil
- }
- // now returns the current time, optionally using the overridden
- // clock if set.
- func (c *Mem) now() time.Time {
- if c.clock == nil {
- return time.Now()
- } else {
- return c.clock.Now()
- }
- }
- // RemoveAll permanently and completely clears the TKA state.
- func (c *Mem) RemoveAll() error {
- c.mu.Lock()
- defer c.mu.Unlock()
- c.aums = nil
- c.commitTimes = nil
- c.parentIndex = nil
- c.lastActiveAncestor = nil
- return nil
- }
- // AllAUMs returns all AUMs stored in the chonk.
- func (c *Mem) AllAUMs() ([]AUMHash, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- return slices.Collect(maps.Keys(c.aums)), nil
- }
- // CommitTime returns the time at which the AUM was committed.
- //
- // If the AUM does not exist, then os.ErrNotExist is returned.
- func (c *Mem) CommitTime(h AUMHash) (time.Time, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- t, ok := c.commitTimes[h]
- if ok {
- return t, nil
- } else {
- return time.Time{}, os.ErrNotExist
- }
- }
- // PurgeAUMs marks the specified AUMs for deletion from storage.
- func (c *Mem) PurgeAUMs(hashes []AUMHash) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- for _, h := range hashes {
- // Remove the deleted AUM from the list of its parents' children.
- //
- // However, we leave the list of this AUM's children in parentIndex,
- // so we can find them later in ChildAUMs().
- if aum, ok := c.aums[h]; ok {
- parent, hasParent := aum.Parent()
- if hasParent {
- c.parentIndex[parent] = slices.DeleteFunc(
- c.parentIndex[parent],
- func(other AUMHash) bool { return bytes.Equal(h[:], other[:]) },
- )
- if len(c.parentIndex[parent]) == 0 {
- delete(c.parentIndex, parent)
- }
- }
- }
- // Delete this AUM from the list of AUMs and commit times.
- delete(c.aums, h)
- delete(c.commitTimes, h)
- }
- return nil
- }
// FS implements filesystem storage of TKA state.
//
// FS implements the Chonk and CompactableChonk interfaces. Each AUM is
// stored as a CBOR-encoded fsHashInfo in its own file under base (see
// aumDir). Purged AUMs are only marked as deleted (PurgedUnix), not
// removed from disk.
//
// NOTE(review): mu serializes access within this process only; nothing
// here guards against another process using the same directory.
type FS struct {
	// base is the root directory holding all TKA state.
	base string
	// mu serializes access to the files under base.
	mu sync.RWMutex
}
- // ChonkDir returns an implementation of Chonk which uses the
- // given directory to store TKA state.
- func ChonkDir(dir string) (*FS, error) {
- if err := os.MkdirAll(dir, 0755); err != nil && !os.IsExist(err) {
- return nil, fmt.Errorf("creating chonk root dir: %v", err)
- }
- stat, err := os.Stat(dir)
- if err != nil {
- return nil, err
- }
- if !stat.IsDir() {
- return nil, fmt.Errorf("chonk directory %q is a file", dir)
- }
- // TODO(tom): *FS marks AUMs as deleted but does not actually
- // delete them, to avoid data loss in the event of a bug.
- // Implement deletion after we are fairly sure in the implementation.
- return &FS{base: dir}, nil
- }
// fsHashInfo describes how information about an AUMHash is represented
// on disk.
//
// The CBOR-serialization of this struct is stored to base/__/base32(hash)
// where __ are the first two characters of base32(hash) (the path is
// built by aumDir from hash.String()).
//
// CBOR was chosen because we are already using it and it serializes
// much smaller than JSON for AUMs. The 'keyasint' thing isn't essential
// but again it saves a bunch of bytes.
//
// We have removed the following fields from fsHashInfo, but they may be
// present in data stored in existing deployments. Do not reuse these values,
// to avoid getting unexpected values from legacy data:
//   - cbor:1, Children
type fsHashInfo struct {
	// AUM is the stored AUM. It may be nil; code in this file treats an
	// entry without an AUM as "AUM does not exist". NOTE(review): presumably
	// such entries come from older versions that only tracked Children.
	AUM *AUM `cbor:"2,keyasint"`

	// CreatedUnix is the unix time at which the entry was first written
	// by commit. It is zero for entries written before this field was
	// introduced, in which case the file's modification time is used.
	CreatedUnix int64 `cbor:"3,keyasint,omitempty"`

	// PurgedUnix is set when the AUM is deleted. The value is
	// the unix epoch at the time it was deleted.
	//
	// While a non-zero PurgedUnix symbolizes the AUM is deleted,
	// the fsHashInfo entry can continue to exist to track children
	// of this AUMHash.
	PurgedUnix int64 `cbor:"4,keyasint,omitempty"`
}
- // aumDir returns the directory an AUM is stored in, and its filename
- // within the directory.
- func (c *FS) aumDir(h AUMHash) (dir, base string) {
- s := h.String()
- return filepath.Join(c.base, s[:2]), s
- }
- // AUM returns the AUM with the specified digest.
- //
- // If the AUM does not exist, then os.ErrNotExist is returned.
- func (c *FS) AUM(hash AUMHash) (AUM, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- info, err := c.get(hash)
- if err != nil {
- if os.IsNotExist(err) {
- return AUM{}, os.ErrNotExist
- }
- return AUM{}, err
- }
- if info.AUM == nil || info.PurgedUnix > 0 {
- return AUM{}, os.ErrNotExist
- }
- return *info.AUM, nil
- }
- // CommitTime returns the time at which the AUM was committed.
- //
- // If the AUM does not exist, then os.ErrNotExist is returned.
- func (c *FS) CommitTime(h AUMHash) (time.Time, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- info, err := c.get(h)
- if err != nil {
- if os.IsNotExist(err) {
- return time.Time{}, os.ErrNotExist
- }
- return time.Time{}, err
- }
- if info.PurgedUnix > 0 {
- return time.Time{}, os.ErrNotExist
- }
- if info.CreatedUnix > 0 {
- return time.Unix(info.CreatedUnix, 0), nil
- }
- // If we got this far, the AUM exists but CreatedUnix is not
- // set, presumably because this AUM was committed using a version
- // of tailscaled that pre-dates the introduction of CreatedUnix.
- // As such, we use the file modification time as a suitable analog.
- dir, base := c.aumDir(h)
- s, err := os.Stat(filepath.Join(dir, base))
- if err != nil {
- return time.Time{}, nil
- }
- return s.ModTime(), nil
- }
- // AUM returns any known AUMs with a specific parent hash.
- func (c *FS) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- var out []AUM
- err := c.scanHashes(func(info *fsHashInfo) {
- if info.AUM != nil && bytes.Equal(info.AUM.PrevAUMHash, prevAUMHash[:]) {
- out = append(out, *info.AUM)
- }
- })
- return out, err
- }
- func (c *FS) get(h AUMHash) (*fsHashInfo, error) {
- dir, base := c.aumDir(h)
- f, err := os.Open(filepath.Join(dir, base))
- if err != nil {
- return nil, err
- }
- defer f.Close()
- m, err := cborDecOpts.DecMode()
- if err != nil {
- return nil, err
- }
- var out fsHashInfo
- if err := m.NewDecoder(f).Decode(&out); err != nil {
- return nil, err
- }
- if out.AUM != nil && out.AUM.Hash() != h {
- return nil, fmt.Errorf("%s: AUM does not match file name hash %s", f.Name(), out.AUM.Hash())
- }
- return &out, nil
- }
- // Heads returns AUMs for which there are no children. In other
- // words, the latest AUM in all possible chains (the 'leaves').
- //
- // Heads is expected to be called infrequently compared to AUM() or
- // ChildAUMs(), so we haven't put any work into maintaining an index.
- // Instead, the full set of AUMs is scanned.
- func (c *FS) Heads() ([]AUM, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- // Scan the complete list of AUMs, and build a list of all parent hashes.
- // This tells us which AUMs have children.
- var parentHashes []AUMHash
- allAUMs, err := c.AllAUMs()
- if err != nil {
- return nil, err
- }
- for _, h := range allAUMs {
- aum, err := c.AUM(h)
- if err != nil {
- return nil, err
- }
- parent, hasParent := aum.Parent()
- if !hasParent {
- continue
- }
- if !slices.Contains(parentHashes, parent) {
- parentHashes = append(parentHashes, parent)
- }
- }
- // Now scan a second time, and only include AUMs which weren't marked as
- // the parent of any other AUM.
- out := make([]AUM, 0, 6) // 6 is arbitrary.
- for _, h := range allAUMs {
- if slices.Contains(parentHashes, h) {
- continue
- }
- aum, err := c.AUM(h)
- if err != nil {
- return nil, err
- }
- out = append(out, aum)
- }
- return out, nil
- }
- // RemoveAll permanently and completely clears the TKA state.
- func (c *FS) RemoveAll() error {
- return os.RemoveAll(c.base)
- }
- // AllAUMs returns all AUMs stored in the chonk.
- func (c *FS) AllAUMs() ([]AUMHash, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- out := make([]AUMHash, 0, 6) // 6 is arbitrary.
- err := c.scanHashes(func(info *fsHashInfo) {
- if info.AUM != nil {
- out = append(out, info.AUM.Hash())
- }
- })
- return out, err
- }
- func (c *FS) scanHashes(eachHashInfo func(*fsHashInfo)) error {
- prefixDirs, err := os.ReadDir(c.base)
- if err != nil {
- return fmt.Errorf("reading prefix dirs: %v", err)
- }
- for _, prefix := range prefixDirs {
- if !prefix.IsDir() {
- continue
- }
- files, err := os.ReadDir(filepath.Join(c.base, prefix.Name()))
- if err != nil {
- return fmt.Errorf("reading prefix dir: %v", err)
- }
- for _, file := range files {
- // Ignore files whose names aren't valid AUM hashes, which may be
- // temporary files which are partway through being written, or other
- // files added by the OS (like .DS_Store) which we can ignore.
- // TODO(alexc): it might be useful to append a suffix like `.aum` to
- // filenames, so we can more easily distinguish between AUMs and
- // arbitrary other files.
- var h AUMHash
- if err := h.UnmarshalText([]byte(file.Name())); err != nil {
- log.Printf("ignoring unexpected non-AUM: %s: %v", file.Name(), err)
- continue
- }
- info, err := c.get(h)
- if err != nil {
- return fmt.Errorf("reading %x: %v", h, err)
- }
- if info.PurgedUnix > 0 {
- continue
- }
- eachHashInfo(info)
- }
- }
- return nil
- }
- // SetLastActiveAncestor is called to record the oldest-known AUM
- // that contributed to the current state. This value is used as
- // a hint on next startup to determine which chain to pick when computing
- // the current state, if there are multiple distinct chains.
- func (c *FS) SetLastActiveAncestor(hash AUMHash) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- return atomicfile.WriteFile(filepath.Join(c.base, "last_active_ancestor"), hash[:], 0644)
- }
- // LastActiveAncestor returns the oldest-known AUM that was (in a
- // previous run) an ancestor of the current state. This is used
- // as a hint to pick the correct chain in the event that the Chonk stores
- // multiple distinct chains.
- //
- // Nil is returned if no last-active ancestor is set.
- func (c *FS) LastActiveAncestor() (*AUMHash, error) {
- c.mu.RLock()
- defer c.mu.RUnlock()
- hash, err := os.ReadFile(filepath.Join(c.base, "last_active_ancestor"))
- if err != nil {
- if os.IsNotExist(err) {
- return nil, nil // Not exist == none set.
- }
- return nil, err
- }
- var out AUMHash
- if len(hash) != len(out) {
- return nil, fmt.Errorf("stored hash is of wrong length: %d != %d", len(hash), len(out))
- }
- copy(out[:], hash)
- return &out, nil
- }
- // CommitVerifiedAUMs durably stores the provided AUMs.
- // Callers MUST ONLY provide AUMs which are verified (specifically,
- // a call to aumVerify must return a nil error), as the
- // implementation assumes that only verified AUMs are stored.
- func (c *FS) CommitVerifiedAUMs(updates []AUM) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- for i, aum := range updates {
- h := aum.Hash()
- err := c.commit(h, func(info *fsHashInfo) {
- info.PurgedUnix = 0 // just in-case it was set for some reason
- info.AUM = &aum
- })
- if err != nil {
- return fmt.Errorf("committing update[%d] (%x): %v", i, h, err)
- }
- }
- return nil
- }
- // PurgeAUMs marks the specified AUMs for deletion from storage.
- func (c *FS) PurgeAUMs(hashes []AUMHash) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- now := time.Now()
- for i, h := range hashes {
- stored, err := c.get(h)
- if err != nil {
- return fmt.Errorf("reading %d (%x): %w", i, h, err)
- }
- if stored.AUM == nil || stored.PurgedUnix > 0 {
- continue
- }
- err = c.commit(h, func(info *fsHashInfo) {
- info.PurgedUnix = now.Unix()
- })
- if err != nil {
- return fmt.Errorf("committing purge[%d] (%x): %w", i, h, err)
- }
- }
- return nil
- }
- // commit calls the provided updater function to record changes relevant
- // to the given hash. The caller is expected to update the AUM and
- // Children fields, as relevant.
- func (c *FS) commit(h AUMHash, updater func(*fsHashInfo)) error {
- toCommit := fsHashInfo{}
- existing, err := c.get(h)
- switch {
- case os.IsNotExist(err):
- toCommit.CreatedUnix = time.Now().Unix()
- case err != nil:
- return err
- default:
- toCommit = *existing
- }
- updater(&toCommit)
- if toCommit.AUM != nil && toCommit.AUM.Hash() != h {
- return fmt.Errorf("cannot commit AUM with hash %x to %x", toCommit.AUM.Hash(), h)
- }
- dir, base := c.aumDir(h)
- if err := os.MkdirAll(dir, 0755); err != nil && !os.IsExist(err) {
- return fmt.Errorf("creating directory: %v", err)
- }
- m, err := cbor.CTAP2EncOptions().EncMode()
- if err != nil {
- return fmt.Errorf("cbor EncMode: %v", err)
- }
- var buff bytes.Buffer
- if err := m.NewEncoder(&buff).Encode(toCommit); err != nil {
- return fmt.Errorf("encoding: %v", err)
- }
- return atomicfile.WriteFile(filepath.Join(dir, base), buff.Bytes(), 0644)
- }
// CompactionOptions describes tuneables to use when compacting a Chonk.
// Compact returns an error if either field is zero.
type CompactionOptions struct {
	// The minimum number of ancestor AUMs to remember. The actual length
	// of the chain post-compaction may be longer to reach a Checkpoint AUM.
	MinChain int

	// The minimum duration to store an AUM before it is a candidate for deletion.
	// Age is measured from the time reported by CompactableChonk.CommitTime.
	MinAge time.Duration
}
// retainState tracks the state of an AUM hash as it is being considered for
// deletion by Compact. It is a bit set: an AUM may carry several flags.
type retainState uint8

// Valid retainState flags.
const (
	// retainStateActive: the AUM is on the active chain between HEAD and the
	// compaction target (the lastActiveAncestor), inclusive. This covers at
	// least MinChain hops from HEAD when that many AUMs are stored.
	retainStateActive retainState = 1 << iota
	// retainStateYoung: the AUM was committed less than MinAge ago.
	retainStateYoung
	// retainStateLeaf: the AUM is retained or a descendant of a retained
	// AUM. markDescendantAUMs also uses this bit as its "visited" marker.
	retainStateLeaf
	// retainStateAncestor: the AUM is part of a chain between a retained
	// AUM and the new lastActiveAncestor.
	retainStateAncestor
	// retainStateCandidate: the AUM is on the active chain but older than
	// the compaction target. This bit alone does not prevent deletion.
	retainStateCandidate

	// retainAUMMask is a bit mask of any bit which should prevent
	// the deletion of an AUM.
	retainAUMMask retainState = retainStateActive | retainStateYoung | retainStateLeaf | retainStateAncestor
)
- // markActiveChain marks AUMs in the active chain.
- // All AUMs that are within minChain ancestors of head, or are marked as young, are
- // marked retainStateActive, and all remaining ancestors are
- // marked retainStateCandidate.
- //
- // markActiveChain returns the next ancestor AUM which is a checkpoint AUM.
- func markActiveChain(storage Chonk, verdict map[AUMHash]retainState, minChain int, head AUMHash) (lastActiveAncestor AUMHash, err error) {
- next, err := storage.AUM(head)
- if err != nil {
- return AUMHash{}, err
- }
- for i := range minChain {
- h := next.Hash()
- verdict[h] |= retainStateActive
- parent, hasParent := next.Parent()
- if !hasParent {
- // Genesis AUM (beginning of time). The chain isnt long enough to need truncating.
- return h, nil
- }
- if next, err = storage.AUM(parent); err != nil {
- if err == os.ErrNotExist {
- // We've reached the end of the chain we have stored.
- return h, nil
- }
- return AUMHash{}, fmt.Errorf("reading active chain (retainStateActive) (%d, %v): %w", i, parent, err)
- }
- }
- // If we got this far, we have at least minChain AUMs stored, and minChain number
- // of ancestors have been marked for retention. We now continue to iterate backwards
- // till we find an AUM which we can compact to: either a Checkpoint AUM which is old
- // enough, or the genesis AUM.
- for {
- h := next.Hash()
- verdict[h] |= retainStateActive
- parent, hasParent := next.Parent()
- isYoung := verdict[h]&retainStateYoung != 0
- if next.MessageKind == AUMCheckpoint {
- lastActiveAncestor = h
- if !isYoung || !hasParent {
- break
- }
- }
- if next, err = storage.AUM(parent); err != nil {
- return AUMHash{}, fmt.Errorf("searching for compaction target (%v): %w", parent, err)
- }
- }
- // Mark remaining known ancestors as retainStateCandidate.
- for {
- parent, hasParent := next.Parent()
- if !hasParent {
- break
- }
- verdict[parent] |= retainStateCandidate
- if next, err = storage.AUM(parent); err != nil {
- if err == os.ErrNotExist {
- // We've reached the end of the chain we have stored.
- break
- }
- return AUMHash{}, fmt.Errorf("reading active chain (retainStateCandidate, %v): %w", parent, err)
- }
- }
- return lastActiveAncestor, nil
- }
- // markYoungAUMs marks all AUMs younger than minAge for retention. All
- // candidate AUMs must exist in verdict.
- func markYoungAUMs(storage CompactableChonk, verdict map[AUMHash]retainState, minAge time.Duration) error {
- minTime := time.Now().Add(-minAge)
- for h := range verdict {
- commitTime, err := storage.CommitTime(h)
- if err != nil {
- return err
- }
- if commitTime.After(minTime) {
- verdict[h] |= retainStateYoung
- }
- }
- return nil
- }
// markAncestorIntersectionAUMs walks backwards from every AUM marked for
// retention (other than candidateAncestor), marking each AUM it passes
// through with retainStateAncestor. Each walk stops at the first parent
// that is already marked for retention; this includes candidateAncestor,
// which markActiveChain marked retainStateActive. Afterwards every
// retained AUM is connected to candidateAncestor through retained AUMs.
//
// If a walk reaches the active chain at an AUM older than candidateAncestor
// (one marked retainStateCandidate), candidateAncestor is moved back to that
// intersection point. The active-chain AUMs between the new and old
// candidateAncestor are then marked retainStateAncestor. Because the new
// candidateAncestor may not be a Checkpoint AUM (and hence not a valid
// compaction target), the function then walks back from it to the nearest
// Checkpoint at or before it, marking each AUM passed retainStateActive.
//
// An error is returned if a walk reaches an AUM with no parent without
// finding a retained AUM or Checkpoint, or if storage fails.
//
// The final value for lastActiveAncestor is returned.
func markAncestorIntersectionAUMs(storage Chonk, verdict map[AUMHash]retainState, candidateAncestor AUMHash) (lastActiveAncestor AUMHash, err error) {
	// Seed the walk with every retained AUM except candidateAncestor itself.
	toScan := make([]AUMHash, 0, len(verdict))
	for h, v := range verdict {
		if (v & retainAUMMask) == 0 {
			continue // not marked for retention, so don't need to consider it
		}
		if h == candidateAncestor {
			continue
		}
		toScan = append(toScan, h)
	}

	var didAdjustCandidateAncestor bool
	// Each iteration of this loop moves every in-progress walk back by one
	// generation.
	for len(toScan) > 0 {
		nextIterScan := make([]AUMHash, 0, len(verdict))
		for _, h := range toScan {
			if verdict[h]&retainStateAncestor != 0 {
				// This AUM and its ancestors have already been iterated.
				continue
			}
			verdict[h] |= retainStateAncestor

			a, err := storage.AUM(h)
			if err != nil {
				return AUMHash{}, fmt.Errorf("reading %v: %w", h, err)
			}
			parent, hasParent := a.Parent()
			if !hasParent {
				return AUMHash{}, errors.New("reached genesis AUM without intersecting with candidate ancestor")
			}

			if verdict[parent]&retainAUMMask != 0 {
				// Includes candidateAncestor (has retainStateActive set)
				continue
			}
			if verdict[parent]&retainStateCandidate != 0 {
				// We've intersected with the active chain but haven't done so through
				// candidateAncestor. That means that we intersect the active chain
				// before candidateAncestor, hence candidateAncestor actually needs
				// to be earlier than it is now.
				candidateAncestor = parent
				didAdjustCandidateAncestor = true
				verdict[parent] |= retainStateAncestor

				// There could be AUMs on the active chain between our new candidateAncestor
				// and the old one, make sure they are marked as retained.
				next := parent
			childLoop:
				for {
					children, err := storage.ChildAUMs(next)
					if err != nil {
						return AUMHash{}, fmt.Errorf("reading children %v: %w", next, err)
					}
					// While there can be many children of an AUM, there can only be
					// one child on the active chain (it will have retainStateCandidate set).
					for _, a := range children {
						h := a.Hash()
						if v := verdict[h]; v&retainStateCandidate != 0 && v&retainStateActive == 0 {
							verdict[h] |= retainStateAncestor
							next = h
							continue childLoop
						}
					}
					// No further candidate-only child: we've reached the
					// previously marked (retainStateActive) part of the chain.
					break
				}
			}

			// Continue the walk from parent in the next generation. If parent
			// just became the new candidateAncestor, it already has
			// retainStateAncestor set, so that walk stops immediately.
			nextIterScan = append(nextIterScan, parent)
		}
		toScan = nextIterScan
	}

	// If candidateAncestor was adjusted backwards, then it may not be a checkpoint
	// (and hence a valid compaction candidate). If so, iterate backwards and adjust
	// the candidateAncestor till we find a checkpoint.
	if didAdjustCandidateAncestor {
		var next AUM
		if next, err = storage.AUM(candidateAncestor); err != nil {
			return AUMHash{}, fmt.Errorf("searching for compaction target (%v): %w", candidateAncestor, err)
		}

		for {
			h := next.Hash()
			verdict[h] |= retainStateActive
			if next.MessageKind == AUMCheckpoint {
				candidateAncestor = h
				break
			}

			parent, hasParent := next.Parent()
			if !hasParent {
				return AUMHash{}, errors.New("reached genesis AUM without finding an appropriate candidateAncestor")
			}
			if next, err = storage.AUM(parent); err != nil {
				return AUMHash{}, fmt.Errorf("searching for compaction target (%v): %w", parent, err)
			}
		}
	}

	return candidateAncestor, nil
}
- // markDescendantAUMs marks all children of a retained AUM as retained.
- func markDescendantAUMs(storage Chonk, verdict map[AUMHash]retainState) error {
- toScan := make([]AUMHash, 0, len(verdict))
- for h, v := range verdict {
- if v&retainAUMMask == 0 {
- continue // not marked, so don't need to mark descendants
- }
- toScan = append(toScan, h)
- }
- for len(toScan) > 0 {
- nextIterScan := make([]AUMHash, 0, len(verdict))
- for _, h := range toScan {
- if verdict[h]&retainStateLeaf != 0 {
- // This AUM and its descendants have already been marked.
- continue
- }
- verdict[h] |= retainStateLeaf
- children, err := storage.ChildAUMs(h)
- if err != nil {
- return err
- }
- for _, a := range children {
- nextIterScan = append(nextIterScan, a.Hash())
- }
- }
- toScan = nextIterScan
- }
- return nil
- }
- // Compact deletes old AUMs from storage, based on the parameters given in opts.
- func Compact(storage CompactableChonk, head AUMHash, opts CompactionOptions) (lastActiveAncestor AUMHash, err error) {
- if opts.MinChain == 0 {
- return AUMHash{}, errors.New("opts.MinChain must be set")
- }
- if opts.MinAge == 0 {
- return AUMHash{}, errors.New("opts.MinAge must be set")
- }
- all, err := storage.AllAUMs()
- if err != nil {
- return AUMHash{}, fmt.Errorf("AllAUMs: %w", err)
- }
- verdict := make(map[AUMHash]retainState, len(all))
- for _, h := range all {
- verdict[h] = 0
- }
- if err := markYoungAUMs(storage, verdict, opts.MinAge); err != nil {
- return AUMHash{}, fmt.Errorf("marking young AUMs: %w", err)
- }
- if lastActiveAncestor, err = markActiveChain(storage, verdict, opts.MinChain, head); err != nil {
- return AUMHash{}, fmt.Errorf("marking active chain: %w", err)
- }
- if err := markDescendantAUMs(storage, verdict); err != nil {
- return AUMHash{}, fmt.Errorf("marking descendant AUMs: %w", err)
- }
- if lastActiveAncestor, err = markAncestorIntersectionAUMs(storage, verdict, lastActiveAncestor); err != nil {
- return AUMHash{}, fmt.Errorf("marking ancestor intersection: %w", err)
- }
- toDelete := make([]AUMHash, 0, len(verdict))
- for h, v := range verdict {
- if v&retainAUMMask == 0 { // no retention set
- toDelete = append(toDelete, h)
- }
- }
- if err := storage.SetLastActiveAncestor(lastActiveAncestor); err != nil {
- return AUMHash{}, err
- }
- return lastActiveAncestor, storage.PurgeAUMs(toDelete)
- }
|