Просмотр исходного кода

Add a simple benchmark to e2e tests (#739)

Nate Brown 3 лет назад
Родитель
Сommit
feb3e1317f
7 измененных файлов с 94 добавлено и 23 удалено
  1. 3 0
      Makefile
  2. 24 0
      e2e/handshakes_test.go
  3. 2 2
      e2e/helpers_test.go
  4. 44 13
      e2e/router/router.go
  5. 3 1
      inside.go
  6. 8 2
      overlay/tun_tester.go
  7. 10 5
      udp/udp_tester.go

+ 3 - 0
Makefile

@@ -66,6 +66,9 @@ e2evvv: e2ev
 e2evvvv: TEST_ENV += TEST_LOGS=3
 e2evvvv: e2ev
 
+e2e-bench: TEST_FLAGS = -bench=. -benchmem -run=^$
+e2e-bench: e2e
+
 all: $(ALL:%=build/%/nebula) $(ALL:%=build/%/nebula-cert)
 
 release: $(ALL:%=build/nebula-%.tar.gz)

+ 24 - 0
e2e/handshakes_test.go

@@ -16,6 +16,30 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+func BenchmarkHotPath(b *testing.B) {
+	ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{})
+	myControl, _, _ := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 1}, nil)
+	theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2}, nil)
+
+	// Put their info in our lighthouse
+	myControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr)
+
+	// Start the servers
+	myControl.Start()
+	theirControl.Start()
+
+	r := router.NewR(b, myControl, theirControl)
+	r.CancelFlowLogs()
+
+	for n := 0; n < b.N; n++ {
+		myControl.InjectTunUDPPacket(theirVpnIp, 80, 80, []byte("Hi from me"))
+		_ = r.RouteForAllUntilTxTun(theirControl)
+	}
+
+	myControl.Stop()
+	theirControl.Stop()
+}
+
 func TestGoodHandshake(t *testing.T) {
 	ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{})
 	myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 1}, nil)

+ 2 - 2
e2e/helpers_test.go

@@ -7,7 +7,6 @@ import (
 	"crypto/rand"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"net"
 	"os"
 	"testing"
@@ -304,7 +303,8 @@ func NewTestLogger() *logrus.Logger {
 
 	v := os.Getenv("TEST_LOGS")
 	if v == "" {
-		l.SetOutput(ioutil.Discard)
+		l.SetOutput(io.Discard)
+		l.SetLevel(logrus.PanicLevel)
 		return l
 	}
 

+ 44 - 13
e2e/router/router.go

@@ -47,7 +47,7 @@ type R struct {
 
 	fn           string
 	cancelRender context.CancelFunc
-	t            *testing.T
+	t            testing.TB
 }
 
 type flowEntry struct {
@@ -63,6 +63,12 @@ type packet struct {
 	rx     bool // the packet was received by a udp device
 }
 
+func (p *packet) WasReceived() {
+	if p != nil {
+		p.rx = true
+	}
+}
+
 type ExitType int
 
 const (
@@ -79,7 +85,7 @@ type ExitFunc func(packet *udp.Packet, receiver *nebula.Control) ExitType
 // NewR creates a new router to pass packets in a controlled fashion between the provided controllers.
 // The packet flow will be recorded in a file within the mermaid directory under the same name as the test.
 // Renders will occur automatically, roughly every 100ms, until a call to RenderFlow() is made
-func NewR(t *testing.T, controls ...*nebula.Control) *R {
+func NewR(t testing.TB, controls ...*nebula.Control) *R {
 	ctx, cancel := context.WithCancel(context.Background())
 
 	if err := os.MkdirAll("mermaid", 0755); err != nil {
@@ -91,6 +97,7 @@ func NewR(t *testing.T, controls ...*nebula.Control) *R {
 		vpnControls:  make(map[iputil.VpnIp]*nebula.Control),
 		inNat:        make(map[string]*nebula.Control),
 		outNat:       make(map[string]net.UDPAddr),
+		flow:         []flowEntry{},
 		fn:           filepath.Join("mermaid", fmt.Sprintf("%s.md", t.Name())),
 		t:            t,
 		cancelRender: cancel,
@@ -148,14 +155,24 @@ func (r *R) RenderFlow() {
 	r.renderFlow()
 }
 
+// CancelFlowLogs stops flow logs from being tracked and destroys any logs already collected
+func (r *R) CancelFlowLogs() {
+	r.cancelRender()
+	r.flow = nil
+}
+
 func (r *R) renderFlow() {
+	if r.flow == nil {
+		return
+	}
+
 	f, err := os.OpenFile(r.fn, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644)
 	if err != nil {
 		panic(err)
 	}
 
 	var participants = map[string]struct{}{}
-	var participansVals []string
+	var participantsVals []string
 
 	fmt.Fprintln(f, "```mermaid")
 	fmt.Fprintln(f, "sequenceDiagram")
@@ -172,7 +189,7 @@ func (r *R) renderFlow() {
 		}
 		participants[addr] = struct{}{}
 		sanAddr := strings.Replace(addr, ":", "#58;", 1)
-		participansVals = append(participansVals, sanAddr)
+		participantsVals = append(participantsVals, sanAddr)
 		fmt.Fprintf(
 			f, "    participant %s as Nebula: %s<br/>UDP: %s\n",
 			sanAddr, e.packet.from.GetVpnIp(), sanAddr,
@@ -183,7 +200,7 @@ func (r *R) renderFlow() {
 	h := &header.H{}
 	for _, e := range r.flow {
 		if e.packet == nil {
-			fmt.Fprintf(f, "    note over %s: %s\n", strings.Join(participansVals, ", "), e.note)
+			fmt.Fprintf(f, "    note over %s: %s\n", strings.Join(participantsVals, ", "), e.note)
 			continue
 		}
 
@@ -222,6 +239,10 @@ func (r *R) InjectFlow(from, to *nebula.Control, p *udp.Packet) {
 }
 
 func (r *R) Log(arg ...any) {
+	if r.flow == nil {
+		return
+	}
+
 	r.Lock()
 	r.flow = append(r.flow, flowEntry{note: fmt.Sprint(arg...)})
 	r.t.Log(arg...)
@@ -229,6 +250,10 @@ func (r *R) Log(arg ...any) {
 }
 
 func (r *R) Logf(format string, arg ...any) {
+	if r.flow == nil {
+		return
+	}
+
 	r.Lock()
 	r.flow = append(r.flow, flowEntry{note: fmt.Sprintf(format, arg...)})
 	r.t.Logf(format, arg...)
@@ -236,14 +261,20 @@ func (r *R) Logf(format string, arg ...any) {
 }
 
 // unlockedInjectFlow is used by the router to record a packet has been transmitted, the packet is returned and
-// should be marked as received AFTER it has been placed on the receivers channel
+// should be marked as received AFTER it has been placed on the receivers channel.
+// If flow logs have been disabled this function will return nil
 func (r *R) unlockedInjectFlow(from, to *nebula.Control, p *udp.Packet, tun bool) *packet {
+	if r.flow == nil {
+		return nil
+	}
+
 	fp := &packet{
 		from:   from,
 		to:     to,
 		packet: p.Copy(),
 		tun:    tun,
 	}
+
 	r.flow = append(r.flow, flowEntry{packet: fp})
 	return fp
 }
@@ -285,7 +316,7 @@ func (r *R) RouteUntilTxTun(sender *nebula.Control, receiver *nebula.Control) []
 			}
 			fp := r.unlockedInjectFlow(sender, c, p, false)
 			c.InjectUDPPacket(p)
-			fp.rx = true
+			fp.WasReceived()
 			r.Unlock()
 		}
 	}
@@ -344,7 +375,7 @@ func (r *R) RouteForAllUntilTxTun(receiver *nebula.Control) []byte {
 			}
 			fp := r.unlockedInjectFlow(cm[x], c, p, false)
 			c.InjectUDPPacket(p)
-			fp.rx = true
+			fp.WasReceived()
 		}
 		r.Unlock()
 	}
@@ -381,14 +412,14 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) {
 		case RouteAndExit:
 			fp := r.unlockedInjectFlow(sender, receiver, p, false)
 			receiver.InjectUDPPacket(p)
-			fp.rx = true
+			fp.WasReceived()
 			r.Unlock()
 			return
 
 		case KeepRouting:
 			fp := r.unlockedInjectFlow(sender, receiver, p, false)
 			receiver.InjectUDPPacket(p)
-			fp.rx = true
+			fp.WasReceived()
 
 		default:
 			panic(fmt.Sprintf("Unknown exitFunc return: %v", e))
@@ -439,7 +470,7 @@ func (r *R) InjectUDPPacket(sender, receiver *nebula.Control, packet *udp.Packet
 
 	fp := r.unlockedInjectFlow(sender, receiver, packet, false)
 	receiver.InjectUDPPacket(packet)
-	fp.rx = true
+	fp.WasReceived()
 }
 
 // RouteForUntilAfterToAddr will route for sender and return only after it sees and sends a packet destined for toAddr
@@ -503,14 +534,14 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) {
 		case RouteAndExit:
 			fp := r.unlockedInjectFlow(cm[x], receiver, p, false)
 			receiver.InjectUDPPacket(p)
-			fp.rx = true
+			fp.WasReceived()
 			r.Unlock()
 			return
 
 		case KeepRouting:
 			fp := r.unlockedInjectFlow(cm[x], receiver, p, false)
 			receiver.InjectUDPPacket(p)
-			fp.rx = true
+			fp.WasReceived()
 
 		default:
 			panic(fmt.Sprintf("Unknown exitFunc return: %v", e))

+ 3 - 1
inside.go

@@ -14,7 +14,9 @@ import (
 func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *firewall.Packet, nb, out []byte, q int, localCache firewall.ConntrackCache) {
 	err := newPacket(packet, false, fwPacket)
 	if err != nil {
-		f.l.WithField("packet", packet).Debugf("Error while validating outbound packet: %s", err)
+		if f.l.Level >= logrus.DebugLevel {
+			f.l.WithField("packet", packet).Debugf("Error while validating outbound packet: %s", err)
+		}
 		return
 	}
 

+ 8 - 2
overlay/tun_tester.go

@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"os"
 
 	"github.com/sirupsen/logrus"
 	"github.com/slackhq/nebula/cidr"
@@ -49,7 +50,9 @@ func newTunFromFd(_ *logrus.Logger, _ int, _ *net.IPNet, _ int, _ []Route, _ int
 // These are unencrypted ip layer frames destined for another nebula node.
 // packets should exit the udp side, capture them with udpConn.Get
 func (t *TestTun) Send(packet []byte) {
-	t.l.WithField("dataLen", len(packet)).Info("Tun receiving injected packet")
+	if t.l.Level >= logrus.InfoLevel {
+		t.l.WithField("dataLen", len(packet)).Info("Tun receiving injected packet")
+	}
 	t.rxPackets <- packet
 }
 
@@ -107,7 +110,10 @@ func (t *TestTun) Close() error {
 }
 
 func (t *TestTun) Read(b []byte) (int, error) {
-	p := <-t.rxPackets
+	p, ok := <-t.rxPackets
+	if !ok {
+		return 0, os.ErrClosed
+	}
 	copy(b, p)
 	return len(p), nil
 }

+ 10 - 5
udp/udp_tester.go

@@ -62,10 +62,12 @@ func (u *Conn) Send(packet *Packet) {
 	if err := h.Parse(packet.Data); err != nil {
 		panic(err)
 	}
-	u.l.WithField("header", h).
-		WithField("udpAddr", fmt.Sprintf("%v:%v", packet.FromIp, packet.FromPort)).
-		WithField("dataLen", len(packet.Data)).
-		Info("UDP receiving injected packet")
+	if u.l.Level >= logrus.InfoLevel {
+		u.l.WithField("header", h).
+			WithField("udpAddr", fmt.Sprintf("%v:%v", packet.FromIp, packet.FromPort)).
+			WithField("dataLen", len(packet.Data)).
+			Info("UDP receiving injected packet")
+	}
 	u.RxPackets <- packet
 }
 
@@ -114,7 +116,10 @@ func (u *Conn) ListenOut(r EncReader, lhf LightHouseHandlerFunc, cache *firewall
 	nb := make([]byte, 12, 12)
 
 	for {
-		p := <-u.RxPackets
+		p, ok := <-u.RxPackets
+		if !ok {
+			return
+		}
 		ua.Port = p.FromPort
 		copy(ua.IP, p.FromIp.To16())
 		r(ua, nil, plaintext[:0], p.Data, h, fwPacket, lhf, nb, q, cache.Get(u.l))