From 81bba0db32935ec745463d58f3a9591e5a5a6c6c Mon Sep 17 00:00:00 2001 From: Arthur Barr Date: Mon, 29 Jan 2018 17:05:19 +0000 Subject: [PATCH] Add mirrorFunc to handle mirrored messages --- cmd/runmqserver/logging.go | 30 +++++++++++++------------ cmd/runmqserver/logging_test.go | 40 +++++++-------------------------- cmd/runmqserver/main.go | 10 ++++++++- 3 files changed, 33 insertions(+), 47 deletions(-) diff --git a/cmd/runmqserver/logging.go b/cmd/runmqserver/logging.go index 08c5315..ddd5341 100644 --- a/cmd/runmqserver/logging.go +++ b/cmd/runmqserver/logging.go @@ -18,9 +18,7 @@ package main import ( "bufio" "fmt" - "io" "os" - "strings" "time" log "github.com/sirupsen/logrus" @@ -47,19 +45,23 @@ func waitForFile(path string) (os.FileInfo, error) { return fi, nil } +type mirrorFunc func(msg string) + // mirrorAvailableMessages prints lines from the file, until no more are available -func mirrorAvailableMessages(f *os.File, w io.Writer) { +func mirrorAvailableMessages(f *os.File, mf mirrorFunc) { scanner := bufio.NewScanner(f) count := 0 for scanner.Scan() { t := scanner.Text() - if strings.HasPrefix(t, "{") { - // Assume JSON, so just print it - fmt.Fprintln(w, t) - } else if strings.HasPrefix(t, "AMQ") { - // Only print MQ messages with AMQnnnn codes - fmt.Fprintln(w, t) - } + mf(t) + // if strings.HasPrefix(t, "{") { + // // Assume JSON, so just print it + // fmt.Fprintln(w, t) + // } else if strings.HasPrefix(t, "AMQ") { + // // Only print MQ messages with AMQnnnn codes + // log.Println(t) + // //fmt.Fprintln(w, t) + // } count++ } log.Debugf("Mirrored %v log entries", count) @@ -73,7 +75,7 @@ func mirrorAvailableMessages(f *os.File, w io.Writer) { // mirrorLog tails the specified file, and logs each line to stdout. // This is useful for usability, as the container console log can show // messages from the MQ error logs. -func mirrorLog(path string, w io.Writer) (chan bool, error) { +func mirrorLog(path string, mf mirrorFunc) (chan bool, error) { lifecycle := make(chan bool) var offset int64 = -1 var f *os.File @@ -134,7 +136,7 @@ func mirrorLog(path string, w io.Writer) (chan bool, error) { for { log.Debugln("Start of loop") // If there's already data there, mirror it now. - mirrorAvailableMessages(f, w) + mirrorAvailableMessages(f, mf) log.Debugf("Stat %v", path) newFI, err := os.Stat(path) if err != nil { @@ -148,7 +150,7 @@ func mirrorLog(path string, w io.Writer) (chan bool, error) { // log rotation happens before we can open the new file, then we // could skip all those messages. This could happen with a very small // MQ error log size. - mirrorAvailableMessages(f, w) + mirrorAvailableMessages(f, mf) f.Close() // Re-open file log.Debugln("Re-opening error log file") @@ -160,7 +162,7 @@ func mirrorLog(path string, w io.Writer) (chan bool, error) { } fi = newFI // Don't seek this time, because we know it's a new file - mirrorAvailableMessages(f, w) + mirrorAvailableMessages(f, mf) } log.Debugln("Check for lifecycle event") select { diff --git a/cmd/runmqserver/logging_test.go b/cmd/runmqserver/logging_test.go index d56c854..439811f 100644 --- a/cmd/runmqserver/logging_test.go +++ b/cmd/runmqserver/logging_test.go @@ -16,8 +16,6 @@ limitations under the License. package main import ( - "bufio" - "bytes" "fmt" "io/ioutil" "os" @@ -39,9 +37,10 @@ func TestMirrorLogWithoutRotation(t *testing.T) { } t.Log(tmp.Name()) defer os.Remove(tmp.Name()) - b := make([]byte, 256) - buf := bytes.NewBuffer(b) - lifecycle, err := mirrorLog(tmp.Name(), buf) + count := 0 + lifecycle, err := mirrorLog(tmp.Name(), func(msg string) { + count++ + }) if err != nil { t.Fatal(err) } @@ -57,18 +56,6 @@ func TestMirrorLogWithoutRotation(t *testing.T) { lifecycle <- true <-lifecycle - // Read the buffer back and count the lines - r := strings.NewReader(buf.String()) - scanner := bufio.NewScanner(r) - count := 0 - for scanner.Scan() { - count++ - t.Logf("Received entry: %v", scanner.Text()) - } - err = scanner.Err() - if err != nil { - t.Fatal(err) - } if count != 3 { t.Fatalf("Expected 3 log entries; got %v", count) } @@ -90,9 +77,10 @@ func TestMirrorLogWithRotation(t *testing.T) { t.Log("Removing file") os.Remove(tmp.Name()) }() - b := make([]byte, 512) - buf := bytes.NewBuffer(b) - lifecycle, err := mirrorLog(tmp.Name(), buf) + count := 0 + lifecycle, err := mirrorLog(tmp.Name(), func(msg string) { + count++ + }) if err != nil { t.Fatal(err) } @@ -125,18 +113,6 @@ func TestMirrorLogWithRotation(t *testing.T) { // Wait until it's finished <-lifecycle - // Read the buffer back and count the lines - r := strings.NewReader(buf.String()) - scanner := bufio.NewScanner(r) - count := 0 - for scanner.Scan() { - count++ - t.Logf("Received entry: %v", scanner.Text()) - } - err = scanner.Err() - if err != nil { - t.Fatal(err) - } if count != 5 { t.Fatalf("Expected 5 log entries; got %v", count) } diff --git a/cmd/runmqserver/main.go b/cmd/runmqserver/main.go index 9cb4e81..efb93c7 100644 --- a/cmd/runmqserver/main.go +++ b/cmd/runmqserver/main.go @@ -19,6 +19,7 @@ package main import ( "errors" + "fmt" "io/ioutil" "os" "os/exec" @@ -220,10 +221,17 @@ func doMain() error { f := "/var/mqm/qmgrs/" + name + "/errors/AMQERR01" if jsonLogs() { f = f + ".json" + mirrorLifecycle, err = mirrorLog(f, func(msg string) { + // Print the message straight to stdout + fmt.Println(msg) + }) } else { f = f + ".LOG" + mirrorLifecycle, err = mirrorLog(f, func(msg string) { + // Log the message, so we get a timestamp etc. + log.Println(msg) + }) } - mirrorLifecycle, err = mirrorLog(f, os.Stdout) if err != nil { return err }