Add mirrorFunc to handle mirrored messages
This commit is contained in:
@@ -18,9 +18,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@@ -47,19 +45,23 @@ func waitForFile(path string) (os.FileInfo, error) {
|
|||||||
return fi, nil
|
return fi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mirrorFunc func(msg string)
|
||||||
|
|
||||||
// mirrorAvailableMessages prints lines from the file, until no more are available
|
// mirrorAvailableMessages prints lines from the file, until no more are available
|
||||||
func mirrorAvailableMessages(f *os.File, w io.Writer) {
|
func mirrorAvailableMessages(f *os.File, mf mirrorFunc) {
|
||||||
scanner := bufio.NewScanner(f)
|
scanner := bufio.NewScanner(f)
|
||||||
count := 0
|
count := 0
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
t := scanner.Text()
|
t := scanner.Text()
|
||||||
if strings.HasPrefix(t, "{") {
|
mf(t)
|
||||||
// Assume JSON, so just print it
|
// if strings.HasPrefix(t, "{") {
|
||||||
fmt.Fprintln(w, t)
|
// // Assume JSON, so just print it
|
||||||
} else if strings.HasPrefix(t, "AMQ") {
|
// fmt.Fprintln(w, t)
|
||||||
// Only print MQ messages with AMQnnnn codes
|
// } else if strings.HasPrefix(t, "AMQ") {
|
||||||
fmt.Fprintln(w, t)
|
// // Only print MQ messages with AMQnnnn codes
|
||||||
}
|
// log.Println(t)
|
||||||
|
// //fmt.Fprintln(w, t)
|
||||||
|
// }
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
log.Debugf("Mirrored %v log entries", count)
|
log.Debugf("Mirrored %v log entries", count)
|
||||||
@@ -73,7 +75,7 @@ func mirrorAvailableMessages(f *os.File, w io.Writer) {
|
|||||||
// mirrorLog tails the specified file, and logs each line to stdout.
|
// mirrorLog tails the specified file, and logs each line to stdout.
|
||||||
// This is useful for usability, as the container console log can show
|
// This is useful for usability, as the container console log can show
|
||||||
// messages from the MQ error logs.
|
// messages from the MQ error logs.
|
||||||
func mirrorLog(path string, w io.Writer) (chan bool, error) {
|
func mirrorLog(path string, mf mirrorFunc) (chan bool, error) {
|
||||||
lifecycle := make(chan bool)
|
lifecycle := make(chan bool)
|
||||||
var offset int64 = -1
|
var offset int64 = -1
|
||||||
var f *os.File
|
var f *os.File
|
||||||
@@ -134,7 +136,7 @@ func mirrorLog(path string, w io.Writer) (chan bool, error) {
|
|||||||
for {
|
for {
|
||||||
log.Debugln("Start of loop")
|
log.Debugln("Start of loop")
|
||||||
// If there's already data there, mirror it now.
|
// If there's already data there, mirror it now.
|
||||||
mirrorAvailableMessages(f, w)
|
mirrorAvailableMessages(f, mf)
|
||||||
log.Debugf("Stat %v", path)
|
log.Debugf("Stat %v", path)
|
||||||
newFI, err := os.Stat(path)
|
newFI, err := os.Stat(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -148,7 +150,7 @@ func mirrorLog(path string, w io.Writer) (chan bool, error) {
|
|||||||
// log rotation happens before we can open the new file, then we
|
// log rotation happens before we can open the new file, then we
|
||||||
// could skip all those messages. This could happen with a very small
|
// could skip all those messages. This could happen with a very small
|
||||||
// MQ error log size.
|
// MQ error log size.
|
||||||
mirrorAvailableMessages(f, w)
|
mirrorAvailableMessages(f, mf)
|
||||||
f.Close()
|
f.Close()
|
||||||
// Re-open file
|
// Re-open file
|
||||||
log.Debugln("Re-opening error log file")
|
log.Debugln("Re-opening error log file")
|
||||||
@@ -160,7 +162,7 @@ func mirrorLog(path string, w io.Writer) (chan bool, error) {
|
|||||||
}
|
}
|
||||||
fi = newFI
|
fi = newFI
|
||||||
// Don't seek this time, because we know it's a new file
|
// Don't seek this time, because we know it's a new file
|
||||||
mirrorAvailableMessages(f, w)
|
mirrorAvailableMessages(f, mf)
|
||||||
}
|
}
|
||||||
log.Debugln("Check for lifecycle event")
|
log.Debugln("Check for lifecycle event")
|
||||||
select {
|
select {
|
||||||
|
|||||||
@@ -16,8 +16,6 @@ limitations under the License.
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
@@ -39,9 +37,10 @@ func TestMirrorLogWithoutRotation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
t.Log(tmp.Name())
|
t.Log(tmp.Name())
|
||||||
defer os.Remove(tmp.Name())
|
defer os.Remove(tmp.Name())
|
||||||
b := make([]byte, 256)
|
count := 0
|
||||||
buf := bytes.NewBuffer(b)
|
lifecycle, err := mirrorLog(tmp.Name(), func(msg string) {
|
||||||
lifecycle, err := mirrorLog(tmp.Name(), buf)
|
count++
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -57,18 +56,6 @@ func TestMirrorLogWithoutRotation(t *testing.T) {
|
|||||||
lifecycle <- true
|
lifecycle <- true
|
||||||
<-lifecycle
|
<-lifecycle
|
||||||
|
|
||||||
// Read the buffer back and count the lines
|
|
||||||
r := strings.NewReader(buf.String())
|
|
||||||
scanner := bufio.NewScanner(r)
|
|
||||||
count := 0
|
|
||||||
for scanner.Scan() {
|
|
||||||
count++
|
|
||||||
t.Logf("Received entry: %v", scanner.Text())
|
|
||||||
}
|
|
||||||
err = scanner.Err()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if count != 3 {
|
if count != 3 {
|
||||||
t.Fatalf("Expected 3 log entries; got %v", count)
|
t.Fatalf("Expected 3 log entries; got %v", count)
|
||||||
}
|
}
|
||||||
@@ -90,9 +77,10 @@ func TestMirrorLogWithRotation(t *testing.T) {
|
|||||||
t.Log("Removing file")
|
t.Log("Removing file")
|
||||||
os.Remove(tmp.Name())
|
os.Remove(tmp.Name())
|
||||||
}()
|
}()
|
||||||
b := make([]byte, 512)
|
count := 0
|
||||||
buf := bytes.NewBuffer(b)
|
lifecycle, err := mirrorLog(tmp.Name(), func(msg string) {
|
||||||
lifecycle, err := mirrorLog(tmp.Name(), buf)
|
count++
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -125,18 +113,6 @@ func TestMirrorLogWithRotation(t *testing.T) {
|
|||||||
// Wait until it's finished
|
// Wait until it's finished
|
||||||
<-lifecycle
|
<-lifecycle
|
||||||
|
|
||||||
// Read the buffer back and count the lines
|
|
||||||
r := strings.NewReader(buf.String())
|
|
||||||
scanner := bufio.NewScanner(r)
|
|
||||||
count := 0
|
|
||||||
for scanner.Scan() {
|
|
||||||
count++
|
|
||||||
t.Logf("Received entry: %v", scanner.Text())
|
|
||||||
}
|
|
||||||
err = scanner.Err()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if count != 5 {
|
if count != 5 {
|
||||||
t.Fatalf("Expected 5 log entries; got %v", count)
|
t.Fatalf("Expected 5 log entries; got %v", count)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
@@ -220,10 +221,17 @@ func doMain() error {
|
|||||||
f := "/var/mqm/qmgrs/" + name + "/errors/AMQERR01"
|
f := "/var/mqm/qmgrs/" + name + "/errors/AMQERR01"
|
||||||
if jsonLogs() {
|
if jsonLogs() {
|
||||||
f = f + ".json"
|
f = f + ".json"
|
||||||
|
mirrorLifecycle, err = mirrorLog(f, func(msg string) {
|
||||||
|
// Print the message straight to stdout
|
||||||
|
fmt.Println(msg)
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
f = f + ".LOG"
|
f = f + ".LOG"
|
||||||
|
mirrorLifecycle, err = mirrorLog(f, func(msg string) {
|
||||||
|
// Log the message, so we get a timestamp etc.
|
||||||
|
log.Println(msg)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
mirrorLifecycle, err = mirrorLog(f, os.Stdout)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user