Browse Source

app appears to be working

main
forest 1 month ago
parent
commit
4d5871378a
3 changed files with 187 additions and 42 deletions
  1. +18
    -0
      Dockerfile
  2. +41
    -0
      build-docker.sh
  3. +128
    -42
      main.go

+ 18
- 0
Dockerfile View File

@ -0,0 +1,18 @@
# Build stage: compile the Go binary inside the official Go 1.15 Alpine image.
FROM golang:1.15.2-alpine as build
# GOARCH is left empty here and filled in by build-docker.sh via sed
# (e.g. "arm") to cross-compile; empty means build for the host arch.
ARG GOARCH=
# Optional extra arguments passed verbatim to `go build`.
ARG GO_BUILD_ARGS=
RUN mkdir /build
WORKDIR /build
# Copy the whole build context (main.go and friends) into the build stage.
COPY . .
RUN go build -v $GO_BUILD_ARGS -o /build/greenhouse-telemetry main.go
# Runtime stage: minimal Alpine image containing only the compiled binary.
# build-docker.sh rewrites this FROM to an arch-specific image (e.g. arm32v7/alpine).
FROM alpine
WORKDIR /greenhouse-telemetry
COPY --from=build /build/greenhouse-telemetry /greenhouse-telemetry/greenhouse-telemetry
# Empty config directory; presumably populated by a volume mount at runtime -- TODO confirm.
RUN mkdir -p /greenhouse-telemetry/config
ENTRYPOINT ["/greenhouse-telemetry/greenhouse-telemetry"]

+ 41
- 0
build-docker.sh View File

@ -0,0 +1,41 @@
#!/bin/bash -e

# Builds, pushes, and publishes a multi-arch Docker manifest for the
# sequentialread/greenhouse-telemetry image. Only the 32-bit ARM image is
# currently enabled; the amd64/arm64 variants are kept commented out so
# they can be re-enabled later by uncommenting the matching lines.

VERSION="0.1.1"

# Start from a clean scratch directory for the generated per-arch Dockerfiles.
rm -rf dockerbuild || true
mkdir dockerbuild

#cp Dockerfile dockerbuild/Dockerfile-amd64
cp Dockerfile dockerbuild/Dockerfile-arm
#cp Dockerfile dockerbuild/Dockerfile-arm64

# Point each per-arch Dockerfile's runtime stage at the matching base image...
#sed -E 's|FROM alpine|FROM amd64/alpine|' -i dockerbuild/Dockerfile-amd64
sed -E 's|FROM alpine|FROM arm32v7/alpine|' -i dockerbuild/Dockerfile-arm
#sed -E 's|FROM alpine|FROM arm64v8/alpine|' -i dockerbuild/Dockerfile-arm64

# ...and fill in the empty GOARCH build arg so the build stage cross-compiles.
#sed -E 's/GOARCH=/GOARCH=amd64/' -i dockerbuild/Dockerfile-amd64
sed -E 's/GOARCH=/GOARCH=arm/' -i dockerbuild/Dockerfile-arm
#sed -E 's/GOARCH=/GOARCH=arm64/' -i dockerbuild/Dockerfile-arm64

#docker build -f dockerbuild/Dockerfile-amd64 -t sequentialread/greenhouse-telemetry:$VERSION-amd64 .
docker build -f dockerbuild/Dockerfile-arm -t sequentialread/greenhouse-telemetry:$VERSION-arm .
#docker build -f dockerbuild/Dockerfile-arm64 -t sequentialread/greenhouse-telemetry:$VERSION-arm64 .

#docker push sequentialread/greenhouse-telemetry:$VERSION-amd64
docker push sequentialread/greenhouse-telemetry:$VERSION-arm
#docker push sequentialread/greenhouse-telemetry:$VERSION-arm64

# `docker manifest` is gated behind the experimental CLI flag.
export DOCKER_CLI_EXPERIMENTAL=enabled

# docker manifest create sequentialread/greenhouse-telemetry:$VERSION \
#   sequentialread/greenhouse-telemetry:$VERSION-amd64 \
#   sequentialread/greenhouse-telemetry:$VERSION-arm \
#   sequentialread/greenhouse-telemetry:$VERSION-arm64

# BUG FIX: the original line ended with a stray trailing backslash, which made
# bash splice the following comment line into this command and break it.
# --amend additionally lets the script be re-run once the manifest exists.
docker manifest create --amend sequentialread/greenhouse-telemetry:$VERSION sequentialread/greenhouse-telemetry:$VERSION-arm
#docker manifest annotate --arch amd64 sequentialread/greenhouse-telemetry:$VERSION sequentialread/greenhouse-telemetry:$VERSION-amd64
docker manifest annotate --arch arm sequentialread/greenhouse-telemetry:$VERSION sequentialread/greenhouse-telemetry:$VERSION-arm
#docker manifest annotate --arch arm64 sequentialread/greenhouse-telemetry:$VERSION sequentialread/greenhouse-telemetry:$VERSION-arm64

docker manifest push sequentialread/greenhouse-telemetry:$VERSION

+ 128
- 42
main.go View File

@ -11,6 +11,7 @@ import (
"os"
"os/signal"
"regexp"
"runtime/debug"
"strconv"
"strings"
"time"
@ -43,10 +44,12 @@ var datesNotValidBefore time.Time
var datesNotValidAfter time.Time
var indexCheckpointTimestampByDelaySeconds []byte
var indexEventsByTimestamp []byte
var indexAccountByIp []byte
var indexAccountByDaemon []byte
var indexAccountByURL []byte
var indexEventsByTimestamp []byte
const indexPrefixLength = 2
var beginningOfTimeBytes []byte
var endOfTimeBytes []byte
@ -78,9 +81,10 @@ func main() {
datesNotValidBefore = time.Date(1970, 0, 0, 0, 0, 0, 0, time.UTC)
datesNotValidAfter = time.Date(2100, 0, 0, 0, 0, 0, 0, time.UTC)
indexCheckpointTimestampByDelaySeconds = []byte{0x00, 0x20}
indexAccountByIp = []byte{0x00, 0x30}
indexAccountByDaemon = []byte{0x00, 0x34}
indexAccountByURL = []byte{0x00, 0x38}
indexEventsByTimestamp = []byte{0x00, 0x30}
indexAccountByIp = []byte{0x00, 0x40}
indexAccountByDaemon = []byte{0x00, 0x44}
indexAccountByURL = []byte{0x00, 0x48}
beginningOfTimeBytes = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
endOfTimeBytes = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
@ -97,10 +101,17 @@ func main() {
server := http.NewServeMux()
server.HandleFunc("/", func(responseWriter http.ResponseWriter, request *http.Request) {
if request.Method == "POST" {
ip := request.Header.Get("X-Forwarded-For")
ipValues := request.Header.Values("X-Forwarded-For")
//log.Printf("strings.Join(ipValues): %s\n", strings.Join(ipValues, ", "))
ip := "null"
for _, value := range ipValues {
if !strings.HasPrefix(value, "192.168.") && !strings.HasPrefix(value, "10.") && !strings.HasPrefix(value, "127.") {
ip = value
return
}
}
var submission LokiSubmission
//var submissionBody []byte
var timestamp int64
body, err := ioutil.ReadAll(request.Body)
if err != nil {
@ -110,7 +121,6 @@ func main() {
}
if request.Header.Get("Content-Type") == "application/json" || matchLokiSubmissionJSON.Match(body) {
//submissionBody = body
err := json.Unmarshal(body, body)
if err != nil {
log.Printf("400 bad request: invalid json: %+v\n", err)
@ -169,14 +179,12 @@ func main() {
},
},
}
// submissionBody, err = json.Marshal(submission)
// if err != nil {
// log.Printf("500 internal server error: json.Marshal: %+v\n", err)
// http.Error(responseWriter, "500 internal server error", http.StatusInternalServerError)
// return
// }
}
originalIp := submission.Streams[0].Stream["ip"]
if originalIp == "auto" {
submission.Streams[0].Stream["ip"] = ip
}
event := TelemetryEvent{
Submission: &submission,
}
@ -206,14 +214,6 @@ func main() {
return
}
// we don't wanna post it raw I don't think...
// go func(submissionBody []byte) {
// err = postToLoki(submissionBody)
// if err != nil {
// log.Printf("postToLoki failed: %+v\n", err)
// }
// }(submissionBody)
} else {
http.Error(responseWriter, "405 method not allowed, try POST instead", http.StatusMethodNotAllowed)
}
@ -222,23 +222,40 @@ func main() {
// main async process that consumes and annotates events with `account` based on joining on daemon, url, or IP
// different time delays are used to get real time metrics + more robust joins that take longer to resolve
go func() {
debug.SetPanicOnFault(true)
defer func() {
if err := recover(); err != nil {
fmt.Print("\n\n!!!!!\n")
log.Printf("panic: %+v\n", err)
debug.PrintStack()
fmt.Print("\n!!!!!\n\n")
}
}()
for {
for seconds, name := range delayNamesBySeconds {
// default: start seconds + delayChunkSize ago
startTimestamp := time.Now().UTC().UnixNano() - (int64(time.Second*time.Duration(seconds)) + int64(delayChunkSize))
// retrieve last checkpoint and replace the default if found
// retrieve last checkpoint
var err error
var startTimestamp int64
var startTimestampBytes []byte
secondsBytes := make([]byte, 4)
binary.BigEndian.PutUint32(secondsBytes, uint32(seconds))
startTimestampBytes, err := db.Get(append(indexCheckpointTimestampByDelaySeconds, secondsBytes...), emptyReadOptions)
startTimestampBytes, err = db.Get(append(indexCheckpointTimestampByDelaySeconds, secondsBytes...), emptyReadOptions)
if err == nil {
startTimestamp = int64(binary.BigEndian.Uint64(startTimestampBytes))
} else if err != leveldb.ErrNotFound {
} else if err == leveldb.ErrNotFound {
// default: start seconds + delayChunkSize ago
startTimestamp = time.Now().UTC().UnixNano() - (int64(time.Second*time.Duration(seconds)) + int64(delayChunkSize))
startTimestampBytes = make([]byte, 8)
binary.BigEndian.PutUint64(startTimestampBytes, uint64(startTimestamp))
} else {
log.Printf("db.Get indexCheckpointTimestampByDelaySeconds %d failed: %+v\n", seconds, err)
continue
}
endTimestamp := startTimestamp + int64(time.Second)
endTimestamp := startTimestamp + int64(delayChunkSize)
endTimestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(endTimestampBytes, uint64(endTimestamp))
@ -247,12 +264,18 @@ func main() {
Limit: append(indexEventsByTimestamp, endTimestampBytes...),
}
// log.Printf("chunkOfThePastStart: %x, %x (%d), key: %x \n", indexEventsByTimestamp, startTimestampBytes, startTimestamp, chunkOfThePast.Start)
// log.Printf("chunkOfThePastEnd: %x, %x (%d), key: %x \n", indexEventsByTimestamp, endTimestampBytes, endTimestamp, chunkOfThePast.Limit)
iterator := db.NewIterator(chunkOfThePast, emptyReadOptions)
//iterator.Seek(chunkOfThePast.Start)
events := []TelemetryEvent{}
for iterator.Next() {
eventBytes := iterator.Value()
var event TelemetryEvent
//log.Printf("read eventBytes: %x -> %x\n", iterator.Key(), eventBytes)
err := cbor.Unmarshal(eventBytes, &event)
if err != nil {
log.Printf("cbor.Unmarshal eventBytes failed: %+v\n", err)
@ -262,10 +285,22 @@ func main() {
}
iterator.Release()
db.Put(append(indexCheckpointTimestampByDelaySeconds, secondsBytes...), endTimestampBytes, emptyWriteOptions)
seconds := seconds
name := name
go func(events []TelemetryEvent, seconds int, name string) {
debug.SetPanicOnFault(true)
defer func() {
if err := recover(); err != nil {
fmt.Print("\n\n!!!!!\n")
log.Printf("panic: %+v\n", err)
debug.PrintStack()
fmt.Print("\n!!!!!\n\n")
}
}()
for _, event := range events {
timestamp, err := strconv.ParseInt(event.Submission.Streams[0].Values[0][0], 10, 64)
@ -278,6 +313,9 @@ func main() {
ip, hasIp := event.Submission.Streams[0].Stream["ip"]
daemon, hasDaemon := event.Submission.Streams[0].Stream["daemon"]
url, hasURL := event.Submission.Streams[0].Stream["url"]
//log.Printf("hasAccount: %t, hasIp: %t, hasDaemon: %t, hasURL: %t\n", hasAccount, hasIp, hasDaemon, hasURL)
if !hasAccount && hasDaemon {
account, accountByDaemonTimestamp, err := getNearestKnownAccount(
indexAccountByDaemon, timestamp, daemon, event.AccountByDaemonTimestamp,
@ -289,8 +327,11 @@ func main() {
)
continue
}
event.AccountByDaemonTimestamp = accountByDaemonTimestamp
event.Submission.Streams[0].Stream["account"] = account
if account != "" {
log.Printf("account: %s, from daemon: %s\n", account, daemon)
event.AccountByDaemonTimestamp = accountByDaemonTimestamp
event.Submission.Streams[0].Stream["account"] = account
}
} else if !hasAccount && hasURL {
account, accountByURLTimestamp, err := getNearestKnownAccount(
indexAccountByURL, timestamp, url, event.AccountByURLTimestamp,
@ -302,8 +343,11 @@ func main() {
)
continue
}
event.AccountByURLTimestamp = accountByURLTimestamp
event.Submission.Streams[0].Stream["account"] = account
if account != "" {
log.Printf("account: %s, from URL: %s\n", account, url)
event.AccountByURLTimestamp = accountByURLTimestamp
event.Submission.Streams[0].Stream["account"] = account
}
} else if !hasAccount && hasIp {
account, accountByIpTimestamp, err := getNearestKnownAccount(
indexAccountByIp, timestamp, ip, event.AccountByIpTimestamp,
@ -315,20 +359,42 @@ func main() {
)
continue
}
event.AccountByIpTimestamp = accountByIpTimestamp
event.Submission.Streams[0].Stream["account"] = account
if account != "" {
log.Printf("account: %s, from ip: %s\n", account, ip)
event.AccountByIpTimestamp = accountByIpTimestamp
event.Submission.Streams[0].Stream["account"] = account
}
}
event.Submission.Streams[0].Stream["delay"] = name
eventBytes, err := json.Marshal(event)
stream := event.Submission.Streams[0]
stream.Stream["delay"] = name
for i := range stream.Values {
account := stream.Stream["account"]
if len(account) > 30 {
account = account[:30]
}
tyype := stream.Stream["type"]
if len(tyype) > 30 {
tyype = tyype[:30]
}
stream.Values[i][1] = fmt.Sprintf(
"%-6s %-30s %-15s %-30s %s",
stream.Stream["delay"], account, stream.Stream["ip"], tyype, stream.Values[i][1],
)
}
eventJSONBytes, err := json.Marshal(event.Submission)
if err != nil {
log.Printf("json.Marshal(event) failed: %+v\n", err)
continue
}
err = postToLoki(eventBytes)
err = postToLoki(eventJSONBytes)
if err != nil {
log.Printf("postToLoki failed: %+v\n", err)
}
// else {
// log.Printf("postToLoki success: %s\n", string(eventJSONBytes))
// }
}
}(events, seconds, name)
@ -357,10 +423,11 @@ func main() {
}
func postToLoki(bytez []byte) error {
lokiRequest, err := http.NewRequest("POST", "http://loki:3100/api/v1/push", bytes.NewBuffer(bytez))
lokiRequest, err := http.NewRequest("POST", "http://loki:3100/loki/api/v1/push", bytes.NewBuffer(bytez))
if err != nil {
return errors.Wrap(err, "http.NewRequest")
}
lokiRequest.Header.Set("Content-Type", "application/json")
response, err := lokiClient.Do(lokiRequest)
if err != nil {
return errors.Wrap(err, "lokiClient.Do")
@ -368,7 +435,7 @@ func postToLoki(bytez []byte) error {
if response.StatusCode > 299 {
responseString := "<read error>"
errorBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
if err == nil {
responseString = string(errorBytes)
}
return errors.Errorf("loki returned HTTP %d: %s", response.StatusCode, responseString)
@ -384,7 +451,9 @@ func putEvent(timestamp int64, event *TelemetryEvent) error {
timestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(timestampBytes, uint64(timestamp))
err = db.Put(append(indexEventsByTimestamp, timestampBytes...), eventBytes, emptyWriteOptions)
key := append(indexEventsByTimestamp, timestampBytes...)
//log.Printf("put eventBytes: %x, %x (%d), %x -> %x\n", indexEventsByTimestamp, timestampBytes, timestamp, key, eventBytes)
err = db.Put(key, eventBytes, emptyWriteOptions)
if err != nil {
return errors.Wrap(err, "db.Put")
}
@ -395,7 +464,10 @@ func putAccountByStr(index []byte, timestamp int64, account, str string) error {
timestampBytes := make([]byte, 8)
binary.BigEndian.PutUint64(timestampBytes, uint64(timestamp))
err := db.Put(append(index, append([]byte(str), timestampBytes...)...), []byte(account), emptyWriteOptions)
strBytes := []byte(str)
key := append(index, append(strBytes, timestampBytes...)...)
//log.Printf("putAccountByStr: %x, %x, %x, (%x -> %s)\n", index, strBytes, timestampBytes, key, account)
err := db.Put(key, []byte(account), emptyWriteOptions)
if err != nil {
return errors.Wrap(err, "db.Put")
}
@ -410,6 +482,10 @@ func getNearestKnownAccount(index []byte, timestamp int64, value string, nearest
currentTimeOfValue := append(index, append([]byte(value), timestampBytes...)...)
endOfValue := append(index, append([]byte(value), endOfTimeBytes...)...)
// log.Printf("search: beginningOfValue: %x, %x, %x (%x)\n", index, []byte(value), beginningOfTimeBytes, beginningOfValue)
// log.Printf("search: currentTimeOfValue: %x, %x, %x (%x)\n", index, []byte(value), timestampBytes, currentTimeOfValue)
// log.Printf("search: endOfValue: %x, %x, %x (%x)\n", index, []byte(value), endOfTimeBytes, endOfValue)
thePast := &util.Range{
Start: beginningOfValue,
Limit: currentTimeOfValue,
@ -428,11 +504,16 @@ func getNearestKnownAccount(index []byte, timestamp int64, value string, nearest
pastIterator := db.NewIterator(thePast, emptyReadOptions)
pastIterator.Seek(currentTimeOfValue)
if pastIterator.Prev() {
pastTimestamp := int64(binary.BigEndian.Uint64(pastIterator.Key()[2:10]))
key := pastIterator.Key()
// the last 8 bytes of the key contains the timestamp
pastTimestamp := int64(binary.BigEndian.Uint64(key[len(key)-8:]))
newDifference := timestamp - pastTimestamp
if newDifference < 0 {
newDifference = -newDifference
}
//log.Printf("pastIterator found %d, newDifference: %d, smallestDifference: %d, go: %t\n", pastTimestamp, newDifference, smallestDifference, newDifference < smallestDifference)
if newDifference < smallestDifference {
timestamp = pastTimestamp
smallestDifference = newDifference
@ -443,11 +524,16 @@ func getNearestKnownAccount(index []byte, timestamp int64, value string, nearest
futureIterator := db.NewIterator(theFuture, emptyReadOptions)
if futureIterator.Seek(currentTimeOfValue) {
futureTimestamp := int64(binary.BigEndian.Uint64(futureIterator.Key()[2:10]))
key := futureIterator.Key()
// the last 8 bytes of the key contains the timestamp
futureTimestamp := int64(binary.BigEndian.Uint64(key[len(key)-8:]))
newDifference := timestamp - futureTimestamp
if newDifference < 0 {
newDifference = -newDifference
}
//log.Printf("futureIterator found %x %x (%d), newDifference: %d, smallestDifference: %d, go: %t\n", futureIterator.Key(), futureIterator.Key()[2:10], futureTimestamp, newDifference, smallestDifference, newDifference < smallestDifference)
if newDifference < smallestDifference {
timestamp = futureTimestamp
smallestDifference = newDifference


Loading…
Cancel
Save