parallize checks per instance using go routines

This commit is contained in:
Bnyro 2023-07-21 23:49:00 +02:00
parent 2e081e3641
commit c369f2b7bc

187
main.go
View File

@ -44,92 +44,141 @@ type FrontendConfig struct {
S3Enabled bool `json:"s3Enabled"` S3Enabled bool `json:"s3Enabled"`
} }
func testUrl(url string) (*http.Response, error) {
resp, err := http.Get(url)
if err != nil {
return resp, err
}
if resp.StatusCode != 200 {
return resp, errors.New(fmt.Sprintf("Invalid response code at %s: %d", url, resp.StatusCode))
}
return resp, err
}
func testCaching(ApiUrl string) (bool, error) {
resp, err := testUrl(ApiUrl + "/trending?region=US")
if err != nil {
return false, err
}
oldTiming := resp.Header.Get("Server-Timing")
resp, err = testUrl(ApiUrl + "/trending?region=US")
if err != nil {
return false, err
}
newTiming := resp.Header.Get("Server-Timing")
cacheWorking := oldTiming == newTiming
return cacheWorking, nil
}
func getConfig(ApiUrl string) (FrontendConfig, error) {
resp, err := testUrl(ApiUrl + "/config")
if err != nil {
return FrontendConfig{}, err
}
bytes, err := io.ReadAll(resp.Body)
if err != nil {
return FrontendConfig{}, err
}
var config FrontendConfig
err = json.Unmarshal(bytes, &config)
if err != nil {
return FrontendConfig{}, err
}
return config, nil
}
func getInstanceDetails(line string, latest string) (Instance, error) { func getInstanceDetails(line string, latest string) (Instance, error) {
split := strings.Split(line, "|") split := strings.Split(line, "|")
if len(split) >= 5 { if len(split) < 5 {
return Instance{}, errors.New(fmt.Sprintf("Invalid line: %s", line))
}
ApiUrl := strings.TrimSpace(split[1]) ApiUrl := strings.TrimSpace(split[1])
resp, err := http.Get(ApiUrl + "/healthcheck") wg := sync.WaitGroup{}
errorChannel := make(chan error)
// the amount of tests to do
wg.Add(6)
var lastChecked int64
var registered int64
var config FrontendConfig
var hash string
var version string
var cacheWorking bool
go func() {
wg.Wait()
close(errorChannel)
}()
go func() {
defer wg.Done()
if _, err := testUrl(ApiUrl + "/healthcheck"); err != nil {
errorChannel <- err
return
}
lastChecked = time.Now().Unix()
}()
go func() {
defer wg.Done()
resp, err := testUrl(ApiUrl + "/registered/badge")
if err != nil { if err != nil {
return Instance{}, err errorChannel <- err
return
} }
LastChecked := time.Now().Unix() registered, err = strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32)
if resp.StatusCode != 200 {
return Instance{}, errors.New("Invalid response code")
}
resp, err = http.Get(ApiUrl + "/registered/badge")
if err != nil { if err != nil {
return Instance{}, err errorChannel <- err
} }
registered, err := strconv.ParseInt(number_re.FindString(resp.Request.URL.Path), 10, 32) }()
go func() {
defer wg.Done()
resp, err := testUrl(ApiUrl + "/version")
if err != nil { if err != nil {
return Instance{}, err errorChannel <- err
} return
resp, err = http.Get(ApiUrl + "/version")
if err != nil {
return Instance{}, err
}
if resp.StatusCode != 200 {
return Instance{}, errors.New("Invalid response code")
} }
buf := new(strings.Builder) buf := new(strings.Builder)
_, err = io.Copy(buf, resp.Body) _, err = io.Copy(buf, resp.Body)
if err != nil { if err != nil {
return Instance{}, err errorChannel <- err
return
} }
version := strings.TrimSpace(buf.String()) version = strings.TrimSpace(buf.String())
version_split := strings.Split(version, "-") version_split := strings.Split(version, "-")
hash := version_split[len(version_split)-1] hash = version_split[len(version_split)-1]
}()
resp, err = http.Get(ApiUrl + "/config") go func() {
defer wg.Done()
var err error
config, err = getConfig(ApiUrl)
if err != nil { if err != nil {
return Instance{}, err errorChannel <- err
}
if resp.StatusCode != 200 {
return Instance{}, errors.New("Invalid response code")
} }
}()
bytes, err := io.ReadAll(resp.Body) go func() {
defer wg.Done()
var err error
cacheWorking, err = testCaching(ApiUrl)
if err != nil { if err != nil {
return Instance{}, err errorChannel <- err
}
var config FrontendConfig
err = json.Unmarshal(bytes, &config)
if err != nil {
return Instance{}, err
}
cache_working := false
resp, err = http.Get(ApiUrl + "/trending?region=US")
if err != nil {
return Instance{}, err
}
if resp.StatusCode == 200 {
old_timing := resp.Header.Get("Server-Timing")
resp, err = http.Get(ApiUrl + "/trending?region=US")
if err != nil {
return Instance{}, err
}
if resp.StatusCode == 200 {
new_timing := resp.Header.Get("Server-Timing")
if old_timing == new_timing {
cache_working = true
}
}
} }
}()
go func() {
defer wg.Done()
// check if instance can fetch videos // check if instance can fetch videos
resp, err = http.Get(ApiUrl + "/streams/jNQXAC9IVRw") if _, err := testUrl(ApiUrl + "/streams/jNQXAC9IVRw"); err != nil {
if err != nil { errorChannel <- err
return Instance{}, err
} }
if resp.StatusCode != 200 { }()
return Instance{}, errors.New("Invalid status code")
for err := range errorChannel {
return Instance{}, err
} }
return Instance{ return Instance{
@ -138,14 +187,12 @@ func getInstanceDetails(line string, latest string) (Instance, error) {
Locations: strings.TrimSpace(split[2]), Locations: strings.TrimSpace(split[2]),
Cdn: strings.TrimSpace(split[3]) == "Yes", Cdn: strings.TrimSpace(split[3]) == "Yes",
Registered: int(registered), Registered: int(registered),
LastChecked: LastChecked, LastChecked: lastChecked,
Version: version, Version: version,
UpToDate: strings.Contains(latest, hash), UpToDate: strings.Contains(latest, hash),
Cache: cache_working, Cache: cacheWorking,
S3Enabled: config.S3Enabled, S3Enabled: config.S3Enabled,
}, nil }, nil
}
return Instance{}, errors.New("Invalid line")
} }
func monitorInstances() { func monitorInstances() {
@ -184,7 +231,6 @@ func monitorInstances() {
} }
if resp.StatusCode == 200 { if resp.StatusCode == 200 {
// parse the response // parse the response
buf := new(strings.Builder) buf := new(strings.Builder)
_, err := io.Copy(buf, resp.Body) _, err := io.Copy(buf, resp.Body)
@ -200,20 +246,20 @@ func monitorInstances() {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for index, line := range lines { for index, line := range lines {
if index < 3 { // skip first two and last line
fmt.Println(line) if index < 2 || index == len(lines)-1 {
continue continue
} }
wg.Add(1) wg.Add(1)
go func(line string) { go func(line string) {
defer wg.Done()
instance, err := getInstanceDetails(line, latest) instance, err := getInstanceDetails(line, latest)
if err == nil { if err == nil {
instances = append(instances, instance) instances = append(instances, instance)
} else { } else {
log.Print(err) log.Print(err)
} }
wg.Done()
}(line) }(line)
} }
wg.Wait() wg.Wait()
@ -237,5 +283,6 @@ func main() {
return c.JSON(monitored_instances) return c.JSON(monitored_instances)
}) })
fmt.Println("Listening on http://localhost:3000")
app.Listen(":3000") app.Listen(":3000")
} }