Use context and waitgroup for mirroring

This commit is contained in:
Arthur Barr
2018-02-20 14:35:58 +00:00
parent b23ec084fa
commit ace6e6364c
4 changed files with 208 additions and 128 deletions

View File

@@ -16,12 +16,15 @@ limitations under the License.
package main
import (
"context"
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
log "github.com/sirupsen/logrus"
)
@@ -38,7 +41,9 @@ func TestMirrorLogWithoutRotation(t *testing.T) {
t.Log(tmp.Name())
defer os.Remove(tmp.Name())
count := 0
lifecycle, err := mirrorLog(tmp.Name(), true, func(msg string) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
_, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string) {
count++
})
if err != nil {
@@ -53,9 +58,8 @@ func TestMirrorLogWithoutRotation(t *testing.T) {
fmt.Fprintln(f, "{\"message\"=\"B\"}")
fmt.Fprintln(f, "{\"message\"=\"C\"}")
f.Close()
lifecycle <- true
<-lifecycle
cancel()
wg.Wait()
if count != 3 {
t.Fatalf("Expected 3 log entries; got %v", count)
}
@@ -78,7 +82,9 @@ func TestMirrorLogWithRotation(t *testing.T) {
os.Remove(tmp.Name())
}()
count := 0
lifecycle, err := mirrorLog(tmp.Name(), true, func(msg string) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
_, err = mirrorLog(ctx, &wg, tmp.Name(), true, func(msg string) {
count++
})
if err != nil {
@@ -109,9 +115,8 @@ func TestMirrorLogWithRotation(t *testing.T) {
f.Close()
// Shut the mirroring down
lifecycle <- true
// Wait until it's finished
<-lifecycle
cancel()
wg.Wait()
if count != 5 {
t.Fatalf("Expected 5 log entries; got %v", count)
@@ -130,7 +135,9 @@ func testMirrorLogExistingFile(t *testing.T, newQM bool) int {
ioutil.WriteFile(tmp.Name(), []byte("{\"message\"=\"A\"}\n"), 0600)
defer os.Remove(tmp.Name())
count := 0
lifecycle, err := mirrorLog(tmp.Name(), newQM, func(msg string) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
_, err = mirrorLog(ctx, &wg, tmp.Name(), newQM, func(msg string) {
count++
})
if err != nil {
@@ -144,8 +151,8 @@ func testMirrorLogExistingFile(t *testing.T, newQM bool) int {
fmt.Fprintln(f, "{\"message\"=\"B\"}")
fmt.Fprintln(f, "{\"message\"=\"C\"}")
f.Close()
lifecycle <- true
<-lifecycle
cancel()
wg.Wait()
return count
}
@@ -167,6 +174,26 @@ func TestMirrorLogExistingFileButNewQueueManager(t *testing.T) {
}
}
func init() {
log.SetLevel(log.DebugLevel)
func TestMirrorLogCancelWhileWaiting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()
_, err := mirrorLog(ctx, &wg, "fake.log", true, func(msg string) {
})
if err != nil {
t.Error(err)
}
time.Sleep(time.Second * 3)
cancel()
wg.Wait()
// No need to assert anything. If it didn't work, the code would have hung (TODO: not ideal)
}
func init() {
fmt.Println("Setting debug level")
log.SetLevel(log.DebugLevel)
log.SetFormatter(new(simpleTextFormatter))
}