🌱🏠☎️ A simple gateway in front of Grafana Loki, for collecting telemetry data for the greenhouse alpha test phase
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

615 lines
21 KiB

package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"regexp"
"runtime/debug"
"strconv"
"strings"
"time"
errors "git.sequentialread.com/forest/pkg-errors"
cbor "github.com/fxamacker/cbor/v2"
leveldb "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
"golang.org/x/sys/unix"
)
// TelemetryEvent is the unit persisted to leveldb: the raw Loki submission
// plus bookkeeping timestamps recording the nearest account association
// already found for each join key, so repeated annotation passes can keep
// the closest match. Encoded with compact integer cbor keys (keyasint).
type TelemetryEvent struct {
// Submission is the (possibly account-annotated) Loki push payload.
Submission *LokiSubmission `cbor:"1,keyasint,omitempty"`
// Unix-nanosecond timestamps of the nearest known account association
// found so far via each join key; zero means none found yet.
AccountByIpTimestamp int64 `cbor:"8,keyasint,omitempty"`
AccountByDaemonTimestamp int64 `cbor:"10,keyasint,omitempty"`
AccountByURLTimestamp int64 `cbor:"12,keyasint,omitempty"`
AccountByUserIdTimestamp int64 `cbor:"14,keyasint,omitempty"`
}
type LokiSubmission struct {
Streams []LokiStream `json:"streams,omitempty",cbor:"1,keyasint,omitempty"`
}
type LokiStream struct {
Stream map[string]string `json:"stream,omitempty",cbor:"1,keyasint,omitempty"`
Values [][]string `json:"values,omitempty",cbor:"2,keyasint,omitempty"`
}
// Earliest/latest client-supplied timestamps the ingest endpoint accepts
// (assigned in main).
var datesNotValidBefore time.Time
var datesNotValidAfter time.Time
// Two-byte key prefixes that partition the single leveldb keyspace into
// independent indexes (values assigned in main).
var indexCheckpointTimestampByDelaySeconds []byte
var indexEventsByTimestamp []byte
var indexAccountByIp []byte
var indexAccountByDaemon []byte
var indexAccountByUserID []byte
var indexAccountByURL []byte
// Length in bytes of each index prefix above.
const indexPrefixLength = 2
// 8-byte minimum/maximum timestamp sentinels used as key-range scan bounds.
var beginningOfTimeBytes []byte
var endOfTimeBytes []byte
var db *leveldb.DB
var emptyWriteOptions *opt.WriteOptions
var emptyReadOptions *opt.ReadOptions
// Maps a delay in seconds to the `delay` label value sent to Loki
// (e.g. 5 -> "rt"); assigned in main.
var delayNamesBySeconds map[int]string
// How often the background annotator wakes up to process the next chunk.
var delayChunkSize time.Duration
var lokiClient *http.Client
// Base URL of the downstream Loki; overridable via the LOKI_URL env var.
var lokiURL = "http://loki:3100"
// main wires up the three pieces of the telemetry gateway:
//  1. an HTTP ingest endpoint on :8080 accepting either Loki-style JSON push
//     bodies or ad-hoc query-string submissions, persisted to leveldb;
//  2. a background loop that, at several fixed delays, re-reads persisted
//     events, joins them against the account-by-{userId,daemon,url,ip}
//     indexes to fill in a missing `account` label, and forwards the result
//     to Loki;
//  3. signal handling for a clean shutdown.
func main() {
	if os.Getenv("LOKI_URL") != "" {
		lokiURL = os.Getenv("LOKI_URL")
	}

	os.MkdirAll("./data/leveldb", 0755)
	var err error
	db, err = leveldb.OpenFile("./data/leveldb", &opt.Options{})
	if err != nil {
		panic(err)
	}
	// BUGFIX: register the close only after a successful open. The original
	// deferred before the error check, so a failed open would panic a second
	// time inside the deferred db.Close() on a nil handle.
	defer func() {
		if closeErr := db.Close(); closeErr != nil {
			panic(closeErr)
		}
	}()

	// Cheap sniff for a Loki push body: optional whitespace, "{", optional
	// whitespace, then the "streams" key.
	matchLokiSubmissionJSON := regexp.MustCompile("^[ \\t\\n]*\\{[ \\t\\n]*\"streams\"")

	// Sanity bounds for client-supplied nanosecond timestamps.
	datesNotValidBefore = time.Date(1970, 0, 0, 0, 0, 0, 0, time.UTC)
	datesNotValidAfter = time.Date(2100, 0, 0, 0, 0, 0, 0, time.UTC)

	// Two-byte prefixes partitioning the leveldb keyspace into indexes.
	indexCheckpointTimestampByDelaySeconds = []byte{0x00, 0x20}
	indexEventsByTimestamp = []byte{0x00, 0x30}
	indexAccountByIp = []byte{0x00, 0x40}
	indexAccountByDaemon = []byte{0x00, 0x44}
	indexAccountByURL = []byte{0x00, 0x48}
	indexAccountByUserID = []byte{0x00, 0x4b}
	beginningOfTimeBytes = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
	endOfTimeBytes = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}

	// Each delay gets its own checkpoint and its own pass over the events;
	// longer delays give the account join more data to work with.
	delayNamesBySeconds = map[int]string{
		5:     "rt",
		300:   "user",
		1800:  "30min",
		86400: "day",
	}
	delayChunkSize = time.Second
	lokiClient = &http.Client{
		Timeout: time.Second * 5,
	}

	server := http.NewServeMux()
	server.HandleFunc("/", func(responseWriter http.ResponseWriter, request *http.Request) {
		if request.Method != "POST" {
			http.Error(responseWriter, "405 method not allowed, try POST instead", http.StatusMethodNotAllowed)
			return
		}
		ip := request.Header.Get("X-Forwarded-For")
		var submission LokiSubmission
		var timestamp int64
		body, err := ioutil.ReadAll(request.Body)
		if err != nil {
			log.Printf("500 internal server error: http read error: %+v\n", err)
			http.Error(responseWriter, "500 internal server error: http read error", http.StatusInternalServerError)
			return
		}
		if request.Header.Get("Content-Type") == "application/json" || matchLokiSubmissionJSON.Match(body) {
			// BUGFIX: this was json.Unmarshal(body, body) — the destination
			// must be a pointer, so unmarshalling always failed and every
			// JSON submission was rejected with 400.
			err := json.Unmarshal(body, &submission)
			if err != nil {
				log.Printf("400 bad request: invalid json: %+v\n", err)
				http.Error(responseWriter, "400 bad request: invalid json", http.StatusBadRequest)
				return
			}
			if len(submission.Streams) != 1 {
				// NOTE: the check is != 1; log and response now agree.
				log.Println("400 bad request: exactly one stream is required")
				http.Error(responseWriter, "400 bad request: exactly one stream is required", http.StatusBadRequest)
				return
			}
			if len(submission.Streams[0].Stream) == 0 {
				log.Println("400 bad request: no labels, at least one label is required")
				http.Error(responseWriter, "400 bad request: no labels, at least one label is required", http.StatusBadRequest)
				return
			}
			if len(submission.Streams[0].Values) == 0 {
				log.Println("400 bad request: no values, at least one value is required")
				http.Error(responseWriter, "400 bad request: no values, at least one value is required", http.StatusBadRequest)
				return
			}
			if len(submission.Streams[0].Values[0]) != 2 {
				log.Println("400 bad request: each value must have two strings")
				http.Error(responseWriter, "400 bad request: each value must have two strings", http.StatusBadRequest)
				return
			}
			timestamp, err = strconv.ParseInt(submission.Streams[0].Values[0][0], 10, 64)
			if err != nil {
				log.Println("400 bad request: timestamp was not an integer")
				http.Error(responseWriter, "400 bad request: timestamp was not an integer", http.StatusBadRequest)
				return
			}
			timestampTime := time.Unix(0, timestamp).UTC()
			if timestampTime.Before(datesNotValidBefore) || timestampTime.After(datesNotValidAfter) {
				log.Println("400 bad request: timestamp was out of bounds")
				http.Error(responseWriter, "400 bad request: timestamp was out of bounds", http.StatusBadRequest)
				return
			}
		} else {
			// Not JSON: treat the query string as labels and the raw body as
			// the log line, timestamped now.
			stream := map[string]string{}
			query := request.URL.Query()
			noneQuery := true
			for k, vs := range query {
				if len(vs) > 0 {
					stream[k] = vs[0]
					noneQuery = false
				}
			}
			if noneQuery {
				log.Println("400 bad request: at least one query param is required")
				http.Error(responseWriter, "400 bad request: at least one query param is required", http.StatusBadRequest)
				return
			}
			timestamp = time.Now().UTC().UnixNano()
			submission = LokiSubmission{
				Streams: []LokiStream{
					{
						Stream: stream,
						Values: [][]string{{
							strconv.FormatInt(timestamp, 10),
							strings.TrimSpace(string(body)),
						}},
					},
				},
			}
		}
		// Clients may send ip=auto to have the gateway fill in their real IP.
		if submission.Streams[0].Stream["ip"] == "auto" {
			submission.Streams[0].Stream["ip"] = ip
		}
		event := TelemetryEvent{
			Submission: &submission,
		}
		err = putEvent(timestamp, &event)
		if err != nil {
			log.Printf("500 internal server error: putEvent: %+v\n", err)
			http.Error(responseWriter, "500 internal server error", http.StatusInternalServerError)
			return
		}
		// When the submission already carries an account, record the
		// association under each available join key so later account-less
		// events can be joined against it.
		account, hasAccount := submission.Streams[0].Stream["account"]
		daemon, hasDaemon := submission.Streams[0].Stream["daemon"]
		userId, hasUserId := submission.Streams[0].Stream["userId"]
		url, hasURL := submission.Streams[0].Stream["url"]
		if hasAccount {
			err = putAccountByStr(indexAccountByIp, timestamp, account, ip)
		}
		if err == nil && hasAccount && hasDaemon {
			err = putAccountByStr(indexAccountByDaemon, timestamp, account, daemon)
		}
		// BUGFIX: the userId and url indexes were gated on hasDaemon instead
		// of hasAccount, which could write an empty account string into the
		// index for daemon-bearing events that had no account.
		if err == nil && hasAccount && hasUserId {
			err = putAccountByStr(indexAccountByUserID, timestamp, account, userId)
		}
		if err == nil && hasAccount && hasURL {
			err = putAccountByStr(indexAccountByURL, timestamp, account, url)
		}
		if err != nil {
			log.Printf("500 internal server error: putAccountByStr: %+v\n", err)
			http.Error(responseWriter, "500 internal server error", http.StatusInternalServerError)
			return
		}
	})

	// main async process that consumes and annotates events with `account`
	// based on joining on userId, daemon, url, or IP. Different time delays
	// are used to get real-time metrics plus more robust joins that take
	// longer to resolve.
	go func() {
		debug.SetPanicOnFault(true)
		defer func() {
			if err := recover(); err != nil {
				fmt.Print("\n\n!!!!!\n")
				log.Printf("panic: %+v\n", err)
				debug.PrintStack()
				fmt.Print("\n!!!!!\n\n")
			}
		}()
		for {
			for seconds, name := range delayNamesBySeconds {
				var err error
				var startTimestamp int64
				var startTimestampBytes []byte
				secondsBytes := make([]byte, 4)
				binary.BigEndian.PutUint32(secondsBytes, uint32(seconds))
				// Resume from the stored checkpoint for this delay, or
				// default to one chunk before (now - delay) on first run.
				startTimestampBytes, err = db.Get(append(indexCheckpointTimestampByDelaySeconds, secondsBytes...), emptyReadOptions)
				if err == nil {
					startTimestamp = int64(binary.BigEndian.Uint64(startTimestampBytes))
				} else if err == leveldb.ErrNotFound {
					startTimestamp = time.Now().UTC().UnixNano() - (int64(time.Second*time.Duration(seconds)) + int64(delayChunkSize))
					startTimestampBytes = make([]byte, 8)
					binary.BigEndian.PutUint64(startTimestampBytes, uint64(startTimestamp))
				} else {
					log.Printf("db.Get indexCheckpointTimestampByDelaySeconds %d failed: %+v\n", seconds, err)
					continue
				}
				endTimestamp := time.Now().UTC().UnixNano() - int64(time.Second*time.Duration(seconds))
				endTimestampBytes := make([]byte, 8)
				binary.BigEndian.PutUint64(endTimestampBytes, uint64(endTimestamp))
				// Key range covering events between the checkpoint and
				// (now - delay), in timestamp order.
				chunkOfThePast := &util.Range{
					Start: append(indexEventsByTimestamp, startTimestampBytes...),
					Limit: append(indexEventsByTimestamp, endTimestampBytes...),
				}
				iterator := db.NewIterator(chunkOfThePast, emptyReadOptions)
				events := []TelemetryEvent{}
				for iterator.Next() {
					eventBytes := iterator.Value()
					var event TelemetryEvent
					err := cbor.Unmarshal(eventBytes, &event)
					if err != nil {
						log.Printf("cbor.Unmarshal eventBytes failed: %+v\n", err)
						continue
					}
					// Skip malformed persisted events rather than crashing
					// the annotator.
					if len(event.Submission.Streams) != 1 {
						log.Printf("unmarshalled eventBytes has %d streams", len(event.Submission.Streams))
						continue
					}
					if event.Submission.Streams[0].Stream == nil {
						value := "nil"
						if len(event.Submission.Streams[0].Values) != 0 {
							value = strings.Join(event.Submission.Streams[0].Values[0], ": ")
						}
						log.Printf("unmarshalled eventBytes first stream has nil map. value: %s", value)
						continue
					}
					if len(event.Submission.Streams[0].Values) == 0 {
						log.Printf("unmarshalled eventBytes first stream has no values")
						continue
					}
					events = append(events, event)
				}
				iterator.Release()
				// Advance the checkpoint before the (async) processing so the
				// next tick reads the following chunk.
				db.Put(append(indexCheckpointTimestampByDelaySeconds, secondsBytes...), endTimestampBytes, emptyWriteOptions)
				go func(events []TelemetryEvent, seconds int, name string) {
					debug.SetPanicOnFault(true)
					defer func() {
						if err := recover(); err != nil {
							fmt.Print("\n\n!!!!!\n")
							log.Printf("panic: %+v\n", err)
							debug.PrintStack()
							fmt.Print("\n!!!!!\n\n")
						}
					}()
					for _, event := range events {
						timestamp, err := strconv.ParseInt(event.Submission.Streams[0].Values[0][0], 10, 64)
						if err != nil {
							log.Printf("strconv.ParseInt on first timestamp failed: %+v\n", err)
							continue
						}
						_, hasAccount := event.Submission.Streams[0].Stream["account"]
						ip, hasIp := event.Submission.Streams[0].Stream["ip"]
						userId, hasUserId := event.Submission.Streams[0].Stream["userId"]
						daemon, hasDaemon := event.Submission.Streams[0].Stream["daemon"]
						url, hasURL := event.Submission.Streams[0].Stream["url"]
						// Join priority: userId, then daemon, then url, then ip.
						if !hasAccount && hasUserId {
							account, accountByUserIdTimestamp, err := getNearestKnownAccount(
								indexAccountByUserID, timestamp, userId, event.AccountByUserIdTimestamp,
							)
							if err != nil {
								// BUGFIX: this log line previously named indexAccountByDaemon.
								log.Printf(
									"getNearestKnownAccount(indexAccountByUserID, %d, %s, %d) failed: %+v\n",
									timestamp, userId, event.AccountByUserIdTimestamp, err,
								)
								continue
							}
							if account != "" {
								event.AccountByUserIdTimestamp = accountByUserIdTimestamp
								event.Submission.Streams[0].Stream["account"] = account
							}
						} else if !hasAccount && hasDaemon {
							account, accountByDaemonTimestamp, err := getNearestKnownAccount(
								indexAccountByDaemon, timestamp, daemon, event.AccountByDaemonTimestamp,
							)
							if err != nil {
								log.Printf(
									"getNearestKnownAccount(indexAccountByDaemon, %d, %s, %d) failed: %+v\n",
									timestamp, daemon, event.AccountByDaemonTimestamp, err,
								)
								continue
							}
							if account != "" {
								event.AccountByDaemonTimestamp = accountByDaemonTimestamp
								event.Submission.Streams[0].Stream["account"] = account
							}
						} else if !hasAccount && hasURL {
							account, accountByURLTimestamp, err := getNearestKnownAccount(
								indexAccountByURL, timestamp, url, event.AccountByURLTimestamp,
							)
							if err != nil {
								log.Printf(
									"getNearestKnownAccount(indexAccountByURL, %d, %s, %d) failed: %+v\n",
									timestamp, url, event.AccountByURLTimestamp, err,
								)
								continue
							}
							if account != "" {
								event.AccountByURLTimestamp = accountByURLTimestamp
								event.Submission.Streams[0].Stream["account"] = account
							}
						} else if !hasAccount && hasIp {
							// the "user" delayed view is the one that gets displayed on greenhouse user profiles
							// so we dont want to do join-on-ip on that one.. there is a risk of cross contamination between accounts
							// if two users have the same IP (VPN for example)
							if name != "user" {
								account, accountByIpTimestamp, err := getNearestKnownAccount(
									indexAccountByIp, timestamp, ip, event.AccountByIpTimestamp,
								)
								if err != nil {
									// BUGFIX: this log line previously printed AccountByURLTimestamp.
									log.Printf(
										"getNearestKnownAccount(indexAccountByIp, %d, %s, %d) failed: %+v\n",
										timestamp, ip, event.AccountByIpTimestamp, err,
									)
									continue
								}
								if account != "" {
									event.AccountByIpTimestamp = accountByIpTimestamp
									event.Submission.Streams[0].Stream["account"] = account
								}
							}
						}
						// Prefix every log line with fixed-width delay,
						// account (optionally userId-prefixed), IP, and type
						// columns before forwarding to Loki.
						stream := event.Submission.Streams[0]
						stream.Stream["delay"] = name
						for i := range stream.Values {
							userId := stream.Stream["userId"]
							if userId == "0" {
								userId = ""
							}
							if userId != "" {
								userIdInt, err := strconv.Atoi(stream.Stream["userId"])
								if err == nil {
									userId = fmt.Sprintf("%04d: ", userIdInt)
								}
							}
							account := fmt.Sprintf("%s%s", userId, stream.Stream["account"])
							if len(account) > 30 {
								account = account[:30]
							}
							tyype := stream.Stream["type"]
							if len(tyype) > 30 {
								tyype = tyype[:30]
							}
							// Rename "ip" to "my_ip" so it reads as a label,
							// not a Loki stream selector collision.
							ip, hasIp := stream.Stream["ip"]
							if hasIp {
								stream.Stream["my_ip"] = ip
								delete(stream.Stream, "ip")
							}
							stream.Values[i][1] = fmt.Sprintf(
								"%-6s %-30s %-15s %-30s %s",
								stream.Stream["delay"], account, stream.Stream["my_ip"], tyype, stream.Values[i][1],
							)
						}
						eventJSONBytes, err := json.Marshal(event.Submission)
						if err != nil {
							log.Printf("json.Marshal(event) failed: %+v\n", err)
							continue
						}
						err = postToLoki(eventJSONBytes)
						if err != nil {
							log.Printf("postToLoki failed: %+v\n", err)
						}
					}
				}(events, seconds, name)
			}
			time.Sleep(delayChunkSize)
		}
	}()

	go func() {
		// BUGFIX: use a local error variable; the original assigned to
		// main's `err` from this goroutine, racing with the main goroutine.
		listenErr := http.ListenAndServe(":8080", server)
		panic(listenErr)
	}()

	sigs := make(chan os.Signal, 10)
	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGHUP)
	done := make(chan bool, 1)
	go func() {
		sig := <-sigs
		log.Printf("greenhouse-telemetry received signal: %s\n", sig)
		done <- true
	}()
	fmt.Println("🌱🏠📠 greenhouse-telemetry is running, listening on :8080")
	<-done
	fmt.Println("exiting")
}
// postToLoki POSTs the given JSON payload to the Loki push API
// ({lokiURL}/loki/api/v1/push). It returns an error if the request could not
// be built or sent, or if Loki answered with a non-2xx status (the response
// body is included in the error message when readable).
func postToLoki(bytez []byte) error {
	lokiRequest, err := http.NewRequest("POST", fmt.Sprintf("%s/loki/api/v1/push", lokiURL), bytes.NewBuffer(bytez))
	if err != nil {
		return errors.Wrap(err, "http.NewRequest")
	}
	lokiRequest.Header.Set("Content-Type", "application/json")
	response, err := lokiClient.Do(lokiRequest)
	if err != nil {
		return errors.Wrap(err, "lokiClient.Do")
	}
	// BUGFIX: the response body was never closed, leaking the underlying
	// connection and preventing the transport from reusing it.
	defer response.Body.Close()
	if response.StatusCode > 299 {
		responseString := "<read error>"
		errorBytes, err := ioutil.ReadAll(response.Body)
		if err == nil {
			responseString = string(errorBytes)
		}
		return errors.Errorf("loki returned HTTP %d: %s", response.StatusCode, responseString)
	}
	// Drain the (normally empty) success body so the keep-alive connection
	// can go back into the pool. Best-effort; errors are irrelevant here.
	ioutil.ReadAll(response.Body)
	return nil
}
// putEvent serializes the event as cbor and stores it under the
// events-by-timestamp index. The key is the index prefix followed by the
// big-endian nanosecond timestamp, so leveldb's lexicographic key order
// doubles as chronological order.
func putEvent(timestamp int64, event *TelemetryEvent) error {
	encoded, err := cbor.Marshal(event)
	if err != nil {
		return errors.Wrap(err, "cbor.Marshal")
	}
	var tsBuf [8]byte
	binary.BigEndian.PutUint64(tsBuf[:], uint64(timestamp))
	key := make([]byte, 0, len(indexEventsByTimestamp)+8)
	key = append(key, indexEventsByTimestamp...)
	key = append(key, tsBuf[:]...)
	if err := db.Put(key, encoded, emptyWriteOptions); err != nil {
		return errors.Wrap(err, "db.Put")
	}
	return nil
}
// putAccountByStr records that `account` was associated with the join value
// `str` (an IP, daemon name, userId, or URL) at the given nanosecond
// timestamp. Key layout: index prefix + str bytes + 8-byte big-endian
// timestamp; the value is the account string itself.
func putAccountByStr(index []byte, timestamp int64, account, str string) error {
	var tsBuf [8]byte
	binary.BigEndian.PutUint64(tsBuf[:], uint64(timestamp))
	key := make([]byte, 0, len(index)+len(str)+8)
	key = append(key, index...)
	key = append(key, str...)
	key = append(key, tsBuf[:]...)
	if err := db.Put(key, []byte(account), emptyWriteOptions); err != nil {
		return errors.Wrap(err, "db.Put")
	}
	return nil
}
// getNearestKnownAccount searches the given account index (one of
// indexAccountBy{Ip,Daemon,UserID,URL}) for the account association whose
// timestamp is closest to `timestamp`, looking both backwards and forwards
// in time. Keys in these indexes are: index prefix + value bytes + 8-byte
// big-endian timestamp, so a seek around (prefix+value+timestamp) lands on
// the chronological neighbors for that value.
//
// nearestKnownTimestamp is the timestamp of a previously-found association
// (or 0); a new candidate only wins if it is strictly closer than that.
// Returns the winning account ("" if nothing beat the previous best), the
// winning timestamp (or nearestKnownTimestamp unchanged), and an error —
// which, as written, is always nil.
func getNearestKnownAccount(index []byte, timestamp int64, value string, nearestKnownTimestamp int64) (string, int64, error) {
timestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(timestampBytes, uint64(timestamp))
// Range bounds: the earliest and latest possible keys for this value, plus
// the key at exactly `timestamp` that splits past from future.
beginningOfValue := append(index, append([]byte(value), beginningOfTimeBytes...)...)
currentTimeOfValue := append(index, append([]byte(value), timestampBytes...)...)
endOfValue := append(index, append([]byte(value), endOfTimeBytes...)...)
// log.Printf("search: beginningOfValue: %x, %x, %x (%x)\n", index, []byte(value), beginningOfTimeBytes, beginningOfValue)
// log.Printf("search: currentTimeOfValue: %x, %x, %x (%x)\n", index, []byte(value), timestampBytes, currentTimeOfValue)
// log.Printf("search: endOfValue: %x, %x, %x (%x)\n", index, []byte(value), endOfTimeBytes, endOfValue)
thePast := &util.Range{
Start: beginningOfValue,
Limit: currentTimeOfValue,
}
theFuture := &util.Range{
Start: currentTimeOfValue,
Limit: endOfValue,
}
// Distance (absolute) from the previously-known best; candidates must beat it.
smallestDifference := timestamp - nearestKnownTimestamp
if smallestDifference < 0 {
smallestDifference = -smallestDifference
}
valueToReturn := ""
// Past side: the iterator is capped at currentTimeOfValue, so seeking to it
// runs off the end; Prev() then lands on the newest entry strictly before
// `timestamp`.
pastIterator := db.NewIterator(thePast, emptyReadOptions)
pastIterator.Seek(currentTimeOfValue)
if pastIterator.Prev() {
key := pastIterator.Key()
// the last 8 bytes of the key contains the timestamp
pastTimestamp := int64(binary.BigEndian.Uint64(key[len(key)-8:]))
newDifference := timestamp - pastTimestamp
if newDifference < 0 {
newDifference = -newDifference
}
//log.Printf("pastIterator found %d, newDifference: %d, smallestDifference: %d, go: %t, v: %s\n", pastTimestamp, newDifference, smallestDifference, newDifference < smallestDifference, string(pastIterator.Value()))
if newDifference < smallestDifference {
nearestKnownTimestamp = pastTimestamp
smallestDifference = newDifference
valueToReturn = string(pastIterator.Value())
}
}
pastIterator.Release()
// Future side: Seek lands on the first entry at or after `timestamp`; it may
// beat the past-side candidate since smallestDifference was just updated.
futureIterator := db.NewIterator(theFuture, emptyReadOptions)
if futureIterator.Seek(currentTimeOfValue) {
key := futureIterator.Key()
// the last 8 bytes of the key contains the timestamp
futureTimestamp := int64(binary.BigEndian.Uint64(key[len(key)-8:]))
newDifference := timestamp - futureTimestamp
if newDifference < 0 {
newDifference = -newDifference
}
//log.Printf("futureIterator found %x %x (%d), newDifference: %d, smallestDifference: %d, go: %t, v: %s\n", futureIterator.Key(), futureIterator.Key()[2:10], futureTimestamp, newDifference, smallestDifference, newDifference < smallestDifference, string(futureIterator.Value()))
if newDifference < smallestDifference {
nearestKnownTimestamp = futureTimestamp
smallestDifference = newDifference
valueToReturn = string(futureIterator.Value())
}
}
futureIterator.Release()
return valueToReturn, nearestKnownTimestamp, nil
}