package main
import (
"bytes"
"crypto/rand"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"math"
mathRand "math/rand"
"net"
"net/http"
"regexp"
"strconv"
"strings"
"time"
"git.sequentialread.com/forest/greenhouse/pki"
errors "git.sequentialread.com/forest/pkg-errors"
base58 "github.com/shengdoushi/base58"
)
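// BaseHTTPService lazily creates its http.Client via ClientFactory, and re-creates it
// once the current client has passed its expiry (see MyHTTP below).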
type BaseHTTPService struct {
client *http.Client
clientExpiry *time.Time
ClientFactory func() (*http.Client, *time.Time, error)
}
type BackendApp struct {
BaseHTTPService
WorkingDirectory string
EmailService *EmailService
Model *DBModel
DigitalOcean *DigitalOceanService
Gandi *GandiService
FreeSubdomainDomain string
BackblazeB2 *BackblazeB2Service
SSH *SSHService
ThresholdProvisioning *ThresholdProvisioningService
ThresholdManagementPort int
ThresholdPort int
AdminTenantId int
AdminThresholdNodeId string
GreenhouseListenPort int
GreenhouseThresholdServiceId string
}
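// ThresholdMetrics holds per-tenant byte counts reported by a threshold server;
// map keys are tenant ids encoded as strings.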
type ThresholdMetrics struct {
InboundByTenant map[string]int64
OutboundByTenant map[string]int64
}
type VPSInstanceWithTenants struct {
TenantSettings map[string]TunnelSettings
VPSInstance *VPSInstance
}
type stringBytesTuple struct {
id string
bytes int64
}
type vpsInstanceBytesTuple struct {
instance *VPSInstance
bytes int64
}
type tenantMoveTuple struct {
tenantId int
instanceId string
bytes int64
}
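// knapsackGuess is one candidate solution in the tenant-shuffling (knapsack) search:
// a set of moves, its distance from the desired usage difference, and its score (lower is better).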
type knapsackGuess struct {
moves []*tenantMoveTuple
distance int64
score int64
}
type taskResult struct {
Name string
Err error
Result interface{}
}
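// tenantShufflingThreshold: shuffling only proceeds while the projected usage difference between the
// most- and least-loaded instances exceeds this fraction of their average monthly allowance.
// knapsackShufflingMinimizationFactor: weight of the "prefer fewer moves" penalty in the solution score.
// knapsackNumberOfGuessesFactor: random guesses attempted per candidate move in the knapsack search.
// shufflingCircuitBreakerLimit: hard cap on shuffling iterations, guarding against an infinite loop.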
const tenantShufflingThreshold = float64(0.2)
const knapsackShufflingMinimizationFactor = float64(0.3)
const knapsackNumberOfGuessesFactor = float64(5)
const shufflingCircuitBreakerLimit = 1000
const GIGABYTE = int64(1000000000)
const TERABYTE = int64(1000000000000)
const managementClientCertSubject = "management@greenhouse.server.garden"
const freeSubdomainDomain = "greenhouseusers.com"
var projectedOverageAllowedBeforeSpawningNewInstance int64 = GIGABYTE * 250
var projectedUnderageAllowedBeforeTerminatingInstance int64 = TERABYTE
func initBackend(
workingDirectory string,
config *Config,
pkiService *pki.PKIService,
model *DBModel,
emailService *EmailService,
) *BackendApp {
adminThresholdNodeId := "greenhouse_node_id"
greenhouseThresholdServiceId := "greenhouse_https"
toReturn := BackendApp{
WorkingDirectory: workingDirectory,
EmailService: emailService,
Model: model,
DigitalOcean: NewDigitalOceanService(config),
Gandi: NewGandiService(config, freeSubdomainDomain),
FreeSubdomainDomain: freeSubdomainDomain,
BackblazeB2: NewBackblazeB2Service(config),
SSH: NewSSHService(config),
ThresholdProvisioning: NewThresholdProvisioningService(config, pkiService, config.AdminTenantId, adminThresholdNodeId, greenhouseThresholdServiceId),
ThresholdPort: config.ThresholdPort,
ThresholdManagementPort: config.ThresholdManagementPort,
AdminTenantId: config.AdminTenantId,
AdminThresholdNodeId: adminThresholdNodeId,
GreenhouseListenPort: config.FrontendPort,
GreenhouseThresholdServiceId: greenhouseThresholdServiceId,
}
// TODO move this BaseHTTPService into the ThresholdProvisioning service?
toReturn.ClientFactory = func() (*http.Client, *time.Time, error) {
caCert, err := pkiService.GetCACertificate(mainCAName)
if err != nil {
return nil, nil, err
}
expiryTime := time.Now().Add(time.Hour * 24)
cert, err := pkiService.GetClientTLSCertificate(managementAPIAuthCAName, managementClientCertSubject, expiryTime)
if err != nil {
return nil, nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AddCert(caCert)
tlsClientConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
tlsClientConfig.BuildNameToCertificate()
return &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsClientConfig,
},
Timeout: 10 * time.Second,
}, &expiryTime, nil
}
return &toReturn
}
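// InitializeTenant sets up a brand-new tenant: it derives a unique free subdomain from the tenant's
// email address (appending the email domain or a counter on collision), reserves a port range, and
// generates a random API token, storing only the token's sha256 hash.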
func (app *BackendApp) InitializeTenant(tenantId int, email string) error {
emailSplit := strings.Split(email, "@")
if len(emailSplit) != 2 {
return errors.Errorf("InitializeTenant(): invalid email '%s'", email)
}
emailDomainSplit := strings.Split(emailSplit[1], ".")
if len(emailDomainSplit) < 2 {
return errors.Errorf("InitializeTenant(): invalid email '%s'", email)
}
replaceInvalidCharactersRegex := regexp.MustCompile("[^a-z0-9]")
replaceRepeatedHyphensRegex := regexp.MustCompile("-+")
makeSubdomain := func(input string) string {
toReturn := replaceInvalidCharactersRegex.ReplaceAllString(input, "-")
toReturn = replaceRepeatedHyphensRegex.ReplaceAllString(toReturn, "-")
return strings.Trim(toReturn, "-")
}
subdomain := makeSubdomain(emailSplit[0])
var err error
alreadyTaken := true
i := 0
for alreadyTaken && i < 1000 {
i++
alreadyTaken, err = app.Model.SetFreeSubdomain(tenantId, subdomain)
if err != nil {
return err
}
if i == 1 {
subdomain = makeSubdomain(fmt.Sprintf("%s-%s", emailSplit[0], emailDomainSplit[0]))
} else if i == 2 {
subdomain = makeSubdomain(email)
} else {
subdomain = makeSubdomain(fmt.Sprintf("%s-%d", emailSplit[0], i-2))
}
}
if i >= 1000 {
return errors.Errorf("InitializeTenant(): something went wrong generating a subdomain for '%s'", email)
}
start, end, bucket, err := app.Model.GetNextReservedPorts()
if err != nil {
return errors.Wrapf(err, "InitializeTenant() for '%s':", email)
}
err = app.Model.SetReservedPorts(tenantId, start, end, bucket)
if err != nil {
return errors.Wrapf(err, "InitializeTenant() for '%s':", email)
}
apiTokenBuffer := make([]byte, 16)
if _, err := rand.Read(apiTokenBuffer); err != nil {
return errors.Wrapf(err, "InitializeTenant() for '%s':", email)
}
apiToken := base58.Encode(apiTokenBuffer, base58.BitcoinAlphabet)
rawHash := sha256.Sum256([]byte(apiToken))
hashedAPIToken := fmt.Sprintf("%x", rawHash)
err = app.Model.SetHashedAPIToken(tenantId, hashedAPIToken)
if err != nil {
return errors.Wrapf(err, "InitializeTenant() for '%s':", email)
}
return nil
}
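// ConsumeMetrics polls every VPS instance's /consumeMetrics endpoint in parallel, records the
// per-instance usage, then aggregates inbound plus outbound bytes per tenant and records that too.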
func (app *BackendApp) ConsumeMetrics() error {
vpsInstances, err := app.Model.GetVPSInstances()
if err != nil {
return err
}
billingYear, billingMonth, _, _, _ := getBillingTimeInfo()
actions := make([]func() taskResult, len(vpsInstances))
i := 0
for instanceId, instance := range vpsInstances {
instanceId, instance := instanceId, instance // re-bind so each closure captures this iteration's values
actions[i] = func() taskResult {
// TODO move the metrics to the management API / to the client side in threshold
responseBytes, err := app.MyHTTP200(
"GET",
fmt.Sprintf("https://%s:%d/consumeMetrics", instance.IPV4, app.ThresholdManagementPort),
nil,
nil,
)
//log.Println(string(responseBytes))
result := ThresholdMetrics{}
if err == nil {
err = json.Unmarshal(responseBytes, &result)
}
if err == nil {
err = app.Model.RecordVPSUsage(instance, result, billingYear, billingMonth)
}
return taskResult{
Name: instanceId,
Err: err,
Result: result,
}
}
i++
}
results := doInParallel(false, actions...)
errorStrings := []string{}
bytesByTenant := map[int]int64{}
for _, result := range results {
if result.Err != nil {
errorStrings = append(errorStrings, fmt.Sprintf("error collecting metrics from %s: %+v", result.Name, result.Err))
} else {
instanceBytesByTenant := result.Result.(ThresholdMetrics)
for tenantIdString, bytez := range instanceBytesByTenant.InboundByTenant {
tenantId, err := strconv.Atoi(tenantIdString)
if err != nil {
errorStrings = append(errorStrings, fmt.Sprintf("can't convert tenantId '%s' to integer: %s", tenantIdString, err))
} else {
bytesByTenant[tenantId] += bytez
}
}
for tenantIdString, bytez := range instanceBytesByTenant.OutboundByTenant {
tenantId, err := strconv.Atoi(tenantIdString)
if err != nil {
errorStrings = append(errorStrings, fmt.Sprintf("can't convert tenantId '%s' to integer: %s", tenantIdString, err))
} else {
bytesByTenant[tenantId] += bytez
}
}
}
}
if len(errorStrings) > 0 {
return errors.Errorf("ConsumeMetrics() doInParallel(actions...): %s\n\n", strings.Join(errorStrings, "\n"))
}
err = app.Model.RecordTenantsUsage(bytesByTenant)
if err != nil {
return err
}
return nil
}
func (app *BackendApp) GetInstances() (map[string]*VPSInstance, map[string]*VPSInstance, map[string]*VPSInstance, error) {
dbVPSInstances, err := app.Model.GetVPSInstances()
if err != nil {
return nil, nil, nil, err
}
cloudVPSInstances, err := app.DigitalOcean.List()
if err != nil {
return nil, nil, nil, err
}
// jsonBytes, err = json.MarshalIndent(dbVPSInstances, "", " ")
// log.Printf("dbVPSInstances: %s\n\n", string(jsonBytes))
// jsonBytes, err = json.MarshalIndent(digitalOceanVpsInstances, "", " ")
// log.Printf("digitalOceanVpsInstances: %s\n\n", string(jsonBytes))
validVpsInstances := map[string]*VPSInstance{}
onlyCloudInstances := map[string]*VPSInstance{}
onlyDBInstances := map[string]*VPSInstance{}
for k, v := range cloudVPSInstances {
if _, has := dbVPSInstances[k]; !has {
onlyCloudInstances[k] = v
}
}
for k, v := range dbVPSInstances {
if _, has := cloudVPSInstances[k]; !has {
onlyDBInstances[k] = v
} else if !v.Deleted && !v.Deprecated {
validVpsInstances[k] = v
}
}
return validVpsInstances, onlyDBInstances, onlyCloudInstances, nil
}
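// HealthcheckInstances pings every instance's management API in parallel and reports which ones responded.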
func (app *BackendApp) HealthcheckInstances(vpsInstances map[string]*VPSInstance) map[string]bool {
if len(vpsInstances) == 0 {
return map[string]bool{}
}
actions := make([]func() taskResult, len(vpsInstances))
i := 0
for instanceId, instance := range vpsInstances {
instanceId, instance := instanceId, instance // re-bind so each closure captures this iteration's values
actions[i] = func() taskResult {
_, err := app.MyHTTP200(
"GET",
fmt.Sprintf("https://%s:%d/ping", instance.IPV4, app.ThresholdManagementPort),
nil,
nil,
)
return taskResult{
Name: instanceId,
Err: err,
Result: nil,
}
}
i++
}
results := doInParallel(false, actions...)
healthStatus := map[string]bool{}
for _, result := range results {
if result.Err != nil {
log.Printf("error pinging %s: %s", result.Name, result.Err)
}
healthStatus[result.Name] = (result.Err == nil)
}
return healthStatus
}
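// getBillingTimeInfo returns the current UTC billing year and month, the start and end of the
// billing month, and the fraction of the month that has already elapsed.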
func getBillingTimeInfo() (int, int, time.Time, time.Time, float64) {
billingYear := time.Now().UTC().Year()
billingMonth := int(time.Now().UTC().Month())
nextBillingYear := billingYear
nextBillingMonth := billingMonth + 1
if nextBillingMonth > 12 {
nextBillingMonth = 1
nextBillingYear++
}
utcLocation, err := time.LoadLocation("UTC")
if err != nil {
err = errors.Wrap(err, "can't calculate billing because can't load UTC location. This should have been caught at startup")
log.Printf("%+v\n", err)
panic(err)
}
startOfBillingMonth := time.Date(billingYear, time.Month(billingMonth), 1, 0, 0, 0, 0, utcLocation)
endOfBillingMonth := time.Date(nextBillingYear, time.Month(nextBillingMonth), 0, 23, 59, 59, int(int64(time.Second))-1, utcLocation)
//TODO
monthDuration := float64(int64(endOfBillingMonth.Sub(startOfBillingMonth)))
monthElapsed := float64(int64(endOfBillingMonth.Sub(time.Now().UTC())))
amountOfMonthElapsed := monthElapsed / monthDuration
return billingYear, billingMonth, startOfBillingMonth, endOfBillingMonth, amountOfMonthElapsed
}
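// Rebalance is the core scheduling routine. It runs in three steps:
// 1. scale the number of instances up or down based on total projected usage,
// 2. shuffle tenant allocations from instances with projected overages to instances with projected underages,
// 3. diff the resulting configuration against each instance's stored "shadow config", push changes to the
// instances that differ, and update the free-subdomain DNS records to match the new allocations.
// The returned boolean reports whether the pass got as far as pushing configuration; it is false when
// Rebalance exited early (for example, right after spawning or deprecating instances).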
func (app *BackendApp) Rebalance() (bool, error) {
billingYear, billingMonth, _, _, amountOfMonthElapsed := getBillingTimeInfo()
//TODO
desiredInstancesPerTenant := 2
tenantPinDuration := 6 * time.Hour
tenants, err := app.Model.GetTenants()
if err != nil {
return false, err
}
tenantVpsInstanceRows, err := app.Model.GetTenantVPSInstanceRows(billingYear, billingMonth)
if err != nil {
return false, err
}
validVpsInstances, onlyDBInstances, onlyCloudInstances, err := app.GetInstances()
if err != nil {
return false, err
}
errorStrings := []string{}
for _, v := range onlyDBInstances {
errorStrings = append(errorStrings, fmt.Sprintf("instance %s (%s) is in the database, but not in the provider api", v.GetId(), v.IPV4))
}
for _, v := range onlyCloudInstances {
errorStrings = append(errorStrings, fmt.Sprintf("instance %s (%s) is in the provider api, but not in the database", v.GetId(), v.IPV4))
}
// issue warnings for inconsistencies between DB and provider
if len(errorStrings) > 0 {
return false, errors.Errorf("VPS instances are inconsistent: \n%s\n", strings.Join(errorStrings, "\n"))
}
// aggregate dedicated vps instances count onto tenants
keysToRemove := []string{}
for k, vpsInstance := range validVpsInstances {
if vpsInstance.TenantId != 0 {
keysToRemove = append(keysToRemove, k)
tenants[vpsInstance.TenantId].DedicatedVPSCount += 1
}
}
// TODO do we need to do anything else with these besides cutting them from the "validVpsInstances" ?
for _, k := range keysToRemove {
delete(validVpsInstances, k)
}
// jsonBytes, err := json.MarshalIndent(tenants, "", " ")
// log.Printf("tenants: %s\n\n", string(jsonBytes))
// jsonBytes, err = json.MarshalIndent(tenantVpsInstanceRows, "", " ")
// log.Printf("tenantVpsInstanceRows: %s\n\n", string(jsonBytes))
// jsonBytes, err = json.MarshalIndent(validVpsInstances, "", " ")
// log.Printf("validVpsInstances: %s\n\n", string(jsonBytes))
// aggregate the bandwidth usage from the tenantVPS relation onto both the tenants and the vps instances.
// note that the total bandwidth on the tenants may be larger
// than the total bandwidth on the vps instances because some vps instances may have been deleted
// if you update the following loop, consider updating the similar one in frontend.go admin handler 😬
tenantAllocationCount := map[int]int{}
shadowConfigs := map[string]map[int]*TunnelSettings{}
workingAllocations := map[string]map[int]bool{}
originalAllocations := map[string]map[int]bool{}
pinned := map[string]map[int]bool{}
for _, row := range tenantVpsInstanceRows {
vpsInstanceId := row.GetVPSInstanceId()
vpsInstance, hasVPSInstance := validVpsInstances[vpsInstanceId]
tenant, hasTenant := tenants[row.TenantId]
if hasVPSInstance {
vpsInstance.Bytes += row.Bytes
}
if hasTenant {
tenant.Bytes += row.Bytes
}
if hasTenant && hasVPSInstance {
_, has := shadowConfigs[vpsInstanceId]
if !has {
shadowConfigs[vpsInstanceId] = map[int]*TunnelSettings{}
workingAllocations[vpsInstanceId] = map[int]bool{}
originalAllocations[vpsInstanceId] = map[int]bool{}
pinned[vpsInstanceId] = map[int]bool{} // must also be initialized here, otherwise the write below panics on a nil map
}
shadowConfigs[vpsInstanceId][tenant.Id] = row.ShadowConfig
// if the tenant's allocation to this instance was removed recently (less than tenantPinDuration ago)
// then add the allocation to pinned, keeping the allocation live while DNS updates propagate.
if row.DeactivatedAt != nil && row.DeactivatedAt.Add(tenantPinDuration).After(time.Now()) {
pinned[vpsInstanceId][tenant.Id] = true
}
if row.Active {
workingAllocations[vpsInstanceId][tenant.Id] = true
originalAllocations[vpsInstanceId][tenant.Id] = true
tenantAllocationCount[row.TenantId]++
}
}
}
// Deactivate tenants who have surpassed their service limit for this billing month
// (bandwidth is billed at one cent per gigabyte, see bytesPerCent below)
bytesPerCent := 1000000000
for _, tenant := range tenants {
bandwidthCostCents := int(tenant.Bytes / int64(bytesPerCent))
if bandwidthCostCents >= tenant.ServiceLimitCents {
tenant.Deactivated = true
}
}
// -------------------------------------------------------------------------------
// STEP 1: scale the # of instances up or down depending on total projected usage.
totalUsageSoFar := int64(0)
totalUsageByActiveTenantsSoFar := int64(0)
for _, tenant := range tenants {
totalUsageSoFar += tenant.Bytes
if !tenant.Deactivated {
totalUsageByActiveTenantsSoFar += tenant.Bytes
}
}
soFarFloat := float64(totalUsageSoFar)
activeTenantsSoFarFloat := float64(totalUsageByActiveTenantsSoFar)
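// linear extrapolation of this month's usage: for example, with amountOfMonthElapsed = 0.5 and 100GB
// used by active tenants so far, we project (100GB / 0.5) - 100GB = 100GB of additional future usage.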
totalProjectedUsageFloat := soFarFloat + ((activeTenantsSoFarFloat / amountOfMonthElapsed) - activeTenantsSoFarFloat)
//totalProjectedUsage := int64(totalProjectedUsageFloat)
totalMonthlyAllotment := int64(0)
for _, vpsInstance := range validVpsInstances {
if !vpsInstance.Deprecated && !vpsInstance.Deleted {
// TODO handle BytesMonthly when node is created in the middle of the month / near end of month
amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor := float64(1)
totalMonthlyAllotment += int64(float64(vpsInstance.BytesMonthly) * amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor)
}
}
totalMonthlyAllotmentFloat := float64(totalMonthlyAllotment)
overageFloat := totalProjectedUsageFloat - totalMonthlyAllotmentFloat
overflowAmountFloat := overageFloat - float64(projectedOverageAllowedBeforeSpawningNewInstance)
if overflowAmountFloat > 0 || len(validVpsInstances) == 0 {
instancesToCreate := int(math.Ceil(overflowAmountFloat / float64(DEFAULT_INSTANCE_MONTHLY_BYTES)))
if instancesToCreate < 1 && len(validVpsInstances) == 0 {
instancesToCreate = 1
}
log.Printf("spawning %d instances...", instancesToCreate)
tasks := []func() taskResult{}
for i := 0; i < instancesToCreate; i++ {
i := i // re-bind so each closure captures this iteration's value
tasks = append(tasks, func() taskResult {
instance, err := app.SpawnNewMultitenantInstance()
return taskResult{
Err: err,
Result: instance,
Name: strconv.Itoa(i),
}
})
}
results := doInParallel(false, tasks...)
errorStrings := []string{} // named errorStrings to avoid shadowing the errors package
for _, result := range results {
if result.Err != nil {
errorStrings = append(errorStrings, fmt.Sprintf("%+v", result.Err))
}
}
if len(errorStrings) > 0 {
return false, fmt.Errorf("SpawnNewMultitenantInstance failed: \n%s\n", strings.Join(errorStrings, "\n"))
}
return false, nil
}
underusageAmount := -overageFloat - float64(projectedUnderageAllowedBeforeTerminatingInstance)
if underusageAmount > 0 && len(validVpsInstances) > 1 {
instancesToDeprecate := int(math.Ceil(underusageAmount / float64(DEFAULT_INSTANCE_MONTHLY_BYTES)))
if instancesToDeprecate > len(validVpsInstances)-1 {
instancesToDeprecate = len(validVpsInstances) - 1
}
log.Printf("deprecating %d instances...", instancesToDeprecate)
return false, nil
}
// STEP 2: shuffle tenants around to balance load.
// if we got to this point that means we didn't spawn or deprecate any instances this time around.
// sanity check the desiredInstancesPerTenant
if desiredInstancesPerTenant > len(validVpsInstances) {
desiredInstancesPerTenant = len(validVpsInstances)
}
// the allocate function will ADD (not remove) the tenant to the <instanceCount> vps instances that don't already
// have that tenant on them & have the lowest projected usage for this month.
allocate := func(tenant *TenantInfo, instanceCount int, workingAllocations *map[string]map[int]bool) {
lowestUsedInstances := []stringBytesTuple{}
for _, instance := range validVpsInstances {
// get total projected usage for each instance
// un-allocate any existing allocations for this tenant.. ?
// find lowest desiredInstancesPerTenant instances & allocate them
instanceAllocations, has := (*workingAllocations)[instance.GetId()]
// if this instance already has this tenant allocated, or it is deprecated/deleted, skip it.
if (has && instanceAllocations[tenant.Id]) || instance.Deprecated || instance.Deleted {
continue
}
instanceMetadata := stringBytesTuple{
id: instance.GetId(),
bytes: getInstanceProjectedUsage(instance, workingAllocations, &tenants, desiredInstancesPerTenant, amountOfMonthElapsed),
}
if len(lowestUsedInstances) < instanceCount {
lowestUsedInstances = append(lowestUsedInstances, instanceMetadata)
} else {
indexOfMostUsed := -1
for i, instance := range lowestUsedInstances {
if indexOfMostUsed == -1 || instance.bytes > lowestUsedInstances[indexOfMostUsed].bytes {
indexOfMostUsed = i
}
}
if instanceMetadata.bytes < lowestUsedInstances[indexOfMostUsed].bytes {
lowestUsedInstances[indexOfMostUsed] = instanceMetadata
}
}
}
for _, instanceMetadata := range lowestUsedInstances {
if _, has := (*workingAllocations)[instanceMetadata.id]; !has {
(*workingAllocations)[instanceMetadata.id] = map[int]bool{}
}
(*workingAllocations)[instanceMetadata.id][tenant.Id] = true
}
}
// 2.1 allocate tenants who don't have the desired # of active vps allocations yet.
for _, tenant := range tenants {
if !tenant.Deactivated && tenantAllocationCount[tenant.Id] < desiredInstancesPerTenant {
allocate(tenant, desiredInstancesPerTenant-tenantAllocationCount[tenant.Id], &workingAllocations)
}
}
// 2.2 shuffle tenant allocations around so tenants
// are moved from instances with predicted overages to instances with predicted underages
iterations := 0
doneShuffling := false
for !doneShuffling && iterations < shufflingCircuitBreakerLimit && len(validVpsInstances) > 1 {
iterations++ // without this increment the circuit breaker below could never trip
highestSurplus := vpsInstanceBytesTuple{instance: nil, bytes: 0}
lowestSurplus := vpsInstanceBytesTuple{instance: nil, bytes: 0}
for _, instance := range validVpsInstances {
if !instance.Deprecated && !instance.Deleted {
//TODO
amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor := float64(1)
// TODO handle BytesMonthly when node is created in the middle of the month / near end of month
totalMonthlyAllotment := int64(float64(instance.BytesMonthly) * amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor)
surplus := totalMonthlyAllotment - getInstanceProjectedUsage(
instance,
&workingAllocations,
&tenants,
desiredInstancesPerTenant,
amountOfMonthElapsed,
)
if highestSurplus.instance == nil || highestSurplus.bytes < surplus {
highestSurplus = vpsInstanceBytesTuple{instance: instance, bytes: surplus}
}
if lowestSurplus.instance == nil || lowestSurplus.bytes > surplus {
lowestSurplus = vpsInstanceBytesTuple{instance: instance, bytes: surplus}
}
}
}
// if there are no instances which have a predicted overage, exit.
if lowestSurplus.bytes >= 0 {
doneShuffling = true
break
}
// let's say that the most overused instance has a predicted overusage of 100gb
// and the most underused instance has a predicted underusage of 700gb
// the difference between those is 800gb.
// so we want to move 400gb (half the difference) of estimated future traffic
// from the most overused to the most underused.
// if we did that, then the result would be balanced: both predicted to be underused by 300gb
desiredDifference := int64Abs(lowestSurplus.bytes-highestSurplus.bytes) / 2
// only continue shuffling if the difference between the two instances
// is over 20% of their monthly allowance (tenantShufflingThreshold)
// TODO handle BytesMonthly when node is created in the middle of the month / near end of month
averageMonthlyBytes := float64(highestSurplus.instance.BytesMonthly+lowestSurplus.instance.BytesMonthly) * float64(0.5)
if float64(desiredDifference) < averageMonthlyBytes*tenantShufflingThreshold {
doneShuffling = true
break
}
tenantsOfEmptiestInstance := workingAllocations[highestSurplus.instance.GetId()]
tenantsOfFullestInstance := workingAllocations[lowestSurplus.instance.GetId()]
// we are going to create a list of tenantId, instanceId pairs, one for each
// of the tenant allocations on each of the most underused and overused instances
// each list item also includes the net effect on the projected usage difference of the two instances
// assuming that the tenant was moved from one instance to the other
effectsOfMovingTenantToOtherInstance := []tenantMoveTuple{}
for tenantId := range tenantsOfEmptiestInstance {
tenantUsageShare := float64(tenants[tenantId].Bytes / int64(desiredInstancesPerTenant))
futureUsage := (tenantUsageShare / amountOfMonthElapsed) - tenantUsageShare
// only consider moving the tenant if it does not already exist on the destination
if !tenantsOfFullestInstance[tenantId] {
effectsOfMovingTenantToOtherInstance = append(
effectsOfMovingTenantToOtherInstance,
tenantMoveTuple{
tenantId: tenantId,
instanceId: highestSurplus.instance.GetId(),
bytes: int64(futureUsage * 2),
},
)
}
}
for tenantId := range tenantsOfFullestInstance {
tenantUsageShare := float64(tenants[tenantId].Bytes / int64(desiredInstancesPerTenant))
futureUsage := (tenantUsageShare / amountOfMonthElapsed) - tenantUsageShare
// only consider moving the tenant if it does not already exist on the destination
if !tenantsOfEmptiestInstance[tenantId] {
effectsOfMovingTenantToOtherInstance = append(
effectsOfMovingTenantToOtherInstance,
tenantMoveTuple{
tenantId: tenantId,
instanceId: lowestSurplus.instance.GetId(),
bytes: int64(-futureUsage * 2),
},
)
}
}
// we constructed a list of all possible moves we could make to shuffle tenants between the two,
// now we use a heuristic method to find the best combination of moves which
// gets us closest to our desiredDifference.
// (weighted to also minimize the # of moves, based on knapsackShufflingMinimizationFactor)
// This is basically the knapsack problem: https://en.wikipedia.org/wiki/Knapsack_problem
positiveValue := int64(0)
negativeValue := int64(0)
for _, move := range effectsOfMovingTenantToOtherInstance {
if move.bytes > 0 {
positiveValue += move.bytes
} else {
negativeValue += move.bytes
}
}
bestGuessSoFar := knapsackGuess{
moves: []*tenantMoveTuple{},
distance: desiredDifference,
score: getKnapsackSolutionScore(desiredDifference, desiredDifference, 0),
}
numberOfAttempts := int(float64(len(effectsOfMovingTenantToOtherInstance)) * knapsackNumberOfGuessesFactor)
for attempt := 0; attempt < numberOfAttempts; attempt++ {
difference := int64(0)
moves := []*tenantMoveTuple{}
positiveTotal := positiveValue
negativeTotal := negativeValue
permutation := getRandomPermutation(len(effectsOfMovingTenantToOtherInstance))
for i := range effectsOfMovingTenantToOtherInstance {
index := permutation[i]
move := effectsOfMovingTenantToOtherInstance[index]
proposedDifference := difference + move.bytes
proposedDistance := int64Abs(proposedDifference - desiredDifference)
proposedScore := getKnapsackSolutionScore(desiredDifference, proposedDistance, len(moves)+1)
// if moving this tenant would push our current guess "too far" in the positive direction
if proposedDifference > desiredDifference {
// ok, we overshot... would it be possible to "walk it back"?
// or did we overshoot only a little bit & create a good solution?
impossibleToGoBack := proposedDifference+negativeTotal > desiredDifference
if impossibleToGoBack && proposedScore > bestGuessSoFar.score {
continue
}
}
// if moving this tenant would push our current guess "too far" in the negative direction
if proposedDifference < 0 {
impossibleToGoBack := proposedDifference+positiveTotal < 0
if impossibleToGoBack {
continue
}
}
difference = proposedDifference
moves = append(moves, &move)
if move.bytes > 0 {
positiveTotal -= move.bytes
} else {
negativeTotal -= move.bytes
}
if proposedScore < bestGuessSoFar.score {
bestGuessSoFar = knapsackGuess{
moves: moves,
distance: proposedDistance,
score: proposedScore,
}
}
}
}
if len(bestGuessSoFar.moves) == 0 {
doneShuffling = true
} else {
for _, move := range bestGuessSoFar.moves {
if move.instanceId == highestSurplus.instance.GetId() {
delete(workingAllocations[move.instanceId], move.tenantId)
workingAllocations[lowestSurplus.instance.GetId()][move.tenantId] = true
} else {
delete(workingAllocations[move.instanceId], move.tenantId)
workingAllocations[highestSurplus.instance.GetId()][move.tenantId] = true
}
}
}
}
if !doneShuffling && iterations >= shufflingCircuitBreakerLimit {
return false, fmt.Errorf(
`something went wrong while shuffling tenants: shufflingCircuitBreakerLimit was reached (%d iterations)`,
shufflingCircuitBreakerLimit,
)
}
// Step 3: Now that we have the workingAllocations shuffled to balance the load, we need to apply
// those changes to the instances.
// we store a "shadow config" in the database for the tenants membership on vps.
// Then when we are ready to apply a new configuration, we do a diff against the current "shadow config"
// and only make requests to each instance which has diffs.
existingConfig := map[string]map[int]*TunnelSettings{}
newConfig := map[string]map[int]*TunnelSettings{}
for instanceId, instanceAllocations := range originalAllocations {
for tenantId := range instanceAllocations {
settings, has := shadowConfigs[instanceId][tenantId]
if !has {
settings = &TunnelSettings{
AuthorizedDomains: []string{},
}
}
if _, has = existingConfig[instanceId]; !has {
existingConfig[instanceId] = map[int]*TunnelSettings{}
}
existingConfig[instanceId][tenantId] = settings
}
}
authorizeFreeSubdomain := func(tenant *TenantInfo) *TunnelSettings {
if tenant.Subdomain != "" {
freeSubdomain := fmt.Sprintf("%s.%s", tenant.Subdomain, app.FreeSubdomainDomain)
// this closure runs once per instance the tenant is allocated to, so only
// append the free subdomain if it has not been authorized already
alreadyAuthorized := false
for _, domain := range tenant.TunnelSettings.AuthorizedDomains {
if domain == freeSubdomain {
alreadyAuthorized = true
}
}
if !alreadyAuthorized {
tenant.TunnelSettings.AuthorizedDomains = append(tenant.TunnelSettings.AuthorizedDomains, freeSubdomain)
}
}
return tenant.TunnelSettings
}
// Add both workingAllocations and pinned to newConfig
for instanceId, instanceAllocations := range workingAllocations {
for tenantId := range instanceAllocations {
if _, has := newConfig[instanceId]; !has {
newConfig[instanceId] = map[int]*TunnelSettings{}
}
if !tenants[tenantId].Deactivated {
newConfig[instanceId][tenantId] = authorizeFreeSubdomain(tenants[tenantId])
}
}
}
for instanceId, instanceAllocations := range pinned {
for tenantId := range instanceAllocations {
if _, has := newConfig[instanceId]; !has {
newConfig[instanceId] = map[int]*TunnelSettings{}
}
if !tenants[tenantId].Deactivated {
newConfig[instanceId][tenantId] = authorizeFreeSubdomain(tenants[tenantId])
}
}
}
// go over the new and existing configs and mark any instanceIds which have differences
changedInstanceIds := map[string]bool{}
for instanceId, instanceTenants := range existingConfig {
if _, has := newConfig[instanceId]; !has {
changedInstanceIds[instanceId] = true
continue
}
newInstanceTenants := newConfig[instanceId]
for tenantId, tenantConfig := range instanceTenants {
newTenantConfig, has := newInstanceTenants[tenantId]
if !has || !newTenantConfig.DeepEquals(tenantConfig) {
changedInstanceIds[instanceId] = true
}
}
}
for instanceId, instanceTenants := range newConfig {
if _, has := existingConfig[instanceId]; !has {
changedInstanceIds[instanceId] = true
continue
}
existingInstanceTenants := existingConfig[instanceId]
for tenantId, tenantConfig := range instanceTenants {
existingTenantConfig, has := existingInstanceTenants[tenantId]
// TODO remove hardcoded true
if !has || !existingTenantConfig.DeepEquals(tenantConfig) || true {
changedInstanceIds[instanceId] = true
}
}
}
// we send PUT requests to all the instances that have config changes
actions := make([]func() taskResult, len(changedInstanceIds))
i := 0
for instanceId := range changedInstanceIds {
instanceId := instanceId // re-bind so each closure captures this iteration's value
instance := validVpsInstances[instanceId]
actions[i] = func() taskResult {
err := app.saveVpsInstanceTenantSettings(billingYear, billingMonth, instance, newConfig[instanceId])
return taskResult{
Name: instanceId,
Err: err,
Result: nil,
}
}
i++
}
log.Println("saving tenants assignments to threshold servers...")
results := doInParallel(false, actions...)
for _, result := range results {
if result.Err != nil {
return true, result.Err
}
}
freeSubdomains := map[string][]string{}
for instanceId, tenantIds := range workingAllocations {
instance, hasInstance := validVpsInstances[instanceId]
if hasInstance {
for tenantId := range tenantIds {
tenant, hasTenant := tenants[tenantId]
if hasTenant && tenant.Subdomain != "" {
_, hasIps := freeSubdomains[tenant.Subdomain]
if !hasIps {
freeSubdomains[tenant.Subdomain] = []string{instance.IPV4}
} else {
freeSubdomains[tenant.Subdomain] = append(freeSubdomains[tenant.Subdomain], instance.IPV4)
}
}
}
}
}
log.Println("updating tenants free subdomains...")
err = app.Gandi.UpdateFreeSubdomains(freeSubdomains)
if err != nil {
return true, err
}
log.Println("rebalance complete!")
return true, nil
}
func (app *BackendApp) WriteAdminTenantThresholdConfig() error {
tenants, err := app.Model.GetTenants()
if err != nil {
return err
}
for tenantId, tenant := range tenants {
if tenantId == app.AdminTenantId {
clientConfig, err := app.ThresholdProvisioning.GetClientConfig(
app.AdminTenantId, fmt.Sprintf("%s.%s", tenant.Subdomain, app.FreeSubdomainDomain), "greenhouse",
)
if err != nil {
return err
}
clientConfigBytes, err := json.MarshalIndent(clientConfig, "", " ")
if err != nil {
return err
}
err = ioutil.WriteFile("./threshold/config.json", clientConfigBytes, 0755) // file modes are octal; plain 755 would yield the wrong permissions
if err == nil {
log.Println("wrote threshold config!")
}
return err
}
}
return errors.New("admin tenant not found")
}
func (app *BackendApp) ConfigureAdminTenantOnThresholdServer() error {
log.Println("configuring threshold server...")
billingYear, billingMonth, _, _, _ := getBillingTimeInfo()
tenants, err := app.Model.GetTenants()
if err != nil {
return err
}
rows, err := app.Model.GetTenantVPSInstanceRows(billingYear, billingMonth)
if err != nil {
return err
}
vpsInstances, err := app.Model.GetVPSInstances()
if err != nil {
return err
}
actions := []func() taskResult{}
for _, row := range rows {
row := row // re-bind so each closure captures this iteration's value
if row.TenantId == app.AdminTenantId {
vpsInstance, hasVpsInstance := vpsInstances[row.GetVPSInstanceId()]
tenant, hasTenant := tenants[row.TenantId]
if hasVpsInstance && hasTenant {
actions = append(actions, func() taskResult {
url := fmt.Sprintf("https://%s:%d/tunnels", vpsInstance.IPV4, app.ThresholdPort)
jsonBytes, err := json.Marshal([]ThresholdTunnel{
ThresholdTunnel{
ClientId: fmt.Sprintf("%d.%s", app.AdminTenantId, app.AdminThresholdNodeId),
ListenPort: 80,
ListenAddress: "0.0.0.0",
ListenHostnameGlob: fmt.Sprintf("%s.%s", tenant.Subdomain, app.FreeSubdomainDomain),
BackEndService: app.GreenhouseThresholdServiceId,
},
ThresholdTunnel{
ClientId: fmt.Sprintf("%d.%s", app.AdminTenantId, app.AdminThresholdNodeId),
ListenPort: 443,
ListenAddress: "0.0.0.0",
ListenHostnameGlob: fmt.Sprintf("%s.%s", tenant.Subdomain, app.FreeSubdomainDomain),
BackEndService: app.GreenhouseThresholdServiceId,
},
})
log.Println(string(jsonBytes))
if err != nil {
return taskResult{
Name: row.GetVPSInstanceId(),
Err: errors.Wrapf(
err,
"json serialization error calling %s (threshold admin tenant management API)",
url,
),
}
}
_, err = app.ThresholdProvisioning.MyHTTP200(
"PUT",
url,
bytes.NewBuffer(jsonBytes),
func(request *http.Request) { request.Header.Set("Content-Type", "application/json") },
)
if err != nil {
return taskResult{Name: row.GetVPSInstanceId(), Err: err}
}
return taskResult{Name: row.GetVPSInstanceId(), Err: nil}
})
}
}
}
if len(actions) == 0 {
return errors.New("admin tenant threshold server assignment not found")
}
errorStrings := []string{}
results := doInParallel(false, actions...)
for _, result := range results {
if result.Err != nil {
errorStrings = append(errorStrings, fmt.Sprintf("Can't update threshold tunnels for %s: %+v", result.Name, result.Err))
}
}
if len(errorStrings) > 0 {
return errors.Errorf("ConfigureThresholdServer(): \n%s", strings.Join(errorStrings, "\n"))
}
return nil
}
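// getInstanceProjectedUsage estimates an instance's total usage for the billing month: the bytes
// already recorded on the instance, plus a linear extrapolation of its active tenants' usage
// (each tenant's usage is split evenly across desiredInstancesPerTenant instances).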
func getInstanceProjectedUsage(
instance *VPSInstance,
workingAllocations *map[string]map[int]bool,
tenants *map[int]*TenantInfo,
desiredInstancesPerTenant int,
amountOfMonthElapsed float64,
) int64 {
usageByActiveTenantsSoFar := int64(0)
instanceAllocations, has := (*workingAllocations)[instance.GetId()]
if has {
for tenantId := range instanceAllocations {
if !(*tenants)[tenantId].Deactivated {
usageByActiveTenantsSoFar += (*tenants)[tenantId].Bytes
}
}
}
soFarFloat := float64(instance.Bytes)
activeTenantsSoFarFloat := float64(usageByActiveTenantsSoFar) / float64(desiredInstancesPerTenant)
totalProjectedUsageFloat := soFarFloat + ((activeTenantsSoFarFloat / amountOfMonthElapsed) - activeTenantsSoFarFloat)
return int64(totalProjectedUsageFloat)
}
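// SpawnNewMultitenantInstance provisions a fresh multitenant worker end to end: create the instance
// with a cloud-init script that uploads its ssh host keys, wait for an IPv4 address, pull the host
// keys from backblaze into known_hosts, install threshold over ssh, and record the instance in the database.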
func (app *BackendApp) SpawnNewMultitenantInstance() (*VPSInstance, error) {
userData := fmt.Sprintf(
`#!/bin/sh
touch /root/.hushlogin
%s
%s`,
app.DigitalOcean.GetSSHHostKeysFileScript(), app.BackblazeB2.GetFileUploadShellScript(),
)
log.Println("Creating new multitenant worker...")
instance, err := app.DigitalOcean.Create("multitenant-worker", userData)
if err != nil {
return nil, errors.Wrap(err, "failed to DigitalOcean.Create")
}
log.Printf("Waiting for %s to get an IPv4...\n", instance.GetId())
waitedSeconds := 0
for instance.IPV4 == "" && waitedSeconds < 300 {
time.Sleep(time.Second * time.Duration(5))
waitedSeconds += 5
instance, err = app.DigitalOcean.Get(instance.ProviderInstanceId)
if err != nil {
return nil, errors.Wrap(err, "failed to DigitalOcean.Get")
}
}
log.Printf("Waiting for %s (%s) to upload its ssh host public keys to backblaze...\n", instance.GetId(), instance.IPV4)
knownHostsFileContent, err := app.pollForSSHHostKeys(instance.GetId())
if err != nil {
return nil, errors.Wrap(err, "failed to PollBackblazeForSSHHostKeys")
}
log.Printf("adding %s (%s) to the user's known-hosts file...\n", instance.GetId(), instance.IPV4)
err = app.SSH.AppendToKnownHostsFile(knownHostsFileContent)
if err != nil {
return nil, errors.Wrap(err, "failed to AppendToSSHKnownHostsFile")
}
log.Printf("Generating threshold install script for %s (%s)...\n", instance.GetId(), instance.IPV4)
ipv4 := net.ParseIP(instance.IPV4)
ipv6 := net.ParseIP(instance.IPV6)
ips := []net.IP{}
if ipv4 != nil {
ips = append(ips, ipv4)
}
if ipv6 != nil {
ips = append(ips, ipv6)
}
if len(ips) == 0 {
return nil, errors.Errorf("failed to convert instance IPs '%s' or '%s' to net.IP", instance.IPV4, instance.IPV6)
}
provisionThresholdScript, err := app.ThresholdProvisioning.GetServerInstallScript(instance.GetId(), ips)
if err != nil {
return nil, errors.Wrap(err, "failed to ThresholdProvisioning.GetServerInstallScript")
}
// TODO use potentially different username depending on cloud provider
log.Printf("Installing threshold on %s (%s)...\n", instance.GetId(), instance.IPV4)
stdout, stderr, err := app.SSH.RunScriptOnRemoteHost(provisionThresholdScript, "root", instance.IPV4)
if err != nil {
return nil, errors.Wrapf(
err, "failed to RunScriptOnRemoteHost(provisionThresholdScript, \"root\", %s)",
instance.IPV4,
)
}
log.Println("stderr:")
log.Println(stderr)
log.Println("")
log.Println("")
log.Println("stdout:")
log.Println(stdout)
log.Println("")
log.Println("")
log.Printf("Saving %s (%s) info to the database...\n", instance.GetId(), instance.IPV4)
err = app.Model.CreateVPSInstance(instance)
if err != nil {
return nil, errors.Wrap(err, "failed to Model.CreateVPSInstance")
}
log.Printf("DONE! %s (%s) is provisioned!\n\n\n", instance.GetId(), instance.IPV4)
return instance, nil
}
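// pollForSSHHostKeys waits for a newly created instance to upload its ssh host public keys to
// backblaze, checking every 5 seconds for up to 5 minutes after an initial 15-second boot delay.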
func (app *BackendApp) pollForSSHHostKeys(instanceId string) (string, error) {
// it usually takes at least 15 seconds for an instance to boot and run
time.Sleep(time.Second * time.Duration(15))
iterations := 0
for iterations < 60 {
iterations++
fileContent, err := app.BackblazeB2.Get(fmt.Sprintf("greenhouse/known-hosts/%s", instanceId))
if err != nil {
return "", err
}
if fileContent != nil {
return string(fileContent), nil
}
time.Sleep(time.Second * time.Duration(5))
}
return "", fmt.Errorf("timed out waiting for %s to upload its host ssh public keys", instanceId)
}
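// saveVpsInstanceTenantSettings PUTs the new tenant configuration to one instance's multitenant
// management API, reads it back for logging, and persists it as that instance's shadow config.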
func (app *BackendApp) saveVpsInstanceTenantSettings(
billingYear int,
billingMonth int,
instance *VPSInstance,
config map[int]*TunnelSettings,
) error {
url := fmt.Sprintf("https://%s:%d/tenants", instance.IPV4, app.ThresholdManagementPort)
jsonBytes, err := json.Marshal(config)
log.Println(string(jsonBytes))
if err != nil {
return errors.Wrapf(
err,
"json serialization error calling %s (threshold multitenant management API)",
url,
)
}
returnedBytes, err := app.MyHTTP200(
"PUT",
url,
bytes.NewBuffer(jsonBytes),
func(request *http.Request) { request.Header.Set("Content-Type", "application/json") },
)
if err != nil {
return err
}
log.Printf("returnedBytes: \n%s\n", string(returnedBytes))
returnedBytes2, err := app.MyHTTP200("GET", url, nil, nil)
if err != nil {
return err
}
log.Printf("returnedBytes2: \n%s\n", string(returnedBytes2))
err = app.Model.SaveInstanceConfiguration(billingYear, billingMonth, instance, config)
if err != nil {
return errors.Wrapf(err, "Can't save instance configuration to database for %s", instance.GetId())
}
return nil
}
// TODO this fn needs to use app.MyHTTP() too
// func (app *BackendApp) getTenantSettingsFromVpsInstance(url string) (map[string]TunnelSettings, error) {
// httpClient := http.Client{
// Transport: &http.Transport{
// TLSClientConfig: app.ThresholdManagementTlsClientConfig,
// },
// Timeout: 10 * time.Second,
// }
// request, err := http.NewRequest("GET", url, nil)
// response, err := httpClient.Do(request)
// if err != nil {
// return nil, err
// }
// if response.StatusCode != 200 {
// return nil, fmt.Errorf("HTTP %d when calling %s (threshold multitenant management API)", response.StatusCode, url)
// }
// bytes, err := ioutil.ReadAll(response.Body)
// if err != nil {
// return nil, errors.Wrapf(err, "HTTP read error when calling %s (threshold multitenant management API)", url)
// }
// var responseObject map[string]TunnelSettings
// err = json.Unmarshal(bytes, &responseObject)
// if err != nil {
// return nil, errors.Wrapf(err, "JSON parse error when calling %s (threshold multitenant management API)", url)
// }
// return responseObject, nil
// }
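// MyHTTP performs one HTTP request using the lazily created client (refreshing it when the client
// certificate is about to expire) and returns the status code, response body, and any error.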
func (service *BaseHTTPService) MyHTTP(
method string,
url string,
body io.Reader,
withRequest func(*http.Request),
) (int, []byte, error) {
if service.client == nil || service.clientExpiry != nil && time.Now().After(*(service.clientExpiry)) {
httpClient, httpClientExpiry, err := service.ClientFactory()
if err != nil {
return 0, nil, errors.Wrapf(err, "failed to create HTTP client when calling %s %s", method, url)
}
service.client = httpClient
service.clientExpiry = httpClientExpiry
}
request, err := http.NewRequest(method, url, body)
if err != nil {
return 0, nil, errors.Wrapf(err, "failed to create HTTP request calling %s %s", method, url)
}
if withRequest != nil {
withRequest(request)
}
response, err := service.client.Do(request)
if err != nil {
return 0, nil, errors.Wrapf(err, "HTTP request error when calling %s %s", method, url)
}
defer response.Body.Close()
bytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return response.StatusCode, nil, errors.Wrapf(err, "HTTP read error when calling %s %s ", method, url)
}
return response.StatusCode, bytes, err
}
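// MyHTTP200 is like MyHTTP but additionally treats any response status >= 300 as an error.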
func (service *BaseHTTPService) MyHTTP200(
method string,
url string,
body io.Reader,
withRequest func(*http.Request),
) ([]byte, error) {
statusCode, bytes, err := service.MyHTTP(method, url, body, withRequest)
if err == nil && statusCode >= 300 {
err = errors.Errorf("HTTP %d when calling %s %s: \n%s\n\n", statusCode, method, url, string(bytes))
}
return bytes, err
}
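// doInParallel runs every action in its own goroutine and collects the results into a map keyed by
// taskResult.Name. If stopOnFirstError is true it stops collecting after the first failure; the
// buffered channel lets the remaining goroutines finish without leaking.
//
// A minimal usage sketch (hypothetical names):
//
//	results := doInParallel(false,
//		func() taskResult { return taskResult{Name: "a", Result: 1} },
//		func() taskResult { return taskResult{Name: "b", Err: fmt.Errorf("boom")} },
//	)
//	// results["a"].Result == 1, results["b"].Err != nil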
func doInParallel(stopOnFirstError bool, actions ...func() taskResult) map[string]taskResult {
resultsChannel := make(chan taskResult, len(actions)) // buffered so remaining goroutines can finish even if we stop collecting early
results := map[string]taskResult{}
//log.Printf("do %d actions in parallel", len(actions))
for _, action := range actions {
// this is how you do closures over the VALUE of a variable (not the variable itself) in golang
// https://stackoverflow.com/questions/26692844/captured-closure-for-loop-variable-in-go
action := action
go (func() {
result := action()
resultsChannel <- result
})()
}
for range actions {
result := <-resultsChannel
results[result.Name] = result
if result.Err != nil && stopOnFirstError {
break
}
}
return results
}
func int64Abs(x int64) int64 {
if x > 0 {
return x
} else {
return -x
}
}
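// getRandomPermutation returns a uniformly random permutation of the integers [0, length),
// by repeatedly picking one of the remaining values (selection sampling, Fisher-Yates style).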
func getRandomPermutation(length int) []int {
toReturn := make([]int, length)
toAppend := make([]int, length)
for i := 0; i < length; i++ {
toAppend[i] = i
}
for i := 0; i < length; i++ {
picked := mathRand.Intn(length - i)
toReturn[i] = toAppend[picked]
toAppend[picked] = toAppend[length-i]
toAppend = toAppend[0 : length-i]
}
return toReturn
}
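// getKnapsackSolutionScore scores a candidate move set (lower is better): its distance from the
// desired difference plus a per-move penalty proportional to scale, so smaller move sets are preferred.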
func getKnapsackSolutionScore(scale int64, proposedDistance int64, proposedLength int) int64 {
return proposedDistance + int64(float64(scale)*knapsackShufflingMinimizationFactor)*int64(proposedLength)
}