package main

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/sha256"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"math"
	mathRand "math/rand"
	"net"
	"net/http"
	"regexp"
	"strconv"
	"strings"
	"time"

	"git.sequentialread.com/forest/greenhouse/pki"
	errors "git.sequentialread.com/forest/pkg-errors"
	base58 "github.com/shengdoushi/base58"
)
type BaseHTTPService struct {
	client        *http.Client
	clientExpiry  *time.Time
	ClientFactory func() (*http.Client, *time.Time, error)
}

type BackendApp struct {
	BaseHTTPService

	WorkingDirectory             string
	EmailService                 *EmailService
	Model                        *DBModel
	DigitalOcean                 *DigitalOceanService
	Gandi                        *GandiService
	BackblazeB2                  *BackblazeB2Service
	SSH                          *SSHService
	ThresholdProvisioning        *ThresholdProvisioningService
	ThresholdManagementPort      int
	ThresholdPort                int
	AdminTenantId                int
	AdminThresholdNodeId         string
	GreenhouseListenPort         int
	GreenhouseThresholdServiceId string
}

type ThresholdMetrics struct {
	InboundByTenant  map[string]int64
	OutboundByTenant map[string]int64
}
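
// For example, one threshold server's metrics response might deserialize as:
//
//	{"InboundByTenant":{"42":1048576},"OutboundByTenant":{"42":2097152}}
//
// tenant IDs arrive as strings and are converted back to ints in ConsumeMetrics().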

type ThresholdTenantInfo struct {
	ThresholdServers  []string
	ClientStates      map[string]ThresholdClientState
	Listeners         []ThresholdTunnel
	AuthorizedDomains []string
	PortStart         int
	PortEnd           int
	EmailAddress      string
}

type ThresholdClientState struct {
	CurrentState string
	LastState    string
}

type VPSInstanceWithTenants struct {
	TenantSettings map[string]TunnelSettings
	VPSInstance    *VPSInstance
}

type stringBytesTuple struct {
	id    string
	bytes int64
}

type vpsInstanceBytesTuple struct {
	instance *VPSInstance
	bytes    int64
}

type tenantMoveTuple struct {
	tenantId   int
	instanceId string
	bytes      int64
}

type knapsackGuess struct {
	moves    []*tenantMoveTuple
	distance int64
	score    int64
}

type taskResult struct {
	Name   string
	Err    error
	Result interface{}
}

const tenantShufflingThreshold = float64(0.2)
const knapsackShufflingMinimizationFactor = float64(0.3)
const knapsackNumberOfGuessesFactor = float64(5)
const shufflingCircuitBreakerLimit = 1000

const GIGABYTE = int64(1000000000)
const TERABYTE = int64(1000000000000)

const tenantPinDuration = 6 * time.Hour

const managementClientCertSubject = "management@greenhouse.server.garden"
const freeSubdomainDomain = "greenhouseusers.com"

var projectedOverageAllowedBeforeSpawningNewInstance int64 = GIGABYTE * 250
var projectedUnderageAllowedBeforeTerminatingInstance int64 = TERABYTE

func initBackend(
	workingDirectory string,
	config *Config,
	pkiService *pki.PKIService,
	model *DBModel,
	emailService *EmailService,
) *BackendApp {
	greenhouseThresholdServiceId := "greenhouse_https"

	toReturn := BackendApp{
		WorkingDirectory:             workingDirectory,
		EmailService:                 emailService,
		Model:                        model,
		DigitalOcean:                 NewDigitalOceanService(config),
		Gandi:                        NewGandiService(config),
		BackblazeB2:                  NewBackblazeB2Service(config),
		SSH:                          NewSSHService(config),
		ThresholdProvisioning:        NewThresholdProvisioningService(pkiService),
		ThresholdPort:                config.ThresholdPort,
		ThresholdManagementPort:      config.ThresholdManagementPort,
		AdminTenantId:                config.AdminTenantId,
		AdminThresholdNodeId:         adminThresholdNodeId,
		GreenhouseListenPort:         config.FrontendPort,
		GreenhouseThresholdServiceId: greenhouseThresholdServiceId,
	}

	// TODO move this BaseHTTPService into the ThresholdProvisioning service?
	toReturn.ClientFactory = func() (*http.Client, *time.Time, error) {
		caCert, err := pkiService.GetCACertificate(mainCAName)
		if err != nil {
			return nil, nil, err
		}
		expiryTime := time.Now().Add(time.Hour * 24)
		cert, err := pkiService.GetClientTLSCertificate(managementAPIAuthCAName, managementClientCertSubject, expiryTime)
		if err != nil {
			return nil, nil, err
		}
		caCertPool := x509.NewCertPool()
		caCertPool.AddCert(caCert)
		tlsClientConfig := &tls.Config{
			Certificates: []tls.Certificate{cert},
			RootCAs:      caCertPool,
		}
		tlsClientConfig.BuildNameToCertificate()

		return &http.Client{
			Transport: &http.Transport{
				TLSClientConfig: tlsClientConfig,
			},
			Timeout: 10 * time.Second,
		}, &expiryTime, nil
	}

	return &toReturn
}

func (app *BackendApp) CreateAPIToken(tenantId int, name string) (string, string, error) {
	apiTokenBuffer := make([]byte, 16)
	if _, err := rand.Read(apiTokenBuffer); err != nil {
		return "", "", err
	}
	apiToken := base58.Encode(apiTokenBuffer, base58.BitcoinAlphabet)
	rawHash := sha256.Sum256([]byte(apiToken))
	hashedAPIToken := fmt.Sprintf("%x", rawHash)

	tenant, err := app.Model.GetTenant(tenantId)
	if err != nil {
		return "", "", err
	}

	// if the requested name is taken, append _1, _2, ... until a free name is found
	i := 0
	for i < 100 {
		conflict := false
		for _, token := range tenant.APITokens {
			if token.Name == name {
				conflict = true
			}
		}
		if !conflict {
			break
		}
		i++
		nameWithoutNumberSuffix := regexp.MustCompile("_[0-9]+$").ReplaceAllString(name, "")
		name = fmt.Sprintf("%s_%d", nameWithoutNumberSuffix, i)
	}
	if i >= 100 {
		return "", name, errors.Errorf("too many %s tokens", name)
	}

	err = app.Model.CreateAPIToken(tenantId, name, hashedAPIToken)
	return apiToken, name, err
}
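
// Note that only the sha256 hex digest of the token is ever stored; a later
// auth check would hash the presented token the same way and compare, e.g.:
//
//	rawHash := sha256.Sum256([]byte(presentedToken))
//	if fmt.Sprintf("%x", rawHash) == storedHashedAPIToken { /* authorized */ }
//
// (presentedToken and storedHashedAPIToken are illustrative names.)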

func (app *BackendApp) InitializeTenant(tenantId int, email string) error {
	emailSplit := strings.Split(email, "@")
	if len(emailSplit) != 2 {
		return errors.Errorf("InitializeTenant(): invalid email '%s'", email)
	}
	emailDomainSplit := strings.Split(emailSplit[1], ".")
	if len(emailDomainSplit) < 2 {
		return errors.Errorf("InitializeTenant(): invalid email '%s'", email)
	}

	replaceInvalidCharactersRegex := regexp.MustCompile("[^a-z0-9]")
	replaceRepeatedHyphensRegex := regexp.MustCompile("-+")
	makeSubdomain := func(input string) string {
		toReturn := replaceInvalidCharactersRegex.ReplaceAllString(input, "-")
		toReturn = replaceRepeatedHyphensRegex.ReplaceAllString(toReturn, "-")
		return strings.Trim(toReturn, "-")
	}

	subdomain := makeSubdomain(emailSplit[0])
	var err error
	alreadyTaken := true
	i := 0
	for alreadyTaken && i < 1000 {
		i++
		alreadyTaken, err = app.Model.SetFreeSubdomain(tenantId, subdomain)
		if err != nil {
			return err
		}
		// prepare the next candidate in case this one was taken
		if i == 1 {
			subdomain = makeSubdomain(fmt.Sprintf("%s-%s", emailSplit[0], emailDomainSplit[0]))
		} else if i == 2 {
			subdomain = makeSubdomain(email)
		} else {
			subdomain = makeSubdomain(fmt.Sprintf("%s-%d", emailSplit[0], i-2))
		}
	}
	if i >= 1000 {
		return errors.Errorf("InitializeTenant(): something went wrong generating a subdomain for '%s'", email)
	}

	start, end, bucket, err := app.Model.GetNextReservedPorts()
	if err != nil {
		return errors.Wrapf(err, "InitializeTenant() for '%s':", email)
	}
	err = app.Model.SetReservedPorts(tenantId, start, end, bucket)
	if err != nil {
		return errors.Wrapf(err, "InitializeTenant() for '%s':", email)
	}

	err = app.Reallocate(false, false)
	if err != nil {
		return errors.Wrapf(err, "InitializeTenant() for '%s':", email)
	}

	return nil
}
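
// The subdomain candidates for e.g. alice@example.com are tried in this order:
//
//	attempt 1: "alice"
//	attempt 2: "alice-example"
//	attempt 3: "alice-example-com" (the whole email run through makeSubdomain)
//	attempt 4+: "alice-1", "alice-2", ...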

func (app *BackendApp) ConsumeMetrics() error {
	vpsInstances, err := app.Model.GetVPSInstances()
	if err != nil {
		return err
	}
	billingYear, billingMonth, _, _, _ := getBillingTimeInfo()

	actions := make([]func() taskResult, len(vpsInstances))
	i := 0
	for instanceId, instance := range vpsInstances {
		// capture per-iteration copies for the closure below
		instanceId, instance := instanceId, instance
		actions[i] = func() taskResult {
			// TODO move the metrics to the management API / to the client side in threshold
			responseBytes, err := app.MyHTTP200(
				"GET",
				fmt.Sprintf("https://%s:%d/consumeMetrics", instance.IPV4, app.ThresholdManagementPort),
				nil,
				nil,
			)
			result := ThresholdMetrics{}
			if err == nil {
				err = json.Unmarshal(responseBytes, &result)
			}
			if err == nil {
				err = app.Model.RecordVPSUsage(instance, result, billingYear, billingMonth)
			}
			return taskResult{
				Name:   instanceId,
				Err:    err,
				Result: result,
			}
		}
		i++
	}

	results := doInParallel(false, actions...)

	errorStrings := []string{}
	bytesByTenant := map[int]int64{}
	for _, result := range results {
		if result.Err != nil {
			errorStrings = append(errorStrings, fmt.Sprintf("error collecting metrics from %s: %+v", result.Name, result.Err))
		} else {
			instanceBytesByTenant := result.Result.(ThresholdMetrics)
			for tenantIdString, bytez := range instanceBytesByTenant.InboundByTenant {
				tenantId, err := strconv.Atoi(tenantIdString)
				if err != nil {
					errorStrings = append(errorStrings, fmt.Sprintf("can't convert tenantId '%s' to integer: %s", tenantIdString, err))
				} else {
					bytesByTenant[tenantId] += bytez
				}
			}
			for tenantIdString, bytez := range instanceBytesByTenant.OutboundByTenant {
				tenantId, err := strconv.Atoi(tenantIdString)
				if err != nil {
					errorStrings = append(errorStrings, fmt.Sprintf("can't convert tenantId '%s' to integer: %s", tenantIdString, err))
				} else {
					bytesByTenant[tenantId] += bytez
				}
			}
		}
	}
	if len(errorStrings) > 0 {
		return errors.Errorf("ConsumeMetrics() doInParallel(actions...): %s\n\n", strings.Join(errorStrings, "\n"))
	}

	err = app.Model.RecordTenantsUsage(bytesByTenant)
	if err != nil {
		return err
	}

	return nil
}

func (app *BackendApp) GetTenantInfo(tenantId int) (*ThresholdTenantInfo, error) {
	tenant, err := app.Model.GetTenant(tenantId)
	if err != nil {
		return nil, err
	}
	vpsInstances, err := app.Model.GetVPSInstances()
	if err != nil {
		return nil, err
	}
	billingYear, billingMonth, _, _, _ := getBillingTimeInfo()
	tenantVPSInstanceRows, err := app.Model.GetTenantVPSInstanceRows(billingYear, billingMonth)
	if err != nil {
		return nil, err
	}

	thresholdServerHostPorts := []string{}
	actions := make([]func() taskResult, 0)
	for _, row := range tenantVPSInstanceRows {
		row := row // capture a per-iteration copy for the closure below
		vpsInstance, hasVpsInstance := vpsInstances[row.GetVPSInstanceId()]
		activeOrPinned := row.Active || (row.DeactivatedAt != nil && row.DeactivatedAt.Add(tenantPinDuration).After(time.Now()))
		if hasVpsInstance && row.TenantId == tenantId && activeOrPinned {
			thresholdServerHostPorts = append(thresholdServerHostPorts, fmt.Sprintf("%s:%d", vpsInstance.IPV4, app.ThresholdManagementPort))
			actions = append(actions, func() taskResult {
				responseBytes, err := app.MyHTTP200(
					"GET",
					fmt.Sprintf("https://%s:%d/tenantInfo?tenantId=%d", vpsInstance.IPV4, app.ThresholdManagementPort, tenantId),
					nil,
					nil,
				)
				tenantInfo := ThresholdTenantInfo{}
				if err == nil {
					err = json.Unmarshal(responseBytes, &tenantInfo)
				}
				return taskResult{
					Name:   row.GetVPSInstanceId(),
					Err:    err,
					Result: tenantInfo,
				}
			})
		}
	}

	results := doInParallel(false, actions...)
	//log.Printf("%+v, %d", tenant.TunnelSettings.AuthorizedDomains, len(tenant.TunnelSettings.AuthorizedDomains))

	errorStrings := []string{}
	toReturn := ThresholdTenantInfo{
		ThresholdServers:  thresholdServerHostPorts,
		ClientStates:      map[string]ThresholdClientState{},
		Listeners:         []ThresholdTunnel{},
		AuthorizedDomains: tenant.TunnelSettings.AuthorizedDomains,
		PortStart:         tenant.TunnelSettings.PortStart,
		PortEnd:           tenant.TunnelSettings.PortEnd,
		EmailAddress:      tenant.Email,
	}
	listenersToReturn := map[string]ThresholdTunnel{}
	for _, result := range results {
		if result.Err != nil {
			errorStrings = append(errorStrings, fmt.Sprintf("error getting active nodeIds for tenant %d from %s: %+v", tenantId, result.Name, result.Err))
		} else {
			tenantInfo := result.Result.(ThresholdTenantInfo)
			// bytez, err1 := json.MarshalIndent(tenantInfo, "", "  ")
			// log.Printf("bytez: %s\nerr: %s\n\n", string(bytez), err1)

			// deduplicate ClientStates from multiple threshold servers, preferring the connected state
			for clientNodeId, state := range tenantInfo.ClientStates {
				alreadyExistingState, hasAlreadyExistingState := toReturn.ClientStates[clientNodeId]
				if !hasAlreadyExistingState || alreadyExistingState.CurrentState != "ClientConnected" {
					toReturn.ClientStates[clientNodeId] = state
				}
			}
			// deduplicate listeners lists from multiple threshold servers via map
			for _, listener := range tenantInfo.Listeners {
				listenersToReturn[listener.String()] = listener
			}
		}
	}
	for _, listener := range listenersToReturn {
		toReturn.Listeners = append(toReturn.Listeners, listener)
	}
	if len(errorStrings) > 0 {
		return nil, errors.Errorf("GetTenantInfo() doInParallel(actions...): %s\n\n", strings.Join(errorStrings, "\n"))
	}

	return &toReturn, nil
}

func (app *BackendApp) GetInstances() (map[string]*VPSInstance, map[string]*VPSInstance, map[string]*VPSInstance, error) {
	dbVPSInstances, err := app.Model.GetVPSInstances()
	if err != nil {
		return nil, nil, nil, err
	}
	cloudVPSInstances, err := app.DigitalOcean.List()
	if err != nil {
		return nil, nil, nil, err
	}

	// jsonBytes, err = json.MarshalIndent(dbVPSInstances, "", "  ")
	// log.Printf("dbVPSInstances: %s\n\n", string(jsonBytes))
	// jsonBytes, err = json.MarshalIndent(digitalOceanVpsInstances, "", "  ")
	// log.Printf("digitalOceanVpsInstances: %s\n\n", string(jsonBytes))

	validVpsInstances := map[string]*VPSInstance{}
	onlyCloudInstances := map[string]*VPSInstance{}
	onlyDBInstances := map[string]*VPSInstance{}
	for k, v := range cloudVPSInstances {
		if _, has := dbVPSInstances[k]; !has {
			onlyCloudInstances[k] = v
		}
	}
	for k, v := range dbVPSInstances {
		if _, has := cloudVPSInstances[k]; !has {
			onlyDBInstances[k] = v
		} else if !v.Deleted && !v.Deprecated {
			validVpsInstances[k] = v
		}
	}

	return validVpsInstances, onlyDBInstances, onlyCloudInstances, nil
}

func (app *BackendApp) HealthcheckInstances(vpsInstances map[string]*VPSInstance) map[string]bool {
	if len(vpsInstances) == 0 {
		return map[string]bool{}
	}

	actions := make([]func() taskResult, len(vpsInstances))
	i := 0
	for instanceId, instance := range vpsInstances {
		// capture per-iteration copies for the closure below
		instanceId, instance := instanceId, instance
		actions[i] = func() taskResult {
			_, err := app.MyHTTP200(
				"GET",
				fmt.Sprintf("https://%s:%d/ping", instance.IPV4, app.ThresholdManagementPort),
				nil,
				nil,
			)
			return taskResult{
				Name:   instanceId,
				Err:    err,
				Result: nil,
			}
		}
		i++
	}

	results := doInParallel(false, actions...)

	healthStatus := map[string]bool{}
	for _, result := range results {
		if result.Err != nil {
			log.Printf("error pinging %s: %s", result.Name, result.Err)
		}
		healthStatus[result.Name] = (result.Err == nil)
	}
	return healthStatus
}

func getBillingTimeInfo() (int, int, time.Time, time.Time, float64) {
	billingYear := time.Now().UTC().Year()
	billingMonth := int(time.Now().UTC().Month())
	nextBillingYear := billingYear
	nextBillingMonth := billingMonth + 1
	if nextBillingMonth > 12 {
		nextBillingMonth = 1
		nextBillingYear++
	}
	utcLocation, err := time.LoadLocation("UTC")
	if err != nil {
		err = errors.Wrap(err, "can't calculate billing because can't load UTC location. This should have been caught at startup")
		log.Printf("%+v\n", err)
		panic(err)
	}
	startOfBillingMonth := time.Date(billingYear, time.Month(billingMonth), 1, 0, 0, 0, 0, utcLocation)
	// day 0 of the next month is the last day of the current billing month
	endOfBillingMonth := time.Date(nextBillingYear, time.Month(nextBillingMonth), 0, 23, 59, 59, int(int64(time.Second))-1, utcLocation)

	monthDuration := float64(int64(endOfBillingMonth.Sub(startOfBillingMonth)))
	monthElapsed := float64(int64(time.Now().UTC().Sub(startOfBillingMonth)))
	amountOfMonthElapsed := monthElapsed / monthDuration

	return billingYear, billingMonth, startOfBillingMonth, endOfBillingMonth, amountOfMonthElapsed
}
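
// For example, 9 days into a 30-day month amountOfMonthElapsed is 0.3, so a
// tenant who has used 30GB so far projects to 30GB/0.3 = 100GB for the full
// month (this is how amountOfMonthElapsed is used in tryReallocate below).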

func (app *BackendApp) ValidateExternalDomains() error {
	log.Printf("starting ValidateExternalDomains()...")
	toValidate, err := app.Model.GetExternalDomains()
	if err != nil {
		return err
	}
	for _, tuple := range toValidate {
		externalDomain := tuple[0]
		personalDomain := tuple[1]
		_, err := app.ValidateExternalDomain(externalDomain, personalDomain, true)
		if err != nil {
			log.Printf("failed looking up %s: %s", externalDomain, err)
		}
	}
	log.Printf("finished ValidateExternalDomains()!")
	return nil
}

func (app *BackendApp) ValidateExternalDomain(externalDomain, personalDomain string, updateDatabase bool) (bool, error) {
	googleDNSResolver := &net.Resolver{
		PreferGo: true,
		Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
			d := net.Dialer{
				Timeout: time.Millisecond * time.Duration(10000),
			}
			return d.DialContext(ctx, network, "8.8.8.8:53")
		},
	}
	quad9DNSResolver := &net.Resolver{
		PreferGo: true,
		Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
			d := net.Dialer{
				Timeout: time.Millisecond * time.Duration(10000),
			}
			return d.DialContext(ctx, network, "9.9.9.9:53")
		},
	}

	for attempts := 0; attempts < 5; attempts++ {
		cnameFromGoogle, googleErr := googleDNSResolver.LookupCNAME(context.Background(), externalDomain)
		cnameFromQuad9, quad9Err := quad9DNSResolver.LookupCNAME(context.Background(), externalDomain)
		cnameFromDefaultResolver, defaultErr := net.LookupCNAME(externalDomain)
		if googleErr != nil && quad9Err != nil && defaultErr != nil {
			if attempts >= 4 {
				return false, defaultErr
			}
		} else {
			personalDomainWithPeriodAtTheEnd := fmt.Sprintf("%s.", personalDomain)
			googleIsValid := strings.HasSuffix(cnameFromGoogle, personalDomainWithPeriodAtTheEnd)
			quad9IsValid := strings.HasSuffix(cnameFromQuad9, personalDomainWithPeriodAtTheEnd)
			defaultIsValid := strings.HasSuffix(cnameFromDefaultResolver, personalDomainWithPeriodAtTheEnd)
			// log.Printf(
			// 	"ValidateExternalDomain(): %s --> %s,%s,%s\n(personalDomain=%s) (valid: %t,%t,%t)\n",
			// 	externalDomain, cnameFromGoogle, cnameFromQuad9, cnameFromDefaultResolver, personalDomain, googleIsValid, quad9IsValid, defaultIsValid,
			// )
			if googleIsValid || quad9IsValid || defaultIsValid {
				if updateDatabase {
					err := app.Model.MarkExternalDomainAsVerified(externalDomain)
					if err != nil {
						return false, err
					}
				}
				return true, nil
			}
			return false, nil
		}
	}
	return false, errors.New("ran out of attempts and neither succeeded nor failed, this should never happen :\\")
}
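
// For example, if "www.example.com" has a CNAME record pointing at
// "mysite.alice.greenhouseusers.com", then LookupCNAME returns
// "mysite.alice.greenhouseusers.com." and the HasSuffix check passes for
// personalDomain "alice.greenhouseusers.com". (names illustrative)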

func (app *BackendApp) Reallocate(rebalanceVPSInstances, reshuffleTenants bool) error {
	log.Println("Starting Reallocate Process... ")
	completed, err := app.tryReallocate(rebalanceVPSInstances, reshuffleTenants)
	if !completed && err == nil {
		log.Println("Reallocate not complete yet. Running backendApp.tryReallocate() again")
		_, err := app.tryReallocate(rebalanceVPSInstances, reshuffleTenants)
		return err
	}
	return err
}

func (app *BackendApp) tryReallocate(rebalanceVPSInstances bool, reshuffleTenants bool) (bool, error) {
	billingYear, billingMonth, _, _, amountOfMonthElapsed := getBillingTimeInfo()

	//TODO
	desiredInstancesPerTenant := 2

	tenants, err := app.Model.GetTenants()
	if err != nil {
		return false, err
	}
	tenantVpsInstanceRows, err := app.Model.GetTenantVPSInstanceRows(billingYear, billingMonth)
	if err != nil {
		return false, err
	}

	var validVpsInstances map[string]*VPSInstance
	if rebalanceVPSInstances {
		var onlyDBInstances map[string]*VPSInstance
		var onlyCloudInstances map[string]*VPSInstance
		validVpsInstances, onlyDBInstances, onlyCloudInstances, err = app.GetInstances()
		if err != nil {
			return false, err
		}
		// issue warnings for inconsistencies between the DB and the provider
		errorStrings := []string{}
		for _, v := range onlyDBInstances {
			errorStrings = append(errorStrings, fmt.Sprintf("instance %s (%s) is in the database, but not in the provider api", v.GetId(), v.IPV4))
		}
		for _, v := range onlyCloudInstances {
			errorStrings = append(errorStrings, fmt.Sprintf("instance %s (%s) is in the provider api, but not in the database", v.GetId(), v.IPV4))
		}
		if len(errorStrings) > 0 {
			return false, errors.Errorf("VPS instances are inconsistent: \n%s\n", strings.Join(errorStrings, "\n"))
		}
	} else {
		dbVPSInstances, err := app.Model.GetVPSInstances()
		if err != nil {
			return false, err
		}
		validVpsInstances = map[string]*VPSInstance{}
		for k, v := range dbVPSInstances {
			if !v.Deleted && !v.Deprecated {
				validVpsInstances[k] = v
			}
		}
	}

	// filter out dedicated vps instances & aggregate dedicated vps instance counts onto tenants
	keysToRemove := []string{}
	for k, vpsInstance := range validVpsInstances {
		if vpsInstance.TenantId != 0 {
			keysToRemove = append(keysToRemove, k)
			tenants[vpsInstance.TenantId].DedicatedVPSCount += 1
		}
	}
	// TODO do we need to do anything else with these besides cutting them from the "validVpsInstances" ?
	for _, k := range keysToRemove {
		delete(validVpsInstances, k)
	}

	// jsonBytes, err := json.MarshalIndent(tenants, "", "  ")
	// log.Printf("tenants: %s\n\n", string(jsonBytes))
	// jsonBytes, err = json.MarshalIndent(tenantVpsInstanceRows, "", "  ")
	// log.Printf("tenantVpsInstanceRows: %s\n\n", string(jsonBytes))
	// jsonBytes, err = json.MarshalIndent(validVpsInstances, "", "  ")
	// log.Printf("validVpsInstances: %s\n\n", string(jsonBytes))

	// aggregate the bandwidth usage from the tenantVPS relation onto both the tenants and the vps instances.
	// note that the total bandwidth on the tenants may be larger
	// than the total bandwidth on the vps instances because some vps instances may have been deleted.
	// if you update the following loop, consider updating the similar one in frontend.go admin handler 😬
	tenantAllocationCount := map[int]int{}
	shadowConfigs := map[string]map[int]*TunnelSettings{}
	workingAllocations := map[string]map[int]bool{}
	originalAllocations := map[string]map[int]bool{}
	pinned := map[string]map[int]bool{}
	for _, row := range tenantVpsInstanceRows {
		vpsInstanceId := row.GetVPSInstanceId()
		vpsInstance, hasVPSInstance := validVpsInstances[vpsInstanceId]
		tenant, hasTenant := tenants[row.TenantId]
		if hasVPSInstance {
			vpsInstance.Bytes += row.Bytes
		}
		if hasTenant {
			tenant.Bytes += row.Bytes
		}
		if hasTenant && hasVPSInstance {
			_, has := shadowConfigs[vpsInstanceId]
			if !has {
				shadowConfigs[vpsInstanceId] = map[int]*TunnelSettings{}
				workingAllocations[vpsInstanceId] = map[int]bool{}
				originalAllocations[vpsInstanceId] = map[int]bool{}
				pinned[vpsInstanceId] = map[int]bool{}
			}
			shadowConfigs[vpsInstanceId][tenant.Id] = row.ShadowConfig
			// if the tenant's allocation to this instance was removed recently (less than tenantPinDuration ago)
			// then add the allocation to pinned, keeping the allocation live for a while while DNS updates propagate.
			if row.DeactivatedAt != nil && row.DeactivatedAt.Add(tenantPinDuration).After(time.Now()) {
				pinned[vpsInstanceId][tenant.Id] = true
			}
			if row.Active {
				workingAllocations[vpsInstanceId][tenant.Id] = true
				originalAllocations[vpsInstanceId][tenant.Id] = true
				tenantAllocationCount[row.TenantId]++
			}
		}
	}

	// Deactivate tenants who have surpassed their service limit for this billing month
	bytesPerCent := 1000000000
	for _, tenant := range tenants {
		bandwidthCostCents := int(tenant.Bytes / int64(bytesPerCent))
		if bandwidthCostCents >= tenant.ServiceLimitCents {
			tenant.Deactivated = true
		}
	}

	// -------------------------------------------------------------------------------
	// STEP 1: scale the # of instances up or down depending on total projected usage.
	if rebalanceVPSInstances {
		totalUsageSoFar := int64(0)
		totalUsageByActiveTenantsSoFar := int64(0)
		for _, tenant := range tenants {
			totalUsageSoFar += tenant.Bytes
			if !tenant.Deactivated {
				totalUsageByActiveTenantsSoFar += tenant.Bytes
			}
		}
		soFarFloat := float64(totalUsageSoFar)
		activeTenantsSoFarFloat := float64(totalUsageByActiveTenantsSoFar)
		// project full-month usage by scaling the active tenants' usage-so-far
		// by the fraction of the month that has elapsed
		totalProjectedUsageFloat := soFarFloat + ((activeTenantsSoFarFloat / amountOfMonthElapsed) - activeTenantsSoFarFloat)
		//totalProjectedUsage := int64(totalProjectedUsageFloat)

		totalMonthlyAllotment := int64(0)
		for _, vpsInstance := range validVpsInstances {
			if !vpsInstance.Deprecated && !vpsInstance.Deleted {
				// TODO handle BytesMonthly when node is created in the middle of the month / near end of month
				amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor := float64(1)
				totalMonthlyAllotment += int64(float64(vpsInstance.BytesMonthly) * amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor)
			}
		}
		totalMonthlyAllotmentFloat := float64(totalMonthlyAllotment)

		overageFloat := totalProjectedUsageFloat - totalMonthlyAllotmentFloat
		overflowAmountFloat := overageFloat - float64(projectedOverageAllowedBeforeSpawningNewInstance)
		if overflowAmountFloat > 0 || len(validVpsInstances) == 0 {
			instancesToCreate := int(math.Ceil(overflowAmountFloat / float64(DEFAULT_INSTANCE_MONTHLY_BYTES)))
			if instancesToCreate < 1 && len(validVpsInstances) == 0 {
				instancesToCreate = 1
			}
			log.Printf("spawning %d instances...", instancesToCreate)
			tasks := []func() taskResult{}
			for i := 0; i < instancesToCreate; i++ {
				i := i // capture a per-iteration copy for the closure below
				tasks = append(tasks, func() taskResult {
					instance, err := app.SpawnNewMultitenantInstance()
					return taskResult{
						Err:    err,
						Result: instance,
						Name:   strconv.Itoa(i),
					}
				})
			}
			results := doInParallel(false, tasks...)
			errorStrings := []string{}
			for _, result := range results {
				if result.Err != nil {
					errorStrings = append(errorStrings, fmt.Sprintf("%+v", result.Err))
				}
			}
			if len(errorStrings) > 0 {
				return false, fmt.Errorf("SpawnNewMultitenantInstance failed: \n%s\n", strings.Join(errorStrings, "\n"))
			}
			// report not-complete so the caller runs tryReallocate again with the new instances in place
			return false, nil
		}

		underusageAmount := -overageFloat - float64(projectedUnderageAllowedBeforeTerminatingInstance)
		if underusageAmount > 0 && len(validVpsInstances) > 1 {
			instancesToDeprecate := int(math.Ceil(underusageAmount / float64(DEFAULT_INSTANCE_MONTHLY_BYTES)))
			if instancesToDeprecate > len(validVpsInstances)-1 {
				instancesToDeprecate = len(validVpsInstances) - 1
			}
			log.Printf("deprecating %d instances...", instancesToDeprecate)
			return false, nil
		}
	}

	// STEP 2: shuffle tenants around to balance load.
	// if we got to this point, that means we didn't spawn or deprecate any instances this time around.

	// sanity check the desiredInstancesPerTenant
	if desiredInstancesPerTenant > len(validVpsInstances) {
		desiredInstancesPerTenant = len(validVpsInstances)
	}

	// the allocate function will ADD (not remove) the tenant to the <instanceCount> vps instances that don't already
	// have that tenant on them & have the lowest projected usage for this month.
	allocate := func(tenant *TenantInfo, instanceCount int, workingAllocations *map[string]map[int]bool) {
		lowestUsedInstances := []stringBytesTuple{}
		for _, instance := range validVpsInstances {
			// get total projected usage for each instance,
			// find the lowest <instanceCount> instances & allocate them
			instanceAllocations, has := (*workingAllocations)[instance.GetId()]
			// if this instance already has this tenant allocated, skip it.
			if (has && instanceAllocations[tenant.Id]) || instance.Deprecated || instance.Deleted {
				continue
			}
			instanceMetadata := stringBytesTuple{
				id:    instance.GetId(),
				bytes: getInstanceProjectedUsage(instance, workingAllocations, &tenants, desiredInstancesPerTenant, amountOfMonthElapsed),
			}
			if len(lowestUsedInstances) < instanceCount {
				lowestUsedInstances = append(lowestUsedInstances, instanceMetadata)
			} else {
				// replace the most-used entry in lowestUsedInstances if this instance is less used
				indexOfMostUsed := -1
				for i, instance := range lowestUsedInstances {
					if indexOfMostUsed == -1 || instance.bytes > lowestUsedInstances[indexOfMostUsed].bytes {
						indexOfMostUsed = i
					}
				}
				if instanceMetadata.bytes < lowestUsedInstances[indexOfMostUsed].bytes {
					lowestUsedInstances[indexOfMostUsed] = instanceMetadata
				}
			}
		}
		for _, instanceMetadata := range lowestUsedInstances {
			if _, has := (*workingAllocations)[instanceMetadata.id]; !has {
				(*workingAllocations)[instanceMetadata.id] = map[int]bool{}
			}
			(*workingAllocations)[instanceMetadata.id][tenant.Id] = true
		}
	}

	// 2.1 allocate tenants who don't have the desired # of active vps allocations yet.
	for _, tenant := range tenants {
		if !tenant.Deactivated && tenantAllocationCount[tenant.Id] < desiredInstancesPerTenant {
			allocate(tenant, desiredInstancesPerTenant-tenantAllocationCount[tenant.Id], &workingAllocations)
		}
	}

	if reshuffleTenants {
		// 2.2 shuffle tenant allocations around so tenants
		// are moved from instances with predicted overages to instances with predicted underages
		iterations := 0
		doneShuffling := false
		for !doneShuffling && iterations < shufflingCircuitBreakerLimit && len(validVpsInstances) > 1 {
			iterations++
			highestSurplus := vpsInstanceBytesTuple{instance: nil, bytes: 0}
			lowestSurplus := vpsInstanceBytesTuple{instance: nil, bytes: 0}
			for _, instance := range validVpsInstances {
				if !instance.Deprecated && !instance.Deleted {
					// TODO handle BytesMonthly when node is created in the middle of the month / near end of month
					amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor := float64(1)
					totalMonthlyAllotment := int64(float64(instance.BytesMonthly) * amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor)
					surplus := totalMonthlyAllotment - getInstanceProjectedUsage(
						instance,
						&workingAllocations,
						&tenants,
						desiredInstancesPerTenant,
						amountOfMonthElapsed,
					)
					if highestSurplus.instance == nil || highestSurplus.bytes < surplus {
						highestSurplus = vpsInstanceBytesTuple{instance: instance, bytes: surplus}
					}
					if lowestSurplus.instance == nil || lowestSurplus.bytes > surplus {
						lowestSurplus = vpsInstanceBytesTuple{instance: instance, bytes: surplus}
					}
				}
			}

			// if there are no instances which have a predicted overage, exit.
			if lowestSurplus.bytes >= 0 {
				doneShuffling = true
				break
			}

			// let's say that the most overused instance has a predicted overusage of 100gb
			// and the most underused instance has a predicted underusage of 700gb.
			// the difference between those is 800gb.
			// so we want to move 400gb (half the difference) of estimated future traffic
			// from the most overused to the most underused.
			// if we did that, then the result would be balanced: both predicted to be underused by 300gb
			desiredDifference := int64Abs(lowestSurplus.bytes-highestSurplus.bytes) / 2

			// only continue shuffling if the difference between the two instances
			// is over 20% of their monthly allowance
			// TODO handle BytesMonthly when node is created in the middle of the month / near end of month
			averageMonthlyBytes := float64(highestSurplus.instance.BytesMonthly+lowestSurplus.instance.BytesMonthly) * float64(0.5)
			if float64(desiredDifference) < averageMonthlyBytes*tenantShufflingThreshold {
				doneShuffling = true
				break
			}

			tenantsOfEmptiestInstance := workingAllocations[highestSurplus.instance.GetId()]
			tenantsOfFullestInstance := workingAllocations[lowestSurplus.instance.GetId()]

			// we are going to create a list of tenantId, instanceId pairs, one for each
			// of the tenant allocations on each of the most underused and overused instances.
			// each list item also includes the net effect on the projected usage difference of the two instances
			// assuming that the tenant was moved from one instance to the other
			effectsOfMovingTenantToOtherInstance := []tenantMoveTuple{}
			for tenantId := range tenantsOfEmptiestInstance {
				tenantUsageShare := float64(tenants[tenantId].Bytes / int64(desiredInstancesPerTenant))
				futureUsage := (tenantUsageShare / amountOfMonthElapsed) - tenantUsageShare
				// only consider moving the tenant if it does not already exist on the destination
				if !tenantsOfFullestInstance[tenantId] {
					effectsOfMovingTenantToOtherInstance = append(
						effectsOfMovingTenantToOtherInstance,
						tenantMoveTuple{
							tenantId:   tenantId,
							instanceId: highestSurplus.instance.GetId(),
							bytes:      int64(futureUsage * 2),
						},
					)
				}
			}
			for tenantId := range tenantsOfFullestInstance {
				tenantUsageShare := float64(tenants[tenantId].Bytes / int64(desiredInstancesPerTenant))
				futureUsage := (tenantUsageShare / amountOfMonthElapsed) - tenantUsageShare
				// only consider moving the tenant if it does not already exist on the destination
				if !tenantsOfEmptiestInstance[tenantId] {
					effectsOfMovingTenantToOtherInstance = append(
						effectsOfMovingTenantToOtherInstance,
						tenantMoveTuple{
							tenantId:   tenantId,
							instanceId: lowestSurplus.instance.GetId(),
							bytes:      int64(-futureUsage * 2),
						},
					)
				}
			}

			// we constructed a list of all possible moves we could make to shuffle tenants between the two,
			// now we use a heuristic method to find the best combination of moves which
			// gets us closest to our desiredDifference.
			// (weighted to also minimize the # of moves, based on knapsackShufflingMinimizationFactor)
			// This is basically the knapsack problem: https://en.wikipedia.org/wiki/Knapsack_problem
			positiveValue := int64(0)
			negativeValue := int64(0)
			for _, move := range effectsOfMovingTenantToOtherInstance {
				if move.bytes > 0 {
					positiveValue += move.bytes
				} else {
					negativeValue += move.bytes
				}
			}
			bestGuessSoFar := knapsackGuess{
				moves:    []*tenantMoveTuple{},
				distance: desiredDifference,
				score:    getKnapsackSolutionScore(desiredDifference, desiredDifference, 0),
			}
			numberOfAttempts := int(float64(len(effectsOfMovingTenantToOtherInstance)) * knapsackNumberOfGuessesFactor)
			for attempt := 0; attempt < numberOfAttempts; attempt++ {
				difference := int64(0)
				moves := []*tenantMoveTuple{}
				positiveTotal := positiveValue
				negativeTotal := negativeValue
				permutation := getRandomPermutation(len(effectsOfMovingTenantToOtherInstance))
				for i := range effectsOfMovingTenantToOtherInstance {
					index := permutation[i]
					move := effectsOfMovingTenantToOtherInstance[index]
					proposedDifference := difference + move.bytes
					proposedDistance := int64Abs(proposedDifference - desiredDifference)
					proposedScore := getKnapsackSolutionScore(desiredDifference, proposedDistance, len(moves)+1)
					// if moving this tenant would push our current guess "too far" in the positive direction
					if proposedDifference > desiredDifference {
						// ok, we overshot... would it be possible to "walk it back"?
						// or did we overshoot only a little bit & create a good solution?
						impossibleToGoBack := proposedDifference+negativeTotal > desiredDifference
						if impossibleToGoBack && proposedScore > bestGuessSoFar.score {
							continue
						}
					}
					// if moving this tenant would push our current guess "too far" in the negative direction
					if proposedDifference < 0 {
						impossibleToGoBack := proposedDifference+positiveTotal < 0
						if impossibleToGoBack {
							continue
						}
					}
					difference = proposedDifference
					moves = append(moves, &move)
					if move.bytes > 0 {
						positiveTotal -= move.bytes
					} else {
						negativeTotal -= move.bytes
					}
					if proposedScore < bestGuessSoFar.score {
						bestGuessSoFar = knapsackGuess{
							moves:    moves,
							distance: proposedDistance,
							score:    proposedScore,
						}
					}
				}
			}

			if len(bestGuessSoFar.moves) == 0 {
				doneShuffling = true
			} else {
				// apply the chosen moves: each move swaps the tenant over to the other instance of the pair
				for _, move := range bestGuessSoFar.moves {
					if move.instanceId == highestSurplus.instance.GetId() {
						delete(workingAllocations[move.instanceId], move.tenantId)
						workingAllocations[lowestSurplus.instance.GetId()][move.tenantId] = true
					} else {
						delete(workingAllocations[move.instanceId], move.tenantId)
						workingAllocations[highestSurplus.instance.GetId()][move.tenantId] = true
					}
				}
			}
		}
		if iterations == shufflingCircuitBreakerLimit {
			return false, fmt.Errorf(
				"something went wrong shuffling tenants: shufflingCircuitBreakerLimit was reached (%d iterations)",
				shufflingCircuitBreakerLimit,
			)
		}
	}

	// Step 3: Now that we have the workingAllocations shuffled to balance the load, we need to apply
	// those changes to the instances.
	// we store a "shadow config" in the database for the tenant's membership on a vps.
	// Then when we are ready to apply a new configuration, we do a diff against the current "shadow config"
	// and only make requests to each instance which has diffs.
	existingConfig := map[string]map[int]*TunnelSettings{}
	newConfig := map[string]map[int]*TunnelSettings{}
	for instanceId, instanceAllocations := range originalAllocations {
		for tenantId := range instanceAllocations {
			settings, has := shadowConfigs[instanceId][tenantId]
			if !has {
				settings = &TunnelSettings{
					AuthorizedDomains: []string{},
				}
			}
			if _, has = existingConfig[instanceId]; !has {
				existingConfig[instanceId] = map[int]*TunnelSettings{}
			}
			existingConfig[instanceId][tenantId] = settings
		}
	}
	// Add both workingAllocations and pinned to newConfig
	for instanceId, instanceAllocations := range workingAllocations {
		for tenantId := range instanceAllocations {
			if _, has := newConfig[instanceId]; !has {
				newConfig[instanceId] = map[int]*TunnelSettings{}
			}
			if !tenants[tenantId].Deactivated {
				// TODO handle other authorized domains
				newConfig[instanceId][tenantId] = tenants[tenantId].TunnelSettings
			}
		}
	}
	for instanceId, instanceAllocations := range pinned {
		for tenantId := range instanceAllocations {
			if _, has := newConfig[instanceId]; !has {
				newConfig[instanceId] = map[int]*TunnelSettings{}
			}
			if !tenants[tenantId].Deactivated {
				// TODO handle other authorized domains
				newConfig[instanceId][tenantId] = tenants[tenantId].TunnelSettings
			}
		}
	}

	// go over the new and existing configs and mark any instanceIds which have differences
	changedInstanceIds := map[string]bool{}
	for instanceId, instanceTenants := range existingConfig {
		if _, has := newConfig[instanceId]; !has {
			changedInstanceIds[instanceId] = true
			continue
		}
		newInstanceTenants := newConfig[instanceId]
		for tenantId, tenantConfig := range instanceTenants {
			newTenantConfig, has := newInstanceTenants[tenantId]
			if !has || !newTenantConfig.DeepEquals(tenantConfig) {
				changedInstanceIds[instanceId] = true
			}
		}
	}
	for instanceId, instanceTenants := range newConfig {
		if _, has := existingConfig[instanceId]; !has {
			changedInstanceIds[instanceId] = true
			continue
		}
		existingInstanceTenants := existingConfig[instanceId]
		for tenantId, tenantConfig := range instanceTenants {
			existingTenantConfig, has := existingInstanceTenants[tenantId]
			// TODO remove hardcoded true
			if !has || !existingTenantConfig.DeepEquals(tenantConfig) || true {
				changedInstanceIds[instanceId] = true
			}
		}
	}

	// we send PUT requests to all the instances that have config changes
	actions := make([]func() taskResult, len(changedInstanceIds))
	i := 0
	for instanceId := range changedInstanceIds {
		instanceId := instanceId // capture a per-iteration copy for the closure below
		instance := validVpsInstances[instanceId]
		actions[i] = func() taskResult {
			err := app.saveVpsInstanceTenantSettings(billingYear, billingMonth, instance, newConfig[instanceId])
			return taskResult{
				Name:   instanceId,
				Err:    err,
				Result: nil,
			}
		}
		i++
	}
	log.Println("saving tenants assignments to threshold servers...")
	results := doInParallel(false, actions...)
	for _, result := range results {
		if result.Err != nil {
			return true, result.Err
		}
	}

	// collect the set of instance IPs serving each tenant's free subdomain
	freeSubdomains := map[string][]string{}
	for instanceId, tenantIds := range workingAllocations {
		instance, hasInstance := validVpsInstances[instanceId]
		if hasInstance {
			for tenantId := range tenantIds {
				tenant, hasTenant := tenants[tenantId]
				if hasTenant && tenant.Subdomain != "" {
					_, hasIps := freeSubdomains[tenant.Subdomain]
					if !hasIps {
						freeSubdomains[tenant.Subdomain] = []string{instance.IPV4}
					} else {
						freeSubdomains[tenant.Subdomain] = append(freeSubdomains[tenant.Subdomain], instance.IPV4)
					}
				}
			}
		}
	}
	log.Println("updating tenants free subdomains...")
	err = app.Gandi.UpdateFreeSubdomains(freeSubdomains)
	if err != nil {
		return true, err
	}

	log.Println("reallocate complete!")
	return true, nil
}

func getInstanceProjectedUsage(
	instance *VPSInstance,
	workingAllocations *map[string]map[int]bool,
	tenants *map[int]*TenantInfo,
	desiredInstancesPerTenant int,
	amountOfMonthElapsed float64,
) int64 {
	usageByActiveTenantsSoFar := int64(0)
	instanceAllocations, has := (*workingAllocations)[instance.GetId()]
	if has {
		for tenantId := range instanceAllocations {
			if !(*tenants)[tenantId].Deactivated {
				usageByActiveTenantsSoFar += (*tenants)[tenantId].Bytes
			}
		}
	}
	soFarFloat := float64(instance.Bytes)
	activeTenantsSoFarFloat := float64(usageByActiveTenantsSoFar) / float64(desiredInstancesPerTenant)
	totalProjectedUsageFloat := soFarFloat + ((activeTenantsSoFarFloat / amountOfMonthElapsed) - activeTenantsSoFarFloat)
	return int64(totalProjectedUsageFloat)
}
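
// Worked example: an instance has served 50GB so far, its active tenants have
// used 120GB total, desiredInstancesPerTenant is 2, and 40% of the month has
// elapsed. Then activeTenantsSoFarFloat = 60GB and the projection is
// 50 + (60/0.4 - 60) = 140GB.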

func (app *BackendApp) SpawnNewMultitenantInstance() (*VPSInstance, error) {
	userData := fmt.Sprintf(
		`#!/bin/sh
touch /root/.hushlogin
%s
%s`,
		app.DigitalOcean.GetSSHHostKeysFileScript(), app.BackblazeB2.GetFileUploadShellScript(),
	)

	log.Println("Creating new multitenant worker...")
	instance, err := app.DigitalOcean.Create("multitenant-worker", userData)
	if err != nil {
		return nil, errors.Wrap(err, "failed to DigitalOcean.Create")
	}

	log.Printf("Waiting for %s to get an IPv4...\n", instance.GetId())
	waitedSeconds := 0
	for instance.IPV4 == "" && waitedSeconds < 300 {
		time.Sleep(time.Second * time.Duration(5))
		waitedSeconds += 5
		instance, err = app.DigitalOcean.Get(instance.ProviderInstanceId)
		if err != nil {
			return nil, errors.Wrap(err, "failed to DigitalOcean.Get")
		}
	}

	log.Printf("Waiting for %s (%s) to upload its ssh host public keys to backblaze...\n", instance.GetId(), instance.IPV4)
	knownHostsFileContent, err := app.pollForSSHHostKeys(instance.GetId())
	if err != nil {
		return nil, errors.Wrap(err, "failed to PollBackblazeForSSHHostKeys")
	}

	log.Printf("adding %s (%s) to the user's known-hosts file...\n", instance.GetId(), instance.IPV4)
	err = app.SSH.AppendToKnownHostsFile(knownHostsFileContent)
	if err != nil {
		return nil, errors.Wrap(err, "failed to AppendToSSHKnownHostsFile")
	}

	log.Printf("Generating threshold install script for %s (%s)...\n", instance.GetId(), instance.IPV4)
	ipv4 := net.ParseIP(instance.IPV4)
	ipv6 := net.ParseIP(instance.IPV6)
	ips := []net.IP{}
	if ipv4 != nil {
		ips = append(ips, ipv4)
	}
	if ipv6 != nil {
		ips = append(ips, ipv6)
	}
	if len(ips) == 0 {
		return nil, errors.Errorf("failed to convert instance IPs '%s' or '%s' to net.IP", instance.IPV4, instance.IPV6)
	}
	provisionThresholdScript, err := app.ThresholdProvisioning.GetServerInstallScript(instance.GetId(), ips)
	if err != nil {
		return nil, errors.Wrap(err, "failed to ThresholdProvisioning.GetServerInstallScript")
	}

	// TODO use potentially different username depending on cloud provider
	log.Printf("Installing threshold on %s (%s)...\n", instance.GetId(), instance.IPV4)
	stdout, stderr, err := app.SSH.RunScriptOnRemoteHost(provisionThresholdScript, "root", instance.IPV4)
	if err != nil {
		return nil, errors.Wrapf(
			err, "failed to RunScriptOnRemoteHost(provisionThresholdScript, \"root\", %s)",
			instance.IPV4,
		)
	}
	log.Println("stderr:")
	log.Println(stderr)
	log.Println("")
	log.Println("")
	log.Println("stdout:")
	log.Println(stdout)
	log.Println("")
	log.Println("")

	log.Printf("Saving %s (%s) info to the database...\n", instance.GetId(), instance.IPV4)
	err = app.Model.CreateVPSInstance(instance)
	if err != nil {
		return nil, errors.Wrap(err, "failed to Model.CreateVPSInstance")
	}

	log.Printf("DONE! %s (%s) is provisioned!\n\n\n", instance.GetId(), instance.IPV4)
	return instance, nil
}

func (app *BackendApp) pollForSSHHostKeys(instanceId string) (string, error) {
	// it usually takes at least 15 seconds for an instance to boot and run its startup script
	time.Sleep(time.Second * time.Duration(15))
	iterations := 0
	for iterations < 60 {
		iterations++
		fileContent, err := app.BackblazeB2.Get(fmt.Sprintf("greenhouse/known-hosts/%s", instanceId))
		if err != nil {
			return "", err
		}
		if fileContent != nil {
			return string(fileContent), nil
		}
		time.Sleep(time.Second * time.Duration(5))
	}
	return "", fmt.Errorf("timed out waiting for %s to upload its host ssh public keys", instanceId)
}

func (app *BackendApp) saveVpsInstanceTenantSettings(
	billingYear int,
	billingMonth int,
	instance *VPSInstance,
	config map[int]*TunnelSettings,
) error {
	url := fmt.Sprintf("https://%s:%d/tenants", instance.IPV4, app.ThresholdManagementPort)
	jsonBytes, err := json.Marshal(config)
	if err != nil {
		return errors.Wrapf(
			err,
			"json serialization error calling %s (threshold multitenant management API)",
			url,
		)
	}
	log.Println(string(jsonBytes))

	returnedBytes, err := app.MyHTTP200(
		"PUT",
		url,
		bytes.NewBuffer(jsonBytes),
		func(request *http.Request) { request.Header.Set("Content-Type", "application/json") },
	)
	if err != nil {
		return err
	}
	log.Printf("returnedBytes: \n%s\n", string(returnedBytes))

	returnedBytes2, err := app.MyHTTP200("GET", url, nil, nil)
	if err != nil {
		return err
	}
	log.Printf("returnedBytes2: \n%s\n", string(returnedBytes2))

	err = app.Model.SaveInstanceConfiguration(billingYear, billingMonth, instance, config)
	if err != nil {
		return errors.Wrapf(err, "Can't save instance configuration to database for %s", instance.GetId())
	}
	return nil
}
// TODO this fn needs to use app.MyHTTP() too
// func (app *BackendApp) getTenantSettingsFromVpsInstance(url string) (map[string]TunnelSettings, error) {
// httpClient := http.Client{
// Transport: &http.Transport{
// TLSClientConfig: app.ThresholdManagementTlsClientConfig,
// },
// Timeout: 10 * time.Second,
// }
// request, err := http.NewRequest("GET", url, nil)
// response, err := httpClient.Do(request)
// if err != nil {
// return nil, err
// }
// if response.StatusCode != 200 {
// return nil, fmt.Errorf("HTTP %d when calling %s (threshold multitenant management API)", response.StatusCode, url)
// }
// bytes, err := ioutil.ReadAll(response.Body)
// if err != nil {
// return nil, errors.Wrapf(err, "HTTP read error when calling %s (threshold multitenant management API)", url)
// }
// var responseObject map[string]TunnelSettings
// err = json.Unmarshal(bytes, &responseObject)
// if err != nil {
// return nil, errors.Wrapf(err, "JSON parse error when calling %s (threshold multitenant management API)", url)
// }
// return responseObject, nil
// }
func CreateUnixTransport(socketFile string) *http.Transport {
	return &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			return net.Dial("unix", socketFile)
		},
	}
}
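
// Example usage (socket path illustrative):
//
//	client := &http.Client{Transport: CreateUnixTransport("/var/run/app.sock")}
//
// requests made with this client are dialed over the unix socket regardless
// of the host named in the URL.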

func (service *BaseHTTPService) MyHTTP(
	method string,
	url string,
	body io.Reader,
	withRequest func(*http.Request),
) (int, []byte, error) {
	// lazily (re)create the cached client if it's missing or its certificate has expired
	if service.client == nil || (service.clientExpiry != nil && time.Now().After(*service.clientExpiry)) {
		httpClient, httpClientExpiry, err := service.ClientFactory()
		if err != nil {
			return 0, nil, errors.Wrapf(err, "failed to create HTTP client when calling %s %s", method, url)
		}
		service.client = httpClient
		service.clientExpiry = httpClientExpiry
	}
	request, err := http.NewRequest(method, url, body)
	if err != nil {
		return 0, nil, errors.Wrapf(err, "failed to create HTTP request calling %s %s", method, url)
	}
	if withRequest != nil {
		withRequest(request)
	}
	response, err := service.client.Do(request)
	if err != nil {
		return 0, nil, errors.Wrapf(err, "HTTP request error when calling %s %s", method, url)
	}
	defer response.Body.Close()
	responseBytes, err := io.ReadAll(response.Body)
	if err != nil {
		return response.StatusCode, nil, errors.Wrapf(err, "HTTP read error when calling %s %s ", method, url)
	}
	return response.StatusCode, responseBytes, err
}

func (service *BaseHTTPService) MyHTTP200(
	method string,
	url string,
	body io.Reader,
	withRequest func(*http.Request),
) ([]byte, error) {
	statusCode, responseBytes, err := service.MyHTTP(method, url, body, withRequest)
	if err == nil && statusCode >= 300 {
		err = errors.Errorf("HTTP %d when calling %s %s: \n%s\n\n", statusCode, method, url, string(responseBytes))
	}
	return responseBytes, err
}

func doInParallel(stopOnFirstError bool, actions ...func() taskResult) map[string]taskResult {
	// buffer the channel so that the remaining goroutines can still send (and exit)
	// if we stop reading early on the first error
	resultsChannel := make(chan taskResult, len(actions))
	results := map[string]taskResult{}
	//log.Printf("do %d actions in parallel", len(actions))
	for _, action := range actions {
		// this is how you do closures over the VALUE of a variable (not the variable itself) in golang
		// https://stackoverflow.com/questions/26692844/captured-closure-for-loop-variable-in-go
		action := action
		go (func() {
			result := action()
			resultsChannel <- result
		})()
	}
	for range actions {
		result := <-resultsChannel
		results[result.Name] = result
		if result.Err != nil && stopOnFirstError {
			break
		}
	}
	return results
}
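
// Typical usage: build one closure per unit of work, each returning a
// taskResult with a unique Name (results are keyed by Name, so duplicates
// would overwrite each other). For example (doWork is illustrative):
//
//	actions := []func() taskResult{
//		func() taskResult { return taskResult{Name: "a", Err: doWork("a")} },
//		func() taskResult { return taskResult{Name: "b", Err: doWork("b")} },
//	}
//	results := doInParallel(false, actions...)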

func int64Abs(x int64) int64 {
	if x > 0 {
		return x
	}
	return -x
}

// getRandomPermutation returns a uniformly random permutation of [0, length)
// using a partial Fisher–Yates shuffle over a shrinking pool.
func getRandomPermutation(length int) []int {
	toReturn := make([]int, length)
	toAppend := make([]int, length)
	for i := 0; i < length; i++ {
		toAppend[i] = i
	}
	for i := 0; i < length; i++ {
		picked := mathRand.Intn(length - i)
		toReturn[i] = toAppend[picked]
		// move the last element of the pool into the picked slot, then shrink the pool
		toAppend[picked] = toAppend[length-i-1]
		toAppend = toAppend[0 : length-i-1]
	}
	return toReturn
}
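
// Walk-through for length = 3: pick a random index in [0,3), emit that
// element, replace it with the last element of the pool, shrink the pool to
// 2, and repeat; every element is emitted exactly once, in random order.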

// getKnapsackSolutionScore scores a candidate set of moves: lower is better.
// distance from the target counts directly, and each move adds a fixed
// penalty proportional to the scale, so smaller move sets are preferred.
func getKnapsackSolutionScore(scale int64, proposedDistance int64, proposedLength int) int64 {
	return proposedDistance + int64(float64(scale)*knapsackShufflingMinimizationFactor)*int64(proposedLength)
}
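
// For example, with scale (desiredDifference) = 800GB and
// knapsackShufflingMinimizationFactor = 0.3, each move costs 240GB of score,
// so a 2-move plan landing 100GB from the target (score 580GB) beats a
// 4-move plan landing exactly on it (score 960GB).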