From 4dfe8ed8554f22c9273312bfd77d0b47f65fa2da Mon Sep 17 00:00:00 2001
From: Stephen Marshall
Date: Wed, 6 Jun 2018 13:50:48 +0100
Subject: [PATCH] Improve metrics error handling (#114)

* Improve metrics error handling

* Updates to metrics error handling
---
 cmd/runmqserver/signals.go  |   2 +
 internal/metrics/metrics.go |  66 +++++++++++++---------
 internal/metrics/update.go  | 106 +++++++++++++++++++++---------
 3 files changed, 103 insertions(+), 71 deletions(-)

diff --git a/cmd/runmqserver/signals.go b/cmd/runmqserver/signals.go
index c7a2e79..47c3dfb 100644
--- a/cmd/runmqserver/signals.go
+++ b/cmd/runmqserver/signals.go
@@ -20,6 +20,7 @@ import (
 	"os/signal"
 	"syscall"
 
+	"github.com/ibm-messaging/mq-container/internal/metrics"
 	"golang.org/x/sys/unix"
 )
 
@@ -42,6 +43,7 @@ func signalHandler(qmgr string) chan int {
 				log.Printf("Signal received: %v", sig)
 				signal.Stop(reapSignals)
 				signal.Stop(stopSignals)
+				metrics.StopMetricsGathering()
 				stopQueueManager(qmgr)
 				// One final reap
 				reapZombies()
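The signals.go change wires metrics shutdown into the container's existing signal handler, so the metrics endpoint is torn down before the queue manager is stopped. As an illustration only, that ordering reduces to the following sketch; stopMetrics and stopQueueManager are hypothetical stand-ins for metrics.StopMetricsGathering and the container's real queue manager shutdown, not code from this patch.

    package main

    import (
        "log"
        "os"
        "os/signal"
        "syscall"
    )

    // Hypothetical stand-ins for metrics.StopMetricsGathering and the
    // container's stopQueueManager.
    func stopMetrics()               { log.Println("metrics gathering stopped") }
    func stopQueueManager(qm string) { log.Printf("queue manager %s stopped", qm) }

    func main() {
        stopSignals := make(chan os.Signal, 1)
        signal.Notify(stopSignals, syscall.SIGTERM, syscall.SIGINT)

        sig := <-stopSignals
        log.Printf("Signal received: %v", sig)
        signal.Stop(stopSignals)

        // Stop the metrics endpoint first, then the queue manager,
        // mirroring the order used in signalHandler above.
        stopMetrics()
        stopQueueManager("QM1")
    }

Stopping metrics first means a Prometheus scrape that arrives during shutdown fails quickly instead of waiting on a queue manager that is already ending.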
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index de31a2b..7c0aaae 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -18,9 +18,9 @@ limitations under the License.
 package metrics
 
 import (
+	"context"
 	"fmt"
 	"net/http"
-	"sync"
 	"time"
 
 	"github.com/ibm-messaging/mq-container/internal/logger"
@@ -29,29 +29,27 @@
 
 const (
 	defaultPort = "9157"
-	retryCount  = 3
-	retryWait   = 5
+)
+
+var (
+	metricsEnabled = false
+	metricsServer  = &http.Server{Addr: ":" + defaultPort}
 )
 
 // GatherMetrics gathers metrics for the queue manager
 func GatherMetrics(qmName string, log *logger.Logger) {
-	for i := 0; i <= retryCount; i++ {
-		err := startMetricsGathering(qmName, log)
-		if err != nil {
-			log.Errorf("Metrics Error: %s", err.Error())
-		}
-		if i != retryCount {
-			log.Printf("Waiting %d seconds before retrying metrics gathering", retryWait)
-			time.Sleep(retryWait * time.Second)
-		} else {
-			log.Println("Unable to gather metrics - metrics are now disabled")
-		}
+
+	metricsEnabled = true
+
+	err := startMetricsGathering(qmName, log)
+	if err != nil {
+		log.Errorf("Metrics Error: %s", err.Error())
+		StopMetricsGathering()
 	}
 }
 
 // startMetricsGathering starts gathering metrics for the queue manager
 func startMetricsGathering(qmName string, log *logger.Logger) error {
-	var wg sync.WaitGroup
 
 	defer func() {
 		if r := recover(); r != nil {
@@ -62,19 +60,17 @@
 	log.Println("Starting metrics gathering")
 
 	// Start processing metrics
-	wg.Add(1)
-	go processMetrics(log, qmName, &wg)
+	go processMetrics(log, qmName)
 
-	// Wait for metrics to be ready before starting the prometheus handler
-	wg.Wait()
+	// Wait for metrics to be ready before starting the Prometheus handler
+	<-startChannel
 
 	// Register metrics
-	exporter := newExporter(qmName, log)
-	err := prometheus.Register(exporter)
+	metricsExporter := newExporter(qmName, log)
+	err := prometheus.Register(metricsExporter)
 	if err != nil {
 		return fmt.Errorf("Failed to register metrics: %v", err)
 	}
-	defer prometheus.Unregister(exporter)
 
 	// Setup HTTP server to handle requests from Prometheus
 	http.Handle("/metrics", prometheus.Handler())
@@ -83,10 +79,28 @@
 		w.Write([]byte("Status: METRICS ACTIVE"))
 	})
 
-	err = http.ListenAndServe(":"+defaultPort, nil)
-	if err != nil {
-		return fmt.Errorf("Failed to handle metrics request: %v", err)
-	}
+	go func() {
+		err = metricsServer.ListenAndServe()
+		if err != nil && err != http.ErrServerClosed {
+			log.Errorf("Metrics Error: Failed to handle metrics request: %v", err)
+			StopMetricsGathering()
+		}
+	}()
 
 	return nil
 }
+
+// StopMetricsGathering stops gathering metrics for the queue manager
+func StopMetricsGathering() {
+
+	if metricsEnabled {
+
+		// Stop processing metrics
+		stopChannel <- true
+
+		// Shutdown HTTP server
+		timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		metricsServer.Shutdown(timeout)
+	}
+}
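The metrics.go changes replace the blocking http.ListenAndServe call with a package-level http.Server that is started on a goroutine and later shut down from StopMetricsGathering, treating http.ErrServerClosed as the normal result of a deliberate stop. A self-contained sketch of that pattern follows, assuming the same 5-second drain window as the patch; the handler and logging here are placeholders, not the exporter's real code.

    package main

    import (
        "context"
        "log"
        "net/http"
        "time"
    )

    func main() {
        srv := &http.Server{Addr: ":9157"}
        http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            w.Write([]byte("Status: METRICS ACTIVE"))
        })

        // Serve on a goroutine so the caller is not blocked. ErrServerClosed
        // only means Shutdown was called, so it is not treated as a failure.
        go func() {
            if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
                log.Printf("Metrics Error: Failed to handle metrics request: %v", err)
            }
        }()

        time.Sleep(time.Second) // stand-in for the server's working lifetime

        // On shutdown, give in-flight scrapes up to 5 seconds to complete.
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := srv.Shutdown(ctx); err != nil {
            log.Printf("Metrics Error: Failed to stop metrics server: %v", err)
        }
    }

One difference worth noting: the sketch checks the error returned by Shutdown, whereas StopMetricsGathering above discards the return value of metricsServer.Shutdown(timeout).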
diff --git a/internal/metrics/update.go b/internal/metrics/update.go
index 043382c..121c5cd 100644
--- a/internal/metrics/update.go
+++ b/internal/metrics/update.go
@@ -20,7 +20,6 @@ package metrics
 import (
 	"fmt"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/ibm-messaging/mq-container/internal/logger"
@@ -33,6 +32,8 @@ const (
 )
 
 var (
+	startChannel    = make(chan bool)
+	stopChannel     = make(chan bool, 2)
 	requestChannel  = make(chan bool)
 	responseChannel = make(chan map[string]*metricData)
 )
@@ -44,10 +45,67 @@ type metricData struct {
 	values map[string]float64
 }
 
-var keepRunning = true
-var first = true
+// processMetrics processes publications of metric data and handles describe/collect/stop requests
+func processMetrics(log *logger.Logger, qmName string) {
+	var err error
+	var firstConnect = true
+	var metrics map[string]*metricData
+
+	for {
+		// Connect to queue manager and discover available metrics
+		err = doConnect(qmName)
+		if err == nil {
+			if firstConnect {
+				firstConnect = false
+				startChannel <- true
+			}
+			metrics, _ = initialiseMetrics(log)
+		}
+
+		// Now loop until something goes wrong
+		for err == nil {
+
+			// Process publications of metric data
+			// TODO: If we have a large number of metrics to process, then we could be blocked from responding to stop requests
+			err = mqmetric.ProcessPublications()
+
+			// Handle describe/collect/stop requests
+			if err == nil {
+				select {
+				case collect := <-requestChannel:
+					if collect {
+						updateMetrics(metrics)
+					}
+					responseChannel <- metrics
+				case <-stopChannel:
+					log.Println("Stopping metrics gathering")
+					mqmetric.EndConnection()
+					return
+				case <-time.After(requestTimeout * time.Second):
+					log.Debugf("Metrics: No requests received within timeout period (%d seconds)", requestTimeout)
+				}
+			}
+		}
+		log.Errorf("Metrics Error: %s", err.Error())
+
+		// Close the connection
+		mqmetric.EndConnection()
+
+		// Handle stop requests
+		select {
+		case <-stopChannel:
+			log.Println("Stopping metrics gathering")
+			return
+		case <-time.After(requestTimeout * time.Second):
+			log.Println("Retrying metrics gathering")
+		}
+	}
+}
+
+// doConnect connects to the queue manager and discovers available metrics
 func doConnect(qmName string) error {
+
+	// Set connection configuration
 	var connConfig mqmetric.ConnectionConfig
 	connConfig.ClientMode = false
@@ -69,48 +127,6 @@
 	return nil
 }
 
-// processMetrics processes publications of metric data and handles describe/collect requests
-func processMetrics(log *logger.Logger, qmName string, wg *sync.WaitGroup) {
-	var err error
-	var metrics map[string]*metricData
-
-	for keepRunning {
-		err = doConnect(qmName)
-		if err == nil {
-			if first {
-				first = false
-				wg.Done()
-			}
-			metrics, _ = initialiseMetrics(log)
-		}
-
-		// Now loop until something goes wrong
-		for err == nil {
-
-			// Process publications of metric data
-			err = mqmetric.ProcessPublications()
-
-			// Handle describe/collect requests
-			select {
-			case collect := <-requestChannel:
-				if collect {
-					updateMetrics(metrics)
-				}
-				responseChannel <- metrics
-			case <-time.After(requestTimeout * time.Second):
-				log.Debugf("Metrics: No requests received within timeout period (%d seconds)", requestTimeout)
-			}
-		}
-		log.Errorf("Metrics Error: %s", err.Error())
-
-		// Close the connection
-		mqmetric.EndConnection()
-
-		// If we're told to keep running sleep for a bit before trying again
-		time.Sleep(10 * time.Second)
-	}
-}
-
 // initialiseMetrics sets initial details for all available metrics
 func initialiseMetrics(log *logger.Logger) (map[string]*metricData, error) {
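The rewritten processMetrics replaces the keepRunning and first flags with channels: startChannel signals readiness exactly once, stopChannel (buffered) asks the goroutine to stop, and a single select multiplexes collect requests, stop requests, and a timeout. A minimal runnable sketch of that control flow follows; worker and the plain-string metric payload are illustrative stand-ins, not the exporter's actual types or mqmetric calls.

    package main

    import (
        "fmt"
        "time"
    )

    var (
        startChannel    = make(chan bool)
        stopChannel     = make(chan bool, 2) // buffered so a stop request never blocks the sender
        requestChannel  = make(chan bool)
        responseChannel = make(chan string)
    )

    // worker mimics the shape of processMetrics: signal readiness once, then
    // answer collect requests until a stop request or timeout arrives.
    func worker() {
        startChannel <- true
        for {
            select {
            case <-requestChannel:
                responseChannel <- "metric data"
            case <-stopChannel:
                fmt.Println("Stopping metrics gathering")
                return
            case <-time.After(5 * time.Second):
                fmt.Println("No requests received within timeout period")
            }
        }
    }

    func main() {
        go worker()
        <-startChannel // wait for readiness, as startMetricsGathering does

        requestChannel <- true // a collect request, like the describe/collect handling in update.go
        fmt.Println(<-responseChannel)

        stopChannel <- true // what StopMetricsGathering sends
        time.Sleep(100 * time.Millisecond)
    }

The capacity of 2 on stopChannel presumably allows both the signal handler and the HTTP server's error path to send a stop without blocking; that reading is an inference from the patch rather than something it states.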