tailchonk.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. //go:build !ts_omit_tailnetlock
  4. package tka
  5. import (
  6. "bytes"
  7. "errors"
  8. "fmt"
  9. "log"
  10. "maps"
  11. "os"
  12. "path/filepath"
  13. "slices"
  14. "sync"
  15. "time"
  16. "github.com/fxamacker/cbor/v2"
  17. "tailscale.com/atomicfile"
  18. "tailscale.com/tstime"
  19. "tailscale.com/util/testenv"
  20. )
  21. // Chonk implementations provide durable storage for AUMs and other
  22. // TKA state.
  23. //
  24. // All methods must be thread-safe.
  25. //
  26. // The name 'tailchonk' was coined by @catzkorn.
  27. type Chonk interface {
  28. // AUM returns the AUM with the specified digest.
  29. //
  30. // If the AUM does not exist, then os.ErrNotExist is returned.
  31. AUM(hash AUMHash) (AUM, error)
  32. // ChildAUMs returns all AUMs with a specified previous
  33. // AUM hash.
  34. ChildAUMs(prevAUMHash AUMHash) ([]AUM, error)
  35. // CommitVerifiedAUMs durably stores the provided AUMs.
  36. // Callers MUST ONLY provide AUMs which are verified (specifically,
  37. // a call to aumVerify() must return a nil error).
  38. // as the implementation assumes that only verified AUMs are stored.
  39. CommitVerifiedAUMs(updates []AUM) error
  40. // Heads returns AUMs for which there are no children. In other
  41. // words, the latest AUM in all possible chains (the 'leaves').
  42. Heads() ([]AUM, error)
  43. // SetLastActiveAncestor is called to record the oldest-known AUM
  44. // that contributed to the current state. This value is used as
  45. // a hint on next startup to determine which chain to pick when computing
  46. // the current state, if there are multiple distinct chains.
  47. SetLastActiveAncestor(hash AUMHash) error
  48. // LastActiveAncestor returns the oldest-known AUM that was (in a
  49. // previous run) an ancestor of the current state. This is used
  50. // as a hint to pick the correct chain in the event that the Chonk stores
  51. // multiple distinct chains.
  52. LastActiveAncestor() (*AUMHash, error)
  53. }
  54. // CompactableChonk implementation are extensions of Chonk, which are
  55. // able to be operated by compaction logic to deleted old AUMs.
  56. type CompactableChonk interface {
  57. Chonk
  58. // AllAUMs returns all AUMs stored in the chonk.
  59. AllAUMs() ([]AUMHash, error)
  60. // CommitTime returns the time at which the AUM was committed.
  61. //
  62. // If the AUM does not exist, then os.ErrNotExist is returned.
  63. CommitTime(hash AUMHash) (time.Time, error)
  64. // PurgeAUMs permanently and irrevocably deletes the specified
  65. // AUMs from storage.
  66. PurgeAUMs(hashes []AUMHash) error
  67. // RemoveAll permanently and completely clears the TKA state. This should
  68. // be called when the user disables Tailnet Lock.
  69. RemoveAll() error
  70. }
  71. // Mem implements in-memory storage of TKA state, suitable for
  72. // tests or cases where filesystem storage is unavailable.
  73. //
  74. // Mem implements the Chonk interface.
  75. //
  76. // Mem is thread-safe.
  77. type Mem struct {
  78. mu sync.RWMutex
  79. aums map[AUMHash]AUM
  80. commitTimes map[AUMHash]time.Time
  81. clock tstime.Clock
  82. // parentIndex is a map of AUMs to the AUMs for which they are
  83. // the parent.
  84. //
  85. // For example, if parent index is {1 -> {2, 3, 4}}, that means
  86. // that AUMs 2, 3, 4 all have aum.PrevAUMHash = 1.
  87. parentIndex map[AUMHash][]AUMHash
  88. lastActiveAncestor *AUMHash
  89. }
  90. // ChonkMem returns an implementation of Chonk which stores TKA state
  91. // in-memory.
  92. func ChonkMem() *Mem {
  93. return &Mem{
  94. clock: tstime.DefaultClock{},
  95. }
  96. }
  97. // SetClock sets the clock used by [Mem]. This is only for use in tests,
  98. // and will panic if called from non-test code.
  99. func (c *Mem) SetClock(clock tstime.Clock) {
  100. if !testenv.InTest() {
  101. panic("used SetClock in non-test code")
  102. }
  103. c.clock = clock
  104. }
  105. func (c *Mem) SetLastActiveAncestor(hash AUMHash) error {
  106. c.mu.Lock()
  107. defer c.mu.Unlock()
  108. c.lastActiveAncestor = &hash
  109. return nil
  110. }
  111. func (c *Mem) LastActiveAncestor() (*AUMHash, error) {
  112. c.mu.RLock()
  113. defer c.mu.RUnlock()
  114. return c.lastActiveAncestor, nil
  115. }
  116. // Heads returns AUMs for which there are no children. In other
  117. // words, the latest AUM in all chains (the 'leaf').
  118. func (c *Mem) Heads() ([]AUM, error) {
  119. c.mu.RLock()
  120. defer c.mu.RUnlock()
  121. out := make([]AUM, 0, 6)
  122. // An AUM is a 'head' if there are no nodes for which it is the parent.
  123. for _, a := range c.aums {
  124. if len(c.parentIndex[a.Hash()]) == 0 {
  125. out = append(out, a)
  126. }
  127. }
  128. return out, nil
  129. }
  130. // AUM returns the AUM with the specified digest.
  131. func (c *Mem) AUM(hash AUMHash) (AUM, error) {
  132. c.mu.RLock()
  133. defer c.mu.RUnlock()
  134. aum, ok := c.aums[hash]
  135. if !ok {
  136. return AUM{}, os.ErrNotExist
  137. }
  138. return aum, nil
  139. }
  140. // ChildAUMs returns all AUMs with a specified previous
  141. // AUM hash.
  142. func (c *Mem) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
  143. c.mu.RLock()
  144. defer c.mu.RUnlock()
  145. out := make([]AUM, 0, 6)
  146. for _, entry := range c.parentIndex[prevAUMHash] {
  147. out = append(out, c.aums[entry])
  148. }
  149. return out, nil
  150. }
  151. // CommitVerifiedAUMs durably stores the provided AUMs.
  152. // Callers MUST ONLY provide well-formed and verified AUMs,
  153. // as the rest of the TKA implementation assumes that only
  154. // verified AUMs are stored.
  155. func (c *Mem) CommitVerifiedAUMs(updates []AUM) error {
  156. c.mu.Lock()
  157. defer c.mu.Unlock()
  158. if c.aums == nil {
  159. c.parentIndex = make(map[AUMHash][]AUMHash, 64)
  160. c.aums = make(map[AUMHash]AUM, 64)
  161. c.commitTimes = make(map[AUMHash]time.Time, 64)
  162. }
  163. updateLoop:
  164. for _, aum := range updates {
  165. aumHash := aum.Hash()
  166. c.aums[aumHash] = aum
  167. c.commitTimes[aumHash] = c.now()
  168. parent, ok := aum.Parent()
  169. if ok {
  170. for _, exists := range c.parentIndex[parent] {
  171. if exists == aumHash {
  172. continue updateLoop
  173. }
  174. }
  175. c.parentIndex[parent] = append(c.parentIndex[parent], aumHash)
  176. }
  177. }
  178. return nil
  179. }
  180. // now returns the current time, optionally using the overridden
  181. // clock if set.
  182. func (c *Mem) now() time.Time {
  183. if c.clock == nil {
  184. return time.Now()
  185. } else {
  186. return c.clock.Now()
  187. }
  188. }
  189. // RemoveAll permanently and completely clears the TKA state.
  190. func (c *Mem) RemoveAll() error {
  191. c.mu.Lock()
  192. defer c.mu.Unlock()
  193. c.aums = nil
  194. c.commitTimes = nil
  195. c.parentIndex = nil
  196. c.lastActiveAncestor = nil
  197. return nil
  198. }
  199. // AllAUMs returns all AUMs stored in the chonk.
  200. func (c *Mem) AllAUMs() ([]AUMHash, error) {
  201. c.mu.RLock()
  202. defer c.mu.RUnlock()
  203. return slices.Collect(maps.Keys(c.aums)), nil
  204. }
  205. // CommitTime returns the time at which the AUM was committed.
  206. //
  207. // If the AUM does not exist, then os.ErrNotExist is returned.
  208. func (c *Mem) CommitTime(h AUMHash) (time.Time, error) {
  209. c.mu.RLock()
  210. defer c.mu.RUnlock()
  211. t, ok := c.commitTimes[h]
  212. if ok {
  213. return t, nil
  214. } else {
  215. return time.Time{}, os.ErrNotExist
  216. }
  217. }
  218. // PurgeAUMs marks the specified AUMs for deletion from storage.
  219. func (c *Mem) PurgeAUMs(hashes []AUMHash) error {
  220. c.mu.Lock()
  221. defer c.mu.Unlock()
  222. for _, h := range hashes {
  223. // Remove the deleted AUM from the list of its parents' children.
  224. //
  225. // However, we leave the list of this AUM's children in parentIndex,
  226. // so we can find them later in ChildAUMs().
  227. if aum, ok := c.aums[h]; ok {
  228. parent, hasParent := aum.Parent()
  229. if hasParent {
  230. c.parentIndex[parent] = slices.DeleteFunc(
  231. c.parentIndex[parent],
  232. func(other AUMHash) bool { return bytes.Equal(h[:], other[:]) },
  233. )
  234. if len(c.parentIndex[parent]) == 0 {
  235. delete(c.parentIndex, parent)
  236. }
  237. }
  238. }
  239. // Delete this AUM from the list of AUMs and commit times.
  240. delete(c.aums, h)
  241. delete(c.commitTimes, h)
  242. }
  243. return nil
  244. }
  // FS stores TKA state on the filesystem.
  //
  // FS implements the Chonk interface.
  type FS struct {
  	// base is the directory under which all state is kept.
  	base string

  	// mu is held by the exported methods while they access the
  	// files under base.
  	mu sync.RWMutex
  }
  252. // ChonkDir returns an implementation of Chonk which uses the
  253. // given directory to store TKA state.
  254. func ChonkDir(dir string) (*FS, error) {
  255. if err := os.MkdirAll(dir, 0755); err != nil && !os.IsExist(err) {
  256. return nil, fmt.Errorf("creating chonk root dir: %v", err)
  257. }
  258. stat, err := os.Stat(dir)
  259. if err != nil {
  260. return nil, err
  261. }
  262. if !stat.IsDir() {
  263. return nil, fmt.Errorf("chonk directory %q is a file", dir)
  264. }
  265. // TODO(tom): *FS marks AUMs as deleted but does not actually
  266. // delete them, to avoid data loss in the event of a bug.
  267. // Implement deletion after we are fairly sure in the implementation.
  268. return &FS{base: dir}, nil
  269. }
  270. // fsHashInfo describes how information about an AUMHash is represented
  271. // on disk.
  272. //
  273. // The CBOR-serialization of this struct is stored to base/__/base32(hash)
  274. // where __ are the first two characters of base32(hash).
  275. //
  276. // CBOR was chosen because we are already using it and it serializes
  277. // much smaller than JSON for AUMs. The 'keyasint' thing isn't essential
  278. // but again it saves a bunch of bytes.
  279. //
  280. // We have removed the following fields from fsHashInfo, but they may be
  281. // present in data stored in existing deployments. Do not reuse these values,
  282. // to avoid getting unexpected values from legacy data:
  283. // - cbor:1, Children
  284. type fsHashInfo struct {
  285. AUM *AUM `cbor:"2,keyasint"`
  286. CreatedUnix int64 `cbor:"3,keyasint,omitempty"`
  287. // PurgedUnix is set when the AUM is deleted. The value is
  288. // the unix epoch at the time it was deleted.
  289. //
  290. // While a non-zero PurgedUnix symbolizes the AUM is deleted,
  291. // the fsHashInfo entry can continue to exist to track children
  292. // of this AUMHash.
  293. PurgedUnix int64 `cbor:"4,keyasint,omitempty"`
  294. }
  295. // aumDir returns the directory an AUM is stored in, and its filename
  296. // within the directory.
  297. func (c *FS) aumDir(h AUMHash) (dir, base string) {
  298. s := h.String()
  299. return filepath.Join(c.base, s[:2]), s
  300. }
  301. // AUM returns the AUM with the specified digest.
  302. //
  303. // If the AUM does not exist, then os.ErrNotExist is returned.
  304. func (c *FS) AUM(hash AUMHash) (AUM, error) {
  305. c.mu.RLock()
  306. defer c.mu.RUnlock()
  307. info, err := c.get(hash)
  308. if err != nil {
  309. if os.IsNotExist(err) {
  310. return AUM{}, os.ErrNotExist
  311. }
  312. return AUM{}, err
  313. }
  314. if info.AUM == nil || info.PurgedUnix > 0 {
  315. return AUM{}, os.ErrNotExist
  316. }
  317. return *info.AUM, nil
  318. }
  319. // CommitTime returns the time at which the AUM was committed.
  320. //
  321. // If the AUM does not exist, then os.ErrNotExist is returned.
  322. func (c *FS) CommitTime(h AUMHash) (time.Time, error) {
  323. c.mu.RLock()
  324. defer c.mu.RUnlock()
  325. info, err := c.get(h)
  326. if err != nil {
  327. if os.IsNotExist(err) {
  328. return time.Time{}, os.ErrNotExist
  329. }
  330. return time.Time{}, err
  331. }
  332. if info.PurgedUnix > 0 {
  333. return time.Time{}, os.ErrNotExist
  334. }
  335. if info.CreatedUnix > 0 {
  336. return time.Unix(info.CreatedUnix, 0), nil
  337. }
  338. // If we got this far, the AUM exists but CreatedUnix is not
  339. // set, presumably because this AUM was committed using a version
  340. // of tailscaled that pre-dates the introduction of CreatedUnix.
  341. // As such, we use the file modification time as a suitable analog.
  342. dir, base := c.aumDir(h)
  343. s, err := os.Stat(filepath.Join(dir, base))
  344. if err != nil {
  345. return time.Time{}, nil
  346. }
  347. return s.ModTime(), nil
  348. }
  349. // AUM returns any known AUMs with a specific parent hash.
  350. func (c *FS) ChildAUMs(prevAUMHash AUMHash) ([]AUM, error) {
  351. c.mu.RLock()
  352. defer c.mu.RUnlock()
  353. var out []AUM
  354. err := c.scanHashes(func(info *fsHashInfo) {
  355. if info.AUM != nil && bytes.Equal(info.AUM.PrevAUMHash, prevAUMHash[:]) {
  356. out = append(out, *info.AUM)
  357. }
  358. })
  359. return out, err
  360. }
  361. func (c *FS) get(h AUMHash) (*fsHashInfo, error) {
  362. dir, base := c.aumDir(h)
  363. f, err := os.Open(filepath.Join(dir, base))
  364. if err != nil {
  365. return nil, err
  366. }
  367. defer f.Close()
  368. m, err := cborDecOpts.DecMode()
  369. if err != nil {
  370. return nil, err
  371. }
  372. var out fsHashInfo
  373. if err := m.NewDecoder(f).Decode(&out); err != nil {
  374. return nil, err
  375. }
  376. if out.AUM != nil && out.AUM.Hash() != h {
  377. return nil, fmt.Errorf("%s: AUM does not match file name hash %s", f.Name(), out.AUM.Hash())
  378. }
  379. return &out, nil
  380. }
  381. // Heads returns AUMs for which there are no children. In other
  382. // words, the latest AUM in all possible chains (the 'leaves').
  383. //
  384. // Heads is expected to be called infrequently compared to AUM() or
  385. // ChildAUMs(), so we haven't put any work into maintaining an index.
  386. // Instead, the full set of AUMs is scanned.
  387. func (c *FS) Heads() ([]AUM, error) {
  388. c.mu.RLock()
  389. defer c.mu.RUnlock()
  390. // Scan the complete list of AUMs, and build a list of all parent hashes.
  391. // This tells us which AUMs have children.
  392. var parentHashes []AUMHash
  393. allAUMs, err := c.AllAUMs()
  394. if err != nil {
  395. return nil, err
  396. }
  397. for _, h := range allAUMs {
  398. aum, err := c.AUM(h)
  399. if err != nil {
  400. return nil, err
  401. }
  402. parent, hasParent := aum.Parent()
  403. if !hasParent {
  404. continue
  405. }
  406. if !slices.Contains(parentHashes, parent) {
  407. parentHashes = append(parentHashes, parent)
  408. }
  409. }
  410. // Now scan a second time, and only include AUMs which weren't marked as
  411. // the parent of any other AUM.
  412. out := make([]AUM, 0, 6) // 6 is arbitrary.
  413. for _, h := range allAUMs {
  414. if slices.Contains(parentHashes, h) {
  415. continue
  416. }
  417. aum, err := c.AUM(h)
  418. if err != nil {
  419. return nil, err
  420. }
  421. out = append(out, aum)
  422. }
  423. return out, nil
  424. }
  425. // RemoveAll permanently and completely clears the TKA state.
  426. func (c *FS) RemoveAll() error {
  427. return os.RemoveAll(c.base)
  428. }
  429. // AllAUMs returns all AUMs stored in the chonk.
  430. func (c *FS) AllAUMs() ([]AUMHash, error) {
  431. c.mu.RLock()
  432. defer c.mu.RUnlock()
  433. out := make([]AUMHash, 0, 6) // 6 is arbitrary.
  434. err := c.scanHashes(func(info *fsHashInfo) {
  435. if info.AUM != nil {
  436. out = append(out, info.AUM.Hash())
  437. }
  438. })
  439. return out, err
  440. }
  441. func (c *FS) scanHashes(eachHashInfo func(*fsHashInfo)) error {
  442. prefixDirs, err := os.ReadDir(c.base)
  443. if err != nil {
  444. return fmt.Errorf("reading prefix dirs: %v", err)
  445. }
  446. for _, prefix := range prefixDirs {
  447. if !prefix.IsDir() {
  448. continue
  449. }
  450. files, err := os.ReadDir(filepath.Join(c.base, prefix.Name()))
  451. if err != nil {
  452. return fmt.Errorf("reading prefix dir: %v", err)
  453. }
  454. for _, file := range files {
  455. // Ignore files whose names aren't valid AUM hashes, which may be
  456. // temporary files which are partway through being written, or other
  457. // files added by the OS (like .DS_Store) which we can ignore.
  458. // TODO(alexc): it might be useful to append a suffix like `.aum` to
  459. // filenames, so we can more easily distinguish between AUMs and
  460. // arbitrary other files.
  461. var h AUMHash
  462. if err := h.UnmarshalText([]byte(file.Name())); err != nil {
  463. log.Printf("ignoring unexpected non-AUM: %s: %v", file.Name(), err)
  464. continue
  465. }
  466. info, err := c.get(h)
  467. if err != nil {
  468. return fmt.Errorf("reading %x: %v", h, err)
  469. }
  470. if info.PurgedUnix > 0 {
  471. continue
  472. }
  473. eachHashInfo(info)
  474. }
  475. }
  476. return nil
  477. }
  478. // SetLastActiveAncestor is called to record the oldest-known AUM
  479. // that contributed to the current state. This value is used as
  480. // a hint on next startup to determine which chain to pick when computing
  481. // the current state, if there are multiple distinct chains.
  482. func (c *FS) SetLastActiveAncestor(hash AUMHash) error {
  483. c.mu.Lock()
  484. defer c.mu.Unlock()
  485. return atomicfile.WriteFile(filepath.Join(c.base, "last_active_ancestor"), hash[:], 0644)
  486. }
  487. // LastActiveAncestor returns the oldest-known AUM that was (in a
  488. // previous run) an ancestor of the current state. This is used
  489. // as a hint to pick the correct chain in the event that the Chonk stores
  490. // multiple distinct chains.
  491. //
  492. // Nil is returned if no last-active ancestor is set.
  493. func (c *FS) LastActiveAncestor() (*AUMHash, error) {
  494. c.mu.RLock()
  495. defer c.mu.RUnlock()
  496. hash, err := os.ReadFile(filepath.Join(c.base, "last_active_ancestor"))
  497. if err != nil {
  498. if os.IsNotExist(err) {
  499. return nil, nil // Not exist == none set.
  500. }
  501. return nil, err
  502. }
  503. var out AUMHash
  504. if len(hash) != len(out) {
  505. return nil, fmt.Errorf("stored hash is of wrong length: %d != %d", len(hash), len(out))
  506. }
  507. copy(out[:], hash)
  508. return &out, nil
  509. }
  510. // CommitVerifiedAUMs durably stores the provided AUMs.
  511. // Callers MUST ONLY provide AUMs which are verified (specifically,
  512. // a call to aumVerify must return a nil error), as the
  513. // implementation assumes that only verified AUMs are stored.
  514. func (c *FS) CommitVerifiedAUMs(updates []AUM) error {
  515. c.mu.Lock()
  516. defer c.mu.Unlock()
  517. for i, aum := range updates {
  518. h := aum.Hash()
  519. err := c.commit(h, func(info *fsHashInfo) {
  520. info.PurgedUnix = 0 // just in-case it was set for some reason
  521. info.AUM = &aum
  522. })
  523. if err != nil {
  524. return fmt.Errorf("committing update[%d] (%x): %v", i, h, err)
  525. }
  526. }
  527. return nil
  528. }
  529. // PurgeAUMs marks the specified AUMs for deletion from storage.
  530. func (c *FS) PurgeAUMs(hashes []AUMHash) error {
  531. c.mu.Lock()
  532. defer c.mu.Unlock()
  533. now := time.Now()
  534. for i, h := range hashes {
  535. stored, err := c.get(h)
  536. if err != nil {
  537. return fmt.Errorf("reading %d (%x): %w", i, h, err)
  538. }
  539. if stored.AUM == nil || stored.PurgedUnix > 0 {
  540. continue
  541. }
  542. err = c.commit(h, func(info *fsHashInfo) {
  543. info.PurgedUnix = now.Unix()
  544. })
  545. if err != nil {
  546. return fmt.Errorf("committing purge[%d] (%x): %w", i, h, err)
  547. }
  548. }
  549. return nil
  550. }
  551. // commit calls the provided updater function to record changes relevant
  552. // to the given hash. The caller is expected to update the AUM and
  553. // Children fields, as relevant.
  554. func (c *FS) commit(h AUMHash, updater func(*fsHashInfo)) error {
  555. toCommit := fsHashInfo{}
  556. existing, err := c.get(h)
  557. switch {
  558. case os.IsNotExist(err):
  559. toCommit.CreatedUnix = time.Now().Unix()
  560. case err != nil:
  561. return err
  562. default:
  563. toCommit = *existing
  564. }
  565. updater(&toCommit)
  566. if toCommit.AUM != nil && toCommit.AUM.Hash() != h {
  567. return fmt.Errorf("cannot commit AUM with hash %x to %x", toCommit.AUM.Hash(), h)
  568. }
  569. dir, base := c.aumDir(h)
  570. if err := os.MkdirAll(dir, 0755); err != nil && !os.IsExist(err) {
  571. return fmt.Errorf("creating directory: %v", err)
  572. }
  573. m, err := cbor.CTAP2EncOptions().EncMode()
  574. if err != nil {
  575. return fmt.Errorf("cbor EncMode: %v", err)
  576. }
  577. var buff bytes.Buffer
  578. if err := m.NewEncoder(&buff).Encode(toCommit); err != nil {
  579. return fmt.Errorf("encoding: %v", err)
  580. }
  581. return atomicfile.WriteFile(filepath.Join(dir, base), buff.Bytes(), 0644)
  582. }
  // CompactionOptions holds the tuneables that control how a Chonk is
  // compacted.
  type CompactionOptions struct {
  	// MinChain is the minimum number of ancestor AUMs to remember. The
  	// chain left after compaction may be longer than this, so that it
  	// reaches back to a Checkpoint AUM.
  	MinChain int

  	// MinAge is the minimum duration an AUM must have been stored before
  	// it becomes a candidate for deletion.
  	MinAge time.Duration
  }
  // retainState is a set of bit flags describing what is known about an
  // AUM hash while it is being considered for deletion.
  type retainState uint8

  // Individual retainState flags.
  const (
  	// retainStateActive: the AUM is part of the active chain and less
  	// than MinChain hops from HEAD.
  	retainStateActive retainState = 1 << iota
  	// retainStateYoung: the AUM is younger than MinAge.
  	retainStateYoung
  	// retainStateLeaf: the AUM is a descendant of an AUM to be retained.
  	retainStateLeaf
  	// retainStateAncestor: the AUM is part of a chain between a retained
  	// AUM and the new lastActiveAncestor.
  	retainStateAncestor
  	// retainStateCandidate: the AUM is part of the active chain.
  	retainStateCandidate
  )

  // retainAUMMask is a bit mask of every flag which, when set, should
  // prevent the deletion of an AUM. retainStateCandidate is deliberately
  // not part of it.
  const retainAUMMask retainState = retainStateActive |
  	retainStateYoung |
  	retainStateLeaf |
  	retainStateAncestor
  605. // markActiveChain marks AUMs in the active chain.
  606. // All AUMs that are within minChain ancestors of head, or are marked as young, are
  607. // marked retainStateActive, and all remaining ancestors are
  608. // marked retainStateCandidate.
  609. //
  610. // markActiveChain returns the next ancestor AUM which is a checkpoint AUM.
  611. func markActiveChain(storage Chonk, verdict map[AUMHash]retainState, minChain int, head AUMHash) (lastActiveAncestor AUMHash, err error) {
  612. next, err := storage.AUM(head)
  613. if err != nil {
  614. return AUMHash{}, err
  615. }
  616. for i := range minChain {
  617. h := next.Hash()
  618. verdict[h] |= retainStateActive
  619. parent, hasParent := next.Parent()
  620. if !hasParent {
  621. // Genesis AUM (beginning of time). The chain isnt long enough to need truncating.
  622. return h, nil
  623. }
  624. if next, err = storage.AUM(parent); err != nil {
  625. if err == os.ErrNotExist {
  626. // We've reached the end of the chain we have stored.
  627. return h, nil
  628. }
  629. return AUMHash{}, fmt.Errorf("reading active chain (retainStateActive) (%d, %v): %w", i, parent, err)
  630. }
  631. }
  632. // If we got this far, we have at least minChain AUMs stored, and minChain number
  633. // of ancestors have been marked for retention. We now continue to iterate backwards
  634. // till we find an AUM which we can compact to: either a Checkpoint AUM which is old
  635. // enough, or the genesis AUM.
  636. for {
  637. h := next.Hash()
  638. verdict[h] |= retainStateActive
  639. parent, hasParent := next.Parent()
  640. isYoung := verdict[h]&retainStateYoung != 0
  641. if next.MessageKind == AUMCheckpoint {
  642. lastActiveAncestor = h
  643. if !isYoung || !hasParent {
  644. break
  645. }
  646. }
  647. if next, err = storage.AUM(parent); err != nil {
  648. return AUMHash{}, fmt.Errorf("searching for compaction target (%v): %w", parent, err)
  649. }
  650. }
  651. // Mark remaining known ancestors as retainStateCandidate.
  652. for {
  653. parent, hasParent := next.Parent()
  654. if !hasParent {
  655. break
  656. }
  657. verdict[parent] |= retainStateCandidate
  658. if next, err = storage.AUM(parent); err != nil {
  659. if err == os.ErrNotExist {
  660. // We've reached the end of the chain we have stored.
  661. break
  662. }
  663. return AUMHash{}, fmt.Errorf("reading active chain (retainStateCandidate, %v): %w", parent, err)
  664. }
  665. }
  666. return lastActiveAncestor, nil
  667. }
  668. // markYoungAUMs marks all AUMs younger than minAge for retention. All
  669. // candidate AUMs must exist in verdict.
  670. func markYoungAUMs(storage CompactableChonk, verdict map[AUMHash]retainState, minAge time.Duration) error {
  671. minTime := time.Now().Add(-minAge)
  672. for h := range verdict {
  673. commitTime, err := storage.CommitTime(h)
  674. if err != nil {
  675. return err
  676. }
  677. if commitTime.After(minTime) {
  678. verdict[h] |= retainStateYoung
  679. }
  680. }
  681. return nil
  682. }
  683. // markAncestorIntersectionAUMs walks backwards from all AUMs to be retained,
  684. // ensuring they intersect with candidateAncestor. All AUMs between a retained
  685. // AUM and candidateAncestor are marked for retention.
  686. //
  687. // If there is no intersection between candidateAncestor and the ancestors of
  688. // a retained AUM (this can happen if a retained AUM intersects the main chain
  689. // before candidateAncestor) then candidate ancestor is recomputed based on
  690. // the new oldest intersection.
  691. //
  692. // The final value for lastActiveAncestor is returned.
  693. func markAncestorIntersectionAUMs(storage Chonk, verdict map[AUMHash]retainState, candidateAncestor AUMHash) (lastActiveAncestor AUMHash, err error) {
  694. toScan := make([]AUMHash, 0, len(verdict))
  695. for h, v := range verdict {
  696. if (v & retainAUMMask) == 0 {
  697. continue // not marked for retention, so don't need to consider it
  698. }
  699. if h == candidateAncestor {
  700. continue
  701. }
  702. toScan = append(toScan, h)
  703. }
  704. var didAdjustCandidateAncestor bool
  705. for len(toScan) > 0 {
  706. nextIterScan := make([]AUMHash, 0, len(verdict))
  707. for _, h := range toScan {
  708. if verdict[h]&retainStateAncestor != 0 {
  709. // This AUM and its ancestors have already been iterated.
  710. continue
  711. }
  712. verdict[h] |= retainStateAncestor
  713. a, err := storage.AUM(h)
  714. if err != nil {
  715. return AUMHash{}, fmt.Errorf("reading %v: %w", h, err)
  716. }
  717. parent, hasParent := a.Parent()
  718. if !hasParent {
  719. return AUMHash{}, errors.New("reached genesis AUM without intersecting with candidate ancestor")
  720. }
  721. if verdict[parent]&retainAUMMask != 0 {
  722. // Includes candidateAncestor (has retainStateActive set)
  723. continue
  724. }
  725. if verdict[parent]&retainStateCandidate != 0 {
  726. // We've intersected with the active chain but haven't done so through
  727. // candidateAncestor. That means that we intersect the active chain
  728. // before candidateAncestor, hence candidateAncestor actually needs
  729. // to be earlier than it is now.
  730. candidateAncestor = parent
  731. didAdjustCandidateAncestor = true
  732. verdict[parent] |= retainStateAncestor
  733. // There could be AUMs on the active chain between our new candidateAncestor
  734. // and the old one, make sure they are marked as retained.
  735. next := parent
  736. childLoop:
  737. for {
  738. children, err := storage.ChildAUMs(next)
  739. if err != nil {
  740. return AUMHash{}, fmt.Errorf("reading children %v: %w", next, err)
  741. }
  742. // While there can be many children of an AUM, there can only be
  743. // one child on the active chain (it will have retainStateCandidate set).
  744. for _, a := range children {
  745. h := a.Hash()
  746. if v := verdict[h]; v&retainStateCandidate != 0 && v&retainStateActive == 0 {
  747. verdict[h] |= retainStateAncestor
  748. next = h
  749. continue childLoop
  750. }
  751. }
  752. break
  753. }
  754. }
  755. nextIterScan = append(nextIterScan, parent)
  756. }
  757. toScan = nextIterScan
  758. }
  759. // If candidateAncestor was adjusted backwards, then it may not be a checkpoint
  760. // (and hence a valid compaction candidate). If so, iterate backwards and adjust
  761. // the candidateAncestor till we find a checkpoint.
  762. if didAdjustCandidateAncestor {
  763. var next AUM
  764. if next, err = storage.AUM(candidateAncestor); err != nil {
  765. return AUMHash{}, fmt.Errorf("searching for compaction target (%v): %w", candidateAncestor, err)
  766. }
  767. for {
  768. h := next.Hash()
  769. verdict[h] |= retainStateActive
  770. if next.MessageKind == AUMCheckpoint {
  771. candidateAncestor = h
  772. break
  773. }
  774. parent, hasParent := next.Parent()
  775. if !hasParent {
  776. return AUMHash{}, errors.New("reached genesis AUM without finding an appropriate candidateAncestor")
  777. }
  778. if next, err = storage.AUM(parent); err != nil {
  779. return AUMHash{}, fmt.Errorf("searching for compaction target (%v): %w", parent, err)
  780. }
  781. }
  782. }
  783. return candidateAncestor, nil
  784. }
  785. // markDescendantAUMs marks all children of a retained AUM as retained.
  786. func markDescendantAUMs(storage Chonk, verdict map[AUMHash]retainState) error {
  787. toScan := make([]AUMHash, 0, len(verdict))
  788. for h, v := range verdict {
  789. if v&retainAUMMask == 0 {
  790. continue // not marked, so don't need to mark descendants
  791. }
  792. toScan = append(toScan, h)
  793. }
  794. for len(toScan) > 0 {
  795. nextIterScan := make([]AUMHash, 0, len(verdict))
  796. for _, h := range toScan {
  797. if verdict[h]&retainStateLeaf != 0 {
  798. // This AUM and its descendants have already been marked.
  799. continue
  800. }
  801. verdict[h] |= retainStateLeaf
  802. children, err := storage.ChildAUMs(h)
  803. if err != nil {
  804. return err
  805. }
  806. for _, a := range children {
  807. nextIterScan = append(nextIterScan, a.Hash())
  808. }
  809. }
  810. toScan = nextIterScan
  811. }
  812. return nil
  813. }
  814. // Compact deletes old AUMs from storage, based on the parameters given in opts.
  815. func Compact(storage CompactableChonk, head AUMHash, opts CompactionOptions) (lastActiveAncestor AUMHash, err error) {
  816. if opts.MinChain == 0 {
  817. return AUMHash{}, errors.New("opts.MinChain must be set")
  818. }
  819. if opts.MinAge == 0 {
  820. return AUMHash{}, errors.New("opts.MinAge must be set")
  821. }
  822. all, err := storage.AllAUMs()
  823. if err != nil {
  824. return AUMHash{}, fmt.Errorf("AllAUMs: %w", err)
  825. }
  826. verdict := make(map[AUMHash]retainState, len(all))
  827. for _, h := range all {
  828. verdict[h] = 0
  829. }
  830. if err := markYoungAUMs(storage, verdict, opts.MinAge); err != nil {
  831. return AUMHash{}, fmt.Errorf("marking young AUMs: %w", err)
  832. }
  833. if lastActiveAncestor, err = markActiveChain(storage, verdict, opts.MinChain, head); err != nil {
  834. return AUMHash{}, fmt.Errorf("marking active chain: %w", err)
  835. }
  836. if err := markDescendantAUMs(storage, verdict); err != nil {
  837. return AUMHash{}, fmt.Errorf("marking descendant AUMs: %w", err)
  838. }
  839. if lastActiveAncestor, err = markAncestorIntersectionAUMs(storage, verdict, lastActiveAncestor); err != nil {
  840. return AUMHash{}, fmt.Errorf("marking ancestor intersection: %w", err)
  841. }
  842. toDelete := make([]AUMHash, 0, len(verdict))
  843. for h, v := range verdict {
  844. if v&retainAUMMask == 0 { // no retention set
  845. toDelete = append(toDelete, h)
  846. }
  847. }
  848. if err := storage.SetLastActiveAncestor(lastActiveAncestor); err != nil {
  849. return AUMHash{}, err
  850. }
  851. return lastActiveAncestor, storage.PurgeAUMs(toDelete)
  852. }