io.Copy use pool buffer

This commit is contained in:
fatedier 2017-06-06 18:48:40 +08:00
parent 1eaf17fd05
commit 511503d34c
10 changed files with 93 additions and 126 deletions

View File

@ -24,9 +24,9 @@ import (
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/plugin"
"github.com/fatedier/frp/models/proto/tcp"
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/utils/errors"
frpIo "github.com/fatedier/frp/utils/io"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
)
@ -277,14 +277,14 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.
)
remote = workConn
if baseInfo.UseEncryption {
remote, err = tcp.WithEncryption(remote, []byte(config.ClientCommonCfg.PrivilegeToken))
remote, err = frpIo.WithEncryption(remote, []byte(config.ClientCommonCfg.PrivilegeToken))
if err != nil {
workConn.Error("create encryption stream error: %v", err)
return
}
}
if baseInfo.UseCompression {
remote = tcp.WithCompression(remote)
remote = frpIo.WithCompression(remote)
}
if proxyPlugin != nil {
@ -302,7 +302,7 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.
workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
tcp.Join(localConn, remote)
frpIo.Join(localConn, remote)
workConn.Debug("join connections closed")
}
}

View File

@ -23,8 +23,8 @@ import (
"strings"
"sync"
"github.com/fatedier/frp/models/proto/tcp"
"github.com/fatedier/frp/utils/errors"
frpIo "github.com/fatedier/frp/utils/io"
frpNet "github.com/fatedier/frp/utils/net"
)
@ -177,7 +177,7 @@ func (hp *HttpProxy) ConnectHandler(rw http.ResponseWriter, req *http.Request) {
}
client.Write([]byte("HTTP/1.0 200 OK\r\n\r\n"))
go tcp.Join(remote, client)
go frpIo.Join(remote, client)
}
func (hp *HttpProxy) Auth(rw http.ResponseWriter, req *http.Request) bool {

View File

@ -19,7 +19,7 @@ import (
"io"
"net"
"github.com/fatedier/frp/models/proto/tcp"
frpIo "github.com/fatedier/frp/utils/io"
)
const PluginUnixDomainSocket = "unix_domain_socket"
@ -57,7 +57,7 @@ func (uds *UnixDomainSocketPlugin) Handle(conn io.ReadWriteCloser) {
return
}
tcp.Join(localConn, conn)
frpIo.Join(localConn, conn)
}
func (uds *UnixDomainSocketPlugin) Name() string {

View File

@ -1,38 +0,0 @@
// Copyright 2016 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tcp
import (
"io"
"sync"
)
// Join two io.ReadWriteCloser and do some operations.
func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64) {
var wait sync.WaitGroup
pipe := func(to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {
defer to.Close()
defer from.Close()
defer wait.Done()
*count, _ = io.Copy(to, from)
}
wait.Add(2)
go pipe(c1, c2, &inCount)
go pipe(c2, c1, &outCount)
wait.Wait()
return
}

View File

@ -1,67 +0,0 @@
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tcp
import (
"io"
"testing"
"github.com/stretchr/testify/assert"
)
func TestJoin(t *testing.T) {
assert := assert.New(t)
var (
n int
err error
)
text1 := "A document that gives tips for writing clear, idiomatic Go code. A must read for any new Go programmer. It augments the tour and the language specification, both of which should be read first."
text2 := "A document that specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine."
// Forward bytes directly.
pr, pw := io.Pipe()
pr2, pw2 := io.Pipe()
pr3, pw3 := io.Pipe()
pr4, pw4 := io.Pipe()
conn1 := WrapReadWriteCloser(pr, pw2, nil)
conn2 := WrapReadWriteCloser(pr2, pw, nil)
conn3 := WrapReadWriteCloser(pr3, pw4, nil)
conn4 := WrapReadWriteCloser(pr4, pw3, nil)
go func() {
Join(conn2, conn3)
}()
buf1 := make([]byte, 1024)
buf2 := make([]byte, 1024)
conn1.Write([]byte(text1))
conn4.Write([]byte(text2))
n, err = conn4.Read(buf1)
assert.NoError(err)
assert.Equal(text1, string(buf1[:n]))
n, err = conn1.Read(buf2)
assert.NoError(err)
assert.Equal(text2, string(buf2[:n]))
conn1.Close()
conn2.Close()
conn3.Close()
conn4.Close()
}

View File

@ -24,9 +24,9 @@ import (
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/proto/tcp"
"github.com/fatedier/frp/models/proto/udp"
"github.com/fatedier/frp/utils/errors"
frpIo "github.com/fatedier/frp/utils/io"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/vhost"
@ -461,20 +461,20 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) {
var local io.ReadWriteCloser = workConn
cfg := pxy.GetConf().GetBaseInfo()
if cfg.UseEncryption {
local, err = tcp.WithEncryption(local, []byte(config.ServerCommonCfg.PrivilegeToken))
local, err = frpIo.WithEncryption(local, []byte(config.ServerCommonCfg.PrivilegeToken))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = tcp.WithCompression(local)
local = frpIo.WithCompression(local)
}
pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
StatsOpenConnection(pxy.GetName())
inCount, outCount := tcp.Join(local, userConn)
inCount, outCount := frpIo.Join(local, userConn)
StatsCloseConnection(pxy.GetName())
StatsAddTrafficIn(pxy.GetName(), inCount)
StatsAddTrafficOut(pxy.GetName(), outCount)

View File

@ -12,16 +12,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tcp
package io
import (
"io"
"sync"
"github.com/golang/snappy"
"github.com/fatedier/frp/utils/crypto"
"github.com/fatedier/frp/utils/pool"
)
// Join two io.ReadWriteCloser and do some operations.
func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64) {
var wait sync.WaitGroup
pipe := func(to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {
defer to.Close()
defer from.Close()
defer wait.Done()
buf := pool.GetBuf(16 * 1024)
defer pool.PutBuf(buf)
*count, _ = io.CopyBuffer(to, from, buf)
}
wait.Add(2)
go pipe(c1, c2, &inCount)
go pipe(c2, c1, &outCount)
wait.Wait()
return
}
func WithEncryption(rwc io.ReadWriteCloser, key []byte) (io.ReadWriteCloser, error) {
w, err := crypto.NewWriter(rwc, key)
if err != nil {

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package tcp
package io
import (
"io"
@ -21,6 +21,51 @@ import (
"github.com/stretchr/testify/assert"
)
func TestJoin(t *testing.T) {
assert := assert.New(t)
var (
n int
err error
)
text1 := "A document that gives tips for writing clear, idiomatic Go code. A must read for any new Go programmer. It augments the tour and the language specification, both of which should be read first."
text2 := "A document that specifies the conditions under which reads of a variable in one goroutine can be guaranteed to observe values produced by writes to the same variable in a different goroutine."
// Forward bytes directly.
pr, pw := io.Pipe()
pr2, pw2 := io.Pipe()
pr3, pw3 := io.Pipe()
pr4, pw4 := io.Pipe()
conn1 := WrapReadWriteCloser(pr, pw2, nil)
conn2 := WrapReadWriteCloser(pr2, pw, nil)
conn3 := WrapReadWriteCloser(pr3, pw4, nil)
conn4 := WrapReadWriteCloser(pr4, pw3, nil)
go func() {
Join(conn2, conn3)
}()
buf1 := make([]byte, 1024)
buf2 := make([]byte, 1024)
conn1.Write([]byte(text1))
conn4.Write([]byte(text2))
n, err = conn4.Read(buf1)
assert.NoError(err)
assert.Equal(text1, string(buf1[:n]))
n, err = conn1.Read(buf2)
assert.NoError(err)
assert.Equal(text2, string(buf2[:n]))
conn1.Close()
conn2.Close()
conn3.Close()
conn4.Close()
}
func TestWithCompression(t *testing.T) {
assert := assert.New(t)

View File

@ -17,15 +17,18 @@ package pool
import "sync"
var (
bufPool5k sync.Pool
bufPool2k sync.Pool
bufPool1k sync.Pool
bufPool sync.Pool
bufPool16k sync.Pool
bufPool5k sync.Pool
bufPool2k sync.Pool
bufPool1k sync.Pool
bufPool sync.Pool
)
func GetBuf(size int) []byte {
var x interface{}
if size >= 5*1024 {
if size >= 16*1024 {
x = bufPool16k.Get()
} else if size >= 5*1024 {
x = bufPool5k.Get()
} else if size >= 2*1024 {
x = bufPool2k.Get()
@ -46,7 +49,9 @@ func GetBuf(size int) []byte {
func PutBuf(buf []byte) {
size := cap(buf)
if size >= 5*1024 {
if size >= 16*1024 {
bufPool16k.Put(buf)
} else if size >= 5*1024 {
bufPool5k.Put(buf)
} else if size >= 2*1024 {
bufPool2k.Put(buf)

View File

@ -19,7 +19,7 @@ import (
"strings"
)
var version string = "0.11.0"
var version string = "0.12.0"
func Full() string {
return version