|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"encoding/binary"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"io/ioutil"
|
|
|
|
"log"
|
|
|
|
"net/http"
|
|
|
|
"os"
|
|
|
|
"os/signal"
|
|
|
|
"regexp"
|
|
|
|
"runtime/debug"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
errors "git.sequentialread.com/forest/pkg-errors"
|
|
|
|
cbor "github.com/fxamacker/cbor/v2"
|
|
|
|
leveldb "github.com/syndtr/goleveldb/leveldb"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
|
|
"golang.org/x/sys/unix"
|
|
|
|
)
|
|
|
|
|
|
|
|
type TelemetryEvent struct {
|
|
|
|
Submission *LokiSubmission `cbor:"1,keyasint,omitempty"`
|
|
|
|
AccountByIpTimestamp int64 `cbor:"8,keyasint,omitempty"`
|
|
|
|
AccountByDaemonTimestamp int64 `cbor:"10,keyasint,omitempty"`
|
|
|
|
AccountByURLTimestamp int64 `cbor:"12,keyasint,omitempty"`
|
|
|
|
AccountByUserIdTimestamp int64 `cbor:"14,keyasint,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type LokiSubmission struct {
|
|
|
|
Streams []LokiStream `json:"streams,omitempty",cbor:"1,keyasint,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type LokiStream struct {
|
|
|
|
Stream map[string]string `json:"stream,omitempty",cbor:"1,keyasint,omitempty"`
|
|
|
|
Values [][]string `json:"values,omitempty",cbor:"2,keyasint,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
var datesNotValidBefore time.Time
|
|
|
|
var datesNotValidAfter time.Time
|
|
|
|
|
|
|
|
var indexCheckpointTimestampByDelaySeconds []byte
|
|
|
|
var indexEventsByTimestamp []byte
|
|
|
|
var indexAccountByIp []byte
|
|
|
|
var indexAccountByDaemon []byte
|
|
|
|
var indexAccountByUserID []byte
|
|
|
|
var indexAccountByURL []byte
|
|
|
|
|
|
|
|
const indexPrefixLength = 2
|
|
|
|
|
|
|
|
var beginningOfTimeBytes []byte
|
|
|
|
var endOfTimeBytes []byte
|
|
|
|
|
|
|
|
var db *leveldb.DB
|
|
|
|
var emptyWriteOptions *opt.WriteOptions
|
|
|
|
var emptyReadOptions *opt.ReadOptions
|
|
|
|
|
|
|
|
var delayNamesBySeconds map[int]string
|
|
|
|
var delayChunkSize time.Duration
|
|
|
|
var lokiClient *http.Client
|
|
|
|
var lokiURL = "http://loki:3100"
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
|
|
|
|
if os.Getenv("LOKI_URL") != "" {
|
|
|
|
lokiURL = os.Getenv("LOKI_URL")
|
|
|
|
}
|
|
|
|
|
|
|
|
os.MkdirAll("./data/leveldb", 0755)
|
|
|
|
|
|
|
|
var err error
|
|
|
|
db, err = leveldb.OpenFile("./data/leveldb", &opt.Options{})
|
|
|
|
defer (func() {
|
|
|
|
err := db.Close()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
})()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
matchLokiSubmissionJSON := regexp.MustCompile("^[ \\t\\n]*\\{[ \\t\\n]*\"streams\"")
|
|
|
|
datesNotValidBefore = time.Date(1970, 0, 0, 0, 0, 0, 0, time.UTC)
|
|
|
|
datesNotValidAfter = time.Date(2100, 0, 0, 0, 0, 0, 0, time.UTC)
|
|
|
|
indexCheckpointTimestampByDelaySeconds = []byte{0x00, 0x20}
|
|
|
|
indexEventsByTimestamp = []byte{0x00, 0x30}
|
|
|
|
indexAccountByIp = []byte{0x00, 0x40}
|
|
|
|
indexAccountByDaemon = []byte{0x00, 0x44}
|
|
|
|
indexAccountByURL = []byte{0x00, 0x48}
|
|
|
|
indexAccountByUserID = []byte{0x00, 0x4b}
|
|
|
|
beginningOfTimeBytes = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
|
|
|
|
endOfTimeBytes = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
|
|
|
|
|
|
|
|
delayNamesBySeconds = map[int]string{
|
|
|
|
5: "rt",
|
|
|
|
300: "user",
|
|
|
|
1800: "30min",
|
|
|
|
86400: "day",
|
|
|
|
}
|
|
|
|
delayChunkSize = time.Second
|
|
|
|
lokiClient = &http.Client{
|
|
|
|
Timeout: time.Second * 5,
|
|
|
|
}
|
|
|
|
|
|
|
|
server := http.NewServeMux()
|
|
|
|
server.HandleFunc("/", func(responseWriter http.ResponseWriter, request *http.Request) {
|
|
|
|
if request.Method == "POST" {
|
|
|
|
ip := request.Header.Get("X-Forwarded-For")
|
|
|
|
|
|
|
|
var submission LokiSubmission
|
|
|
|
var timestamp int64
|
|
|
|
body, err := ioutil.ReadAll(request.Body)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("500 internal server error: http read error: %+v\n", err)
|
|
|
|
http.Error(responseWriter, "500 internal server error: http read error", http.StatusInternalServerError)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if request.Header.Get("Content-Type") == "application/json" || matchLokiSubmissionJSON.Match(body) {
|
|
|
|
err := json.Unmarshal(body, body)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("400 bad request: invalid json: %+v\n", err)
|
|
|
|
http.Error(responseWriter, "400 bad request: invalid json", http.StatusBadRequest)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if len(submission.Streams) != 1 {
|
|
|
|
log.Println("400 bad request: exactly one stream is required")
|
|
|
|
http.Error(responseWriter, "400 bad request: at least one stream is required", http.StatusBadRequest)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if len(submission.Streams[0].Stream) == 0 {
|
|
|
|
log.Println("400 bad request: no labels, at least one label is required")
|
|
|
|
http.Error(responseWriter, "400 bad request: no labels, at least one label is required", http.StatusBadRequest)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if len(submission.Streams[0].Values) == 0 {
|
|
|
|
log.Println("400 bad request: no values, at least one value is required")
|
|
|
|
http.Error(responseWriter, "400 bad request: no values, at least one value is required", http.StatusBadRequest)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if len(submission.Streams[0].Values[0]) != 2 {
|
|
|
|
log.Println("400 bad request: each value must have two strings")
|
|
|
|
http.Error(responseWriter, "400 bad request: each value must have two strings", http.StatusBadRequest)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
timestamp, err = strconv.ParseInt(submission.Streams[0].Values[0][0], 10, 64)
|
|
|
|
if err != nil {
|
|
|
|
log.Println("400 bad request: timestamp was not an integer")
|
|
|
|
http.Error(responseWriter, "400 bad request: timestamp was not an integer", http.StatusBadRequest)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
timestampTime := time.Unix(0, timestamp).UTC()
|
|
|
|
if timestampTime.Before(datesNotValidBefore) || timestampTime.After(datesNotValidAfter) {
|
|
|
|
log.Println("400 bad request: timestamp was out of bounds")
|
|
|
|
http.Error(responseWriter, "400 bad request: timestamp was out of bounds", http.StatusBadRequest)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
stream := map[string]string{}
|
|
|
|
query := request.URL.Query()
|
|
|
|
noneQuery := true
|
|
|
|
for k, vs := range query {
|
|
|
|
if len(vs) > 0 {
|
|
|
|
stream[k] = vs[0]
|
|
|
|
noneQuery = false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if noneQuery {
|
|
|
|
log.Println("400 bad request: at least one query param is requred")
|
|
|
|
http.Error(responseWriter, "400 bad request: at least one query param is requred", http.StatusBadRequest)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
timestamp = time.Now().UTC().UnixNano()
|
|
|
|
submission = LokiSubmission{
|
|
|
|
Streams: []LokiStream{
|
|
|
|
{
|
|
|
|
Stream: stream,
|
|
|
|
Values: [][]string{{
|
|
|
|
strconv.FormatInt(timestamp, 10),
|
|
|
|
strings.TrimSpace(string(body)),
|
|
|
|
}},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
originalIp := submission.Streams[0].Stream["ip"]
|
|
|
|
if originalIp == "auto" {
|
|
|
|
submission.Streams[0].Stream["ip"] = ip
|
|
|
|
}
|
|
|
|
event := TelemetryEvent{
|
|
|
|
Submission: &submission,
|
|
|
|
}
|
|
|
|
|
|
|
|
err = putEvent(timestamp, &event)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("500 internal server error: putEvent: %+v\n", err)
|
|
|
|
http.Error(responseWriter, "500 internal server error", http.StatusInternalServerError)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
account, hasAccount := submission.Streams[0].Stream["account"]
|
|
|
|
daemon, hasDaemon := submission.Streams[0].Stream["daemon"]
|
|
|
|
userId, hasUserId := submission.Streams[0].Stream["userId"]
|
|
|
|
url, hasURL := submission.Streams[0].Stream["url"]
|
|
|
|
if hasAccount {
|
|
|
|
err = putAccountByStr(indexAccountByIp, timestamp, account, ip)
|
|
|
|
}
|
|
|
|
if err == nil && hasDaemon && hasAccount {
|
|
|
|
err = putAccountByStr(indexAccountByDaemon, timestamp, account, daemon)
|
|
|
|
}
|
|
|
|
if err == nil && hasDaemon && hasUserId {
|
|
|
|
err = putAccountByStr(indexAccountByUserID, timestamp, account, userId)
|
|
|
|
}
|
|
|
|
if err == nil && hasDaemon && hasURL {
|
|
|
|
err = putAccountByStr(indexAccountByURL, timestamp, account, url)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("500 internal server error: putAccountByStr: %+v\n", err)
|
|
|
|
http.Error(responseWriter, "500 internal server error", http.StatusInternalServerError)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
} else {
|
|
|
|
http.Error(responseWriter, "405 method not allowed, try POST instead", http.StatusMethodNotAllowed)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
// main async process that consumes and annotates events with `account` based on joining on daemon, url, or IP
|
|
|
|
// different time delays are used to get real time metrics + more robust joins that take longer to resolve
|
|
|
|
go func() {
|
|
|
|
debug.SetPanicOnFault(true)
|
|
|
|
defer func() {
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
fmt.Print("\n\n!!!!!\n")
|
|
|
|
log.Printf("panic: %+v\n", err)
|
|
|
|
debug.PrintStack()
|
|
|
|
fmt.Print("\n!!!!!\n\n")
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for {
|
|
|
|
for seconds, name := range delayNamesBySeconds {
|
|
|
|
|
|
|
|
// retrieve last checkpoint
|
|
|
|
var err error
|
|
|
|
var startTimestamp int64
|
|
|
|
var startTimestampBytes []byte
|
|
|
|
|
|
|
|
secondsBytes := make([]byte, 4)
|
|
|
|
binary.BigEndian.PutUint32(secondsBytes, uint32(seconds))
|
|
|
|
startTimestampBytes, err = db.Get(append(indexCheckpointTimestampByDelaySeconds, secondsBytes...), emptyReadOptions)
|
|
|
|
if err == nil {
|
|
|
|
startTimestamp = int64(binary.BigEndian.Uint64(startTimestampBytes))
|
|
|
|
} else if err == leveldb.ErrNotFound {
|
|
|
|
// default: start seconds + delayChunkSize ago
|
|
|
|
startTimestamp = time.Now().UTC().UnixNano() - (int64(time.Second*time.Duration(seconds)) + int64(delayChunkSize))
|
|
|
|
startTimestampBytes = make([]byte, 8)
|
|
|
|
binary.BigEndian.PutUint64(startTimestampBytes, uint64(startTimestamp))
|
|
|
|
} else {
|
|
|
|
log.Printf("db.Get indexCheckpointTimestampByDelaySeconds %d failed: %+v\n", seconds, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
endTimestamp := time.Now().UTC().UnixNano() - int64(time.Second*time.Duration(seconds))
|
|
|
|
endTimestampBytes := make([]byte, 8)
|
|
|
|
binary.BigEndian.PutUint64(endTimestampBytes, uint64(endTimestamp))
|
|
|
|
|
|
|
|
chunkOfThePast := &util.Range{
|
|
|
|
Start: append(indexEventsByTimestamp, startTimestampBytes...),
|
|
|
|
Limit: append(indexEventsByTimestamp, endTimestampBytes...),
|
|
|
|
}
|
|
|
|
|
|
|
|
// log.Printf("chunkOfThePastStart: %x, %x (%d), key: %x \n", indexEventsByTimestamp, startTimestampBytes, startTimestamp, chunkOfThePast.Start)
|
|
|
|
// log.Printf("chunkOfThePastEnd: %x, %x (%d), key: %x \n", indexEventsByTimestamp, endTimestampBytes, endTimestamp, chunkOfThePast.Limit)
|
|
|
|
|
|
|
|
iterator := db.NewIterator(chunkOfThePast, emptyReadOptions)
|
|
|
|
//iterator.Seek(chunkOfThePast.Start)
|
|
|
|
|
|
|
|
events := []TelemetryEvent{}
|
|
|
|
for iterator.Next() {
|
|
|
|
eventBytes := iterator.Value()
|
|
|
|
var event TelemetryEvent
|
|
|
|
|
|
|
|
//log.Printf("read eventBytes: %x -> %x\n", iterator.Key(), eventBytes)
|
|
|
|
err := cbor.Unmarshal(eventBytes, &event)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("cbor.Unmarshal eventBytes failed: %+v\n", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if len(event.Submission.Streams) != 1 {
|
|
|
|
log.Printf("unmarshalled eventBytes has %d streams", len(event.Submission.Streams))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if event.Submission.Streams[0].Stream == nil {
|
|
|
|
value := "nil"
|
|
|
|
if len(event.Submission.Streams[0].Values) != 0 {
|
|
|
|
value = strings.Join(event.Submission.Streams[0].Values[0], ": ")
|
|
|
|
}
|
|
|
|
log.Printf("unmarshalled eventBytes first stream has nil map. value: %s", value)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if len(event.Submission.Streams[0].Values) == 0 {
|
|
|
|
log.Printf("unmarshalled eventBytes first stream has no values")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
events = append(events, event)
|
|
|
|
}
|
|
|
|
iterator.Release()
|
|
|
|
|
|
|
|
db.Put(append(indexCheckpointTimestampByDelaySeconds, secondsBytes...), endTimestampBytes, emptyWriteOptions)
|
|
|
|
|
|
|
|
seconds := seconds
|
|
|
|
name := name
|
|
|
|
|
|
|
|
go func(events []TelemetryEvent, seconds int, name string) {
|
|
|
|
debug.SetPanicOnFault(true)
|
|
|
|
defer func() {
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
fmt.Print("\n\n!!!!!\n")
|
|
|
|
log.Printf("panic: %+v\n", err)
|
|
|
|
debug.PrintStack()
|
|
|
|
fmt.Print("\n!!!!!\n\n")
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for _, event := range events {
|
|
|
|
|
|
|
|
timestamp, err := strconv.ParseInt(event.Submission.Streams[0].Values[0][0], 10, 64)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("strconv.ParseInt on first timestamp failed: %+v\n", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
_, hasAccount := event.Submission.Streams[0].Stream["account"]
|
|
|
|
ip, hasIp := event.Submission.Streams[0].Stream["ip"]
|
|
|
|
userId, hasUserId := event.Submission.Streams[0].Stream["userId"]
|
|
|
|
daemon, hasDaemon := event.Submission.Streams[0].Stream["daemon"]
|
|
|
|
url, hasURL := event.Submission.Streams[0].Stream["url"]
|
|
|
|
|
|
|
|
//log.Printf("hasAccount: %t, hasIp: %t, hasDaemon: %t, hasURL: %t\n", hasAccount, hasIp, hasDaemon, hasURL)
|
|
|
|
|
|
|
|
if !hasAccount && hasUserId {
|
|
|
|
account, accountByUserIdTimestamp, err := getNearestKnownAccount(
|
|
|
|
indexAccountByUserID, timestamp, userId, event.AccountByUserIdTimestamp,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf(
|
|
|
|
"getNearestKnownAccount(indexAccountByDaemon, %d, %s, %d) failed: %+v\n",
|
|
|
|
timestamp, userId, event.AccountByUserIdTimestamp, err,
|
|
|
|
)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if account != "" {
|
|
|
|
//log.Printf("account: %s, from userId: %s\n", account, userId)
|
|
|
|
event.AccountByUserIdTimestamp = accountByUserIdTimestamp
|
|
|
|
event.Submission.Streams[0].Stream["account"] = account
|
|
|
|
}
|
|
|
|
} else if !hasAccount && hasDaemon {
|
|
|
|
account, accountByDaemonTimestamp, err := getNearestKnownAccount(
|
|
|
|
indexAccountByDaemon, timestamp, daemon, event.AccountByDaemonTimestamp,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf(
|
|
|
|
"getNearestKnownAccount(indexAccountByDaemon, %d, %s, %d) failed: %+v\n",
|
|
|
|
timestamp, daemon, event.AccountByDaemonTimestamp, err,
|
|
|
|
)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if account != "" {
|
|
|
|
//log.Printf("account: %s, from daemon: %s\n", account, daemon)
|
|
|
|
event.AccountByDaemonTimestamp = accountByDaemonTimestamp
|
|
|
|
event.Submission.Streams[0].Stream["account"] = account
|
|
|
|
}
|
|
|
|
} else if !hasAccount && hasURL {
|
|
|
|
account, accountByURLTimestamp, err := getNearestKnownAccount(
|
|
|
|
indexAccountByURL, timestamp, url, event.AccountByURLTimestamp,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf(
|
|
|
|
"getNearestKnownAccount(indexAccountByURL, %d, %s, %d) failed: %+v\n",
|
|
|
|
timestamp, url, event.AccountByURLTimestamp, err,
|
|
|
|
)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if account != "" {
|
|
|
|
//log.Printf("account: %s, from URL: %s\n", account, url)
|
|
|
|
event.AccountByURLTimestamp = accountByURLTimestamp
|
|
|
|
event.Submission.Streams[0].Stream["account"] = account
|
|
|
|
}
|
|
|
|
} else if !hasAccount && hasIp {
|
|
|
|
|
|
|
|
// the "user" delayed view is the one that gets displayed on greenhouse user profiles
|
|
|
|
// so we dont want to do join-on-ip on that one.. there is a risk of cross contamination between accounts
|
|
|
|
// if two users have the same IP (VPN for example)
|
|
|
|
if name != "user" {
|
|
|
|
account, accountByIpTimestamp, err := getNearestKnownAccount(
|
|
|
|
indexAccountByIp, timestamp, ip, event.AccountByIpTimestamp,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf(
|
|
|
|
"getNearestKnownAccount(indexAccountByIp, %d, %s, %d) failed: %+v\n",
|
|
|
|
timestamp, ip, event.AccountByURLTimestamp, err,
|
|
|
|
)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if account != "" {
|
|
|
|
//log.Printf("account: %s, from ip: %s\n", account, ip)
|
|
|
|
event.AccountByIpTimestamp = accountByIpTimestamp
|
|
|
|
event.Submission.Streams[0].Stream["account"] = account
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
stream := event.Submission.Streams[0]
|
|
|
|
stream.Stream["delay"] = name
|
|
|
|
for i := range stream.Values {
|
|
|
|
userId := stream.Stream["userId"]
|
|
|
|
if userId == "0" {
|
|
|
|
userId = ""
|
|
|
|
}
|
|
|
|
if userId != "" {
|
|
|
|
userIdInt, err := strconv.Atoi(stream.Stream["userId"])
|
|
|
|
if err == nil {
|
|
|
|
userId = fmt.Sprintf("%04d: ", userIdInt)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
account := fmt.Sprintf("%s%s", userId, stream.Stream["account"])
|
|
|
|
if len(account) > 30 {
|
|
|
|
account = account[:30]
|
|
|
|
}
|
|
|
|
tyype := stream.Stream["type"]
|
|
|
|
if len(tyype) > 30 {
|
|
|
|
tyype = tyype[:30]
|
|
|
|
}
|
|
|
|
|
|
|
|
ip, hasIp := stream.Stream["ip"]
|
|
|
|
if hasIp {
|
|
|
|
stream.Stream["my_ip"] = ip
|
|
|
|
delete(stream.Stream, "ip")
|
|
|
|
}
|
|
|
|
|
|
|
|
stream.Values[i][1] = fmt.Sprintf(
|
|
|
|
"%-6s %-30s %-15s %-30s %s",
|
|
|
|
stream.Stream["delay"], account, stream.Stream["my_ip"], tyype, stream.Values[i][1],
|
|
|
|
)
|
|
|
|
}
|
|
|
|
eventJSONBytes, err := json.Marshal(event.Submission)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("json.Marshal(event) failed: %+v\n", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
err = postToLoki(eventJSONBytes)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("postToLoki failed: %+v\n", err)
|
|
|
|
}
|
|
|
|
// else {
|
|
|
|
// log.Printf("postToLoki success: %s\n", string(eventJSONBytes))
|
|
|
|
// }
|
|
|
|
}
|
|
|
|
}(events, seconds, name)
|
|
|
|
|
|
|
|
}
|
|
|
|
time.Sleep(delayChunkSize)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
err = http.ListenAndServe(":8080", server)
|
|
|
|
panic(err)
|
|
|
|
}()
|
|
|
|
|
|
|
|
sigs := make(chan os.Signal, 10)
|
|
|
|
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGHUP)
|
|
|
|
done := make(chan bool, 1)
|
|
|
|
go func() {
|
|
|
|
sig := <-sigs
|
|
|
|
log.Printf("greenhouse-telemetry recieved signal: %s\n", sig)
|
|
|
|
done <- true
|
|
|
|
}()
|
|
|
|
|
|
|
|
fmt.Println("🌱🏠📠 greenhouse-telemetry is running, listening on :8080")
|
|
|
|
<-done
|
|
|
|
fmt.Println("exiting")
|
|
|
|
}
|
|
|
|
|
|
|
|
func postToLoki(bytez []byte) error {
|
|
|
|
lokiRequest, err := http.NewRequest("POST", fmt.Sprintf("%s/loki/api/v1/push", lokiURL), bytes.NewBuffer(bytez))
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "http.NewRequest")
|
|
|
|
}
|
|
|
|
lokiRequest.Header.Set("Content-Type", "application/json")
|
|
|
|
response, err := lokiClient.Do(lokiRequest)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "lokiClient.Do")
|
|
|
|
}
|
|
|
|
if response.StatusCode > 299 {
|
|
|
|
responseString := "<read error>"
|
|
|
|
errorBytes, err := ioutil.ReadAll(response.Body)
|
|
|
|
if err == nil {
|
|
|
|
responseString = string(errorBytes)
|
|
|
|
}
|
|
|
|
return errors.Errorf("loki returned HTTP %d: %s", response.StatusCode, responseString)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func putEvent(timestamp int64, event *TelemetryEvent) error {
|
|
|
|
eventBytes, err := cbor.Marshal(event)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "cbor.Marshal")
|
|
|
|
}
|
|
|
|
timestampBytes := make([]byte, 8)
|
|
|
|
binary.BigEndian.PutUint64(timestampBytes, uint64(timestamp))
|
|
|
|
|
|
|
|
key := append(indexEventsByTimestamp, timestampBytes...)
|
|
|
|
//log.Printf("put eventBytes: %x, %x (%d), %x -> %x\n", indexEventsByTimestamp, timestampBytes, timestamp, key, eventBytes)
|
|
|
|
err = db.Put(key, eventBytes, emptyWriteOptions)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "db.Put")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func putAccountByStr(index []byte, timestamp int64, account, str string) error {
|
|
|
|
timestampBytes := make([]byte, 8)
|
|
|
|
binary.BigEndian.PutUint64(timestampBytes, uint64(timestamp))
|
|
|
|
|
|
|
|
strBytes := []byte(str)
|
|
|
|
key := append(index, append(strBytes, timestampBytes...)...)
|
|
|
|
//log.Printf("putAccountByStr: %x, %x, %x, (%x -> %s)\n", index, strBytes, timestampBytes, key, account)
|
|
|
|
err := db.Put(key, []byte(account), emptyWriteOptions)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "db.Put")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func getNearestKnownAccount(index []byte, timestamp int64, value string, nearestKnownTimestamp int64) (string, int64, error) {
|
|
|
|
timestampBytes := make([]byte, 8)
|
|
|
|
binary.BigEndian.PutUint64(timestampBytes, uint64(timestamp))
|
|
|
|
|
|
|
|
beginningOfValue := append(index, append([]byte(value), beginningOfTimeBytes...)...)
|
|
|
|
currentTimeOfValue := append(index, append([]byte(value), timestampBytes...)...)
|
|
|
|
endOfValue := append(index, append([]byte(value), endOfTimeBytes...)...)
|
|
|
|
|
|
|
|
// log.Printf("search: beginningOfValue: %x, %x, %x (%x)\n", index, []byte(value), beginningOfTimeBytes, beginningOfValue)
|
|
|
|
// log.Printf("search: currentTimeOfValue: %x, %x, %x (%x)\n", index, []byte(value), timestampBytes, currentTimeOfValue)
|
|
|
|
// log.Printf("search: endOfValue: %x, %x, %x (%x)\n", index, []byte(value), endOfTimeBytes, endOfValue)
|
|
|
|
|
|
|
|
thePast := &util.Range{
|
|
|
|
Start: beginningOfValue,
|
|
|
|
Limit: currentTimeOfValue,
|
|
|
|
}
|
|
|
|
theFuture := &util.Range{
|
|
|
|
Start: currentTimeOfValue,
|
|
|
|
Limit: endOfValue,
|
|
|
|
}
|
|
|
|
|
|
|
|
smallestDifference := timestamp - nearestKnownTimestamp
|
|
|
|
if smallestDifference < 0 {
|
|
|
|
smallestDifference = -smallestDifference
|
|
|
|
}
|
|
|
|
valueToReturn := ""
|
|
|
|
|
|
|
|
pastIterator := db.NewIterator(thePast, emptyReadOptions)
|
|
|
|
pastIterator.Seek(currentTimeOfValue)
|
|
|
|
if pastIterator.Prev() {
|
|
|
|
key := pastIterator.Key()
|
|
|
|
|
|
|
|
// the last 8 bytes of the key contains the timestamp
|
|
|
|
pastTimestamp := int64(binary.BigEndian.Uint64(key[len(key)-8:]))
|
|
|
|
|
|
|
|
newDifference := timestamp - pastTimestamp
|
|
|
|
if newDifference < 0 {
|
|
|
|
newDifference = -newDifference
|
|
|
|
}
|
|
|
|
//log.Printf("pastIterator found %d, newDifference: %d, smallestDifference: %d, go: %t, v: %s\n", pastTimestamp, newDifference, smallestDifference, newDifference < smallestDifference, string(pastIterator.Value()))
|
|
|
|
if newDifference < smallestDifference {
|
|
|
|
nearestKnownTimestamp = pastTimestamp
|
|
|
|
smallestDifference = newDifference
|
|
|
|
valueToReturn = string(pastIterator.Value())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
pastIterator.Release()
|
|
|
|
|
|
|
|
futureIterator := db.NewIterator(theFuture, emptyReadOptions)
|
|
|
|
if futureIterator.Seek(currentTimeOfValue) {
|
|
|
|
key := futureIterator.Key()
|
|
|
|
|
|
|
|
// the last 8 bytes of the key contains the timestamp
|
|
|
|
futureTimestamp := int64(binary.BigEndian.Uint64(key[len(key)-8:]))
|
|
|
|
|
|
|
|
newDifference := timestamp - futureTimestamp
|
|
|
|
if newDifference < 0 {
|
|
|
|
newDifference = -newDifference
|
|
|
|
}
|
|
|
|
//log.Printf("futureIterator found %x %x (%d), newDifference: %d, smallestDifference: %d, go: %t, v: %s\n", futureIterator.Key(), futureIterator.Key()[2:10], futureTimestamp, newDifference, smallestDifference, newDifference < smallestDifference, string(futureIterator.Value()))
|
|
|
|
if newDifference < smallestDifference {
|
|
|
|
nearestKnownTimestamp = futureTimestamp
|
|
|
|
smallestDifference = newDifference
|
|
|
|
valueToReturn = string(futureIterator.Value())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
futureIterator.Release()
|
|
|
|
|
|
|
|
return valueToReturn, nearestKnownTimestamp, nil
|
|
|
|
}
|