add multi-instance Queue Managers (#307)

* Initial code to implement multi-instance queue manager

* alter default mqsc to prevent race between listeners on standby startup

* Updates to multi-instance queue manager code

* initial multi instance test

* Multi-instance code improvements

* Multi instance fixes and first test

* configure queue manager

* Add mirror log filtering for mult-instance QMs

* Add log message for multi-instance enabled

* Improvements to container runtime logging

* refactor test

* Test active standby switch

* Improve createQueueManager function

* Test multi instance race

* wait

* multi instance mount tests

* skip race test

* mount tests

* no mount test

* single instance split mount tests

* readiness check

* More updates for handling standby queue manager

* Improve standby checks

* Minor fixes to miqm

* Fix logging of JSON errors

* Update copyrights

* Fix log includes
This commit is contained in:
Stephen Marshall
2019-05-01 14:42:25 +01:00
committed by Arthur Barr
parent 63af43f19d
commit 6c72c894f7
22 changed files with 773 additions and 144 deletions

View File

@@ -1,5 +1,5 @@
/*
© Copyright IBM Corporation 2017
© Copyright IBM Corporation 2017, 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ func queueManagerHealthy() (bool, error) {
return false, err
}
fmt.Printf("%s", out)
if !strings.Contains(string(out), "(RUNNING)") {
if !strings.Contains(string(out), "(RUNNING)") && !strings.Contains(string(out), "(RUNNING AS STANDBY)") {
return false, nil
}
return true, nil

View File

@@ -1,5 +1,5 @@
/*
© Copyright IBM Corporation 2017, 2018
© Copyright IBM Corporation 2017, 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ import (
"net"
"os"
"github.com/ibm-messaging/mq-container/internal/name"
"github.com/ibm-messaging/mq-container/internal/ready"
)
@@ -31,14 +32,25 @@ func main() {
if !r || err != nil {
os.Exit(1)
}
// Check if the queue manager has a running listener
conn, err := net.Dial("tcp", "127.0.0.1:1414")
name, err := name.GetQueueManagerName()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
err = conn.Close()
if err != nil {
fmt.Println(err)
// Check if the queue manager has a running listener
if standby, _ := ready.IsRunningAsStandbyQM(name); !standby {
conn, err := net.Dial("tcp", "127.0.0.1:1414")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
err = conn.Close()
if err != nil {
fmt.Println(err)
}
} else {
fmt.Printf("Detected queue manager running in standby mode")
os.Exit(10)
}
}

View File

@@ -1,68 +0,0 @@
/*
© Copyright IBM Corporation 2017, 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"runtime"
"strings"
containerruntime "github.com/ibm-messaging/mq-container/internal/containerruntime"
"github.com/ibm-messaging/mq-container/internal/user"
)
func logContainerDetails() {
log.Printf("CPU architecture: %v", runtime.GOARCH)
kv, err := containerruntime.GetKernelVersion()
if err == nil {
log.Printf("Linux kernel version: %v", kv)
}
cr, err := containerruntime.GetContainerRuntime()
if err == nil {
log.Printf("Container runtime: %v", cr)
}
bi, err := containerruntime.GetBaseImage()
if err == nil {
log.Printf("Base image: %v", bi)
}
u, err := user.GetUser()
if err == nil {
if len(u.SupplementalGID) == 0 {
log.Printf("Running as user ID %v (%v) with primary group %v", u.UID, u.Name, u.PrimaryGID)
} else {
log.Printf("Running as user ID %v (%v) with primary group %v, and supplementary groups %v", u.UID, u.Name, u.PrimaryGID, strings.Join(u.SupplementalGID, ","))
}
}
caps, err := containerruntime.GetCapabilities()
capLogged := false
if err == nil {
for k, v := range caps {
if len(v) > 0 {
log.Printf("Capabilities (%s set): %v", strings.ToLower(k), strings.Join(v, ","))
capLogged = true
}
}
if !capLogged {
log.Print("Capabilities: none")
}
} else {
log.Errorf("Error getting capabilities: %v", err)
}
sc, err := containerruntime.GetSeccomp()
if err == nil {
log.Printf("seccomp enforcing mode: %v", sc)
}
log.Printf("Process security attributes: %v", containerruntime.GetSecurityAttributes())
}

View File

@@ -23,6 +23,7 @@ import (
"syscall"
"github.com/ibm-messaging/mq-container/internal/command"
containerruntimelogger "github.com/ibm-messaging/mq-container/internal/containerruntimelogger"
"github.com/ibm-messaging/mq-container/internal/logger"
"github.com/ibm-messaging/mq-container/internal/name"
)
@@ -117,7 +118,11 @@ func doMain() error {
return err
}
logContainerDetails()
err = containerruntimelogger.LogContainerDetails(log)
if err != nil {
logTermination(err)
return err
}
adminPassword, set := os.LookupEnv("MQ_ADMIN_PASSWORD")
if set {

View File

@@ -1,5 +1,5 @@
/*
© Copyright IBM Corporation 2017, 2018
© Copyright IBM Corporation 2017, 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -17,15 +17,13 @@ package main
import (
"os"
"path/filepath"
"runtime"
"syscall"
"github.com/ibm-messaging/mq-container/internal/command"
)
func createVolume(path string) error {
dataPath := filepath.Join(path, "data")
func createVolume(dataPath string) error {
fi, err := os.Stat(dataPath)
if err != nil {
if os.IsNotExist(err) {

View File

@@ -23,6 +23,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/ibm-messaging/mq-container/internal/command"
@@ -66,7 +67,7 @@ func formatSimple(datetime string, message string) string {
// mirrorSystemErrorLogs starts a goroutine to mirror the contents of the MQ system error logs
func mirrorSystemErrorLogs(ctx context.Context, wg *sync.WaitGroup, mf mirrorFunc) (chan error, error) {
// Always use the JSON log as the source
return mirrorLog(ctx, wg, "/var/mqm/errors/AMQERR01.json", false, mf)
return mirrorLog(ctx, wg, "/var/mqm/errors/AMQERR01.json", false, mf, false)
}
// mirrorQueueManagerErrorLogs starts a goroutine to mirror the contents of the MQ queue manager error logs
@@ -78,7 +79,7 @@ func mirrorQueueManagerErrorLogs(ctx context.Context, wg *sync.WaitGroup, name s
return nil, err
}
f := filepath.Join(mqini.GetErrorLogDirectory(qm), "AMQERR01.json")
return mirrorLog(ctx, wg, f, fromStart, mf)
return mirrorLog(ctx, wg, f, fromStart, mf, true)
}
func getDebug() bool {
@@ -99,21 +100,35 @@ func configureLogger(name string) (mirrorFunc, error) {
if err != nil {
return nil, err
}
return log.LogDirect, nil
return func(msg string, isQMLog bool) bool {
obj, err := processLogMessage(msg)
if err == nil && isQMLog && filterQMLogMessage(obj) {
return false
}
if err != nil {
log.Printf("Failed to unmarshall JSON - %v", msg)
} else {
fmt.Println(msg)
}
return true
}, nil
case "basic":
log, err = logger.NewLogger(os.Stderr, d, false, name)
if err != nil {
return nil, err
}
return func(msg string) {
return func(msg string, isQMLog bool) bool {
// Parse the JSON message, and print a simplified version
var obj map[string]interface{}
err := json.Unmarshal([]byte(msg), &obj)
obj, err := processLogMessage(msg)
if err == nil && isQMLog && filterQMLogMessage(obj) {
return false
}
if err != nil {
fmt.Printf("Failed to Unmarshall JSON - %v", err)
log.Printf("Failed to unmarshall JSON - %v", err)
} else {
fmt.Printf(formatSimple(obj["ibm_datetime"].(string), obj["message"].(string)))
}
return true
}, nil
default:
log, err = logger.NewLogger(os.Stdout, d, false, name)
@@ -124,6 +139,20 @@ func configureLogger(name string) (mirrorFunc, error) {
}
}
func processLogMessage(msg string) (map[string]interface{}, error) {
var obj map[string]interface{}
err := json.Unmarshal([]byte(msg), &obj)
return obj, err
}
func filterQMLogMessage(obj map[string]interface{}) bool {
hostname, err := os.Hostname()
if os.Getenv("MQ_MULTI_INSTANCE") == "true" && err == nil && !strings.Contains(obj["host"].(string), hostname) {
return true
}
return false
}
func logDiagnostics() {
log.Debug("--- Start Diagnostics ---")

View File

@@ -24,6 +24,7 @@ import (
"os"
"sync"
containerruntimelogger "github.com/ibm-messaging/mq-container/internal/containerruntimelogger"
"github.com/ibm-messaging/mq-container/internal/metrics"
"github.com/ibm-messaging/mq-container/internal/name"
"github.com/ibm-messaging/mq-container/internal/ready"
@@ -45,7 +46,7 @@ func doMain() error {
// Check whether they only want debug info
if *infoFlag {
logVersionInfo()
err = logContainerDetails()
err = containerruntimelogger.LogContainerDetails(log)
if err != nil {
log.Printf("Error displaying container details: %v", err)
}
@@ -86,14 +87,24 @@ func doMain() error {
collectDiagOnFail = true
if *devFlag == false {
err = logContainerDetails()
err = containerruntimelogger.LogContainerDetails(log)
if err != nil {
logTermination(err)
return err
}
}
err = createVolume("/mnt/mqm")
err = createVolume("/mnt/mqm/data")
if err != nil {
logTermination(err)
return err
}
err = createVolume("/mnt/mqm-log/log")
if err != nil {
logTermination(err)
return err
}
err = createVolume("/mnt/mqm-data/qmgrs")
if err != nil {
logTermination(err)
return err
@@ -149,15 +160,17 @@ func doMain() error {
logTermination(err)
return err
}
err = startQueueManager()
err = startQueueManager(name)
if err != nil {
logTermination(err)
return err
}
err = configureQueueManager()
if err != nil {
logTermination(err)
return err
if standby, _ := ready.IsRunningAsStandbyQM(name); !standby {
err = configureQueueManager()
if err != nil {
logTermination(err)
return err
}
}
enableMetrics := os.Getenv("MQ_ENABLE_METRICS")

View File

@@ -1,5 +1,5 @@
/*
© Copyright IBM Corporation 2018
© Copyright IBM Corporation 2018, 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -49,16 +49,17 @@ func waitForFile(ctx context.Context, path string) (os.FileInfo, error) {
}
}
type mirrorFunc func(msg string)
type mirrorFunc func(msg string, isQMLog bool) bool
// mirrorAvailableMessages prints lines from the file, until no more are available
func mirrorAvailableMessages(f *os.File, mf mirrorFunc) {
func mirrorAvailableMessages(f *os.File, mf mirrorFunc, isQMLog bool) {
scanner := bufio.NewScanner(f)
count := 0
for scanner.Scan() {
t := scanner.Text()
mf(t)
count++
if mf(t, isQMLog) {
count++
}
}
if count > 0 {
log.Debugf("Mirrored %v log entries from %v", count, f.Name())
@@ -73,7 +74,7 @@ func mirrorAvailableMessages(f *os.File, mf mirrorFunc) {
// mirrorLog tails the specified file, and logs each line to stdout.
// This is useful for usability, as the container console log can show
// messages from the MQ error logs.
func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart bool, mf mirrorFunc) (chan error, error) {
func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart bool, mf mirrorFunc, isQMLog bool) (chan error, error) {
errorChannel := make(chan error, 1)
var offset int64 = -1
var f *os.File
@@ -147,7 +148,7 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b
closing := false
for {
// If there's already data there, mirror it now.
mirrorAvailableMessages(f, mf)
mirrorAvailableMessages(f, mf, isQMLog)
// Wait for the new log file (after rotation)
newFI, err := waitForFile(ctx, path)
if err != nil {
@@ -161,7 +162,7 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b
// log rotation happens before we can open the new file, then we
// could skip all those messages. This could happen with a very small
// MQ error log size.
mirrorAvailableMessages(f, mf)
mirrorAvailableMessages(f, mf, isQMLog)
err = f.Close()
if err != nil {
log.Errorf("Unable to close mirror file handle: %v", err)
@@ -176,7 +177,7 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b
}
fi = newFI
// Don't seek this time, because we know it's a new file
mirrorAvailableMessages(f, mf)
mirrorAvailableMessages(f, mf, isQMLog)
}
select {
case <-ctx.Done():

View File

@@ -1,5 +1,5 @@
/*
© Copyright IBM Corporation 2018
© Copyright IBM Corporation 2018, 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -41,9 +41,10 @@ func TestMirrorLogWithoutRotation(t *testing.T) {
count := 0
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
_, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string) {
_, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string, isQMLog bool) bool {
count++
})
return true
}, false)
if err != nil {
t.Fatal(err)
}
@@ -82,9 +83,10 @@ func TestMirrorLogWithRotation(t *testing.T) {
count := 0
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
_, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string) {
_, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string, isQMLog bool) bool {
count++
})
return true
}, false)
if err != nil {
t.Fatal(err)
}
@@ -135,9 +137,10 @@ func testMirrorLogExistingFile(t *testing.T, newQM bool) int {
count := 0
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
_, err = mirrorLog(ctx, &wg, tmp.Name(), newQM, func(msg string) {
_, err = mirrorLog(ctx, &wg, tmp.Name(), newQM, func(msg string, isQMLog bool) bool {
count++
})
return true
}, false)
if err != nil {
t.Fatal(err)
}
@@ -179,8 +182,9 @@ func TestMirrorLogCancelWhileWaiting(t *testing.T) {
cancel()
wg.Wait()
}()
_, err := mirrorLog(ctx, &wg, "fake.log", true, func(msg string) {
})
_, err := mirrorLog(ctx, &wg, "fake.log", true, func(msg string, isQMLog bool) bool {
return true
}, false)
if err != nil {
t.Error(err)
}

View File

@@ -25,7 +25,9 @@ import (
"strings"
"github.com/ibm-messaging/mq-container/internal/command"
containerruntime "github.com/ibm-messaging/mq-container/internal/containerruntime"
"github.com/ibm-messaging/mq-container/internal/mqscredact"
"github.com/ibm-messaging/mq-container/internal/ready"
)
// createDirStructure creates the default MQ directory structure under /var/mqm
@@ -40,20 +42,48 @@ func createDirStructure() error {
}
// createQueueManager creates a queue manager, if it doesn't already exist.
// It returns true if one was created, or false if one already existed
// It returns true if one was created (or a standby was created), or false if one already existed
func createQueueManager(name string) (bool, error) {
log.Printf("Creating queue manager %v", name)
out, rc, err := command.Run("crtmqm", "-q", "-p", "1414", name)
// Run 'dspmqinf' to check if 'mqs.ini' configuration file exists
// If command succeeds, the queue manager (or standby queue manager) has already been created
_, _, err := command.Run("dspmqinf", name)
if err == nil {
log.Printf("Detected existing queue manager %v", name)
return false, nil
}
mounts, err := containerruntime.GetMounts()
if err != nil {
// 8=Queue manager exists, which is fine
if rc == 8 {
log.Printf("Detected existing queue manager %v", name)
return false, nil
}
log.Printf("crtmqm returned %v", rc)
log.Println(string(out))
log.Printf("Error getting mounts for queue manager")
return false, err
}
// Check if 'qm.ini' configuration file exists for the queue manager
// TODO : handle possible race condition - use a file lock?
dataDir := getQueueManagerDataDir(mounts, name)
_, err = os.Stat(filepath.Join(dataDir, "qm.ini"))
if err != nil {
// If 'qm.ini' is not found - run 'crtmqm' to create a new queue manager
args := getCreateQueueManagerArgs(mounts, name)
out, rc, err := command.Run("crtmqm", args...)
if err != nil {
log.Printf("Error %v creating queue manager: %v", rc, string(out))
return false, err
}
} else {
// If 'qm.ini' is found - run 'addmqinf' to create a standby queue manager with existing configuration
args := getCreateStandbyQueueManagerArgs(name)
out, rc, err := command.Run("addmqinf", args...)
if err != nil {
log.Printf("Error %v creating standby queue manager: %v", rc, string(out))
return false, err
}
log.Println("Created standby queue manager")
return true, nil
}
log.Println("Created queue manager")
return true, nil
}
@@ -70,10 +100,15 @@ func updateCommandLevel() error {
return nil
}
func startQueueManager() error {
func startQueueManager(name string) error {
log.Println("Starting queue manager")
out, rc, err := command.Run("strmqm")
out, rc, err := command.Run("strmqm", "-x", name)
if err != nil {
// 30=standby queue manager started, which is fine
if rc == 30 {
log.Printf("Started standby queue manager")
return nil
}
log.Printf("Error %v starting queue manager: %v", rc, string(out))
return err
}
@@ -136,12 +171,29 @@ func configureQueueManager() error {
func stopQueueManager(name string) error {
log.Println("Stopping queue manager")
out, _, err := command.Run("endmqm", "-w", "-r", name)
isStandby, err := ready.IsRunningAsStandbyQM(name)
if err != nil {
log.Printf("Error stopping queue manager: %v", string(out))
log.Printf("Error getting status for queue manager %v: ", name, err.Error())
return err
}
log.Println("Stopped queue manager")
args := []string{"-w", "-r", name}
if os.Getenv("MQ_MULTI_INSTANCE") == "true" {
if isStandby {
args = []string{"-x", name}
} else {
args = []string{"-s", "-w", "-r", name}
}
}
out, rc, err := command.Run("endmqm", args...)
if err != nil {
log.Printf("Error %v stopping queue manager: %v", rc, string(out))
return err
}
if isStandby {
log.Printf("Stopped standby queue manager")
} else {
log.Println("Stopped queue manager")
}
return nil
}
@@ -152,3 +204,44 @@ func formatMQSCOutput(out string) string {
// add tab characters to make it more readable as part of the log
return strings.Replace(string(out), "\n", "\n\t", -1)
}
func isStandbyQueueManager(name string) (bool, error) {
out, rc, err := command.Run("dspmq", "-n", "-m", name)
if err != nil {
log.Printf("Error %v getting status for queue manager %v: %v", rc, name, string(out))
return false, err
}
if strings.Contains(string(out), "(RUNNING AS STANDBY)") {
return true, nil
}
return false, nil
}
func getQueueManagerDataDir(mounts map[string]string, name string) string {
dataDir := filepath.Join("/var/mqm/qmgrs", name)
if _, ok := mounts["/mnt/mqm-data"]; ok {
dataDir = filepath.Join("/mnt/mqm-data/qmgrs", name)
}
return dataDir
}
func getCreateQueueManagerArgs(mounts map[string]string, name string) []string {
args := []string{"-q", "-p", "1414"}
if _, ok := mounts["/mnt/mqm-log"]; ok {
args = append(args, "-ld", "/mnt/mqm-log/log")
}
if _, ok := mounts["/mnt/mqm-data"]; ok {
args = append(args, "-md", "/mnt/mqm-data/qmgrs")
}
args = append(args, name)
return args
}
func getCreateStandbyQueueManagerArgs(name string) []string {
args := []string{"-s", "QueueManager"}
args = append(args, "-v", fmt.Sprintf("Name=%v", name))
args = append(args, "-v", fmt.Sprintf("Directory=%v", name))
args = append(args, "-v", "Prefix=/var/mqm")
args = append(args, "-v", fmt.Sprintf("DataPath=/mnt/mqm-data/qmgrs/%v", name))
return args
}

View File

@@ -14,6 +14,7 @@
* limitations under the License.
STOP LISTENER('SYSTEM.LISTENER.TCP.1') IGNSTATE(YES)
ALTER LISTENER('SYSTEM.LISTENER.TCP.1') TRPTYPE(TCP) CONTROL(MANUAL)
* Developer queues
DEFINE QLOCAL('DEV.QUEUE.1') REPLACE

View File

@@ -156,6 +156,10 @@ rm -rf /var/mqm
install --directory --mode 0775 --owner mqm --group root /mnt
install --directory --mode 0775 --owner mqm --group root /mnt/mqm
install --directory --mode 0775 --owner mqm --group root /mnt/mqm/data
install --directory --mode 0775 --owner mqm --group root /mnt/mqm-log
install --directory --mode 0775 --owner mqm --group root /mnt/mqm-log/log
install --directory --mode 0775 --owner mqm --group root /mnt/mqm-data
install --directory --mode 0775 --owner mqm --group root /mnt/mqm-data/qmgrs
# Create the directory for MQ configuration files
install --directory --mode 0775 --owner mqm --group root /etc/mqm

View File

@@ -123,3 +123,12 @@ func SupportedFilesystem(fsType string) bool {
return true
}
}
// ValidMultiInstanceFilesystem returns true if the supplied filesystem type is valid for a multi-instance queue manager
func ValidMultiInstanceFilesystem(fsType string) bool {
if !SupportedFilesystem(fsType) {
return false
}
// TODO : check for non-shared filesystems & shared filesystems which are known not to work
return true
}

View File

@@ -13,18 +13,21 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
package logruntime
import (
"fmt"
"os"
"runtime"
"strings"
containerruntime "github.com/ibm-messaging/mq-container/internal/containerruntime"
"github.com/ibm-messaging/mq-container/internal/logger"
"github.com/ibm-messaging/mq-container/internal/user"
)
func logContainerDetails() error {
// LogContainerDetails logs details about the container runtime
func LogContainerDetails(log *logger.Logger) error {
if runtime.GOOS != "linux" {
return fmt.Errorf("Unsupported platform: %v", runtime.GOOS)
}
@@ -82,5 +85,19 @@ func logContainerDetails() error {
}
}
}
// For a multi-instance queue manager - check all required mounts exist & validate filesystem type
if os.Getenv("MQ_MULTI_INSTANCE") == "true" {
log.Println("Multi-instance queue manager: enabled")
reqMounts := []string{"/mnt/mqm", "/mnt/mqm-log", "/mnt/mqm-data"}
for _, mountPoint := range reqMounts {
if fsType, ok := m[mountPoint]; ok {
if !containerruntime.ValidMultiInstanceFilesystem(fsType) {
return fmt.Errorf("%v uses filesystem type '%v' which is invalid for a multi-instance queue manager", mountPoint, fsType)
}
} else {
return fmt.Errorf("Missing required mount '%v' for a multi-instance queue manager", mountPoint)
}
}
}
return nil
}

View File

@@ -114,11 +114,6 @@ func (l *Logger) log(level string, msg string) {
l.mutex.Unlock()
}
// LogDirect logs a message directly to stdout
func (l *Logger) LogDirect(msg string) {
fmt.Println(msg)
}
// Debug logs a line as debug
func (l *Logger) Debug(args ...interface{}) {
if l.debug {

View File

@@ -1,5 +1,5 @@
/*
© Copyright IBM Corporation 2018
© Copyright IBM Corporation 2018, 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ import (
"time"
"github.com/ibm-messaging/mq-container/internal/logger"
"github.com/ibm-messaging/mq-container/internal/ready"
"github.com/prometheus/client_golang/prometheus"
)
@@ -39,6 +40,15 @@ var (
// GatherMetrics gathers metrics for the queue manager
func GatherMetrics(qmName string, log *logger.Logger) {
// If running in standby mode - wait until the queue manager becomes active
for {
active, _ := ready.IsRunningAsActiveQM(qmName)
if active {
break
}
time.Sleep(requestTimeout * time.Second)
}
metricsEnabled = true
err := startMetricsGathering(qmName, log)

View File

@@ -1,5 +1,5 @@
/*
© Copyright IBM Corporation 2018
© Copyright IBM Corporation 2018, 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -71,5 +71,8 @@ func GetQueueManager(name string) (*QueueManager, error) {
// GetErrorLogDirectory returns the directory holding the error logs for the
// specified queue manager
func GetErrorLogDirectory(qm *QueueManager) string {
if qm.DataPath != "" {
return filepath.Join(qm.DataPath, "errors")
}
return filepath.Join(qm.Prefix, "qmgrs", qm.Directory, "errors")
}

View File

@@ -1,5 +1,5 @@
/*
© Copyright IBM Corporation 2018
© Copyright IBM Corporation 2018, 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -20,6 +20,9 @@ package ready
import (
"io/ioutil"
"os"
"strings"
"github.com/ibm-messaging/mq-container/internal/command"
)
const fileName string = "/run/runmqserver/ready"
@@ -62,3 +65,24 @@ func Check() (bool, error) {
}
return exists, nil
}
// IsRunningAsActiveQM returns true if the queue manager is running in active mode
func IsRunningAsActiveQM(name string) (bool, error) {
return isRunningQM(name, "(RUNNING)")
}
// IsRunningAsStandbyQM returns true if the queue manager is running in standby mode
func IsRunningAsStandbyQM(name string) (bool, error) {
return isRunningQM(name, "(RUNNING AS STANDBY)")
}
func isRunningQM(name string, status string) (bool, error) {
out, _, err := command.Run("dspmq", "-n", "-m", name)
if err != nil {
return false, err
}
if strings.Contains(string(out), status) {
return true, nil
}
return false, nil
}

View File

@@ -235,7 +235,7 @@ func withVolume(t *testing.T, metric bool) {
if err != nil {
t.Fatal(err)
}
vol := createVolume(t, cli)
vol := createVolume(t, cli, t.Name())
defer removeVolume(t, cli, vol.Name)
containerConfig := container.Config{
Image: imageName(),
@@ -273,6 +273,62 @@ func withVolume(t *testing.T, metric bool) {
waitForReady(t, cli, ctr2.ID)
}
// TestWithSplitVolumesLogsData starts a queue manager with separate log/data mounts
func TestWithSplitVolumesLogsData(t *testing.T) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
qmsharedlogs := createVolume(t, cli, "qmsharedlogs")
defer removeVolume(t, cli, qmsharedlogs.Name)
qmshareddata := createVolume(t, cli, "qmshareddata")
defer removeVolume(t, cli, qmshareddata.Name)
err, qmID, qmVol := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, qmshareddata.Name, []string{"LICENSE=accept", "MQ_QMGR_NAME=qm1"})
defer removeVolume(t, cli, qmVol)
defer cleanContainer(t, cli, qmID)
waitForReady(t, cli, qmID)
}
// TestWithSplitVolumesLogsOnly starts a queue manager with a separate log mount
func TestWithSplitVolumesLogsOnly(t *testing.T) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
qmsharedlogs := createVolume(t, cli, "qmsharedlogs")
defer removeVolume(t, cli, qmsharedlogs.Name)
err, qmID, qmVol := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, "", []string{"LICENSE=accept", "MQ_QMGR_NAME=qm1"})
defer removeVolume(t, cli, qmVol)
defer cleanContainer(t, cli, qmID)
waitForReady(t, cli, qmID)
}
// TestWithSplitVolumesDataOnly starts a queue manager with a separate data mount
func TestWithSplitVolumesDataOnly(t *testing.T) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
qmshareddata := createVolume(t, cli, "qmshareddata")
defer removeVolume(t, cli, qmshareddata.Name)
err, qmID, qmVol := startMultiVolumeQueueManager(t, cli, true, "", qmshareddata.Name, []string{"LICENSE=accept", "MQ_QMGR_NAME=qm1"})
defer removeVolume(t, cli, qmVol)
defer cleanContainer(t, cli, qmID)
waitForReady(t, cli, qmID)
}
// TestNoVolumeWithRestart ensures a queue manager container can be stopped
// and restarted cleanly
func TestNoVolumeWithRestart(t *testing.T) {
@@ -302,7 +358,7 @@ func TestVolumeRequiresRoot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
vol := createVolume(t, cli)
vol := createVolume(t, cli, t.Name())
defer removeVolume(t, cli, vol.Name)
// Set permissions on the volume to only allow root to write it
@@ -438,7 +494,7 @@ func TestVolumeUnmount(t *testing.T) {
if err != nil {
t.Fatal(err)
}
vol := createVolume(t, cli)
vol := createVolume(t, cli, t.Name())
defer removeVolume(t, cli, vol.Name)
containerConfig := container.Config{
Image: imageName(),

View File

@@ -398,6 +398,14 @@ func stopContainer(t *testing.T, cli *client.Client, ID string) {
}
}
func killContainer(t *testing.T, cli *client.Client, ID string, signal string) {
t.Logf("Killing container: %v", ID)
err := cli.ContainerKill(context.Background(), ID, signal)
if err != nil {
t.Fatal(err)
}
}
func getExitCodeFilename(t *testing.T) string {
return t.Name() + "ExitCode"
}
@@ -522,6 +530,9 @@ func waitForReady(t *testing.T, cli *client.Client, ID string) {
if rc == 0 {
t.Log("MQ is ready")
return
} else if rc == 10 {
t.Log("MQ Readiness: Queue Manager Running as Standby")
return
}
case <-ctx.Done():
t.Fatal("Timed out waiting for container to become ready")
@@ -557,17 +568,17 @@ func removeNetwork(t *testing.T, cli *client.Client, ID string) {
}
}
func createVolume(t *testing.T, cli *client.Client) types.Volume {
func createVolume(t *testing.T, cli *client.Client, name string) types.Volume {
v, err := cli.VolumeCreate(context.Background(), volume.VolumesCreateBody{
Driver: "local",
DriverOpts: map[string]string{},
Labels: map[string]string{},
Name: t.Name(),
Name: name,
})
if err != nil {
t.Fatal(err)
}
t.Logf("Created volume %v", t.Name())
t.Logf("Created volume %v", v.Name)
return v
}

View File

@@ -0,0 +1,234 @@
/*
© Copyright IBM Corporation 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"strings"
"testing"
"time"
"github.com/docker/docker/client"
)
var miEnv = []string{
"LICENSE=accept",
"MQ_QMGR_NAME=QM1",
"MQ_MULTI_INSTANCE=true",
}
// TestMultiInstanceStartStop creates 2 containers in a multi instance queue manager configuration
// and starts/stop them checking we always have an active and standby
func TestMultiInstanceStartStop(t *testing.T) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
err, qm1aId, qm1bId, volumes := configureMultiInstance(t, cli)
if err != nil {
t.Fatal(err)
}
for _, volume := range volumes {
defer removeVolume(t, cli, volume)
}
defer cleanContainer(t, cli, qm1aId)
defer cleanContainer(t, cli, qm1bId)
waitForReady(t, cli, qm1aId)
waitForReady(t, cli, qm1bId)
err, active, standby := getActiveStandbyQueueManager(t, cli, qm1aId, qm1bId)
if err != nil {
t.Fatal(err)
}
killContainer(t, cli, active, "SIGTERM")
time.Sleep(2 * time.Second)
if status := getQueueManagerStatus(t, cli, standby, "QM1"); strings.Compare(status, "Running") != 0 {
t.Fatalf("Expected QM1 to be running as active queue manager, dspmq returned status of %v", status)
}
startContainer(t, cli, qm1aId)
waitForReady(t, cli, qm1aId)
err, _, _ = getActiveStandbyQueueManager(t, cli, qm1aId, qm1bId)
if err != nil {
t.Fatal(err)
}
}
// TestMultiInstanceContainerStop starts 2 containers in a multi instance queue manager configuration,
// stops the active queue manager, then checks to ensure the backup queue manager becomes active
func TestMultiInstanceContainerStop(t *testing.T) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
err, qm1aId, qm1bId, volumes := configureMultiInstance(t, cli)
if err != nil {
t.Fatal(err)
}
for _, volume := range volumes {
defer removeVolume(t, cli, volume)
}
defer cleanContainer(t, cli, qm1aId)
defer cleanContainer(t, cli, qm1bId)
waitForReady(t, cli, qm1aId)
waitForReady(t, cli, qm1bId)
err, active, standby := getActiveStandbyQueueManager(t, cli, qm1aId, qm1bId)
if err != nil {
t.Fatal(err)
}
stopContainer(t, cli, active)
if status := getQueueManagerStatus(t, cli, standby, "QM1"); strings.Compare(status, "Running") != 0 {
t.Fatalf("Expected QM1 to be running as active queue manager, dspmq returned status of %v", status)
}
}
// TestMultiInstanceRace starts 2 containers in separate goroutines in a multi instance queue manager
// configuration, then checks to ensure that both an active and standby queue manager have been started
func TestMultiInstanceRace(t *testing.T) {
t.Skipf("Skipping %v until file lock is implemented", t.Name())
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
qmsharedlogs := createVolume(t, cli, "qmsharedlogs")
defer removeVolume(t, cli, qmsharedlogs.Name)
qmshareddata := createVolume(t, cli, "qmshareddata")
defer removeVolume(t, cli, qmshareddata.Name)
qmsChannel := make(chan QMChan)
go singleMultiInstanceQueueManager(t, cli, qmsharedlogs.Name, qmshareddata.Name, qmsChannel)
go singleMultiInstanceQueueManager(t, cli, qmsharedlogs.Name, qmshareddata.Name, qmsChannel)
qm1a := <-qmsChannel
if qm1a.Error != nil {
t.Fatal(qm1a.Error)
}
qm1b := <-qmsChannel
if qm1b.Error != nil {
t.Fatal(qm1b.Error)
}
qm1aId, qm1aData := qm1a.QMId, qm1a.QMData
qm1bId, qm1bData := qm1b.QMId, qm1b.QMData
defer removeVolume(t, cli, qm1aData)
defer removeVolume(t, cli, qm1bData)
defer cleanContainer(t, cli, qm1aId)
defer cleanContainer(t, cli, qm1bId)
waitForReady(t, cli, qm1aId)
waitForReady(t, cli, qm1bId)
err, _, _ = getActiveStandbyQueueManager(t, cli, qm1aId, qm1bId)
if err != nil {
t.Fatal(err)
}
}
// TestMultiInstanceNoSharedMounts starts 2 multi instance queue managers without providing shared log/data
// mounts, then checks to ensure that the container terminates with the expected message
func TestMultiInstanceNoSharedMounts(t *testing.T) {
t.Parallel()
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, true, "", "", miEnv)
if err != nil {
t.Fatal(err)
}
defer removeVolume(t, cli, qm1aData)
defer cleanContainer(t, cli, qm1aId)
waitForTerminationMessage(t, cli, qm1aId, "Missing required mount '/mnt/mqm-log'", 30*time.Second)
}
// TestMultiInstanceNoSharedLogs starts 2 multi instance queue managers without providing a shared log
// mount, then checks to ensure that the container terminates with the expected message
func TestMultiInstanceNoSharedLogs(t *testing.T) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
qmshareddata := createVolume(t, cli, "qmshareddata")
defer removeVolume(t, cli, qmshareddata.Name)
err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, true, "", qmshareddata.Name, miEnv)
if err != nil {
t.Fatal(err)
}
defer removeVolume(t, cli, qm1aData)
defer cleanContainer(t, cli, qm1aId)
waitForTerminationMessage(t, cli, qm1aId, "Missing required mount '/mnt/mqm-log'", 30*time.Second)
}
// TestMultiInstanceNoSharedData starts 2 multi instance queue managers without providing a shared data
// mount, then checks to ensure that the container terminates with the expected message
func TestMultiInstanceNoSharedData(t *testing.T) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
qmsharedlogs := createVolume(t, cli, "qmsharedlogs")
defer removeVolume(t, cli, qmsharedlogs.Name)
err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, "", miEnv)
if err != nil {
t.Fatal(err)
}
defer removeVolume(t, cli, qm1aData)
defer cleanContainer(t, cli, qm1aId)
waitForTerminationMessage(t, cli, qm1aId, "Missing required mount '/mnt/mqm-data'", 30*time.Second)
}
// TestMultiInstanceNoMounts starts 2 multi instance queue managers without providing a shared data
// mount, then checks to ensure that the container terminates with the expected message
func TestMultiInstanceNoMounts(t *testing.T) {
cli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, false, "", "", miEnv)
if err != nil {
t.Fatal(err)
}
defer removeVolume(t, cli, qm1aData)
defer cleanContainer(t, cli, qm1aId)
waitForTerminationMessage(t, cli, qm1aId, "Missing required mount '/mnt/mqm'", 30*time.Second)
}

View File

@@ -0,0 +1,178 @@
/*
© Copyright IBM Corporation 2019
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"testing"
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
)
type QMChan struct {
QMId string
QMData string
Error error
}
// configureMultiInstance creates the volumes and containers required for basic testing
// of multi instance queue managers. Returns error, qm1a ID, qm1b ID, slice of volume names
func configureMultiInstance(t *testing.T, cli *client.Client) (error, string, string, []string) {
qmsharedlogs := createVolume(t, cli, "qmsharedlogs")
qmshareddata := createVolume(t, cli, "qmshareddata")
err, qm1aId, qm1aData := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, qmshareddata.Name, miEnv)
if err != nil {
return err, "", "", []string{}
}
time.Sleep(10 * time.Second)
err, qm1bId, qm1bData := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs.Name, qmshareddata.Name, miEnv)
if err != nil {
return err, "", "", []string{}
}
volumes := []string{qmsharedlogs.Name, qmshareddata.Name, qm1aData, qm1bData}
return nil, qm1aId, qm1bId, volumes
}
func singleMultiInstanceQueueManager(t *testing.T, cli *client.Client, qmsharedlogs string, qmshareddata string, qmsChannel chan QMChan) {
err, qmId, qmData := startMultiVolumeQueueManager(t, cli, true, qmsharedlogs, qmshareddata, miEnv)
if err != nil {
qmsChannel <- QMChan{Error: err}
}
qmsChannel <- QMChan{QMId: qmId, QMData: qmData}
}
func getHostConfig(t *testing.T, mounts int, qmsharedlogs string, qmshareddata string, qmdata string) container.HostConfig {
var hostConfig container.HostConfig
switch mounts {
case 1:
hostConfig = container.HostConfig{
Binds: []string{
coverageBind(t),
qmdata + ":/mnt/mqm",
},
}
case 2:
hostConfig = container.HostConfig{
Binds: []string{
coverageBind(t),
qmdata + ":/mnt/mqm",
qmshareddata + ":/mnt/mqm-data",
},
}
case 3:
hostConfig = container.HostConfig{
Binds: []string{
coverageBind(t),
qmdata + ":/mnt/mqm",
qmsharedlogs + ":/mnt/mqm-log",
},
}
case 4:
hostConfig = container.HostConfig{
Binds: []string{
coverageBind(t),
qmdata + ":/mnt/mqm",
qmsharedlogs + ":/mnt/mqm-log",
qmshareddata + ":/mnt/mqm-data",
},
}
}
return hostConfig
}
func startMultiVolumeQueueManager(t *testing.T, cli *client.Client, dataVol bool, qmsharedlogs string, qmshareddata string, env []string) (error, string, string) {
id := strconv.FormatInt(time.Now().UnixNano(), 10)
qmdata := createVolume(t, cli, id)
containerConfig := container.Config{
Image: imageName(),
Env: env,
}
var hostConfig container.HostConfig
if !dataVol {
hostConfig = container.HostConfig{}
} else if qmsharedlogs == "" && qmshareddata == "" {
hostConfig = getHostConfig(t, 1, "", "", qmdata.Name)
} else if qmsharedlogs == "" {
hostConfig = getHostConfig(t, 2, "", qmshareddata, qmdata.Name)
} else if qmshareddata == "" {
hostConfig = getHostConfig(t, 3, qmsharedlogs, "", qmdata.Name)
} else {
hostConfig = getHostConfig(t, 4, qmsharedlogs, qmshareddata, qmdata.Name)
}
networkingConfig := network.NetworkingConfig{}
qm, err := cli.ContainerCreate(context.Background(), &containerConfig, &hostConfig, &networkingConfig, t.Name()+id)
if err != nil {
return err, "", ""
}
startContainer(t, cli, qm.ID)
return nil, qm.ID, qmdata.Name
}
func getActiveStandbyQueueManager(t *testing.T, cli *client.Client, qm1aId string, qm1bId string) (error, string, string) {
qm1aStatus := getQueueManagerStatus(t, cli, qm1aId, "QM1")
qm1bStatus := getQueueManagerStatus(t, cli, qm1bId, "QM1")
if qm1aStatus == "Running" && qm1bStatus == "Running as standby" {
return nil, qm1aId, qm1bId
} else if qm1bStatus == "Running" && qm1aStatus == "Running as standby" {
return nil, qm1bId, qm1aId
}
err := fmt.Errorf("Expected to be running in multi instance configuration, got status 1) %v status 2) %v", qm1aStatus, qm1bStatus)
return err, "", ""
}
func getQueueManagerStatus(t *testing.T, cli *client.Client, containerID string, queueManagerName string) string {
_, dspmqOut := execContainer(t, cli, containerID, "mqm", []string{"bash", "-c", "dspmq", "-m", queueManagerName})
regex := regexp.MustCompile(`STATUS\(.*\)`)
status := regex.FindString(dspmqOut)
status = strings.TrimSuffix(strings.TrimPrefix(status, "STATUS("), ")")
return status
}
func waitForTerminationMessage(t *testing.T, cli *client.Client, qmId string, terminationString string, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
select {
case <-time.After(1 * time.Second):
m := terminationMessage(t, cli, qmId)
if m != "" {
if !strings.Contains(m, terminationString) {
t.Fatalf("Expected container to fail on missing required mount. Got termination message: %v", m)
}
return
}
case <-ctx.Done():
t.Fatal("Timed out waiting for container to terminate")
}
}
}