浏览代码

lib/sync: Log everything...

Audrius Butkevicius 9 年之前
父节点
当前提交
3418497f3d
共有 3 个文件被更改,包括 99 次插入43 次删除
  1. 4 5
      lib/model/util.go
  2. 94 37
      lib/sync/sync.go
  3. 1 1
      lib/sync/sync_test.go

+ 4 - 5
lib/model/util.go

@@ -12,8 +12,8 @@ import (
 	"time"
 )
 
-type Holder interface {
-	Holder() (string, int)
+type Holdable interface {
+	Holders() string
 }
 
 func newDeadlockDetector(timeout time.Duration) *deadlockDetector {
@@ -49,9 +49,8 @@ func (d *deadlockDetector) Watch(name string, mut sync.Locker) {
 			if r := <-ok; !r {
 				msg := fmt.Sprintf("deadlock detected at %s", name)
 				for otherName, otherMut := range d.lockers {
-					if otherHolder, ok := otherMut.(Holder); ok {
-						holder, goid := otherHolder.Holder()
-						msg += fmt.Sprintf("\n %s = current holder: %s at routine %d", otherName, holder, goid)
+					if otherHolder, ok := otherMut.(Holdable); ok {
+						msg += "\n===" + otherName + "===\n" + otherHolder.Holders()
 					}
 				}
 				panic(msg)

+ 94 - 37
lib/sync/sync.go

@@ -36,16 +36,21 @@ type WaitGroup interface {
 
 func NewMutex() Mutex {
 	if debug {
-		return &loggedMutex{}
+		mutex := &loggedMutex{}
+		mutex.holder.Store(holder{})
+		return mutex
 	}
 	return &sync.Mutex{}
 }
 
 func NewRWMutex() RWMutex {
 	if debug {
-		return &loggedRWMutex{
-			unlockers: make([]string, 0),
+		mutex := &loggedRWMutex{
+			readHolders: make(map[int][]holder),
+			unlockers:   make(chan holder, 1024),
 		}
+		mutex.holder.Store(holder{})
+		return mutex
 	}
 	return &sync.RWMutex{}
 }
@@ -57,81 +62,129 @@ func NewWaitGroup() WaitGroup {
 	return &sync.WaitGroup{}
 }
 
+type holder struct {
+	at   string
+	time time.Time
+	goid int
+}
+
+func (h holder) String() string {
+	if h.at == "" {
+		return "not held"
+	}
+	return fmt.Sprintf("at %s goid: %d for %s", h.at, h.goid, time.Now().Sub(h.time))
+}
+
 type loggedMutex struct {
 	sync.Mutex
-	start    time.Time
-	lockedAt string
-	goid     int
+	start  time.Time
+	holder atomic.Value
 }
 
 func (m *loggedMutex) Lock() {
 	m.Mutex.Lock()
-	m.start = time.Now()
-	m.lockedAt = getCaller()
-	m.goid = goid()
+	m.holder.Store(getHolder())
 }
 
 func (m *loggedMutex) Unlock() {
-	duration := time.Now().Sub(m.start)
+	currentHolder := m.holder.Load().(holder)
+	duration := time.Now().Sub(currentHolder.time)
 	if duration >= threshold {
-		l.Debugf("Mutex held for %v. Locked at %s unlocked at %s", duration, m.lockedAt, getCaller())
+		l.Debugf("Mutex held for %v. Locked at %s unlocked at %s", duration, currentHolder.at, getHolder().at)
 	}
+	m.holder.Store(holder{})
 	m.Mutex.Unlock()
 }
 
-func (m *loggedMutex) Holder() (string, int) {
-	return m.lockedAt, m.goid
+func (m *loggedMutex) Holders() string {
+	return m.holder.Load().(holder).String()
 }
 
 type loggedRWMutex struct {
 	sync.RWMutex
-	start    time.Time
-	lockedAt string
-	goid     int
+	holder atomic.Value
 
-	logUnlockers uint32
+	readHolders    map[int][]holder
+	readHoldersMut sync.Mutex
 
-	unlockers    []string
-	unlockersMut sync.Mutex
+	logUnlockers int32
+	unlockers    chan holder
 }
 
 func (m *loggedRWMutex) Lock() {
 	start := time.Now()
 
-	atomic.StoreUint32(&m.logUnlockers, 1)
+	atomic.StoreInt32(&m.logUnlockers, 1)
 	m.RWMutex.Lock()
 	m.logUnlockers = 0
 
-	m.start = time.Now()
-	duration := m.start.Sub(start)
+	holder := getHolder()
+	m.holder.Store(holder)
+
+	duration := holder.time.Sub(start)
 
-	m.lockedAt = getCaller()
-	m.goid = goid()
 	if duration > threshold {
-		l.Debugf("RWMutex took %v to lock. Locked at %s. RUnlockers while locking: %s", duration, m.lockedAt, strings.Join(m.unlockers, ", "))
+		var unlockerStrings []string
+	loop:
+		for {
+			select {
+			case holder := <-m.unlockers:
+				unlockerStrings = append(unlockerStrings, holder.String())
+			default:
+				break loop
+			}
+		}
+		l.Debugf("RWMutex took %v to lock. Locked at %s. RUnlockers while locking:\n%s", duration, holder.at, strings.Join(unlockerStrings, "\n"))
 	}
-	m.unlockers = m.unlockers[0:]
 }
 
 func (m *loggedRWMutex) Unlock() {
-	duration := time.Now().Sub(m.start)
+	currentHolder := m.holder.Load().(holder)
+	duration := time.Now().Sub(currentHolder.time)
 	if duration >= threshold {
-		l.Debugf("RWMutex held for %v. Locked at %s: unlocked at %s", duration, m.lockedAt, getCaller())
+		l.Debugf("RWMutex held for %v. Locked at %s unlocked at %s", duration, currentHolder.at, getHolder().at)
 	}
+	m.holder.Store(holder{})
 	m.RWMutex.Unlock()
 }
 
+func (m *loggedRWMutex) RLock() {
+	m.RWMutex.RLock()
+	holder := getHolder()
+	m.readHoldersMut.Lock()
+	m.readHolders[holder.goid] = append(m.readHolders[holder.goid], holder)
+	m.readHoldersMut.Unlock()
+}
+
 func (m *loggedRWMutex) RUnlock() {
-	if atomic.LoadUint32(&m.logUnlockers) == 1 {
-		m.unlockersMut.Lock()
-		m.unlockers = append(m.unlockers, getCaller())
-		m.unlockersMut.Unlock()
+	id := goid()
+	m.readHoldersMut.Lock()
+	current := m.readHolders[id]
+	if len(current) > 0 {
+		m.readHolders[id] = current[:len(current)-1]
+	}
+	m.readHoldersMut.Unlock()
+	if atomic.LoadInt32(&m.logUnlockers) == 1 {
+		holder := getHolder()
+		select {
+		case m.unlockers <- holder:
+		default:
+			l.Debugf("Dropped holder %s as channel full", holder)
+		}
 	}
 	m.RWMutex.RUnlock()
 }
 
-func (m *loggedRWMutex) Holder() (string, int) {
-	return m.lockedAt, m.goid
+func (m *loggedRWMutex) Holders() string {
+	output := m.holder.Load().(holder).String() + " (writer)"
+	m.readHoldersMut.Lock()
+	for _, holders := range m.readHolders {
+		for _, holder := range holders {
+			output += "\n" + holder.String() + " (reader)"
+		}
+	}
+	m.readHoldersMut.Unlock()
+	return output
 }
 
 type loggedWaitGroup struct {
@@ -143,14 +196,18 @@ func (wg *loggedWaitGroup) Wait() {
 	wg.WaitGroup.Wait()
 	duration := time.Now().Sub(start)
 	if duration >= threshold {
-		l.Debugf("WaitGroup took %v at %s", duration, getCaller())
+		l.Debugf("WaitGroup took %v at %s", duration, getHolder())
 	}
 }
 
-func getCaller() string {
+func getHolder() holder {
 	_, file, line, _ := runtime.Caller(2)
 	file = filepath.Join(filepath.Base(filepath.Dir(file)), filepath.Base(file))
-	return fmt.Sprintf("%s:%d", file, line)
+	return holder{
+		at:   fmt.Sprintf("%s:%d", file, line),
+		goid: goid(),
+		time: time.Now(),
+	}
 }
 
 func goid() int {

+ 1 - 1
lib/sync/sync_test.go

@@ -162,7 +162,7 @@ func TestRWMutex(t *testing.T) {
 	if len(messages) != 2 {
 		t.Errorf("Unexpected message count")
 	}
-	if !strings.Contains(messages[1], "RUnlockers while locking: sync") || !strings.Contains(messages[1], "sync_test.go:") {
+	if !strings.Contains(messages[1], "RUnlockers while locking:\nat sync") || !strings.Contains(messages[1], "sync_test.go:") {
 		t.Error("Unexpected message")
 	}