vendor: Mega update all dependencies

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4080
Jakob Borg
2017-04-05 14:34:41 +00:00
parent 49c1527724
commit a1bcc15458
1354 changed files with 55066 additions and 797850 deletions


@@ -1,50 +0,0 @@
package kcp
import (
"net"
"sync/atomic"
)
var defaultEmitter Emitter
func init() {
defaultEmitter.init()
}
type (
// packet emit request
emitPacket struct {
conn net.PacketConn
to net.Addr
data []byte
// marks whether this packet's buffer should be recycled to the global xmitBuf
recycle bool
}
// Emitter is the global packet sender
Emitter struct {
ch chan emitPacket
}
)
func (e *Emitter) init() {
e.ch = make(chan emitPacket)
go e.emitTask()
}
// keep on writing packets to the kernel
func (e *Emitter) emitTask() {
for p := range e.ch {
if n, err := p.conn.WriteTo(p.data, p.to); err == nil {
atomic.AddUint64(&DefaultSnmp.OutPkts, 1)
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(n))
}
if p.recycle {
xmitBuf.Put(p.data)
}
}
}
func (e *Emitter) emit(p emitPacket) {
e.ch <- p
}

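The file deleted above funneled every packet write through a single goroutine: callers enqueue on a channel and one emitTask performs the WriteTo calls. The rest of this commit drops that indirection and writes directly from output() (see the session changes below). Both versions recycle buffers through xmitBuf, which, judging from the Get().([]byte) assertions, is a sync.Pool of MTU-sized byte slices. A minimal self-contained sketch of that pooling pattern, with the pool setup assumed rather than taken from kcp-go:

package main

import (
	"fmt"
	"sync"
)

const mtuLimit = 1500

// assumed setup: a pool handing out mtu-sized byte slices, matching
// how xmitBuf.Get().([]byte)[:n] is used in the diff above
var xmitBuf = sync.Pool{
	New: func() interface{} { return make([]byte, mtuLimit) },
}

func send(payload []byte) {
	buf := xmitBuf.Get().([]byte)[:len(payload)] // reslice; capacity stays mtuLimit
	copy(buf, payload)
	fmt.Printf("would write %d bytes\n", len(buf))
	xmitBuf.Put(buf) // recycle; the slice must not be retained after Put
}

func main() {
	send([]byte("ping"))
}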
vendor/github.com/xtaci/kcp-go/fec.go generated vendored (281 lines changed)

@@ -15,35 +15,31 @@ const (
)
type (
// FEC defines forward error correction for packets
FEC struct {
rxlimit int // queue size limit
dataShards int
parityShards int
shardSize int
next uint32 // next seqid
paws uint32 // Protect Against Wrapped Sequence numbers
rx []fecPacket // ordered receive queue
// caches
decodeCache [][]byte
encodeCache [][]byte
shardsflag []bool
// RS encoder
enc reedsolomon.Encoder
}
// fecPacket is a decoded FEC packet
fecPacket struct {
seqid uint32
flag uint16
data []byte
ts uint32
}
// FECDecoder for decoding incoming packets
FECDecoder struct {
rxlimit int // queue size limit
dataShards int
parityShards int
shardSize int
rx []fecPacket // ordered receive queue
// caches
decodeCache [][]byte
flagCache []bool
// RS decoder
codec reedsolomon.Encoder
}
)
func newFEC(rxlimit, dataShards, parityShards int) *FEC {
func newFECDecoder(rxlimit, dataShards, parityShards int) *FECDecoder {
if dataShards <= 0 || parityShards <= 0 {
return nil
}
@@ -51,29 +47,26 @@ func newFEC(rxlimit, dataShards, parityShards int) *FEC {
return nil
}
fec := new(FEC)
fec := new(FECDecoder)
fec.rxlimit = rxlimit
fec.dataShards = dataShards
fec.parityShards = parityShards
fec.shardSize = dataShards + parityShards
fec.paws = (0xffffffff/uint32(fec.shardSize) - 1) * uint32(fec.shardSize)
enc, err := reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1))
if err != nil {
return nil
}
fec.enc = enc
fec.codec = enc
fec.decodeCache = make([][]byte, fec.shardSize)
fec.encodeCache = make([][]byte, fec.shardSize)
fec.shardsflag = make([]bool, fec.shardSize)
fec.flagCache = make([]bool, fec.shardSize)
return fec
}
// decodeBytes decodes a FEC packet
func (fec *FEC) decodeBytes(data []byte) fecPacket {
func (dec *FECDecoder) decodeBytes(data []byte) fecPacket {
var pkt fecPacket
pkt.seqid = binary.LittleEndian.Uint32(data)
pkt.flag = binary.LittleEndian.Uint16(data[4:])
pkt.ts = currentMs()
// allocate memory & copy
buf := xmitBuf.Get().([]byte)[:len(data)-6]
copy(buf, data[6:])
@@ -81,29 +74,16 @@ func (fec *FEC) decodeBytes(data []byte) fecPacket {
return pkt
}
func (fec *FEC) markData(data []byte) {
binary.LittleEndian.PutUint32(data, fec.next)
binary.LittleEndian.PutUint16(data[4:], typeData)
fec.next++
}
func (fec *FEC) markFEC(data []byte) {
binary.LittleEndian.PutUint32(data, fec.next)
binary.LittleEndian.PutUint16(data[4:], typeFEC)
fec.next++
fec.next %= fec.paws
}
// Decode a fec packet
func (fec *FEC) Decode(pkt fecPacket) (recovered [][]byte) {
func (dec *FECDecoder) Decode(pkt fecPacket) (recovered [][]byte) {
// insertion
n := len(fec.rx) - 1
n := len(dec.rx) - 1
insertIdx := 0
for i := n; i >= 0; i-- {
if pkt.seqid == fec.rx[i].seqid { // de-duplicate
if pkt.seqid == dec.rx[i].seqid { // de-duplicate
xmitBuf.Put(pkt.data)
return nil
} else if _itimediff(pkt.seqid, fec.rx[i].seqid) > 0 { // insertion
} else if _itimediff(pkt.seqid, dec.rx[i].seqid) > 0 { // insertion
insertIdx = i + 1
break
}
@@ -111,70 +91,65 @@ func (fec *FEC) Decode(pkt fecPacket) (recovered [][]byte) {
// insert into ordered rx queue
if insertIdx == n+1 {
fec.rx = append(fec.rx, pkt)
dec.rx = append(dec.rx, pkt)
} else {
fec.rx = append(fec.rx, fecPacket{})
copy(fec.rx[insertIdx+1:], fec.rx[insertIdx:])
fec.rx[insertIdx] = pkt
dec.rx = append(dec.rx, fecPacket{})
copy(dec.rx[insertIdx+1:], dec.rx[insertIdx:]) // shift right
dec.rx[insertIdx] = pkt
}
// shard range for current packet
shardBegin := pkt.seqid - pkt.seqid%uint32(fec.shardSize)
shardEnd := shardBegin + uint32(fec.shardSize) - 1
shardBegin := pkt.seqid - pkt.seqid%uint32(dec.shardSize)
shardEnd := shardBegin + uint32(dec.shardSize) - 1
// max search range in ordered queue for current shard
searchBegin := insertIdx - int(pkt.seqid%uint32(fec.shardSize))
searchBegin := insertIdx - int(pkt.seqid%uint32(dec.shardSize))
if searchBegin < 0 {
searchBegin = 0
}
searchEnd := searchBegin + fec.shardSize - 1
if searchEnd >= len(fec.rx) {
searchEnd = len(fec.rx) - 1
searchEnd := searchBegin + dec.shardSize - 1
if searchEnd >= len(dec.rx) {
searchEnd = len(dec.rx) - 1
}
// re-construct datashards
if searchEnd > searchBegin && searchEnd-searchBegin+1 >= fec.dataShards {
numshard := 0
numDataShard := 0
first := -1
maxlen := 0
shards := fec.decodeCache
shardsflag := fec.shardsflag
for k := range fec.decodeCache {
if searchEnd-searchBegin+1 >= dec.dataShards {
var numshard, numDataShard, first, maxlen int
// zero cache
shards := dec.decodeCache
shardsflag := dec.flagCache
for k := range dec.decodeCache {
shards[k] = nil
shardsflag[k] = false
}
// shard assembly
for i := searchBegin; i <= searchEnd; i++ {
seqid := fec.rx[i].seqid
seqid := dec.rx[i].seqid
if _itimediff(seqid, shardEnd) > 0 {
break
} else if _itimediff(seqid, shardBegin) >= 0 {
shards[seqid%uint32(fec.shardSize)] = fec.rx[i].data
shardsflag[seqid%uint32(fec.shardSize)] = true
shards[seqid%uint32(dec.shardSize)] = dec.rx[i].data
shardsflag[seqid%uint32(dec.shardSize)] = true
numshard++
if fec.rx[i].flag == typeData {
if dec.rx[i].flag == typeData {
numDataShard++
}
if numshard == 1 {
first = i
}
if len(fec.rx[i].data) > maxlen {
maxlen = len(fec.rx[i].data)
if len(dec.rx[i].data) > maxlen {
maxlen = len(dec.rx[i].data)
}
}
}
if numDataShard == fec.dataShards { // no lost
for i := first; i < first+numshard; i++ { // free
xmitBuf.Put(fec.rx[i].data)
}
copy(fec.rx[first:], fec.rx[first+numshard:])
for i := 0; i < numshard; i++ { // dereference
fec.rx[len(fec.rx)-1-i] = fecPacket{}
}
fec.rx = fec.rx[:len(fec.rx)-numshard]
} else if numshard >= fec.dataShards { // recoverable
if numDataShard == dec.dataShards {
// case 1: no lost data shards
dec.rx = dec.freeRange(first, numshard, dec.rx)
} else if numshard >= dec.dataShards {
// case 2: data shard lost, but recoverable from parity shard
for k := range shards {
if shards[k] != nil {
dlen := len(shards[k])
@@ -182,49 +157,147 @@ func (fec *FEC) Decode(pkt fecPacket) (recovered [][]byte) {
xorBytes(shards[k][dlen:], shards[k][dlen:], shards[k][dlen:])
}
}
if err := fec.enc.Reconstruct(shards); err == nil {
for k := range shards[:fec.dataShards] {
if err := dec.codec.Reconstruct(shards); err == nil {
for k := range shards[:dec.dataShards] {
if !shardsflag[k] {
recovered = append(recovered, shards[k])
}
}
}
for i := first; i < first+numshard; i++ { // free
xmitBuf.Put(fec.rx[i].data)
}
copy(fec.rx[first:], fec.rx[first+numshard:])
for i := 0; i < numshard; i++ { // dereference
fec.rx[len(fec.rx)-1-i] = fecPacket{}
}
fec.rx = fec.rx[:len(fec.rx)-numshard]
dec.rx = dec.freeRange(first, numshard, dec.rx)
}
}
// keep rxlimit
if len(fec.rx) > fec.rxlimit {
if fec.rx[0].flag == typeData { // record unrecoverable data
if len(dec.rx) > dec.rxlimit {
if dec.rx[0].flag == typeData { // record unrecoverable data
atomic.AddUint64(&DefaultSnmp.FECShortShards, 1)
}
xmitBuf.Put(fec.rx[0].data) // free
fec.rx[0].data = nil
fec.rx = fec.rx[1:]
dec.rx = dec.freeRange(0, 1, dec.rx)
}
return
}
// Encode a group of datashards
func (fec *FEC) Encode(data [][]byte, offset, maxlen int) (ecc [][]byte) {
if len(data) != fec.shardSize {
// freeRange frees a range of fecPackets and zeroes them for GC recycling
func (dec *FECDecoder) freeRange(first, n int, q []fecPacket) []fecPacket {
for i := first; i < first+n; i++ { // free
xmitBuf.Put(q[i].data)
}
copy(q[first:], q[first+n:])
for i := 0; i < n; i++ { // dereference data
q[len(q)-1-i].data = nil
}
return q[:len(q)-n]
}
type (
// FECEncoder for encoding outgoing packets
FECEncoder struct {
dataShards int
parityShards int
shardSize int
paws uint32 // Protect Against Wrapped Sequence numbers
next uint32 // next seqid
shardCount int // count the number of datashards collected
maxSize int // record maximum data length in datashard
headerOffset int // FEC header offset
payloadOffset int // FEC payload offset
// caches
shardCache [][]byte
encodeCache [][]byte
// RS encoder
codec reedsolomon.Encoder
}
)
func newFECEncoder(dataShards, parityShards, offset int) *FECEncoder {
if dataShards <= 0 || parityShards <= 0 {
return nil
}
shards := fec.encodeCache
for k := range shards {
shards[k] = data[k][offset:maxlen]
fec := new(FECEncoder)
fec.dataShards = dataShards
fec.parityShards = parityShards
fec.shardSize = dataShards + parityShards
fec.paws = (0xffffffff/uint32(fec.shardSize) - 1) * uint32(fec.shardSize)
fec.headerOffset = offset
fec.payloadOffset = fec.headerOffset + fecHeaderSize
enc, err := reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1))
if err != nil {
return nil
}
fec.codec = enc
// caches
fec.encodeCache = make([][]byte, fec.shardSize)
fec.shardCache = make([][]byte, fec.shardSize)
for k := range fec.shardCache {
fec.shardCache[k] = make([]byte, mtuLimit)
}
return fec
}
// Encode the packet and output parity shards once enough data shards have been collected;
// the contents of the returned parity shards will change on the next call to Encode
func (enc *FECEncoder) Encode(b []byte) (ps [][]byte) {
enc.markData(b[enc.headerOffset:])
binary.LittleEndian.PutUint16(b[enc.payloadOffset:], uint16(len(b[enc.payloadOffset:])))
// copy data to fec datashards
sz := len(b)
enc.shardCache[enc.shardCount] = enc.shardCache[enc.shardCount][:sz]
copy(enc.shardCache[enc.shardCount], b)
enc.shardCount++
// record max datashard length
if sz > enc.maxSize {
enc.maxSize = sz
}
if err := fec.enc.Encode(shards); err != nil {
return nil
// calculate Reed-Solomon Erasure Code
if enc.shardCount == enc.dataShards {
// bzero each datashard's tail
for i := 0; i < enc.dataShards; i++ {
shard := enc.shardCache[i]
slen := len(shard)
xorBytes(shard[slen:enc.maxSize], shard[slen:enc.maxSize], shard[slen:enc.maxSize])
}
// construct equal-sized slice with stripped header
cache := enc.encodeCache
for k := range cache {
cache[k] = enc.shardCache[k][enc.payloadOffset:enc.maxSize]
}
// rs encode
if err := enc.codec.Encode(cache); err == nil {
ps = enc.shardCache[enc.dataShards:]
for k := range ps {
enc.markFEC(ps[k][enc.headerOffset:])
ps[k] = ps[k][:enc.maxSize]
}
}
// reset counters to zero
enc.shardCount = 0
enc.maxSize = 0
}
return data[fec.dataShards:]
return
}
func (enc *FECEncoder) markData(data []byte) {
binary.LittleEndian.PutUint32(data, enc.next)
binary.LittleEndian.PutUint16(data[4:], typeData)
enc.next++
}
func (enc *FECEncoder) markFEC(data []byte) {
binary.LittleEndian.PutUint32(data, enc.next)
binary.LittleEndian.PutUint16(data[4:], typeFEC)
enc.next = (enc.next + 1) % enc.paws
}

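The encoder and decoder above delegate the actual erasure coding to a Reed-Solomon codec; Encode fills the parity shards in place, and Reconstruct rebuilds shards that were set to nil. A minimal round-trip sketch, assuming the package referenced here is klauspost/reedsolomon (the import path is not shown in this diff):

package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	const dataShards, parityShards = 3, 2
	enc, err := reedsolomon.New(dataShards, parityShards)
	if err != nil {
		panic(err)
	}
	// equal-sized shards: 3 data shards plus 2 zeroed parity shards
	shards := [][]byte{
		[]byte("aaaa"), []byte("bbbb"), []byte("cccc"),
		make([]byte, 4), make([]byte, 4),
	}
	if err := enc.Encode(shards); err != nil { // computes parity
		panic(err)
	}
	original := append([]byte(nil), shards[1]...)
	shards[1] = nil // simulate one lost data shard
	if err := enc.Reconstruct(shards); err != nil { // rebuilds nil shards
		panic(err)
	}
	fmt.Println(bytes.Equal(shards[1], original)) // true
}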
vendor/github.com/xtaci/kcp-go/kcp.go generated vendored (213 lines changed)

@@ -94,17 +94,17 @@ func _itimediff(later, earlier uint32) int32 {
// Segment defines a KCP segment
type Segment struct {
conv uint32
cmd uint32
frg uint32
wnd uint32
cmd uint8
frg uint8
wnd uint16
ts uint32
sn uint32
una uint32
data []byte
resendts uint32
rto uint32
fastack uint32
xmit uint32
resendts uint32
fastack uint32
data []byte
}
// encode a segment into buffer
@@ -129,7 +129,7 @@ type KCP struct {
rx_rttvar, rx_srtt int32
rx_rto, rx_minrto uint32
snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe uint32
interval, ts_flush, xmit uint32
interval, ts_flush uint32
nodelay, updated uint32
ts_probe, probe_wait uint32
dead_link, incr uint32
@@ -144,9 +144,8 @@ type KCP struct {
acklist []ackItem
buffer []byte
output Output
datashard, parityshard int
buffer []byte
output Output
}
type ackItem struct {
@@ -176,14 +175,13 @@ func NewKCP(conv uint32, output Output) *KCP {
}
// newSegment creates a KCP segment
func (kcp *KCP) newSegment(size int) *Segment {
seg := new(Segment)
func (kcp *KCP) newSegment(size int) (seg Segment) {
seg.data = xmitBuf.Get().([]byte)[:size]
return seg
return
}
// delSegment recycles a KCP segment
func (kcp *KCP) delSegment(seg *Segment) {
func (kcp *KCP) delSegment(seg Segment) {
xmitBuf.Put(seg.data)
}
@@ -240,12 +238,14 @@ func (kcp *KCP) Recv(buffer []byte) (n int) {
buffer = buffer[len(seg.data):]
n += len(seg.data)
count++
kcp.delSegment(seg)
kcp.delSegment(*seg)
if seg.frg == 0 {
break
}
}
kcp.rcv_queue = kcp.rcv_queue[count:]
if count > 0 {
kcp.rcv_queue = kcp.remove_front(kcp.rcv_queue, count)
}
// move available data from rcv_buf -> rcv_queue
count = 0
@@ -258,8 +258,11 @@ func (kcp *KCP) Recv(buffer []byte) (n int) {
break
}
}
kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
kcp.rcv_buf = kcp.rcv_buf[count:]
if count > 0 {
kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
}
// fast recover
if len(kcp.rcv_queue) < int(kcp.rcv_wnd) && fast_recover {
@@ -281,20 +284,20 @@ func (kcp *KCP) Send(buffer []byte) int {
if kcp.stream != 0 {
n := len(kcp.snd_queue)
if n > 0 {
old := &kcp.snd_queue[n-1]
if len(old.data) < int(kcp.mss) {
capacity := int(kcp.mss) - len(old.data)
seg := &kcp.snd_queue[n-1]
if len(seg.data) < int(kcp.mss) {
capacity := int(kcp.mss) - len(seg.data)
extend := capacity
if len(buffer) < capacity {
extend = len(buffer)
}
seg := kcp.newSegment(len(old.data) + extend)
seg.frg = 0
copy(seg.data, old.data)
copy(seg.data[len(old.data):], buffer)
// grow slice, the underlying cap is guaranteed to
// be larger than kcp.mss
oldlen := len(seg.data)
seg.data = seg.data[:oldlen+extend]
copy(seg.data[oldlen:], buffer)
buffer = buffer[extend:]
kcp.delSegment(old)
kcp.snd_queue[n-1] = *seg
}
}
@@ -327,11 +330,11 @@ func (kcp *KCP) Send(buffer []byte) int {
seg := kcp.newSegment(size)
copy(seg.data, buffer[:size])
if kcp.stream == 0 { // message mode
seg.frg = uint32(count - i - 1)
seg.frg = uint8(count - i - 1)
} else { // stream mode
seg.frg = 0
}
kcp.snd_queue = append(kcp.snd_queue, *seg)
kcp.snd_queue = append(kcp.snd_queue, seg)
buffer = buffer[size:]
}
return 0
@@ -379,7 +382,7 @@ func (kcp *KCP) parse_ack(sn uint32) {
for k := range kcp.snd_buf {
seg := &kcp.snd_buf[k]
if sn == seg.sn {
kcp.delSegment(seg)
kcp.delSegment(*seg)
copy(kcp.snd_buf[k:], kcp.snd_buf[k+1:])
kcp.snd_buf[len(kcp.snd_buf)-1] = Segment{}
kcp.snd_buf = kcp.snd_buf[:len(kcp.snd_buf)-1]
@@ -411,13 +414,15 @@ func (kcp *KCP) parse_una(una uint32) {
for k := range kcp.snd_buf {
seg := &kcp.snd_buf[k]
if _itimediff(una, seg.sn) > 0 {
kcp.delSegment(seg)
kcp.delSegment(*seg)
count++
} else {
break
}
}
kcp.snd_buf = kcp.snd_buf[count:]
if count > 0 {
kcp.snd_buf = kcp.remove_front(kcp.snd_buf, count)
}
}
// ack append
@@ -425,7 +430,7 @@ func (kcp *KCP) ack_push(sn, ts uint32) {
kcp.acklist = append(kcp.acklist, ackItem{sn, ts})
}
func (kcp *KCP) parse_data(newseg *Segment) {
func (kcp *KCP) parse_data(newseg Segment) {
sn := newseg.sn
if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) >= 0 ||
_itimediff(sn, kcp.rcv_nxt) < 0 {
@@ -451,11 +456,11 @@ func (kcp *KCP) parse_data(newseg *Segment) {
if !repeat {
if insert_idx == n+1 {
kcp.rcv_buf = append(kcp.rcv_buf, *newseg)
kcp.rcv_buf = append(kcp.rcv_buf, newseg)
} else {
kcp.rcv_buf = append(kcp.rcv_buf, Segment{})
copy(kcp.rcv_buf[insert_idx+1:], kcp.rcv_buf[insert_idx:])
kcp.rcv_buf[insert_idx] = *newseg
kcp.rcv_buf[insert_idx] = newseg
}
} else {
kcp.delSegment(newseg)
@@ -472,8 +477,10 @@ func (kcp *KCP) parse_data(newseg *Segment) {
break
}
}
kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
kcp.rcv_buf = kcp.rcv_buf[count:]
if count > 0 {
kcp.rcv_queue = append(kcp.rcv_queue, kcp.rcv_buf[:count]...)
kcp.rcv_buf = kcp.remove_front(kcp.rcv_buf, count)
}
}
// Input when you received a low level packet (eg. UDP packet), call it
@@ -542,9 +549,9 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
if _itimediff(sn, kcp.rcv_nxt) >= 0 {
seg := kcp.newSegment(int(length))
seg.conv = conv
seg.cmd = uint32(cmd)
seg.frg = uint32(frg)
seg.wnd = uint32(wnd)
seg.cmd = cmd
seg.frg = frg
seg.wnd = wnd
seg.ts = ts
seg.sn = sn
seg.una = una
@@ -609,59 +616,43 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int {
return 0
}
func (kcp *KCP) wnd_unused() int32 {
func (kcp *KCP) wnd_unused() uint16 {
if len(kcp.rcv_queue) < int(kcp.rcv_wnd) {
return int32(int(kcp.rcv_wnd) - len(kcp.rcv_queue))
return uint16(int(kcp.rcv_wnd) - len(kcp.rcv_queue))
}
return 0
}
// flush pending data
func (kcp *KCP) flush(ackOnly bool) {
buffer := kcp.buffer
change := 0
lost := false
var seg Segment
seg.conv = kcp.conv
seg.cmd = IKCP_CMD_ACK
seg.wnd = uint32(kcp.wnd_unused())
seg.wnd = kcp.wnd_unused()
seg.una = kcp.rcv_nxt
buffer := kcp.buffer
// flush acknowledges
var required []ackItem
for i, ack := range kcp.acklist {
// filter necessary acks only
if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
required = append(required, kcp.acklist[i])
}
}
kcp.acklist = nil
ptr := buffer
maxBatchSize := kcp.mtu / IKCP_OVERHEAD
for len(required) > 0 {
var batchSize int
if kcp.datashard > 0 && kcp.parityshard > 0 { // try triggering FEC
batchSize = int(_ibound_(1, uint32(len(required)/kcp.datashard), maxBatchSize))
} else {
batchSize = int(_ibound_(1, uint32(len(required)), maxBatchSize))
}
for len(required) >= batchSize {
for i := 0; i < batchSize; i++ {
ack := required[i]
seg.sn, seg.ts = ack.sn, ack.ts
ptr = seg.encode(ptr)
}
size := len(buffer) - len(ptr)
for i, ack := range kcp.acklist {
size := len(buffer) - len(ptr)
if size+IKCP_OVERHEAD > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
required = required[batchSize:]
}
// filter jitters caused by bufferbloat
if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
seg.sn, seg.ts = ack.sn, ack.ts
ptr = seg.encode(ptr)
}
}
kcp.acklist = kcp.acklist[0:0]
if ackOnly { // flush acks only
if ackOnly { // flush remaining ack segments
size := len(buffer) - len(ptr)
if size > 0 {
kcp.output(buffer, size)
}
return
}
@@ -734,7 +725,9 @@ func (kcp *KCP) flush(ackOnly bool) {
newSegsCount++
kcp.snd_queue[k].data = nil
}
kcp.snd_queue = kcp.snd_queue[newSegsCount:]
if newSegsCount > 0 {
kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
}
// calculate resent
resent := uint32(kcp.fastresend)
@@ -742,53 +735,28 @@ func (kcp *KCP) flush(ackOnly bool) {
resent = 0xffffffff
}
// counters
var lostSegs, fastRetransSegs, earlyRetransSegs uint64
// send new segments
for k := len(kcp.snd_buf) - newSegsCount; k < len(kcp.snd_buf); k++ {
current := currentMs()
segment := &kcp.snd_buf[k]
segment.xmit++
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
segment.ts = current
segment.wnd = seg.wnd
segment.una = kcp.rcv_nxt
size := len(buffer) - len(ptr)
need := IKCP_OVERHEAD + len(segment.data)
if size+need > int(kcp.mtu) {
kcp.output(buffer, size)
ptr = buffer
}
ptr = segment.encode(ptr)
copy(ptr, segment.data)
ptr = ptr[len(segment.data):]
}
// check for retransmissions
for k := 0; k < len(kcp.snd_buf)-newSegsCount; k++ {
current := currentMs()
current := currentMs()
var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
for k := range kcp.snd_buf {
segment := &kcp.snd_buf[k]
needsend := false
if _itimediff(current, segment.resendts) >= 0 { // RTO
if segment.xmit == 0 { // initial transmit
needsend = true
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
} else if _itimediff(current, segment.resendts) >= 0 { // RTO
needsend = true
segment.xmit++
kcp.xmit++
if kcp.nodelay == 0 {
segment.rto += kcp.rx_rto
} else {
segment.rto += kcp.rx_rto / 2
}
segment.resendts = current + segment.rto
lost = true
lost++
lostSegs++
} else if segment.fastack >= resent { // fast retransmit
needsend = true
segment.xmit++
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
@@ -796,7 +764,6 @@ func (kcp *KCP) flush(ackOnly bool) {
fastRetransSegs++
} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
needsend = true
segment.xmit++
segment.fastack = 0
segment.rto = kcp.rx_rto
segment.resendts = current + segment.rto
@@ -805,15 +772,17 @@ func (kcp *KCP) flush(ackOnly bool) {
}
if needsend {
segment.xmit++
segment.ts = current
segment.wnd = seg.wnd
segment.una = kcp.rcv_nxt
segment.una = seg.una
size := len(buffer) - len(ptr)
need := IKCP_OVERHEAD + len(segment.data)
if size+need > int(kcp.mtu) {
kcp.output(buffer, size)
current = currentMs() // time update for a blocking call
ptr = buffer
}
@@ -852,7 +821,7 @@ func (kcp *KCP) flush(ackOnly bool) {
// update ssthresh
// rate halving, https://tools.ietf.org/html/rfc6937
if change != 0 {
if change > 0 {
inflight := kcp.snd_nxt - kcp.snd_una
kcp.ssthresh = inflight / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
@@ -863,7 +832,7 @@ func (kcp *KCP) flush(ackOnly bool) {
}
// congestion control, https://tools.ietf.org/html/rfc5681
if lost {
if lost > 0 {
kcp.ssthresh = cwnd / 2
if kcp.ssthresh < IKCP_THRESH_MIN {
kcp.ssthresh = IKCP_THRESH_MIN
@@ -956,12 +925,6 @@ func (kcp *KCP) Check() uint32 {
return current + minimal
}
// set datashard,parityshard info for some optimizations
func (kcp *KCP) setFEC(datashard, parityshard int) {
kcp.datashard = datashard
kcp.parityshard = parityshard
}
// SetMtu changes MTU size, default is 1400
func (kcp *KCP) SetMtu(mtu int) int {
if mtu < 50 || mtu < IKCP_OVERHEAD {
@@ -1025,11 +988,11 @@ func (kcp *KCP) WaitSnd() int {
return len(kcp.snd_buf) + len(kcp.snd_queue)
}
// Cwnd returns current congestion window size
func (kcp *KCP) Cwnd() uint32 {
cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
if kcp.nocwnd == 0 {
cwnd = _imin_(kcp.cwnd, cwnd)
// remove the front n elements from the queue
func (kcp *KCP) remove_front(q []Segment, n int) []Segment {
newn := copy(q, q[n:])
for i := newn; i < len(q); i++ {
q[i] = Segment{} // manually zero for GC
}
return cwnd
return q[:newn]
}

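The remove_front helper added above dequeues from the head of a slice-backed queue by shifting the survivors forward and zeroing the vacated tail, so the backing array stops pinning the dequeued segments' buffers. The same pattern in a standalone sketch (names hypothetical):

package main

import "fmt"

type item struct{ data []byte }

func removeFront(q []item, n int) []item {
	kept := copy(q, q[n:]) // shift survivors to the front
	for i := kept; i < len(q); i++ {
		q[i] = item{} // clear stale references so the GC can reclaim them
	}
	return q[:kept]
}

func main() {
	q := []item{{[]byte("a")}, {[]byte("b")}, {[]byte("c")}}
	q = removeFront(q, 2)
	fmt.Println(len(q), string(q[0].data)) // 1 c
}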

@@ -33,13 +33,16 @@ const (
cryptHeaderSize = nonceSize + crcSize
// maximum packet size
mtuLimit = 2048
// packet receiving channel limit
rxQueueLimit = 2048
mtuLimit = 1500
// FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
rxFECMulti = 3
// accept backlog
acceptBacklog = 128
// prerouting (to session) queue
qlen = 128
)
const (
@@ -72,24 +75,24 @@ type (
block BlockCrypt // block encryption
// kcp receiving is based on packets
// sockbuff turns packets into stream
sockbuff []byte
// recvbuf turns packets into stream
recvbuf []byte
bufptr []byte
// extended output buffer(with header)
ext []byte
fec *FEC // forward error correction
fecDataShards [][]byte // data shards cache
fecShardCount int // count the number of datashards collected
fecMaxSize int // record maximum data length in datashard
fecHeaderOffset int // FEC header offset in packet
fecPayloadOffset int // FEC payload offset in packet
// FEC
fecDecoder *FECDecoder
fecEncoder *FECEncoder
// settings
remote net.Addr // remote peer address
rd time.Time // read deadline
wd time.Time // write deadline
headerSize int // the overall header size added before KCP frame
updateInterval int32 // interval in seconds to call kcp.flush()
ackNoDelay bool // send ack immediately for each incoming packet
remote net.Addr // remote peer address
rd time.Time // read deadline
wd time.Time // write deadline
headerSize int // the overall header size added before KCP frame
updateInterval time.Duration // interval between kcp.flush() calls
ackNoDelay bool // send ack immediately for each incoming packet
writeDelay bool // delay kcp.flush() for Write() for bulk transfer
// notifications
die chan struct{} // notify session has Closed
@@ -120,39 +123,41 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
sess.conn = conn
sess.l = l
sess.block = block
sess.recvbuf = make([]byte, mtuLimit)
// FEC initialization
sess.fec = newFEC(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
if sess.fec != nil {
if sess.block != nil {
sess.fecHeaderOffset = cryptHeaderSize
}
sess.fecPayloadOffset = sess.fecHeaderOffset + fecHeaderSize
// fec data shards
sess.fecDataShards = make([][]byte, sess.fec.shardSize)
for k := range sess.fecDataShards {
sess.fecDataShards[k] = make([]byte, mtuLimit)
}
sess.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
if sess.block != nil {
sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
} else {
sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
}
// calculate header size
if sess.block != nil {
sess.headerSize += cryptHeaderSize
}
if sess.fec != nil {
if sess.fecEncoder != nil {
sess.headerSize += fecHeaderSizePlus2
}
// only allocate extended packet buffer
// when the extra header is required
if sess.headerSize > 0 {
sess.ext = make([]byte, mtuLimit)
}
sess.kcp = NewKCP(conv, func(buf []byte, size int) {
if size >= IKCP_OVERHEAD {
sess.output(buf[:size])
}
})
sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
sess.kcp.setFEC(dataShards, parityShards)
// add current session to the global updater,
// which periodically calls sess.update()
updater.addSession(sess)
if sess.l == nil { // it's a client connection
go sess.readLoop()
atomic.AddUint64(&DefaultSnmp.ActiveOpens, 1)
@@ -168,13 +173,13 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
return sess
}
// Read implements the Conn Read method.
// Read implements net.Conn
func (s *UDPSession) Read(b []byte) (n int, err error) {
for {
s.mu.Lock()
if len(s.sockbuff) > 0 { // copy from buffer
n = copy(b, s.sockbuff)
s.sockbuff = s.sockbuff[n:]
if len(s.bufptr) > 0 { // copy from buffer into b
n = copy(b, s.bufptr)
s.bufptr = s.bufptr[n:]
s.mu.Unlock()
return n, nil
}
@@ -184,30 +189,38 @@ func (s *UDPSession) Read(b []byte) (n int, err error) {
return 0, errors.New(errBrokenPipe)
}
if !s.rd.IsZero() {
if time.Now().After(s.rd) { // timeout
s.mu.Unlock()
return 0, errTimeout{}
}
}
if n := s.kcp.PeekSize(); n > 0 { // data arrived
if len(b) >= n {
if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
if len(b) >= size { // direct write to b
s.kcp.Recv(b)
} else {
buf := make([]byte, n)
s.kcp.Recv(buf)
n = copy(b, buf)
s.sockbuff = buf[n:] // store remaining bytes into sockbuff for next read
s.mu.Unlock()
return size, nil
}
// resize kcp receive buffer
// to make sure recvbuf has enough capacity
if cap(s.recvbuf) < size {
s.recvbuf = make([]byte, size)
}
// resize recvbuf slice length
s.recvbuf = s.recvbuf[:size]
s.kcp.Recv(s.recvbuf)
n = copy(b, s.recvbuf) // copy to b
s.bufptr = s.recvbuf[n:] // update pointer
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(n))
return n, nil
}
// read deadline
var timeout *time.Timer
var c <-chan time.Time
if !s.rd.IsZero() {
if time.Now().After(s.rd) {
s.mu.Unlock()
return 0, errTimeout{}
}
delay := s.rd.Sub(time.Now())
timeout = time.NewTimer(delay)
c = timeout.C
@@ -227,7 +240,7 @@ func (s *UDPSession) Read(b []byte) (n int, err error) {
}
}
// Write implements the Conn Write method.
// Write implements net.Conn
func (s *UDPSession) Write(b []byte) (n int, err error) {
for {
s.mu.Lock()
@@ -236,14 +249,8 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
return 0, errors.New(errBrokenPipe)
}
if !s.wd.IsZero() {
if time.Now().After(s.wd) { // timeout
s.mu.Unlock()
return 0, errTimeout{}
}
}
if s.kcp.WaitSnd() < int(s.kcp.Cwnd()) {
// api flow control
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
n = len(b)
for {
if len(b) <= int(s.kcp.mss) {
@@ -255,15 +262,22 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
}
}
s.kcp.flush(false)
if !s.writeDelay {
s.kcp.flush(false)
}
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
return n, nil
}
// write deadline
var timeout *time.Timer
var c <-chan time.Time
if !s.wd.IsZero() {
if time.Now().After(s.wd) {
s.mu.Unlock()
return 0, errTimeout{}
}
delay := s.wd.Sub(time.Now())
timeout = time.NewTimer(delay)
c = timeout.C
@@ -286,6 +300,9 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
// Close closes the connection.
func (s *UDPSession) Close() error {
updater.removeSession(s)
if s.l != nil { // notify listener
s.l.closeSession(s.remote)
}
s.mu.Lock()
defer s.mu.Unlock()
@@ -298,7 +315,6 @@ func (s *UDPSession) Close() error {
if s.l == nil { // client socket close
return s.conn.Close()
}
return nil
}
@@ -333,6 +349,13 @@ func (s *UDPSession) SetWriteDeadline(t time.Time) error {
return nil
}
// SetWriteDelay delays write for bulk transfer until the next update interval
func (s *UDPSession) SetWriteDelay(delay bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.writeDelay = delay
}
// SetWindowSize sets the maximum window size
func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
s.mu.Lock()
@@ -375,7 +398,7 @@ func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
s.mu.Lock()
defer s.mu.Unlock()
s.kcp.NoDelay(nodelay, interval, resend, nc)
atomic.StoreInt32(&s.updateInterval, int32(interval))
s.updateInterval = time.Duration(interval) * time.Millisecond
}
// SetDSCP sets the 6-bit DSCP field of the IP header; it has no effect if the session was accepted from a Listener
@@ -418,54 +441,24 @@ func (s *UDPSession) SetWriteBuffer(bytes int) error {
// output pipeline entry
// steps for output data processing:
// 0. Header extends
// 1. FEC
// 2. CRC32
// 3. Encryption
// 4. emit to emitTask
// 5. emitTask WriteTo kernel
// 4. WriteTo kernel
func (s *UDPSession) output(buf []byte) {
var ecc [][]byte
// extend buf's header space
ext := xmitBuf.Get().([]byte)[:s.headerSize+len(buf)]
copy(ext[s.headerSize:], buf)
ext := buf
if s.headerSize > 0 {
ext = s.ext[:s.headerSize+len(buf)]
copy(ext[s.headerSize:], buf)
}
// FEC stage
if s.fec != nil {
s.fec.markData(ext[s.fecHeaderOffset:])
binary.LittleEndian.PutUint16(ext[s.fecPayloadOffset:], uint16(len(ext[s.fecPayloadOffset:])))
// copy data to fec datashards
sz := len(ext)
s.fecDataShards[s.fecShardCount] = s.fecDataShards[s.fecShardCount][:sz]
copy(s.fecDataShards[s.fecShardCount], ext)
s.fecShardCount++
// record max datashard length
if sz > s.fecMaxSize {
s.fecMaxSize = sz
}
// calculate Reed-Solomon Erasure Code
if s.fecShardCount == s.fec.dataShards {
// bzero each datashard's tail
for i := 0; i < s.fec.dataShards; i++ {
shard := s.fecDataShards[i]
slen := len(shard)
xorBytes(shard[slen:s.fecMaxSize], shard[slen:s.fecMaxSize], shard[slen:s.fecMaxSize])
}
// calculation of RS
ecc = s.fec.Encode(s.fecDataShards, s.fecPayloadOffset, s.fecMaxSize)
for k := range ecc {
s.fec.markFEC(ecc[k][s.fecHeaderOffset:])
ecc[k] = ecc[k][:s.fecMaxSize]
}
// reset counters to zero
s.fecShardCount = 0
s.fecMaxSize = 0
}
if s.fecEncoder != nil {
ecc = s.fecEncoder.Encode(ext)
}
// encryption stage
@@ -485,24 +478,38 @@ func (s *UDPSession) output(buf []byte) {
}
}
// emit stage
defaultEmitter.emit(emitPacket{s.conn, s.remote, ext, true})
// WriteTo kernel
nbytes := 0
npkts := 0
// if mrand.Intn(100) < 50 {
if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
nbytes += n
npkts++
}
// }
if ecc != nil {
for k := range ecc {
defaultEmitter.emit(emitPacket{s.conn, s.remote, ecc[k], false})
if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
nbytes += n
npkts++
}
}
}
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
}
// kcp update, returns the interval before the next call
func (s *UDPSession) update() time.Duration {
func (s *UDPSession) update() (interval time.Duration) {
s.mu.Lock()
s.kcp.flush(false)
if s.kcp.WaitSnd() < int(s.kcp.Cwnd()) {
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
s.notifyWriteEvent()
}
interval = s.updateInterval
s.mu.Unlock()
return time.Duration(atomic.LoadInt32(&s.updateInterval)) * time.Millisecond
return
}
// GetConv gets conversation id of a session
@@ -527,8 +534,8 @@ func (s *UDPSession) notifyWriteEvent() {
func (s *UDPSession) kcpInput(data []byte) {
var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
if s.fec != nil {
f := s.fec.decodeBytes(data)
if s.fecDecoder != nil {
f := s.fecDecoder.decodeBytes(data)
s.mu.Lock()
if f.flag == typeData {
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
@@ -541,7 +548,7 @@ func (s *UDPSession) kcpInput(data []byte) {
fecParityShards++
}
if recovers := s.fec.Decode(f); recovers != nil {
if recovers := s.fecDecoder.Decode(f); recovers != nil {
for _, r := range recovers {
if len(r) >= 2 { // must be at least 2 bytes
sz := binary.LittleEndian.Uint16(r)
@@ -601,6 +608,7 @@ func (s *UDPSession) receiver(ch chan []byte) {
select {
case ch <- data[:n]:
case <-s.die:
return
}
} else if err != nil {
return
@@ -612,7 +620,7 @@ func (s *UDPSession) receiver(ch chan []byte) {
// read loop for client session
func (s *UDPSession) readLoop() {
chPacket := make(chan []byte, rxQueueLimit)
chPacket := make(chan []byte, qlen)
go s.receiver(chPacket)
for {
@@ -650,16 +658,16 @@ type (
block BlockCrypt // block encryption
dataShards int // FEC data shard
parityShards int // FEC parity shard
fec *FEC // FEC mock initialization
fecDecoder *FECDecoder // FEC mock initialization
conn net.PacketConn // the underlying packet connection
sessions map[string]*UDPSession // all sessions accepted by this Listener
chAccepts chan *UDPSession // Listen() backlog
chDeadlinks chan net.Addr // session close queue
headerSize int // the overall header size added before KCP frame
die chan struct{} // notify the listener has closed
rd atomic.Value // read deadline for Accept()
wd atomic.Value
sessions map[string]*UDPSession // all sessions accepted by this Listener
chAccepts chan *UDPSession // Listen() backlog
chSessionClosed chan net.Addr // session close queue
headerSize int // the overall header size added before KCP frame
die chan struct{} // notify the listener has closed
rd atomic.Value // read deadline for Accept()
wd atomic.Value
}
// incoming packet
@@ -671,7 +679,7 @@ type (
// monitor incoming data for all connections of the server
func (l *Listener) monitor() {
chPacket := make(chan inPacket, rxQueueLimit)
chPacket := make(chan inPacket, qlen)
go l.receiver(chPacket)
for {
select {
@@ -698,24 +706,26 @@ func (l *Listener) monitor() {
addr := from.String()
s, ok := l.sessions[addr]
if !ok { // new session
var conv uint32
convValid := false
if l.fec != nil {
isfec := binary.LittleEndian.Uint16(data[4:])
if isfec == typeData {
conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
if len(l.chAccepts) < cap(l.chAccepts) { // do not let new sessions overwhelm the accept queue
var conv uint32
convValid := false
if l.fecDecoder != nil {
isfec := binary.LittleEndian.Uint16(data[4:])
if isfec == typeData {
conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
convValid = true
}
} else {
conv = binary.LittleEndian.Uint32(data)
convValid = true
}
} else {
conv = binary.LittleEndian.Uint32(data)
convValid = true
}
if convValid {
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
s.kcpInput(data)
l.sessions[addr] = s
l.chAccepts <- s
if convValid {
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
s.kcpInput(data)
l.sessions[addr] = s
l.chAccepts <- s
}
}
} else {
s.kcpInput(data)
@@ -723,7 +733,7 @@ func (l *Listener) monitor() {
}
xmitBuf.Put(raw)
case deadlink := <-l.chDeadlinks:
case deadlink := <-l.chSessionClosed:
delete(l.sessions, deadlink.String())
case <-l.die:
return
@@ -735,7 +745,11 @@ func (l *Listener) receiver(ch chan inPacket) {
for {
data := xmitBuf.Get().([]byte)[:mtuLimit]
if n, from, err := l.conn.ReadFrom(data); err == nil && n >= l.headerSize+IKCP_OVERHEAD {
ch <- inPacket{from, data[:n]}
select {
case ch <- inPacket{from, data[:n]}:
case <-l.die:
return
}
} else if err != nil {
return
} else {
@@ -815,6 +829,16 @@ func (l *Listener) Close() error {
return l.conn.Close()
}
// closeSession notifies the listener that a session has closed
func (l *Listener) closeSession(remote net.Addr) bool {
select {
case l.chSessionClosed <- remote:
return true
case <-l.die:
return false
}
}
// Addr returns the listener's network address. The Addr returned is shared by all invocations of Addr, so do not modify it.
func (l *Listener) Addr() net.Addr {
return l.conn.LocalAddr()
@@ -845,19 +869,19 @@ func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketCo
l := new(Listener)
l.conn = conn
l.sessions = make(map[string]*UDPSession)
l.chAccepts = make(chan *UDPSession, 1024)
l.chDeadlinks = make(chan net.Addr, 1024)
l.chAccepts = make(chan *UDPSession, acceptBacklog)
l.chSessionClosed = make(chan net.Addr)
l.die = make(chan struct{})
l.dataShards = dataShards
l.parityShards = parityShards
l.block = block
l.fec = newFEC(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
// calculate header size
if l.block != nil {
l.headerSize += cryptHeaderSize
}
if l.fec != nil {
if l.fecDecoder != nil {
l.headerSize += fecHeaderSizePlus2
}

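For context, a hedged usage sketch of the session API this file implements, assuming the kcp-go entry points of this era (NewAESBlockCrypt, ListenWithOptions, DialWithOptions, AcceptKCP) plus the SetWriteDelay knob introduced in this diff; treat exact signatures as approximate:

package main

import (
	"log"

	kcp "github.com/xtaci/kcp-go"
)

func main() {
	block, err := kcp.NewAESBlockCrypt([]byte("0123456789abcdef")) // 16-byte AES key
	if err != nil {
		log.Fatal(err)
	}
	// 10 data shards + 3 parity shards of FEC
	ln, err := kcp.ListenWithOptions("127.0.0.1:12345", block, 10, 3)
	if err != nil {
		log.Fatal(err)
	}
	done := make(chan struct{})
	go func() {
		conn, err := ln.AcceptKCP()
		if err != nil {
			log.Fatal(err)
		}
		buf := make([]byte, 1500)
		n, _ := conn.Read(buf)
		log.Printf("server got %q", buf[:n])
		close(done)
	}()
	sess, err := kcp.DialWithOptions("127.0.0.1:12345", block, 10, 3)
	if err != nil {
		log.Fatal(err)
	}
	sess.SetWriteDelay(false) // flush on every Write; true batches until the next update
	if _, err := sess.Write([]byte("hello")); err != nil {
		log.Fatal(err)
	}
	<-done
}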

@@ -45,6 +45,7 @@ func (h *updateHeap) Push(x interface{}) {
func (h *updateHeap) Pop() interface{} {
n := len(h.entries)
x := h.entries[n-1]
h.entries[n-1] = entry{} // manually zero for GC
h.entries = h.entries[0 : n-1]
delete(h.indices, x.sid)
return x

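The added line zeroes the popped slot because the shrunk slice still shares its backing array; without it, the stale copy would keep the popped entry (and whatever it references) alive. The same idea in a minimal container/heap example:

package main

import (
	"container/heap"
	"fmt"
)

type intHeap []int

func (h intHeap) Len() int            { return len(h) }
func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	old[n-1] = 0 // zero the vacated slot (matters for pointer-bearing types)
	*h = old[:n-1]
	return x
}

func main() {
	h := &intHeap{3, 1, 2}
	heap.Init(h)
	fmt.Println(heap.Pop(h)) // 1
}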

@@ -36,8 +36,8 @@ type Session struct {
config *Config
nextStreamID uint32 // next stream identifier
bucket int32 // token bucket
bucketCond *sync.Cond // used for waiting for tokens
bucket int32 // token bucket
bucketNotify chan struct{} // used for waiting for tokens
streams map[uint32]*Stream // all streams in this session
streamLock sync.Mutex // locks streams
@@ -61,7 +61,7 @@ func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
s.streams = make(map[uint32]*Stream)
s.chAccepts = make(chan *Stream, defaultAcceptBacklog)
s.bucket = int32(config.MaxReceiveBuffer)
s.bucketCond = sync.NewCond(&sync.Mutex{})
s.bucketNotify = make(chan struct{}, 1)
s.writes = make(chan writeRequest)
if client {
@@ -129,11 +129,19 @@ func (s *Session) Close() (err error) {
s.streams[k].sessionClose()
}
s.streamLock.Unlock()
s.bucketCond.Signal()
s.notifyBucket()
return s.conn.Close()
}
}
// notifyBucket notifies recvLoop that tokens are available in the bucket
func (s *Session) notifyBucket() {
select {
case s.bucketNotify <- struct{}{}:
default:
}
}
// IsClosed does a safe check to see if we have shutdown
func (s *Session) IsClosed() bool {
select {
@@ -166,7 +174,7 @@ func (s *Session) streamClosed(sid uint32) {
s.streamLock.Lock()
if n := s.streams[sid].recycleTokens(); n > 0 { // return remaining tokens to the bucket
if atomic.AddInt32(&s.bucket, int32(n)) > 0 {
s.bucketCond.Signal()
s.notifyBucket()
}
}
delete(s.streams, sid)
@@ -175,12 +183,9 @@ func (s *Session) streamClosed(sid uint32) {
// returnTokens is called by stream to return token after read
func (s *Session) returnTokens(n int) {
oldvalue := atomic.LoadInt32(&s.bucket)
newvalue := atomic.AddInt32(&s.bucket, int32(n))
if oldvalue <= 0 && newvalue > 0 {
s.bucketCond.Signal()
if atomic.AddInt32(&s.bucket, int32(n)) > 0 {
s.notifyBucket()
}
}
// session read a frame from underlying connection
@@ -211,14 +216,8 @@ func (s *Session) readFrame(buffer []byte) (f Frame, err error) {
func (s *Session) recvLoop() {
buffer := make([]byte, (1<<16)+headerSize)
for {
s.bucketCond.L.Lock()
for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() {
s.bucketCond.Wait()
}
s.bucketCond.L.Unlock()
if s.IsClosed() {
return
<-s.bucketNotify
}
if f, err := s.readFrame(buffer); err == nil {
@@ -272,7 +271,7 @@ func (s *Session) keepalive() {
select {
case <-tickerPing.C:
s.writeFrame(newFrame(cmdNOP, 0))
s.bucketCond.Signal() // force a signal to the recvLoop
s.notifyBucket() // force a signal to the recvLoop
case <-tickerTimeout.C:
if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) {
s.Close()

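This hunk replaces the sync.Cond around the token bucket with a one-slot notify channel: signalers never block, and repeated signals coalesce into at most one pending wakeup, which also removes the need for the cond's lock. A self-contained sketch of the pattern (names hypothetical):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type bucket struct {
	tokens int32
	notify chan struct{} // buffered with capacity 1
}

func (b *bucket) signal() {
	select {
	case b.notify <- struct{}{}: // wake a waiter
	default: // a wakeup is already pending; drop this one
	}
}

func (b *bucket) waitForTokens() {
	for atomic.LoadInt32(&b.tokens) <= 0 {
		<-b.notify
	}
}

func main() {
	b := &bucket{notify: make(chan struct{}, 1)}
	go func() {
		time.Sleep(10 * time.Millisecond)
		atomic.AddInt32(&b.tokens, 5)
		b.signal()
	}()
	b.waitForTokens()
	fmt.Println("tokens available")
}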

@@ -224,7 +224,7 @@ func (s *Stream) recycleTokens() (n int) {
// split a large byte buffer into smaller frames; frames reference the original buffer, no copy
func (s *Stream) split(bts []byte, cmd byte, sid uint32) []Frame {
var frames []Frame
frames := make([]Frame, 0, len(bts)/s.frameSize+1)
for len(bts) > s.frameSize {
frame := newFrame(cmd, sid)
frame.data = bts[:s.frameSize]
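Preallocating the frames slice avoids append's repeated grow-and-copy from a nil slice; len(bts)/s.frameSize+1 is an exact upper bound on the number of frames. A minimal standalone sketch of the same split (names hypothetical):

package main

import "fmt"

func split(data []byte, frameSize int) [][]byte {
	frames := make([][]byte, 0, len(data)/frameSize+1) // exact-fit capacity
	for len(data) > frameSize {
		frames = append(frames, data[:frameSize]) // frames reference data, no copy
		data = data[frameSize:]
	}
	if len(data) > 0 {
		frames = append(frames, data)
	}
	return frames
}

func main() {
	fmt.Println(len(split(make([]byte, 10), 4))) // 3
}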