From 6f3672df4f06d1818598f7fbcf29c15a7aa239b8 Mon Sep 17 00:00:00 2001 From: Riccardo Biraghi Date: Mon, 9 Apr 2018 15:50:12 +0100 Subject: [PATCH 1/3] Apply mirroring delay to last iteration --- cmd/runmqserver/mirror.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/runmqserver/mirror.go b/cmd/runmqserver/mirror.go index 170e608..d58ad71 100644 --- a/cmd/runmqserver/mirror.go +++ b/cmd/runmqserver/mirror.go @@ -171,8 +171,8 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b // Don't seek this time, because we know it's a new file mirrorAvailableMessages(f, mf) } - select { - case <-ctx.Done(): + + if <-ctx.Done() { log.Debugf("Context cancelled for mirroring %v", path) if closing { log.Debugf("Shutting down mirror for %v", path) @@ -180,9 +180,9 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b } // Set a flag, to allow one more time through the loop closing = true - default: - time.Sleep(500 * time.Millisecond) } + + time.Sleep(500 * time.Millisecond) } }() return errorChannel, nil From b41a58645b5360a52050ae160104a1eb5d05daeb Mon Sep 17 00:00:00 2001 From: Riccardo Biraghi Date: Mon, 9 Apr 2018 15:50:12 +0100 Subject: [PATCH 2/3] Apply mirroring delay to last iteration --- cmd/runmqserver/mirror.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/runmqserver/mirror.go b/cmd/runmqserver/mirror.go index 170e608..75cda44 100644 --- a/cmd/runmqserver/mirror.go +++ b/cmd/runmqserver/mirror.go @@ -171,6 +171,7 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b // Don't seek this time, because we know it's a new file mirrorAvailableMessages(f, mf) } + select { case <-ctx.Done(): log.Debugf("Context cancelled for mirroring %v", path) @@ -180,9 +181,9 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b } // Set a flag, to allow one more time through the loop closing = true - default: - time.Sleep(500 * time.Millisecond) } + + time.Sleep(500 * time.Millisecond) } }() return errorChannel, nil From 7009f3a39261a09edc16b96075ce62fe7a343b82 Mon Sep 17 00:00:00 2001 From: Riccardo Biraghi Date: Tue, 10 Apr 2018 10:01:47 +0100 Subject: [PATCH 3/3] Wait for log file after rotation --- cmd/runmqserver/mirror.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/runmqserver/mirror.go b/cmd/runmqserver/mirror.go index 75cda44..0a78cba 100644 --- a/cmd/runmqserver/mirror.go +++ b/cmd/runmqserver/mirror.go @@ -33,7 +33,7 @@ func waitForFile(ctx context.Context, path string) (os.FileInfo, error) { select { // Check to see if cancellation has been requested case <-ctx.Done(): - return nil, nil + return os.Stat(path) default: fi, err = os.Stat(path) if err != nil { @@ -145,7 +145,8 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b for { // If there's already data there, mirror it now. mirrorAvailableMessages(f, mf) - newFI, err := os.Stat(path) + // Wait for the new log file (after rotation) + newFI, err := waitForFile(ctx, path) if err != nil { log.Error(err) errorChannel <- err @@ -171,7 +172,6 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b // Don't seek this time, because we know it's a new file mirrorAvailableMessages(f, mf) } - select { case <-ctx.Done(): log.Debugf("Context cancelled for mirroring %v", path) @@ -181,9 +181,9 @@ func mirrorLog(ctx context.Context, wg *sync.WaitGroup, path string, fromStart b } // Set a flag, to allow one more time through the loop closing = true + default: + time.Sleep(500 * time.Millisecond) } - - time.Sleep(500 * time.Millisecond) } }() return errorChannel, nil