🌱🏠 instant least-authority port-forwarding (with automatic HTTPS) for anyone, anywhere! We **really** don't want your TLS private keys, you can keep them 😃 https://greenhouse.server.garden/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

172 lines
5.3 KiB

package main
import (
"fmt"
"log"
"runtime/debug"
"strconv"
"time"
errors "git.sequentialread.com/forest/pkg-errors"
)
// ScheduledTasks holds the registry of named periodic tasks together with the
// services that those tasks and the startup sequence in Initialize operate on.
type ScheduledTasks struct {
// Backend supplies the ValidateExternalDomains and Reallocate actions that Initialize registers.
Backend *BackendApp
// Ingress is used by Initialize to start the greenhouse-daemon and to poll its status.
Ingress *IngressService
// DB is asked (PollScheduledTask) whether a task needs to run and is told
// (ScheduledTaskCompleted) when a task's action returned without error.
DB *DBModel
// AdminTenantId is sent, formatted as a decimal string, with telemetry about task failures and panics.
AdminTenantId int
// Registry maps a task name to its interval and action. Register adds entries to it.
Registry map[string]ScheduledTask
}
// ScheduledTask describes one registered task: how often it should run and what to run.
type ScheduledTask struct {
// Every is the interval handed to DB.PollScheduledTask when deciding whether the task needs to run.
Every time.Duration
// Action performs the task. A nil return leads to DB.ScheduledTaskCompleted being called for it;
// a non-nil return is logged and reported via telemetry instead.
Action func() error
}
// NewScheduledTasks builds a ScheduledTasks around the given services and admin
// tenant id. The returned value starts with an empty, non-nil task registry; the
// constructor itself registers no tasks and starts nothing.
func NewScheduledTasks(ingress *IngressService, backendApp *BackendApp, dbModel *DBModel, adminTenantId int) *ScheduledTasks {
	scheduler := new(ScheduledTasks)
	scheduler.Backend = backendApp
	scheduler.Ingress = ingress
	scheduler.DB = dbModel
	scheduler.AdminTenantId = adminTenantId
	scheduler.Registry = make(map[string]ScheduledTask)
	return scheduler
}
// Initialize performs the startup sequence and then launches the background
// scheduler:
//
//  1. registers the "validate-external-domains" and "reallocate" tasks (Register
//     runs each one immediately if the DB says it is due),
//  2. starts the greenhouse-daemon and waits until it answers status requests,
//  3. waits until the daemon reports that it finished applying its config,
//  4. starts a goroutine that polls the registry every 10 seconds and runs due tasks.
//
// It returns a non-nil error if any of steps 1-3 fails or times out; in that case
// the background scheduler is not started.
func (tasks *ScheduledTasks) Initialize() error {
	// Need to update the external domain validation first so that the subsequent reallocate will
	// send the correct authorized domains to all of the threshold server instances
	err := tasks.Register("validate-external-domains", DomainVerificationPollingInterval, func() error { return tasks.Backend.ValidateExternalDomains() })
	if err != nil {
		return errors.Wrap(err, "could not register validate-external-domains task: ")
	}

	// TODO this needs to run only if it hasn't ran recently.. ?
	err = tasks.Register("reallocate", time.Hour, func() error { return tasks.Backend.Reallocate(true, true) })
	if err != nil {
		return errors.Wrap(err, "could not register reallocate task: ")
	}

	err = tasks.Ingress.StartGreenhouseDaemon()
	if err != nil {
		return errors.Wrap(err, "could not start greenhouse-daemon: ")
	}

	err = tasks.waitForDaemonToRespond()
	if err != nil {
		return err
	}

	// TODO this needs to run only if the config changes...?
	// err = tasks.Ingress.ConfigureGreenhouseDaemon()
	// if err != nil {
	// 	return errors.Wrap(err, "could not configure the greenhouse-daemon: ")
	// }

	err = tasks.waitForDaemonApplyConfig()
	if err != nil {
		return err
	}

	log.Println("🌱🏠 greenhouse has initialized successfully!")

	go tasks.pollForever()

	return nil
}

// waitForDaemonToRespond polls the greenhouse-daemon status every 500ms until a
// request succeeds. It gives up once 10 seconds have elapsed and returns the last
// request error, wrapped.
func (tasks *ScheduledTasks) waitForDaemonToRespond() error {
	const (
		timeout      = time.Second * 10
		pollInterval = time.Millisecond * 500
	)

	var err error
	startedWaiting := time.Now()
	for time.Since(startedWaiting) < timeout {
		time.Sleep(pollInterval)
		_, err = tasks.Ingress.GetGreenhouseDaemonStatus()
		if err == nil {
			return nil
		}
		log.Printf("could not reach greenhouse-daemon: %s\n", err)
	}
	return errors.Wrap(err, "greenhouse-daemon never responded to requests after 10 seconds: ")
}

// waitForDaemonApplyConfig polls the greenhouse-daemon status every 500ms and
// logs each apply-config status message as the daemon's status index advances.
// It returns nil once the index has moved past the last entry of
// ApplyConfigStatuses. It returns an error if a status request fails, if the
// daemon reports NeedsAPIToken or an ApplyConfigStatusError, or if the index
// has not advanced for 10 seconds.
func (tasks *ScheduledTasks) waitForDaemonApplyConfig() error {
	const (
		inactivityTimeout = time.Second * 10
		pollInterval      = time.Millisecond * 500
	)

	lastLoggedIndex := -1
	lastActivity := time.Now()
	for time.Since(lastActivity) < inactivityTimeout {
		time.Sleep(pollInterval)
		status, err := tasks.Ingress.GetGreenhouseDaemonStatus()
		switch {
		case err != nil:
			// Keep the underlying cause instead of discarding it.
			return errors.Wrap(err, "greenhouse-daemon stopped responding to requests (did it crash?): ")
		case status.NeedsAPIToken:
			return errors.New("greenhouse-daemon configuration seems to have failed: I gave it an API token and its still complaining that it needs one")
		case status.ApplyConfigStatusError != "":
			return errors.New(status.ApplyConfigStatusError)
		case status.ApplyConfigStatusIndex > lastLoggedIndex:
			// Catch up on every status message we have not logged yet. The index may
			// point one past the end of the list, in which case there is nothing to log
			// for that position.
			for lastLoggedIndex < status.ApplyConfigStatusIndex {
				lastLoggedIndex++
				if lastLoggedIndex < len(status.ApplyConfigStatuses) {
					log.Println(status.ApplyConfigStatuses[lastLoggedIndex])
				}
			}
			if lastLoggedIndex >= len(status.ApplyConfigStatuses) {
				return nil
			}
			// Progress was made, so restart the inactivity timer.
			lastActivity = time.Now()
		}
	}
	return errors.New("greenhouse-daemon never finished applying its config. Timed out after no activity for 10 seconds.")
}

// pollForever checks the registry for due tasks every 10 seconds. It never
// returns and is meant to be run in its own goroutine.
func (tasks *ScheduledTasks) pollForever() {
	for {
		time.Sleep(time.Second * 10)
		tasks.pollOnce()
	}
}

// pollOnce asks the DB, for every registered task, whether it needs to run and
// starts each due task in its own goroutine. A panic during the pass is reported
// and recovered here so that pollForever carries on with the next tick instead
// of scheduling silently stopping for good.
func (tasks *ScheduledTasks) pollOnce() {
	defer tasks.reportPanic("scheduledTasks")

	for name, scheduledTask := range tasks.Registry {
		needsToBeRun, err := tasks.DB.PollScheduledTask(name, scheduledTask.Every)
		if err != nil {
			log.Printf("DB.PollScheduledTask('%s') failed: %s\n", name, err)
			continue
		}
		if needsToBeRun {
			// The arguments are evaluated here, so each goroutine gets its own copy
			// of the loop variables.
			go tasks.runTask(name, scheduledTask)
		}
	}
}

// runTask executes a single task's Action. On success it records completion in
// the DB; on failure it logs the error and posts telemetry. A panic inside the
// Action is recovered and reported rather than taking down the whole process,
// because recover only works in the goroutine that panicked. After a failure or
// a panic the task is not marked as completed.
func (tasks *ScheduledTasks) runTask(name string, scheduledTask ScheduledTask) {
	defer tasks.reportPanic(fmt.Sprintf("scheduledTask '%s'", name))

	err := scheduledTask.Action()
	if err != nil {
		go postTelemetry("greenhouse-web", "", "admin", strconv.Itoa(tasks.AdminTenantId), fmt.Sprintf("scheduledTask '%s' failed: %s\n", name, err))
		log.Printf("scheduledTask (%s) Action() failed: %+v\n", name, err)
		return
	}

	err = tasks.DB.ScheduledTaskCompleted(name)
	if err != nil {
		log.Printf("DB.ScheduledTaskCompleted('%s') failed: %s\n", name, err)
	}
}

// reportPanic recovers from a panic in the calling goroutine, if there is one,
// posts it to telemetry and prints it together with a stack trace. It must be
// installed directly with `defer tasks.reportPanic(...)`: recover only has an
// effect when called by the deferred function itself.
func (tasks *ScheduledTasks) reportPanic(what string) {
	r := recover()
	if r == nil {
		return
	}
	// %v rather than %s: the recovered value can be of any type.
	go postTelemetry("greenhouse-web", "", "admin", strconv.Itoa(tasks.AdminTenantId), fmt.Sprintf("%s panic: %v", what, r))
	fmt.Printf("%s panic: %+v\n", what, r)
	debug.PrintStack()
}
// Register adds a task to the registry under the given name, replacing any
// existing entry with that name, and then asks the DB whether the task needs to
// run right now. If so, the action is executed synchronously and, when it
// succeeds, its completion is recorded in the DB.
//
// The first error from the DB poll, the action, or the completion update is
// returned unchanged. The task stays in the registry even when an error is
// returned.
func (tasks *ScheduledTasks) Register(name string, every time.Duration, action func() error) error {
	// Writing to a nil map panics; allocate lazily so a ScheduledTasks that was not
	// built with NewScheduledTasks can still register tasks.
	if tasks.Registry == nil {
		tasks.Registry = make(map[string]ScheduledTask)
	}
	tasks.Registry[name] = ScheduledTask{
		Every:  every,
		Action: action,
	}

	needsToBeRun, err := tasks.DB.PollScheduledTask(name, every)
	if err != nil {
		return err
	}
	if !needsToBeRun {
		return nil
	}

	if err = action(); err != nil {
		return err
	}
	return tasks.DB.ScheduledTaskCompleted(name)
}