🌱🏠 a cloud service to enable your own server (owned by you and running on your computer) to be accessible on the internet in seconds, no credit card required https://greenhouse.server.garden/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1386 lines
44 KiB

package main
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"math"
mathRand "math/rand"
"net"
"net/http"
"regexp"
"strconv"
"strings"
"time"
"git.sequentialread.com/forest/greenhouse/pki"
errors "git.sequentialread.com/forest/pkg-errors"
)
// BaseHTTPService holds a cached *http.Client along with the time at which
// that client's credentials expire. ClientFactory builds a replacement client
// and reports its expiry; the cached client/clientExpiry pair is presumably
// refreshed by MyHTTP200 (defined elsewhere in this package) when stale —
// TODO confirm, the consumer is not visible in this chunk.
type BaseHTTPService struct {
	client       *http.Client
	clientExpiry *time.Time
	// ClientFactory returns a fresh client, the time it expires, and any error.
	ClientFactory func() (*http.Client, *time.Time, error)
}
// BackendApp aggregates every backend dependency (database, cloud providers,
// DNS, email, SSH, provisioning) plus the settings needed to manage tenants
// and their threshold tunnel servers.
type BackendApp struct {
	BaseHTTPService // provides the mutual-TLS HTTP client used to call threshold management APIs
	WorkingDirectory      string
	EmailService          *EmailService
	Model                 *DBModel
	DigitalOcean          *DigitalOceanService
	Gandi                 *GandiService
	BackblazeB2           *BackblazeB2Service
	SSH                   *SSHService
	ThresholdProvisioning *ThresholdProvisioningService
	// ThresholdManagementPort is the port of the management HTTPS API exposed
	// by each threshold server instance.
	ThresholdManagementPort int
	ThresholdPort           int
	AdminTenantId           int
	AdminThresholdNodeId    string
	GreenhouseListenPort    int
	GreenhouseThresholdServiceId string
}
// ThresholdMetrics is the JSON payload returned by a threshold server's
// /consumeMetrics endpoint: byte counters keyed by tenant id (as a decimal
// string — see the strconv.Atoi conversion in ConsumeMetrics).
type ThresholdMetrics struct {
	InboundByTenant  map[string]int64
	OutboundByTenant map[string]int64
}
// ThresholdTenantInfo describes one tenant's view of the system. It is both
// the JSON shape returned by a threshold server's /tenantInfo endpoint and
// the merged result that GetTenantInfo assembles from multiple servers.
type ThresholdTenantInfo struct {
	// ThresholdServers lists "host:port" management endpoints serving this tenant.
	ThresholdServers []string
	// ClientStates is keyed by client node id.
	ClientStates      map[string]ThresholdClientState
	Listeners         []ThresholdTunnel
	AuthorizedDomains []string
	// PortStart/PortEnd delimit the tenant's reserved port range (inclusive bounds
	// presumed — confirm against the model).
	PortStart    int
	PortEnd      int
	EmailAddress string
}
// ThresholdClientState records a tunnel client's current and previous
// connection state as reported by a threshold server (e.g. "ClientConnected",
// which GetTenantInfo prefers when deduplicating across servers).
type ThresholdClientState struct {
	CurrentState string
	LastState    string
}
// VPSInstanceWithTenants pairs a VPS instance with the tunnel settings of the
// tenants assigned to it, keyed by tenant id string.
type VPSInstanceWithTenants struct {
	TenantSettings map[string]TunnelSettings
	VPSInstance    *VPSInstance
}
// stringBytesTuple associates an instance id with a byte count; used by the
// allocate() helper in Rebalance to track the least-used instances.
type stringBytesTuple struct {
	id    string
	bytes int64
}
// vpsInstanceBytesTuple associates a VPS instance with a byte count; used in
// Rebalance to track the instances with the highest/lowest projected surplus.
type vpsInstanceBytesTuple struct {
	instance *VPSInstance
	bytes    int64
}
// tenantMoveTuple is one candidate move in the load-shuffling step: moving
// tenantId off instanceId, with bytes being the net effect of the move on the
// projected-usage difference between the two instances being balanced.
type tenantMoveTuple struct {
	tenantId   int
	instanceId string
	bytes      int64
}
// knapsackGuess is one candidate solution to the tenant-shuffling knapsack
// problem: a set of moves, how far the set lands from the desired byte
// difference, and its score (lower is better; see getKnapsackSolutionScore,
// defined elsewhere in this package).
type knapsackGuess struct {
	moves    []*tenantMoveTuple
	distance int64
	score    int64
}
// taskResult is the outcome of one task run by doInParallel: a label for
// error reporting, the task's error if any, and an arbitrary result payload.
type taskResult struct {
	Name   string
	Err    error
	Result interface{}
}
// tenantShufflingThreshold: Rebalance keeps shuffling tenants between the
// most- and least-loaded instances only while their projected-usage gap
// exceeds this fraction of their average monthly bandwidth allowance.
const tenantShufflingThreshold = float64(0.2)

// knapsackShufflingMinimizationFactor presumably weights the knapsack score to
// prefer solutions with fewer moves (consumed by getKnapsackSolutionScore,
// not visible here — confirm).
const knapsackShufflingMinimizationFactor = float64(0.3)

// knapsackNumberOfGuessesFactor: number of randomized knapsack attempts is
// this factor times the number of candidate moves.
const knapsackNumberOfGuessesFactor = float64(5)

// shufflingCircuitBreakerLimit caps the shuffle loop iterations in Rebalance.
const shufflingCircuitBreakerLimit = 1000

// Decimal (SI) units: 10^9 and 10^12 bytes.
const GIGABYTE = int64(1000000000)
const TERABYTE = int64(1000000000000)

// tenantPinDuration keeps a deactivated tenant's allocation alive on an
// instance while DNS changes propagate.
const tenantPinDuration = 6 * time.Hour

// managementClientCertSubject is the subject of the client TLS certificate
// used to authenticate to threshold servers' management APIs.
const managementClientCertSubject = "management@greenhouse.server.garden"

// freeSubdomainDomain is the parent zone for tenants' free subdomains.
const freeSubdomainDomain = "greenhouseusers.com"

// Hysteresis bounds for fleet scaling: spawn a new instance only when the
// projected overage exceeds 250 GB; terminate one only when the projected
// underage exceeds 1 TB.
var projectedOverageAllowedBeforeSpawningNewInstance int64 = GIGABYTE * 250
var projectedUnderageAllowedBeforeTerminatingInstance int64 = TERABYTE
// initBackend constructs the BackendApp, wiring up every sub-service and
// installing a ClientFactory that mints mutual-TLS HTTP clients for calling
// the threshold servers' management APIs.
func initBackend(
	workingDirectory string,
	config *Config,
	pkiService *pki.PKIService,
	model *DBModel,
	emailService *EmailService,
) *BackendApp {
	greenhouseThresholdServiceId := "greenhouse_https"
	toReturn := BackendApp{
		WorkingDirectory:        workingDirectory,
		EmailService:            emailService,
		Model:                   model,
		DigitalOcean:            NewDigitalOceanService(config),
		Gandi:                   NewGandiService(config),
		BackblazeB2:             NewBackblazeB2Service(config),
		SSH:                     NewSSHService(config),
		ThresholdProvisioning:   NewThresholdProvisioningService(pkiService),
		ThresholdPort:           config.ThresholdPort,
		ThresholdManagementPort: config.ThresholdManagementPort,
		AdminTenantId:           config.AdminTenantId,
		// NOTE(review): adminThresholdNodeId is a package-level name defined
		// outside this chunk (it is not read from config) — confirm intended.
		AdminThresholdNodeId:         adminThresholdNodeId,
		GreenhouseListenPort:         config.FrontendPort,
		GreenhouseThresholdServiceId: greenhouseThresholdServiceId,
	}
	// TODO move this BaseHTTPService into the ThresholdProvisioning service?
	// The factory issues a client certificate valid for 24h, signed by the
	// management-API auth CA, and trusts only the main CA for server certs.
	toReturn.ClientFactory = func() (*http.Client, *time.Time, error) {
		caCert, err := pkiService.GetCACertificate(mainCAName)
		if err != nil {
			return nil, nil, err
		}
		expiryTime := time.Now().Add(time.Hour * 24)
		cert, err := pkiService.GetClientTLSCertificate(managementAPIAuthCAName, managementClientCertSubject, expiryTime)
		if err != nil {
			return nil, nil, err
		}
		caCertPool := x509.NewCertPool()
		caCertPool.AddCert(caCert)
		tlsClientConfig := &tls.Config{
			Certificates: []tls.Certificate{cert},
			RootCAs:      caCertPool,
		}
		// NOTE(review): BuildNameToCertificate is deprecated and a no-op since
		// Go 1.14; this call can likely be removed.
		tlsClientConfig.BuildNameToCertificate()
		return &http.Client{
			Transport: &http.Transport{
				TLSClientConfig: tlsClientConfig,
			},
			Timeout: 10 * time.Second,
		}, &expiryTime, nil
	}
	return &toReturn
}
// InitializeTenant sets up a newly created tenant: it claims a unique free
// subdomain derived from the tenant's email address and reserves a port range
// for the tenant's tunnels.
//
// Subdomain candidates are tried in order: the email local part, then
// "<local>-<first-domain-label>", then the whole email, then "<local>-N".
// Candidate strings are sanitized to [a-z0-9] runs separated by single
// hyphens (note: uppercase letters are replaced, not lowercased).
//
// Returns an error for malformed emails, on any model failure, or if no
// unique subdomain could be claimed within 1000 attempts.
func (app *BackendApp) InitializeTenant(tenantId int, email string) error {
	emailSplit := strings.Split(email, "@")
	if len(emailSplit) != 2 {
		return errors.Errorf("InitializeTenant(): invalid email '%s'", email)
	}
	emailDomainSplit := strings.Split(emailSplit[1], ".")
	if len(emailDomainSplit) < 2 {
		return errors.Errorf("InitializeTenant(): invalid email '%s'", email)
	}
	replaceInvalidCharactersRegex := regexp.MustCompile("[^a-z0-9]")
	replaceRepeatedHyphensRegex := regexp.MustCompile("-+")
	makeSubdomain := func(input string) string {
		toReturn := replaceInvalidCharactersRegex.ReplaceAllString(input, "-")
		toReturn = replaceRepeatedHyphensRegex.ReplaceAllString(toReturn, "-")
		return strings.Trim(toReturn, "-")
	}
	subdomain := makeSubdomain(emailSplit[0])
	var err error
	alreadyTaken := true
	i := 0
	for alreadyTaken && i < 1000 {
		i++
		alreadyTaken, err = app.Model.SetFreeSubdomain(tenantId, subdomain)
		if err != nil {
			return err
		}
		// prepare the next candidate in case this one was taken.
		if i == 1 {
			subdomain = makeSubdomain(fmt.Sprintf("%s-%s", emailSplit[0], emailDomainSplit[0]))
		} else if i == 2 {
			subdomain = makeSubdomain(email)
		} else {
			subdomain = makeSubdomain(fmt.Sprintf("%s-%d", emailSplit[0], i-2))
		}
	}
	// BUGFIX: test alreadyTaken rather than the loop counter (`i >= 1000`),
	// so a claim that succeeds on the very last attempt is not misreported
	// as a failure.
	if alreadyTaken {
		return errors.Errorf("InitializeTenant(): something went wrong generating a subdomain for '%s'", email)
	}
	start, end, bucket, err := app.Model.GetNextReservedPorts()
	if err != nil {
		return errors.Wrapf(err, "InitializeTenant() for '%s':", email)
	}
	err = app.Model.SetReservedPorts(tenantId, start, end, bucket)
	if err != nil {
		return errors.Wrapf(err, "InitializeTenant() for '%s':", email)
	}
	return nil
}
// ConsumeMetrics polls every known VPS instance's /consumeMetrics endpoint in
// parallel, records each instance's usage for the current billing month, then
// aggregates inbound+outbound bytes per tenant and records the totals.
// If any instance fails, an error describing every failure is returned and
// the per-tenant aggregate is not recorded.
func (app *BackendApp) ConsumeMetrics() error {
	vpsInstances, err := app.Model.GetVPSInstances()
	if err != nil {
		return err
	}
	billingYear, billingMonth, _, _, _ := getBillingTimeInfo()
	actions := make([]func() taskResult, len(vpsInstances))
	i := 0
	for instanceId, instance := range vpsInstances {
		// BUGFIX: capture per-iteration copies. Before Go 1.22 the range
		// variables are reused across iterations, so every closure would have
		// polled the same (last) instance.
		instanceId, instance := instanceId, instance
		actions[i] = func() taskResult {
			// TODO move the metrics to the management API / to the client side in threshold
			responseBytes, err := app.MyHTTP200(
				"GET",
				fmt.Sprintf("https://%s:%d/consumeMetrics", instance.IPV4, app.ThresholdManagementPort),
				nil,
				nil,
			)
			result := ThresholdMetrics{}
			if err == nil {
				err = json.Unmarshal(responseBytes, &result)
			}
			if err == nil {
				err = app.Model.RecordVPSUsage(instance, result, billingYear, billingMonth)
			}
			return taskResult{
				Name:   instanceId,
				Err:    err,
				Result: result,
			}
		}
		i++
	}
	results := doInParallel(false, actions...)
	errorStrings := []string{}
	bytesByTenant := map[int]int64{}
	for _, result := range results {
		if result.Err != nil {
			errorStrings = append(errorStrings, fmt.Sprintf("error collecting metrics from %s: %+v", result.Name, result.Err))
			continue
		}
		instanceBytesByTenant := result.Result.(ThresholdMetrics)
		// inbound and outbound traffic both count toward the tenant's total.
		for _, byTenant := range []map[string]int64{instanceBytesByTenant.InboundByTenant, instanceBytesByTenant.OutboundByTenant} {
			for tenantIdString, bytez := range byTenant {
				tenantId, err := strconv.Atoi(tenantIdString)
				if err != nil {
					errorStrings = append(errorStrings, fmt.Sprintf("can't convert tenantId '%s' to integer: %s", tenantIdString, err))
				} else {
					bytesByTenant[tenantId] += bytez
				}
			}
		}
	}
	if len(errorStrings) > 0 {
		return errors.Errorf("ConsumeMetrics() doInParallel(actions...): %s\n\n", strings.Join(errorStrings, "\n"))
	}
	return app.Model.RecordTenantsUsage(bytesByTenant)
}
// GetTenantInfo collects a tenant's state from every VPS instance the tenant
// is (or was recently, within tenantPinDuration) allocated to, and merges the
// results into a single ThresholdTenantInfo: client states are deduplicated
// preferring "ClientConnected", and listener lists are deduplicated by their
// String() form. Port range, authorized domains, and email come from the DB.
func (app *BackendApp) GetTenantInfo(tenantId int) (*ThresholdTenantInfo, error) {
	tenant, err := app.Model.GetTenant(tenantId)
	if err != nil {
		return nil, err
	}
	vpsInstances, err := app.Model.GetVPSInstances()
	if err != nil {
		return nil, err
	}
	billingYear, billingMonth, _, _, _ := getBillingTimeInfo()
	tenantVPSInstanceRows, err := app.Model.GetTenantVPSInstanceRows(billingYear, billingMonth)
	if err != nil {
		return nil, err
	}
	thresholdServerHostPorts := []string{}
	actions := make([]func() taskResult, 0)
	for _, row := range tenantVPSInstanceRows {
		// BUGFIX: capture a per-iteration copy of the range variable; before
		// Go 1.22 every closure below would have seen the last row.
		row := row
		vpsInstance, hasVpsInstance := vpsInstances[row.GetVPSInstanceId()]
		// include rows that are active, or deactivated less than
		// tenantPinDuration ago (still "pinned" while DNS propagates).
		activeOrPinned := (row.Active || row.DeactivatedAt != nil && row.DeactivatedAt.Add(tenantPinDuration).After(time.Now()))
		if hasVpsInstance && row.TenantId == tenantId && activeOrPinned {
			thresholdServerHostPorts = append(thresholdServerHostPorts, fmt.Sprintf("%s:%d", vpsInstance.IPV4, app.ThresholdManagementPort))
			actions = append(actions, func() taskResult {
				responseBytes, err := app.MyHTTP200(
					"GET",
					fmt.Sprintf("https://%s:%d/tenantInfo?tenantId=%d", vpsInstance.IPV4, app.ThresholdManagementPort, tenantId),
					nil,
					nil,
				)
				tenantInfo := ThresholdTenantInfo{}
				if err == nil {
					err = json.Unmarshal(responseBytes, &tenantInfo)
				}
				return taskResult{
					Name:   row.GetVPSInstanceId(),
					Err:    err,
					Result: tenantInfo,
				}
			})
		}
	}
	results := doInParallel(false, actions...)
	errorStrings := []string{}
	toReturn := ThresholdTenantInfo{
		ThresholdServers:  thresholdServerHostPorts,
		ClientStates:      map[string]ThresholdClientState{},
		Listeners:         []ThresholdTunnel{},
		AuthorizedDomains: tenant.TunnelSettings.AuthorizedDomains,
		PortStart:         tenant.TunnelSettings.PortStart,
		PortEnd:           tenant.TunnelSettings.PortEnd,
		EmailAddress:      tenant.Email,
	}
	listenersToReturn := map[string]ThresholdTunnel{}
	for _, result := range results {
		if result.Err != nil {
			errorStrings = append(errorStrings, fmt.Sprintf("error getting active nodeIds for tenant %d from %s: %+v", tenantId, result.Name, result.Err))
		} else {
			tenantInfo := result.Result.(ThresholdTenantInfo)
			// deduplicate ClientStates from multiple threshold servers, preferring the connected state
			for clientNodeId, state := range tenantInfo.ClientStates {
				alreadyExistingState, hasAlreadyExistingState := toReturn.ClientStates[clientNodeId]
				if !hasAlreadyExistingState || alreadyExistingState.CurrentState != "ClientConnected" {
					toReturn.ClientStates[clientNodeId] = state
				}
			}
			// deduplicate listeners lists from multiple threshold servers via map
			for _, listener := range tenantInfo.Listeners {
				listenersToReturn[listener.String()] = listener
			}
		}
	}
	for _, listener := range listenersToReturn {
		toReturn.Listeners = append(toReturn.Listeners, listener)
	}
	if len(errorStrings) > 0 {
		return nil, errors.Errorf("GetTenantInfo() doInParallel(actions...): %s\n\n", strings.Join(errorStrings, "\n"))
	}
	return &toReturn, nil
}
// GetInstances reconciles the VPS instances recorded in the database with
// those reported by the cloud provider. It returns three maps keyed by
// instance id:
//
//   - instances present on both sides that are neither Deleted nor Deprecated,
//   - instances present only in the database,
//   - instances present only at the provider.
func (app *BackendApp) GetInstances() (map[string]*VPSInstance, map[string]*VPSInstance, map[string]*VPSInstance, error) {
	fromDB, err := app.Model.GetVPSInstances()
	if err != nil {
		return nil, nil, nil, err
	}
	fromCloud, err := app.DigitalOcean.List()
	if err != nil {
		return nil, nil, nil, err
	}
	valid := map[string]*VPSInstance{}
	cloudOnly := map[string]*VPSInstance{}
	dbOnly := map[string]*VPSInstance{}
	// anything the provider knows about but the DB doesn't.
	for id, instance := range fromCloud {
		if _, known := fromDB[id]; !known {
			cloudOnly[id] = instance
		}
	}
	// anything the DB knows about: either missing at the provider, or a
	// live (non-deleted, non-deprecated) member of the fleet.
	for id, instance := range fromDB {
		_, known := fromCloud[id]
		switch {
		case !known:
			dbOnly[id] = instance
		case !instance.Deleted && !instance.Deprecated:
			valid[id] = instance
		}
	}
	return valid, dbOnly, cloudOnly, nil
}
// HealthcheckInstances pings each instance's management API /ping endpoint in
// parallel and returns a map of instance id -> reachable. Failures are logged
// but do not abort the check.
func (app *BackendApp) HealthcheckInstances(vpsInstances map[string]*VPSInstance) map[string]bool {
	if len(vpsInstances) == 0 {
		return map[string]bool{}
	}
	actions := make([]func() taskResult, len(vpsInstances))
	i := 0
	for instanceId, instance := range vpsInstances {
		// BUGFIX: capture per-iteration copies. Before Go 1.22 the range
		// variables are reused, so every closure would have pinged the same
		// (last) instance and reported the same name.
		instanceId, instance := instanceId, instance
		actions[i] = func() taskResult {
			_, err := app.MyHTTP200(
				"GET",
				fmt.Sprintf("https://%s:%d/ping", instance.IPV4, app.ThresholdManagementPort),
				nil,
				nil,
			)
			return taskResult{
				Name:   instanceId,
				Err:    err,
				Result: nil,
			}
		}
		i++
	}
	results := doInParallel(false, actions...)
	healthStatus := map[string]bool{}
	for _, result := range results {
		if result.Err != nil {
			log.Printf("error pinging %s: %s", result.Name, result.Err)
		}
		healthStatus[result.Name] = (result.Err == nil)
	}
	return healthStatus
}
func getBillingTimeInfo() (int, int, time.Time, time.Time, float64) {
billingYear := time.Now().UTC().Year()
billingMonth := int(time.Now().UTC().Month())
nextBillingYear := billingYear
nextBillingMonth := billingMonth + 1
if nextBillingMonth > 12 {
nextBillingMonth = 1
nextBillingYear++
}
utcLocation, err := time.LoadLocation("UTC")
if err != nil {
err = errors.Wrap(err, "can't calculate billing because can't load UTC location. This should have been caught at startup")
log.Printf("%+v\n", err)
panic(err)
}
startOfBillingMonth := time.Date(billingYear, time.Month(billingMonth), 1, 0, 0, 0, 0, utcLocation)
endOfBillingMonth := time.Date(nextBillingYear, time.Month(nextBillingMonth), 0, 23, 59, 59, int(int64(time.Second))-1, utcLocation)
//TODO
monthDuration := float64(int64(endOfBillingMonth.Sub(startOfBillingMonth)))
monthElapsed := float64(int64(endOfBillingMonth.Sub(time.Now().UTC())))
amountOfMonthElapsed := monthElapsed / monthDuration
return billingMonth, billingYear, startOfBillingMonth, endOfBillingMonth, amountOfMonthElapsed
}
// Rebalance drives the multi-tenant VPS fleet toward a balanced state:
//
//  1. scales the number of instances up (returning early after spawning) or
//     flags instances for deprecation, based on projected monthly bandwidth,
//  2. shuffles tenant allocations between instances to even out load, using a
//     randomized knapsack heuristic,
//  3. diffs the resulting per-tenant tunnel configuration against the stored
//     "shadow config" and PUTs it to instances that changed, then updates the
//     tenants' free-subdomain DNS records.
//
// The boolean reports whether the rebalance got far enough to apply changes
// to instances (true) or bailed out before that point (false).
func (app *BackendApp) Rebalance() (bool, error) {
	billingYear, billingMonth, _, _, amountOfMonthElapsed := getBillingTimeInfo()
	//TODO make this configurable
	desiredInstancesPerTenant := 2
	tenants, err := app.Model.GetTenants()
	if err != nil {
		return false, err
	}
	tenantVpsInstanceRows, err := app.Model.GetTenantVPSInstanceRows(billingYear, billingMonth)
	if err != nil {
		return false, err
	}
	validVpsInstances, onlyDBInstances, onlyCloudInstances, err := app.GetInstances()
	if err != nil {
		return false, err
	}
	// refuse to proceed on any inconsistency between DB and provider.
	errorStrings := []string{}
	for _, v := range onlyDBInstances {
		errorStrings = append(errorStrings, fmt.Sprintf("instance %s (%s) is in the database, but not in the provider api", v.GetId(), v.IPV4))
	}
	for _, v := range onlyCloudInstances {
		errorStrings = append(errorStrings, fmt.Sprintf("instance %s (%s) is in the provider api, but not in the database", v.GetId(), v.IPV4))
	}
	// issue warnings for inconsistencies between DB and provider
	if len(errorStrings) > 0 {
		return false, errors.Errorf("VPS instances are inconsistent: \n%s\n", strings.Join(errorStrings, "\n"))
	}
	// aggregate dedicated vps instances count onto tenants and remove them
	// from the multi-tenant pool.
	keysToRemove := []string{}
	for k, vpsInstance := range validVpsInstances {
		if vpsInstance.TenantId != 0 {
			keysToRemove = append(keysToRemove, k)
			// BUGFIX: guard the map lookup; the original indexed tenants
			// unconditionally and would nil-pointer panic on a dedicated
			// instance whose tenant row is missing.
			if tenant, has := tenants[vpsInstance.TenantId]; has {
				tenant.DedicatedVPSCount += 1
			}
		}
	}
	// TODO do we need to do anything else with these besides cutting them from the "validVpsInstances" ?
	for _, k := range keysToRemove {
		delete(validVpsInstances, k)
	}
	// aggregate the bandwidth usage from the tenantVPS relation onto both sides.
	// note that the total bandwidth on the tenants may be larger
	// than the total bandwidth on the vps instances because some vps instances may have been deleted.
	// if you update the following loop, consider updating the similar one in frontend.go admin handler 😬
	tenantAllocationCount := map[int]int{}
	shadowConfigs := map[string]map[int]*TunnelSettings{}
	workingAllocations := map[string]map[int]bool{}
	originalAllocations := map[string]map[int]bool{}
	pinned := map[string]map[int]bool{}
	for _, row := range tenantVpsInstanceRows {
		vpsInstanceId := row.GetVPSInstanceId()
		vpsInstance, hasVPSInstance := validVpsInstances[vpsInstanceId]
		tenant, hasTenant := tenants[row.TenantId]
		if hasVPSInstance {
			vpsInstance.Bytes += row.Bytes
		}
		if hasTenant {
			tenant.Bytes += row.Bytes
		}
		if hasTenant && hasVPSInstance {
			_, has := shadowConfigs[vpsInstanceId]
			if !has {
				shadowConfigs[vpsInstanceId] = map[int]*TunnelSettings{}
				workingAllocations[vpsInstanceId] = map[int]bool{}
				originalAllocations[vpsInstanceId] = map[int]bool{}
				// BUGFIX: pinned[vpsInstanceId] was never initialized, so the
				// write below panicked (assignment to entry in nil map).
				pinned[vpsInstanceId] = map[int]bool{}
			}
			shadowConfigs[vpsInstanceId][tenant.Id] = row.ShadowConfig
			// if the tenant's allocation to this instance was removed recently (less than tenantPinDuration ago)
			// then add the allocation to pinned, keeping the allocation live for a while while DNS updates propagate.
			if row.DeactivatedAt != nil && row.DeactivatedAt.Add(tenantPinDuration).After(time.Now()) {
				pinned[vpsInstanceId][tenant.Id] = true
			}
			if row.Active {
				workingAllocations[vpsInstanceId][tenant.Id] = true
				originalAllocations[vpsInstanceId][tenant.Id] = true
				tenantAllocationCount[row.TenantId]++
			}
		}
	}
	// Deactivate tenants who have surpassed their service limit for this billing month.
	bytesPerCent := 1000000000
	for _, tenant := range tenants {
		bandwidthCostCents := int(tenant.Bytes / int64(bytesPerCent))
		if bandwidthCostCents >= tenant.ServiceLimitCents {
			tenant.Deactivated = true
		}
	}
	// -------------------------------------------------------------------------------
	// STEP 1: scale the # of instances up or down depending on total projected usage.
	totalUsageSoFar := int64(0)
	totalUsageByActiveTenantsSoFar := int64(0)
	for _, tenant := range tenants {
		totalUsageSoFar += tenant.Bytes
		if !tenant.Deactivated {
			totalUsageByActiveTenantsSoFar += tenant.Bytes
		}
	}
	// linear extrapolation of active tenants' usage out to the full month.
	soFarFloat := float64(totalUsageSoFar)
	activeTenantsSoFarFloat := float64(totalUsageByActiveTenantsSoFar)
	totalProjectedUsageFloat := soFarFloat + ((activeTenantsSoFarFloat / amountOfMonthElapsed) - activeTenantsSoFarFloat)
	totalMonthlyAllotment := int64(0)
	for _, vpsInstance := range validVpsInstances {
		if !vpsInstance.Deprecated && !vpsInstance.Deleted {
			// TODO handle BytesMonthly when node is created in the middle of the month / near end of month
			amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor := float64(1)
			totalMonthlyAllotment += int64(float64(vpsInstance.BytesMonthly) * amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor)
		}
	}
	totalMonthlyAllotmentFloat := float64(totalMonthlyAllotment)
	overageFloat := totalProjectedUsageFloat - totalMonthlyAllotmentFloat
	overflowAmountFloat := overageFloat - float64(projectedOverageAllowedBeforeSpawningNewInstance)
	if overflowAmountFloat > 0 || len(validVpsInstances) == 0 {
		instancesToCreate := int(math.Ceil(overflowAmountFloat / float64(DEFAULT_INSTANCE_MONTHLY_BYTES)))
		if instancesToCreate < 1 && len(validVpsInstances) == 0 {
			instancesToCreate = 1
		}
		log.Printf("spawning %d instances...", instancesToCreate)
		tasks := []func() taskResult{}
		for i := 0; i < instancesToCreate; i++ {
			// BUGFIX: capture a per-iteration copy; before Go 1.22 every
			// closure shared the same loop variable, mislabeling the tasks.
			i := i
			tasks = append(tasks, func() taskResult {
				instance, err := app.SpawnNewMultitenantInstance()
				return taskResult{
					Err:    err,
					Result: instance,
					Name:   strconv.Itoa(i),
				}
			})
		}
		results := doInParallel(false, tasks...)
		// renamed from "errors" to stop shadowing the errors package.
		spawnErrors := []string{}
		for _, result := range results {
			if result.Err != nil {
				spawnErrors = append(spawnErrors, fmt.Sprintf("%+v", result.Err))
			}
		}
		if len(spawnErrors) > 0 {
			return false, fmt.Errorf("SpawnNewMultitenantInstance failed: \n%s\n", strings.Join(spawnErrors, "\n"))
		}
		return false, nil
	}
	underusageAmount := -overageFloat - float64(projectedUnderageAllowedBeforeTerminatingInstance)
	if underusageAmount > 0 && len(validVpsInstances) > 1 {
		instancesToDeprecate := int(math.Ceil(underusageAmount / float64(DEFAULT_INSTANCE_MONTHLY_BYTES)))
		if instancesToDeprecate > len(validVpsInstances)-1 {
			instancesToDeprecate = len(validVpsInstances) - 1
		}
		// NOTE(review): deprecation is not implemented yet; we only log here.
		log.Printf("deprecating %d instances...", instancesToDeprecate)
		return false, nil
	}
	// STEP 2: shuffle tenants around to balance load.
	// if we got to this point that means we didn't spawn or deprecate any instances this time around.
	// sanity check the desiredInstancesPerTenant
	if desiredInstancesPerTenant > len(validVpsInstances) {
		desiredInstancesPerTenant = len(validVpsInstances)
	}
	// the allocate function will ADD (not remove) the tenant to the <instanceCount> vps instances that don't already
	// have that tenant on them & have the lowest projected usage for this month.
	allocate := func(tenant *TenantInfo, instanceCount int, workingAllocations *map[string]map[int]bool) {
		lowestUsedInstances := []stringBytesTuple{}
		for _, instance := range validVpsInstances {
			instanceAllocations, has := (*workingAllocations)[instance.GetId()]
			// if this instance already has this tenant allocated, skip it.
			if has && instanceAllocations[tenant.Id] || instance.Deprecated || instance.Deleted {
				continue
			}
			instanceMetadata := stringBytesTuple{
				id:    instance.GetId(),
				bytes: getInstanceProjectedUsage(instance, workingAllocations, &tenants, desiredInstancesPerTenant, amountOfMonthElapsed),
			}
			// maintain a running "bottom instanceCount" set: fill it first,
			// then evict the most-used member whenever a less-used candidate appears.
			if len(lowestUsedInstances) < instanceCount {
				lowestUsedInstances = append(lowestUsedInstances, instanceMetadata)
			} else {
				indexOfMostUsed := -1
				for i, instance := range lowestUsedInstances {
					if indexOfMostUsed == -1 || instance.bytes > lowestUsedInstances[indexOfMostUsed].bytes {
						indexOfMostUsed = i
					}
				}
				if instanceMetadata.bytes < lowestUsedInstances[indexOfMostUsed].bytes {
					lowestUsedInstances[indexOfMostUsed] = instanceMetadata
				}
			}
		}
		for _, instanceMetadata := range lowestUsedInstances {
			if _, has := (*workingAllocations)[instanceMetadata.id]; !has {
				(*workingAllocations)[instanceMetadata.id] = map[int]bool{}
			}
			(*workingAllocations)[instanceMetadata.id][tenant.Id] = true
		}
	}
	// 2.1 allocate tenants who don't have the desired # of active vps allocations yet.
	for _, tenant := range tenants {
		if !tenant.Deactivated && tenantAllocationCount[tenant.Id] < desiredInstancesPerTenant {
			allocate(tenant, desiredInstancesPerTenant-tenantAllocationCount[tenant.Id], &workingAllocations)
		}
	}
	// 2.2 shuffle tenant allocations around so tenants
	// are moved from instances with predicted overages to instances with predicted underages
	iterations := 0
	doneShuffling := false
	for !doneShuffling && iterations < shufflingCircuitBreakerLimit && len(validVpsInstances) > 1 {
		// BUGFIX: iterations was never incremented, so the circuit breaker
		// below could never fire and this loop could spin forever.
		iterations++
		highestSurplus := vpsInstanceBytesTuple{instance: nil, bytes: 0}
		lowestSurplus := vpsInstanceBytesTuple{instance: nil, bytes: 0}
		for _, instance := range validVpsInstances {
			if !instance.Deprecated && !instance.Deleted {
				//TODO
				amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor := float64(1)
				// TODO handle BytesMonthly when node is created in the middle of the month / near end of month
				totalMonthlyAllotment := int64(float64(instance.BytesMonthly) * amountOfMonthThatThisInstanceWillHaveBeenTurnedOnFor)
				surplus := totalMonthlyAllotment - getInstanceProjectedUsage(
					instance,
					&workingAllocations,
					&tenants,
					desiredInstancesPerTenant,
					amountOfMonthElapsed,
				)
				if highestSurplus.instance == nil || highestSurplus.bytes < surplus {
					highestSurplus = vpsInstanceBytesTuple{instance: instance, bytes: surplus}
				}
				if lowestSurplus.instance == nil || lowestSurplus.bytes > surplus {
					lowestSurplus = vpsInstanceBytesTuple{instance: instance, bytes: surplus}
				}
			}
		}
		// if there are no instances which have a predicted overage, exit.
		if lowestSurplus.bytes >= 0 {
			doneShuffling = true
			break
		}
		// lets say that the most overused instance has a predicted overusage of 100gb
		// and the most underused instance has a predicted underusage of 700gb
		// the difference between those is 800gb.
		// so we want to move 400gb (half the difference) of estimated future traffic
		// from the most overused to the most underused.
		// if we did that, then the result would be balanced: both predicted to be underused by 300gb
		desiredDifference := int64Abs(lowestSurplus.bytes-highestSurplus.bytes) / 2
		// only continue shuffling if the difference between the two instances
		// is over 20% of their monthly allowance
		// TODO handle BytesMonthly when node is created in the middle of the month / near end of month
		averageMonthlyBytes := float64(highestSurplus.instance.BytesMonthly+lowestSurplus.instance.BytesMonthly) * float64(0.5)
		if float64(desiredDifference) < averageMonthlyBytes*tenantShufflingThreshold {
			doneShuffling = true
			break
		}
		tenantsOfEmptiestInstance := workingAllocations[highestSurplus.instance.GetId()]
		tenantsOfFullestInstance := workingAllocations[lowestSurplus.instance.GetId()]
		// we are going to create a list of tenantId, instanceId pairs, one for each
		// of the tenant allocations on each of the most underused and overused instances
		// each list item also includes the net effect on the projected usage difference of the two instances
		// assuming that the tenant was moved from one instance to the other
		effectsOfMovingTenantToOtherInstance := []tenantMoveTuple{}
		for tenantId := range tenantsOfEmptiestInstance {
			tenantUsageShare := float64(tenants[tenantId].Bytes / int64(desiredInstancesPerTenant))
			futureUsage := (tenantUsageShare / amountOfMonthElapsed) - tenantUsageShare
			// only consider moving the tenant if it does not already exist on the destination
			if !tenantsOfFullestInstance[tenantId] {
				effectsOfMovingTenantToOtherInstance = append(
					effectsOfMovingTenantToOtherInstance,
					tenantMoveTuple{
						tenantId:   tenantId,
						instanceId: highestSurplus.instance.GetId(),
						bytes:      int64(futureUsage * 2),
					},
				)
			}
		}
		for tenantId := range tenantsOfFullestInstance {
			tenantUsageShare := float64(tenants[tenantId].Bytes / int64(desiredInstancesPerTenant))
			futureUsage := (tenantUsageShare / amountOfMonthElapsed) - tenantUsageShare
			// only consider moving the tenant if it does not already exist on the destination
			if !tenantsOfEmptiestInstance[tenantId] {
				effectsOfMovingTenantToOtherInstance = append(
					effectsOfMovingTenantToOtherInstance,
					tenantMoveTuple{
						tenantId:   tenantId,
						instanceId: lowestSurplus.instance.GetId(),
						bytes:      int64(-futureUsage * 2),
					},
				)
			}
		}
		// we constructed a list of all possible moves we could make to shuffle tenants between the two,
		// now we use a heuristic method to find the best combination of moves which
		// gets us closest to our desiredDifference.
		// (weighted to also minimize the # of moves, based on knapsackShufflingMinimizationFactor)
		// This is basically the knapsack problem: https://en.wikipedia.org/wiki/Knapsack_problem
		positiveValue := int64(0)
		negativeValue := int64(0)
		for _, move := range effectsOfMovingTenantToOtherInstance {
			if move.bytes > 0 {
				positiveValue += move.bytes
			} else {
				negativeValue += move.bytes
			}
		}
		bestGuessSoFar := knapsackGuess{
			moves:    []*tenantMoveTuple{},
			distance: desiredDifference,
			score:    getKnapsackSolutionScore(desiredDifference, desiredDifference, 0),
		}
		numberOfAttempts := int(float64(len(effectsOfMovingTenantToOtherInstance)) * knapsackNumberOfGuessesFactor)
		for attempt := 0; attempt < numberOfAttempts; attempt++ {
			difference := int64(0)
			moves := []*tenantMoveTuple{}
			positiveTotal := positiveValue
			negativeTotal := negativeValue
			// greedily walk the candidate moves in a random order, keeping any
			// move that doesn't make the solution unrecoverable.
			permutation := getRandomPermutation(len(effectsOfMovingTenantToOtherInstance))
			for i := range effectsOfMovingTenantToOtherInstance {
				index := permutation[i]
				move := effectsOfMovingTenantToOtherInstance[index]
				proposedDifference := difference + move.bytes
				proposedDistance := int64Abs(proposedDifference - desiredDifference)
				proposedScore := getKnapsackSolutionScore(desiredDifference, proposedDistance, len(moves)+1)
				// if moving this tenant would push our current guess "too far" in the positive direction
				if proposedDifference > desiredDifference {
					// ok, we overshot... would it be possible to "walk it back"?
					// or did we overshoot only a little bit & create a good solution?
					impossibleToGoBack := proposedDifference+negativeTotal > desiredDifference
					if impossibleToGoBack && proposedScore > bestGuessSoFar.score {
						continue
					}
				}
				// if moving this tenant would push our current guess "too far" in the negative direction
				if proposedDifference < 0 {
					impossibleToGoBack := proposedDifference+positiveTotal < 0
					if impossibleToGoBack {
						continue
					}
				}
				difference = proposedDifference
				// note: move is a fresh per-iteration copy, so &move is safe here.
				moves = append(moves, &move)
				if move.bytes > 0 {
					positiveTotal -= move.bytes
				} else {
					negativeTotal -= move.bytes
				}
				if proposedScore < bestGuessSoFar.score {
					bestGuessSoFar = knapsackGuess{
						moves:    moves,
						distance: proposedDistance,
						score:    proposedScore,
					}
				}
			}
		}
		if len(bestGuessSoFar.moves) == 0 {
			doneShuffling = true
		} else {
			// apply the chosen moves: each move flips the tenant to the other
			// of the two instances being balanced.
			for _, move := range bestGuessSoFar.moves {
				if move.instanceId == highestSurplus.instance.GetId() {
					delete(workingAllocations[move.instanceId], move.tenantId)
					workingAllocations[lowestSurplus.instance.GetId()][move.tenantId] = true
				} else {
					delete(workingAllocations[move.instanceId], move.tenantId)
					workingAllocations[highestSurplus.instance.GetId()][move.tenantId] = true
				}
			}
		}
	}
	// BUGFIX: only report the circuit breaker when we actually ran out of
	// iterations without converging (the original compared == and, with the
	// missing increment, never fired at all).
	if !doneShuffling && iterations >= shufflingCircuitBreakerLimit {
		return false, fmt.Errorf(
			`something went wrong shuffling tenants shufflingCircuitBreakerLimit was reached (%d iterations)`,
			shufflingCircuitBreakerLimit,
		)
	}
	// Step 3: Now that we have the workingAllocations shuffled to balance the load, we need to apply
	// those changes to the instances.
	// we store a "shadow config" in the database for the tenants membership on vps.
	// Then when we are ready to apply a new configuration, we do a diff against the current "shadow config"
	// and only make requests to each instance which has diffs.
	existingConfig := map[string]map[int]*TunnelSettings{}
	newConfig := map[string]map[int]*TunnelSettings{}
	for instanceId, instanceAllocations := range originalAllocations {
		for tenantId := range instanceAllocations {
			settings, has := shadowConfigs[instanceId][tenantId]
			if !has {
				settings = &TunnelSettings{
					AuthorizedDomains: []string{},
				}
			}
			if _, has = existingConfig[instanceId]; !has {
				existingConfig[instanceId] = map[int]*TunnelSettings{}
			}
			existingConfig[instanceId][tenantId] = settings
		}
	}
	// Add both workingAllocations and pinned to newConfig
	for instanceId, instanceAllocations := range workingAllocations {
		for tenantId := range instanceAllocations {
			if _, has := newConfig[instanceId]; !has {
				newConfig[instanceId] = map[int]*TunnelSettings{}
			}
			if !tenants[tenantId].Deactivated {
				// TODO handle other authorized domains
				newConfig[instanceId][tenantId] = tenants[tenantId].TunnelSettings
			}
		}
	}
	for instanceId, instanceAllocations := range pinned {
		for tenantId := range instanceAllocations {
			if _, has := newConfig[instanceId]; !has {
				newConfig[instanceId] = map[int]*TunnelSettings{}
			}
			if !tenants[tenantId].Deactivated {
				// TODO handle other authorized domains
				newConfig[instanceId][tenantId] = tenants[tenantId].TunnelSettings
			}
		}
	}
	// go over the new and existing configs and mark any instanceIds which have differences
	changedInstanceIds := map[string]bool{}
	for instanceId, instanceTenants := range existingConfig {
		if _, has := newConfig[instanceId]; !has {
			changedInstanceIds[instanceId] = true
			continue
		}
		newInstanceTenants := newConfig[instanceId]
		for tenantId, tenantConfig := range instanceTenants {
			newTenantConfig, has := newInstanceTenants[tenantId]
			if !has || !newTenantConfig.DeepEquals(tenantConfig) {
				changedInstanceIds[instanceId] = true
			}
		}
	}
	for instanceId, instanceTenants := range newConfig {
		if _, has := existingConfig[instanceId]; !has {
			changedInstanceIds[instanceId] = true
			continue
		}
		// BUGFIX: was newConfig[instanceId], which compared newConfig against
		// itself (masked in practice by the hardcoded `|| true` below).
		existingInstanceTenants := existingConfig[instanceId]
		for tenantId, tenantConfig := range instanceTenants {
			existingTenantConfig, has := existingInstanceTenants[tenantId]
			// TODO remove hardcoded true
			if !has || !existingTenantConfig.DeepEquals(tenantConfig) || true {
				changedInstanceIds[instanceId] = true
			}
		}
	}
	// we send PUT requests to all the instances that have config changes
	actions := make([]func() taskResult, len(changedInstanceIds))
	i := 0
	for instanceId := range changedInstanceIds {
		// BUGFIX: capture a per-iteration copy; before Go 1.22 every closure
		// shared the loop variable, so every save targeted the same instance.
		instanceId := instanceId
		instance := validVpsInstances[instanceId]
		actions[i] = func() taskResult {
			err := app.saveVpsInstanceTenantSettings(billingYear, billingMonth, instance, newConfig[instanceId])
			return taskResult{
				Name:   instanceId,
				Err:    err,
				Result: nil,
			}
		}
		i++
	}
	log.Println("saving tenants assignments to threshold servers...")
	results := doInParallel(false, actions...)
	for _, result := range results {
		if result.Err != nil {
			return true, result.Err
		}
	}
	// collect, per free subdomain, the IPs of every instance serving that tenant.
	freeSubdomains := map[string][]string{}
	for instanceId, tenantIds := range workingAllocations {
		instance, hasInstance := validVpsInstances[instanceId]
		if hasInstance {
			for tenantId := range tenantIds {
				tenant, hasTenant := tenants[tenantId]
				if hasTenant && tenant.Subdomain != "" {
					_, hasIps := freeSubdomains[tenant.Subdomain]
					if !hasIps {
						freeSubdomains[tenant.Subdomain] = []string{instance.IPV4}
					} else {
						freeSubdomains[tenant.Subdomain] = append(freeSubdomains[tenant.Subdomain], instance.IPV4)
					}
				}
			}
		}
	}
	log.Println("updating tenants free subdomains...")
	err = app.Gandi.UpdateFreeSubdomains(freeSubdomains)
	if err != nil {
		return true, err
	}
	log.Println("rebalance complete!")
	return true, nil
}
// getInstanceProjectedUsage estimates how many bytes this instance will have
// served by the end of the billing month, by extrapolating the month-to-date
// usage of its active tenants over the remaining fraction of the month and
// adding that to the bytes the instance has already served.
func getInstanceProjectedUsage(
	instance *VPSInstance,
	workingAllocations *map[string]map[int]bool,
	tenants *map[int]*TenantInfo,
	desiredInstancesPerTenant int,
	amountOfMonthElapsed float64,
) int64 {
	// sum the month-to-date usage of the active (non-deactivated) tenants
	// currently allocated to this instance
	var activeTenantBytes int64
	if allocations, has := (*workingAllocations)[instance.GetId()]; has {
		for tenantId := range allocations {
			tenant := (*tenants)[tenantId]
			if !tenant.Deactivated {
				activeTenantBytes += tenant.Bytes
			}
		}
	}
	// each tenant is spread across desiredInstancesPerTenant instances, so this
	// instance only carries its per-instance share of the tenants' traffic
	sharePerInstance := float64(activeTenantBytes) / float64(desiredInstancesPerTenant)
	// extrapolate: (share / elapsedFraction) is the projected full-month share,
	// so subtracting the share already counted leaves the remainder of the month
	remainderOfMonth := (sharePerInstance / amountOfMonthElapsed) - sharePerInstance
	return int64(float64(instance.Bytes) + remainderOfMonth)
}
// SpawnNewMultitenantInstance provisions a brand-new multitenant threshold
// server from scratch: it creates a DigitalOcean droplet, waits for it to
// receive an IPv4 address, trusts its ssh host keys, installs threshold on it
// over ssh, and finally records the instance in the database.
// Returns the provisioned instance, or an error from the first failing step.
func (app *BackendApp) SpawnNewMultitenantInstance() (*VPSInstance, error) {
	// cloud-init user-data script for the new droplet: silence the login
	// banner, then run two generated snippets — one publishing the ssh host
	// keys and one installing a file-upload helper.
	// NOTE(review): exact snippet contents live in DigitalOceanService and
	// BackblazeB2Service; presumably the host keys end up in backblaze so
	// pollForSSHHostKeys below can find them — confirm in those services.
	userData := fmt.Sprintf(
		`#!/bin/sh
touch /root/.hushlogin
%s
%s`,
		app.DigitalOcean.GetSSHHostKeysFileScript(), app.BackblazeB2.GetFileUploadShellScript(),
	)
	log.Println("Creating new multitenant worker...")
	instance, err := app.DigitalOcean.Create("multitenant-worker", userData)
	if err != nil {
		return nil, errors.Wrap(err, "failed to DigitalOcean.Create")
	}
	// poll the provider every 5 seconds (up to 5 minutes total) until the
	// instance reports an IPv4 address
	log.Printf("Waiting for %s to get an IPv4...\n", instance.GetId())
	waitedSeconds := 0
	for instance.IPV4 == "" && waitedSeconds < 300 {
		time.Sleep(time.Second * time.Duration(5))
		waitedSeconds += 5
		instance, err = app.DigitalOcean.Get(instance.ProviderInstanceId)
		if err != nil {
			return nil, errors.Wrap(err, "failed to DigitalOcean.Get")
		}
	}
	log.Printf("Waiting for %s (%s) to upload its ssh host public keys to backblaze...\n", instance.GetId(), instance.IPV4)
	knownHostsFileContent, err := app.pollForSSHHostKeys(instance.GetId())
	if err != nil {
		return nil, errors.Wrap(err, "failed to PollBackblazeForSSHHostKeys")
	}
	// trust the new host's keys before we ssh into it
	log.Printf("adding %s (%s) to the user's known-hosts file...\n", instance.GetId(), instance.IPV4)
	err = app.SSH.AppendToKnownHostsFile(knownHostsFileContent)
	if err != nil {
		return nil, errors.Wrap(err, "failed to AppendToSSHKnownHostsFile")
	}
	log.Printf("Generating threshold install script for %s (%s)...\n", instance.GetId(), instance.IPV4)
	// collect whichever of the instance's IPs parse successfully; at least one
	// is required to generate the threshold server install script
	ipv4 := net.ParseIP(instance.IPV4)
	ipv6 := net.ParseIP(instance.IPV6)
	ips := []net.IP{}
	if ipv4 != nil {
		ips = append(ips, ipv4)
	}
	if ipv6 != nil {
		ips = append(ips, ipv6)
	}
	if len(ips) == 0 {
		return nil, errors.Errorf("failed to convert instance IPs '%s' or '%s' to net.IP", instance.IPV4, instance.IPV6)
	}
	provisionThresholdScript, err := app.ThresholdProvisioning.GetServerInstallScript(instance.GetId(), ips)
	if err != nil {
		return nil, errors.Wrap(err, "failed to ThresholdProvisioning.GetServerInstallScript")
	}
	// TODO use potentially different username depending on cloud provider
	log.Printf("Installing threshold on %s (%s)...\n", instance.GetId(), instance.IPV4)
	stdout, stderr, err := app.SSH.RunScriptOnRemoteHost(provisionThresholdScript, "root", instance.IPV4)
	if err != nil {
		return nil, errors.Wrapf(
			err, "failed to RunScriptOnRemoteHost(provisionThresholdScript, \"root\", %s)",
			instance.IPV4,
		)
	}
	// dump the install script's output for debugging/auditing
	log.Println("stderr:")
	log.Println(stderr)
	log.Println("")
	log.Println("")
	log.Println("stdout:")
	log.Println(stdout)
	log.Println("")
	log.Println("")
	log.Printf("Saving %s (%s) info to the database...\n", instance.GetId(), instance.IPV4)
	err = app.Model.CreateVPSInstance(instance)
	if err != nil {
		return nil, errors.Wrap(err, "failed to Model.CreateVPSInstance")
	}
	log.Printf("DONE! %s (%s) is provisioned!\n\n\n", instance.GetId(), instance.IPV4)
	return instance, nil
}
// pollForSSHHostKeys blocks until the newly created instance has uploaded its
// ssh host public keys to backblaze, then returns the known-hosts file
// content. It gives up after roughly five minutes of polling.
func (app *BackendApp) pollForSSHHostKeys(instanceId string) (string, error) {
	// a fresh instance needs at least ~15 seconds to boot and run its
	// user-data script, so don't bother checking before then
	time.Sleep(time.Second * time.Duration(15))
	for attempt := 0; attempt < 60; attempt++ {
		fileContent, err := app.BackblazeB2.Get(fmt.Sprintf("greenhouse/known-hosts/%s", instanceId))
		if err != nil {
			return "", err
		}
		if fileContent != nil {
			return string(fileContent), nil
		}
		time.Sleep(time.Second * time.Duration(5))
	}
	return "", fmt.Errorf("timed out waiting for %s to upload its host ssh public keys", instanceId)
}
// saveVpsInstanceTenantSettings pushes the per-tenant tunnel configuration to
// a threshold server via a PUT to its multitenant management API, reads the
// configuration back (for debug logging only), and then persists the
// configuration to the database for the given billing period.
func (app *BackendApp) saveVpsInstanceTenantSettings(
	billingYear int,
	billingMonth int,
	instance *VPSInstance,
	config map[int]*TunnelSettings,
) error {
	url := fmt.Sprintf("https://%s:%d/tenants", instance.IPV4, app.ThresholdManagementPort)
	jsonBytes, err := json.Marshal(config)
	// BUGFIX: the marshal error was previously checked only AFTER the result
	// had already been logged; check it before using jsonBytes.
	if err != nil {
		return errors.Wrapf(
			err,
			"json serialization error calling %s (threshold multitenant management API)",
			url,
		)
	}
	log.Println(string(jsonBytes))
	returnedBytes, err := app.MyHTTP200(
		"PUT",
		url,
		bytes.NewBuffer(jsonBytes),
		func(request *http.Request) { request.Header.Set("Content-Type", "application/json") },
	)
	if err != nil {
		return err
	}
	log.Printf("returnedBytes: \n%s\n", string(returnedBytes))
	// read the config back purely for debug logging
	returnedBytes2, err := app.MyHTTP200("GET", url, nil, nil)
	if err != nil {
		return err
	}
	log.Printf("returnedBytes2: \n%s\n", string(returnedBytes2))
	err = app.Model.SaveInstanceConfiguration(billingYear, billingMonth, instance, config)
	if err != nil {
		return errors.Wrapf(err, "Can't save instance configuration to database for %s", instance.GetId())
	}
	return nil
}
// TODO this fn needs to use app.MyHTTP() too
// func (app *BackendApp) getTenantSettingsFromVpsInstance(url string) (map[string]TunnelSettings, error) {
// httpClient := http.Client{
// Transport: &http.Transport{
// TLSClientConfig: app.ThresholdManagementTlsClientConfig,
// },
// Timeout: 10 * time.Second,
// }
// request, err := http.NewRequest("GET", url, nil)
// response, err := httpClient.Do(request)
// if err != nil {
// return nil, err
// }
// if response.StatusCode != 200 {
// return nil, fmt.Errorf("HTTP %d when calling %s (threshold multitenant management API)", response.StatusCode, url)
// }
// bytes, err := ioutil.ReadAll(response.Body)
// if err != nil {
// return nil, errors.Wrapf(err, "HTTP read error when calling %s (threshold multitenant management API)", url)
// }
// var responseObject map[string]TunnelSettings
// err = json.Unmarshal(bytes, &responseObject)
// if err != nil {
// return nil, errors.Wrapf(err, "JSON parse error when calling %s (threshold multitenant management API)", url)
// }
// return responseObject, nil
// }
func CreateUnixTransport(socketFile string) *http.Transport {
return &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socketFile)
},
}
}
// MyHTTP performs an HTTP request using the service's cached client,
// (re)creating the client via ClientFactory when it is missing or its expiry
// time has passed. withRequest, when non-nil, may mutate the request before
// it is sent (e.g. to set headers).
// Returns the response status code, the full response body, and any error.
func (service *BaseHTTPService) MyHTTP(
	method string,
	url string,
	body io.Reader,
	withRequest func(*http.Request),
) (int, []byte, error) {
	// lazily (re)build the HTTP client when absent or expired
	if service.client == nil || service.clientExpiry != nil && time.Now().After(*(service.clientExpiry)) {
		httpClient, httpClientExpiry, err := service.ClientFactory()
		if err != nil {
			return 0, nil, errors.Wrapf(err, "failed to create HTTP client when calling %s %s", method, url)
		}
		service.client = httpClient
		service.clientExpiry = httpClientExpiry
	}
	request, err := http.NewRequest(method, url, body)
	if err != nil {
		return 0, nil, errors.Wrapf(err, "failed to create HTTP request calling %s %s", method, url)
	}
	if withRequest != nil {
		withRequest(request)
	}
	response, err := service.client.Do(request)
	if err != nil {
		return 0, nil, errors.Wrapf(err, "HTTP request error when calling %s %s", method, url)
	}
	// BUGFIX: the response body was never closed, leaking the connection and
	// preventing the transport from reusing it for keep-alive.
	defer response.Body.Close()
	// renamed from `bytes` to avoid shadowing the imported bytes package
	responseBytes, err := ioutil.ReadAll(response.Body)
	if err != nil {
		return response.StatusCode, nil, errors.Wrapf(err, "HTTP read error when calling %s %s ", method, url)
	}
	return response.StatusCode, responseBytes, nil
}
// MyHTTP200 performs an HTTP request via MyHTTP and additionally converts any
// status code >= 300 into an error that includes the response body text.
// The response body bytes are returned in either case.
func (service *BaseHTTPService) MyHTTP200(
	method string,
	url string,
	body io.Reader,
	withRequest func(*http.Request),
) ([]byte, error) {
	statusCode, responseBytes, err := service.MyHTTP(method, url, body, withRequest)
	if err != nil {
		return responseBytes, err
	}
	if statusCode >= 300 {
		return responseBytes, errors.Errorf("HTTP %d when calling %s %s: \n%s\n\n", statusCode, method, url, string(responseBytes))
	}
	return responseBytes, nil
}
// doInParallel runs all actions concurrently and collects their results into
// a map keyed by taskResult.Name. When stopOnFirstError is true, it stops
// waiting for further results as soon as one action reports an error; the
// results collected so far are still returned.
func doInParallel(stopOnFirstError bool, actions ...func() taskResult) map[string]taskResult {
	// BUGFIX: the channel is buffered to len(actions) so that, when we stop
	// early on error below, the still-running goroutines can deliver their
	// result and exit instead of blocking forever on an unbuffered channel
	// (a permanent goroutine leak).
	resultsChannel := make(chan taskResult, len(actions))
	results := map[string]taskResult{}
	for _, action := range actions {
		// this is how you do closures over the VALUE of a variable (not the variable itself) in golang
		// https://stackoverflow.com/questions/26692844/captured-closure-for-loop-variable-in-go
		action := action
		go (func() {
			resultsChannel <- action()
		})()
	}
	for range actions {
		result := <-resultsChannel
		results[result.Name] = result
		if result.Err != nil && stopOnFirstError {
			break
		}
	}
	return results
}
// int64Abs returns the absolute value of x.
// NOTE(review): math.MinInt64 has no positive int64 counterpart, so it is
// returned unchanged by negation — same as the original behavior.
func int64Abs(x int64) int64 {
	if x < 0 {
		return -x
	}
	return x
}
// getRandomPermutation returns the integers [0, length) in uniformly random
// order, using Fisher–Yates-style selection from a shrinking pool.
func getRandomPermutation(length int) []int {
	toReturn := make([]int, length)
	remaining := make([]int, length)
	for i := range remaining {
		remaining[i] = i
	}
	for i := 0; i < length; i++ {
		// pick a random element from the not-yet-used pool
		picked := mathRand.Intn(length - i)
		toReturn[i] = remaining[picked]
		// BUGFIX: move the LAST live pool element (index length-i-1) into the
		// vacated slot and shrink the pool by one. The original indexed
		// remaining[length-i], one past the end of the live pool, which
		// panics with "index out of range" for any length >= 1, and its
		// re-slice to length-i failed to shrink the pool at all.
		last := length - i - 1
		remaining[picked] = remaining[last]
		remaining = remaining[:last]
	}
	return toReturn
}
// getKnapsackSolutionScore scores a candidate knapsack solution: the raw
// distance plus a penalty proportional to how many tenant moves it requires,
// weighted by the package-level knapsackShufflingMinimizationFactor.
func getKnapsackSolutionScore(scale int64, proposedDistance int64, proposedLength int) int64 {
	shufflePenalty := int64(float64(scale) * knapsackShufflingMinimizationFactor)
	return proposedDistance + shufflePenalty*int64(proposedLength)
}