Browse Source

bandwidth metrics & prometheus metrics publisher

multi-tenant
forest 9 months ago
parent
commit
b9b61e56d4
6 changed files with 405 additions and 43 deletions
  1. +2
    -2
      README.md
  2. +293
    -24
      main.go
  3. +100
    -7
      tunnel-lib/server.go
  4. +8
    -8
      tunnel-lib/virtualaddr.go
  5. +1
    -1
      usage-example/client-config.json
  6. +1
    -1
      usage-example/tunnels.json

+ 2
- 2
README.md View File

@ -47,7 +47,7 @@ Starting the tunnel client. Client Identifier: TestClient1
2020/08/06 14:00:04 theshold client is starting up using config:
{
"DebugLog": false,
"ClientIdentifier": "TestClient1",
"ClientId": "TestClient1",
"ServerHost": "localhost",
"ServerTunnelControlPort": 9056,
"ServerManagementPort": 9057,
@ -67,7 +67,7 @@ Sending the tunnel configuration to the server.
HTTP PUT localhost:9057/tunnels:
now listening on 127.0.0.1:9000
[{"HaProxyProxyProtocol":true,"ListenAddress":"127.0.0.1","ListenHostnameGlob":"*","ListenPort":9000,"BackEndService":"fooService","ClientIdentifier":"TestClient1"}]
[{"HaProxyProxyProtocol":true,"ListenAddress":"127.0.0.1","ListenHostnameGlob":"*","ListenPort":9000,"BackEndService":"fooService","ClientId":"TestClient1"}]
Starting the "sender" test app.
It connects to the front end port of the tunnel (port 9000). This would be your end user who wants to use the web application.


+ 293
- 24
main.go View File

@ -5,7 +5,6 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
@ -18,6 +17,7 @@ import (
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"
@ -45,22 +45,31 @@ type ServerConfig struct {
// tenantId is N/A. ClientId must be formatted `<nodeId>`
// clients can register listeners with any hostname including null, on any open port.
//
MultiTenantMode bool
MultiTenantMode bool
MultiTenantInternalAPIListenPort int
MultiTenantInternalAPICaCertificateFile string
CaCertificateFilesGlob string
ServerTlsKeyFile string
ServerTlsCertificateFile string
Metrics MetricsConfig
}
type ClientConfig struct {
DebugLog bool
ClientIdentifier string
ClientId string
ServerAddr string
ServiceToLocalAddrMap *map[string]string
CaCertificateFilesGlob string
ClientTlsKeyFile string
ClientTlsCertificateFile string
AdminUnixSocket string
Metrics MetricsConfig
}
type MetricsConfig struct {
PrometheusMetricsAPIPort int
}
type ListenerConfig struct {
@ -69,7 +78,7 @@ type ListenerConfig struct {
ListenHostnameGlob string
ListenPort int
BackEndService string
ClientIdentifier string
ClientId string
}
type ClientState struct {
@ -78,8 +87,29 @@ type ClientState struct {
}
type ManagementHttpHandler struct {
Domain string
ControlHandler http.Handler
Domain string
MultiTenantMode bool
ControlHandler http.Handler
}
type BandwidthCounter struct {
Inbound int64
Outbound int64
}
type MultiTenantInternalAPI struct{}
type PrometheusMetricsAPI struct {
MultiTenantServerMode bool
InboundByTenant map[string]int64
OutboundByTenant map[string]int64
InboundByService map[string]int64
OutboundByService map[string]int64
}
type Tenant struct {
ReservedPorts []int
AuthorizedDomains []string
}
type LiveConfigUpdate struct {
@ -92,7 +122,9 @@ type adminAPI struct{}
// Server State
var listenersByTenant map[string][]ListenerConfig
var clientStatesMutex = &sync.Mutex{}
var tenantStatesMutex = &sync.Mutex{}
var clientStatesByTenant map[string]map[string]ClientState
var tenants map[string]Tenant
var server *tunnel.Server
// Client State
@ -245,10 +277,10 @@ func runClient(configFileName *string) {
))
}
if clientIdDomain[0] != config.ClientIdentifier {
if clientIdDomain[0] != config.ClientId {
log.Fatal(fmt.Errorf(
"expected TLS client certificate common name clientId '%s' to match ClientIdentifier '%s'",
clientIdDomain[0], config.ClientIdentifier,
"expected TLS client certificate common name clientId '%s' to match ClientId '%s'",
clientIdDomain[0], config.ClientId,
))
}
@ -279,13 +311,13 @@ func runClient(configFileName *string) {
clientStateChanges := make(chan *tunnel.ClientStateChange)
tunnelClientConfig := &tunnel.ClientConfig{
DebugLog: config.DebugLog,
Identifier: config.ClientIdentifier,
Identifier: config.ClientId,
ServerAddr: config.ServerAddr,
FetchLocalAddr: func(service string) (string, error) {
//log.Printf("(*serviceToLocalAddrMap): %+v\n\n", (*serviceToLocalAddrMap))
localAddr, hasLocalAddr := (*serviceToLocalAddrMap)[service]
if !hasLocalAddr {
return "", errors.New("service not configured. See ServiceToLocalAddrMap in client config file.")
return "", fmt.Errorf("service '%s' not configured. Set ServiceToLocalAddrMap in client config file or HTTP PUT /liveconfig over the AdminUnixSocket.", service)
}
return localAddr, nil
},
@ -340,7 +372,7 @@ func runClientAdminApi(config ClientConfig) {
}
}
func validateCertificate(domain string, request *http.Request) (identifier string, tenantId string, err error) {
func validateCertificate(domain string, multiTenantMode bool, request *http.Request) (identifier string, tenantId string, err error) {
if len(request.TLS.PeerCertificates) != 1 {
return "", "", fmt.Errorf("expected exactly 1 TLS client certificate, got %d", len(request.TLS.PeerCertificates))
}
@ -361,7 +393,7 @@ func validateCertificate(domain string, request *http.Request) (identifier strin
identifier = clientIdDomain[0]
nodeId := identifier
if strings.Contains(identifier, ".") {
if multiTenantMode {
tenantIdNodeId := strings.Split(identifier, ".")
if len(tenantIdNodeId) != 2 {
return "", "", fmt.Errorf(
@ -400,9 +432,18 @@ func runServer(configFileName *string) {
clientStateChangeChannel := make(chan *tunnel.ClientStateChange)
var metricChannel chan tunnel.BandwidthMetric = nil
// the Server should only collect metrics when in multi-tenant mode -- this is needed for billing
if config.MultiTenantMode {
metricChannel = make(chan tunnel.BandwidthMetric)
go exportMetrics(config.Metrics /*multiTenantServerMode: */, true, metricChannel)
}
tunnelServerConfig := &tunnel.ServerConfig{
StateChanges: clientStateChangeChannel,
ValidateCertificate: validateCertificate,
Bandwidth: metricChannel,
Domain: config.Domain,
DebugLog: config.DebugLog,
}
@ -416,11 +457,10 @@ func runServer(configFileName *string) {
go (func() {
for {
clientStateChange := <-clientStateChangeChannel
clientStatesMutex.Lock()
previousState := ""
currentState := clientStateChange.Current.String()
tenantId := ""
if strings.Contains(clientStateChange.Identifier, ".") {
if config.MultiTenantMode {
tenantIdNodeId := strings.Split(clientStateChange.Identifier, ".")
if len(tenantIdNodeId) != 2 {
fmt.Printf("runServer(): go func(): can't handle clientStateChange with malformed Identifier '%s' \n", clientStateChange.Identifier)
@ -429,6 +469,7 @@ func runServer(configFileName *string) {
tenantId = tenantIdNodeId[0]
}
clientStatesMutex.Lock()
if _, hasTenant := clientStatesByTenant[tenantId]; !hasTenant {
clientStatesByTenant[tenantId] = map[string]ClientState{}
}
@ -466,6 +507,7 @@ func runServer(configFileName *string) {
}
tlsConfig := &tls.Config{
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
}
@ -475,11 +517,37 @@ func runServer(configFileName *string) {
Addr: fmt.Sprintf(":%d", config.ListenPort),
TLSConfig: tlsConfig,
Handler: &(ManagementHttpHandler{
Domain: config.Domain,
ControlHandler: server,
Domain: config.Domain,
MultiTenantMode: config.MultiTenantMode,
ControlHandler: server,
}),
}
if config.MultiTenantMode {
go (func() {
caCertPool := x509.NewCertPool()
caCert, err := ioutil.ReadFile(config.MultiTenantInternalAPICaCertificateFile)
if err != nil {
log.Fatal(err)
}
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
}
tlsConfig.BuildNameToCertificate()
multiTenantInternalServer := &http.Server{
Addr: fmt.Sprintf(":%d", config.MultiTenantInternalAPIListenPort),
TLSConfig: tlsConfig,
Handler: &MultiTenantInternalAPI{},
}
err = multiTenantInternalServer.ListenAndServeTLS(config.ServerTlsCertificateFile, config.ServerTlsKeyFile)
panic(err)
})()
}
log.Print("runServer(): the server should be running now\n")
err = httpsManagementServer.ListenAndServeTLS(config.ServerTlsCertificateFile, config.ServerTlsKeyFile)
panic(err)
@ -491,12 +559,12 @@ func setListeners(tenantId string, listenerConfigs []ListenerConfig) (int, strin
newListenersThatHaveToBeAdded := make([]ListenerConfig, 0)
for _, newListenerConfig := range listenerConfigs {
clientState, everHeardOfClientBefore := clientStatesByTenant[tenantId][newListenerConfig.ClientIdentifier]
clientState, everHeardOfClientBefore := clientStatesByTenant[tenantId][newListenerConfig.ClientId]
if !everHeardOfClientBefore {
return http.StatusNotFound, fmt.Sprintf("Client %s Not Found", newListenerConfig.ClientIdentifier)
return http.StatusNotFound, fmt.Sprintf("Client %s Not Found", newListenerConfig.ClientId)
}
if clientState.CurrentState != tunnel.ClientConnected.String() {
return http.StatusNotFound, fmt.Sprintf("Client %s is not connected it is %s", newListenerConfig.ClientIdentifier, clientState.CurrentState)
return http.StatusNotFound, fmt.Sprintf("Client %s is not connected it is %s", newListenerConfig.ClientId, clientState.CurrentState)
}
}
@ -539,14 +607,14 @@ func setListeners(tenantId string, listenerConfigs []ListenerConfig) (int, strin
listenAddress,
newListenerConfig.ListenPort,
newListenerConfig.ListenHostnameGlob,
newListenerConfig.ClientIdentifier,
newListenerConfig.ClientId,
newListenerConfig.HaProxyProxyProtocol,
newListenerConfig.BackEndService,
)
if err != nil {
if strings.Contains(err.Error(), "already in use") {
return http.StatusConflict, fmt.Sprintf("Port Conflict Port %s already in use", listenAddress)
return http.StatusConflict, fmt.Sprintf("Port Conflict: Port %s is reserved or already in use", listenAddress)
}
log.Printf("setListeners(): can't net.Listen(\"tcp\", \"%s\") because %s \n", listenAddress, err)
@ -563,18 +631,138 @@ func setListeners(tenantId string, listenerConfigs []ListenerConfig) (int, strin
}
func exportMetrics(config MetricsConfig, multiTenantServerMode bool, bandwidth <-chan tunnel.BandwidthMetric) {
metricsAPI := &PrometheusMetricsAPI{
MultiTenantServerMode: multiTenantServerMode,
InboundByTenant: map[string]int64{},
OutboundByTenant: map[string]int64{},
InboundByService: map[string]int64{},
OutboundByService: map[string]int64{},
}
go (func() {
for {
metric := <-bandwidth
if multiTenantServerMode {
tenantIdNodeId := strings.Split(metric.ClientId, ".")
if len(tenantIdNodeId) != 2 {
panic(fmt.Errorf("malformed metric.ClientId '%s', expected <tenantId>.<nodeId>", metric.ClientId))
}
if metric.Inbound {
metricsAPI.InboundByTenant[tenantIdNodeId[0]] += int64(metric.Bytes)
} else {
metricsAPI.OutboundByTenant[tenantIdNodeId[0]] += int64(metric.Bytes)
}
} else {
if metric.Inbound {
metricsAPI.InboundByService[metric.Service] += int64(metric.Bytes)
} else {
metricsAPI.OutboundByService[metric.Service] += int64(metric.Bytes)
}
}
}
})()
go (func() {
metricsServer := &http.Server{
Addr: fmt.Sprintf(":%d", config.PrometheusMetricsAPIPort),
Handler: metricsAPI,
}
err := metricsServer.ListenAndServe()
panic(err)
})()
}
func (s *PrometheusMetricsAPI) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
getMillisecondsSinceUnixEpoch := func() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}
responseWriter.Header().Set("Content-Type", "text/plain; version=0.0.4")
writeMetric := func(inbound map[string]int64, outbound map[string]int64, name, tag, desc string) {
timestamp := getMillisecondsSinceUnixEpoch()
responseWriter.Write([]byte(fmt.Sprintf("# HELP %s %s\n", name, desc)))
responseWriter.Write([]byte(fmt.Sprintf("# TYPE %s counter\n", name)))
for id, bytes := range inbound {
responseWriter.Write([]byte(fmt.Sprintf("%s{%s=\"%s\",direction=\"inbound\"} %d %d\n", name, tag, id, bytes, timestamp)))
}
for id, bytes := range outbound {
responseWriter.Write([]byte(fmt.Sprintf("%s{%s=\"%s\",direction=\"outbound\"} %d %d\n", name, tag, id, bytes, timestamp)))
}
}
if s.MultiTenantServerMode {
writeMetric(s.InboundByTenant, s.OutboundByTenant, "bandwidth_by_tenant", "tenant", "bandwidth usage by tenant in bytes, excluding usage from control protocol.")
} else {
writeMetric(s.InboundByService, s.OutboundByService, "bandwidth_by_service", "service", "bandwidth usage by service in bytes.")
}
}
func compareListenerConfigs(a, b ListenerConfig) bool {
return (a.ListenPort == b.ListenPort &&
a.ListenAddress == b.ListenAddress &&
a.ListenHostnameGlob == b.ListenHostnameGlob &&
a.BackEndService == b.BackEndService &&
a.ClientIdentifier == b.ClientIdentifier &&
a.ClientId == b.ClientId &&
a.HaProxyProxyProtocol == b.HaProxyProxyProtocol)
}
func (s *MultiTenantInternalAPI) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
switch path.Clean(request.URL.Path) {
case "/tenants":
if request.Method == "GET" || request.Method == "PUT" {
if request.Method == "PUT" {
if request.Header.Get("Content-Type") != "application/json" {
http.Error(responseWriter, "415 Unsupported Media Type: Content-Type must be application/json", http.StatusUnsupportedMediaType)
} else {
bodyBytes, err := ioutil.ReadAll(request.Body)
if err != nil {
http.Error(responseWriter, "500 Read Error", http.StatusInternalServerError)
return
}
var newTenants map[string]Tenant
err = json.Unmarshal(bodyBytes, &newTenants)
if err != nil {
http.Error(responseWriter, "422 Unprocessable Entity: Can't Parse JSON", http.StatusUnprocessableEntity)
return
}
tenantStatesMutex.Lock()
tenants = newTenants
tenantStatesMutex.Unlock()
}
}
tenantStatesMutex.Lock()
bytes, err := json.Marshal(tenants)
tenantStatesMutex.Unlock()
if err != nil {
http.Error(responseWriter, "500 JSON Marshal Error", http.StatusInternalServerError)
return
}
responseWriter.Header().Set("Content-Type", "application/json")
responseWriter.Write(bytes)
} else {
responseWriter.Header().Set("Allow", "GET, PUT")
http.Error(responseWriter, "405 Method Not Allowed, try GET or PUT", http.StatusMethodNotAllowed)
}
case "/ping":
if request.Method == "GET" {
fmt.Fprint(responseWriter, "pong")
} else {
responseWriter.Header().Set("Allow", "GET")
http.Error(responseWriter, "405 method not allowed, try GET", http.StatusMethodNotAllowed)
}
default:
http.Error(responseWriter, "404 Not Found, try /tenants or /ping", http.StatusNotFound)
}
}
func (s *ManagementHttpHandler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
_, tenantId, err := validateCertificate(s.Domain, request)
_, tenantId, err := validateCertificate(s.Domain, s.MultiTenantMode, request)
if err != nil {
http.Error(responseWriter, fmt.Sprintf("400 bad request: %s", err.Error()), http.StatusBadRequest)
return
@ -621,6 +809,71 @@ func (s *ManagementHttpHandler) ServeHTTP(responseWriter http.ResponseWriter, re
return
}
if s.MultiTenantMode {
for _, listenerConfig := range listenerConfigs {
tenantIdNodeId := strings.Split(listenerConfig.ClientId, ".")
if len(tenantIdNodeId) != 2 {
http.Error(
responseWriter,
fmt.Sprintf(
"400 Bad Request: invalid ClientId '%s'. It needs to be in the form '<tenantId>.<nodeId>'",
listenerConfig.ClientId,
),
http.StatusBadRequest,
)
return
}
tenant, hasTenant := tenants[tenantIdNodeId[0]]
if !hasTenant {
http.Error(
responseWriter,
fmt.Sprintf("400 Bad Request: unknown tenantId '%s'", tenantIdNodeId[0]),
http.StatusBadRequest,
)
}
isAuthorizedDomain := false
for _, tenantAuthorizedDomain := range tenant.AuthorizedDomains {
isSubdomain := strings.HasSuffix(listenerConfig.ListenHostnameGlob, fmt.Sprintf(".%s", tenantAuthorizedDomain))
if (tenantAuthorizedDomain == listenerConfig.ListenHostnameGlob) || isSubdomain {
isAuthorizedDomain = true
break
}
}
if listenerConfig.ListenHostnameGlob != "" && !isAuthorizedDomain {
http.Error(
responseWriter,
fmt.Sprintf(
"400 Bad Request: ListenHostnameGlob '%s' is not covered by any of your authorized domains [%s]",
listenerConfig.ListenHostnameGlob,
strings.Join(stringSliceMap(tenant.AuthorizedDomains, func(x string) string { return fmt.Sprintf("'%s'", x) }), ", "),
),
http.StatusBadRequest,
)
}
if listenerConfig.ListenHostnameGlob == "" {
isReservedPort := false
for _, tenantReservedPort := range tenant.ReservedPorts {
if listenerConfig.ListenPort == tenantReservedPort {
isReservedPort = true
break
}
}
if !isReservedPort {
http.Error(
responseWriter,
fmt.Sprintf(
"400 Bad Request: ListenHostnameGlob is empty and ListenPort '%d' is not one of your reserved ports [%s]",
listenerConfig.ListenPort,
strings.Join(intSlice2StringSlice(tenant.ReservedPorts), ", "),
),
http.StatusBadRequest,
)
}
}
}
}
statusCode, errorMessage := setListeners(tenantId, listenerConfigs)
if statusCode != 200 {
@ -667,3 +920,19 @@ func getConfigBytes(configFileName *string) []byte {
return nil
}
}
func intSlice2StringSlice(slice []int) []string {
toReturn := make([]string, len(slice))
for i, integer := range slice {
toReturn[i] = strconv.Itoa(integer)
}
return toReturn
}
func stringSliceMap(slice []string, mapper func(string) string) []string {
toReturn := make([]string, len(slice))
for i, str := range slice {
toReturn[i] = mapper(str)
}
return toReturn
}

+ 100
- 7
tunnel-lib/server.go View File

@ -24,6 +24,7 @@ import (
var (
errNoClientSession = errors.New("no client session established")
defaultTimeout = 10 * time.Second
metricChunkSize = 1000000 // one megabyte
)
// Server is responsible for proxying public connections to the client over a
@ -70,6 +71,8 @@ type Server struct {
// the domain of the server, used for validating clientIds
domain string
bandwidth chan<- BandwidthMetric
// see ServerConfig.ValidateCertificate comment
validateCertificate func(domain string, request *http.Request) (identifier string, tenantId string, err error)
@ -79,6 +82,14 @@ type Server struct {
debugLog bool
}
type BandwidthMetric struct {
Bytes int
RemoteAddress net.Addr
Inbound bool
Service string
ClientId string
}
// ServerConfig defines the configuration for the Server
type ServerConfig struct {
// StateChanges receives state transition details each time client
@ -94,6 +105,8 @@ type ServerConfig struct {
// the domain of the server, used for validating clientIds
Domain string
Bandwidth chan<- BandwidthMetric
// function that analyzes the TLS client certificate of the request.
// this is based on the CommonName attribute of the TLS certificate.
// If we are in multi-tenant mode, it must be formatted like `<tenantId>.<nodeId>@<domain>`
@ -101,7 +114,7 @@ type ServerConfig struct {
// <domain> must match the configured Domain of this Threshold server
// the identifier it returns will be `<tenantId>.<nodeId>` or `<nodeId>`.
// the tenantId it returns will be `<tenantId>` or ""
ValidateCertificate func(domain string, request *http.Request) (identifier string, tenantId string, err error)
ValidateCertificate func(domain string, multiTenantMode bool, request *http.Request) (identifier string, tenantId string, err error)
// YamuxConfig defines the config which passed to every new yamux.Session. If nil
// yamux.DefaultConfig() is used.
@ -133,6 +146,7 @@ func NewServer(cfg *ServerConfig) (*Server, error) {
virtualAddrs: newVirtualAddrs(opts),
controls: newControls(),
states: make(map[string]ClientState),
bandwidth: cfg.Bandwidth,
stateCh: cfg.StateChanges,
domain: cfg.Domain,
yamuxConfig: yamuxConfig,
@ -191,11 +205,11 @@ func (s *Server) handleTCPConn(conn net.Conn) error {
return err
}
service := strconv.Itoa(port)
service := fmt.Sprintf("port%d", port)
if listenerInfo.BackendService != "" {
service = listenerInfo.BackendService
}
stream, err := s.dial(listenerInfo.AssociatedClientIdentity, service)
stream, err := s.dial(listenerInfo.AssociatedClientId, service)
if err != nil {
return err
}
@ -223,8 +237,21 @@ func (s *Server) handleTCPConn(conn net.Conn) error {
disconnectedChan := make(chan bool)
go s.proxy(disconnectedChan, conn, stream, "from proxy-client to client")
go s.proxy(disconnectedChan, stream, conn, "from client to proxy-client")
inboundMetric := BandwidthMetric{
Service: listenerInfo.BackendService,
ClientId: listenerInfo.AssociatedClientId,
RemoteAddress: conn.RemoteAddr(),
Inbound: true,
}
outboundMetric := BandwidthMetric{
Service: listenerInfo.BackendService,
ClientId: listenerInfo.AssociatedClientId,
RemoteAddress: conn.RemoteAddr(),
Inbound: false,
}
go s.proxy(disconnectedChan, conn, stream, outboundMetric, s.bandwidth, "outbound from tunnel to remote client")
go s.proxy(disconnectedChan, stream, conn, inboundMetric, s.bandwidth, "inbound from remote client to tunnel")
// Once one member of this conversation has disconnected, we should end the conversation for all parties.
<-disconnectedChan
@ -232,18 +259,84 @@ func (s *Server) handleTCPConn(conn net.Conn) error {
return nonil(stream.Close(), conn.Close())
}
func (s *Server) proxy(disconnectedChan chan bool, dst, src net.Conn, side string) {
func (s *Server) proxy(disconnectedChan chan bool, dst, src net.Conn, metric BandwidthMetric, bandwidth chan<- BandwidthMetric, side string) {
defer (func() { disconnectedChan <- true })()
if s.debugLog {
log.Printf("Server.proxy(): tunneling %s -> %s (%s)\n", src.RemoteAddr(), dst.RemoteAddr(), side)
}
n, err := io.Copy(dst, src)
var n int64
var err error
if bandwidth != nil {
n, err = ioCopyWithMetrics(dst, src, metric, bandwidth)
} else {
n, err = io.Copy(dst, src)
}
if s.debugLog {
log.Printf("Server.proxy(): tunneled %d bytes %s -> %s (%s): %v\n", n, src.RemoteAddr(), dst.RemoteAddr(), side, err)
}
}
// copied from the go standard library source code (io.Copy) with metric collection added.
func ioCopyWithMetrics(dst io.Writer, src io.Reader, metric BandwidthMetric, bandwidth chan<- BandwidthMetric) (written int64, err error) {
size := 32 * 1024
if l, ok := src.(*io.LimitedReader); ok && int64(size) > l.N {
if l.N < 1 {
size = 1
} else {
size = int(l.N)
}
}
chunkForMetrics := 0
buf := make([]byte, size)
for {
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
chunkForMetrics += nw
if chunkForMetrics >= metricChunkSize {
bandwidth <- BandwidthMetric{
Inbound: metric.Inbound,
Service: metric.Service,
ClientId: metric.ClientId,
RemoteAddress: metric.RemoteAddress,
Bytes: chunkForMetrics,
}
chunkForMetrics = 0
}
written += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
if er != nil {
if er != io.EOF {
err = er
}
break
}
}
if chunkForMetrics > 0 {
bandwidth <- BandwidthMetric{
Inbound: metric.Inbound,
Service: metric.Service,
ClientId: metric.ClientId,
RemoteAddress: metric.RemoteAddress,
Bytes: chunkForMetrics,
}
}
return written, err
}
func (s *Server) dial(identifier string, service string) (net.Conn, error) {
control, ok := s.getControl(identifier)
if !ok {


+ 8
- 8
tunnel-lib/virtualaddr.go View File

@ -16,9 +16,9 @@ type ListenerInfo struct {
//Send the HAProxy PROXY protocol v1 header to the proxy client before streaming TCP from the remote client.
SendProxyProtocolv1 bool
BackendService string
AssociatedClientIdentity string
HostnameGlob string
BackendService string
AssociatedClientId string
HostnameGlob string
}
type listener struct {
@ -142,10 +142,10 @@ func (vaddr *vaddrStorage) Add(ip net.IP, port int, hostnameGlob string, ident s
func (l *listener) addHost(hostnameGlob string, ident string, sendProxyProtocolv1 bool, service string) {
l.backends = append(l.backends, ListenerInfo{
HostnameGlob: hostnameGlob,
AssociatedClientIdentity: ident,
SendProxyProtocolv1: sendProxyProtocolv1,
BackendService: service,
HostnameGlob: hostnameGlob,
AssociatedClientId: ident,
SendProxyProtocolv1: sendProxyProtocolv1,
BackendService: service,
})
}
@ -217,7 +217,7 @@ func (vaddr *vaddrStorage) newListener(ip net.IP, port int) (*listener, error) {
func (vaddr *vaddrStorage) HasIdentifier(identifier string) bool {
for _, listener := range vaddr.listeners {
for _, backend := range listener.backends {
if backend.AssociatedClientIdentity == identifier {
if backend.AssociatedClientId == identifier {
return true
}
}


+ 1
- 1
usage-example/client-config.json View File

@ -1,6 +1,6 @@
{
"DebugLog": false,
"ClientIdentifier": "TestClient1",
"ClientId": "TestClient1",
"ServerAddr": "localhost:9056",
"ServiceToLocalAddrMap": {
"fooService": "127.0.0.1:9001"


+ 1
- 1
usage-example/tunnels.json View File

@ -1,6 +1,6 @@
[
{
"ClientIdentifier": "TestClient1",
"ClientId": "TestClient1",
"ListenPort": 9000,
"ListenAddress": "127.0.0.1",
"ListenHostnameGlob": "*",


Loading…
Cancel
Save