leveldb.go 17 KB


  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package files
  5. import (
  6. "bytes"
  7. "runtime"
  8. "sort"
  9. "sync"
  10. "github.com/syncthing/syncthing/lamport"
  11. "github.com/syncthing/syncthing/protocol"
  12. "github.com/syndtr/goleveldb/leveldb"
  13. "github.com/syndtr/goleveldb/leveldb/iterator"
  14. "github.com/syndtr/goleveldb/leveldb/opt"
  15. "github.com/syndtr/goleveldb/leveldb/util"
  16. )
  17. var (
  18. clockTick uint64
  19. clockMut sync.Mutex
  20. )
  21. func clock(v uint64) uint64 {
  22. clockMut.Lock()
  23. defer clockMut.Unlock()
  24. if v > clockTick {
  25. clockTick = v + 1
  26. } else {
  27. clockTick++
  28. }
  29. return clockTick
  30. }
  31. const (
  32. keyTypeNode = iota
  33. keyTypeGlobal
  34. )
  35. type fileVersion struct {
  36. version uint64
  37. node []byte
  38. }
  39. type versionList struct {
  40. versions []fileVersion
  41. }
  42. type fileList []protocol.FileInfo
  43. func (l fileList) Len() int {
  44. return len(l)
  45. }
  46. func (l fileList) Swap(a, b int) {
  47. l[a], l[b] = l[b], l[a]
  48. }
  49. func (l fileList) Less(a, b int) bool {
  50. return l[a].Name < l[b].Name
  51. }
  52. type dbReader interface {
  53. Get([]byte, *opt.ReadOptions) ([]byte, error)
  54. }
  55. type dbWriter interface {
  56. Put([]byte, []byte)
  57. Delete([]byte)
  58. }
  59. /*
  60. keyTypeNode (1 byte)
  61. repository (64 bytes)
  62. node (32 bytes)
  63. name (variable size)
  64. |
  65. scanner.File
  66. keyTypeGlobal (1 byte)
  67. repository (64 bytes)
  68. name (variable size)
  69. |
  70. []fileVersion (sorted)
  71. */
  72. func nodeKey(repo, node, file []byte) []byte {
  73. k := make([]byte, 1+64+32+len(file))
  74. k[0] = keyTypeNode
  75. copy(k[1:], []byte(repo))
  76. copy(k[1+64:], node[:])
  77. copy(k[1+64+32:], []byte(file))
  78. return k
  79. }
  80. func globalKey(repo, file []byte) []byte {
  81. k := make([]byte, 1+64+len(file))
  82. k[0] = keyTypeGlobal
  83. copy(k[1:], []byte(repo))
  84. copy(k[1+64:], []byte(file))
  85. return k
  86. }
  87. func nodeKeyName(key []byte) []byte {
  88. return key[1+64+32:]
  89. }
  90. func nodeKeyRepo(key []byte) []byte {
  91. repo := key[1 : 1+64]
  92. izero := bytes.IndexByte(repo, 0)
  93. return repo[:izero]
  94. }
  95. func nodeKeyNode(key []byte) []byte {
  96. return key[1+64 : 1+64+32]
  97. }
  98. func globalKeyName(key []byte) []byte {
  99. return key[1+64:]
  100. }
  101. func globalKeyRepo(key []byte) []byte {
  102. repo := key[1 : 1+64]
  103. izero := bytes.IndexByte(repo, 0)
  104. return repo[:izero]
  105. }
  106. type deletionHandler func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64
  107. type fileIterator func(f protocol.FileIntf) bool
  108. func ldbGenericReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo, deleteFn deletionHandler) uint64 {
  109. runtime.GC()
  110. sort.Sort(fileList(fs)) // sort list on name, same as on disk
  111. start := nodeKey(repo, node, nil) // before all repo/node files
  112. limit := nodeKey(repo, node, []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  113. batch := new(leveldb.Batch)
  114. snap, err := db.GetSnapshot()
  115. if err != nil {
  116. panic(err)
  117. }
  118. defer snap.Release()
  119. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  120. defer dbi.Release()
  121. moreDb := dbi.Next()
  122. fsi := 0
  123. var maxLocalVer uint64
  124. for {
  125. var newName, oldName []byte
  126. moreFs := fsi < len(fs)
  127. if !moreDb && !moreFs {
  128. break
  129. }
  130. if !moreFs && deleteFn == nil {
  131. // We don't have any more updated files to process and deletion
  132. // has not been requested, so we can exit early
  133. break
  134. }
  135. if moreFs {
  136. newName = []byte(fs[fsi].Name)
  137. }
  138. if moreDb {
  139. oldName = nodeKeyName(dbi.Key())
  140. }
  141. cmp := bytes.Compare(newName, oldName)
  142. if debug {
  143. l.Debugf("generic replace; repo=%q node=%v moreFs=%v moreDb=%v cmp=%d newName=%q oldName=%q", repo, protocol.NodeIDFromBytes(node), moreFs, moreDb, cmp, newName, oldName)
  144. }
  145. switch {
  146. case moreFs && (!moreDb || cmp == -1):
  147. // Disk is missing this file. Insert it.
  148. if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer {
  149. maxLocalVer = lv
  150. }
  151. if fs[fsi].IsInvalid() {
  152. ldbRemoveFromGlobal(snap, batch, repo, node, newName)
  153. } else {
  154. ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version)
  155. }
  156. fsi++
  157. case moreFs && moreDb && cmp == 0:
  158. // File exists on both sides - compare versions. We might get an
  159. // update with the same version and different flags if a node has
  160. // marked a file as invalid, so handle that too.
  161. var ef protocol.FileInfoTruncated
  162. ef.UnmarshalXDR(dbi.Value())
  163. if fs[fsi].Version > ef.Version || fs[fsi].Version != ef.Version {
  164. if lv := ldbInsert(batch, repo, node, newName, fs[fsi]); lv > maxLocalVer {
  165. maxLocalVer = lv
  166. }
  167. if fs[fsi].IsInvalid() {
  168. ldbRemoveFromGlobal(snap, batch, repo, node, newName)
  169. } else {
  170. ldbUpdateGlobal(snap, batch, repo, node, newName, fs[fsi].Version)
  171. }
  172. }
  173. // Iterate both sides.
  174. fsi++
  175. moreDb = dbi.Next()
  176. case moreDb && (!moreFs || cmp == 1):
  177. if deleteFn != nil {
  178. if lv := deleteFn(snap, batch, repo, node, oldName, dbi); lv > maxLocalVer {
  179. maxLocalVer = lv
  180. }
  181. }
  182. moreDb = dbi.Next()
  183. }
  184. }
  185. err = db.Write(batch, nil)
  186. if err != nil {
  187. panic(err)
  188. }
  189. return maxLocalVer
  190. }
  191. func ldbReplace(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
  192. // TODO: Return the remaining maxLocalVer?
  193. return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 {
  194. // Disk has files that we are missing. Remove it.
  195. if debug {
  196. l.Debugf("delete; repo=%q node=%v name=%q", repo, protocol.NodeIDFromBytes(node), name)
  197. }
  198. ldbRemoveFromGlobal(db, batch, repo, node, name)
  199. batch.Delete(dbi.Key())
  200. return 0
  201. })
  202. }
  203. func ldbReplaceWithDelete(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
  204. return ldbGenericReplace(db, repo, node, fs, func(db dbReader, batch dbWriter, repo, node, name []byte, dbi iterator.Iterator) uint64 {
  205. var tf protocol.FileInfoTruncated
  206. err := tf.UnmarshalXDR(dbi.Value())
  207. if err != nil {
  208. panic(err)
  209. }
  210. if !tf.IsDeleted() {
  211. if debug {
  212. l.Debugf("mark deleted; repo=%q node=%v name=%q", repo, protocol.NodeIDFromBytes(node), name)
  213. }
  214. ts := clock(tf.LocalVersion)
  215. f := protocol.FileInfo{
  216. Name: tf.Name,
  217. Version: lamport.Default.Tick(tf.Version),
  218. LocalVersion: ts,
  219. Flags: tf.Flags | protocol.FlagDeleted,
  220. Modified: tf.Modified,
  221. }
  222. batch.Put(dbi.Key(), f.MarshalXDR())
  223. ldbUpdateGlobal(db, batch, repo, node, nodeKeyName(dbi.Key()), f.Version)
  224. return ts
  225. }
  226. return 0
  227. })
  228. }
  229. func ldbUpdate(db *leveldb.DB, repo, node []byte, fs []protocol.FileInfo) uint64 {
  230. runtime.GC()
  231. batch := new(leveldb.Batch)
  232. snap, err := db.GetSnapshot()
  233. if err != nil {
  234. panic(err)
  235. }
  236. defer snap.Release()
  237. var maxLocalVer uint64
  238. for _, f := range fs {
  239. name := []byte(f.Name)
  240. fk := nodeKey(repo, node, name)
  241. bs, err := snap.Get(fk, nil)
  242. if err == leveldb.ErrNotFound {
  243. if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer {
  244. maxLocalVer = lv
  245. }
  246. if f.IsInvalid() {
  247. ldbRemoveFromGlobal(snap, batch, repo, node, name)
  248. } else {
  249. ldbUpdateGlobal(snap, batch, repo, node, name, f.Version)
  250. }
  251. continue
  252. }
  253. var ef protocol.FileInfoTruncated
  254. err = ef.UnmarshalXDR(bs)
  255. if err != nil {
  256. panic(err)
  257. }
  258. // Flags might change without the version being bumped when we set the
  259. // invalid flag on an existing file.
  260. if ef.Version != f.Version || ef.Flags != f.Flags {
  261. if lv := ldbInsert(batch, repo, node, name, f); lv > maxLocalVer {
  262. maxLocalVer = lv
  263. }
  264. if f.IsInvalid() {
  265. ldbRemoveFromGlobal(snap, batch, repo, node, name)
  266. } else {
  267. ldbUpdateGlobal(snap, batch, repo, node, name, f.Version)
  268. }
  269. }
  270. }
  271. err = db.Write(batch, nil)
  272. if err != nil {
  273. panic(err)
  274. }
  275. return maxLocalVer
  276. }
  277. func ldbInsert(batch dbWriter, repo, node, name []byte, file protocol.FileInfo) uint64 {
  278. if debug {
  279. l.Debugf("insert; repo=%q node=%v %v", repo, protocol.NodeIDFromBytes(node), file)
  280. }
  281. if file.LocalVersion == 0 {
  282. file.LocalVersion = clock(0)
  283. }
  284. nk := nodeKey(repo, node, name)
  285. batch.Put(nk, file.MarshalXDR())
  286. return file.LocalVersion
  287. }
  288. // ldbUpdateGlobal adds this node+version to the version list for the given
  289. // file. If the node is already present in the list, the version is updated.
  290. // If the file does not have an entry in the global list, it is created.
  291. func ldbUpdateGlobal(db dbReader, batch dbWriter, repo, node, file []byte, version uint64) bool {
  292. if debug {
  293. l.Debugf("update global; repo=%q node=%v file=%q version=%d", repo, protocol.NodeIDFromBytes(node), file, version)
  294. }
  295. gk := globalKey(repo, file)
  296. svl, err := db.Get(gk, nil)
  297. if err != nil && err != leveldb.ErrNotFound {
  298. panic(err)
  299. }
  300. var fl versionList
  301. nv := fileVersion{
  302. node: node,
  303. version: version,
  304. }
  305. if svl != nil {
  306. err = fl.UnmarshalXDR(svl)
  307. if err != nil {
  308. panic(err)
  309. }
  310. for i := range fl.versions {
  311. if bytes.Compare(fl.versions[i].node, node) == 0 {
  312. if fl.versions[i].version == version {
  313. // No need to do anything
  314. return false
  315. }
  316. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  317. break
  318. }
  319. }
  320. }
  321. for i := range fl.versions {
  322. if fl.versions[i].version <= version {
  323. t := append(fl.versions, fileVersion{})
  324. copy(t[i+1:], t[i:])
  325. t[i] = nv
  326. fl.versions = t
  327. goto done
  328. }
  329. }
  330. fl.versions = append(fl.versions, nv)
  331. done:
  332. batch.Put(gk, fl.MarshalXDR())
  333. return true
  334. }
  335. // ldbRemoveFromGlobal removes the node from the global version list for the
  336. // given file. If the version list is empty after this, the file entry is
  337. // removed entirely.
  338. func ldbRemoveFromGlobal(db dbReader, batch dbWriter, repo, node, file []byte) {
  339. if debug {
  340. l.Debugf("remove from global; repo=%q node=%v file=%q", repo, protocol.NodeIDFromBytes(node), file)
  341. }
  342. gk := globalKey(repo, file)
  343. svl, err := db.Get(gk, nil)
  344. if err != nil {
  345. // We might be called to "remove" a global version that doesn't exist
  346. // if the first update for the file is already marked invalid.
  347. return
  348. }
  349. var fl versionList
  350. err = fl.UnmarshalXDR(svl)
  351. if err != nil {
  352. panic(err)
  353. }
  354. for i := range fl.versions {
  355. if bytes.Compare(fl.versions[i].node, node) == 0 {
  356. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  357. break
  358. }
  359. }
  360. if len(fl.versions) == 0 {
  361. batch.Delete(gk)
  362. } else {
  363. batch.Put(gk, fl.MarshalXDR())
  364. }
  365. }
  366. func ldbWithHave(db *leveldb.DB, repo, node []byte, truncate bool, fn fileIterator) {
  367. start := nodeKey(repo, node, nil) // before all repo/node files
  368. limit := nodeKey(repo, node, []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  369. snap, err := db.GetSnapshot()
  370. if err != nil {
  371. panic(err)
  372. }
  373. defer snap.Release()
  374. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  375. defer dbi.Release()
  376. for dbi.Next() {
  377. f, err := unmarshalTrunc(dbi.Value(), truncate)
  378. if err != nil {
  379. panic(err)
  380. }
  381. if cont := fn(f); !cont {
  382. return
  383. }
  384. }
  385. }
  386. func ldbWithAllRepoTruncated(db *leveldb.DB, repo []byte, fn func(node []byte, f protocol.FileInfoTruncated) bool) {
  387. runtime.GC()
  388. start := nodeKey(repo, nil, nil) // before all repo/node files
  389. limit := nodeKey(repo, protocol.LocalNodeID[:], []byte{0xff, 0xff, 0xff, 0xff}) // after all repo/node files
  390. snap, err := db.GetSnapshot()
  391. if err != nil {
  392. panic(err)
  393. }
  394. defer snap.Release()
  395. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  396. defer dbi.Release()
  397. for dbi.Next() {
  398. node := nodeKeyNode(dbi.Key())
  399. var f protocol.FileInfoTruncated
  400. err := f.UnmarshalXDR(dbi.Value())
  401. if err != nil {
  402. panic(err)
  403. }
  404. if cont := fn(node, f); !cont {
  405. return
  406. }
  407. }
  408. }
  409. func ldbGet(db *leveldb.DB, repo, node, file []byte) protocol.FileInfo {
  410. nk := nodeKey(repo, node, file)
  411. bs, err := db.Get(nk, nil)
  412. if err == leveldb.ErrNotFound {
  413. return protocol.FileInfo{}
  414. }
  415. if err != nil {
  416. panic(err)
  417. }
  418. var f protocol.FileInfo
  419. err = f.UnmarshalXDR(bs)
  420. if err != nil {
  421. panic(err)
  422. }
  423. return f
  424. }
  425. func ldbGetGlobal(db *leveldb.DB, repo, file []byte) protocol.FileInfo {
  426. k := globalKey(repo, file)
  427. snap, err := db.GetSnapshot()
  428. if err != nil {
  429. panic(err)
  430. }
  431. defer snap.Release()
  432. bs, err := snap.Get(k, nil)
  433. if err == leveldb.ErrNotFound {
  434. return protocol.FileInfo{}
  435. }
  436. if err != nil {
  437. panic(err)
  438. }
  439. var vl versionList
  440. err = vl.UnmarshalXDR(bs)
  441. if err != nil {
  442. panic(err)
  443. }
  444. if len(vl.versions) == 0 {
  445. l.Debugln(k)
  446. panic("no versions?")
  447. }
  448. k = nodeKey(repo, vl.versions[0].node, file)
  449. bs, err = snap.Get(k, nil)
  450. if err != nil {
  451. panic(err)
  452. }
  453. var f protocol.FileInfo
  454. err = f.UnmarshalXDR(bs)
  455. if err != nil {
  456. panic(err)
  457. }
  458. return f
  459. }
  460. func ldbWithGlobal(db *leveldb.DB, repo []byte, truncate bool, fn fileIterator) {
  461. runtime.GC()
  462. start := globalKey(repo, nil)
  463. limit := globalKey(repo, []byte{0xff, 0xff, 0xff, 0xff})
  464. snap, err := db.GetSnapshot()
  465. if err != nil {
  466. panic(err)
  467. }
  468. defer snap.Release()
  469. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  470. defer dbi.Release()
  471. for dbi.Next() {
  472. var vl versionList
  473. err := vl.UnmarshalXDR(dbi.Value())
  474. if err != nil {
  475. panic(err)
  476. }
  477. if len(vl.versions) == 0 {
  478. l.Debugln(dbi.Key())
  479. panic("no versions?")
  480. }
  481. fk := nodeKey(repo, vl.versions[0].node, globalKeyName(dbi.Key()))
  482. bs, err := snap.Get(fk, nil)
  483. if err != nil {
  484. panic(err)
  485. }
  486. f, err := unmarshalTrunc(bs, truncate)
  487. if err != nil {
  488. panic(err)
  489. }
  490. if cont := fn(f); !cont {
  491. return
  492. }
  493. }
  494. }
  495. func ldbAvailability(db *leveldb.DB, repo, file []byte) []protocol.NodeID {
  496. k := globalKey(repo, file)
  497. bs, err := db.Get(k, nil)
  498. if err == leveldb.ErrNotFound {
  499. return nil
  500. }
  501. if err != nil {
  502. panic(err)
  503. }
  504. var vl versionList
  505. err = vl.UnmarshalXDR(bs)
  506. if err != nil {
  507. panic(err)
  508. }
  509. var nodes []protocol.NodeID
  510. for _, v := range vl.versions {
  511. if v.version != vl.versions[0].version {
  512. break
  513. }
  514. n := protocol.NodeIDFromBytes(v.node)
  515. nodes = append(nodes, n)
  516. }
  517. return nodes
  518. }
  519. func ldbWithNeed(db *leveldb.DB, repo, node []byte, truncate bool, fn fileIterator) {
  520. runtime.GC()
  521. start := globalKey(repo, nil)
  522. limit := globalKey(repo, []byte{0xff, 0xff, 0xff, 0xff})
  523. snap, err := db.GetSnapshot()
  524. if err != nil {
  525. panic(err)
  526. }
  527. defer snap.Release()
  528. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  529. defer dbi.Release()
  530. outer:
  531. for dbi.Next() {
  532. var vl versionList
  533. err := vl.UnmarshalXDR(dbi.Value())
  534. if err != nil {
  535. panic(err)
  536. }
  537. if len(vl.versions) == 0 {
  538. l.Debugln(dbi.Key())
  539. panic("no versions?")
  540. }
  541. have := false // If we have the file, any version
  542. need := false // If we have a lower version of the file
  543. var haveVersion uint64
  544. for _, v := range vl.versions {
  545. if bytes.Compare(v.node, node) == 0 {
  546. have = true
  547. haveVersion = v.version
  548. need = v.version < vl.versions[0].version
  549. break
  550. }
  551. }
  552. if need || !have {
  553. name := globalKeyName(dbi.Key())
  554. needVersion := vl.versions[0].version
  555. inner:
  556. for i := range vl.versions {
  557. if vl.versions[i].version != needVersion {
  558. // We haven't found a valid copy of the file with the needed version.
  559. continue outer
  560. }
  561. fk := nodeKey(repo, vl.versions[i].node, name)
  562. bs, err := snap.Get(fk, nil)
  563. if err != nil {
  564. panic(err)
  565. }
  566. gf, err := unmarshalTrunc(bs, truncate)
  567. if err != nil {
  568. panic(err)
  569. }
  570. if gf.IsInvalid() {
  571. // The file is marked invalid for whatever reason, don't use it.
  572. continue inner
  573. }
  574. if gf.IsDeleted() && !have {
  575. // We don't need deleted files that we don't have
  576. continue outer
  577. }
  578. if debug {
  579. l.Debugf("need repo=%q node=%v name=%q need=%v have=%v haveV=%d globalV=%d", repo, protocol.NodeIDFromBytes(node), name, need, have, haveVersion, vl.versions[0].version)
  580. }
  581. if cont := fn(gf); !cont {
  582. return
  583. }
  584. // This file is handled, no need to look further in the version list
  585. continue outer
  586. }
  587. }
  588. }
  589. }
  590. func ldbListRepos(db *leveldb.DB) []string {
  591. runtime.GC()
  592. start := []byte{keyTypeGlobal}
  593. limit := []byte{keyTypeGlobal + 1}
  594. snap, err := db.GetSnapshot()
  595. if err != nil {
  596. panic(err)
  597. }
  598. defer snap.Release()
  599. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  600. defer dbi.Release()
  601. repoExists := make(map[string]bool)
  602. for dbi.Next() {
  603. repo := string(globalKeyRepo(dbi.Key()))
  604. if !repoExists[repo] {
  605. repoExists[repo] = true
  606. }
  607. }
  608. repos := make([]string, 0, len(repoExists))
  609. for k := range repoExists {
  610. repos = append(repos, k)
  611. }
  612. sort.Strings(repos)
  613. return repos
  614. }
  615. func ldbDropRepo(db *leveldb.DB, repo []byte) {
  616. runtime.GC()
  617. snap, err := db.GetSnapshot()
  618. if err != nil {
  619. panic(err)
  620. }
  621. defer snap.Release()
  622. // Remove all items related to the given repo from the node->file bucket
  623. start := []byte{keyTypeNode}
  624. limit := []byte{keyTypeNode + 1}
  625. dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  626. for dbi.Next() {
  627. itemRepo := nodeKeyRepo(dbi.Key())
  628. if bytes.Compare(repo, itemRepo) == 0 {
  629. db.Delete(dbi.Key(), nil)
  630. }
  631. }
  632. dbi.Release()
  633. // Remove all items related to the given repo from the global bucket
  634. start = []byte{keyTypeGlobal}
  635. limit = []byte{keyTypeGlobal + 1}
  636. dbi = snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
  637. for dbi.Next() {
  638. itemRepo := globalKeyRepo(dbi.Key())
  639. if bytes.Compare(repo, itemRepo) == 0 {
  640. db.Delete(dbi.Key(), nil)
  641. }
  642. }
  643. dbi.Release()
  644. }
  645. func unmarshalTrunc(bs []byte, truncate bool) (protocol.FileIntf, error) {
  646. if truncate {
  647. var tf protocol.FileInfoTruncated
  648. err := tf.UnmarshalXDR(bs)
  649. return tf, err
  650. } else {
  651. var tf protocol.FileInfo
  652. err := tf.UnmarshalXDR(bs)
  653. return tf, err
  654. }
  655. }