lib/connections: Add KCP support (fixes #804)

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3489
Audrius Butkevicius
2017-03-07 12:44:16 +00:00
committed by Jakob Borg
parent 151004d645
commit 0da0774ce4
181 changed files with 30946 additions and 106 deletions

vendor/github.com/xtaci/kcp-go/LICENSE generated vendored Normal file

@@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2015 Daniel Fu
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

vendor/github.com/xtaci/kcp-go/crypt.go generated vendored Normal file

@@ -0,0 +1,263 @@
package kcp
import (
"crypto/aes"
"crypto/cipher"
"crypto/des"
"crypto/sha1"
"golang.org/x/crypto/blowfish"
"golang.org/x/crypto/cast5"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/crypto/salsa20"
"golang.org/x/crypto/tea"
"golang.org/x/crypto/twofish"
"golang.org/x/crypto/xtea"
)
var (
initialVector = []byte{167, 115, 79, 156, 18, 172, 27, 1, 164, 21, 242, 193, 252, 120, 230, 107}
saltxor = `sH3CIVoF#rWLtJo6`
)
// BlockCrypt defines encryption/decryption methods for a given byte slice.
// Note for implementers: the data to be encrypted contains a built-in
// nonce in its first 16 bytes
type BlockCrypt interface {
// Encrypt encrypts the whole block in src into dst.
// Dst and src may point at the same memory.
Encrypt(dst, src []byte)
// Decrypt decrypts the whole block in src into dst.
// Dst and src may point at the same memory.
Decrypt(dst, src []byte)
}
type salsa20BlockCrypt struct {
key [32]byte
}
// NewSalsa20BlockCrypt https://en.wikipedia.org/wiki/Salsa20
func NewSalsa20BlockCrypt(key []byte) (BlockCrypt, error) {
c := new(salsa20BlockCrypt)
copy(c.key[:], key)
return c, nil
}
func (c *salsa20BlockCrypt) Encrypt(dst, src []byte) {
salsa20.XORKeyStream(dst[8:], src[8:], src[:8], &c.key)
copy(dst[:8], src[:8])
}
func (c *salsa20BlockCrypt) Decrypt(dst, src []byte) {
salsa20.XORKeyStream(dst[8:], src[8:], src[:8], &c.key)
copy(dst[:8], src[:8])
}
type twofishBlockCrypt struct {
encbuf []byte
decbuf []byte
block cipher.Block
}
// NewTwofishBlockCrypt https://en.wikipedia.org/wiki/Twofish
func NewTwofishBlockCrypt(key []byte) (BlockCrypt, error) {
c := new(twofishBlockCrypt)
block, err := twofish.NewCipher(key)
if err != nil {
return nil, err
}
c.block = block
c.encbuf = make([]byte, twofish.BlockSize)
c.decbuf = make([]byte, 2*twofish.BlockSize)
return c, nil
}
func (c *twofishBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf) }
func (c *twofishBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf) }
type tripleDESBlockCrypt struct {
encbuf []byte
decbuf []byte
block cipher.Block
}
// NewTripleDESBlockCrypt https://en.wikipedia.org/wiki/Triple_DES
func NewTripleDESBlockCrypt(key []byte) (BlockCrypt, error) {
c := new(tripleDESBlockCrypt)
block, err := des.NewTripleDESCipher(key)
if err != nil {
return nil, err
}
c.block = block
c.encbuf = make([]byte, des.BlockSize)
c.decbuf = make([]byte, 2*des.BlockSize)
return c, nil
}
func (c *tripleDESBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf) }
func (c *tripleDESBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf) }
type cast5BlockCrypt struct {
encbuf []byte
decbuf []byte
block cipher.Block
}
// NewCast5BlockCrypt https://en.wikipedia.org/wiki/CAST-128
func NewCast5BlockCrypt(key []byte) (BlockCrypt, error) {
c := new(cast5BlockCrypt)
block, err := cast5.NewCipher(key)
if err != nil {
return nil, err
}
c.block = block
c.encbuf = make([]byte, cast5.BlockSize)
c.decbuf = make([]byte, 2*cast5.BlockSize)
return c, nil
}
func (c *cast5BlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf) }
func (c *cast5BlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf) }
type blowfishBlockCrypt struct {
encbuf []byte
decbuf []byte
block cipher.Block
}
// NewBlowfishBlockCrypt https://en.wikipedia.org/wiki/Blowfish_(cipher)
func NewBlowfishBlockCrypt(key []byte) (BlockCrypt, error) {
c := new(blowfishBlockCrypt)
block, err := blowfish.NewCipher(key)
if err != nil {
return nil, err
}
c.block = block
c.encbuf = make([]byte, blowfish.BlockSize)
c.decbuf = make([]byte, 2*blowfish.BlockSize)
return c, nil
}
func (c *blowfishBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf) }
func (c *blowfishBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf) }
type aesBlockCrypt struct {
encbuf []byte
decbuf []byte
block cipher.Block
}
// NewAESBlockCrypt https://en.wikipedia.org/wiki/Advanced_Encryption_Standard
func NewAESBlockCrypt(key []byte) (BlockCrypt, error) {
c := new(aesBlockCrypt)
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
c.block = block
c.encbuf = make([]byte, aes.BlockSize)
c.decbuf = make([]byte, 2*aes.BlockSize)
return c, nil
}
func (c *aesBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf) }
func (c *aesBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf) }
type teaBlockCrypt struct {
encbuf []byte
decbuf []byte
block cipher.Block
}
// NewTEABlockCrypt https://en.wikipedia.org/wiki/Tiny_Encryption_Algorithm
func NewTEABlockCrypt(key []byte) (BlockCrypt, error) {
c := new(teaBlockCrypt)
block, err := tea.NewCipherWithRounds(key, 16)
if err != nil {
return nil, err
}
c.block = block
c.encbuf = make([]byte, tea.BlockSize)
c.decbuf = make([]byte, 2*tea.BlockSize)
return c, nil
}
func (c *teaBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf) }
func (c *teaBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf) }
type xteaBlockCrypt struct {
encbuf []byte
decbuf []byte
block cipher.Block
}
// NewXTEABlockCrypt https://en.wikipedia.org/wiki/XTEA
func NewXTEABlockCrypt(key []byte) (BlockCrypt, error) {
c := new(xteaBlockCrypt)
block, err := xtea.NewCipher(key)
if err != nil {
return nil, err
}
c.block = block
c.encbuf = make([]byte, xtea.BlockSize)
c.decbuf = make([]byte, 2*xtea.BlockSize)
return c, nil
}
func (c *xteaBlockCrypt) Encrypt(dst, src []byte) { encrypt(c.block, dst, src, c.encbuf) }
func (c *xteaBlockCrypt) Decrypt(dst, src []byte) { decrypt(c.block, dst, src, c.decbuf) }
type simpleXORBlockCrypt struct {
xortbl []byte
}
// NewSimpleXORBlockCrypt implements a simple XOR cipher with key expansion
func NewSimpleXORBlockCrypt(key []byte) (BlockCrypt, error) {
c := new(simpleXORBlockCrypt)
c.xortbl = pbkdf2.Key(key, []byte(saltxor), 32, mtuLimit, sha1.New)
return c, nil
}
func (c *simpleXORBlockCrypt) Encrypt(dst, src []byte) { xorBytes(dst, src, c.xortbl) }
func (c *simpleXORBlockCrypt) Decrypt(dst, src []byte) { xorBytes(dst, src, c.xortbl) }
type noneBlockCrypt struct{}
// NewNoneBlockCrypt does nothing but copy src to dst
func NewNoneBlockCrypt(key []byte) (BlockCrypt, error) {
return new(noneBlockCrypt), nil
}
func (c *noneBlockCrypt) Encrypt(dst, src []byte) { copy(dst, src) }
func (c *noneBlockCrypt) Decrypt(dst, src []byte) { copy(dst, src) }
// packet encryption with local CFB mode
func encrypt(block cipher.Block, dst, src, buf []byte) {
blocksize := block.BlockSize()
tbl := buf[:blocksize]
block.Encrypt(tbl, initialVector)
n := len(src) / blocksize
base := 0
for i := 0; i < n; i++ {
xorWords(dst[base:], src[base:], tbl)
block.Encrypt(tbl, dst[base:])
base += blocksize
}
xorBytes(dst[base:], src[base:], tbl)
}
func decrypt(block cipher.Block, dst, src, buf []byte) {
blocksize := block.BlockSize()
tbl := buf[:blocksize]
next := buf[blocksize:]
block.Encrypt(tbl, initialVector)
n := len(src) / blocksize
base := 0
for i := 0; i < n; i++ {
block.Encrypt(next, src[base:])
xorWords(dst[base:], src[base:], tbl)
tbl, next = next, tbl
base += blocksize
}
xorBytes(dst[base:], src[base:], tbl)
}
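A minimal usage sketch for the layer above: derive a key, build a BlockCrypt, and round-trip a buffer whose first 16 bytes act as the nonce. The passphrase, salt, and PBKDF2 parameters here are illustrative assumptions, not values mandated by kcp-go:

package main

import (
	"bytes"
	"crypto/rand"
	"crypto/sha1"
	"fmt"

	"github.com/xtaci/kcp-go"
	"golang.org/x/crypto/pbkdf2"
)

func main() {
	// Derive a 32-byte key (AES-256); the KDF parameters are illustrative.
	key := pbkdf2.Key([]byte("demo pass"), []byte("demo salt"), 4096, 32, sha1.New)
	block, err := kcp.NewAESBlockCrypt(key)
	if err != nil {
		panic(err)
	}
	// Per the BlockCrypt contract, the first 16 bytes carry a random nonce.
	pkt := make([]byte, 16+64)
	rand.Read(pkt[:16])
	copy(pkt[16:], "payload goes here")
	enc := make([]byte, len(pkt))
	dec := make([]byte, len(pkt))
	block.Encrypt(enc, pkt)
	block.Decrypt(dec, enc)
	fmt.Println(bytes.Equal(dec, pkt)) // true
}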

vendor/github.com/xtaci/kcp-go/fec.go generated vendored Normal file

@@ -0,0 +1,242 @@
package kcp
import (
"encoding/binary"
"sync/atomic"
"github.com/xtaci/reedsolomon"
)
const (
fecHeaderSize = 6
fecHeaderSizePlus2 = fecHeaderSize + 2 // plus 2B data size
typeData = 0xf1
typeFEC = 0xf2
fecExpire = 30000 // 30s
)
type (
// FEC defines forward error correction for packets
FEC struct {
rx []fecPacket // ordered receive queue
rxlimit int // queue size limit
dataShards int
parityShards int
shardSize int
next uint32 // next seqid
enc reedsolomon.Encoder
shards [][]byte
shards2 [][]byte // for calcECC
shardsflag []bool
paws uint32 // Protect Against Wrapped Sequence numbers
lastCheck uint32
}
fecPacket struct {
seqid uint32
flag uint16
data []byte
ts uint32
}
)
func newFEC(rxlimit, dataShards, parityShards int) *FEC {
if dataShards <= 0 || parityShards <= 0 {
return nil
}
if rxlimit < dataShards+parityShards {
return nil
}
fec := new(FEC)
fec.rxlimit = rxlimit
fec.dataShards = dataShards
fec.parityShards = parityShards
fec.shardSize = dataShards + parityShards
fec.paws = (0xffffffff/uint32(fec.shardSize) - 1) * uint32(fec.shardSize)
enc, err := reedsolomon.New(dataShards, parityShards)
if err != nil {
return nil
}
fec.enc = enc
fec.shards = make([][]byte, fec.shardSize)
fec.shards2 = make([][]byte, fec.shardSize)
fec.shardsflag = make([]bool, fec.shardSize)
return fec
}
// decode a fec packet
func (fec *FEC) decode(data []byte) fecPacket {
var pkt fecPacket
pkt.seqid = binary.LittleEndian.Uint32(data)
pkt.flag = binary.LittleEndian.Uint16(data[4:])
pkt.ts = currentMs()
// allocate memory & copy
buf := xmitBuf.Get().([]byte)[:len(data)-6]
copy(buf, data[6:])
pkt.data = buf
return pkt
}
func (fec *FEC) markData(data []byte) {
binary.LittleEndian.PutUint32(data, fec.next)
binary.LittleEndian.PutUint16(data[4:], typeData)
fec.next++
}
func (fec *FEC) markFEC(data []byte) {
binary.LittleEndian.PutUint32(data, fec.next)
binary.LittleEndian.PutUint16(data[4:], typeFEC)
fec.next++
if fec.next >= fec.paws { // PAWS wraparound only occurs in markFEC
fec.next = 0
}
}
// input a fec packet
func (fec *FEC) input(pkt fecPacket) (recovered [][]byte) {
// expiration
now := currentMs()
if now-fec.lastCheck >= fecExpire {
var rx []fecPacket
for k := range fec.rx {
if now-fec.rx[k].ts < fecExpire {
rx = append(rx, fec.rx[k])
} else {
xmitBuf.Put(fec.rx[k].data)
}
}
fec.rx = rx
fec.lastCheck = now
}
// insertion
n := len(fec.rx) - 1
insertIdx := 0
for i := n; i >= 0; i-- {
if pkt.seqid == fec.rx[i].seqid { // de-duplicate
xmitBuf.Put(pkt.data)
return nil
} else if pkt.seqid > fec.rx[i].seqid { // insertion
insertIdx = i + 1
break
}
}
// insert into ordered rx queue
if insertIdx == n+1 {
fec.rx = append(fec.rx, pkt)
} else {
fec.rx = append(fec.rx, fecPacket{})
copy(fec.rx[insertIdx+1:], fec.rx[insertIdx:])
fec.rx[insertIdx] = pkt
}
// shard range for current packet
shardBegin := pkt.seqid - pkt.seqid%uint32(fec.shardSize)
shardEnd := shardBegin + uint32(fec.shardSize) - 1
// max search range in ordered queue for current shard
searchBegin := insertIdx - int(pkt.seqid%uint32(fec.shardSize))
if searchBegin < 0 {
searchBegin = 0
}
searchEnd := searchBegin + fec.shardSize - 1
if searchEnd >= len(fec.rx) {
searchEnd = len(fec.rx) - 1
}
if searchEnd > searchBegin && searchEnd-searchBegin+1 >= fec.dataShards {
numshard := 0
numDataShard := 0
first := -1
maxlen := 0
shards := fec.shards
shardsflag := fec.shardsflag
for k := range fec.shards {
shards[k] = nil
shardsflag[k] = false
}
for i := searchBegin; i <= searchEnd; i++ {
seqid := fec.rx[i].seqid
if seqid > shardEnd {
break
} else if seqid >= shardBegin {
shards[seqid%uint32(fec.shardSize)] = fec.rx[i].data
shardsflag[seqid%uint32(fec.shardSize)] = true
numshard++
if fec.rx[i].flag == typeData {
numDataShard++
}
if numshard == 1 {
first = i
}
if len(fec.rx[i].data) > maxlen {
maxlen = len(fec.rx[i].data)
}
}
}
if numDataShard == fec.dataShards { // no loss: all data shards arrived
for i := first; i < first+numshard; i++ { // free
xmitBuf.Put(fec.rx[i].data)
}
copy(fec.rx[first:], fec.rx[first+numshard:])
for i := 0; i < numshard; i++ { // dereference
fec.rx[len(fec.rx)-1-i] = fecPacket{}
}
fec.rx = fec.rx[:len(fec.rx)-numshard]
} else if numshard >= fec.dataShards { // recoverable
for k := range shards {
if shards[k] != nil {
dlen := len(shards[k])
shards[k] = shards[k][:maxlen]
xorBytes(shards[k][dlen:], shards[k][dlen:], shards[k][dlen:])
}
}
if err := fec.enc.Reconstruct(shards); err == nil {
for k := range shards[:fec.dataShards] {
if !shardsflag[k] {
recovered = append(recovered, shards[k])
}
}
}
for i := first; i < first+numshard; i++ { // free
xmitBuf.Put(fec.rx[i].data)
}
copy(fec.rx[first:], fec.rx[first+numshard:])
for i := 0; i < numshard; i++ { // dereference
fec.rx[len(fec.rx)-1-i] = fecPacket{}
}
fec.rx = fec.rx[:len(fec.rx)-numshard]
}
}
// keep rxlimit
if len(fec.rx) > fec.rxlimit {
if fec.rx[0].flag == typeData { // record unrecoverable data
atomic.AddUint64(&DefaultSnmp.FECShortShards, 1)
}
xmitBuf.Put(fec.rx[0].data) // free
fec.rx[0].data = nil
fec.rx = fec.rx[1:]
}
return
}
func (fec *FEC) calcECC(data [][]byte, offset, maxlen int) (ecc [][]byte) {
if len(data) != fec.shardSize {
return nil
}
shards := fec.shards2
for k := range shards {
shards[k] = data[k][offset:maxlen]
}
if err := fec.enc.Encode(shards); err != nil {
return nil
}
return data[fec.dataShards:]
}
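A within-package sketch of the flow above under the smallest configuration (2 data shards + 1 parity shard): mark outgoing packets, lose one in transit, and recover it from parity. The helper function is hypothetical; the real outputTask additionally embeds a 2-byte size field inside each shard, which this sketch omits:

// exampleFECRecover: one data packet lost, reconstructed from the parity
// shard via the Reed-Solomon decoder.
func exampleFECRecover() [][]byte {
	tx := newFEC(9, 2, 1) // sender side: rxlimit=9, 2 data + 1 parity
	rx := newFEC(9, 2, 1) // receiver side

	// Two equally sized packets: 6-byte FEC header + 4-byte payload.
	a := make([]byte, fecHeaderSize+4)
	b := make([]byte, fecHeaderSize+4)
	copy(a[fecHeaderSize:], "AAAA")
	copy(b[fecHeaderSize:], "BBBB")
	tx.markData(a) // seqid 0, typeData
	tx.markData(b) // seqid 1, typeData

	// Compute the parity shard over everything past the FEC header.
	group := [][]byte{a, b, make([]byte, len(a))}
	ecc := tx.calcECC(group, fecHeaderSize, len(a))
	tx.markFEC(ecc[0]) // seqid 2, typeFEC

	// Pretend packet "a" was lost: feed only "b" and the parity packet.
	recovered := rx.input(rx.decode(b))
	recovered = append(recovered, rx.input(rx.decode(ecc[0]))...)
	return recovered // one element: the reconstructed payload "AAAA"
}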

vendor/github.com/xtaci/kcp-go/kcp.go generated vendored Normal file

@@ -0,0 +1,964 @@
// Package kcp - A Fast and Reliable ARQ Protocol
package kcp
import (
"encoding/binary"
"sync/atomic"
)
const (
IKCP_RTO_NDL = 30 // no delay min rto
IKCP_RTO_MIN = 100 // normal min rto
IKCP_RTO_DEF = 200
IKCP_RTO_MAX = 60000
IKCP_CMD_PUSH = 81 // cmd: push data
IKCP_CMD_ACK = 82 // cmd: ack
IKCP_CMD_WASK = 83 // cmd: window probe (ask)
IKCP_CMD_WINS = 84 // cmd: window size (tell)
IKCP_ASK_SEND = 1 // need to send IKCP_CMD_WASK
IKCP_ASK_TELL = 2 // need to send IKCP_CMD_WINS
IKCP_WND_SND = 32
IKCP_WND_RCV = 32
IKCP_MTU_DEF = 1400
IKCP_ACK_FAST = 3
IKCP_INTERVAL = 100
IKCP_OVERHEAD = 24
IKCP_DEADLINK = 20
IKCP_THRESH_INIT = 2
IKCP_THRESH_MIN = 2
IKCP_PROBE_INIT = 7000 // 7 secs to probe window size
IKCP_PROBE_LIMIT = 120000 // up to 120 secs to probe window
)
// Output is a closure which captures conn and calls conn.Write
type Output func(buf []byte, size int)
/* encode 8 bits unsigned int */
func ikcp_encode8u(p []byte, c byte) []byte {
p[0] = c
return p[1:]
}
/* decode 8 bits unsigned int */
func ikcp_decode8u(p []byte, c *byte) []byte {
*c = p[0]
return p[1:]
}
/* encode 16 bits unsigned int (lsb) */
func ikcp_encode16u(p []byte, w uint16) []byte {
binary.LittleEndian.PutUint16(p, w)
return p[2:]
}
/* decode 16 bits unsigned int (lsb) */
func ikcp_decode16u(p []byte, w *uint16) []byte {
*w = binary.LittleEndian.Uint16(p)
return p[2:]
}
/* encode 32 bits unsigned int (lsb) */
func ikcp_encode32u(p []byte, l uint32) []byte {
binary.LittleEndian.PutUint32(p, l)
return p[4:]
}
/* decode 32 bits unsigned int (lsb) */
func ikcp_decode32u(p []byte, l *uint32) []byte {
*l = binary.LittleEndian.Uint32(p)
return p[4:]
}
func _imin_(a, b uint32) uint32 {
if a <= b {
return a
} else {
return b
}
}
func _imax_(a, b uint32) uint32 {
if a >= b {
return a
} else {
return b
}
}
func _ibound_(lower, middle, upper uint32) uint32 {
return _imin_(_imax_(lower, middle), upper)
}
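// _itimediff computes the signed distance between two wrapping uint32
// values (millisecond timestamps or sequence numbers); the cast to int32
// yields the correct sign as long as the true distance is below 2^31.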
func _itimediff(later, earlier uint32) int32 {
return (int32)(later - earlier)
}
// Segment defines a KCP segment
type Segment struct {
conv uint32
cmd uint32
frg uint32
wnd uint32
ts uint32
sn uint32
una uint32
resendts uint32
rto uint32
fastack uint32
xmit uint32
data []byte
}
// encode a segment into buffer
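// Wire layout produced below (little-endian, 24 bytes = IKCP_OVERHEAD):
//   conv(4) | cmd(1) | frg(1) | wnd(2) | ts(4) | sn(4) | una(4) | len(4)
// followed by len bytes of payload.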
func (seg *Segment) encode(ptr []byte) []byte {
ptr = ikcp_encode32u(ptr, seg.conv)
ptr = ikcp_encode8u(ptr, uint8(seg.cmd))
ptr = ikcp_encode8u(ptr, uint8(seg.frg))
ptr = ikcp_encode16u(ptr, uint16(seg.wnd))
ptr = ikcp_encode32u(ptr, seg.ts)
ptr = ikcp_encode32u(ptr, seg.sn)
ptr = ikcp_encode32u(ptr, seg.una)
ptr = ikcp_encode32u(ptr, uint32(len(seg.data)))
return ptr
}
// KCP defines a single KCP connection
type KCP struct {
conv, mtu, mss, state uint32
snd_una, snd_nxt, rcv_nxt uint32
ssthresh uint32
rx_rttval, rx_srtt, rx_rto, rx_minrto uint32
snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32
interval, ts_flush, xmit uint32
nodelay, updated uint32
ts_probe, probe_wait uint32
dead_link, incr uint32
fastresend int32
nocwnd, stream int32
snd_queue []Segment
rcv_queue []Segment
snd_buf []Segment
rcv_buf []Segment
acklist []ackItem
buffer []byte
output Output
}
type ackItem struct {
sn uint32
ts uint32
}
// NewKCP creates a new KCP control object. 'conv' must be identical on
// both endpoints of the same connection.
func NewKCP(conv uint32, output Output) *KCP {
kcp := new(KCP)
kcp.conv = conv
kcp.snd_wnd = IKCP_WND_SND
kcp.rcv_wnd = IKCP_WND_RCV
kcp.rmt_wnd = IKCP_WND_RCV
kcp.mtu = IKCP_MTU_DEF
kcp.mss = kcp.mtu - IKCP_OVERHEAD
kcp.buffer = make([]byte, (kcp.mtu+IKCP_OVERHEAD)*3)
kcp.rx_rto = IKCP_RTO_DEF
kcp.rx_minrto = IKCP_RTO_MIN
kcp.interval = IKCP_INTERVAL
kcp.ts_flush = IKCP_INTERVAL
kcp.ssthresh = IKCP_THRESH_INIT
kcp.dead_link = IKCP_DEADLINK
kcp.output = output
return kcp
}
// newSegment creates a KCP segment
func (kcp *KCP) newSegment(size int) *Segment {
seg := new(Segment)
seg.data = xmitBuf.Get().([]byte)[:size]
return seg
}
// delSegment recycles a KCP segment
func (kcp *KCP) delSegment(seg *Segment) {
xmitBuf.Put(seg.data)
}
// PeekSize checks the size of the next message in the recv queue
func (kcp *KCP) PeekSize() (length int) {
if len(kcp.rcv_queue) == 0 {
return -1
}
seg := &kcp.rcv_queue[0]
if seg.frg == 0 {
return len(seg.data)
}
if len(kcp.rcv_queue) < int(seg.frg+1) {
return -1
}
for k := range kcp.rcv_queue {
seg := &kcp.rcv_queue[k]
length += len(seg.data)
if seg.frg == 0 {
break
}
}
return
}
// Recv is the user/upper-level receive: returns the data size, or a value below zero for EAGAIN
func (kcp *KCP) Recv(buffer []byte) (n int) {
if len(kcp.rcv_queue) == 0 {
return -1
}
peeksize := kcp.PeekSize()
if peeksize < 0 {
return -2
}
if peeksize > len(buffer) {
return -3
}
var fast_recover bool
if len(kcp.rcv_queue) >= int(kcp.rcv_wnd) {
fast_recover = true
}
// merge fragment
count := 0
for k := range kcp.rcv_queue {
seg := &kcp.rcv_queue[k]
copy(buffer, seg.data)
buffer = buffer[len(seg.data):]
n += len(seg.data)
count++
kcp.delSegment(seg)
if seg.frg == 0 {
break
}
}
kcp.rcv_queue = kcp.rcv_queue[count:]
// move available data from rcv_buf -> rcv_queue
count = 0
for k := range kcp.rcv_buf {
seg := &kcp.rcv_buf[k]
if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
kcp.rcv_nxt++
count++
} else {
break
}
}
kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
kcp.rcv_buf = kcp.rcv_buf[count:]
// fast recover
if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
kcp.probe |= IKCP_ASK_TELL
}
return
}
// Send is the user/upper-level send; returns a value below zero on error
func (kcp *KCP) Send(buffer []byte) int {
var count int
if len(buffer) == 0 {
return -1
}
// append to previous segment in streaming mode (if possible)
if kcp.stream != 0 {
n := len(kcp.snd_queue)
if n > 0 {
old := &kcp.snd_queue[n-1]
if len(old.data) < int(kcp.mss) {
capacity := int(kcp.mss) - len(old.data)
extend := capacity
if len(buffer) < capacity {
extend = len(buffer)
}
seg := kcp.newSegment(len(old.data) + extend)
seg.frg = 0
copy(seg.data, old.data)
copy(seg.data[len(old.data):], buffer)
buffer = buffer[extend:]
kcp.delSegment(old)
kcp.snd_queue[n-1] = *seg
}
}
if len(buffer) == 0 {
return 0
}
}
if len(buffer) <= int(kcp.mss) {
count = 1
} else {
count = (len(buffer) + int(kcp.mss) - 1) / int(kcp.mss)
}
if count > 255 {
return -2
}
if count == 0 {
count = 1
}
for i := 0; i < count; i++ {
var size int
if len(buffer) > int(kcp.mss) {
size = int(kcp.mss)
} else {
size = len(buffer)
}
seg := kcp.newSegment(size)
copy(seg.data, buffer[:size])
if kcp.stream == 0 { // message mode
seg.frg = uint32(count - i - 1)
} else { // stream mode
seg.frg = 0
}
kcp.snd_queue = append(kcp.snd_queue, *seg)
buffer = buffer[size:]
}
return 0
}
func (kcp *KCP) update_ack(rtt int32) {
// https://tools.ietf.org/html/rfc6298
var rto uint32
if kcp.rx_srtt == 0 {
kcp.rx_srtt = uint32(rtt)
kcp.rx_rttval = uint32(rtt) / 2
} else {
delta := rtt - int32(kcp.rx_srtt)
if delta < 0 {
delta = -delta
}
kcp.rx_rttval = (3*kcp.rx_rttval + uint32(delta)) / 4
kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
if kcp.rx_srtt < 1 {
kcp.rx_srtt = 1
}
}
rto = kcp.rx_srtt + _imax_(kcp.interval, 4*kcp.rx_rttval)
kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX)
}
func (kcp *KCP) shrink_buf() {
if len(kcp.snd_buf) > 0 {
seg := &kcp.snd_buf[0]
kcp.snd_una = seg.sn
} else {
kcp.snd_una = kcp.snd_nxt
}
}
func (kcp *KCP) parse_ack(sn uint32) {
if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
return
}
for k := range kcp.snd_buf {
seg := &kcp.snd_buf[k]
if sn == seg.sn {
kcp.delSegment(seg)
copy(kcp.snd_buf[k:], kcp.snd_buf[k+1:])
kcp.snd_buf[len(kcp.snd_buf)-1] = Segment{}
kcp.snd_buf = kcp.snd_buf[:len(kcp.snd_buf)-1]
break
}
if _itimediff(sn, seg.sn) < 0 {
break
}
}
}
func (kcp *KCP) parse_fastack(sn uint32) {
if _itimediff(sn, kcp.snd_una) < 0 || _itimediff(sn, kcp.snd_nxt) >= 0 {
return
}
for k := range kcp.snd_buf {
seg := &kcp.snd_buf[k]
if _itimediff(sn, seg.sn) < 0 {
break
} else if sn != seg.sn { // && kcp.current >= seg.ts+kcp.rx_srtt {
seg.fastack++
}
}
}
func (kcp *KCP) parse_una(una uint32) {
count := 0
for k := range kcp.snd_buf {
seg := &kcp.snd_buf[k]
if _itimediff(una, seg.sn) > 0 {
kcp.delSegment(seg)
count++
} else {
break
}
}
kcp.snd_buf = kcp.snd_buf[count:]
}
// ack append
func (kcp *KCP) ack_push(sn, ts uint32) {
kcp.acklist = append(kcp.acklist, ackItem{sn, ts})
}
func (kcp *KCP) parse_data(newseg *Segment) {
sn := newseg.sn
if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
_itimediff(sn, kcp.rcv_nxt) < 0 {
kcp.delSegment(newseg)
return
}
n := len(kcp.rcv_buf) - 1
insert_idx := 0
repeat := false
for i := n; i >= 0; i-- {
seg := &kcp.rcv_buf[i]
if seg.sn == sn {
repeat = true
atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
break
}
if _itimediff(sn, seg.sn) > 0 {
insert_idx = i + 1
break
}
}
if !repeat {
if insert_idx == n+1 {
kcp.rcv_buf = append(kcp.rcv_buf, *newseg)
} else {
kcp.rcv_buf = append(kcp.rcv_buf, Segment{})
copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:])
kcp.rcv_buf[insert_idx] = *newseg
}
} else {
kcp.delSegment(newseg)
}
// move available data from rcv_buf -> rcv_queue
count := 0
for k := range kcp.rcv_buf {
seg := &kcp.rcv_buf[k]
if seg.sn == kcp.rcv_nxt && len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
kcp.rcv_nxt++
count++
} else {
break
}
}
kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
kcp.rcv_buf = kcp.rcv_buf[count:]
}
// Input should be called when a low-level packet (e.g. a UDP packet) is received
func (kcp *KCP) Input(data []byte, update_ack bool) int {
una := kcp.snd_una
if len(data) < IKCP_OVERHEAD {
return -1
}
var maxack uint32
var recentack uint32
var flag int
for {
var ts, sn, length, una, conv uint32
var wnd uint16
var cmd, frg uint8
if len(data) < int(IKCP_OVERHEAD) {
break
}
data = ikcp_decode32u(data, &conv)
if conv != kcp.conv {
return -1
}
data = ikcp_decode8u(data, &cmd)
data = ikcp_decode8u(data, &frg)
data = ikcp_decode16u(data, &wnd)
data = ikcp_decode32u(data, &ts)
data = ikcp_decode32u(data, &sn)
data = ikcp_decode32u(data, &una)
data = ikcp_decode32u(data, &length)
if len(data) < int(length) {
return -2
}
if cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS {
return -3
}
kcp.rmt_wnd = uint32(wnd)
kcp.parse_una(una)
kcp.shrink_buf()
if cmd == IKCP_CMD_ACK {
kcp.parse_ack(sn)
kcp.shrink_buf()
if flag == 0 {
flag = 1
maxack = sn
} else if _itimediff(sn, maxack) > 0 {
maxack = sn
}
recentack = ts
} else if cmd == IKCP_CMD_PUSH {
if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
kcp.ack_push(sn, ts)
if _itimediff(sn, kcp.rcv_nxt) >= 0 {
seg := kcp.newSegment(int(length))
seg.conv = conv
seg.cmd = uint32(cmd)
seg.frg = uint32(frg)
seg.wnd = uint32(wnd)
seg.ts = ts
seg.sn = sn
seg.una = una
copy(seg.data, data[:length])
kcp.parse_data(seg)
} else {
atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
}
} else {
atomic.AddUint64(&DefaultSnmp.RepeatSegs, 1)
}
} else if cmd == IKCP_CMD_WASK {
// ready to send back IKCP_CMD_WINS in Ikcp_flush
// tell remote my window size
kcp.probe |= IKCP_ASK_TELL
} else if cmd == IKCP_CMD_WINS {
// do nothing
} else {
return -3
}
data = data[length:]
}
current := currentMs()
if flag != 0 && update_ack {
kcp.parse_fastack(maxack)
if _itimediff(current, recentack) >= 0 {
kcp.update_ack(_itimediff(current, recentack))
}
}
if _itimediff(kcp.snd_una, una) > 0 {
if kcp.cwnd < kcp.rmt_wnd {
mss := kcp.mss
if kcp.cwnd < kcp.ssthresh {
kcp.cwnd++
kcp.incr += mss
} else {
if kcp.incr < mss {
kcp.incr = mss
}
kcp.incr += (mss*mss)/kcp.incr + (mss / 16)
if (kcp.cwnd+1)*mss <= kcp.incr {
kcp.cwnd++
}
}
if kcp.cwnd > kcp.rmt_wnd {
kcp.cwnd = kcp.rmt_wnd
kcp.incr = kcp.rmt_wnd * mss
}
}
}
return 0
}
func (kcp *KCP) wnd_unused() int32 {
if len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
return int32(int(kcp.rcv_wnd) - len(kcp.rcv_queue))
}
return 0
}
// flush pending data
func (kcp *KCP) flush() {
buffer := kcp.buffer
change := 0
lost := false
var seg Segment
seg.conv = kcp.conv
seg.cmd = IKCP_CMD_ACK
seg.wnd = uint32(kcp.wnd_unused())
seg.una = kcp.rcv_nxt
// flush acknowledges
ptr := buffer
for i, ack := range kcp.acklist {
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
// filter jitters caused by bufferbloat
if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
seg.sn, seg.ts = ack.sn, ack.ts
ptr = seg.encode(ptr)
}
}
kcp.acklist = nil
current := currentMs()
// probe window size (if remote window size equals zero)
if kcp.rmt_wnd == 0 {
if kcp.probe_wait == 0 {
kcp.probe_wait = IKCP_PROBE_INIT
kcp.ts_probe = current + kcp.probe_wait
} else {
if _itimediff(current, kcp.ts_probe) >= 0 {
if kcp.probe_wait < IKCP_PROBE_INIT {
kcp.probe_wait = IKCP_PROBE_INIT
}
kcp.probe_wait += kcp.probe_wait / 2
if kcp.probe_wait > IKCP_PROBE_LIMIT {
kcp.probe_wait = IKCP_PROBE_LIMIT
}
kcp.ts_probe = current + kcp.probe_wait
kcp.probe |= IKCP_ASK_SEND
}
}
} else {
kcp.ts_probe = 0
kcp.probe_wait = 0
}
// flush window probing commands
if (kcp.probe & IKCP_ASK_SEND) != 0 {
seg.cmd = IKCP_CMD_WASK
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = seg.encode(ptr)
}
// flush window size replies
if (kcp.probe & IKCP_ASK_TELL) != 0 {
seg.cmd = IKCP_CMD_WINS
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = seg.encode(ptr)
}
kcp.probe = 0
// calculate window size
cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
if kcp.nocwnd == 0 {
cwnd = _imin_(kcp.cwnd, cwnd)
}
// sliding window, controlled by snd_nxt && snd_una+cwnd
count := 0
for k := range kcp.snd_queue {
if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
break
}
newseg := kcp.snd_queue[k]
newseg.conv = kcp.conv
newseg.cmd = IKCP_CMD_PUSH
newseg.wnd = seg.wnd
newseg.ts = current
newseg.sn = kcp.snd_nxt
newseg.una = kcp.rcv_nxt
newseg.resendts = newseg.ts
newseg.rto = kcp.rx_rto
kcp.snd_buf = append(kcp.snd_buf, newseg)
kcp.snd_nxt++
count++
kcp.snd_queue[k].data = nil
}
kcp.snd_queue = kcp.snd_queue[count:]
// flag pending data
hasPending := false
if count > 0 {
hasPending = true
}
// calculate resent
resent := uint32(kcp.fastresend)
if kcp.fastresend <= 0 {
resent = 0xffffffff
}
// flush data segments
var lostSegs, fastRetransSegs, earlyRetransSegs uint64
for k := range kcp.snd_buf {
current := currentMs()
segment := &kcp.snd_buf[k]
needsend := false
if segment.xmit == 0 {
needsend = true
segment.xmit++
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
} else if _itimediff(current, segment.resendts) >= 0 {
needsend = true
segment.xmit++
kcp.xmit++
if kcp.nodelay == 0 {
segment.rto += kcp.rx_rto
} else {
segment.rto += kcp.rx_rto / 2
}
segment.resendts = current + segment.rto
lost = true
lostSegs++
} else if segment.fastack >= resent { // fast retransmit
lastsend := segment.resendts - segment.rto
if _itimediff(current, lastsend) >= int32(kcp.rx_rto/4) {
needsend = true
segment.xmit++
segment.fastack = 0
segment.resendts = current + segment.rto
change++
fastRetransSegs++
}
} else if segment.fastack > 0 && !hasPending { // early retransmit
lastsend := segment.resendts - segment.rto
if _itimediff(current, lastsend) >= int32(kcp.rx_rto/4) {
needsend = true
segment.xmit++
segment.fastack = 0
segment.resendts = current + segment.rto
change++
earlyRetransSegs++
}
}
if needsend {
segment.ts = current
segment.wnd = seg.wnd
segment.una = kcp.rcv_nxt
size := len(buffer) - len(ptr)
need := IKCP_OVERHEAD + len(segment.data)
if size+need > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = segment.encode(ptr)
copy(ptr, segment.data)
ptr = ptr[len(segment.data):]
if segment.xmit >= kcp.dead_link {
kcp.state = 0xFFFFFFFF
}
}
}
atomic.AddUint64(&DefaultSnmp.RetransSegs, lostSegs+fastRetransSegs+earlyRetransSegs)
atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
// flush remaining segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}
// update ssthresh
// rate halving, https://tools.ietf.org/html/rfc6937
if change != 0 {
inflight := kcp.snd_nxt - kcp.snd_una
kcp.ssthresh = inflight / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
kcp.ssthresh = IKCP_THRESH_MIN
}
kcp.cwnd = kcp.ssthresh + resent
kcp.incr = kcp.cwnd * kcp.mss
}
// congestion control, https://tools.ietf.org/html/rfc5681
if lost {
kcp.ssthresh = cwnd / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
kcp.ssthresh = IKCP_THRESH_MIN
}
kcp.cwnd = 1
kcp.incr = kcp.mss
}
if kcp.cwnd < 1 {
kcp.cwnd = 1
kcp.incr = kcp.mss
}
}
// Update updates state (call it repeatedly, every 10ms-100ms), or you can
// ask Check when to call it again (in the absence of Input/Send calls).
func (kcp *KCP) Update() {
var slap int32
current := currentMs()
if kcp.updated == 0 {
kcp.updated = 1
kcp.ts_flush = current
}
slap = _itimediff(current, kcp.ts_flush)
if slap >= 10000 || slap < -10000 {
kcp.ts_flush = current
slap = 0
}
if slap >= 0 {
kcp.ts_flush += kcp.interval
if _itimediff(current, kcp.ts_flush) >= 0 {
kcp.ts_flush = current + kcp.interval
}
kcp.flush()
}
}
// Check determines when you should invoke Update next:
// it returns the time (in millisec) at which to invoke Update, assuming
// there are no Input/Send calls in the meantime. You can call Update at
// that time instead of calling it repeatedly.
// Important for reducing unnecessary Update invocations; use it to
// schedule Update (e.g. when implementing an epoll-like mechanism, or to
// optimize handling of massive numbers of KCP connections).
func (kcp *KCP) Check() uint32 {
current := currentMs()
ts_flush := kcp.ts_flush
tm_flush := int32(0x7fffffff)
tm_packet := int32(0x7fffffff)
minimal := uint32(0)
if kcp.updated == 0 {
return current
}
if _itimediff(current, ts_flush) >= 10000 ||
_itimediff(current, ts_flush) < -10000 {
ts_flush = current
}
if _itimediff(current, ts_flush) >= 0 {
return current
}
tm_flush = _itimediff(ts_flush, current)
for k := range kcp.snd_buf {
seg := &kcp.snd_buf[k]
diff := _itimediff(seg.resendts, current)
if diff <= 0 {
return current
}
if diff < tm_packet {
tm_packet = diff
}
}
minimal = uint32(tm_packet)
if tm_packet >= tm_flush {
minimal = uint32(tm_flush)
}
if minimal >= kcp.interval {
minimal = kcp.interval
}
return current + minimal
}
// SetMtu changes MTU size, default is 1400
func (kcp *KCP) SetMtu(mtu int) int {
if mtu < 50 || mtu < IKCP_OVERHEAD {
return -1
}
buffer := make([]byte, (mtu+IKCP_OVERHEAD)*3)
if buffer == nil {
return -2
}
kcp.mtu = uint32(mtu)
kcp.mss = kcp.mtu - IKCP_OVERHEAD
kcp.buffer = buffer
return 0
}
// NoDelay options
// fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
// nodelay: 0:disable(default), 1:enable
// interval: internal update timer interval in millisec, default is 100ms
// resend: 0:disable fast resend(default), 1:enable fast resend
// nc: 0:normal congestion control(default), 1:disable congestion control
func (kcp *KCP) NoDelay(nodelay, interval, resend, nc int) int {
if nodelay >= 0 {
kcp.nodelay = uint32(nodelay)
if nodelay != 0 {
kcp.rx_minrto = IKCP_RTO_NDL
} else {
kcp.rx_minrto = IKCP_RTO_MIN
}
}
if interval >= 0 {
if interval > 5000 {
interval = 5000
} else if interval < 10 {
interval = 10
}
kcp.interval = uint32(interval)
}
if resend >= 0 {
kcp.fastresend = int32(resend)
}
if nc >= 0 {
kcp.nocwnd = int32(nc)
}
return 0
}
// WndSize sets maximum window size: sndwnd=32, rcvwnd=32 by default
func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int {
if sndwnd > 0 {
kcp.snd_wnd = uint32(sndwnd)
}
if rcvwnd > 0 {
kcp.rcv_wnd = uint32(rcvwnd)
}
return 0
}
// WaitSnd gets how many packets are waiting to be sent
func (kcp *KCP) WaitSnd() int {
return len(kcp.snd_buf) + len(kcp.snd_queue)
}
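Before the session layer in sess.go, a minimal sketch of driving this state machine directly: two endpoints wired to each other's Input in place of a real socket. Only the exported API above is assumed; the conv value 42 is arbitrary but must match on both sides:

package main

import (
	"fmt"
	"time"

	"github.com/xtaci/kcp-go"
)

func main() {
	var alice, bob *kcp.KCP
	// Each output callback delivers the packet straight into the peer.
	alice = kcp.NewKCP(42, func(buf []byte, size int) { bob.Input(buf[:size], true) })
	bob = kcp.NewKCP(42, func(buf []byte, size int) { alice.Input(buf[:size], true) })

	alice.Send([]byte("ping over KCP"))
	buf := make([]byte, 256)
	for i := 0; i < 50; i++ {
		alice.Update() // drive the timers; real code calls this every 10-100ms
		bob.Update()
		if n := bob.Recv(buf); n > 0 {
			fmt.Printf("bob got: %q\n", buf[:n])
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}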

vendor/github.com/xtaci/kcp-go/sess.go generated vendored Normal file

@@ -0,0 +1,937 @@
package kcp
import (
"crypto/rand"
"encoding/binary"
"hash/crc32"
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"golang.org/x/net/ipv4"
)
type errTimeout struct {
error
}
func (errTimeout) Timeout() bool { return true }
func (errTimeout) Temporary() bool { return true }
func (errTimeout) Error() string { return "i/o timeout" }
const (
defaultWndSize = 128 // default window size, in packet
nonceSize = 16 // size of the random nonce prepended to each packet
crcSize = 4 // 4bytes packet checksum
cryptHeaderSize = nonceSize + crcSize
mtuLimit = 2048
rxQueueLimit = 8192
rxFECMulti = 3 // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
defaultKeepAliveInterval = 10
)
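// Resulting on-the-wire layout of a UDPSession packet when all optional
// layers are enabled:
//   [nonce 16B][crc32 4B]          cryptHeaderSize, only with a BlockCrypt
//   [seqid 4B][flag 2B][size 2B]   fecHeaderSizePlus2, only with FEC
//   [KCP header 24B][payload]      IKCP_OVERHEAD + data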
const (
errBrokenPipe = "broken pipe"
errInvalidOperation = "invalid operation"
)
var (
xmitBuf sync.Pool
)
func init() {
xmitBuf.New = func() interface{} {
return make([]byte, mtuLimit)
}
}
type (
// UDPSession defines a KCP session implemented by UDP
UDPSession struct {
kcp *KCP // the core ARQ
l *Listener // point to server listener if it's a server socket
fec *FEC // forward error correction
conn net.PacketConn // the underlying packet socket
block BlockCrypt
remote net.Addr
rd time.Time // read deadline
wd time.Time // write deadline
sockbuff []byte // kcp receiving is packet-based; this buffer turns it into a stream
die chan struct{}
chReadEvent chan struct{}
chWriteEvent chan struct{}
chUDPOutput chan []byte
headerSize int
ackNoDelay bool
isClosed bool
keepAliveInterval int32
mu sync.Mutex
updateInterval int32
}
setReadBuffer interface {
SetReadBuffer(bytes int) error
}
setWriteBuffer interface {
SetWriteBuffer(bytes int) error
}
)
// newUDPSession creates a new UDP session for client or server
func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, remote net.Addr, block BlockCrypt) *UDPSession {
sess := new(UDPSession)
sess.chUDPOutput = make(chan []byte)
sess.die = make(chan struct{})
sess.chReadEvent = make(chan struct{}, 1)
sess.chWriteEvent = make(chan struct{}, 1)
sess.remote = remote
sess.conn = conn
sess.keepAliveInterval = defaultKeepAliveInterval
sess.l = l
sess.block = block
sess.fec = newFEC(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
// calculate header size
if sess.block != nil {
sess.headerSize += cryptHeaderSize
}
if sess.fec != nil {
sess.headerSize += fecHeaderSizePlus2
}
sess.kcp = NewKCP(conv, func(buf []byte, size int) {
if size >= IKCP_OVERHEAD {
ext := xmitBuf.Get().([]byte)[:sess.headerSize+size]
copy(ext[sess.headerSize:], buf)
select {
case sess.chUDPOutput <- ext:
case <-sess.die:
}
}
})
sess.kcp.WndSize(defaultWndSize, defaultWndSize)
sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
go sess.updateTask()
go sess.outputTask()
if sess.l == nil { // it's a client connection
go sess.readLoop()
atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
} else {
atomic.AddUint64(&DefaultSnmp.PassiveOpens, 1)
}
currestab := atomic.AddUint64(&DefaultSnmp.CurrEstab, 1)
maxconn := atomic.LoadUint64(&DefaultSnmp.MaxConn)
if currestab > maxconn {
atomic.CompareAndSwapUint64(&DefaultSnmp.MaxConn, maxconn, currestab)
}
return sess
}
// Read implements the Conn Read method.
func (s *UDPSession) Read(b []byte) (n int, err error) {
for {
s.mu.Lock()
if len(s.sockbuff) > 0 { // copy from buffer
n = copy(b, s.sockbuff)
s.sockbuff = s.sockbuff[n:]
s.mu.Unlock()
return n, nil
}
if s.isClosed {
s.mu.Unlock()
return 0, errors.New(errBrokenPipe)
}
if !s.rd.IsZero() {
if time.Now().After(s.rd) { // timeout
s.mu.Unlock()
return 0, errTimeout{}
}
}
if n := s.kcp.PeekSize(); n > 0 { // data arrived
if len(b) >= n {
s.kcp.Recv(b)
} else {
buf := make([]byte, n)
s.kcp.Recv(buf)
n = copy(b, buf)
s.sockbuff = buf[n:] // store remaining bytes into sockbuff for next read
}
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
return n, nil
}
var timeout *time.Timer
var c <-chan time.Time
if !s.rd.IsZero() {
delay := s.rd.Sub(time.Now())
timeout = time.NewTimer(delay)
c = timeout.C
}
s.mu.Unlock()
// wait for read event or timeout
select {
case <-s.chReadEvent:
case <-c:
case <-s.die:
}
if timeout != nil {
timeout.Stop()
}
}
}
// Write implements the Conn Write method.
func (s *UDPSession) Write(b []byte) (n int, err error) {
for {
s.mu.Lock()
if s.isClosed {
s.mu.Unlock()
return 0, errors.New(errBrokenPipe)
}
if !s.wd.IsZero() {
if time.Now().After(s.wd) { // timeout
s.mu.Unlock()
return 0, errTimeout{}
}
}
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
n = len(b)
max := s.kcp.mss << 8
for {
if len(b) <= int(max) { // in most cases
s.kcp.Send(b)
break
} else {
s.kcp.Send(b[:max])
b = b[max:]
}
}
s.kcp.flush()
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
return n, nil
}
var timeout *time.Timer
var c <-chan time.Time
if !s.wd.IsZero() {
delay := s.wd.Sub(time.Now())
timeout = time.NewTimer(delay)
c = timeout.C
}
s.mu.Unlock()
// wait for write event or timeout
select {
case <-s.chWriteEvent:
case <-c:
case <-s.die:
}
if timeout != nil {
timeout.Stop()
}
}
}
// Close closes the connection.
func (s *UDPSession) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.isClosed {
return errors.New(errBrokenPipe)
}
close(s.die)
s.isClosed = true
atomic.AddUint64(&DefaultSnmp.CurrEstab, ^uint64(0))
if s.l == nil { // client socket close
return s.conn.Close()
}
return nil
}
// LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
func (s *UDPSession) LocalAddr() net.Addr { return s.conn.LocalAddr() }
// RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
func (s *UDPSession) RemoteAddr() net.Addr { return s.remote }
// SetDeadline sets the read and write deadlines associated with the connection. A zero time value disables the deadlines.
func (s *UDPSession) SetDeadline(t time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()
s.rd = t
s.wd = t
return nil
}
// SetReadDeadline implements the Conn SetReadDeadline method.
func (s *UDPSession) SetReadDeadline(t time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()
s.rd = t
return nil
}
// SetWriteDeadline implements the Conn SetWriteDeadline method.
func (s *UDPSession) SetWriteDeadline(t time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()
s.wd = t
return nil
}
// SetWindowSize sets the maximum window size
func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
s.mu.Lock()
defer s.mu.Unlock()
s.kcp.WndSize(sndwnd, rcvwnd)
}
// SetMtu sets the maximum transmission unit
func (s *UDPSession) SetMtu(mtu int) {
s.mu.Lock()
defer s.mu.Unlock()
s.kcp.SetMtu(mtu - s.headerSize)
}
// SetStreamMode toggles the stream mode on/off
func (s *UDPSession) SetStreamMode(enable bool) {
s.mu.Lock()
defer s.mu.Unlock()
if enable {
s.kcp.stream = 1
} else {
s.kcp.stream = 0
}
}
// SetACKNoDelay changes the ACK flush option; set to true to flush ACKs immediately
func (s *UDPSession) SetACKNoDelay(nodelay bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.ackNoDelay = nodelay
}
// SetNoDelay calls nodelay() of kcp
func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
s.mu.Lock()
defer s.mu.Unlock()
s.kcp.NoDelay(nodelay, interval, resend, nc)
atomic.StoreInt32(&s.updateInterval, int32(interval))
}
// SetDSCP sets the 6-bit DSCP field of the IP header; no effect if the session was accepted from a Listener
func (s *UDPSession) SetDSCP(dscp int) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.l == nil {
if nc, ok := s.conn.(*ConnectedUDPConn); ok {
return ipv4.NewConn(nc.Conn).SetTOS(dscp << 2)
} else if nc, ok := s.conn.(net.Conn); ok {
return ipv4.NewConn(nc).SetTOS(dscp << 2)
}
}
return errors.New(errInvalidOperation)
}
// SetReadBuffer sets the socket read buffer; no effect if the session was accepted from a Listener
func (s *UDPSession) SetReadBuffer(bytes int) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.l == nil {
if nc, ok := s.conn.(setReadBuffer); ok {
return nc.SetReadBuffer(bytes)
}
}
return errors.New(errInvalidOperation)
}
// SetWriteBuffer sets the socket write buffer; no effect if the session was accepted from a Listener
func (s *UDPSession) SetWriteBuffer(bytes int) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.l == nil {
if nc, ok := s.conn.(setWriteBuffer); ok {
return nc.SetWriteBuffer(bytes)
}
}
return errors.New(errInvalidOperation)
}
// SetKeepAlive changes the per-connection NAT keepalive interval; 0 to disable, defaults to 10s
func (s *UDPSession) SetKeepAlive(interval int) {
atomic.StoreInt32(&s.keepAliveInterval, int32(interval))
}
func (s *UDPSession) outputTask() {
// offset pre-compute
fecOffset := 0
if s.block != nil {
fecOffset = cryptHeaderSize
}
szOffset := fecOffset + fecHeaderSize
// fec data group
var cacheLine []byte
var fecGroup [][]byte
var fecCnt int
var fecMaxSize int
if s.fec != nil {
cacheLine = make([]byte, s.fec.shardSize*mtuLimit)
fecGroup = make([][]byte, s.fec.shardSize)
for k := range fecGroup {
fecGroup[k] = cacheLine[k*mtuLimit : (k+1)*mtuLimit]
}
}
// keepalive
var lastPing time.Time
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
// receive from a synchronous channel
// buffered channel must be avoided, because of "bufferbloat"
case ext := <-s.chUDPOutput:
var ecc [][]byte
if s.fec != nil {
s.fec.markData(ext[fecOffset:])
// explicit size, including the 2-byte size field itself.
binary.LittleEndian.PutUint16(ext[szOffset:], uint16(len(ext[szOffset:])))
// copy data to fec group
sz := len(ext)
fecGroup[fecCnt] = fecGroup[fecCnt][:sz]
copy(fecGroup[fecCnt], ext)
fecCnt++
if sz > fecMaxSize {
fecMaxSize = sz
}
// calculate Reed-Solomon Erasure Code
if fecCnt == s.fec.dataShards {
for i := 0; i < s.fec.dataShards; i++ {
shard := fecGroup[i]
slen := len(shard)
xorBytes(shard[slen:fecMaxSize], shard[slen:fecMaxSize], shard[slen:fecMaxSize])
}
ecc = s.fec.calcECC(fecGroup, szOffset, fecMaxSize)
for k := range ecc {
s.fec.markFEC(ecc[k][fecOffset:])
ecc[k] = ecc[k][:fecMaxSize]
}
fecCnt = 0
fecMaxSize = 0
}
}
if s.block != nil {
io.ReadFull(rand.Reader, ext[:nonceSize])
checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
s.block.Encrypt(ext, ext)
if ecc != nil {
for k := range ecc {
io.ReadFull(rand.Reader, ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
}
}
}
nbytes := 0
nsegs := 0
// if mrand.Intn(100) < 50 {
if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
nbytes += n
nsegs++
}
// }
if ecc != nil {
for k := range ecc {
if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
nbytes += n
nsegs++
}
}
}
atomic.AddUint64(&DefaultSnmp.OutSegs, uint64(nsegs))
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
xmitBuf.Put(ext)
case <-ticker.C: // NAT keep-alive
interval := time.Duration(atomic.LoadInt32(&s.keepAliveInterval)) * time.Second
if interval > 0 && time.Now().After(lastPing.Add(interval)) {
var rnd uint16
binary.Read(rand.Reader, binary.LittleEndian, &rnd)
sz := int(rnd)%(IKCP_MTU_DEF-s.headerSize-IKCP_OVERHEAD) + s.headerSize + IKCP_OVERHEAD
ping := make([]byte, sz) // randomized ping packet
io.ReadFull(rand.Reader, ping)
s.conn.WriteTo(ping, s.remote)
lastPing = time.Now()
}
case <-s.die:
return
}
}
}
// updateTask periodically flushes kcp state and notifies pending writers
func (s *UDPSession) updateTask() {
tc := time.After(time.Duration(atomic.LoadInt32(&s.updateInterval)) * time.Millisecond)
for {
select {
case <-tc:
s.mu.Lock()
s.kcp.flush()
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
s.notifyWriteEvent()
}
s.mu.Unlock()
tc = time.After(time.Duration(atomic.LoadInt32(&s.updateInterval)) * time.Millisecond)
case <-s.die:
if s.l != nil { // has listener
select {
case s.l.chDeadlinks <- s.remote:
case <-s.l.die:
}
}
return
}
}
}
// GetConv gets conversation id of a session
func (s *UDPSession) GetConv() uint32 {
return s.kcp.conv
}
func (s *UDPSession) notifyReadEvent() {
select {
case s.chReadEvent <- struct{}{}:
default:
}
}
func (s *UDPSession) notifyWriteEvent() {
select {
case s.chWriteEvent <- struct{}{}:
default:
}
}
func (s *UDPSession) kcpInput(data []byte) {
var kcpInErrors, fecErrs, fecRecovered, fecSegs uint64
if s.fec != nil {
f := s.fec.decode(data)
s.mu.Lock()
if f.flag == typeData {
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true); ret != 0 {
kcpInErrors++
}
}
if f.flag == typeData || f.flag == typeFEC {
if f.flag == typeFEC {
fecSegs++
}
if recovers := s.fec.input(f); recovers != nil {
for _, r := range recovers {
if len(r) >= 2 { // must be at least 2 bytes
sz := binary.LittleEndian.Uint16(r)
if int(sz) <= len(r) && sz >= 2 {
if ret := s.kcp.Input(r[2:sz], false); ret == 0 {
fecRecovered++
} else {
kcpInErrors++
}
} else {
fecErrs++
}
} else {
fecErrs++
}
}
}
}
// notify reader
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
if s.ackNoDelay {
s.kcp.flush()
}
s.mu.Unlock()
} else {
s.mu.Lock()
if ret := s.kcp.Input(data, true); ret != 0 {
kcpInErrors++
}
// notify reader
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
if s.ackNoDelay {
s.kcp.flush()
}
s.mu.Unlock()
}
atomic.AddUint64(&DefaultSnmp.InSegs, 1)
atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
if fecSegs > 0 {
atomic.AddUint64(&DefaultSnmp.FECSegs, fecSegs)
}
if kcpInErrors > 0 {
atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
}
if fecErrs > 0 {
atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
}
if fecRecovered > 0 {
atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
}
}
func (s *UDPSession) receiver(ch chan []byte) {
for {
data := xmitBuf.Get().([]byte)[:mtuLimit]
if n, _, err := s.conn.ReadFrom(data); err == nil && n >= s.headerSize+IKCP_OVERHEAD {
select {
case ch <- data[:n]:
case <-s.die:
}
} else if err != nil {
return
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
}
}
// read loop for client session
func (s *UDPSession) readLoop() {
chPacket := make(chan []byte, rxQueueLimit)
go s.receiver(chPacket)
for {
select {
case data := <-chPacket:
raw := data
dataValid := false
if s.block != nil {
s.block.Decrypt(data, data)
data = data[nonceSize:]
checksum := crc32.ChecksumIEEE(data[crcSize:])
if checksum == binary.LittleEndian.Uint32(data) {
data = data[crcSize:]
dataValid = true
} else {
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
}
} else if s.block == nil {
dataValid = true
}
if dataValid {
s.kcpInput(data)
}
xmitBuf.Put(raw)
case <-s.die:
return
}
}
}
type (
// Listener defines a server listening for connections
Listener struct {
block BlockCrypt
dataShards, parityShards int
fec *FEC // only used to tell whether FEC framing is enabled
conn net.PacketConn
sessions map[string]*UDPSession
chAccepts chan *UDPSession
chDeadlinks chan net.Addr
headerSize int
die chan struct{}
rxbuf sync.Pool
rd atomic.Value
wd atomic.Value
}
packet struct {
from net.Addr
data []byte
}
)
// monitor incoming data for all connections of server
func (l *Listener) monitor() {
chPacket := make(chan packet, rxQueueLimit)
go l.receiver(chPacket)
for {
select {
case p := <-chPacket:
raw := p.data
data := p.data
from := p.from
dataValid := false
if l.block != nil {
l.block.Decrypt(data, data)
data = data[nonceSize:]
checksum := crc32.ChecksumIEEE(data[crcSize:])
if checksum == binary.LittleEndian.Uint32(data) {
data = data[crcSize:]
dataValid = true
} else {
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
}
} else if l.block == nil {
dataValid = true
}
if dataValid {
addr := from.String()
s, ok := l.sessions[addr]
if !ok { // new session
var conv uint32
convValid := false
if l.fec != nil {
isfec := binary.LittleEndian.Uint16(data[4:])
if isfec == typeData {
conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
convValid = true
}
} else {
conv = binary.LittleEndian.Uint32(data)
convValid = true
}
if convValid {
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
s.kcpInput(data)
l.sessions[addr] = s
l.chAccepts <- s
}
} else {
s.kcpInput(data)
}
}
l.rxbuf.Put(raw)
case deadlink := <-l.chDeadlinks:
delete(l.sessions, deadlink.String())
case <-l.die:
return
}
}
}
func (l *Listener) receiver(ch chan packet) {
for {
data := l.rxbuf.Get().([]byte)[:mtuLimit]
if n, from, err := l.conn.ReadFrom(data); err == nil && n >= l.headerSize+IKCP_OVERHEAD {
ch <- packet{from, data[:n]}
} else if err != nil {
return
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
}
}
// SetReadBuffer sets the socket read buffer for the Listener
func (l *Listener) SetReadBuffer(bytes int) error {
if nc, ok := l.conn.(setReadBuffer); ok {
return nc.SetReadBuffer(bytes)
}
return errors.New(errInvalidOperation)
}
// SetWriteBuffer sets the socket write buffer for the Listener
func (l *Listener) SetWriteBuffer(bytes int) error {
if nc, ok := l.conn.(setWriteBuffer); ok {
return nc.SetWriteBuffer(bytes)
}
return errors.New(errInvalidOperation)
}
// SetDSCP sets the 6bit DSCP field of IP header
func (l *Listener) SetDSCP(dscp int) error {
if nc, ok := l.conn.(net.Conn); ok {
return ipv4.NewConn(nc).SetTOS(dscp << 2)
}
return errors.New(errInvalidOperation)
}
// Accept implements the Accept method in the Listener interface; it waits for the next connection and returns a generic Conn.
func (l *Listener) Accept() (net.Conn, error) {
return l.AcceptKCP()
}
// AcceptKCP accepts a KCP connection
func (l *Listener) AcceptKCP() (*UDPSession, error) {
var timeout <-chan time.Time
if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
timeout = time.After(tdeadline.Sub(time.Now()))
}
select {
case <-timeout:
return nil, &errTimeout{}
case c := <-l.chAccepts:
return c, nil
case <-l.die:
return nil, errors.New(errBrokenPipe)
}
}
// SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
func (l *Listener) SetDeadline(t time.Time) error {
l.SetReadDeadline(t)
l.SetWriteDeadline(t)
return nil
}
// SetReadDeadline implements the Conn SetReadDeadline method.
func (l *Listener) SetReadDeadline(t time.Time) error {
l.rd.Store(t)
return nil
}
// SetWriteDeadline implements the Conn SetWriteDeadline method.
func (l *Listener) SetWriteDeadline(t time.Time) error {
l.wd.Store(t)
return nil
}
// Close stops listening on the UDP address. Already accepted connections are not closed.
func (l *Listener) Close() error {
close(l.die)
return l.conn.Close()
}
// Addr returns the listener's network address. The Addr returned is shared by all invocations of Addr, so do not modify it.
func (l *Listener) Addr() net.Addr {
return l.conn.LocalAddr()
}
// Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp".
func Listen(laddr string) (net.Listener, error) {
return ListenWithOptions(laddr, nil, 0, 0)
}
// ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption.
// dataShards and parityShards define the Reed-Solomon erasure coding parameters.
func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
udpaddr, err := net.ResolveUDPAddr("udp", laddr)
if err != nil {
return nil, errors.Wrap(err, "net.ResolveUDPAddr")
}
conn, err := net.ListenUDP("udp", udpaddr)
if err != nil {
return nil, errors.Wrap(err, "net.ListenUDP")
}
return ServeConn(block, dataShards, parityShards, conn)
}
// ServeConn serves KCP protocol for a single packet connection.
func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
l := new(Listener)
l.conn = conn
l.sessions = make(map[string]*UDPSession)
l.chAccepts = make(chan *UDPSession, 1024)
l.chDeadlinks = make(chan net.Addr, 1024)
l.die = make(chan struct{})
l.dataShards = dataShards
l.parityShards = parityShards
l.block = block
l.fec = newFEC(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
l.rxbuf.New = func() interface{} {
return make([]byte, mtuLimit)
}
// calculate header size
if l.block != nil {
l.headerSize += cryptHeaderSize
}
if l.fec != nil {
l.headerSize += fecHeaderSizePlus2
}
go l.monitor()
return l, nil
}
// Dial connects to the remote address "raddr" on the network "udp"
func Dial(raddr string) (net.Conn, error) {
return DialWithOptions(raddr, nil, 0, 0)
}
// DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
if err != nil {
return nil, errors.Wrap(err, "net.ResolveUDPAddr")
}
udpconn, err := net.DialUDP("udp", nil, udpaddr)
if err != nil {
return nil, errors.Wrap(err, "net.DialUDP")
}
return NewConn(raddr, block, dataShards, parityShards, &ConnectedUDPConn{udpconn, udpconn})
}
// NewConn establishes a session and talks KCP protocol over a packet connection.
func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
if err != nil {
return nil, errors.Wrap(err, "net.ResolveUDPAddr")
}
var convid uint32
binary.Read(rand.Reader, binary.LittleEndian, &convid)
return newUDPSession(convid, dataShards, parityShards, nil, conn, udpaddr, block), nil
}
func currentMs() uint32 {
return uint32(time.Now().UnixNano() / int64(time.Millisecond))
}
// ConnectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
// to Write syscalls that are 4 times faster on some OS'es. This should only be
// used for connections that were produced by a net.Dial* call.
type ConnectedUDPConn struct {
*net.UDPConn
Conn net.Conn // underlying connection if any
}
// WriteTo redirects all writes to the Write syscall, which is 4 times faster.
func (c *ConnectedUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
return c.Write(b)
}
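An end-to-end sketch of the public session API above; the loopback address and port are illustrative assumptions, and encryption and FEC are disabled (nil BlockCrypt, zero shards):

package main

import (
	"fmt"

	"github.com/xtaci/kcp-go"
)

func main() {
	ln, err := kcp.ListenWithOptions("127.0.0.1:12345", nil, 0, 0)
	if err != nil {
		panic(err)
	}
	go func() {
		conn, err := ln.AcceptKCP()
		if err != nil {
			return
		}
		buf := make([]byte, 1024)
		n, _ := conn.Read(buf)
		conn.Write(buf[:n]) // echo back
	}()

	sess, err := kcp.DialWithOptions("127.0.0.1:12345", nil, 0, 0)
	if err != nil {
		panic(err)
	}
	sess.Write([]byte("hello kcp"))
	buf := make([]byte, 1024)
	n, _ := sess.Read(buf)
	fmt.Printf("echo: %s\n", buf[:n])
}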

vendor/github.com/xtaci/kcp-go/snmp.go generated vendored Normal file

@@ -0,0 +1,152 @@
package kcp
import (
"fmt"
"sync/atomic"
)
// Snmp defines network statistics indicators
type Snmp struct {
BytesSent uint64 // raw bytes sent
BytesReceived uint64
MaxConn uint64
ActiveOpens uint64
PassiveOpens uint64
CurrEstab uint64 // count of connections for now
InErrs uint64 // udp read errors
InCsumErrors uint64 // checksum errors from CRC32
KCPInErrors uint64 // packet input errors reported by kcp
InSegs uint64
OutSegs uint64
InBytes uint64 // udp bytes received
OutBytes uint64 // udp bytes sent
RetransSegs uint64
FastRetransSegs uint64
EarlyRetransSegs uint64
LostSegs uint64 // number of segs inferred as lost
RepeatSegs uint64 // number of segs duplicated
FECRecovered uint64 // correct packets recovered from FEC
FECErrs uint64 // incorrect packets recovered from FEC
FECSegs uint64 // FEC segments received
FECShortShards uint64 // number of data shards dropped because too few shards arrived for recovery
}
func newSnmp() *Snmp {
return new(Snmp)
}
func (s *Snmp) Header() []string {
return []string{
"BytesSent",
"BytesReceived",
"MaxConn",
"ActiveOpens",
"PassiveOpens",
"CurrEstab",
"InErrs",
"InCsumErrors",
"KCPInErrors",
"InSegs",
"OutSegs",
"InBytes",
"OutBytes",
"RetransSegs",
"FastRetransSegs",
"EarlyRetransSegs",
"LostSegs",
"RepeatSegs",
"FECSegs",
"FECErrs",
"FECRecovered",
"FECShortShards",
}
}
func (s *Snmp) ToSlice() []string {
snmp := s.Copy()
return []string{
fmt.Sprint(snmp.BytesSent),
fmt.Sprint(snmp.BytesReceived),
fmt.Sprint(snmp.MaxConn),
fmt.Sprint(snmp.ActiveOpens),
fmt.Sprint(snmp.PassiveOpens),
fmt.Sprint(snmp.CurrEstab),
fmt.Sprint(snmp.InErrs),
fmt.Sprint(snmp.InCsumErrors),
fmt.Sprint(snmp.KCPInErrors),
fmt.Sprint(snmp.InSegs),
fmt.Sprint(snmp.OutSegs),
fmt.Sprint(snmp.InBytes),
fmt.Sprint(snmp.OutBytes),
fmt.Sprint(snmp.RetransSegs),
fmt.Sprint(snmp.FastRetransSegs),
fmt.Sprint(snmp.EarlyRetransSegs),
fmt.Sprint(snmp.LostSegs),
fmt.Sprint(snmp.RepeatSegs),
fmt.Sprint(snmp.FECSegs),
fmt.Sprint(snmp.FECErrs),
fmt.Sprint(snmp.FECRecovered),
fmt.Sprint(snmp.FECShortShards),
}
}
// Copy makes a copy of the current Snmp snapshot
func (s *Snmp) Copy() *Snmp {
d := newSnmp()
d.BytesSent = atomic.LoadUint64(&s.BytesSent)
d.BytesReceived = atomic.LoadUint64(&s.BytesReceived)
d.MaxConn = atomic.LoadUint64(&s.MaxConn)
d.ActiveOpens = atomic.LoadUint64(&s.ActiveOpens)
d.PassiveOpens = atomic.LoadUint64(&s.PassiveOpens)
d.CurrEstab = atomic.LoadUint64(&s.CurrEstab)
d.InErrs = atomic.LoadUint64(&s.InErrs)
d.InCsumErrors = atomic.LoadUint64(&s.InCsumErrors)
d.KCPInErrors = atomic.LoadUint64(&s.KCPInErrors)
d.InSegs = atomic.LoadUint64(&s.InSegs)
d.OutSegs = atomic.LoadUint64(&s.OutSegs)
d.InBytes = atomic.LoadUint64(&s.InBytes)
d.OutBytes = atomic.LoadUint64(&s.OutBytes)
d.RetransSegs = atomic.LoadUint64(&s.RetransSegs)
d.FastRetransSegs = atomic.LoadUint64(&s.FastRetransSegs)
d.EarlyRetransSegs = atomic.LoadUint64(&s.EarlyRetransSegs)
d.LostSegs = atomic.LoadUint64(&s.LostSegs)
d.RepeatSegs = atomic.LoadUint64(&s.RepeatSegs)
d.FECSegs = atomic.LoadUint64(&s.FECSegs)
d.FECErrs = atomic.LoadUint64(&s.FECErrs)
d.FECRecovered = atomic.LoadUint64(&s.FECRecovered)
d.FECShortShards = atomic.LoadUint64(&s.FECShortShards)
return d
}
// Reset values to zero
func (s *Snmp) Reset() {
atomic.StoreUint64(&s.BytesSent, 0)
atomic.StoreUint64(&s.BytesReceived, 0)
atomic.StoreUint64(&s.MaxConn, 0)
atomic.StoreUint64(&s.ActiveOpens, 0)
atomic.StoreUint64(&s.PassiveOpens, 0)
atomic.StoreUint64(&s.CurrEstab, 0)
atomic.StoreUint64(&s.InErrs, 0)
atomic.StoreUint64(&s.InCsumErrors, 0)
atomic.StoreUint64(&s.KCPInErrors, 0)
atomic.StoreUint64(&s.InSegs, 0)
atomic.StoreUint64(&s.OutSegs, 0)
atomic.StoreUint64(&s.InBytes, 0)
atomic.StoreUint64(&s.OutBytes, 0)
atomic.StoreUint64(&s.RetransSegs, 0)
atomic.StoreUint64(&s.FastRetransSegs, 0)
atomic.StoreUint64(&s.EarlyRetransSegs, 0)
atomic.StoreUint64(&s.LostSegs, 0)
atomic.StoreUint64(&s.RepeatSegs, 0)
atomic.StoreUint64(&s.FECSegs, 0)
atomic.StoreUint64(&s.FECErrs, 0)
atomic.StoreUint64(&s.FECRecovered, 0)
atomic.StoreUint64(&s.FECShortShards, 0)
}
// DefaultSnmp is the global KCP connection statistics collector
var DefaultSnmp *Snmp
func init() {
DefaultSnmp = newSnmp()
}
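A short sketch of reading these counters through the exported API above; the CSV output format is an illustrative choice:

package main

import (
	"fmt"
	"strings"

	"github.com/xtaci/kcp-go"
)

func main() {
	// Copy takes an atomic snapshot of the global counters.
	snap := kcp.DefaultSnmp.Copy()
	fmt.Println(strings.Join(snap.Header(), ","))
	fmt.Println(strings.Join(snap.ToSlice(), ","))
	kcp.DefaultSnmp.Reset() // start a fresh measurement window
}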

vendor/github.com/xtaci/kcp-go/xor.go generated vendored Normal file

@@ -0,0 +1,111 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package kcp
import (
"runtime"
"unsafe"
)
const wordSize = int(unsafe.Sizeof(uintptr(0)))
const supportsUnaligned = runtime.GOARCH == "386" || runtime.GOARCH == "amd64" || runtime.GOARCH == "ppc64" || runtime.GOARCH == "ppc64le" || runtime.GOARCH == "s390x"
// fastXORBytes xors in bulk. It only works on architectures that
// support unaligned read/writes.
func fastXORBytes(dst, a, b []byte) int {
n := len(a)
if len(b) < n {
n = len(b)
}
w := n / wordSize
if w > 0 {
wordBytes := w * wordSize
fastXORWords(dst[:wordBytes], a[:wordBytes], b[:wordBytes])
}
for i := (n - n%wordSize); i < n; i++ {
dst[i] = a[i] ^ b[i]
}
return n
}
func safeXORBytes(dst, a, b []byte) int {
n := len(a)
if len(b) < n {
n = len(b)
}
ex := n % 8
for i := 0; i < ex; i++ {
dst[i] = a[i] ^ b[i]
}
for i := ex; i < n; i += 8 {
_dst := dst[i : i+8]
_a := a[i : i+8]
_b := b[i : i+8]
_dst[0] = _a[0] ^ _b[0]
_dst[1] = _a[1] ^ _b[1]
_dst[2] = _a[2] ^ _b[2]
_dst[3] = _a[3] ^ _b[3]
_dst[4] = _a[4] ^ _b[4]
_dst[5] = _a[5] ^ _b[5]
_dst[6] = _a[6] ^ _b[6]
_dst[7] = _a[7] ^ _b[7]
}
return n
}
// xorBytes xors the bytes in a and b. The destination is assumed to have enough
// space. Returns the number of bytes xor'd.
func xorBytes(dst, a, b []byte) int {
if supportsUnaligned {
return fastXORBytes(dst, a, b)
} else {
// TODO(hanwen): if (dst, a, b) have common alignment
// we could still try fastXORBytes. It is not clear
// how often this happens, and it's only worth it if
// the block encryption itself is hardware
// accelerated.
return safeXORBytes(dst, a, b)
}
}
// fastXORWords XORs multiples of 4 or 8 bytes (depending on architecture.)
// The arguments are assumed to be of equal length.
func fastXORWords(dst, a, b []byte) {
dw := *(*[]uintptr)(unsafe.Pointer(&dst))
aw := *(*[]uintptr)(unsafe.Pointer(&a))
bw := *(*[]uintptr)(unsafe.Pointer(&b))
n := len(b) / wordSize
ex := n % 8
for i := 0; i < ex; i++ {
dw[i] = aw[i] ^ bw[i]
}
for i := ex; i < n; i += 8 {
_dw := dw[i : i+8]
_aw := aw[i : i+8]
_bw := bw[i : i+8]
_dw[0] = _aw[0] ^ _bw[0]
_dw[1] = _aw[1] ^ _bw[1]
_dw[2] = _aw[2] ^ _bw[2]
_dw[3] = _aw[3] ^ _bw[3]
_dw[4] = _aw[4] ^ _bw[4]
_dw[5] = _aw[5] ^ _bw[5]
_dw[6] = _aw[6] ^ _bw[6]
_dw[7] = _aw[7] ^ _bw[7]
}
}
func xorWords(dst, a, b []byte) {
if supportsUnaligned {
fastXORWords(dst, a, b)
} else {
safeXORBytes(dst, a, b)
}
}
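To make the byte-level semantics concrete, a small within-package sketch (the helper is hypothetical and assumed to sit next to xor.go in package kcp):

// exampleXOR: dst receives the byte-wise XOR of a and b, truncated to the
// shorter input.
func exampleXOR() []byte {
	a := []byte{0x00, 0xFF, 0x55, 0xAA}
	b := []byte{0xFF, 0xFF, 0x0F, 0xA0}
	dst := make([]byte, 4)
	_ = xorBytes(dst, a, b) // returns 4, the number of bytes xor'd
	return dst              // {0xFF, 0x00, 0x5A, 0x0A}
}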