vendor: Update github.com/xtaci/kcp

Jakob Borg
2017-03-07 14:28:09 +01:00
parent 81af29e3e2
commit b3e2665a79
34 changed files with 686 additions and 3335 deletions

50
vendor/github.com/xtaci/kcp-go/emitter.go generated vendored Normal file

@@ -0,0 +1,50 @@
package kcp
import (
"net"
"sync/atomic"
)
var defaultEmitter Emitter
const emitQueue = 8192
func init() {
defaultEmitter.init()
}
type (
emitPacket struct {
conn net.PacketConn
to net.Addr
data []byte
recycle bool
}
// Emitter is the global packet sender
Emitter struct {
ch chan emitPacket
}
)
func (e *Emitter) init() {
e.ch = make(chan emitPacket, emitQueue)
go e.emitTask()
}
// keep on writing packets to the kernel
func (e *Emitter) emitTask() {
for p := range e.ch {
if n, err := p.conn.WriteTo(p.data, p.to); err == nil {
atomic.AddUint64(&DefaultSnmp.OutSegs, 1)
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(n))
}
if p.recycle {
xmitBuf.Put(p.data)
}
}
}
func (e *Emitter) emit(p emitPacket) {
e.ch <- p
}
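The emitter above funnels every outgoing datagram through one buffered channel drained by a single goroutine, so callers pay only a channel send. A minimal standalone sketch of the same pattern (names are illustrative, not the vendored code):

package main

import (
	"fmt"
	"net"
	"time"
)

type packet struct {
	conn net.PacketConn
	to   net.Addr
	data []byte
}

type emitter struct{ ch chan packet }

func newEmitter(queue int) *emitter {
	e := &emitter{ch: make(chan packet, queue)}
	go func() {
		// single writer goroutine: all socket writes happen here
		for p := range e.ch {
			if _, err := p.conn.WriteTo(p.data, p.to); err != nil {
				fmt.Println("write failed:", err)
			}
		}
	}()
	return e
}

func main() {
	conn, _ := net.ListenPacket("udp", "127.0.0.1:0")
	dst, _ := net.ResolveUDPAddr("udp", "127.0.0.1:9")
	e := newEmitter(8192)
	e.ch <- packet{conn, dst, []byte("ping")} // cheap enqueue, no syscall here
	time.Sleep(50 * time.Millisecond)         // let the writer drain (sketch only)
}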

vendor/github.com/xtaci/kcp-go/fec.go generated vendored

@@ -4,7 +4,7 @@ import (
"encoding/binary"
"sync/atomic"
"github.com/xtaci/reedsolomon"
"github.com/klauspost/reedsolomon"
)
const (
@@ -12,26 +12,29 @@ const (
fecHeaderSizePlus2 = fecHeaderSize + 2 // plus 2B data size
typeData = 0xf1
typeFEC = 0xf2
fecExpire = 30000 // 30s
)
type (
// FEC defines forward error correction for packets
FEC struct {
rx []fecPacket // ordered receive queue
rxlimit int // queue size limit
rxlimit int // queue size limit
dataShards int
parityShards int
shardSize int
next uint32 // next seqid
enc reedsolomon.Encoder
shards [][]byte
shards2 [][]byte // for calcECC
shardsflag []bool
paws uint32 // Protect Against Wrapped Sequence numbers
lastCheck uint32
next uint32 // next seqid
paws uint32 // Protect Against Wrapped Sequence numbers
rx []fecPacket // ordered receive queue
// caches
decodeCache [][]byte
encodeCache [][]byte
shardsflag []bool
// RS encoder
enc reedsolomon.Encoder
}
// fecPacket is a decoded FEC packet
fecPacket struct {
seqid uint32
flag uint16
@@ -54,19 +57,19 @@ func newFEC(rxlimit, dataShards, parityShards int) *FEC {
fec.parityShards = parityShards
fec.shardSize = dataShards + parityShards
fec.paws = (0xffffffff/uint32(fec.shardSize) - 1) * uint32(fec.shardSize)
enc, err := reedsolomon.New(dataShards, parityShards)
enc, err := reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1))
if err != nil {
return nil
}
fec.enc = enc
fec.shards = make([][]byte, fec.shardSize)
fec.shards2 = make([][]byte, fec.shardSize)
fec.decodeCache = make([][]byte, fec.shardSize)
fec.encodeCache = make([][]byte, fec.shardSize)
fec.shardsflag = make([]bool, fec.shardSize)
return fec
}
// decode a fec packet
func (fec *FEC) decode(data []byte) fecPacket {
// decodeBytes a fec packet
func (fec *FEC) decodeBytes(data []byte) fecPacket {
var pkt fecPacket
pkt.seqid = binary.LittleEndian.Uint32(data)
pkt.flag = binary.LittleEndian.Uint16(data[4:])
@@ -88,28 +91,11 @@ func (fec *FEC) markFEC(data []byte) {
binary.LittleEndian.PutUint32(data, fec.next)
binary.LittleEndian.PutUint16(data[4:], typeFEC)
fec.next++
if fec.next >= fec.paws { // paws would only occur in markFEC
fec.next = 0
}
fec.next %= fec.paws
}
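The paws constant is chosen so the wrap point of next lands on a shard-group boundary: (0xffffffff/shardSize - 1) * shardSize is a multiple of shardSize, so seqid % shardSize keeps addressing the right slot across the wrap. A toy verification (the shardSize value here is arbitrary):

package main

import "fmt"

func main() {
	const shardSize = 13
	const paws = (0xffffffff/shardSize - 1) * shardSize
	fmt.Println(paws % shardSize)     // 0: the wrap point sits on a group boundary
	next := uint32(paws - 1)          // last seqid before the wrap
	fmt.Println(next % shardSize)     // 12: final slot of its group
	next = (next + 1) % paws          // markFEC's `fec.next %= fec.paws`
	fmt.Println(next, next%shardSize) // 0 0: the next group starts cleanly
}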
// input a fec packet
func (fec *FEC) input(pkt fecPacket) (recovered [][]byte) {
// expiration
now := currentMs()
if now-fec.lastCheck >= fecExpire {
var rx []fecPacket
for k := range fec.rx {
if now-fec.rx[k].ts < fecExpire {
rx = append(rx, fec.rx[k])
} else {
xmitBuf.Put(fec.rx[k].data)
}
}
fec.rx = rx
fec.lastCheck = now
}
// Decode a fec packet
func (fec *FEC) Decode(pkt fecPacket) (recovered [][]byte) {
// insertion
n := len(fec.rx) - 1
insertIdx := 0
@@ -117,7 +103,7 @@ func (fec *FEC) input(pkt fecPacket) (recovered [][]byte) {
if pkt.seqid == fec.rx[i].seqid { // de-duplicate
xmitBuf.Put(pkt.data)
return nil
} else if pkt.seqid > fec.rx[i].seqid { // insertion
} else if _itimediff(pkt.seqid, fec.rx[i].seqid) > 0 { // insertion
insertIdx = i + 1
break
}
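The switch from pkt.seqid > fec.rx[i].seqid to _itimediff(...) > 0 makes the ordering wraparound-safe: _itimediff is essentially int32(a - b), and the signed difference survives uint32 wraparound where a plain comparison does not. A self-contained sketch:

package main

import "fmt"

func itimediff(later, earlier uint32) int32 {
	return int32(later - earlier)
}

func main() {
	// Near the wrap point, 2 is "after" 0xfffffffe even though 2 < 0xfffffffe.
	fmt.Println(itimediff(2, 0xfffffffe) > 0) // true: 2 is 4 steps ahead
	fmt.Println(2 > uint32(0xfffffffe))       // false: the naive compare is wrong here
}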
@@ -146,23 +132,24 @@ func (fec *FEC) input(pkt fecPacket) (recovered [][]byte) {
searchEnd = len(fec.rx) - 1
}
// re-construct datashards
if searchEnd > searchBegin && searchEnd-searchBegin+1 >= fec.dataShards {
numshard := 0
numDataShard := 0
first := -1
maxlen := 0
shards := fec.shards
shards := fec.decodeCache
shardsflag := fec.shardsflag
for k := range fec.shards {
for k := range fec.decodeCache {
shards[k] = nil
shardsflag[k] = false
}
for i := searchBegin; i <= searchEnd; i++ {
seqid := fec.rx[i].seqid
if seqid > shardEnd {
if _itimediff(seqid, shardEnd) > 0 {
break
} else if seqid >= shardBegin {
} else if _itimediff(seqid, shardBegin) >= 0 {
shards[seqid%uint32(fec.shardSize)] = fec.rx[i].data
shardsflag[seqid%uint32(fec.shardSize)] = true
numshard++
@@ -226,11 +213,12 @@ func (fec *FEC) input(pkt fecPacket) (recovered [][]byte) {
return
}
func (fec *FEC) calcECC(data [][]byte, offset, maxlen int) (ecc [][]byte) {
// Encode a group of datashards
func (fec *FEC) Encode(data [][]byte, offset, maxlen int) (ecc [][]byte) {
if len(data) != fec.shardSize {
return nil
}
shards := fec.shards2
shards := fec.encodeCache
for k := range shards {
shards[k] = data[k][offset:maxlen]
}
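The import swap above moves FEC to klauspost/reedsolomon, pinned to a single goroutine per encode via WithMaxGoroutines(1). A hedged sketch of that library's round trip as I understand its API (shard sizes must match; parity shards are filled in place by Encode):

package main

import (
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	enc, err := reedsolomon.New(3, 2, reedsolomon.WithMaxGoroutines(1))
	if err != nil {
		panic(err)
	}
	shards := [][]byte{
		[]byte("data0"), []byte("data1"), []byte("data2"),
		make([]byte, 5), make([]byte, 5), // parity, filled by Encode
	}
	if err := enc.Encode(shards); err != nil {
		panic(err)
	}
	shards[1] = nil // lose a data shard
	if err := enc.Reconstruct(shards); err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", shards[1]) // data1 recovered from parity
}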

232
vendor/github.com/xtaci/kcp-go/kcp.go generated vendored

@@ -72,17 +72,15 @@ func ikcp_decode32u(p []byte, l *uint32) []byte {
func _imin_(a, b uint32) uint32 {
if a <= b {
return a
} else {
return b
}
return b
}
func _imax_(a, b uint32) uint32 {
if a >= b {
return a
} else {
return b
}
return b
}
func _ibound_(lower, middle, upper uint32) uint32 {
@@ -102,11 +100,11 @@ type Segment struct {
ts uint32
sn uint32
una uint32
data []byte
resendts uint32
rto uint32
fastack uint32
xmit uint32
data []byte
}
// encode a segment into buffer
@@ -127,7 +125,8 @@ type KCP struct {
conv, mtu, mss, state uint32
snd_una, snd_nxt, rcv_nxt uint32
ssthresh uint32
rx_rttval, rx_srtt, rx_rto, rx_minrto uint32
rx_rttvar, rx_srtt int32
rx_rto, rx_minrto uint32
snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32
interval, ts_flush, xmit uint32
nodelay, updated uint32
@@ -144,8 +143,9 @@ type KCP struct {
acklist []ackItem
buffer []byte
output Output
buffer []byte
output Output
datashard, parityshard int
}
type ackItem struct {
@@ -340,20 +340,24 @@ func (kcp *KCP) update_ack(rtt int32) {
// https://tools.ietf.org/html/rfc6298
var rto uint32
if kcp.rx_srtt == 0 {
kcp.rx_srtt = uint32(rtt)
kcp.rx_rttval = uint32(rtt) / 2
kcp.rx_srtt = rtt
kcp.rx_rttvar = rtt >> 1
} else {
delta := rtt - int32(kcp.rx_srtt)
delta := rtt - kcp.rx_srtt
kcp.rx_srtt += delta >> 3
if delta < 0 {
delta = -delta
}
kcp.rx_rttval = (3*kcp.rx_rttval + uint32(delta)) / 4
kcp.rx_srtt = (7*kcp.rx_srtt + uint32(rtt)) / 8
if kcp.rx_srtt < 1 {
kcp.rx_srtt = 1
if rtt < kcp.rx_srtt-kcp.rx_rttvar {
// if the new RTT sample is below the bottom of the range of
// what an RTT measurement is expected to be,
// give it an 8x reduced weight versus its normal weighting
kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 5
} else {
kcp.rx_rttvar += (delta - kcp.rx_rttvar) >> 2
}
}
rto = kcp.rx_srtt + _imax_(kcp.interval, 4*kcp.rx_rttval)
rto = uint32(kcp.rx_srtt) + _imax_(kcp.interval, uint32(kcp.rx_rttvar)<<2)
kcp.rx_rto = _ibound_(kcp.rx_minrto, rto, IKCP_RTO_MAX)
}
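The rewritten update_ack follows RFC 6298 with signed arithmetic: SRTT += (RTT - SRTT)/8, RTTVAR blends |RTT - SRTT| at weight 1/4 (reduced to 1/32 for samples below SRTT - RTTVAR), and RTO = SRTT + max(interval, 4*RTTVAR). A compilable paraphrase of that logic, not the vendored function:

package main

import "fmt"

type estimator struct {
	srtt, rttvar int32
}

func (e *estimator) sample(rtt int32) (rto uint32) {
	const interval = 100 // stands in for kcp.interval, in ms
	if e.srtt == 0 {
		e.srtt = rtt
		e.rttvar = rtt >> 1
	} else {
		delta := rtt - e.srtt
		e.srtt += delta >> 3 // SRTT += (RTT - SRTT)/8
		if delta < 0 {
			delta = -delta
		}
		if rtt < e.srtt-e.rttvar {
			// sample below the expected range: 8x reduced weight (1/32)
			e.rttvar += (delta - e.rttvar) >> 5
		} else {
			e.rttvar += (delta - e.rttvar) >> 2 // normal 1/4 weight
		}
	}
	return uint32(e.srtt) + max32(interval, uint32(e.rttvar)<<2)
}

func max32(a, b uint32) uint32 {
	if a > b {
		return a
	}
	return b
}

func main() {
	var e estimator
	for _, rtt := range []int32{100, 120, 80, 300} {
		fmt.Println("rto:", e.sample(rtt))
	}
}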
@@ -395,7 +399,7 @@ func (kcp *KCP) parse_fastack(sn uint32) {
seg := &kcp.snd_buf[k]
if _itimediff(sn, seg.sn) < 0 {
break
} else if sn != seg.sn { // && kcp.current >= seg.ts+kcp.rx_srtt {
} else if sn != seg.sn {
seg.fastack++
}
}
@@ -472,16 +476,17 @@ func (kcp *KCP) parse_data(newseg *Segment) {
}
// Input: call this when a low-level packet (e.g. a UDP packet) has been received
func (kcp *KCP) Input(data []byte, update_ack bool) int {
// regular indicates whether a regular packet was received (not recovered via FEC)
func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
una := kcp.snd_una
if len(data) < IKCP_OVERHEAD {
return -1
}
var maxack uint32
var recentack uint32
var flag int
current := currentMs()
for {
var ts, sn, length, una, conv uint32
var wnd uint16
@@ -512,11 +517,18 @@ func (kcp *KCP) Input(data []byte, update_ack bool) int {
return -3
}
kcp.rmt_wnd = uint32(wnd)
// only trust window updates from regular packets, i.e. the latest update
if regular {
kcp.rmt_wnd = uint32(wnd)
}
kcp.parse_una(una)
kcp.shrink_buf()
if cmd == IKCP_CMD_ACK {
if _itimediff(current, ts) >= 0 {
kcp.update_ack(_itimediff(current, ts))
}
kcp.parse_ack(sn)
kcp.shrink_buf()
if flag == 0 {
@@ -525,7 +537,6 @@ func (kcp *KCP) Input(data []byte, update_ack bool) int {
} else if _itimediff(sn, maxack) > 0 {
maxack = sn
}
recentack = ts
} else if cmd == IKCP_CMD_PUSH {
if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 {
kcp.ack_push(sn, ts)
@@ -559,12 +570,8 @@ func (kcp *KCP) Input(data []byte, update_ack bool) int {
data = data[length:]
}
current := currentMs()
if flag != 0 && update_ack {
if flag != 0 && regular {
kcp.parse_fastack(maxack)
if _itimediff(current, recentack) >= 0 {
kcp.update_ack(_itimediff(current, recentack))
}
}
if _itimediff(kcp.snd_una, una) > 0 {
@@ -589,6 +596,11 @@ func (kcp *KCP) Input(data []byte, update_ack bool) int {
}
}
if ackNoDelay && len(kcp.acklist) > 0 { // ack immediately
kcp.flush(true)
} else if kcp.rmt_wnd == 0 && len(kcp.acklist) > 0 { // window zero
kcp.flush(true)
}
return 0
}
@@ -600,7 +612,7 @@ func (kcp *KCP) wnd_unused() int32 {
}
// flush pending data
func (kcp *KCP) flush() {
func (kcp *KCP) flush(ackOnly bool) {
buffer := kcp.buffer
change := 0
lost := false
@@ -612,21 +624,42 @@ func (kcp *KCP) flush() {
seg.una = kcp.rcv_nxt
// flush acknowledges
ptr := buffer
var required []ackItem
for i, ack := range kcp.acklist {
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
// filter jitters caused by bufferbloat
// filter necessary acks only
if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
seg.sn, seg.ts = ack.sn, ack.ts
ptr = seg.encode(ptr)
required = append(required, kcp.acklist[i])
}
}
kcp.acklist = nil
ptr := buffer
maxBatchSize := kcp.mtu / IKCP_OVERHEAD
for len(required) > 0 {
var batchSize int
if kcp.datashard > 0 && kcp.parityshard > 0 { // try triggering FEC
batchSize = int(_ibound_(1, uint32(len(required)/kcp.datashard), maxBatchSize))
} else {
batchSize = int(_ibound_(1, uint32(len(required)), maxBatchSize))
}
for len(required) >= batchSize {
for i := 0; i < batchSize; i++ {
ack := required[i]
seg.sn, seg.ts = ack.sn, ack.ts
ptr = seg.encode(ptr)
}
size := len(buffer) - len(ptr)
kcp.output(buffer, size)
ptr = buffer
required = required[batchSize:]
}
}
if ackOnly { // flush acks only
return
}
current := currentMs()
// probe window size (if remote window size equals zero)
if kcp.rmt_wnd == 0 {
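The ack path above now batches: acks for already-delivered data (sn < rcv_nxt) are dropped except the last one, and the survivors are flushed in groups sized either to the MTU or, when FEC is enabled, spread across datashards so parity groups fill sooner. A sketch of just the batch sizing (simplified to datashard > 0; the diff also checks parityshard):

package main

import "fmt"

func bound(lower, middle, upper uint32) uint32 {
	if middle < lower {
		return lower
	}
	if middle > upper {
		return upper
	}
	return middle
}

func batchSize(pending, datashard int, mtu, overhead uint32) int {
	maxBatch := mtu / overhead // acks that fit in one packet
	if datashard > 0 {
		// spread acks across datashards so a full FEC group forms sooner
		return int(bound(1, uint32(pending/datashard), maxBatch))
	}
	return int(bound(1, uint32(pending), maxBatch))
}

func main() {
	fmt.Println(batchSize(40, 10, 1400, 24)) // 4: one group's worth per packet
	fmt.Println(batchSize(40, 0, 1400, 24))  // 40: all pending acks fit (cap is 58)
}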
@@ -682,7 +715,7 @@ func (kcp *KCP) flush() {
}
// sliding window, controlled by snd_nxt && snd_una+cwnd
count := 0
newSegsCount := 0
for k := range kcp.snd_queue {
if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
break
@@ -690,24 +723,13 @@ func (kcp *KCP) flush() {
newseg := kcp.snd_queue[k]
newseg.conv = kcp.conv
newseg.cmd = IKCP_CMD_PUSH
newseg.wnd = seg.wnd
newseg.ts = current
newseg.sn = kcp.snd_nxt
newseg.una = kcp.rcv_nxt
newseg.resendts = newseg.ts
newseg.rto = kcp.rx_rto
kcp.snd_buf = append(kcp.snd_buf, newseg)
kcp.snd_nxt++
count++
newSegsCount++
kcp.snd_queue[k].data = nil
}
kcp.snd_queue = kcp.snd_queue[count:]
// flag pending data
hasPending := false
if count > 0 {
hasPending = true
}
kcp.snd_queue = kcp.snd_queue[newSegsCount:]
// calculate resent
resent := uint32(kcp.fastresend)
@@ -715,18 +737,37 @@ func (kcp *KCP) flush() {
resent = 0xffffffff
}
// flush data segments
// counters
var lostSegs, fastRetransSegs, earlyRetransSegs uint64
for k := range kcp.snd_buf {
current := currentMs()
// send new segments
for k := len(kcp.snd_buf) - newSegsCount; k < len(kcp.snd_buf); k++ {
segment := &kcp.snd_buf[k]
segment.xmit++
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
segment.ts = current
segment.wnd = seg.wnd
segment.una = kcp.rcv_nxt
size := len(buffer) - len(ptr)
need := IKCP_OVERHEAD + len(segment.data)
if size+need > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = segment.encode(ptr)
copy(ptr, segment.data)
ptr = ptr[len(segment.data):]
}
// check for retransmissions
for k := 0; k < len(kcp.snd_buf)-newSegsCount; k++ {
segment := &kcp.snd_buf[k]
needsend := false
if segment.xmit == 0 {
needsend = true
segment.xmit++
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
} else if _itimediff(current, segment.resendts) >= 0 {
if _itimediff(current, segment.resendts) >= 0 { // RTO
needsend = true
segment.xmit++
kcp.xmit++
@@ -739,25 +780,21 @@ func (kcp *KCP) flush() {
lost = true
lostSegs++
} else if segment.fastack >= resent { // fast retransmit
lastsend := segment.resendts - segment.rto
if _itimediff(current, lastsend) >= int32(kcp.rx_rto/4) {
needsend = true
segment.xmit++
segment.fastack = 0
segment.resendts = current + segment.rto
change++
fastRetransSegs++
}
} else if segment.fastack > 0 && !hasPending { // early retransmit
lastsend := segment.resendts - segment.rto
if _itimediff(current, lastsend) >= int32(kcp.rx_rto/4) {
needsend = true
segment.xmit++
segment.fastack = 0
segment.resendts = current + segment.rto
change++
earlyRetransSegs++
}
needsend = true
segment.xmit++
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
change++
fastRetransSegs++
} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
needsend = true
segment.xmit++
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
change++
earlyRetransSegs++
}
if needsend {
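The retransmission loop now separates new-segment transmission from resends and tallies three distinct triggers for SNMP: RTO expiry (lostSegs), fast retransmit once a segment has been skipped by resent later acks (fastRetransSegs), and early retransmit for skipped segments when no new data is in flight (earlyRetransSegs). A sketch of the classification order only, with illustrative names; times are in milliseconds as elsewhere in kcp:

package main

import "fmt"

func classify(now, resendts int32, fastack, resent uint32, newSegs int) string {
	switch {
	case now-resendts >= 0: // retransmission timer expired
		return "RTO retransmit"
	case fastack >= resent: // skipped by enough later acks
		return "fast retransmit"
	case fastack > 0 && newSegs == 0: // skipped, and nothing new in flight
		return "early retransmit"
	default:
		return "wait"
	}
}

func main() {
	fmt.Println(classify(1000, 1200, 0, 2, 3)) // wait: timer pending, no dup acks
	fmt.Println(classify(1300, 1200, 0, 2, 3)) // RTO retransmit
	fmt.Println(classify(1000, 1200, 2, 2, 3)) // fast retransmit
	fmt.Println(classify(1000, 1200, 1, 2, 0)) // early retransmit
}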
@@ -783,17 +820,29 @@ func (kcp *KCP) flush() {
}
}
atomic.AddUint64(&DefaultSnmp.RetransSegs, lostSegs+fastRetransSegs+earlyRetransSegs)
atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
// flush remaining segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}
// counter updates
sum := lostSegs
if lostSegs > 0 {
atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
}
if fastRetransSegs > 0 {
atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
sum += fastRetransSegs
}
if earlyRetransSegs > 0 {
atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
sum += earlyRetransSegs
}
if sum > 0 {
atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
}
// update ssthresh
// rate halving, https://tools.ietf.org/html/rfc6937
if change != 0 {
@@ -846,7 +895,7 @@ func (kcp *KCP) Update() {
if _itimediff(current, kcp.ts_flush) >= 0 {
kcp.ts_flush = current + kcp.interval
}
kcp.flush()
kcp.flush(false)
}
}
@@ -900,6 +949,12 @@ func (kcp *KCP) Check() uint32 {
return current + minimal
}
// set datashard, parityshard info for some optimizations
func (kcp *KCP) setFEC(datashard, parityshard int) {
kcp.datashard = datashard
kcp.parityshard = parityshard
}
// SetMtu changes MTU size, default is 1400
func (kcp *KCP) SetMtu(mtu int) int {
if mtu < 50 || mtu < IKCP_OVERHEAD {
@@ -962,3 +1017,12 @@ func (kcp *KCP) WndSize(sndwnd, rcvwnd int) int {
func (kcp *KCP) WaitSnd() int {
return len(kcp.snd_buf) + len(kcp.snd_queue)
}
// Cwnd returns current congestion window size
func (kcp *KCP) Cwnd() uint32 {
cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
if kcp.nocwnd == 0 {
cwnd = _imin_(kcp.cwnd, cwnd)
}
return cwnd
}
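Cwnd gives Write a tighter gate than the old snd_wnd check: the effective window is the minimum of the local and remote windows, further clamped by the congestion window unless nocwnd disables congestion control. The equivalent logic as a sketch:

package main

import "fmt"

func effectiveWnd(sndWnd, rmtWnd, cwnd uint32, nocwnd bool) uint32 {
	w := sndWnd
	if rmtWnd < w {
		w = rmtWnd
	}
	if !nocwnd && cwnd < w {
		w = cwnd // congestion control active
	}
	return w
}

func main() {
	fmt.Println(effectiveWnd(128, 64, 16, false)) // 16: congestion-limited
	fmt.Println(effectiveWnd(128, 64, 16, true))  // 64: peer-limited
}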

vendor/github.com/xtaci/kcp-go/sess.go generated vendored

@@ -23,14 +23,13 @@ func (errTimeout) Temporary() bool { return true }
func (errTimeout) Error() string { return "i/o timeout" }
const (
defaultWndSize = 128 // default window size, in packet
nonceSize = 16 // magic number
crcSize = 4 // 4bytes packet checksum
cryptHeaderSize = nonceSize + crcSize
mtuLimit = 2048
rxQueueLimit = 8192
rxFECMulti = 3 // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
defaultKeepAliveInterval = 10
defaultWndSize = 128 // default window size, in packet
nonceSize = 16 // magic number
crcSize = 4 // 4bytes packet checksum
cryptHeaderSize = nonceSize + crcSize
mtuLimit = 2048
rxQueueLimit = 8192
rxFECMulti = 3 // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
)
const (
@@ -40,6 +39,7 @@ const (
var (
xmitBuf sync.Pool
sid uint32
)
func init() {
@@ -51,25 +51,36 @@ func init() {
type (
// UDPSession defines a KCP session implemented by UDP
UDPSession struct {
kcp *KCP // the core ARQ
l *Listener // point to server listener if it's a server socket
fec *FEC // forward error correction
conn net.PacketConn // the underlying packet socket
block BlockCrypt
remote net.Addr
rd time.Time // read deadline
wd time.Time // write deadline
sockbuff []byte // kcp receiving is packet based; turned into a stream here
die chan struct{}
chReadEvent chan struct{}
chWriteEvent chan struct{}
chUDPOutput chan []byte
headerSize int
ackNoDelay bool
isClosed bool
keepAliveInterval int32
mu sync.Mutex
updateInterval int32
// core
sid uint32
conn net.PacketConn // the underlying packet socket
kcp *KCP // the core ARQ
l *Listener // point to server listener if it's a server socket
block BlockCrypt // encryption
sockbuff []byte // kcp receiving is packet based; turned into a stream here
// forward error correction
fec *FEC
fecDataShards [][]byte
fecHeaderOffset int
fecPayloadOffset int
fecCnt int // count datashard
fecMaxSize int // record maximum data length in datashard
// settings
remote net.Addr
rd time.Time // read deadline
wd time.Time // write deadline
headerSize int
updateInterval int32
ackNoDelay bool
// notifications
die chan struct{}
chReadEvent chan struct{}
chWriteEvent chan struct{}
isClosed bool
mu sync.Mutex
}
setReadBuffer interface {
@@ -84,16 +95,30 @@ type (
// newUDPSession creates a new udp session for client or server
func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn net.PacketConn, remote net.Addr, block BlockCrypt) *UDPSession {
sess := new(UDPSession)
sess.chUDPOutput = make(chan []byte)
sess.sid = atomic.AddUint32(&sid, 1)
sess.die = make(chan struct{})
sess.chReadEvent = make(chan struct{}, 1)
sess.chWriteEvent = make(chan struct{}, 1)
sess.remote = remote
sess.conn = conn
sess.keepAliveInterval = defaultKeepAliveInterval
sess.l = l
sess.block = block
// FEC initialization
sess.fec = newFEC(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
if sess.fec != nil {
if sess.block != nil {
sess.fecHeaderOffset = cryptHeaderSize
}
sess.fecPayloadOffset = sess.fecHeaderOffset + fecHeaderSize
// fec data shards
sess.fecDataShards = make([][]byte, sess.fec.shardSize)
for k := range sess.fecDataShards {
sess.fecDataShards[k] = make([]byte, mtuLimit)
}
}
// calculate header size
if sess.block != nil {
sess.headerSize += cryptHeaderSize
@@ -104,19 +129,14 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
sess.kcp = NewKCP(conv, func(buf []byte, size int) {
if size >= IKCP_OVERHEAD {
ext := xmitBuf.Get().([]byte)[:sess.headerSize+size]
copy(ext[sess.headerSize:], buf)
select {
case sess.chUDPOutput <- ext:
case <-sess.die:
}
sess.output(buf[:size])
}
})
sess.kcp.WndSize(defaultWndSize, defaultWndSize)
sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
sess.kcp.setFEC(dataShards, parityShards)
go sess.updateTask()
go sess.outputTask()
updater.addSession(sess)
if sess.l == nil { // it's a client connection
go sess.readLoop()
atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
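With this change the KCP output callback calls sess.output synchronously (FEC, CRC32, encryption, then hand-off to the emitter), and the shared updater heap replaces the per-session outputTask/updateTask goroutines. For orientation, a hedged sketch of client usage; DialWithOptions is assumed from kcp-go's public API of this period and is not part of this diff:

package main

import (
	"log"

	kcp "github.com/xtaci/kcp-go"
)

func main() {
	// 10 data shards + 3 parity shards, no encryption (nil block).
	sess, err := kcp.DialWithOptions("127.0.0.1:12345", nil, 10, 3)
	if err != nil {
		log.Fatal(err)
	}
	defer sess.Close()
	if _, err := sess.Write([]byte("hello")); err != nil {
		log.Fatal(err)
	}
}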
@@ -207,19 +227,19 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
}
}
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
if s.kcp.WaitSnd() < int(s.kcp.Cwnd()) {
n = len(b)
max := s.kcp.mss << 8
for {
if len(b) <= int(max) { // in most cases
if len(b) <= int(s.kcp.mss) {
s.kcp.Send(b)
break
} else {
s.kcp.Send(b[:max])
b = b[max:]
s.kcp.Send(b[:s.kcp.mss])
b = b[s.kcp.mss:]
}
}
s.kcp.flush()
s.kcp.flush(false)
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
return n, nil
@@ -249,6 +269,8 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
// Close closes the connection.
func (s *UDPSession) Close() error {
updater.removeSession(s)
s.mu.Lock()
defer s.mu.Unlock()
if s.isClosed {
@@ -373,151 +395,93 @@ func (s *UDPSession) SetWriteBuffer(bytes int) error {
return errors.New(errInvalidOperation)
}
// SetKeepAlive changes per-connection NAT keepalive interval; 0 to disable, default to 10s
func (s *UDPSession) SetKeepAlive(interval int) {
atomic.StoreInt32(&s.keepAliveInterval, int32(interval))
}
// output pipeline entry
// steps for output data processing:
// 1. FEC
// 2. CRC32
// 3. Encryption
// 4. emit to emitTask
// 5. emitTask writes to the kernel via WriteTo
func (s *UDPSession) output(buf []byte) {
var ecc [][]byte
func (s *UDPSession) outputTask() {
// offset pre-compute
fecOffset := 0
if s.block != nil {
fecOffset = cryptHeaderSize
}
szOffset := fecOffset + fecHeaderSize
// extend buf's header space
ext := xmitBuf.Get().([]byte)[:s.headerSize+len(buf)]
copy(ext[s.headerSize:], buf)
// fec data group
var cacheLine []byte
var fecGroup [][]byte
var fecCnt int
var fecMaxSize int
// FEC stage
if s.fec != nil {
cacheLine = make([]byte, s.fec.shardSize*mtuLimit)
fecGroup = make([][]byte, s.fec.shardSize)
for k := range fecGroup {
fecGroup[k] = cacheLine[k*mtuLimit : (k+1)*mtuLimit]
s.fec.markData(ext[s.fecHeaderOffset:])
binary.LittleEndian.PutUint16(ext[s.fecPayloadOffset:], uint16(len(ext[s.fecPayloadOffset:])))
// copy data to fec datashards
sz := len(ext)
s.fecDataShards[s.fecCnt] = s.fecDataShards[s.fecCnt][:sz]
copy(s.fecDataShards[s.fecCnt], ext)
s.fecCnt++
// record max datashard length
if sz > s.fecMaxSize {
s.fecMaxSize = sz
}
// calculate Reed-Solomon Erasure Code
if s.fecCnt == s.fec.dataShards {
// bzero each datashard's tail
for i := 0; i < s.fec.dataShards; i++ {
shard := s.fecDataShards[i]
slen := len(shard)
xorBytes(shard[slen:s.fecMaxSize], shard[slen:s.fecMaxSize], shard[slen:s.fecMaxSize])
}
// calculation of RS
ecc = s.fec.Encode(s.fecDataShards, s.fecPayloadOffset, s.fecMaxSize)
for k := range ecc {
s.fec.markFEC(ecc[k][s.fecHeaderOffset:])
ecc[k] = ecc[k][:s.fecMaxSize]
}
// reset counters to zero
s.fecCnt = 0
s.fecMaxSize = 0
}
}
// keepalive
var lastPing time.Time
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
// encryption stage
if s.block != nil {
io.ReadFull(rand.Reader, ext[:nonceSize])
checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
s.block.Encrypt(ext, ext)
for {
select {
// receive from a synchronous channel
// buffered channel must be avoided, because of "bufferbloat"
case ext := <-s.chUDPOutput:
var ecc [][]byte
if s.fec != nil {
s.fec.markData(ext[fecOffset:])
// explicit size, including the 2-byte size field itself.
binary.LittleEndian.PutUint16(ext[szOffset:], uint16(len(ext[szOffset:])))
// copy data to fec group
sz := len(ext)
fecGroup[fecCnt] = fecGroup[fecCnt][:sz]
copy(fecGroup[fecCnt], ext)
fecCnt++
if sz > fecMaxSize {
fecMaxSize = sz
}
// calculate Reed-Solomon Erasure Code
if fecCnt == s.fec.dataShards {
for i := 0; i < s.fec.dataShards; i++ {
shard := fecGroup[i]
slen := len(shard)
xorBytes(shard[slen:fecMaxSize], shard[slen:fecMaxSize], shard[slen:fecMaxSize])
}
ecc = s.fec.calcECC(fecGroup, szOffset, fecMaxSize)
for k := range ecc {
s.fec.markFEC(ecc[k][fecOffset:])
ecc[k] = ecc[k][:fecMaxSize]
}
fecCnt = 0
fecMaxSize = 0
}
if ecc != nil {
for k := range ecc {
io.ReadFull(rand.Reader, ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
}
}
}
if s.block != nil {
io.ReadFull(rand.Reader, ext[:nonceSize])
checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
s.block.Encrypt(ext, ext)
if ecc != nil {
for k := range ecc {
io.ReadFull(rand.Reader, ecc[k][:nonceSize])
checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
s.block.Encrypt(ecc[k], ecc[k])
}
}
}
nbytes := 0
nsegs := 0
// if mrand.Intn(100) < 50 {
if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
nbytes += n
nsegs++
}
// }
if ecc != nil {
for k := range ecc {
if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
nbytes += n
nsegs++
}
}
}
atomic.AddUint64(&DefaultSnmp.OutSegs, uint64(nsegs))
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
xmitBuf.Put(ext)
case <-ticker.C: // NAT keep-alive
interval := time.Duration(atomic.LoadInt32(&s.keepAliveInterval)) * time.Second
if interval > 0 && time.Now().After(lastPing.Add(interval)) {
var rnd uint16
binary.Read(rand.Reader, binary.LittleEndian, &rnd)
sz := int(rnd)%(IKCP_MTU_DEF-s.headerSize-IKCP_OVERHEAD) + s.headerSize + IKCP_OVERHEAD
ping := make([]byte, sz) // randomized ping packet
io.ReadFull(rand.Reader, ping)
s.conn.WriteTo(ping, s.remote)
lastPing = time.Now()
}
case <-s.die:
return
// emit stage
defaultEmitter.emit(emitPacket{s.conn, s.remote, ext, true})
if ecc != nil {
for k := range ecc {
defaultEmitter.emit(emitPacket{s.conn, s.remote, ecc[k], false})
}
}
}
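The "bzero each datashard's tail" step relies on x XOR x = 0: XORing a slice onto itself clears it in place, reusing the existing (potentially word-wise) XOR routine instead of a dedicated zeroing helper, so every shard in a group is padded to fecMaxSize with zeros before encoding. A sketch:

package main

import "fmt"

func xorBytes(dst, a, b []byte) {
	for i := range dst {
		dst[i] = a[i] ^ b[i]
	}
}

func main() {
	shard := []byte{1, 2, 3, 4, 5, 6}
	used := 3 // bytes of real payload in this shard
	tail := shard[used:]
	xorBytes(tail, tail, tail) // clears bytes 3..5 so all shards pad equally
	fmt.Println(shard)         // [1 2 3 0 0 0]
}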
// kcp update, input loop
func (s *UDPSession) updateTask() {
tc := time.After(time.Duration(atomic.LoadInt32(&s.updateInterval)) * time.Millisecond)
for {
select {
case <-tc:
s.mu.Lock()
s.kcp.flush()
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
s.notifyWriteEvent()
}
s.mu.Unlock()
tc = time.After(time.Duration(atomic.LoadInt32(&s.updateInterval)) * time.Millisecond)
case <-s.die:
if s.l != nil { // has listener
select {
case s.l.chDeadlinks <- s.remote:
case <-s.l.die:
}
}
return
}
// kcp update, returns the interval until the next call
func (s *UDPSession) update() time.Duration {
s.mu.Lock()
s.kcp.flush(false)
if s.kcp.WaitSnd() < int(s.kcp.Cwnd()) {
s.notifyWriteEvent()
}
s.mu.Unlock()
return time.Duration(atomic.LoadInt32(&s.updateInterval)) * time.Millisecond
}
// GetConv gets conversation id of a session
@@ -540,28 +504,28 @@ func (s *UDPSession) notifyWriteEvent() {
}
func (s *UDPSession) kcpInput(data []byte) {
var kcpInErrors, fecErrs, fecRecovered, fecSegs uint64
var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
if s.fec != nil {
f := s.fec.decode(data)
f := s.fec.decodeBytes(data)
s.mu.Lock()
if f.flag == typeData {
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true); ret != 0 {
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
}
if f.flag == typeData || f.flag == typeFEC {
if f.flag == typeFEC {
fecSegs++
fecParityShards++
}
if recovers := s.fec.input(f); recovers != nil {
if recovers := s.fec.Decode(f); recovers != nil {
for _, r := range recovers {
if len(r) >= 2 { // must be at least 2 bytes
sz := binary.LittleEndian.Uint16(r)
if int(sz) <= len(r) && sz >= 2 {
if ret := s.kcp.Input(r[2:sz], false); ret == 0 {
if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
fecRecovered++
} else {
kcpInErrors++
@@ -580,29 +544,23 @@ func (s *UDPSession) kcpInput(data []byte) {
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
if s.ackNoDelay {
s.kcp.flush()
}
s.mu.Unlock()
} else {
s.mu.Lock()
if ret := s.kcp.Input(data, true); ret != 0 {
if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
// notify reader
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
if s.ackNoDelay {
s.kcp.flush()
}
s.mu.Unlock()
}
atomic.AddUint64(&DefaultSnmp.InSegs, 1)
atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
if fecSegs > 0 {
atomic.AddUint64(&DefaultSnmp.FECSegs, fecSegs)
if fecParityShards > 0 {
atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
}
if kcpInErrors > 0 {
atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)

vendor/github.com/xtaci/kcp-go/snmp.go generated vendored

@@ -27,7 +27,7 @@ type Snmp struct {
RepeatSegs uint64 // number of segs duplicated
FECRecovered uint64 // correct packets recovered from FEC
FECErrs uint64 // incorrect packets recovered from FEC
FECSegs uint64 // FEC segments received
FECParityShards uint64 // FEC parity shards received
FECShortShards uint64 // number of data shards that's not enough for recovery
}
@@ -35,6 +35,7 @@ func newSnmp() *Snmp {
return new(Snmp)
}
// Header returns all field names
func (s *Snmp) Header() []string {
return []string{
"BytesSent",
@@ -55,13 +56,14 @@ func (s *Snmp) Header() []string {
"EarlyRetransSegs",
"LostSegs",
"RepeatSegs",
"FECSegs",
"FECParityShards",
"FECErrs",
"FECRecovered",
"FECShortShards",
}
}
// ToSlice returns current snmp info as slice
func (s *Snmp) ToSlice() []string {
snmp := s.Copy()
return []string{
@@ -83,7 +85,7 @@ func (s *Snmp) ToSlice() []string {
fmt.Sprint(snmp.EarlyRetransSegs),
fmt.Sprint(snmp.LostSegs),
fmt.Sprint(snmp.RepeatSegs),
fmt.Sprint(snmp.FECSegs),
fmt.Sprint(snmp.FECParityShards),
fmt.Sprint(snmp.FECErrs),
fmt.Sprint(snmp.FECRecovered),
fmt.Sprint(snmp.FECShortShards),
@@ -111,7 +113,7 @@ func (s *Snmp) Copy() *Snmp {
d.EarlyRetransSegs = atomic.LoadUint64(&s.EarlyRetransSegs)
d.LostSegs = atomic.LoadUint64(&s.LostSegs)
d.RepeatSegs = atomic.LoadUint64(&s.RepeatSegs)
d.FECSegs = atomic.LoadUint64(&s.FECSegs)
d.FECParityShards = atomic.LoadUint64(&s.FECParityShards)
d.FECErrs = atomic.LoadUint64(&s.FECErrs)
d.FECRecovered = atomic.LoadUint64(&s.FECRecovered)
d.FECShortShards = atomic.LoadUint64(&s.FECShortShards)
@@ -138,7 +140,7 @@ func (s *Snmp) Reset() {
atomic.StoreUint64(&s.EarlyRetransSegs, 0)
atomic.StoreUint64(&s.LostSegs, 0)
atomic.StoreUint64(&s.RepeatSegs, 0)
atomic.StoreUint64(&s.FECSegs, 0)
atomic.StoreUint64(&s.FECParityShards, 0)
atomic.StoreUint64(&s.FECErrs, 0)
atomic.StoreUint64(&s.FECRecovered, 0)
atomic.StoreUint64(&s.FECShortShards, 0)

104
vendor/github.com/xtaci/kcp-go/updater.go generated vendored Normal file

@@ -0,0 +1,104 @@
package kcp
import (
"container/heap"
"sync"
"time"
)
var updater updateHeap
func init() {
updater.init()
go updater.updateTask()
}
type entry struct {
sid uint32
ts time.Time
s *UDPSession
}
type updateHeap struct {
entries []entry
indices map[uint32]int
mu sync.Mutex
chWakeUp chan struct{}
}
func (h *updateHeap) Len() int { return len(h.entries) }
func (h *updateHeap) Less(i, j int) bool { return h.entries[i].ts.Before(h.entries[j].ts) }
func (h *updateHeap) Swap(i, j int) {
h.entries[i], h.entries[j] = h.entries[j], h.entries[i]
h.indices[h.entries[i].sid] = i
h.indices[h.entries[j].sid] = j
}
func (h *updateHeap) Push(x interface{}) {
h.entries = append(h.entries, x.(entry))
n := len(h.entries)
h.indices[h.entries[n-1].sid] = n - 1
}
func (h *updateHeap) Pop() interface{} {
n := len(h.entries)
x := h.entries[n-1]
h.entries = h.entries[0 : n-1]
delete(h.indices, x.sid)
return x
}
func (h *updateHeap) init() {
h.indices = make(map[uint32]int)
h.chWakeUp = make(chan struct{}, 1)
}
func (h *updateHeap) addSession(s *UDPSession) {
h.mu.Lock()
heap.Push(h, entry{s.sid, time.Now(), s})
h.mu.Unlock()
h.wakeup()
}
func (h *updateHeap) removeSession(s *UDPSession) {
h.mu.Lock()
if idx, ok := h.indices[s.sid]; ok {
heap.Remove(h, idx)
}
h.mu.Unlock()
}
func (h *updateHeap) wakeup() {
select {
case h.chWakeUp <- struct{}{}:
default:
}
}
func (h *updateHeap) updateTask() {
var timer <-chan time.Time
for {
select {
case <-timer:
case <-h.chWakeUp:
}
h.mu.Lock()
hlen := h.Len()
now := time.Now()
for i := 0; i < hlen; i++ {
entry := heap.Pop(h).(entry)
if now.After(entry.ts) {
entry.ts = now.Add(entry.s.update())
heap.Push(h, entry)
} else {
heap.Push(h, entry)
break
}
}
if h.Len() > 0 {
timer = time.After(h.entries[0].ts.Sub(now))
}
h.mu.Unlock()
}
}
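updater.go replaces one timer goroutine per session with a single scheduler: a min-heap keyed on each session's next flush time, popped when due and re-pushed with the interval that update() returns. The loop below distills that idea (no locking or wakeup channel, sketch only):

package main

import (
	"container/heap"
	"fmt"
	"time"
)

type task struct {
	ts  time.Time
	run func() time.Duration // returns the interval until the next run
}

type taskHeap []task

func (h taskHeap) Len() int            { return len(h) }
func (h taskHeap) Less(i, j int) bool  { return h[i].ts.Before(h[j].ts) }
func (h taskHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x interface{}) { *h = append(*h, x.(task)) }
func (h *taskHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	h := &taskHeap{}
	heap.Push(h, task{time.Now(), func() time.Duration {
		fmt.Println("tick")
		return 10 * time.Millisecond
	}})
	for i := 0; i < 3; i++ {
		t := heap.Pop(h).(task)
		time.Sleep(time.Until(t.ts)) // wait until the head entry is due
		t.ts = time.Now().Add(t.run())
		heap.Push(h, t)
	}
}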

vendor/github.com/xtaci/kcp-go/xor.go generated vendored

@@ -65,14 +65,13 @@ func safeXORBytes(dst, a, b []byte) int {
func xorBytes(dst, a, b []byte) int {
if supportsUnaligned {
return fastXORBytes(dst, a, b)
} else {
// TODO(hanwen): if (dst, a, b) have common alignment
// we could still try fastXORBytes. It is not clear
// how often this happens, and it's only worth it if
// the block encryption itself is hardware
// accelerated.
return safeXORBytes(dst, a, b)
}
// TODO(hanwen): if (dst, a, b) have common alignment
// we could still try fastXORBytes. It is not clear
// how often this happens, and it's only worth it if
// the block encryption itself is hardware
// accelerated.
return safeXORBytes(dst, a, b)
}
// fastXORWords XORs multiples of 4 or 8 bytes (depending on architecture.)
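For completeness, the byte-wise fallback path referenced above looks like this: XOR the common prefix of a and b into dst and report the number of bytes written. A sketch matching that helper's contract:

package main

import "fmt"

func safeXORBytes(dst, a, b []byte) int {
	n := len(a)
	if len(b) < n {
		n = len(b) // only the common prefix is XORed
	}
	for i := 0; i < n; i++ {
		dst[i] = a[i] ^ b[i]
	}
	return n
}

func main() {
	dst := make([]byte, 4)
	n := safeXORBytes(dst, []byte{0xff, 0x0f, 0xf0, 0xaa}, []byte{0xff, 0xff, 0x00, 0xaa})
	fmt.Println(n, dst) // 4 [0 240 240 0]
}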