Browse Source

first commit of passthrough telemetry

main
forest 1 month ago
parent
commit
01616f5842
4 changed files with 321 additions and 0 deletions
  1. +1
    -0
      .gitignore
  2. +10
    -0
      go.mod
  3. +33
    -0
      go.sum
  4. +277
    -0
      main.go

+ 1
- 0
.gitignore View File

@ -0,0 +1 @@
data

+ 10
- 0
go.mod View File

@ -0,0 +1,10 @@
module git.sequentialread.com/forest/greenhouse-telemetry
go 1.16
require (
git.sequentialread.com/forest/pkg-errors v0.9.2 // indirect
github.com/fxamacker/cbor/v2 v2.3.0 // indirect
github.com/syndtr/goleveldb v1.0.0
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e
)

+ 33
- 0
go.sum View File

@ -0,0 +1,33 @@
git.sequentialread.com/forest/pkg-errors v0.9.2 h1:j6pwbL6E+TmE7TD0tqRtGwuoCbCfO6ZR26Nv5nest9g=
git.sequentialread.com/forest/pkg-errors v0.9.2/go.mod h1:8TkJ/f8xLWFIAid20aoqgDZcCj9QQt+FU+rk415XO1w=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fxamacker/cbor/v2 v2.3.0 h1:aM45YGMctNakddNNAezPxDUpv38j44Abh+hifNuqXik=
github.com/fxamacker/cbor/v2 v2.3.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

+ 277
- 0
main.go View File

@ -0,0 +1,277 @@
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"regexp"
"strconv"
"strings"
"time"
errors "git.sequentialread.com/forest/pkg-errors"
cbor "github.com/fxamacker/cbor/v2"
leveldb "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"golang.org/x/sys/unix"
)
type TelemetryEvent struct {
Submission *LokiSubmission `cbor:"1,keyasint,omitempty"`
AccountByIpTimestamp int64 `cbor:"8,keyasint,omitempty"`
AccountByDaemonTimestamp int64 `cbor:"10,keyasint,omitempty"`
AccountByURLTimestamp int64 `cbor:"12,keyasint,omitempty"`
}
type LokiSubmission struct {
Streams []LokiStream `json:"streams,omitempty",cbor:"1,keyasint,omitempty"`
}
type LokiStream struct {
Stream map[string]string `json:"stream,omitempty",cbor:"1,keyasint,omitempty"`
Values [][]string `json:"values,omitempty",cbor:"2,keyasint,omitempty"`
}
var datesNotValidBefore time.Time
var datesNotValidAfter time.Time
var indexCheckpointTimestampByDelaySeconds []byte
var indexAccountByIp []byte
var indexAccountByDaemon []byte
var indexAccountByURL []byte
var indexEventsByTimestamp []byte
var db *leveldb.DB
var emptyWriteOptions *opt.WriteOptions
var delayNamesBySeconds map[int]string
var delayChunkSize time.Duration
var lokiClient *http.Client
func main() {
os.MkdirAll("./data/leveldb", 0755)
var err error
db, err = leveldb.OpenFile("./data/leveldb", &opt.Options{})
defer (func() {
err := db.Close()
if err != nil {
panic(err)
}
})()
if err != nil {
panic(err)
}
matchLokiSubmissionJSON := regexp.MustCompile("^[ \\t\\n]*\\{[ \\t\\n]*\"streams\"")
datesNotValidBefore = time.Date(1970, 0, 0, 0, 0, 0, 0, time.UTC)
datesNotValidAfter = time.Date(2100, 0, 0, 0, 0, 0, 0, time.UTC)
indexCheckpointTimestampByDelaySeconds = []byte{0x00, 0x20}
indexAccountByIp = []byte{0x00, 0x30}
indexAccountByDaemon = []byte{0x00, 0x34}
indexAccountByURL = []byte{0x00, 0x38}
delayNamesBySeconds = map[int]string{
5: "realtime",
600: "tenmin",
86400: "day",
}
delayChunkSize = time.Second
lokiClient = &http.Client{
Timeout: time.Second * 5,
}
server := http.NewServeMux()
server.HandleFunc("/", func(responseWriter http.ResponseWriter, request *http.Request) {
if request.Method == "POST" {
ip := request.Header.Get("X-Forwarded-For")
var submission LokiSubmission
var submissionBody []byte
var timestamp int64
body, err := ioutil.ReadAll(request.Body)
if err != nil {
log.Printf("500 internal server error: http read error: %+v\n", err)
http.Error(responseWriter, "500 internal server error: http read error", http.StatusInternalServerError)
return
}
if request.Header.Get("Content-Type") == "application/json" || matchLokiSubmissionJSON.Match(body) {
submissionBody = body
err := json.Unmarshal(body, body)
if err != nil {
log.Printf("400 bad request: invalid json: %+v\n", err)
http.Error(responseWriter, "400 bad request: invalid json", http.StatusBadRequest)
return
}
if len(submission.Streams) == 0 {
log.Println("400 bad request: no streams, at least one stream is required")
http.Error(responseWriter, "400 bad request: at least one stream is required", http.StatusBadRequest)
return
}
if len(submission.Streams[0].Stream) == 0 {
log.Println("400 bad request: no labels, at least one label is required")
http.Error(responseWriter, "400 bad request: no labels, at least one label is required", http.StatusBadRequest)
return
}
if len(submission.Streams[0].Values) == 0 {
log.Println("400 bad request: no values, at least one value is required")
http.Error(responseWriter, "400 bad request: no values, at least one value is required", http.StatusBadRequest)
return
}
if len(submission.Streams[0].Values[0]) != 2 {
log.Println("400 bad request: each value must have two strings")
http.Error(responseWriter, "400 bad request: each value must have two strings", http.StatusBadRequest)
return
}
timestamp, err = strconv.ParseInt(submission.Streams[0].Values[0][0], 10, 64)
if err != nil {
log.Println("400 bad request: timestamp was not an integer")
http.Error(responseWriter, "400 bad request: timestamp was not an integer", http.StatusBadRequest)
return
}
timestampTime := time.Unix(0, timestamp).UTC()
if timestampTime.Before(datesNotValidBefore) || timestampTime.After(datesNotValidAfter) {
log.Println("400 bad request: timestamp was out of bounds")
http.Error(responseWriter, "400 bad request: timestamp was out of bounds", http.StatusBadRequest)
return
}
} else {
stream := map[string]string{}
query := request.URL.Query()
for k, vs := range query {
if len(vs) > 0 {
stream[k] = vs[0]
}
}
timestamp = time.Now().UTC().UnixNano()
submission = LokiSubmission{
Streams: []LokiStream{
{
Stream: stream,
Values: [][]string{{
strconv.FormatInt(timestamp, 10),
strings.TrimSpace(string(body)),
}},
},
},
}
submissionBody, err = json.Marshal(submission)
if err != nil {
log.Printf("500 internal server error: json.Marshal: %+v\n", err)
http.Error(responseWriter, "500 internal server error", http.StatusInternalServerError)
return
}
}
event := TelemetryEvent{
Submission: &submission,
}
err = putEvent(timestamp, &event)
if err != nil {
log.Printf("500 internal server error: putEvent: %+v\n", err)
http.Error(responseWriter, "500 internal server error", http.StatusInternalServerError)
return
}
account, hasAccount := submission.Streams[0].Stream["account"]
daemon, hasDaemon := submission.Streams[0].Stream["daemon"]
url, hasURL := submission.Streams[0].Stream["url"]
if hasAccount {
err = putAccountByStr(indexAccountByIp, timestamp, account, ip)
}
if err == nil && hasDaemon && hasAccount {
err = putAccountByStr(indexAccountByDaemon, timestamp, account, daemon)
}
if err == nil && hasDaemon && hasURL {
err = putAccountByStr(indexAccountByURL, timestamp, account, url)
}
if err != nil {
log.Printf("500 internal server error: putAccountByStr: %+v\n", err)
http.Error(responseWriter, "500 internal server error", http.StatusInternalServerError)
return
}
go func(submissionBody []byte) {
err = postToLoki(submissionBody)
if err != nil {
log.Printf("postToLoki failed: %+v\n", err)
}
}(submissionBody)
} else {
http.Error(responseWriter, "405 method not allowed, try POST instead", http.StatusMethodNotAllowed)
}
})
go func() {
err = http.ListenAndServe(":8080", server)
panic(err)
}()
sigs := make(chan os.Signal, 10)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGHUP)
done := make(chan bool, 1)
go func() {
sig := <-sigs
log.Printf("greenhouse-telemetry recieved signal: %s\n", sig)
done <- true
}()
fmt.Println("🌱🏠📠 greenhouse-telemetry is running, listening on :8080")
<-done
fmt.Println("exiting")
}
func postToLoki(bytez []byte) error {
lokiRequest, err := http.NewRequest("POST", "http://loki:3100/api/v1/push", bytes.NewBuffer(bytez))
if err != nil {
return errors.Wrap(err, "http.NewRequest")
}
response, err := lokiClient.Do(lokiRequest)
if err != nil {
return errors.Wrap(err, "lokiClient.Do")
}
if response.StatusCode > 299 {
responseString := "<read error>"
errorBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
responseString = string(errorBytes)
}
return errors.Errorf("loki returned HTTP %d: %s", response.StatusCode, responseString)
}
return nil
}
func putEvent(timestamp int64, event *TelemetryEvent) error {
eventBytes, err := cbor.Marshal(event)
if err != nil {
return errors.Wrap(err, "cbor.Marshal")
}
timestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(timestampBytes, uint64(timestamp))
err = db.Put(append(indexEventsByTimestamp, timestampBytes...), eventBytes, emptyWriteOptions)
if err != nil {
return errors.Wrap(err, "db.Put")
}
return nil
}
func putAccountByStr(index []byte, timestamp int64, account, str string) error {
timestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(timestampBytes, uint64(timestamp))
err := db.Put(append(index, append([]byte(str), timestampBytes...)...), []byte(account), emptyWriteOptions)
if err != nil {
return errors.Wrap(err, "db.Put")
}
return nil
}

Loading…
Cancel
Save