Browse Source

Move top level packages to internal.

Jakob Borg 11 years ago
commit
f28367bcfc
16 changed files with 2777 additions and 0 deletions
  1. 1 0
      .gitignore
  2. 76 0
      common_test.go
  3. 64 0
      counting.go
  4. 17 0
      debug.go
  5. 6 0
      doc.go
  6. 45 0
      header.go
  7. 139 0
      message.go
  8. 964 0
      message_xdr.go
  9. 42 0
      nativemodel_darwin.go
  10. 33 0
      nativemodel_unix.go
  11. 72 0
      nativemodel_windows.go
  12. 159 0
      nodeid.go
  13. 78 0
      nodeid_test.go
  14. 640 0
      protocol.go
  15. 383 0
      protocol_test.go
  16. 58 0
      wireformat.go

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+*.txt

+ 76 - 0
common_test.go

@@ -0,0 +1,76 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import (
+	"io"
+	"time"
+)
+
+type TestModel struct {
+	data     []byte
+	repo     string
+	name     string
+	offset   int64
+	size     int
+	closedCh chan bool
+}
+
+func newTestModel() *TestModel {
+	return &TestModel{
+		closedCh: make(chan bool),
+	}
+}
+
+func (t *TestModel) Index(nodeID NodeID, repo string, files []FileInfo) {
+}
+
+func (t *TestModel) IndexUpdate(nodeID NodeID, repo string, files []FileInfo) {
+}
+
+func (t *TestModel) Request(nodeID NodeID, repo, name string, offset int64, size int) ([]byte, error) {
+	t.repo = repo
+	t.name = name
+	t.offset = offset
+	t.size = size
+	return t.data, nil
+}
+
+func (t *TestModel) Close(nodeID NodeID, err error) {
+	close(t.closedCh)
+}
+
+func (t *TestModel) ClusterConfig(nodeID NodeID, config ClusterConfigMessage) {
+}
+
+func (t *TestModel) isClosed() bool {
+	select {
+	case <-t.closedCh:
+		return true
+	case <-time.After(1 * time.Second):
+		return false // Timeout
+	}
+}
+
+type ErrPipe struct {
+	io.PipeWriter
+	written int
+	max     int
+	err     error
+	closed  bool
+}
+
+func (e *ErrPipe) Write(data []byte) (int, error) {
+	if e.closed {
+		return 0, e.err
+	}
+	if e.written+len(data) > e.max {
+		n, _ := e.PipeWriter.Write(data[:e.max-e.written])
+		e.PipeWriter.CloseWithError(e.err)
+		e.closed = true
+		return n, e.err
+	}
+	return e.PipeWriter.Write(data)
+}

+ 64 - 0
counting.go

@@ -0,0 +1,64 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import (
+	"io"
+	"sync/atomic"
+	"time"
+)
+
+type countingReader struct {
+	io.Reader
+	tot  uint64 // bytes
+	last int64  // unix nanos
+}
+
+var (
+	totalIncoming uint64
+	totalOutgoing uint64
+)
+
+func (c *countingReader) Read(bs []byte) (int, error) {
+	n, err := c.Reader.Read(bs)
+	atomic.AddUint64(&c.tot, uint64(n))
+	atomic.AddUint64(&totalIncoming, uint64(n))
+	atomic.StoreInt64(&c.last, time.Now().UnixNano())
+	return n, err
+}
+
+func (c *countingReader) Tot() uint64 {
+	return atomic.LoadUint64(&c.tot)
+}
+
+func (c *countingReader) Last() time.Time {
+	return time.Unix(0, atomic.LoadInt64(&c.last))
+}
+
+type countingWriter struct {
+	io.Writer
+	tot  uint64 // bytes
+	last int64  // unix nanos
+}
+
+func (c *countingWriter) Write(bs []byte) (int, error) {
+	n, err := c.Writer.Write(bs)
+	atomic.AddUint64(&c.tot, uint64(n))
+	atomic.AddUint64(&totalOutgoing, uint64(n))
+	atomic.StoreInt64(&c.last, time.Now().UnixNano())
+	return n, err
+}
+
+func (c *countingWriter) Tot() uint64 {
+	return atomic.LoadUint64(&c.tot)
+}
+
+func (c *countingWriter) Last() time.Time {
+	return time.Unix(0, atomic.LoadInt64(&c.last))
+}
+
+func TotalInOut() (uint64, uint64) {
+	return atomic.LoadUint64(&totalIncoming), atomic.LoadUint64(&totalOutgoing)
+}

+ 17 - 0
debug.go

@@ -0,0 +1,17 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import (
+	"os"
+	"strings"
+
+	"github.com/syncthing/syncthing/internal/logger"
+)
+
+var (
+	debug = strings.Contains(os.Getenv("STTRACE"), "protocol") || os.Getenv("STTRACE") == "all"
+	l     = logger.DefaultLogger
+)

+ 6 - 0
doc.go

@@ -0,0 +1,6 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+// Package protocol implements the Block Exchange Protocol.
+package protocol

+ 45 - 0
header.go

@@ -0,0 +1,45 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import "github.com/calmh/xdr"
+
+type header struct {
+	version     int
+	msgID       int
+	msgType     int
+	compression bool
+}
+
+func (h header) encodeXDR(xw *xdr.Writer) (int, error) {
+	u := encodeHeader(h)
+	return xw.WriteUint32(u)
+}
+
+func (h *header) decodeXDR(xr *xdr.Reader) error {
+	u := xr.ReadUint32()
+	*h = decodeHeader(u)
+	return xr.Error()
+}
+
+func encodeHeader(h header) uint32 {
+	var isComp uint32
+	if h.compression {
+		isComp = 1 << 0 // the zeroth bit is the compression bit
+	}
+	return uint32(h.version&0xf)<<28 +
+		uint32(h.msgID&0xfff)<<16 +
+		uint32(h.msgType&0xff)<<8 +
+		isComp
+}
+
+func decodeHeader(u uint32) header {
+	return header{
+		version:     int(u>>28) & 0xf,
+		msgID:       int(u>>16) & 0xfff,
+		msgType:     int(u>>8) & 0xff,
+		compression: u&1 == 1,
+	}
+}

+ 139 - 0
message.go

@@ -0,0 +1,139 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import "fmt"
+
+type IndexMessage struct {
+	Repository string // max:64
+	Files      []FileInfo
+}
+
+type FileInfo struct {
+	Name         string // max:8192
+	Flags        uint32
+	Modified     int64
+	Version      uint64
+	LocalVersion uint64
+	Blocks       []BlockInfo
+}
+
+func (f FileInfo) String() string {
+	return fmt.Sprintf("File{Name:%q, Flags:0%o, Modified:%d, Version:%d, Size:%d, Blocks:%v}",
+		f.Name, f.Flags, f.Modified, f.Version, f.Size(), f.Blocks)
+}
+
+func (f FileInfo) Size() (bytes int64) {
+	if IsDeleted(f.Flags) || IsDirectory(f.Flags) {
+		return 128
+	}
+	for _, b := range f.Blocks {
+		bytes += int64(b.Size)
+	}
+	return
+}
+
+func (f FileInfo) IsDeleted() bool {
+	return IsDeleted(f.Flags)
+}
+
+func (f FileInfo) IsInvalid() bool {
+	return IsInvalid(f.Flags)
+}
+
+// Used for unmarshalling a FileInfo structure but skipping the actual block list
+type FileInfoTruncated struct {
+	Name         string // max:8192
+	Flags        uint32
+	Modified     int64
+	Version      uint64
+	LocalVersion uint64
+	NumBlocks    uint32
+}
+
+// Returns a statistical guess on the size, not the exact figure
+func (f FileInfoTruncated) Size() int64 {
+	if IsDeleted(f.Flags) || IsDirectory(f.Flags) {
+		return 128
+	}
+	if f.NumBlocks < 2 {
+		return BlockSize / 2
+	} else {
+		return int64(f.NumBlocks-1)*BlockSize + BlockSize/2
+	}
+}
+
+func (f FileInfoTruncated) IsDeleted() bool {
+	return IsDeleted(f.Flags)
+}
+
+func (f FileInfoTruncated) IsInvalid() bool {
+	return IsInvalid(f.Flags)
+}
+
+type FileIntf interface {
+	Size() int64
+	IsDeleted() bool
+	IsInvalid() bool
+}
+
+type BlockInfo struct {
+	Offset int64 // noencode (cache only)
+	Size   uint32
+	Hash   []byte // max:64
+}
+
+func (b BlockInfo) String() string {
+	return fmt.Sprintf("Block{%d/%d/%x}", b.Offset, b.Size, b.Hash)
+}
+
+type RequestMessage struct {
+	Repository string // max:64
+	Name       string // max:8192
+	Offset     uint64
+	Size       uint32
+}
+
+type ResponseMessage struct {
+	Data []byte
+}
+
+type ClusterConfigMessage struct {
+	ClientName    string       // max:64
+	ClientVersion string       // max:64
+	Repositories  []Repository // max:64
+	Options       []Option     // max:64
+}
+
+func (o *ClusterConfigMessage) GetOption(key string) string {
+	for _, option := range o.Options {
+		if option.Key == key {
+			return option.Value
+		}
+	}
+	return ""
+}
+
+type Repository struct {
+	ID    string // max:64
+	Nodes []Node // max:64
+}
+
+type Node struct {
+	ID              []byte // max:32
+	Flags           uint32
+	MaxLocalVersion uint64
+}
+
+type Option struct {
+	Key   string // max:64
+	Value string // max:1024
+}
+
+type CloseMessage struct {
+	Reason string // max:1024
+}
+
+type EmptyMessage struct{}

+ 964 - 0
message_xdr.go

@@ -0,0 +1,964 @@
+// ************************************************************
+// This file is automatically generated by genxdr. Do not edit.
+// ************************************************************
+
+package protocol
+
+import (
+	"bytes"
+	"io"
+
+	"github.com/calmh/xdr"
+)
+
+/*
+
+IndexMessage Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                     Length of Repository                      |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                 Repository (variable length)                  \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                        Number of Files                        |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\               Zero or more FileInfo Structures                \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct IndexMessage {
+	string Repository<64>;
+	FileInfo Files<>;
+}
+
+*/
+
+func (o IndexMessage) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o IndexMessage) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o IndexMessage) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o IndexMessage) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.Repository) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.Repository)
+	xw.WriteUint32(uint32(len(o.Files)))
+	for i := range o.Files {
+		_, err := o.Files[i].encodeXDR(xw)
+		if err != nil {
+			return xw.Tot(), err
+		}
+	}
+	return xw.Tot(), xw.Error()
+}
+
+func (o *IndexMessage) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *IndexMessage) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *IndexMessage) decodeXDR(xr *xdr.Reader) error {
+	o.Repository = xr.ReadStringMax(64)
+	_FilesSize := int(xr.ReadUint32())
+	o.Files = make([]FileInfo, _FilesSize)
+	for i := range o.Files {
+		(&o.Files[i]).decodeXDR(xr)
+	}
+	return xr.Error()
+}
+
+/*
+
+FileInfo Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                        Length of Name                         |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                    Name (variable length)                     \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                             Flags                             |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                                                               |
++                      Modified (64 bits)                       +
+|                                                               |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                                                               |
++                       Version (64 bits)                       +
+|                                                               |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                                                               |
++                    Local Version (64 bits)                    +
+|                                                               |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                       Number of Blocks                        |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\               Zero or more BlockInfo Structures               \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct FileInfo {
+	string Name<8192>;
+	unsigned int Flags;
+	hyper Modified;
+	unsigned hyper Version;
+	unsigned hyper LocalVersion;
+	BlockInfo Blocks<>;
+}
+
+*/
+
+func (o FileInfo) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o FileInfo) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o FileInfo) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o FileInfo) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.Name) > 8192 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.Name)
+	xw.WriteUint32(o.Flags)
+	xw.WriteUint64(uint64(o.Modified))
+	xw.WriteUint64(o.Version)
+	xw.WriteUint64(o.LocalVersion)
+	xw.WriteUint32(uint32(len(o.Blocks)))
+	for i := range o.Blocks {
+		_, err := o.Blocks[i].encodeXDR(xw)
+		if err != nil {
+			return xw.Tot(), err
+		}
+	}
+	return xw.Tot(), xw.Error()
+}
+
+func (o *FileInfo) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *FileInfo) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *FileInfo) decodeXDR(xr *xdr.Reader) error {
+	o.Name = xr.ReadStringMax(8192)
+	o.Flags = xr.ReadUint32()
+	o.Modified = int64(xr.ReadUint64())
+	o.Version = xr.ReadUint64()
+	o.LocalVersion = xr.ReadUint64()
+	_BlocksSize := int(xr.ReadUint32())
+	o.Blocks = make([]BlockInfo, _BlocksSize)
+	for i := range o.Blocks {
+		(&o.Blocks[i]).decodeXDR(xr)
+	}
+	return xr.Error()
+}
+
+/*
+
+FileInfoTruncated Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                        Length of Name                         |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                    Name (variable length)                     \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                             Flags                             |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                                                               |
++                      Modified (64 bits)                       +
+|                                                               |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                                                               |
++                       Version (64 bits)                       +
+|                                                               |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                                                               |
++                    Local Version (64 bits)                    +
+|                                                               |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                          Num Blocks                           |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct FileInfoTruncated {
+	string Name<8192>;
+	unsigned int Flags;
+	hyper Modified;
+	unsigned hyper Version;
+	unsigned hyper LocalVersion;
+	unsigned int NumBlocks;
+}
+
+*/
+
+func (o FileInfoTruncated) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o FileInfoTruncated) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o FileInfoTruncated) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o FileInfoTruncated) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.Name) > 8192 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.Name)
+	xw.WriteUint32(o.Flags)
+	xw.WriteUint64(uint64(o.Modified))
+	xw.WriteUint64(o.Version)
+	xw.WriteUint64(o.LocalVersion)
+	xw.WriteUint32(o.NumBlocks)
+	return xw.Tot(), xw.Error()
+}
+
+func (o *FileInfoTruncated) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *FileInfoTruncated) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *FileInfoTruncated) decodeXDR(xr *xdr.Reader) error {
+	o.Name = xr.ReadStringMax(8192)
+	o.Flags = xr.ReadUint32()
+	o.Modified = int64(xr.ReadUint64())
+	o.Version = xr.ReadUint64()
+	o.LocalVersion = xr.ReadUint64()
+	o.NumBlocks = xr.ReadUint32()
+	return xr.Error()
+}
+
+/*
+
+BlockInfo Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                             Size                              |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                        Length of Hash                         |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                    Hash (variable length)                     \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct BlockInfo {
+	unsigned int Size;
+	opaque Hash<64>;
+}
+
+*/
+
+func (o BlockInfo) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o BlockInfo) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o BlockInfo) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o BlockInfo) encodeXDR(xw *xdr.Writer) (int, error) {
+	xw.WriteUint32(o.Size)
+	if len(o.Hash) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteBytes(o.Hash)
+	return xw.Tot(), xw.Error()
+}
+
+func (o *BlockInfo) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *BlockInfo) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *BlockInfo) decodeXDR(xr *xdr.Reader) error {
+	o.Size = xr.ReadUint32()
+	o.Hash = xr.ReadBytesMax(64)
+	return xr.Error()
+}
+
+/*
+
+RequestMessage Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                     Length of Repository                      |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                 Repository (variable length)                  \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                        Length of Name                         |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                    Name (variable length)                     \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                                                               |
++                       Offset (64 bits)                        +
+|                                                               |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                             Size                              |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct RequestMessage {
+	string Repository<64>;
+	string Name<8192>;
+	unsigned hyper Offset;
+	unsigned int Size;
+}
+
+*/
+
+func (o RequestMessage) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o RequestMessage) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o RequestMessage) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o RequestMessage) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.Repository) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.Repository)
+	if len(o.Name) > 8192 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.Name)
+	xw.WriteUint64(o.Offset)
+	xw.WriteUint32(o.Size)
+	return xw.Tot(), xw.Error()
+}
+
+func (o *RequestMessage) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *RequestMessage) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *RequestMessage) decodeXDR(xr *xdr.Reader) error {
+	o.Repository = xr.ReadStringMax(64)
+	o.Name = xr.ReadStringMax(8192)
+	o.Offset = xr.ReadUint64()
+	o.Size = xr.ReadUint32()
+	return xr.Error()
+}
+
+/*
+
+ResponseMessage Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                        Length of Data                         |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                    Data (variable length)                     \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct ResponseMessage {
+	opaque Data<>;
+}
+
+*/
+
+func (o ResponseMessage) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o ResponseMessage) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o ResponseMessage) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o ResponseMessage) encodeXDR(xw *xdr.Writer) (int, error) {
+	xw.WriteBytes(o.Data)
+	return xw.Tot(), xw.Error()
+}
+
+func (o *ResponseMessage) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *ResponseMessage) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *ResponseMessage) decodeXDR(xr *xdr.Reader) error {
+	o.Data = xr.ReadBytes()
+	return xr.Error()
+}
+
+/*
+
+ClusterConfigMessage Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                     Length of Client Name                     |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                 Client Name (variable length)                 \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                   Length of Client Version                    |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\               Client Version (variable length)                \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                    Number of Repositories                     |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\              Zero or more Repository Structures               \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                       Number of Options                       |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                Zero or more Option Structures                 \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct ClusterConfigMessage {
+	string ClientName<64>;
+	string ClientVersion<64>;
+	Repository Repositories<64>;
+	Option Options<64>;
+}
+
+*/
+
+func (o ClusterConfigMessage) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o ClusterConfigMessage) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o ClusterConfigMessage) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o ClusterConfigMessage) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.ClientName) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.ClientName)
+	if len(o.ClientVersion) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.ClientVersion)
+	if len(o.Repositories) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteUint32(uint32(len(o.Repositories)))
+	for i := range o.Repositories {
+		_, err := o.Repositories[i].encodeXDR(xw)
+		if err != nil {
+			return xw.Tot(), err
+		}
+	}
+	if len(o.Options) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteUint32(uint32(len(o.Options)))
+	for i := range o.Options {
+		_, err := o.Options[i].encodeXDR(xw)
+		if err != nil {
+			return xw.Tot(), err
+		}
+	}
+	return xw.Tot(), xw.Error()
+}
+
+func (o *ClusterConfigMessage) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *ClusterConfigMessage) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *ClusterConfigMessage) decodeXDR(xr *xdr.Reader) error {
+	o.ClientName = xr.ReadStringMax(64)
+	o.ClientVersion = xr.ReadStringMax(64)
+	_RepositoriesSize := int(xr.ReadUint32())
+	if _RepositoriesSize > 64 {
+		return xdr.ErrElementSizeExceeded
+	}
+	o.Repositories = make([]Repository, _RepositoriesSize)
+	for i := range o.Repositories {
+		(&o.Repositories[i]).decodeXDR(xr)
+	}
+	_OptionsSize := int(xr.ReadUint32())
+	if _OptionsSize > 64 {
+		return xdr.ErrElementSizeExceeded
+	}
+	o.Options = make([]Option, _OptionsSize)
+	for i := range o.Options {
+		(&o.Options[i]).decodeXDR(xr)
+	}
+	return xr.Error()
+}
+
+/*
+
+Repository Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                         Length of ID                          |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                     ID (variable length)                      \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                        Number of Nodes                        |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                 Zero or more Node Structures                  \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct Repository {
+	string ID<64>;
+	Node Nodes<64>;
+}
+
+*/
+
+func (o Repository) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o Repository) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o Repository) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o Repository) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.ID) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.ID)
+	if len(o.Nodes) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteUint32(uint32(len(o.Nodes)))
+	for i := range o.Nodes {
+		_, err := o.Nodes[i].encodeXDR(xw)
+		if err != nil {
+			return xw.Tot(), err
+		}
+	}
+	return xw.Tot(), xw.Error()
+}
+
+func (o *Repository) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *Repository) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *Repository) decodeXDR(xr *xdr.Reader) error {
+	o.ID = xr.ReadStringMax(64)
+	_NodesSize := int(xr.ReadUint32())
+	if _NodesSize > 64 {
+		return xdr.ErrElementSizeExceeded
+	}
+	o.Nodes = make([]Node, _NodesSize)
+	for i := range o.Nodes {
+		(&o.Nodes[i]).decodeXDR(xr)
+	}
+	return xr.Error()
+}
+
+/*
+
+Node Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                         Length of ID                          |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                     ID (variable length)                      \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                             Flags                             |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                                                               |
++                  Max Local Version (64 bits)                  +
+|                                                               |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct Node {
+	opaque ID<32>;
+	unsigned int Flags;
+	unsigned hyper MaxLocalVersion;
+}
+
+*/
+
+func (o Node) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o Node) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o Node) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o Node) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.ID) > 32 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteBytes(o.ID)
+	xw.WriteUint32(o.Flags)
+	xw.WriteUint64(o.MaxLocalVersion)
+	return xw.Tot(), xw.Error()
+}
+
+func (o *Node) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *Node) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *Node) decodeXDR(xr *xdr.Reader) error {
+	o.ID = xr.ReadBytesMax(32)
+	o.Flags = xr.ReadUint32()
+	o.MaxLocalVersion = xr.ReadUint64()
+	return xr.Error()
+}
+
+/*
+
+Option Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                         Length of Key                         |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                     Key (variable length)                     \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                        Length of Value                        |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                    Value (variable length)                    \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct Option {
+	string Key<64>;
+	string Value<1024>;
+}
+
+*/
+
+func (o Option) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o Option) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o Option) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o Option) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.Key) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.Key)
+	if len(o.Value) > 1024 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.Value)
+	return xw.Tot(), xw.Error()
+}
+
+func (o *Option) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *Option) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *Option) decodeXDR(xr *xdr.Reader) error {
+	o.Key = xr.ReadStringMax(64)
+	o.Value = xr.ReadStringMax(1024)
+	return xr.Error()
+}
+
+/*
+
+CloseMessage Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+|                       Length of Reason                        |
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+/                                                               /
+\                   Reason (variable length)                    \
+/                                                               /
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct CloseMessage {
+	string Reason<1024>;
+}
+
+*/
+
+func (o CloseMessage) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o CloseMessage) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o CloseMessage) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o CloseMessage) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.Reason) > 1024 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.Reason)
+	return xw.Tot(), xw.Error()
+}
+
+func (o *CloseMessage) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *CloseMessage) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *CloseMessage) decodeXDR(xr *xdr.Reader) error {
+	o.Reason = xr.ReadStringMax(1024)
+	return xr.Error()
+}
+
+/*
+
+EmptyMessage Structure:
+
+ 0                   1                   2                   3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+
+struct EmptyMessage {
+}
+
+*/
+
+func (o EmptyMessage) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o EmptyMessage) MarshalXDR() []byte {
+	return o.AppendXDR(make([]byte, 0, 128))
+}
+
+func (o EmptyMessage) AppendXDR(bs []byte) []byte {
+	var aw = xdr.AppendWriter(bs)
+	var xw = xdr.NewWriter(&aw)
+	o.encodeXDR(xw)
+	return []byte(aw)
+}
+
+func (o EmptyMessage) encodeXDR(xw *xdr.Writer) (int, error) {
+	return xw.Tot(), xw.Error()
+}
+
+func (o *EmptyMessage) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *EmptyMessage) UnmarshalXDR(bs []byte) error {
+	var br = bytes.NewReader(bs)
+	var xr = xdr.NewReader(br)
+	return o.decodeXDR(xr)
+}
+
+func (o *EmptyMessage) decodeXDR(xr *xdr.Reader) error {
+	return xr.Error()
+}

+ 42 - 0
nativemodel_darwin.go

@@ -0,0 +1,42 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+// +build darwin
+
+package protocol
+
+// Darwin uses NFD normalization
+
+import "code.google.com/p/go.text/unicode/norm"
+
+type nativeModel struct {
+	next Model
+}
+
+func (m nativeModel) Index(nodeID NodeID, repo string, files []FileInfo) {
+	for i := range files {
+		files[i].Name = norm.NFD.String(files[i].Name)
+	}
+	m.next.Index(nodeID, repo, files)
+}
+
+func (m nativeModel) IndexUpdate(nodeID NodeID, repo string, files []FileInfo) {
+	for i := range files {
+		files[i].Name = norm.NFD.String(files[i].Name)
+	}
+	m.next.IndexUpdate(nodeID, repo, files)
+}
+
+func (m nativeModel) Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error) {
+	name = norm.NFD.String(name)
+	return m.next.Request(nodeID, repo, name, offset, size)
+}
+
+func (m nativeModel) ClusterConfig(nodeID NodeID, config ClusterConfigMessage) {
+	m.next.ClusterConfig(nodeID, config)
+}
+
+func (m nativeModel) Close(nodeID NodeID, err error) {
+	m.next.Close(nodeID, err)
+}

+ 33 - 0
nativemodel_unix.go

@@ -0,0 +1,33 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+// +build !windows,!darwin
+
+package protocol
+
+// Normal Unixes uses NFC and slashes, which is the wire format.
+
+type nativeModel struct {
+	next Model
+}
+
+func (m nativeModel) Index(nodeID NodeID, repo string, files []FileInfo) {
+	m.next.Index(nodeID, repo, files)
+}
+
+func (m nativeModel) IndexUpdate(nodeID NodeID, repo string, files []FileInfo) {
+	m.next.IndexUpdate(nodeID, repo, files)
+}
+
+func (m nativeModel) Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error) {
+	return m.next.Request(nodeID, repo, name, offset, size)
+}
+
+func (m nativeModel) ClusterConfig(nodeID NodeID, config ClusterConfigMessage) {
+	m.next.ClusterConfig(nodeID, config)
+}
+
+func (m nativeModel) Close(nodeID NodeID, err error) {
+	m.next.Close(nodeID, err)
+}

+ 72 - 0
nativemodel_windows.go

@@ -0,0 +1,72 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+// +build windows
+
+package protocol
+
+// Windows uses backslashes as file separator and disallows a bunch of
+// characters in the filename
+
+import (
+	"path/filepath"
+	"strings"
+)
+
+var disallowedCharacters = string([]rune{
+	'<', '>', ':', '"', '|', '?', '*',
+	0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
+	11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
+	21, 22, 23, 24, 25, 26, 27, 28, 29, 30,
+	31,
+})
+
+type nativeModel struct {
+	next Model
+}
+
+func (m nativeModel) Index(nodeID NodeID, repo string, files []FileInfo) {
+	for i, f := range files {
+		if strings.ContainsAny(f.Name, disallowedCharacters) {
+			if f.IsDeleted() {
+				// Don't complain if the file is marked as deleted, since it
+				// can't possibly exist here anyway.
+				continue
+			}
+			files[i].Flags |= FlagInvalid
+			l.Warnf("File name %q contains invalid characters; marked as invalid.", f.Name)
+		}
+		files[i].Name = filepath.FromSlash(f.Name)
+	}
+	m.next.Index(nodeID, repo, files)
+}
+
+func (m nativeModel) IndexUpdate(nodeID NodeID, repo string, files []FileInfo) {
+	for i, f := range files {
+		if strings.ContainsAny(f.Name, disallowedCharacters) {
+			if f.IsDeleted() {
+				// Don't complain if the file is marked as deleted, since it
+				// can't possibly exist here anyway.
+				continue
+			}
+			files[i].Flags |= FlagInvalid
+			l.Warnf("File name %q contains invalid characters; marked as invalid.", f.Name)
+		}
+		files[i].Name = filepath.FromSlash(files[i].Name)
+	}
+	m.next.IndexUpdate(nodeID, repo, files)
+}
+
+func (m nativeModel) Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error) {
+	name = filepath.FromSlash(name)
+	return m.next.Request(nodeID, repo, name, offset, size)
+}
+
+func (m nativeModel) ClusterConfig(nodeID NodeID, config ClusterConfigMessage) {
+	m.next.ClusterConfig(nodeID, config)
+}
+
+func (m nativeModel) Close(nodeID NodeID, err error) {
+	m.next.Close(nodeID, err)
+}

+ 159 - 0
nodeid.go

@@ -0,0 +1,159 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"encoding/base32"
+	"errors"
+	"fmt"
+	"regexp"
+	"strings"
+
+	"github.com/syncthing/syncthing/internal/luhn"
+)
+
+type NodeID [32]byte
+
+var LocalNodeID = NodeID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
+
+// NewNodeID generates a new node ID from the raw bytes of a certificate
+func NewNodeID(rawCert []byte) NodeID {
+	var n NodeID
+	hf := sha256.New()
+	hf.Write(rawCert)
+	hf.Sum(n[:0])
+	return n
+}
+
+func NodeIDFromString(s string) (NodeID, error) {
+	var n NodeID
+	err := n.UnmarshalText([]byte(s))
+	return n, err
+}
+
+func NodeIDFromBytes(bs []byte) NodeID {
+	var n NodeID
+	if len(bs) != len(n) {
+		panic("incorrect length of byte slice representing node ID")
+	}
+	copy(n[:], bs)
+	return n
+}
+
+// String returns the canonical string representation of the node ID
+func (n NodeID) String() string {
+	id := base32.StdEncoding.EncodeToString(n[:])
+	id = strings.Trim(id, "=")
+	id, err := luhnify(id)
+	if err != nil {
+		// Should never happen
+		panic(err)
+	}
+	id = chunkify(id)
+	return id
+}
+
+func (n NodeID) GoString() string {
+	return n.String()
+}
+
+func (n NodeID) Compare(other NodeID) int {
+	return bytes.Compare(n[:], other[:])
+}
+
+func (n NodeID) Equals(other NodeID) bool {
+	return bytes.Compare(n[:], other[:]) == 0
+}
+
+func (n *NodeID) MarshalText() ([]byte, error) {
+	return []byte(n.String()), nil
+}
+
+func (n *NodeID) UnmarshalText(bs []byte) error {
+	id := string(bs)
+	id = strings.Trim(id, "=")
+	id = strings.ToUpper(id)
+	id = untypeoify(id)
+	id = unchunkify(id)
+
+	var err error
+	switch len(id) {
+	case 56:
+		// New style, with check digits
+		id, err = unluhnify(id)
+		if err != nil {
+			return err
+		}
+		fallthrough
+	case 52:
+		// Old style, no check digits
+		dec, err := base32.StdEncoding.DecodeString(id + "====")
+		if err != nil {
+			return err
+		}
+		copy(n[:], dec)
+		return nil
+	default:
+		return errors.New("node ID invalid: incorrect length")
+	}
+}
+
+func luhnify(s string) (string, error) {
+	if len(s) != 52 {
+		panic("unsupported string length")
+	}
+
+	res := make([]string, 0, 4)
+	for i := 0; i < 4; i++ {
+		p := s[i*13 : (i+1)*13]
+		l, err := luhn.Base32.Generate(p)
+		if err != nil {
+			return "", err
+		}
+		res = append(res, fmt.Sprintf("%s%c", p, l))
+	}
+	return res[0] + res[1] + res[2] + res[3], nil
+}
+
+func unluhnify(s string) (string, error) {
+	if len(s) != 56 {
+		return "", fmt.Errorf("unsupported string length %d", len(s))
+	}
+
+	res := make([]string, 0, 4)
+	for i := 0; i < 4; i++ {
+		p := s[i*14 : (i+1)*14-1]
+		l, err := luhn.Base32.Generate(p)
+		if err != nil {
+			return "", err
+		}
+		if g := fmt.Sprintf("%s%c", p, l); g != s[i*14:(i+1)*14] {
+			return "", errors.New("check digit incorrect")
+		}
+		res = append(res, p)
+	}
+	return res[0] + res[1] + res[2] + res[3], nil
+}
+
+func chunkify(s string) string {
+	s = regexp.MustCompile("(.{7})").ReplaceAllString(s, "$1-")
+	s = strings.Trim(s, "-")
+	return s
+}
+
+func unchunkify(s string) string {
+	s = strings.Replace(s, "-", "", -1)
+	s = strings.Replace(s, " ", "", -1)
+	return s
+}
+
+func untypeoify(s string) string {
+	s = strings.Replace(s, "0", "O", -1)
+	s = strings.Replace(s, "1", "I", -1)
+	s = strings.Replace(s, "8", "B", -1)
+	return s
+}

+ 78 - 0
nodeid_test.go

@@ -0,0 +1,78 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import "testing"
+
+var formatted = "P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2"
+var formatCases = []string{
+	"P56IOI-7MZJNU-2IQGDR-EYDM2M-GTMGL3-BXNPQ6-W5BTBB-Z4TJXZ-WICQ",
+	"P56IOI-7MZJNU2Y-IQGDR-EYDM2M-GTI-MGL3-BXNPQ6-W5BM-TBB-Z4TJXZ-WICQ2",
+	"P56IOI7 MZJNU2I QGDREYD M2MGTMGL 3BXNPQ6W 5BTB BZ4T JXZWICQ",
+	"P56IOI7 MZJNU2Y IQGDREY DM2MGTI MGL3BXN PQ6W5BM TBBZ4TJ XZWICQ2",
+	"P56IOI7MZJNU2IQGDREYDM2MGTMGL3BXNPQ6W5BTBBZ4TJXZWICQ",
+	"p56ioi7mzjnu2iqgdreydm2mgtmgl3bxnpq6w5btbbz4tjxzwicq",
+	"P56IOI7MZJNU2YIQGDREYDM2MGTIMGL3BXNPQ6W5BMTBBZ4TJXZWICQ2",
+	"P561017MZJNU2YIQGDREYDM2MGTIMGL3BXNPQ6W5BMT88Z4TJXZWICQ2",
+	"p56ioi7mzjnu2yiqgdreydm2mgtimgl3bxnpq6w5bmtbbz4tjxzwicq2",
+	"p561017mzjnu2yiqgdreydm2mgtimgl3bxnpq6w5bmt88z4tjxzwicq2",
+}
+
+func TestFormatNodeID(t *testing.T) {
+	for i, tc := range formatCases {
+		var id NodeID
+		err := id.UnmarshalText([]byte(tc))
+		if err != nil {
+			t.Errorf("#%d UnmarshalText(%q); %v", i, tc, err)
+		} else if f := id.String(); f != formatted {
+			t.Errorf("#%d FormatNodeID(%q)\n\t%q !=\n\t%q", i, tc, f, formatted)
+		}
+	}
+}
+
+var validateCases = []struct {
+	s  string
+	ok bool
+}{
+	{"", false},
+	{"P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2", true},
+	{"P56IOI7-MZJNU2-IQGDREY-DM2MGT-MGL3BXN-PQ6W5B-TBBZ4TJ-XZWICQ", true},
+	{"P56IOI7 MZJNU2I QGDREYD M2MGTMGL 3BXNPQ6W 5BTB BZ4T JXZWICQ", true},
+	{"P56IOI7MZJNU2IQGDREYDM2MGTMGL3BXNPQ6W5BTBBZ4TJXZWICQ", true},
+	{"P56IOI7MZJNU2IQGDREYDM2MGTMGL3BXNPQ6W5BTBBZ4TJXZWICQCCCC", false},
+	{"p56ioi7mzjnu2iqgdreydm2mgtmgl3bxnpq6w5btbbz4tjxzwicq", true},
+	{"p56ioi7mzjnu2iqgdreydm2mgtmgl3bxnpq6w5btbbz4tjxzwicqCCCC", false},
+}
+
+func TestValidateNodeID(t *testing.T) {
+	for _, tc := range validateCases {
+		var id NodeID
+		err := id.UnmarshalText([]byte(tc.s))
+		if (err == nil && !tc.ok) || (err != nil && tc.ok) {
+			t.Errorf("ValidateNodeID(%q); %v != %v", tc.s, err, tc.ok)
+		}
+	}
+}
+
+func TestMarshallingNodeID(t *testing.T) {
+	n0 := NodeID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 10, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}
+	n1 := NodeID{}
+	n2 := NodeID{}
+
+	bs, _ := n0.MarshalText()
+	n1.UnmarshalText(bs)
+	bs, _ = n1.MarshalText()
+	n2.UnmarshalText(bs)
+
+	if n2.String() != n0.String() {
+		t.Errorf("String marshalling error; %q != %q", n2.String(), n0.String())
+	}
+	if !n2.Equals(n0) {
+		t.Error("Equals error")
+	}
+	if n2.Compare(n0) != 0 {
+		t.Error("Compare error")
+	}
+}

+ 640 - 0
protocol.go

@@ -0,0 +1,640 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import (
+	"encoding/binary"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	lz4 "github.com/bkaradzic/go-lz4"
+)
+
+const (
+	BlockSize = 128 * 1024
+)
+
+const (
+	messageTypeClusterConfig = 0
+	messageTypeIndex         = 1
+	messageTypeRequest       = 2
+	messageTypeResponse      = 3
+	messageTypePing          = 4
+	messageTypePong          = 5
+	messageTypeIndexUpdate   = 6
+	messageTypeClose         = 7
+)
+
+const (
+	stateInitial = iota
+	stateCCRcvd
+	stateIdxRcvd
+)
+
+const (
+	FlagDeleted    uint32 = 1 << 12
+	FlagInvalid           = 1 << 13
+	FlagDirectory         = 1 << 14
+	FlagNoPermBits        = 1 << 15
+)
+
+const (
+	FlagShareTrusted  uint32 = 1 << 0
+	FlagShareReadOnly        = 1 << 1
+	FlagIntroducer           = 1 << 2
+	FlagShareBits            = 0x000000ff
+)
+
+var (
+	ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")
+	ErrClosed      = errors.New("connection closed")
+)
+
+type Model interface {
+	// An index was received from the peer node
+	Index(nodeID NodeID, repo string, files []FileInfo)
+	// An index update was received from the peer node
+	IndexUpdate(nodeID NodeID, repo string, files []FileInfo)
+	// A request was made by the peer node
+	Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error)
+	// A cluster configuration message was received
+	ClusterConfig(nodeID NodeID, config ClusterConfigMessage)
+	// The peer node closed the connection
+	Close(nodeID NodeID, err error)
+}
+
+type Connection interface {
+	ID() NodeID
+	Name() string
+	Index(repo string, files []FileInfo) error
+	IndexUpdate(repo string, files []FileInfo) error
+	Request(repo string, name string, offset int64, size int) ([]byte, error)
+	ClusterConfig(config ClusterConfigMessage)
+	Statistics() Statistics
+}
+
+type rawConnection struct {
+	id       NodeID
+	name     string
+	receiver Model
+	state    int
+
+	cr *countingReader
+	cw *countingWriter
+
+	awaiting    [4096]chan asyncResult
+	awaitingMut sync.Mutex
+
+	idxMut sync.Mutex // ensures serialization of Index calls
+
+	nextID chan int
+	outbox chan hdrMsg
+	closed chan struct{}
+	once   sync.Once
+
+	compressionThreshold int // compress messages larger than this many bytes
+
+	rdbuf0 []byte // used & reused by readMessage
+	rdbuf1 []byte // used & reused by readMessage
+}
+
+type asyncResult struct {
+	val []byte
+	err error
+}
+
+type hdrMsg struct {
+	hdr header
+	msg encodable
+}
+
+type encodable interface {
+	AppendXDR([]byte) []byte
+}
+
+const (
+	pingTimeout  = 30 * time.Second
+	pingIdleTime = 60 * time.Second
+)
+
+func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string, compress bool) Connection {
+	cr := &countingReader{Reader: reader}
+	cw := &countingWriter{Writer: writer}
+
+	compThres := 1<<31 - 1 // compression disabled
+	if compress {
+		compThres = 128 // compress messages that are 128 bytes long or larger
+	}
+	c := rawConnection{
+		id:                   nodeID,
+		name:                 name,
+		receiver:             nativeModel{receiver},
+		state:                stateInitial,
+		cr:                   cr,
+		cw:                   cw,
+		outbox:               make(chan hdrMsg),
+		nextID:               make(chan int),
+		closed:               make(chan struct{}),
+		compressionThreshold: compThres,
+	}
+
+	go c.readerLoop()
+	go c.writerLoop()
+	go c.pingerLoop()
+	go c.idGenerator()
+
+	return wireFormatConnection{&c}
+}
+
+func (c *rawConnection) ID() NodeID {
+	return c.id
+}
+
+func (c *rawConnection) Name() string {
+	return c.name
+}
+
+// Index writes the list of file information to the connected peer node
+func (c *rawConnection) Index(repo string, idx []FileInfo) error {
+	select {
+	case <-c.closed:
+		return ErrClosed
+	default:
+	}
+	c.idxMut.Lock()
+	c.send(-1, messageTypeIndex, IndexMessage{repo, idx})
+	c.idxMut.Unlock()
+	return nil
+}
+
+// IndexUpdate writes the list of file information to the connected peer node as an update
+func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error {
+	select {
+	case <-c.closed:
+		return ErrClosed
+	default:
+	}
+	c.idxMut.Lock()
+	c.send(-1, messageTypeIndexUpdate, IndexMessage{repo, idx})
+	c.idxMut.Unlock()
+	return nil
+}
+
+// Request returns the bytes for the specified block after fetching them from the connected peer.
+func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
+	var id int
+	select {
+	case id = <-c.nextID:
+	case <-c.closed:
+		return nil, ErrClosed
+	}
+
+	c.awaitingMut.Lock()
+	if ch := c.awaiting[id]; ch != nil {
+		panic("id taken")
+	}
+	rc := make(chan asyncResult, 1)
+	c.awaiting[id] = rc
+	c.awaitingMut.Unlock()
+
+	ok := c.send(id, messageTypeRequest, RequestMessage{repo, name, uint64(offset), uint32(size)})
+	if !ok {
+		return nil, ErrClosed
+	}
+
+	res, ok := <-rc
+	if !ok {
+		return nil, ErrClosed
+	}
+	return res.val, res.err
+}
+
+// ClusterConfig send the cluster configuration message to the peer and returns any error
+func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
+	c.send(-1, messageTypeClusterConfig, config)
+}
+
+func (c *rawConnection) ping() bool {
+	var id int
+	select {
+	case id = <-c.nextID:
+	case <-c.closed:
+		return false
+	}
+
+	rc := make(chan asyncResult, 1)
+	c.awaitingMut.Lock()
+	c.awaiting[id] = rc
+	c.awaitingMut.Unlock()
+
+	ok := c.send(id, messageTypePing, nil)
+	if !ok {
+		return false
+	}
+
+	res, ok := <-rc
+	return ok && res.err == nil
+}
+
+func (c *rawConnection) readerLoop() (err error) {
+	defer func() {
+		c.close(err)
+	}()
+
+	for {
+		select {
+		case <-c.closed:
+			return ErrClosed
+		default:
+		}
+
+		hdr, msg, err := c.readMessage()
+		if err != nil {
+			return err
+		}
+
+		switch hdr.msgType {
+		case messageTypeIndex:
+			if c.state < stateCCRcvd {
+				return fmt.Errorf("protocol error: index message in state %d", c.state)
+			}
+			c.handleIndex(msg.(IndexMessage))
+			c.state = stateIdxRcvd
+
+		case messageTypeIndexUpdate:
+			if c.state < stateIdxRcvd {
+				return fmt.Errorf("protocol error: index update message in state %d", c.state)
+			}
+			c.handleIndexUpdate(msg.(IndexMessage))
+
+		case messageTypeRequest:
+			if c.state < stateIdxRcvd {
+				return fmt.Errorf("protocol error: request message in state %d", c.state)
+			}
+			// Requests are handled asynchronously
+			go c.handleRequest(hdr.msgID, msg.(RequestMessage))
+
+		case messageTypeResponse:
+			if c.state < stateIdxRcvd {
+				return fmt.Errorf("protocol error: response message in state %d", c.state)
+			}
+			c.handleResponse(hdr.msgID, msg.(ResponseMessage))
+
+		case messageTypePing:
+			c.send(hdr.msgID, messageTypePong, EmptyMessage{})
+
+		case messageTypePong:
+			c.handlePong(hdr.msgID)
+
+		case messageTypeClusterConfig:
+			if c.state != stateInitial {
+				return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
+			}
+			go c.receiver.ClusterConfig(c.id, msg.(ClusterConfigMessage))
+			c.state = stateCCRcvd
+
+		case messageTypeClose:
+			return errors.New(msg.(CloseMessage).Reason)
+
+		default:
+			return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
+		}
+	}
+}
+
+func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
+	if cap(c.rdbuf0) < 8 {
+		c.rdbuf0 = make([]byte, 8)
+	} else {
+		c.rdbuf0 = c.rdbuf0[:8]
+	}
+	_, err = io.ReadFull(c.cr, c.rdbuf0)
+	if err != nil {
+		return
+	}
+
+	hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4]))
+	msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8]))
+
+	if debug {
+		l.Debugf("read header %v (msglen=%d)", hdr, msglen)
+	}
+
+	if cap(c.rdbuf0) < msglen {
+		c.rdbuf0 = make([]byte, msglen)
+	} else {
+		c.rdbuf0 = c.rdbuf0[:msglen]
+	}
+	_, err = io.ReadFull(c.cr, c.rdbuf0)
+	if err != nil {
+		return
+	}
+
+	if debug {
+		l.Debugf("read %d bytes", len(c.rdbuf0))
+	}
+
+	msgBuf := c.rdbuf0
+	if hdr.compression {
+		c.rdbuf1 = c.rdbuf1[:cap(c.rdbuf1)]
+		c.rdbuf1, err = lz4.Decode(c.rdbuf1, c.rdbuf0)
+		if err != nil {
+			return
+		}
+		msgBuf = c.rdbuf1
+		if debug {
+			l.Debugf("decompressed to %d bytes", len(msgBuf))
+		}
+	}
+
+	if debug {
+		if len(msgBuf) > 1024 {
+			l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
+		} else {
+			l.Debugf("message data:\n%s", hex.Dump(msgBuf))
+		}
+	}
+
+	switch hdr.msgType {
+	case messageTypeIndex, messageTypeIndexUpdate:
+		var idx IndexMessage
+		err = idx.UnmarshalXDR(msgBuf)
+		msg = idx
+
+	case messageTypeRequest:
+		var req RequestMessage
+		err = req.UnmarshalXDR(msgBuf)
+		msg = req
+
+	case messageTypeResponse:
+		var resp ResponseMessage
+		err = resp.UnmarshalXDR(msgBuf)
+		msg = resp
+
+	case messageTypePing, messageTypePong:
+		msg = EmptyMessage{}
+
+	case messageTypeClusterConfig:
+		var cc ClusterConfigMessage
+		err = cc.UnmarshalXDR(msgBuf)
+		msg = cc
+
+	case messageTypeClose:
+		var cm CloseMessage
+		err = cm.UnmarshalXDR(msgBuf)
+		msg = cm
+
+	default:
+		err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
+	}
+
+	return
+}
+
+func (c *rawConnection) handleIndex(im IndexMessage) {
+	if debug {
+		l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
+	}
+	c.receiver.Index(c.id, im.Repository, im.Files)
+}
+
+func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
+	if debug {
+		l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
+	}
+	c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
+}
+
+func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
+	data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
+
+	c.send(msgID, messageTypeResponse, ResponseMessage{data})
+}
+
+func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
+	c.awaitingMut.Lock()
+	if rc := c.awaiting[msgID]; rc != nil {
+		c.awaiting[msgID] = nil
+		rc <- asyncResult{resp.Data, nil}
+		close(rc)
+	}
+	c.awaitingMut.Unlock()
+}
+
+func (c *rawConnection) handlePong(msgID int) {
+	c.awaitingMut.Lock()
+	if rc := c.awaiting[msgID]; rc != nil {
+		c.awaiting[msgID] = nil
+		rc <- asyncResult{}
+		close(rc)
+	}
+	c.awaitingMut.Unlock()
+}
+
+func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
+	if msgID < 0 {
+		select {
+		case id := <-c.nextID:
+			msgID = id
+		case <-c.closed:
+			return false
+		}
+	}
+
+	hdr := header{
+		version: 0,
+		msgID:   msgID,
+		msgType: msgType,
+	}
+
+	select {
+	case c.outbox <- hdrMsg{hdr, msg}:
+		return true
+	case <-c.closed:
+		return false
+	}
+}
+
+func (c *rawConnection) writerLoop() {
+	var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
+	var uncBuf []byte            // buffer for uncompressed message, kept and reused
+	for {
+		var tempBuf []byte
+		var err error
+
+		select {
+		case hm := <-c.outbox:
+			if hm.msg != nil {
+				// Uncompressed message in uncBuf
+				uncBuf = hm.msg.AppendXDR(uncBuf[:0])
+
+				if len(uncBuf) >= c.compressionThreshold {
+					// Use compression for large messages
+					hm.hdr.compression = true
+
+					// Make sure we have enough space for the compressed message plus header in msgBug
+					msgBuf = msgBuf[:cap(msgBuf)]
+					if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
+						msgBuf = make([]byte, maxLen)
+					}
+
+					// Compressed is written to msgBuf, we keep tb for the length only
+					tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
+					binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
+					msgBuf = msgBuf[0 : len(tempBuf)+8]
+
+					if debug {
+						l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
+					}
+				} else {
+					// No point in compressing very short messages
+					hm.hdr.compression = false
+
+					msgBuf = msgBuf[:cap(msgBuf)]
+					if l := len(uncBuf) + 8; l > len(msgBuf) {
+						msgBuf = make([]byte, l)
+					}
+
+					binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
+					msgBuf = msgBuf[0 : len(uncBuf)+8]
+					copy(msgBuf[8:], uncBuf)
+
+					if debug {
+						l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
+					}
+				}
+			} else {
+				if debug {
+					l.Debugf("write empty message; %v", hm.hdr)
+				}
+				binary.BigEndian.PutUint32(msgBuf[4:8], 0)
+				msgBuf = msgBuf[:8]
+			}
+
+			binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
+
+			if err == nil {
+				var n int
+				n, err = c.cw.Write(msgBuf)
+				if debug {
+					l.Debugf("wrote %d bytes on the wire", n)
+				}
+			}
+			if err != nil {
+				c.close(err)
+				return
+			}
+		case <-c.closed:
+			return
+		}
+	}
+}
+
+func (c *rawConnection) close(err error) {
+	c.once.Do(func() {
+		close(c.closed)
+
+		c.awaitingMut.Lock()
+		for i, ch := range c.awaiting {
+			if ch != nil {
+				close(ch)
+				c.awaiting[i] = nil
+			}
+		}
+		c.awaitingMut.Unlock()
+
+		go c.receiver.Close(c.id, err)
+	})
+}
+
+func (c *rawConnection) idGenerator() {
+	nextID := 0
+	for {
+		nextID = (nextID + 1) & 0xfff
+		select {
+		case c.nextID <- nextID:
+		case <-c.closed:
+			return
+		}
+	}
+}
+
+func (c *rawConnection) pingerLoop() {
+	var rc = make(chan bool, 1)
+	ticker := time.Tick(pingIdleTime / 2)
+	for {
+		select {
+		case <-ticker:
+			if d := time.Since(c.cr.Last()); d < pingIdleTime {
+				if debug {
+					l.Debugln(c.id, "ping skipped after rd", d)
+				}
+				continue
+			}
+			if d := time.Since(c.cw.Last()); d < pingIdleTime {
+				if debug {
+					l.Debugln(c.id, "ping skipped after wr", d)
+				}
+				continue
+			}
+			go func() {
+				if debug {
+					l.Debugln(c.id, "ping ->")
+				}
+				rc <- c.ping()
+			}()
+			select {
+			case ok := <-rc:
+				if debug {
+					l.Debugln(c.id, "<- pong")
+				}
+				if !ok {
+					c.close(fmt.Errorf("ping failure"))
+				}
+			case <-time.After(pingTimeout):
+				c.close(fmt.Errorf("ping timeout"))
+			case <-c.closed:
+				return
+			}
+
+		case <-c.closed:
+			return
+		}
+	}
+}
+
+type Statistics struct {
+	At            time.Time
+	InBytesTotal  uint64
+	OutBytesTotal uint64
+}
+
+func (c *rawConnection) Statistics() Statistics {
+	return Statistics{
+		At:            time.Now(),
+		InBytesTotal:  c.cr.Tot(),
+		OutBytesTotal: c.cw.Tot(),
+	}
+}
+
+func IsDeleted(bits uint32) bool {
+	return bits&FlagDeleted != 0
+}
+
+func IsInvalid(bits uint32) bool {
+	return bits&FlagInvalid != 0
+}
+
+func IsDirectory(bits uint32) bool {
+	return bits&FlagDirectory != 0
+}
+
+func HasPermissionBits(bits uint32) bool {
+	return bits&FlagNoPermBits == 0
+}

+ 383 - 0
protocol_test.go

@@ -0,0 +1,383 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import (
+	"bytes"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"reflect"
+	"testing"
+	"testing/quick"
+
+	"github.com/calmh/xdr"
+)
+
+var (
+	c0ID = NewNodeID([]byte{1})
+	c1ID = NewNodeID([]byte{2})
+)
+
+func TestHeaderFunctions(t *testing.T) {
+	f := func(ver, id, typ int) bool {
+		ver = int(uint(ver) % 16)
+		id = int(uint(id) % 4096)
+		typ = int(uint(typ) % 256)
+		h0 := header{version: ver, msgID: id, msgType: typ}
+		h1 := decodeHeader(encodeHeader(h0))
+		return h0 == h1
+	}
+	if err := quick.Check(f, nil); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestHeaderLayout(t *testing.T) {
+	var e, a uint32
+
+	// Version are the first four bits
+	e = 0xf0000000
+	a = encodeHeader(header{version: 0xf})
+	if a != e {
+		t.Errorf("Header layout incorrect; %08x != %08x", a, e)
+	}
+
+	// Message ID are the following 12 bits
+	e = 0x0fff0000
+	a = encodeHeader(header{msgID: 0xfff})
+	if a != e {
+		t.Errorf("Header layout incorrect; %08x != %08x", a, e)
+	}
+
+	// Type are the last 8 bits before reserved
+	e = 0x0000ff00
+	a = encodeHeader(header{msgType: 0xff})
+	if a != e {
+		t.Errorf("Header layout incorrect; %08x != %08x", a, e)
+	}
+}
+
+func TestPing(t *testing.T) {
+	ar, aw := io.Pipe()
+	br, bw := io.Pipe()
+
+	c0 := NewConnection(c0ID, ar, bw, nil, "name", true).(wireFormatConnection).next.(*rawConnection)
+	c1 := NewConnection(c1ID, br, aw, nil, "name", true).(wireFormatConnection).next.(*rawConnection)
+
+	if ok := c0.ping(); !ok {
+		t.Error("c0 ping failed")
+	}
+	if ok := c1.ping(); !ok {
+		t.Error("c1 ping failed")
+	}
+}
+
+func TestPingErr(t *testing.T) {
+	e := errors.New("something broke")
+
+	for i := 0; i < 16; i++ {
+		for j := 0; j < 16; j++ {
+			m0 := newTestModel()
+			m1 := newTestModel()
+
+			ar, aw := io.Pipe()
+			br, bw := io.Pipe()
+			eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e}
+			ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
+
+			c0 := NewConnection(c0ID, ar, ebw, m0, "name", true).(wireFormatConnection).next.(*rawConnection)
+			NewConnection(c1ID, br, eaw, m1, "name", true)
+
+			res := c0.ping()
+			if (i < 8 || j < 8) && res {
+				t.Errorf("Unexpected ping success; i=%d, j=%d", i, j)
+			} else if (i >= 12 && j >= 12) && !res {
+				t.Errorf("Unexpected ping fail; i=%d, j=%d", i, j)
+			}
+		}
+	}
+}
+
+// func TestRequestResponseErr(t *testing.T) {
+// 	e := errors.New("something broke")
+
+// 	var pass bool
+// 	for i := 0; i < 48; i++ {
+// 		for j := 0; j < 38; j++ {
+// 			m0 := newTestModel()
+// 			m0.data = []byte("response data")
+// 			m1 := newTestModel()
+
+// 			ar, aw := io.Pipe()
+// 			br, bw := io.Pipe()
+// 			eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e}
+// 			ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
+
+// 			NewConnection(c0ID, ar, ebw, m0, nil)
+// 			c1 := NewConnection(c1ID, br, eaw, m1, nil).(wireFormatConnection).next.(*rawConnection)
+
+// 			d, err := c1.Request("default", "tn", 1234, 5678)
+// 			if err == e || err == ErrClosed {
+// 				t.Logf("Error at %d+%d bytes", i, j)
+// 				if !m1.isClosed() {
+// 					t.Fatal("c1 not closed")
+// 				}
+// 				if !m0.isClosed() {
+// 					t.Fatal("c0 not closed")
+// 				}
+// 				continue
+// 			}
+// 			if err != nil {
+// 				t.Fatal(err)
+// 			}
+// 			if string(d) != "response data" {
+// 				t.Fatalf("Incorrect response data %q", string(d))
+// 			}
+// 			if m0.repo != "default" {
+// 				t.Fatalf("Incorrect repo %q", m0.repo)
+// 			}
+// 			if m0.name != "tn" {
+// 				t.Fatalf("Incorrect name %q", m0.name)
+// 			}
+// 			if m0.offset != 1234 {
+// 				t.Fatalf("Incorrect offset %d", m0.offset)
+// 			}
+// 			if m0.size != 5678 {
+// 				t.Fatalf("Incorrect size %d", m0.size)
+// 			}
+// 			t.Logf("Pass at %d+%d bytes", i, j)
+// 			pass = true
+// 		}
+// 	}
+// 	if !pass {
+// 		t.Fatal("Never passed")
+// 	}
+// }
+
+func TestVersionErr(t *testing.T) {
+	m0 := newTestModel()
+	m1 := newTestModel()
+
+	ar, aw := io.Pipe()
+	br, bw := io.Pipe()
+
+	c0 := NewConnection(c0ID, ar, bw, m0, "name", true).(wireFormatConnection).next.(*rawConnection)
+	NewConnection(c1ID, br, aw, m1, "name", true)
+
+	w := xdr.NewWriter(c0.cw)
+	w.WriteUint32(encodeHeader(header{
+		version: 2,
+		msgID:   0,
+		msgType: 0,
+	}))
+	w.WriteUint32(0)
+
+	if !m1.isClosed() {
+		t.Error("Connection should close due to unknown version")
+	}
+}
+
+func TestTypeErr(t *testing.T) {
+	m0 := newTestModel()
+	m1 := newTestModel()
+
+	ar, aw := io.Pipe()
+	br, bw := io.Pipe()
+
+	c0 := NewConnection(c0ID, ar, bw, m0, "name", true).(wireFormatConnection).next.(*rawConnection)
+	NewConnection(c1ID, br, aw, m1, "name", true)
+
+	w := xdr.NewWriter(c0.cw)
+	w.WriteUint32(encodeHeader(header{
+		version: 0,
+		msgID:   0,
+		msgType: 42,
+	}))
+	w.WriteUint32(0)
+
+	if !m1.isClosed() {
+		t.Error("Connection should close due to unknown message type")
+	}
+}
+
+func TestClose(t *testing.T) {
+	m0 := newTestModel()
+	m1 := newTestModel()
+
+	ar, aw := io.Pipe()
+	br, bw := io.Pipe()
+
+	c0 := NewConnection(c0ID, ar, bw, m0, "name", true).(wireFormatConnection).next.(*rawConnection)
+	NewConnection(c1ID, br, aw, m1, "name", true)
+
+	c0.close(nil)
+
+	<-c0.closed
+	if !m0.isClosed() {
+		t.Fatal("Connection should be closed")
+	}
+
+	// None of these should panic, some should return an error
+
+	if c0.ping() {
+		t.Error("Ping should not return true")
+	}
+
+	c0.Index("default", nil)
+	c0.Index("default", nil)
+
+	if _, err := c0.Request("default", "foo", 0, 0); err == nil {
+		t.Error("Request should return an error")
+	}
+}
+
+func TestElementSizeExceededNested(t *testing.T) {
+	m := ClusterConfigMessage{
+		Repositories: []Repository{
+			{ID: "longstringlongstringlongstringinglongstringlongstringlonlongstringlongstringlon"},
+		},
+	}
+	_, err := m.EncodeXDR(ioutil.Discard)
+	if err == nil {
+		t.Errorf("ID length %d > max 64, but no error", len(m.Repositories[0].ID))
+	}
+}
+
+func TestMarshalIndexMessage(t *testing.T) {
+	var quickCfg = &quick.Config{MaxCountScale: 10}
+	if testing.Short() {
+		quickCfg = nil
+	}
+
+	f := func(m1 IndexMessage) bool {
+		for _, f := range m1.Files {
+			for i := range f.Blocks {
+				f.Blocks[i].Offset = 0
+				if len(f.Blocks[i].Hash) == 0 {
+					f.Blocks[i].Hash = nil
+				}
+			}
+		}
+
+		return testMarshal(t, "index", &m1, &IndexMessage{})
+	}
+
+	if err := quick.Check(f, quickCfg); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestMarshalRequestMessage(t *testing.T) {
+	var quickCfg = &quick.Config{MaxCountScale: 10}
+	if testing.Short() {
+		quickCfg = nil
+	}
+
+	f := func(m1 RequestMessage) bool {
+		return testMarshal(t, "request", &m1, &RequestMessage{})
+	}
+
+	if err := quick.Check(f, quickCfg); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestMarshalResponseMessage(t *testing.T) {
+	var quickCfg = &quick.Config{MaxCountScale: 10}
+	if testing.Short() {
+		quickCfg = nil
+	}
+
+	f := func(m1 ResponseMessage) bool {
+		if len(m1.Data) == 0 {
+			m1.Data = nil
+		}
+		return testMarshal(t, "response", &m1, &ResponseMessage{})
+	}
+
+	if err := quick.Check(f, quickCfg); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestMarshalClusterConfigMessage(t *testing.T) {
+	var quickCfg = &quick.Config{MaxCountScale: 10}
+	if testing.Short() {
+		quickCfg = nil
+	}
+
+	f := func(m1 ClusterConfigMessage) bool {
+		return testMarshal(t, "clusterconfig", &m1, &ClusterConfigMessage{})
+	}
+
+	if err := quick.Check(f, quickCfg); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestMarshalCloseMessage(t *testing.T) {
+	var quickCfg = &quick.Config{MaxCountScale: 10}
+	if testing.Short() {
+		quickCfg = nil
+	}
+
+	f := func(m1 CloseMessage) bool {
+		return testMarshal(t, "close", &m1, &CloseMessage{})
+	}
+
+	if err := quick.Check(f, quickCfg); err != nil {
+		t.Error(err)
+	}
+}
+
+type message interface {
+	EncodeXDR(io.Writer) (int, error)
+	DecodeXDR(io.Reader) error
+}
+
+func testMarshal(t *testing.T, prefix string, m1, m2 message) bool {
+	var buf bytes.Buffer
+
+	failed := func(bc []byte) {
+		bs, _ := json.MarshalIndent(m1, "", "  ")
+		ioutil.WriteFile(prefix+"-1.txt", bs, 0644)
+		bs, _ = json.MarshalIndent(m2, "", "  ")
+		ioutil.WriteFile(prefix+"-2.txt", bs, 0644)
+		if len(bc) > 0 {
+			f, _ := os.Create(prefix + "-data.txt")
+			fmt.Fprint(f, hex.Dump(bc))
+			f.Close()
+		}
+	}
+
+	_, err := m1.EncodeXDR(&buf)
+	if err == xdr.ErrElementSizeExceeded {
+		return true
+	}
+	if err != nil {
+		failed(nil)
+		t.Fatal(err)
+	}
+
+	bc := make([]byte, len(buf.Bytes()))
+	copy(bc, buf.Bytes())
+
+	err = m2.DecodeXDR(&buf)
+	if err != nil {
+		failed(bc)
+		t.Fatal(err)
+	}
+
+	ok := reflect.DeepEqual(m1, m2)
+	if !ok {
+		failed(bc)
+	}
+	return ok
+}

+ 58 - 0
wireformat.go

@@ -0,0 +1,58 @@
+// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
+// All rights reserved. Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file.
+
+package protocol
+
+import (
+	"path/filepath"
+
+	"code.google.com/p/go.text/unicode/norm"
+)
+
+type wireFormatConnection struct {
+	next Connection
+}
+
+func (c wireFormatConnection) ID() NodeID {
+	return c.next.ID()
+}
+
+func (c wireFormatConnection) Name() string {
+	return c.next.Name()
+}
+
+func (c wireFormatConnection) Index(repo string, fs []FileInfo) error {
+	var myFs = make([]FileInfo, len(fs))
+	copy(myFs, fs)
+
+	for i := range fs {
+		myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
+	}
+
+	return c.next.Index(repo, myFs)
+}
+
+func (c wireFormatConnection) IndexUpdate(repo string, fs []FileInfo) error {
+	var myFs = make([]FileInfo, len(fs))
+	copy(myFs, fs)
+
+	for i := range fs {
+		myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
+	}
+
+	return c.next.IndexUpdate(repo, myFs)
+}
+
+func (c wireFormatConnection) Request(repo, name string, offset int64, size int) ([]byte, error) {
+	name = norm.NFC.String(filepath.ToSlash(name))
+	return c.next.Request(repo, name, offset, size)
+}
+
+func (c wireFormatConnection) ClusterConfig(config ClusterConfigMessage) {
+	c.next.ClusterConfig(config)
+}
+
+func (c wireFormatConnection) Statistics() Statistics {
+	return c.next.Statistics()
+}