| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- package main
- import (
- "log"
- "sort"
- "sync"
- "time"
- )
- type Monitor interface {
- FileBegins(<-chan content) error
- FileDone() error
- }
- type FileQueue struct {
- files queuedFileList
- sorted bool
- fmut sync.Mutex // protects files and sorted
- availability map[string][]string
- amut sync.Mutex // protects availability
- queued map[string]bool
- }
- type queuedFile struct {
- name string
- blocks []Block
- activeBlocks []bool
- given int
- remaining int
- channel chan content
- nodes []string
- nodesChecked time.Time
- monitor Monitor
- }
- type content struct {
- offset int64
- data []byte
- }
- type queuedFileList []queuedFile
- func (l queuedFileList) Len() int { return len(l) }
- func (l queuedFileList) Swap(a, b int) { l[a], l[b] = l[b], l[a] }
- func (l queuedFileList) Less(a, b int) bool {
- // Sort by most blocks already given out, then alphabetically
- if l[a].given != l[b].given {
- return l[a].given > l[b].given
- }
- return l[a].name < l[b].name
- }
- type queuedBlock struct {
- name string
- block Block
- index int
- }
- func NewFileQueue() *FileQueue {
- return &FileQueue{
- availability: make(map[string][]string),
- queued: make(map[string]bool),
- }
- }
- func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
- q.fmut.Lock()
- defer q.fmut.Unlock()
- if q.queued[name] {
- return
- }
- q.files = append(q.files, queuedFile{
- name: name,
- blocks: blocks,
- activeBlocks: make([]bool, len(blocks)),
- remaining: len(blocks),
- channel: make(chan content),
- monitor: monitor,
- })
- q.queued[name] = true
- q.sorted = false
- }
- func (q *FileQueue) Len() int {
- q.fmut.Lock()
- defer q.fmut.Unlock()
- return len(q.files)
- }
- func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
- q.fmut.Lock()
- defer q.fmut.Unlock()
- if !q.sorted {
- sort.Sort(q.files)
- q.sorted = true
- }
- for i := range q.files {
- qf := &q.files[i]
- q.amut.Lock()
- av := q.availability[qf.name]
- q.amut.Unlock()
- if len(av) == 0 {
- // Noone has the file we want; abort.
- if qf.remaining != len(qf.blocks) {
- // We have already started on this file; close it down
- close(qf.channel)
- if mon := qf.monitor; mon != nil {
- mon.FileDone()
- }
- }
- delete(q.queued, qf.name)
- q.deleteAt(i)
- return queuedBlock{}, false
- }
- for _, ni := range av {
- // Find and return the next block in the queue
- if ni == nodeID {
- for j, b := range qf.blocks {
- if !qf.activeBlocks[j] {
- qf.activeBlocks[j] = true
- qf.given++
- return queuedBlock{
- name: qf.name,
- block: b,
- index: j,
- }, true
- }
- }
- break
- }
- }
- }
- // We found nothing to do
- return queuedBlock{}, false
- }
- func (q *FileQueue) Done(file string, offset int64, data []byte) {
- q.fmut.Lock()
- defer q.fmut.Unlock()
- c := content{
- offset: offset,
- data: data,
- }
- for i := range q.files {
- qf := &q.files[i]
- if qf.name == file {
- if qf.monitor != nil && qf.remaining == len(qf.blocks) {
- err := qf.monitor.FileBegins(qf.channel)
- if err != nil {
- log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
- delete(q.queued, qf.name)
- q.deleteAt(i)
- return
- }
- }
- qf.channel <- c
- qf.remaining--
- if qf.remaining == 0 {
- close(qf.channel)
- if qf.monitor != nil {
- err := qf.monitor.FileDone()
- if err != nil {
- log.Printf("WARNING: %s: %v", qf.name, err)
- }
- }
- delete(q.queued, qf.name)
- q.deleteAt(i)
- }
- return
- }
- }
- // We found nothing, might have errored out already
- }
- func (q *FileQueue) QueuedFiles() (files []string) {
- q.fmut.Lock()
- defer q.fmut.Unlock()
- for _, qf := range q.files {
- files = append(files, qf.name)
- }
- return
- }
- func (q *FileQueue) deleteAt(i int) {
- q.files = append(q.files[:i], q.files[i+1:]...)
- }
- func (q *FileQueue) deleteFile(n string) {
- for i, file := range q.files {
- if n == file.name {
- q.deleteAt(i)
- delete(q.queued, file.name)
- return
- }
- }
- }
- func (q *FileQueue) SetAvailable(file string, nodes []string) {
- q.amut.Lock()
- defer q.amut.Unlock()
- q.availability[file] = nodes
- }
- func (q *FileQueue) RemoveAvailable(toRemove string) {
- q.amut.Lock()
- q.fmut.Lock()
- defer q.fmut.Unlock()
- defer q.amut.Unlock()
- for file, nodes := range q.availability {
- for i, node := range nodes {
- if node == toRemove {
- q.availability[file] = nodes[:i+copy(nodes[i:], nodes[i+1:])]
- if len(q.availability[file]) == 0 {
- q.deleteFile(file)
- }
- }
- break
- }
- }
- }
|