owncast customizations & stream elements
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

313 lines
8.1 KiB

package main
import (
"crypto/md5"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/koding/websocketproxy"
)
// CachedResponse is the metadata of a backend response that the proxy keeps
// next to the cached body. main stores it in memory and JSON-encodes it into
// cache/<key>_header so a response can be replayed later.
type CachedResponse struct {
// StatusCode is the HTTP status code the backend answered with.
StatusCode int
// Headers holds the backend's response headers, keyed by header name.
Headers map[string][]string
}
// main runs a small caching reverse proxy in front of an owncast backend.
//
// The backend address is the first command-line argument, in host:port form
// (bracketed IPv6 such as [::1]:8080 is accepted). The proxy listens on :8080
// and, for every request:
//
//   - redirects URLs carrying an fbclid query parameter to the same URL
//     without it,
//   - answers /admin and /api/admin with 418 instead of forwarding them,
//   - forwards /entry to the backend through the websocket proxy,
//   - passes /api (except /api/config) and /hls straight through, uncached,
//   - for everything else, forwards the request, remembers the response in
//     memory and under ./cache, and replays the on-disk copy while the
//     background health check considers the backend down.
//
// Setting the DEBUG environment variable to "true", "t" or "1" turns on
// verbose logging.
func main() {
	// Check the argument count, not the length of the first argument, so a
	// missing argument yields the usage panic rather than an index panic.
	if len(os.Args) < 2 {
		panic("first argument must be of the form host:port")
	}
	backend := os.Args[1]
	_, port, err := net.SplitHostPort(backend)
	if err != nil {
		panic("first argument must be of the form host:port")
	}
	if _, err := strconv.Atoi(port); err != nil {
		panic("first argument must be of the form host:port")
	}

	// mutex guards fails, memCache and memCacheHeader, all of which are
	// shared between request handlers and the health-check goroutine.
	var mutex sync.Mutex
	// fails is a 0..2 counter maintained by the health check; the backend is
	// treated as down only while it is above 1.
	fails := 1
	memCache := map[string][]byte{}
	memCacheHeader := map[string]CachedResponse{}

	debugString := strings.ToLower(os.Getenv("DEBUG"))
	debug := debugString == "true" || debugString == "t" || debugString == "1"

	websocketURL := &url.URL{
		Scheme: "ws",
		Host:   backend,
		Path:   "/entry",
	}
	websocketProxyInstance := websocketproxy.NewProxy(websocketURL)

	// A directory needs the execute bit to be usable; 0770 pairs with the
	// 0660 mode used for the files written into it.
	err = os.MkdirAll("./cache", 0770)
	if err != nil {
		panic(fmt.Sprintf("cant make cache folder: %v", err))
	}
	log.Printf("listening 8080 --> %s ", backend)

	client := http.Client{
		Transport: &http.Transport{
			Proxy:       http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{Timeout: time.Second}).DialContext,
		},
		Timeout: 5 * time.Second,
	}

	// getCacheKey derives the cache file stem from the request path, raw
	// query and If-None-Match header value.
	getCacheKey := func(u *url.URL, ifNoneMatchHeader string) string {
		if debug {
			log.Printf("getCacheKey %s%s%s", u.Path, u.RawQuery, ifNoneMatchHeader)
		}
		return fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s%s%s", u.Path, u.RawQuery, ifNoneMatchHeader))))
	}

	// proxyRequest replays the incoming request (method, path, query and
	// headers) against the backend and returns the response together with
	// its fully read body. The returned response's Body is already closed;
	// callers only use its status code and headers.
	// NOTE(review): the incoming request body is not forwarded — confirm
	// that is intended for non-GET API calls.
	proxyRequest := func(writer http.ResponseWriter, request *http.Request) (*http.Response, []byte, error) {
		newURL := url.URL{
			Scheme:   "http",
			Host:     backend,
			Path:     request.URL.Path,
			RawQuery: request.URL.RawQuery,
		}
		newRequest, err := http.NewRequest(request.Method, newURL.String(), nil)
		if err != nil {
			log.Printf("cant make request: %v\n", err)
			return nil, nil, err
		}
		for k, vs := range request.Header {
			for _, v := range vs {
				newRequest.Header.Add(k, v)
			}
		}
		response, err := client.Do(newRequest)
		if err != nil {
			if debug {
				log.Printf("cant do request: %v\n", err)
			}
			return nil, nil, err
		}
		// Always release the backend connection, even when reading fails.
		defer response.Body.Close()
		responseBytes, err := ioutil.ReadAll(response.Body)
		if err != nil {
			log.Printf("read error: %v\n", err)
			return nil, nil, err
		}
		return response, responseBytes, nil
	}

	// proxyResponse writes the given headers, status code and body to the
	// client.
	proxyResponse := func(writer http.ResponseWriter, statusCode int, responseHeader map[string][]string, responseBytes []byte) {
		for k, vs := range responseHeader {
			for _, v := range vs {
				writer.Header().Add(k, v)
			}
		}
		writer.WriteHeader(statusCode)
		if _, err := writer.Write(responseBytes); err != nil && debug {
			log.Printf("cant write response: %v\n", err)
		}
	}

	// cachedResponse replays the on-disk copy stored under cacheKey, or
	// answers 502 when no usable copy exists.
	cachedResponse := func(writer http.ResponseWriter, cacheKey string) {
		fail := func() {
			if debug {
				log.Printf("cachedResponse %s failed", cacheKey)
			}
			proxyResponse(writer, 502, map[string][]string{}, []byte("server is down & document not in cache"))
		}

		headerBytes, err := ioutil.ReadFile(fmt.Sprintf("cache/%s_header", cacheKey))
		if err != nil {
			fail()
			return
		}
		cached := CachedResponse{}
		if err := json.Unmarshal(headerBytes, &cached); err != nil {
			fail()
			return
		}
		bodyBytes, err := ioutil.ReadFile(fmt.Sprintf("cache/%s_body", cacheKey))
		if err != nil {
			// A missing body is tolerated only for statuses of 300 and
			// above; they are replayed with an empty body.
			if cached.StatusCode < 300 {
				fail()
				return
			}
			bodyBytes = []byte{}
		}
		if debug {
			log.Printf("cachedResponse %s worked", cacheKey)
		}
		proxyResponse(writer, cached.StatusCode, cached.Headers, bodyBytes)
	}

	http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		// Strip the fbclid tracking parameter by redirecting to the same URL
		// without it.
		requestQuery := request.URL.Query()
		if requestQuery.Get("fbclid") != "" {
			requestQuery.Del("fbclid")
			redirectURL := url.URL{
				Scheme:      request.URL.Scheme,
				Host:        request.URL.Host,
				Path:        request.URL.Path,
				RawQuery:    requestQuery.Encode(),
				RawFragment: request.URL.RawFragment,
			}
			http.Redirect(writer, request, redirectURL.String(), http.StatusFound)
			// The redirect is the complete response; stop here so nothing
			// else is written to the same ResponseWriter.
			return
		}

		isAdmin := strings.HasPrefix(request.URL.Path, "/admin") || strings.HasPrefix(request.URL.Path, "/api/admin")
		if isAdmin {
			writer.WriteHeader(418) //teapot
			return
		}

		isWebsocket := strings.HasPrefix(request.URL.Path, "/entry")
		if isWebsocket {
			websocketProxyInstance.ServeHTTP(writer, request)
			return
		}

		isAPI := strings.HasPrefix(request.URL.Path, "/api")
		isConfigJson := strings.HasPrefix(request.URL.Path, "/api/config")
		isStream := strings.HasPrefix(request.URL.Path, "/hls")
		isDynamic := !isConfigJson && (isAPI || isStream)
		if isDynamic {
			response, responseBytes, err := proxyRequest(writer, request)
			if err != nil {
				writer.WriteHeader(502)
				return
			}
			if debug {
				log.Printf("dynamic response HTTP %d from %s", response.StatusCode, request.URL.String())
			}
			proxyResponse(writer, response.StatusCode, response.Header, responseBytes)
			return
		}

		cacheKey := getCacheKey(request.URL, request.Header.Get("If-None-Match"))

		// fails is written by the health-check goroutine, so read it under
		// the same mutex.
		mutex.Lock()
		backendDown := fails > 1
		mutex.Unlock()
		if backendDown {
			cachedResponse(writer, cacheKey)
			return
		}

		response, responseBytes, err := proxyRequest(writer, request)
		if err != nil {
			writer.WriteHeader(502)
			return
		}
		responseHeader := CachedResponse{Headers: response.Header, StatusCode: response.StatusCode}

		writeFiles := false
		mutex.Lock()
		bodyChanged := !byteSliceEquals(responseBytes, memCache[cacheKey])
		headerChanged := !cachedResponseEquals(responseHeader, memCacheHeader[cacheKey])
		if bodyChanged || headerChanged {
			if debug {
				headerBytes, _ := json.Marshal(responseHeader)
				cachedHeaderBytes, _ := json.Marshal(memCacheHeader[cacheKey])
				log.Printf("!byteSliceEquals(responseBytes, memCache[cacheKey]) %d vs %d, %d vs %d", len(responseBytes), len(memCache[cacheKey]), len(headerBytes), len(cachedHeaderBytes))
				if headerChanged {
					log.Println("compare header 2 cache")
					log.Println("--------------")
					log.Println(string(headerBytes))
					log.Println("--------------")
					log.Println(string(cachedHeaderBytes))
					log.Println("--------------")
				}
			}
			// Keep a private copy so the cache never aliases the slice that
			// is about to be written to the client.
			memCache[cacheKey] = append([]byte(nil), responseBytes...)
			memCacheHeader[cacheKey] = responseHeader
			writeFiles = true
		}
		mutex.Unlock()

		if writeFiles {
			// Persisting is best effort: a failure is logged but the
			// response is still served.
			headerBytes, err := json.Marshal(responseHeader)
			if err != nil {
				log.Printf("cant encode cache header %s: %v\n", cacheKey, err)
			} else if err := ioutil.WriteFile(fmt.Sprintf("cache/%s_header", cacheKey), headerBytes, 0660); err != nil {
				log.Printf("cant write cache header %s: %v\n", cacheKey, err)
			}
			if err := ioutil.WriteFile(fmt.Sprintf("cache/%s_body", cacheKey), responseBytes, 0660); err != nil {
				log.Printf("cant write cache body %s: %v\n", cacheKey, err)
			}
		}
		if debug {
			log.Printf("non-dynamic response (writeFiles: %t) %d for %s", writeFiles, response.StatusCode, request.URL.String())
		}
		proxyResponse(writer, response.StatusCode, response.Header, responseBytes)
	})

	// Health check: poll the backend every five seconds for the lifetime of
	// the process. A failed poll raises fails (capped at 2), a successful
	// one lowers it (floored at 0).
	go func() {
		for {
			response, err := client.Get(fmt.Sprintf("http://%s", backend))
			healthy := err == nil && response.StatusCode == 200
			if err == nil {
				// Only the status code matters; release the connection.
				response.Body.Close()
			}

			mutex.Lock()
			if !healthy {
				if fails < 2 {
					fails++
					if fails == 2 {
						log.Println("owncast went unhealthy")
					}
				}
			} else if fails > 0 {
				fails--
				if fails == 0 {
					log.Println("owncast became healthy again")
				}
			}
			mutex.Unlock()

			time.Sleep(5 * time.Second)
		}
	}()

	// ListenAndServe only returns on failure (for example when the port is
	// already in use); report that instead of exiting silently.
	log.Fatal(http.ListenAndServe(":8080", nil))
}
// cachedResponseEquals reports whether a and b carry the same status code and
// the same headers.
//
// Every header name in a must exist in b. Every header in b other than Date
// (matched case-insensitively) must exist in a with the same values in the
// same order. The values of a Date header are never compared.
func cachedResponseEquals(a, b CachedResponse) bool {
	if a.StatusCode != b.StatusCode {
		return false
	}

	// Presence check: a may not carry a header name that b lacks.
	for name := range a.Headers {
		if _, ok := b.Headers[name]; !ok {
			return false
		}
	}

	// Value check, driven by b and skipping Date.
	for name, want := range b.Headers {
		if strings.ToLower(name) == "date" {
			continue
		}
		got, ok := a.Headers[name]
		if !ok || len(got) != len(want) {
			return false
		}
		for i, v := range want {
			if got[i] != v {
				return false
			}
		}
	}
	return true
}
// byteSliceEquals reports whether a and b have the same length and the same
// bytes. A nil slice and an empty slice compare equal.
func byteSliceEquals(a, b []byte) bool {
	// Converting both sides to string compares length and contents in one
	// expression.
	return string(a) == string(b)
}