diskstore.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. // Copyright (C) 2023 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "bytes"
  9. "cmp"
  10. "compress/gzip"
  11. "context"
  12. "io"
  13. "log"
  14. "math"
  15. "os"
  16. "path/filepath"
  17. "slices"
  18. "time"
  19. )
  20. type diskStore struct {
  21. dir string
  22. inbox chan diskEntry
  23. maxBytes int64
  24. maxFiles int
  25. currentFiles []currentFile
  26. currentSize int64
  27. }
  28. type diskEntry struct {
  29. path string
  30. data []byte
  31. }
  32. type currentFile struct {
  33. path string
  34. size int64
  35. mtime int64
  36. }
  37. func (d *diskStore) Serve(ctx context.Context) {
  38. if err := os.MkdirAll(d.dir, 0o700); err != nil {
  39. log.Println("Creating directory:", err)
  40. return
  41. }
  42. if err := d.inventory(); err != nil {
  43. log.Println("Failed to inventory disk store:", err)
  44. }
  45. d.clean()
  46. cleanTimer := time.NewTicker(time.Minute)
  47. inventoryTimer := time.NewTicker(24 * time.Hour)
  48. buf := new(bytes.Buffer)
  49. gw := gzip.NewWriter(buf)
  50. for {
  51. select {
  52. case entry := <-d.inbox:
  53. path := d.fullPath(entry.path)
  54. if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
  55. log.Println("Creating directory:", err)
  56. continue
  57. }
  58. buf.Reset()
  59. gw.Reset(buf)
  60. if _, err := gw.Write(entry.data); err != nil {
  61. log.Println("Failed to compress crash report:", err)
  62. continue
  63. }
  64. if err := gw.Close(); err != nil {
  65. log.Println("Failed to compress crash report:", err)
  66. continue
  67. }
  68. if err := os.WriteFile(path, buf.Bytes(), 0o600); err != nil {
  69. log.Printf("Failed to write %s: %v", entry.path, err)
  70. _ = os.Remove(path)
  71. continue
  72. }
  73. d.currentSize += int64(buf.Len())
  74. d.currentFiles = append(d.currentFiles, currentFile{
  75. size: int64(len(entry.data)),
  76. path: path,
  77. })
  78. case <-cleanTimer.C:
  79. d.clean()
  80. case <-inventoryTimer.C:
  81. if err := d.inventory(); err != nil {
  82. log.Println("Failed to inventory disk store:", err)
  83. }
  84. case <-ctx.Done():
  85. return
  86. }
  87. }
  88. }
  89. func (d *diskStore) Put(path string, data []byte) bool {
  90. select {
  91. case d.inbox <- diskEntry{
  92. path: path,
  93. data: data,
  94. }:
  95. return true
  96. default:
  97. return false
  98. }
  99. }
  100. func (d *diskStore) Get(path string) ([]byte, error) {
  101. path = d.fullPath(path)
  102. bs, err := os.ReadFile(path)
  103. if err != nil {
  104. return nil, err
  105. }
  106. gr, err := gzip.NewReader(bytes.NewReader(bs))
  107. if err != nil {
  108. return nil, err
  109. }
  110. defer gr.Close()
  111. return io.ReadAll(gr)
  112. }
  113. func (d *diskStore) Exists(path string) bool {
  114. path = d.fullPath(path)
  115. _, err := os.Lstat(path)
  116. return err == nil
  117. }
  118. func (d *diskStore) clean() {
  119. for len(d.currentFiles) > 0 && (len(d.currentFiles) > d.maxFiles || d.currentSize > d.maxBytes) {
  120. f := d.currentFiles[0]
  121. log.Println("Removing", f.path)
  122. if err := os.Remove(f.path); err != nil {
  123. log.Println("Failed to remove file:", err)
  124. }
  125. d.currentFiles = d.currentFiles[1:]
  126. d.currentSize -= f.size
  127. }
  128. var oldest time.Duration
  129. if len(d.currentFiles) > 0 {
  130. oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute)
  131. }
  132. metricDiskstoreFilesTotal.Set(float64(len(d.currentFiles)))
  133. metricDiskstoreBytesTotal.Set(float64(d.currentSize))
  134. metricDiskstoreOldestAgeSeconds.Set(math.Round(oldest.Seconds()))
  135. log.Printf("Clean complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest)
  136. }
  137. func (d *diskStore) inventory() error {
  138. d.currentFiles = nil
  139. d.currentSize = 0
  140. err := filepath.Walk(d.dir, func(path string, info os.FileInfo, err error) error {
  141. if err != nil {
  142. return err
  143. }
  144. if info.IsDir() {
  145. return nil
  146. }
  147. if filepath.Ext(path) != ".gz" {
  148. return nil
  149. }
  150. d.currentSize += info.Size()
  151. d.currentFiles = append(d.currentFiles, currentFile{
  152. path: path,
  153. size: info.Size(),
  154. mtime: info.ModTime().Unix(),
  155. })
  156. return nil
  157. })
  158. slices.SortFunc(d.currentFiles, func(a, b currentFile) int {
  159. return cmp.Compare(a.mtime, b.mtime)
  160. })
  161. var oldest time.Duration
  162. if len(d.currentFiles) > 0 {
  163. oldest = time.Since(time.Unix(d.currentFiles[0].mtime, 0)).Truncate(time.Minute)
  164. }
  165. metricDiskstoreFilesTotal.Set(float64(len(d.currentFiles)))
  166. metricDiskstoreBytesTotal.Set(float64(d.currentSize))
  167. metricDiskstoreOldestAgeSeconds.Set(math.Round(oldest.Seconds()))
  168. log.Printf("Inventory complete: %d files, %d MB, oldest is %v ago", len(d.currentFiles), d.currentSize>>20, oldest)
  169. return err
  170. }
  171. func (d *diskStore) fullPath(path string) string {
  172. return filepath.Join(d.dir, path[0:2], path[2:]) + ".gz"
  173. }