From 6c72c894f775752a8ec8944d73dde4264711f0ff Mon Sep 17 00:00:00 2001 From: Stephen Marshall Date: Wed, 1 May 2019 14:42:25 +0100 Subject: [PATCH] add multi-instance Queue Managers (#307) * Initial code to implement multi-instance queue manager * alter default mqsc to prevent race between listeners on standby startup * Updates to multi-instance queue manager code * initial multi instance test * Multi-instance code improvements * Multi instance fixes and first test * configure queue manager * Add mirror log filtering for mult-instance QMs * Add log message for multi-instance enabled * Improvements to container runtime logging * refactor test * Test active standby switch * Improve createQueueManager function * Test multi instance race * wait * multi instance mount tests * skip race test * mount tests * no mount test * single instance split mount tests * readiness check * More updates for handling standby queue manager * Improve standby checks * Minor fixes to miqm * Fix logging of JSON errors * Update copyrights * Fix log includes --- cmd/chkmqhealthy/main.go | 4 +- cmd/chkmqready/main.go | 24 +- cmd/runmqdevserver/logruntime.go | 68 ----- cmd/runmqdevserver/main.go | 7 +- cmd/runmqserver/crtmqvol.go | 6 +- cmd/runmqserver/logging.go | 43 +++- cmd/runmqserver/main.go | 29 ++- cmd/runmqserver/mirror.go | 19 +- cmd/runmqserver/mirror_test.go | 22 +- cmd/runmqserver/qmgr.go | 121 +++++++-- .../mqadvanced-server-dev/10-dev.mqsc.tpl | 1 + install-mq.sh | 4 + internal/containerruntime/runtime_linux.go | 9 + .../containerruntimelogger}/logruntime.go | 21 +- internal/logger/logger.go | 5 - internal/metrics/metrics.go | 12 +- internal/mqini/mqini.go | 5 +- internal/ready/ready.go | 26 +- test/docker/docker_api_test.go | 62 ++++- test/docker/docker_api_test_util.go | 17 +- test/docker/mq_multi_instance_test.go | 234 ++++++++++++++++++ test/docker/mq_multi_instance_test_util.go | 178 +++++++++++++ 22 files changed, 773 insertions(+), 144 deletions(-) delete mode 100644 cmd/runmqdevserver/logruntime.go rename {cmd/runmqserver => internal/containerruntimelogger}/logruntime.go (74%) create mode 100644 test/docker/mq_multi_instance_test.go create mode 100644 test/docker/mq_multi_instance_test_util.go diff --git a/cmd/chkmqhealthy/main.go b/cmd/chkmqhealthy/main.go index 1420805..6fea70b 100644 --- a/cmd/chkmqhealthy/main.go +++ b/cmd/chkmqhealthy/main.go @@ -1,5 +1,5 @@ /* -© Copyright IBM Corporation 2017 +© Copyright IBM Corporation 2017, 2019 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ func queueManagerHealthy() (bool, error) { return false, err } fmt.Printf("%s", out) - if !strings.Contains(string(out), "(RUNNING)") { + if !strings.Contains(string(out), "(RUNNING)") && !strings.Contains(string(out), "(RUNNING AS STANDBY)") { return false, nil } return true, nil diff --git a/cmd/chkmqready/main.go b/cmd/chkmqready/main.go index 3f978d1..c1f06fe 100644 --- a/cmd/chkmqready/main.go +++ b/cmd/chkmqready/main.go @@ -1,5 +1,5 @@ /* -© Copyright IBM Corporation 2017, 2018 +© Copyright IBM Corporation 2017, 2019 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -22,6 +22,7 @@ import ( "net" "os" + "github.com/ibm-messaging/mq-container/internal/name" "github.com/ibm-messaging/mq-container/internal/ready" ) @@ -31,14 +32,25 @@ func main() { if !r || err != nil { os.Exit(1) } - // Check if the queue manager has a running listener - conn, err := net.Dial("tcp", "127.0.0.1:1414") + name, err := name.GetQueueManagerName() if err != nil { fmt.Println(err) os.Exit(1) } - err = conn.Close() - if err != nil { - fmt.Println(err) + + // Check if the queue manager has a running listener + if standby, _ := ready.IsRunningAsStandbyQM(name); !standby { + conn, err := net.Dial("tcp", "127.0.0.1:1414") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + err = conn.Close() + if err != nil { + fmt.Println(err) + } + } else { + fmt.Printf("Detected queue manager running in standby mode") + os.Exit(10) } } diff --git a/cmd/runmqdevserver/logruntime.go b/cmd/runmqdevserver/logruntime.go deleted file mode 100644 index efa9694..0000000 --- a/cmd/runmqdevserver/logruntime.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -© Copyright IBM Corporation 2017, 2019 - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package main - -import ( - "runtime" - "strings" - - containerruntime "github.com/ibm-messaging/mq-container/internal/containerruntime" - "github.com/ibm-messaging/mq-container/internal/user" -) - -func logContainerDetails() { - log.Printf("CPU architecture: %v", runtime.GOARCH) - kv, err := containerruntime.GetKernelVersion() - if err == nil { - log.Printf("Linux kernel version: %v", kv) - } - cr, err := containerruntime.GetContainerRuntime() - if err == nil { - log.Printf("Container runtime: %v", cr) - } - bi, err := containerruntime.GetBaseImage() - if err == nil { - log.Printf("Base image: %v", bi) - } - u, err := user.GetUser() - if err == nil { - if len(u.SupplementalGID) == 0 { - log.Printf("Running as user ID %v (%v) with primary group %v", u.UID, u.Name, u.PrimaryGID) - } else { - log.Printf("Running as user ID %v (%v) with primary group %v, and supplementary groups %v", u.UID, u.Name, u.PrimaryGID, strings.Join(u.SupplementalGID, ",")) - } - } - caps, err := containerruntime.GetCapabilities() - capLogged := false - if err == nil { - for k, v := range caps { - if len(v) > 0 { - log.Printf("Capabilities (%s set): %v", strings.ToLower(k), strings.Join(v, ",")) - capLogged = true - } - } - if !capLogged { - log.Print("Capabilities: none") - } - } else { - log.Errorf("Error getting capabilities: %v", err) - } - sc, err := containerruntime.GetSeccomp() - if err == nil { - log.Printf("seccomp enforcing mode: %v", sc) - } - log.Printf("Process security attributes: %v", containerruntime.GetSecurityAttributes()) -} diff --git a/cmd/runmqdevserver/main.go b/cmd/runmqdevserver/main.go index 0cfc366..5ffa939 100644 --- a/cmd/runmqdevserver/main.go +++ b/cmd/runmqdevserver/main.go @@ -23,6 +23,7 @@ import ( "syscall" "github.com/ibm-messaging/mq-container/internal/command" + containerruntimelogger "github.com/ibm-messaging/mq-container/internal/containerruntimelogger" 
"github.com/ibm-messaging/mq-container/internal/logger" "github.com/ibm-messaging/mq-container/internal/name" ) @@ -117,7 +118,11 @@ func doMain() error { return err } - logContainerDetails() + err = containerruntimelogger.LogContainerDetails(log) + if err != nil { + logTermination(err) + return err + } adminPassword, set := os.LookupEnv("MQ_ADMIN_PASSWORD") if set { diff --git a/cmd/runmqserver/crtmqvol.go b/cmd/runmqserver/crtmqvol.go index cadd833..e064e72 100644 --- a/cmd/runmqserver/crtmqvol.go +++ b/cmd/runmqserver/crtmqvol.go @@ -1,5 +1,5 @@ /* -© Copyright IBM Corporation 2017, 2018 +© Copyright IBM Corporation 2017, 2019 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -17,15 +17,13 @@ package main import ( "os" - "path/filepath" "runtime" "syscall" "github.com/ibm-messaging/mq-container/internal/command" ) -func createVolume(path string) error { - dataPath := filepath.Join(path, "data") +func createVolume(dataPath string) error { fi, err := os.Stat(dataPath) if err != nil { if os.IsNotExist(err) { diff --git a/cmd/runmqserver/logging.go b/cmd/runmqserver/logging.go index f2f30a8..1b35d3c 100644 --- a/cmd/runmqserver/logging.go +++ b/cmd/runmqserver/logging.go @@ -23,6 +23,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "sync" "github.com/ibm-messaging/mq-container/internal/command" @@ -66,7 +67,7 @@ func formatSimple(datetime string, message string) string { // mirrorSystemErrorLogs starts a goroutine to mirror the contents of the MQ system error logs func mirrorSystemErrorLogs(ctx context.Context, wg *sync.WaitGroup, mf mirrorFunc) (chan error, error) { // Always use the JSON log as the source - return mirrorLog(ctx, wg, "/var/mqm/errors/AMQERR01.json", false, mf) + return mirrorLog(ctx, wg, "/var/mqm/errors/AMQERR01.json", false, mf, false) } // mirrorQueueManagerErrorLogs starts a goroutine to mirror the contents of the MQ queue manager error logs @@ -78,7 +79,7 @@ func mirrorQueueManagerErrorLogs(ctx context.Context, wg *sync.WaitGroup, name s return nil, err } f := filepath.Join(mqini.GetErrorLogDirectory(qm), "AMQERR01.json") - return mirrorLog(ctx, wg, f, fromStart, mf) + return mirrorLog(ctx, wg, f, fromStart, mf, true) } func getDebug() bool { @@ -99,21 +100,35 @@ func configureLogger(name string) (mirrorFunc, error) { if err != nil { return nil, err } - return log.LogDirect, nil + return func(msg string, isQMLog bool) bool { + obj, err := processLogMessage(msg) + if err == nil && isQMLog && filterQMLogMessage(obj) { + return false + } + if err != nil { + log.Printf("Failed to unmarshall JSON - %v", msg) + } else { + fmt.Println(msg) + } + return true + }, nil case "basic": log, err = logger.NewLogger(os.Stderr, d, false, name) if err != nil { return nil, err } - return func(msg string) { + return func(msg string, isQMLog bool) bool { // Parse the JSON message, and print a simplified version - var obj map[string]interface{} - err := json.Unmarshal([]byte(msg), &obj) + obj, err := processLogMessage(msg) + if err == nil && isQMLog && filterQMLogMessage(obj) { + return false + } if err != nil { - fmt.Printf("Failed to Unmarshall JSON - %v", err) + log.Printf("Failed to unmarshall JSON - %v", err) } else { fmt.Printf(formatSimple(obj["ibm_datetime"].(string), obj["message"].(string))) } + return true }, nil default: log, err = logger.NewLogger(os.Stdout, d, false, name) @@ -124,6 +139,20 @@ func configureLogger(name string) (mirrorFunc, error) { } } +func processLogMessage(msg string) 
(map[string]interface{}, error) { + var obj map[string]interface{} + err := json.Unmarshal([]byte(msg), &obj) + return obj, err +} + +func filterQMLogMessage(obj map[string]interface{}) bool { + hostname, err := os.Hostname() + if os.Getenv("MQ_MULTI_INSTANCE") == "true" && err == nil && !strings.Contains(obj["host"].(string), hostname) { + return true + } + return false +} + func logDiagnostics() { log.Debug("--- Start Diagnostics ---") diff --git a/cmd/runmqserver/main.go b/cmd/runmqserver/main.go index 2814e27..d25810b 100644 --- a/cmd/runmqserver/main.go +++ b/cmd/runmqserver/main.go @@ -24,6 +24,7 @@ import ( "os" "sync" + containerruntimelogger "github.com/ibm-messaging/mq-container/internal/containerruntimelogger" "github.com/ibm-messaging/mq-container/internal/metrics" "github.com/ibm-messaging/mq-container/internal/name" "github.com/ibm-messaging/mq-container/internal/ready" @@ -45,7 +46,7 @@ func doMain() error { // Check whether they only want debug info if *infoFlag { logVersionInfo() - err = logContainerDetails() + err = containerruntimelogger.LogContainerDetails(log) if err != nil { log.Printf("Error displaying container details: %v", err) } @@ -86,14 +87,24 @@ func doMain() error { collectDiagOnFail = true if *devFlag == false { - err = logContainerDetails() + err = containerruntimelogger.LogContainerDetails(log) if err != nil { logTermination(err) return err } } - err = createVolume("/mnt/mqm") + err = createVolume("/mnt/mqm/data") + if err != nil { + logTermination(err) + return err + } + err = createVolume("/mnt/mqm-log/log") + if err != nil { + logTermination(err) + return err + } + err = createVolume("/mnt/mqm-data/qmgrs") if err != nil { logTermination(err) return err @@ -149,15 +160,17 @@ func doMain() error { logTermination(err) return err } - err = startQueueManager() + err = startQueueManager(name) if err != nil { logTermination(err) return err } - err = configureQueueManager() - if err != nil { - logTermination(err) - return err + if standby, _ := ready.IsRunningAsStandbyQM(name); !standby { + err = configureQueueManager() + if err != nil { + logTermination(err) + return err + } } enableMetrics := os.Getenv("MQ_ENABLE_METRICS") diff --git a/cmd/runmqserver/mirror.go b/cmd/runmqserver/mirror.go index 19b44e7..9f22ca0 100644 --- a/cmd/runmqserver/mirror.go +++ b/cmd/runmqserver/mirror.go @@ -1,5 +1,5 @@ /* -© Copyright IBM Corporation 2018 +© Copyright IBM Corporation 2018, 2019 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -49,16 +49,17 @@ func waitForFile(ctx context.Context, path string) (os.FileInfo, error) { } } -type mirrorFunc func(msg string) +type mirrorFunc func(msg string, isQMLog bool) bool // mirrorAvailableMessages prints lines from the file, until no more are available -func mirrorAvailableMessages(f *os.File, mf mirrorFunc) { +func mirrorAvailableMessages(f *os.File, mf mirrorFunc, isQMLog bool) { scanner := bufio.NewScanner(f) count := 0 for scanner.Scan() { t := scanner.Text() - mf(t) - count++ + if mf(t, isQMLog) { + count++ + } } if count > 0 { log.Debugf("Mirrored %v log entries from %v", count, f.Name()) @@ -73,7 +74,7 @@ func mirrorAvailableMessages(f *os.File, mf mirrorFunc) { // mirrorLog tails the specified file, and logs each line to stdout. // This is useful for usability, as the container console log can show // messages from the MQ error logs. 
-func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart bool, mf mirrorFunc) (chan error, error) { +func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart bool, mf mirrorFunc, isQMLog bool) (chan error, error) { errorChannel := make(chan error, 1) var offset int64 = -1 var f *os.File @@ -147,7 +148,7 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b closing := false for { // If there's already data there, mirror it now. - mirrorAvailableMessages(f, mf) + mirrorAvailableMessages(f, mf, isQMLog) // Wait for the new log file (after rotation) newFI, err := waitForFile(ctx, path) if err != nil { @@ -161,7 +162,7 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b // log rotation happens before we can open the new file, then we // could skip all those messages. This could happen with a very small // MQ error log size. - mirrorAvailableMessages(f, mf) + mirrorAvailableMessages(f, mf, isQMLog) err = f.Close() if err != nil { log.Errorf("Unable to close mirror file handle: %v", err) @@ -176,7 +177,7 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b } fi = newFI // Don't seek this time, because we know it's a new file - mirrorAvailableMessages(f, mf) + mirrorAvailableMessages(f, mf, isQMLog) } select { case <-ctx.Done(): diff --git a/cmd/runmqserver/mirror_test.go b/cmd/runmqserver/mirror_test.go index 117a97c..3d07402 100644 --- a/cmd/runmqserver/mirror_test.go +++ b/cmd/runmqserver/mirror_test.go @@ -1,5 +1,5 @@ /* -© Copyright IBM Corporation 2018 +© Copyright IBM Corporation 2018, 2019 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -41,9 +41,10 @@ func TestMirrorLogWithoutRotation(t *testing.T) { count := 0 ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup - _, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string) { + _, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string, isQMLog bool) bool { count++ - }) + return true + }, false) if err != nil { t.Fatal(err) } @@ -82,9 +83,10 @@ func TestMirrorLogWithRotation(t *testing.T) { count := 0 ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup - _, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string) { + _, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string, isQMLog bool) bool { count++ - }) + return true + }, false) if err != nil { t.Fatal(err) } @@ -135,9 +137,10 @@ func testMirrorLogExistingFile(t *testing.T, newQM bool) int { count := 0 ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup - _, err = mirrorLog(ctx, &wg, tmp.Name(), newQM, func(msg string) { + _, err = mirrorLog(ctx, &wg, tmp.Name(), newQM, func(msg string, isQMLog bool) bool { count++ - }) + return true + }, false) if err != nil { t.Fatal(err) } @@ -179,8 +182,9 @@ func TestMirrorLogCancelWhileWaiting(t *testing.T) { cancel() wg.Wait() }() - _, err := mirrorLog(ctx, &wg, "fake.log", true, func(msg string) { - }) + _, err := mirrorLog(ctx, &wg, "fake.log", true, func(msg string, isQMLog bool) bool { + return true + }, false) if err != nil { t.Error(err) } diff --git a/cmd/runmqserver/qmgr.go b/cmd/runmqserver/qmgr.go index 60d8774..14b5a49 100644 --- a/cmd/runmqserver/qmgr.go +++ b/cmd/runmqserver/qmgr.go @@ -25,7 +25,9 @@ import ( "strings" "github.com/ibm-messaging/mq-container/internal/command" + containerruntime "github.com/ibm-messaging/mq-container/internal/containerruntime" "github.com/ibm-messaging/mq-container/internal/mqscredact" + "github.com/ibm-messaging/mq-container/internal/ready" ) // createDirStructure creates the default MQ directory structure under /var/mqm @@ -40,20 +42,48 @@ func createDirStructure() error { } // createQueueManager creates a queue manager, if it doesn't already exist. -// It returns true if one was created, or false if one already existed +// It returns true if one was created (or a standby was created), or false if one already existed func createQueueManager(name string) (bool, error) { log.Printf("Creating queue manager %v", name) - out, rc, err := command.Run("crtmqm", "-q", "-p", "1414", name) + + // Run 'dspmqinf' to check if 'mqs.ini' configuration file exists + // If command succeeds, the queue manager (or standby queue manager) has already been created + _, _, err := command.Run("dspmqinf", name) + if err == nil { + log.Printf("Detected existing queue manager %v", name) + return false, nil + } + + mounts, err := containerruntime.GetMounts() if err != nil { - // 8=Queue manager exists, which is fine - if rc == 8 { - log.Printf("Detected existing queue manager %v", name) - return false, nil - } - log.Printf("crtmqm returned %v", rc) - log.Println(string(out)) + log.Printf("Error getting mounts for queue manager") return false, err } + + // Check if 'qm.ini' configuration file exists for the queue manager + // TODO : handle possible race condition - use a file lock? 
+ dataDir := getQueueManagerDataDir(mounts, name) + _, err = os.Stat(filepath.Join(dataDir, "qm.ini")) + if err != nil { + // If 'qm.ini' is not found - run 'crtmqm' to create a new queue manager + args := getCreateQueueManagerArgs(mounts, name) + out, rc, err := command.Run("crtmqm", args...) + if err != nil { + log.Printf("Error %v creating queue manager: %v", rc, string(out)) + return false, err + } + } else { + // If 'qm.ini' is found - run 'addmqinf' to create a standby queue manager with existing configuration + args := getCreateStandbyQueueManagerArgs(name) + out, rc, err := command.Run("addmqinf", args...) + if err != nil { + log.Printf("Error %v creating standby queue manager: %v", rc, string(out)) + return false, err + } + log.Println("Created standby queue manager") + return true, nil + } + log.Println("Created queue manager") return true, nil } @@ -70,10 +100,15 @@ func updateCommandLevel() error { return nil } -func startQueueManager() error { +func startQueueManager(name string) error { log.Println("Starting queue manager") - out, rc, err := command.Run("strmqm") + out, rc, err := command.Run("strmqm", "-x", name) if err != nil { + // 30=standby queue manager started, which is fine + if rc == 30 { + log.Printf("Started standby queue manager") + return nil + } log.Printf("Error %v starting queue manager: %v", rc, string(out)) return err } @@ -136,12 +171,29 @@ func configureQueueManager() error { func stopQueueManager(name string) error { log.Println("Stopping queue manager") - out, _, err := command.Run("endmqm", "-w", "-r", name) + isStandby, err := ready.IsRunningAsStandbyQM(name) if err != nil { - log.Printf("Error stopping queue manager: %v", string(out)) + log.Printf("Error getting status for queue manager %v: %v", name, err.Error()) return err } - log.Println("Stopped queue manager") + args := []string{"-w", "-r", name} + if os.Getenv("MQ_MULTI_INSTANCE") == "true" { + if isStandby { + args = []string{"-x", name} + } else { + args = []string{"-s", "-w", "-r", name} + } + } + out, rc, err := command.Run("endmqm", args...)
+ if err != nil { + log.Printf("Error %v stopping queue manager: %v", rc, string(out)) + return err + } + if isStandby { + log.Printf("Stopped standby queue manager") + } else { + log.Println("Stopped queue manager") + } return nil } @@ -152,3 +204,44 @@ func formatMQSCOutput(out string) string { // add tab characters to make it more readable as part of the log return strings.Replace(string(out), "\n", "\n\t", -1) } + +func isStandbyQueueManager(name string) (bool, error) { + out, rc, err := command.Run("dspmq", "-n", "-m", name) + if err != nil { + log.Printf("Error %v getting status for queue manager %v: %v", rc, name, string(out)) + return false, err + } + if strings.Contains(string(out), "(RUNNING AS STANDBY)") { + return true, nil + } + return false, nil +} + +func getQueueManagerDataDir(mounts map[string]string, name string) string { + dataDir := filepath.Join("/var/mqm/qmgrs", name) + if _, ok := mounts["/mnt/mqm-data"]; ok { + dataDir = filepath.Join("/mnt/mqm-data/qmgrs", name) + } + return dataDir +} + +func getCreateQueueManagerArgs(mounts map[string]string, name string) []string { + args := []string{"-q", "-p", "1414"} + if _, ok := mounts["/mnt/mqm-log"]; ok { + args = append(args, "-ld", "/mnt/mqm-log/log") + } + if _, ok := mounts["/mnt/mqm-data"]; ok { + args = append(args, "-md", "/mnt/mqm-data/qmgrs") + } + args = append(args, name) + return args +} + +func getCreateStandbyQueueManagerArgs(name string) []string { + args := []string{"-s", "QueueManager"} + args = append(args, "-v", fmt.Sprintf("Name=%v", name)) + args = append(args, "-v", fmt.Sprintf("Directory=%v", name)) + args = append(args, "-v", "Prefix=/var/mqm") + args = append(args, "-v", fmt.Sprintf("DataPath=/mnt/mqm-data/qmgrs/%v", name)) + return args +} diff --git a/incubating/mqadvanced-server-dev/10-dev.mqsc.tpl b/incubating/mqadvanced-server-dev/10-dev.mqsc.tpl index 987e8e3..5796bbd 100644 --- a/incubating/mqadvanced-server-dev/10-dev.mqsc.tpl +++ b/incubating/mqadvanced-server-dev/10-dev.mqsc.tpl @@ -14,6 +14,7 @@ * limitations under the License. 
STOP LISTENER('SYSTEM.LISTENER.TCP.1') IGNSTATE(YES) +ALTER LISTENER('SYSTEM.LISTENER.TCP.1') TRPTYPE(TCP) CONTROL(MANUAL) * Developer queues DEFINE QLOCAL('DEV.QUEUE.1') REPLACE diff --git a/install-mq.sh b/install-mq.sh index c4b2926..533e7d0 100644 --- a/install-mq.sh +++ b/install-mq.sh @@ -156,6 +156,10 @@ rm -rf /var/mqm install --directory --mode 0775 --owner mqm --group root /mnt install --directory --mode 0775 --owner mqm --group root /mnt/mqm install --directory --mode 0775 --owner mqm --group root /mnt/mqm/data +install --directory --mode 0775 --owner mqm --group root /mnt/mqm-log +install --directory --mode 0775 --owner mqm --group root /mnt/mqm-log/log +install --directory --mode 0775 --owner mqm --group root /mnt/mqm-data +install --directory --mode 0775 --owner mqm --group root /mnt/mqm-data/qmgrs # Create the directory for MQ configuration files install --directory --mode 0775 --owner mqm --group root /etc/mqm diff --git a/internal/containerruntime/runtime_linux.go b/internal/containerruntime/runtime_linux.go index 94c99c3..1804449 100644 --- a/internal/containerruntime/runtime_linux.go +++ b/internal/containerruntime/runtime_linux.go @@ -123,3 +123,12 @@ func SupportedFilesystem(fsType string) bool { return true } } + +// ValidMultiInstanceFilesystem returns true if the supplied filesystem type is valid for a multi-instance queue manager +func ValidMultiInstanceFilesystem(fsType string) bool { + if !SupportedFilesystem(fsType) { + return false + } + // TODO : check for non-shared filesystems & shared filesystems which are known not to work + return true +} diff --git a/cmd/runmqserver/logruntime.go b/internal/containerruntimelogger/logruntime.go similarity index 74% rename from cmd/runmqserver/logruntime.go rename to internal/containerruntimelogger/logruntime.go index 2f552f0..03fafb2 100644 --- a/cmd/runmqserver/logruntime.go +++ b/internal/containerruntimelogger/logruntime.go @@ -13,18 +13,21 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
*/ -package main +package logruntime import ( "fmt" + "os" "runtime" "strings" containerruntime "github.com/ibm-messaging/mq-container/internal/containerruntime" + "github.com/ibm-messaging/mq-container/internal/logger" "github.com/ibm-messaging/mq-container/internal/user" ) -func logContainerDetails() error { +// LogContainerDetails logs details about the container runtime +func LogContainerDetails(log *logger.Logger) error { if runtime.GOOS != "linux" { return fmt.Errorf("Unsupported platform: %v", runtime.GOOS) } @@ -82,5 +85,19 @@ func logContainerDetails() error { } } } + // For a multi-instance queue manager - check all required mounts exist & validate filesystem type + if os.Getenv("MQ_MULTI_INSTANCE") == "true" { + log.Println("Multi-instance queue manager: enabled") + reqMounts := []string{"/mnt/mqm", "/mnt/mqm-log", "/mnt/mqm-data"} + for _, mountPoint := range reqMounts { + if fsType, ok := m[mountPoint]; ok { + if !containerruntime.ValidMultiInstanceFilesystem(fsType) { + return fmt.Errorf("%v uses filesystem type '%v' which is invalid for a multi-instance queue manager", mountPoint, fsType) + } + } else { + return fmt.Errorf("Missing required mount '%v' for a multi-instance queue manager", mountPoint) + } + } + } return nil } diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 7278c98..25a21c7 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -114,11 +114,6 @@ func (l *Logger) log(level string, msg string) { l.mutex.Unlock() } -// LogDirect logs a message directly to stdout -func (l *Logger) LogDirect(msg string) { - fmt.Println(msg) -} - // Debug logs a line as debug func (l *Logger) Debug(args ...interface{}) { if l.debug { diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 0947465..f14fb60 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -1,5 +1,5 @@ /* -© Copyright IBM Corporation 2018 +© Copyright IBM Corporation 2018, 2019 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import ( "time" "github.com/ibm-messaging/mq-container/internal/logger" + "github.com/ibm-messaging/mq-container/internal/ready" "github.com/prometheus/client_golang/prometheus" ) @@ -39,6 +40,15 @@ var ( // GatherMetrics gathers metrics for the queue manager func GatherMetrics(qmName string, log *logger.Logger) { + // If running in standby mode - wait until the queue manager becomes active + for { + active, _ := ready.IsRunningAsActiveQM(qmName) + if active { + break + } + time.Sleep(requestTimeout * time.Second) + } + metricsEnabled = true err := startMetricsGathering(qmName, log) diff --git a/internal/mqini/mqini.go b/internal/mqini/mqini.go index 4bfd846..21d4786 100644 --- a/internal/mqini/mqini.go +++ b/internal/mqini/mqini.go @@ -1,5 +1,5 @@ /* -© Copyright IBM Corporation 2018 +© Copyright IBM Corporation 2018, 2019 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -71,5 +71,8 @@ func GetQueueManager(name string) (*QueueManager, error) { // GetErrorLogDirectory returns the directory holding the error logs for the // specified queue manager func GetErrorLogDirectory(qm *QueueManager) string { + if qm.DataPath != "" { + return filepath.Join(qm.DataPath, "errors") + } return filepath.Join(qm.Prefix, "qmgrs", qm.Directory, "errors") } diff --git a/internal/ready/ready.go b/internal/ready/ready.go index b97bd27..d866878 100644 --- a/internal/ready/ready.go +++ b/internal/ready/ready.go @@ -1,5 +1,5 @@ /* -© Copyright IBM Corporation 2018 +© Copyright IBM Corporation 2018, 2019 Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -20,6 +20,9 @@ package ready import ( "io/ioutil" "os" + "strings" + + "github.com/ibm-messaging/mq-container/internal/command" ) const fileName string = "/run/runmqserver/ready" @@ -62,3 +65,24 @@ func Check() (bool, error) { } return exists, nil } + +// IsRunningAsActiveQM returns true if the queue manager is running in active mode +func IsRunningAsActiveQM(name string) (bool, error) { + return isRunningQM(name, "(RUNNING)") +} + +// IsRunningAsStandbyQM returns true if the queue manager is running in standby mode +func IsRunningAsStandbyQM(name string) (bool, error) { + return isRunningQM(name, "(RUNNING AS STANDBY)") +} + +func isRunningQM(name string, status string) (bool, error) { + out, _, err := command.Run("dspmq", "-n", "-m", name) + if err != nil { + return false, err + } + if strings.Contains(string(out), status) { + return true, nil + } + return false, nil +} diff --git a/test/docker/docker_api_test.go b/test/docker/docker_api_test.go index e83a2ca..e06b91c 100644 --- a/test/docker/docker_api_test.go +++ b/test/docker/docker_api_test.go @@ -235,7 +235,7 @@ func withVolume(t *testing.T, metric bool) { if err != nil { t.Fatal(err) } - vol := createVolume(t, cli) + vol := createVolume(t, cli, t.Name()) defer removeVolume(t, cli, vol.Name) containerConfig := container.Config{ Image: imageName(), @@ -273,6 +273,62 @@ func withVolume(t *testing.T, metric bool) { waitForReady(t, cli, ctr2.ID) } +// TestWithSplitVolumesLogsData starts a queue manager with separate log/data mounts +func TestWithSplitVolumesLogsData(t *testing.T) { + cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + + qmsharedlogs := createVolume(t, cli, "qmsharedlogs") + defer removeVolume(t, cli, qmsharedlogs.Name) + qmshareddata := createVolume(t, cli, "qmshareddata") + defer removeVolume(t, cli, qmshareddata.Name) + + err, qmID, qmVol := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, qmshareddata.Name, []string{"LICENSE=accept", "MQ_QMGR_NAME=qm1"}) + + defer removeVolume(t, cli, qmVol) + defer cleanContainer(t, cli, qmID) + + waitForReady(t, cli, qmID) +} + +// TestWithSplitVolumesLogsOnly starts a queue manager with a separate log mount +func TestWithSplitVolumesLogsOnly(t *testing.T) { + cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + + qmsharedlogs := createVolume(t, cli, "qmsharedlogs") + defer removeVolume(t, cli, qmsharedlogs.Name) + + err, qmID, qmVol := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, "", []string{"LICENSE=accept", "MQ_QMGR_NAME=qm1"}) + + defer removeVolume(t, cli, qmVol) + defer cleanContainer(t, cli, qmID) + + waitForReady(t, cli, qmID) +} + +// TestWithSplitVolumesDataOnly starts a queue manager with a separate data mount +func TestWithSplitVolumesDataOnly(t *testing.T) { 
+ cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + + qmshareddata := createVolume(t, cli, "qmshareddata") + defer removeVolume(t, cli, qmshareddata.Name) + + err, qmID, qmVol := startMultiVolumeQueueManager(t, cli, true, "", qmshareddata.Name, []string{"LICENSE=accept", "MQ_QMGR_NAME=qm1"}) + + defer removeVolume(t, cli, qmVol) + defer cleanContainer(t, cli, qmID) + + waitForReady(t, cli, qmID) +} + // TestNoVolumeWithRestart ensures a queue manager container can be stopped // and restarted cleanly func TestNoVolumeWithRestart(t *testing.T) { @@ -302,7 +358,7 @@ func TestVolumeRequiresRoot(t *testing.T) { if err != nil { t.Fatal(err) } - vol := createVolume(t, cli) + vol := createVolume(t, cli, t.Name()) defer removeVolume(t, cli, vol.Name) // Set permissions on the volume to only allow root to write it @@ -438,7 +494,7 @@ func TestVolumeUnmount(t *testing.T) { if err != nil { t.Fatal(err) } - vol := createVolume(t, cli) + vol := createVolume(t, cli, t.Name()) defer removeVolume(t, cli, vol.Name) containerConfig := container.Config{ Image: imageName(), diff --git a/test/docker/docker_api_test_util.go b/test/docker/docker_api_test_util.go index b94545f..30941df 100644 --- a/test/docker/docker_api_test_util.go +++ b/test/docker/docker_api_test_util.go @@ -398,6 +398,14 @@ func stopContainer(t *testing.T, cli *client.Client, ID string) { } } +func killContainer(t *testing.T, cli *client.Client, ID string, signal string) { + t.Logf("Killing container: %v", ID) + err := cli.ContainerKill(context.Background(), ID, signal) + if err != nil { + t.Fatal(err) + } +} + func getExitCodeFilename(t *testing.T) string { return t.Name() + "ExitCode" } @@ -522,6 +530,9 @@ func waitForReady(t *testing.T, cli *client.Client, ID string) { if rc == 0 { t.Log("MQ is ready") return + } else if rc == 10 { + t.Log("MQ Readiness: Queue Manager Running as Standby") + return } case <-ctx.Done(): t.Fatal("Timed out waiting for container to become ready") @@ -557,17 +568,17 @@ func removeNetwork(t *testing.T, cli *client.Client, ID string) { } } -func createVolume(t *testing.T, cli *client.Client) types.Volume { +func createVolume(t *testing.T, cli *client.Client, name string) types.Volume { v, err := cli.VolumeCreate(context.Background(), volume.VolumesCreateBody{ Driver: "local", DriverOpts: map[string]string{}, Labels: map[string]string{}, - Name: t.Name(), + Name: name, }) if err != nil { t.Fatal(err) } - t.Logf("Created volume %v", t.Name()) + t.Logf("Created volume %v", v.Name) return v } diff --git a/test/docker/mq_multi_instance_test.go b/test/docker/mq_multi_instance_test.go new file mode 100644 index 0000000..c54e28f --- /dev/null +++ b/test/docker/mq_multi_instance_test.go @@ -0,0 +1,234 @@ +/* +© Copyright IBM Corporation 2019 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package main + +import ( + "strings" + "testing" + "time" + + "github.com/docker/docker/client" +) + +var miEnv = []string{ + "LICENSE=accept", + "MQ_QMGR_NAME=QM1", + "MQ_MULTI_INSTANCE=true", +} + +// TestMultiInstanceStartStop creates 2 containers in a multi instance queue manager configuration +// and starts/stop them checking we always have an active and standby +func TestMultiInstanceStartStop(t *testing.T) { + cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + err, qm1aId, qm1bId, volumes := configureMultiInstance(t, cli) + if err != nil { + t.Fatal(err) + } + for _, volume := range volumes { + defer removeVolume(t, cli, volume) + } + defer cleanContainer(t, cli, qm1aId) + defer cleanContainer(t, cli, qm1bId) + + waitForReady(t, cli, qm1aId) + waitForReady(t, cli, qm1bId) + + err, active, standby := getActiveStandbyQueueManager(t, cli, qm1aId, qm1bId) + if err != nil { + t.Fatal(err) + } + + killContainer(t, cli, active, "SIGTERM") + time.Sleep(2 * time.Second) + + if status := getQueueManagerStatus(t, cli, standby, "QM1"); strings.Compare(status, "Running") != 0 { + t.Fatalf("Expected QM1 to be running as active queue manager, dspmq returned status of %v", status) + } + + startContainer(t, cli, qm1aId) + waitForReady(t, cli, qm1aId) + + err, _, _ = getActiveStandbyQueueManager(t, cli, qm1aId, qm1bId) + if err != nil { + t.Fatal(err) + } + +} + +// TestMultiInstanceContainerStop starts 2 containers in a multi instance queue manager configuration, +// stops the active queue manager, then checks to ensure the backup queue manager becomes active +func TestMultiInstanceContainerStop(t *testing.T) { + cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + err, qm1aId, qm1bId, volumes := configureMultiInstance(t, cli) + if err != nil { + t.Fatal(err) + } + for _, volume := range volumes { + defer removeVolume(t, cli, volume) + } + defer cleanContainer(t, cli, qm1aId) + defer cleanContainer(t, cli, qm1bId) + + waitForReady(t, cli, qm1aId) + waitForReady(t, cli, qm1bId) + + err, active, standby := getActiveStandbyQueueManager(t, cli, qm1aId, qm1bId) + if err != nil { + t.Fatal(err) + } + + stopContainer(t, cli, active) + + if status := getQueueManagerStatus(t, cli, standby, "QM1"); strings.Compare(status, "Running") != 0 { + t.Fatalf("Expected QM1 to be running as active queue manager, dspmq returned status of %v", status) + } +} + +// TestMultiInstanceRace starts 2 containers in separate goroutines in a multi instance queue manager +// configuration, then checks to ensure that both an active and standby queue manager have been started +func TestMultiInstanceRace(t *testing.T) { + t.Skipf("Skipping %v until file lock is implemented", t.Name()) + + cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + + qmsharedlogs := createVolume(t, cli, "qmsharedlogs") + defer removeVolume(t, cli, qmsharedlogs.Name) + qmshareddata := createVolume(t, cli, "qmshareddata") + defer removeVolume(t, cli, qmshareddata.Name) + + qmsChannel := make(chan QMChan) + + go singleMultiInstanceQueueManager(t, cli, qmsharedlogs.Name, qmshareddata.Name, qmsChannel) + go singleMultiInstanceQueueManager(t, cli, qmsharedlogs.Name, qmshareddata.Name, qmsChannel) + + qm1a := <-qmsChannel + if qm1a.Error != nil { + t.Fatal(qm1a.Error) + } + + qm1b := <-qmsChannel + if qm1b.Error != nil { + t.Fatal(qm1b.Error) + } + + qm1aId, qm1aData := qm1a.QMId, qm1a.QMData + qm1bId, qm1bData := qm1b.QMId, qm1b.QMData + + defer removeVolume(t, cli, qm1aData) + defer 
removeVolume(t, cli, qm1bData) + defer cleanContainer(t, cli, qm1aId) + defer cleanContainer(t, cli, qm1bId) + + waitForReady(t, cli, qm1aId) + waitForReady(t, cli, qm1bId) + + err, _, _ = getActiveStandbyQueueManager(t, cli, qm1aId, qm1bId) + if err != nil { + t.Fatal(err) + } +} + +// TestMultiInstanceNoSharedMounts starts a multi-instance queue manager container without providing shared log/data +// mounts, then checks to ensure that the container terminates with the expected message +func TestMultiInstanceNoSharedMounts(t *testing.T) { + t.Parallel() + cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + + err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, true, "", "", miEnv) + if err != nil { + t.Fatal(err) + } + + defer removeVolume(t, cli, qm1aData) + defer cleanContainer(t, cli, qm1aId) + + waitForTerminationMessage(t, cli, qm1aId, "Missing required mount '/mnt/mqm-log'", 30*time.Second) +} + +// TestMultiInstanceNoSharedLogs starts a multi-instance queue manager container without providing a shared log +// mount, then checks to ensure that the container terminates with the expected message +func TestMultiInstanceNoSharedLogs(t *testing.T) { + cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + + qmshareddata := createVolume(t, cli, "qmshareddata") + defer removeVolume(t, cli, qmshareddata.Name) + + err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, true, "", qmshareddata.Name, miEnv) + if err != nil { + t.Fatal(err) + } + + defer removeVolume(t, cli, qm1aData) + defer cleanContainer(t, cli, qm1aId) + + waitForTerminationMessage(t, cli, qm1aId, "Missing required mount '/mnt/mqm-log'", 30*time.Second) +} + +// TestMultiInstanceNoSharedData starts a multi-instance queue manager container without providing a shared data +// mount, then checks to ensure that the container terminates with the expected message +func TestMultiInstanceNoSharedData(t *testing.T) { + cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + + qmsharedlogs := createVolume(t, cli, "qmsharedlogs") + defer removeVolume(t, cli, qmsharedlogs.Name) + + err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, "", miEnv) + if err != nil { + t.Fatal(err) + } + + defer removeVolume(t, cli, qm1aData) + defer cleanContainer(t, cli, qm1aId) + + waitForTerminationMessage(t, cli, qm1aId, "Missing required mount '/mnt/mqm-data'", 30*time.Second) +} + +// TestMultiInstanceNoMounts starts a multi-instance queue manager container without providing any volume +// mounts, then checks to ensure that the container terminates with the expected message +func TestMultiInstanceNoMounts(t *testing.T) { + cli, err := client.NewEnvClient() + if err != nil { + t.Fatal(err) + } + + err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, false, "", "", miEnv) + if err != nil { + t.Fatal(err) + } + + defer removeVolume(t, cli, qm1aData) + defer cleanContainer(t, cli, qm1aId) + + waitForTerminationMessage(t, cli, qm1aId, "Missing required mount '/mnt/mqm'", 30*time.Second) +} diff --git a/test/docker/mq_multi_instance_test_util.go b/test/docker/mq_multi_instance_test_util.go new file mode 100644 index 0000000..c8864e0 --- /dev/null +++ b/test/docker/mq_multi_instance_test_util.go @@ -0,0 +1,178 @@ +/* +© Copyright IBM Corporation 2019 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package main + +import ( + "context" + "fmt" + "regexp" + "strconv" + "strings" + "testing" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/client" +) + +type QMChan struct { + QMId string + QMData string + Error error +} + +// configureMultiInstance creates the volumes and containers required for basic testing +// of multi instance queue managers. Returns error, qm1a ID, qm1b ID, slice of volume names +func configureMultiInstance(t *testing.T, cli *client.Client) (error, string, string, []string) { + + qmsharedlogs := createVolume(t, cli, "qmsharedlogs") + qmshareddata := createVolume(t, cli, "qmshareddata") + + err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, qmshareddata.Name, miEnv) + if err != nil { + return err, "", "", []string{} + } + time.Sleep(10 * time.Second) + err, qm1bId, qm1bData := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, qmshareddata.Name, miEnv) + if err != nil { + return err, "", "", []string{} + } + + volumes := []string{qmsharedlogs.Name, qmshareddata.Name, qm1aData, qm1bData} + + return nil, qm1aId, qm1bId, volumes +} + +func singleMultiInstanceQueueManager(t *testing.T, cli *client.Client, qmsharedlogs string, qmshareddata string, qmsChannel chan QMChan) { + err, qmId, qmData := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs, qmshareddata, miEnv) + if err != nil { + qmsChannel <- QMChan{Error: err} + } + qmsChannel <- QMChan{QMId: qmId, QMData: qmData} +} + +func getHostConfig(t *testing.T, mounts int, qmsharedlogs string, qmshareddata string, qmdata string) container.HostConfig { + + var hostConfig container.HostConfig + + switch mounts { + case 1: + hostConfig = container.HostConfig{ + Binds: []string{ + coverageBind(t), + qmdata + ":/mnt/mqm", + }, + } + case 2: + hostConfig = container.HostConfig{ + Binds: []string{ + coverageBind(t), + qmdata + ":/mnt/mqm", + qmshareddata + ":/mnt/mqm-data", + }, + } + case 3: + hostConfig = container.HostConfig{ + Binds: []string{ + coverageBind(t), + qmdata + ":/mnt/mqm", + qmsharedlogs + ":/mnt/mqm-log", + }, + } + case 4: + hostConfig = container.HostConfig{ + Binds: []string{ + coverageBind(t), + qmdata + ":/mnt/mqm", + qmsharedlogs + ":/mnt/mqm-log", + qmshareddata + ":/mnt/mqm-data", + }, + } + } + + return hostConfig +} + +func startMultiVolumeQueueManager(t *testing.T, cli *client.Client, dataVol bool, qmsharedlogs string, qmshareddata string, env []string) (error, string, string) { + id := strconv.FormatInt(time.Now().UnixNano(), 10) + qmdata := createVolume(t, cli, id) + containerConfig := container.Config{ + Image: imageName(), + Env: env, + } + var hostConfig container.HostConfig + + if !dataVol { + hostConfig = container.HostConfig{} + } else if qmsharedlogs == "" && qmshareddata == "" { + hostConfig = getHostConfig(t, 1, "", "", qmdata.Name) + } else if qmsharedlogs == "" { + hostConfig = getHostConfig(t, 2, "", qmshareddata, qmdata.Name) + } else if qmshareddata == "" { + hostConfig = getHostConfig(t, 3, qmsharedlogs, "", qmdata.Name) + } else { + hostConfig = 
getHostConfig(t, 4, qmsharedlogs, qmshareddata, qmdata.Name) + } + networkingConfig := network.NetworkingConfig{} + qm, err := cli.ContainerCreate(context.Background(), &containerConfig, &hostConfig, &networkingConfig, t.Name()+id) + if err != nil { + return err, "", "" + } + startContainer(t, cli, qm.ID) + + return nil, qm.ID, qmdata.Name +} + +func getActiveStandbyQueueManager(t *testing.T, cli *client.Client, qm1aId string, qm1bId string) (error, string, string) { + qm1aStatus := getQueueManagerStatus(t, cli, qm1aId, "QM1") + qm1bStatus := getQueueManagerStatus(t, cli, qm1bId, "QM1") + + if qm1aStatus == "Running" && qm1bStatus == "Running as standby" { + return nil, qm1aId, qm1bId + } else if qm1bStatus == "Running" && qm1aStatus == "Running as standby" { + return nil, qm1bId, qm1aId + } + err := fmt.Errorf("Expected to be running in multi instance configuration, got status 1) %v status 2) %v", qm1aStatus, qm1bStatus) + return err, "", "" +} + +func getQueueManagerStatus(t *testing.T, cli *client.Client, containerID string, queueManagerName string) string { + _, dspmqOut := execContainer(t, cli, containerID, "mqm", []string{"bash", "-c", "dspmq -m " + queueManagerName}) + regex := regexp.MustCompile(`STATUS\(.*\)`) + status := regex.FindString(dspmqOut) + status = strings.TrimSuffix(strings.TrimPrefix(status, "STATUS("), ")") + return status +} + +func waitForTerminationMessage(t *testing.T, cli *client.Client, qmId string, terminationString string, timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for { + select { + case <-time.After(1 * time.Second): + m := terminationMessage(t, cli, qmId) + if m != "" { + if !strings.Contains(m, terminationString) { + t.Fatalf("Expected termination message to contain '%v', got: %v", terminationString, m) + } + return + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for container to terminate") + } + } +}
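Note on the pattern used throughout this patch: the readiness check, stopQueueManager and the metrics gatherer all decide between active and standby by parsing the output of 'dspmq -n -m <name>' for the '(RUNNING)' or '(RUNNING AS STANDBY)' markers. The sketch below is a minimal standalone illustration of that check; it assumes dspmq is on the PATH and uses os/exec directly rather than the repository's internal command package, so treat it as illustrative rather than as the shipped implementation.

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// isRunningQM runs 'dspmq -n -m <name>' and reports whether its output
// contains the given status marker, mirroring internal/ready/ready.go.
func isRunningQM(name, status string) (bool, error) {
	out, err := exec.Command("dspmq", "-n", "-m", name).CombinedOutput()
	if err != nil {
		return false, err
	}
	return strings.Contains(string(out), status), nil
}

func main() {
	// "QM1" matches the queue manager name used by the multi-instance tests.
	// Errors are ignored here for brevity in this sketch.
	active, _ := isRunningQM("QM1", "(RUNNING)")
	standby, _ := isRunningQM("QM1", "(RUNNING AS STANDBY)")
	fmt.Printf("active=%v standby=%v\n", active, standby)
}

chkmqready builds on the same check: when the queue manager is running as a standby it exits with code 10, which the test helper waitForReady treats as ready.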