From b62a883d4c89e58746d570fa03ef65b8449c91f4 Mon Sep 17 00:00:00 2001 From: Arthur Barr Date: Thu, 15 Feb 2018 14:59:14 +0000 Subject: [PATCH] Improvements to log mirroring --- cmd/runmqserver/logging.go | 5 ++- cmd/runmqserver/logging_test.go | 51 +++++++++++++++++++++- cmd/runmqserver/main.go | 62 +++++++++++++++------------ internal/mqini/dspmqinf1.txt | 5 +++ internal/mqini/dspmqinf2.txt | 5 +++ internal/mqini/dspmqinf3.txt | 5 +++ internal/mqini/mqini.go | 75 +++++++++++++++++++++++++++++++++ internal/mqini/mqini_test.go | 64 ++++++++++++++++++++++++++++ 8 files changed, 241 insertions(+), 31 deletions(-) create mode 100644 internal/mqini/dspmqinf1.txt create mode 100644 internal/mqini/dspmqinf2.txt create mode 100644 internal/mqini/dspmqinf3.txt create mode 100644 internal/mqini/mqini.go create mode 100644 internal/mqini/mqini_test.go diff --git a/cmd/runmqserver/logging.go b/cmd/runmqserver/logging.go index ddd5341..613767f 100644 --- a/cmd/runmqserver/logging.go +++ b/cmd/runmqserver/logging.go @@ -75,7 +75,7 @@ func mirrorAvailableMessages(f *os.File, mf mirrorFunc) { // mirrorLog tails the specified file, and logs each line to stdout. // This is useful for usability, as the container console log can show // messages from the MQ error logs. -func mirrorLog(path string, mf mirrorFunc) (chan bool, error) { +func mirrorLog(path string, fromStart bool, mf mirrorFunc) (chan bool, error) { lifecycle := make(chan bool) var offset int64 = -1 var f *os.File @@ -128,7 +128,8 @@ func mirrorLog(path string, mf mirrorFunc) (chan bool, error) { return } // The file now exists. If it didn't exist before we started, offset=0 - if offset != 0 { + // Always start at the beginning if we've been told to go from the start + if offset != 0 && !fromStart { log.Debugf("Seeking %v", offset) f.Seek(offset, 0) } diff --git a/cmd/runmqserver/logging_test.go b/cmd/runmqserver/logging_test.go index 439811f..5dddc66 100644 --- a/cmd/runmqserver/logging_test.go +++ b/cmd/runmqserver/logging_test.go @@ -38,7 +38,7 @@ func TestMirrorLogWithoutRotation(t *testing.T) { t.Log(tmp.Name()) defer os.Remove(tmp.Name()) count := 0 - lifecycle, err := mirrorLog(tmp.Name(), func(msg string) { + lifecycle, err := mirrorLog(tmp.Name(), true, func(msg string) { count++ }) if err != nil { @@ -78,7 +78,7 @@ func TestMirrorLogWithRotation(t *testing.T) { os.Remove(tmp.Name()) }() count := 0 - lifecycle, err := mirrorLog(tmp.Name(), func(msg string) { + lifecycle, err := mirrorLog(tmp.Name(), true, func(msg string) { count++ }) if err != nil { @@ -120,6 +120,53 @@ func TestMirrorLogWithRotation(t *testing.T) { } } +func testMirrorLogExistingFile(t *testing.T, newQM bool) int { + tmp, err := ioutil.TempFile("", t.Name()) + if err != nil { + t.Fatal(err) + } + t.Log(tmp.Name()) + log.Println("Logging 1 message before we start") + ioutil.WriteFile(tmp.Name(), []byte("{\"message\"=\"A\"}\n"), 0600) + defer os.Remove(tmp.Name()) + count := 0 + lifecycle, err := mirrorLog(tmp.Name(), newQM, func(msg string) { + count++ + }) + if err != nil { + t.Fatal(err) + } + f, err := os.OpenFile(tmp.Name(), os.O_APPEND|os.O_WRONLY, 0700) + if err != nil { + t.Fatal(err) + } + log.Println("Logging 2 new JSON messages") + fmt.Fprintln(f, "{\"message\"=\"B\"}") + fmt.Fprintln(f, "{\"message\"=\"C\"}") + f.Close() + lifecycle <- true + <-lifecycle + return count +} + +// TestMirrorLogExistingFile tests that we only get new log messages, if the +// log file already exists +func TestMirrorLogExistingFile(t *testing.T) { + count := testMirrorLogExistingFile(t, false) + if count != 2 { + t.Fatalf("Expected 2 log entries; got %v", count) + } +} + +// TestMirrorLogExistingFileButNewQueueManager tests that we only get all log +// messages, even if the file exists, if we tell it we want all messages +func TestMirrorLogExistingFileButNewQueueManager(t *testing.T) { + count := testMirrorLogExistingFile(t, true) + if count != 3 { + t.Fatalf("Expected 3 log entries; got %v", count) + } +} + func init() { log.SetLevel(log.DebugLevel) } diff --git a/cmd/runmqserver/main.go b/cmd/runmqserver/main.go index 43e65da..4226628 100644 --- a/cmd/runmqserver/main.go +++ b/cmd/runmqserver/main.go @@ -18,6 +18,7 @@ limitations under the License. package main import ( + "encoding/json" "errors" "fmt" "io" @@ -31,6 +32,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/ibm-messaging/mq-container/internal/command" + "github.com/ibm-messaging/mq-container/internal/mqini" "github.com/ibm-messaging/mq-container/internal/name" "github.com/ibm-messaging/mq-container/internal/ready" ) @@ -60,19 +62,22 @@ func createDirStructure() error { return nil } -func createQueueManager(name string) error { +// createQueueManager creates a queue manager, if it doesn't already exist. +// It returns true if one was created, or false if one already existed +func createQueueManager(name string) (bool, error) { log.Printf("Creating queue manager %v", name) out, rc, err := command.Run("crtmqm", "-q", "-p", "1414", name) if err != nil { // 8=Queue manager exists, which is fine - if rc != 8 { - log.Printf("crtmqm returned %v", rc) - log.Println(string(out)) - return err + if rc == 8 { + log.Printf("Detected existing queue manager %v", name) + return false, nil } - log.Printf("Detected existing queue manager %v", name) + log.Printf("crtmqm returned %v", rc) + log.Println(string(out)) + return false, err } - return nil + return true, nil } func updateCommandLevel() error { @@ -159,34 +164,42 @@ func jsonLogs() bool { return false } -func mirrorLogs(name string) (chan bool, error) { - f := "/var/mqm/qmgrs/" + name + "/errors/AMQERR01" +func mirrorLogs(name string, fromStart bool) (chan bool, error) { + // Always use the JSON log as the source + // Put the queue manager name in quotes to handle cases like name=.. + qm, err := mqini.GetQueueManager(name) + if err != nil { + logDebugf("%v", err) + return nil, err + } + f := filepath.Join(mqini.GetErrorLogDirectory(qm), "AMQERR01.json") + // f := fmt.Sprintf("/var/mqm/qmgrs/\"%v\"/errors/AMQERR01.json", name) if jsonLogs() { - f = f + ".json" - return mirrorLog(f, func(msg string) { + return mirrorLog(f, fromStart, func(msg string) { // Print the message straight to stdout fmt.Println(msg) }) } - f = f + ".LOG" - return mirrorLog(f, func(msg string) { - if strings.HasPrefix(msg, "AMQ") { - // Log the message, so we get a timestamp etc. - log.Println(msg) - } + return mirrorLog(f, fromStart, func(msg string) { + // Parse the JSON message, and print a simplified version + var obj map[string]interface{} + json.Unmarshal([]byte(msg), &obj) + fmt.Printf("%v %v\n", obj["ibm_datetime"], obj["message"]) }) } type simpleTextFormatter struct { } +const timestampFormat string = "2006-01-02T15:04:05.000Z07:00" + func (f *simpleTextFormatter) Format(entry *logrus.Entry) ([]byte, error) { // If debugging, and a prefix, but only for this formatter. if entry.Level == logrus.DebugLevel { entry.Message = "DEBUG: " + entry.Message } - // Use a simple, human-readable format, with a timestamp - return []byte(fmt.Sprintf("%s %s\n", entry.Time.Format("2006/01/02 15:04:05"), entry.Message)), nil + // Use a simple format, with a timestamp + return []byte(fmt.Sprintf("%v %v\n", entry.Time.Format(timestampFormat), entry.Message)), nil } func configureLogger() { @@ -198,16 +211,11 @@ func configureLogger() { logrus.FieldKeyTime: "ibm_datetime", }, // Match time stamp format used by MQ messages (includes milliseconds) - TimestampFormat: "2006-01-02T15:04:05.000Z07:00", + TimestampFormat: timestampFormat, } logrus.SetFormatter(&formatter) } else { log.SetFormatter(new(simpleTextFormatter)) - - // formatter := logrus.TextFormatter{ - // FullTimestamp: true, - // } - // logrus.SetFormatter(&formatter) } } @@ -250,11 +258,11 @@ func doMain() error { if err != nil { return err } - mirrorLifecycle, err := mirrorLogs(name) + newQM, err := createQueueManager(name) if err != nil { return err } - err = createQueueManager(name) + mirrorLifecycle, err := mirrorLogs(name, newQM) if err != nil { return err } diff --git a/internal/mqini/dspmqinf1.txt b/internal/mqini/dspmqinf1.txt new file mode 100644 index 0000000..ceff4e4 --- /dev/null +++ b/internal/mqini/dspmqinf1.txt @@ -0,0 +1,5 @@ +QueueManager: + Name=foo + Directory=foo + Prefix=/var/mqm + InstallationName=Installation1 \ No newline at end of file diff --git a/internal/mqini/dspmqinf2.txt b/internal/mqini/dspmqinf2.txt new file mode 100644 index 0000000..409acce --- /dev/null +++ b/internal/mqini/dspmqinf2.txt @@ -0,0 +1,5 @@ +QueueManager: + Name=a/b + Directory=a&b + Prefix=/var/mqm + InstallationName=Installation1 \ No newline at end of file diff --git a/internal/mqini/dspmqinf3.txt b/internal/mqini/dspmqinf3.txt new file mode 100644 index 0000000..a0528dc --- /dev/null +++ b/internal/mqini/dspmqinf3.txt @@ -0,0 +1,5 @@ +QueueManager: + Name=.. + Directory=!! + Prefix=/var/mqm + InstallationName=Installation1 \ No newline at end of file diff --git a/internal/mqini/mqini.go b/internal/mqini/mqini.go new file mode 100644 index 0000000..571c77e --- /dev/null +++ b/internal/mqini/mqini.go @@ -0,0 +1,75 @@ +/* +© Copyright IBM Corporation 2018 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package mqini + +import ( + "bufio" + "bytes" + "os/exec" + "path/filepath" + "strings" +) + +// QueueManager describe high-level configuration information for a queue manager +type QueueManager struct { + Name string + Prefix string + Directory string + DataPath string + InstallationName string +} + +// getQueueManagerFromStanza parses a queue manager stanza +func getQueueManagerFromStanza(stanza []byte) (*QueueManager, error) { + scanner := bufio.NewScanner(bytes.NewReader(stanza)) + qm := QueueManager{} + for scanner.Scan() { + l := scanner.Text() + l = strings.TrimSpace(l) + t := strings.Split(l, "=") + switch t[0] { + case "Name": + qm.Name = t[1] + case "Prefix": + qm.Prefix = t[1] + case "Directory": + qm.Directory = t[1] + case "DataPath": + qm.DataPath = t[1] + case "InstallationName": + qm.InstallationName = t[1] + } + } + return &qm, scanner.Err() +} + +// GetQueueManager returns queue manager configuration information +func GetQueueManager(name string) (*QueueManager, error) { + // dspmqinf essentially returns a subset of mqs.ini, but it's simpler to parse + cmd := exec.Command("dspmqinf", "-o", "stanza", name) + // Run the command and wait for completion + out, err := cmd.CombinedOutput() + if err != nil { + return nil, err + } + return getQueueManagerFromStanza(out) +} + +// GetErrorLogDirectory returns the directory holding the error logs for the +// specified queue manager +func GetErrorLogDirectory(qm *QueueManager) string { + return filepath.Join(qm.Prefix, "qmgrs", qm.Directory, "errors") +} diff --git a/internal/mqini/mqini_test.go b/internal/mqini/mqini_test.go new file mode 100644 index 0000000..ed25c56 --- /dev/null +++ b/internal/mqini/mqini_test.go @@ -0,0 +1,64 @@ +/* +© Copyright IBM Corporation 2018 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package mqini + +import ( + "io/ioutil" + "testing" +) + +var getQueueManagerTests = []struct { + file string + name string + prefix string + directory string + errorLogDir string +}{ + {"dspmqinf1.txt", "foo", "/var/mqm", "foo", "/var/mqm/qmgrs/foo/errors"}, + {"dspmqinf2.txt", "a/b", "/var/mqm", "a&b", "/var/mqm/qmgrs/a&b/errors"}, + {"dspmqinf3.txt", "..", "/var/mqm", "!!", "/var/mqm/qmgrs/!!/errors"}, +} + +func TestGetQueueManager(t *testing.T) { + for _, table := range getQueueManagerTests { + t.Run(table.file, func(t *testing.T) { + b, err := ioutil.ReadFile(table.file) + if err != nil { + t.Fatal(err) + } + qm, err := getQueueManagerFromStanza(b) + if err != nil { + t.Fatal(err) + } + t.Logf("%#v", qm) + if qm.Name != table.name { + t.Errorf("Expected name=%v; got %v", table.name, qm.Name) + } + if qm.Prefix != table.prefix { + t.Errorf("Expected prefix=%v; got %v", table.prefix, qm.Prefix) + } + if qm.Directory != table.directory { + t.Errorf("Expected directory=%v; got %v", table.directory, qm.Directory) + } + + // Test + d := GetErrorLogDirectory(qm) + if d != table.errorLogDir { + t.Errorf("Expected error log directory=%v; got %v", table.errorLogDir, d) + } + }) + } +}