In this article, I'll demonstrate:
- How to properly write tests that needs external services
- How to write a parallel benchmark
- NSQ performances
Properly test services
Self contained
The idea is to write self-contained tests that do not leak one into an other. It is particularly useful when testing databases, http server or in our case, nsqd.
For http tests, we use httptest.NewServer()
which can be closed in a defer. That way, you have a brand new server each time. For databases, the best would be to mock, but if you can't, then create a new connection object, empty or create a new database, then defer destruction or empty.
In our case, for nsqd, we will simply spawn a new daemon, disable all logging, disable the dump of the queue on disc and destroy that daemon when the test is finished.
We could have done something much more simple and spawn an nsqd first then run our tests against that instance, but then the tests would not be consistent as what has been done by one test can affect the result of an other test.
Other note
A pattern which I like is to create helpers in test that instead of returning an error, take the testing object.
Go provides the testing.TB
interface which allow to receive either *testing.B
or *testing.T
. This is nice when writing helper for both tests and benchmarks.
Instead of having:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func newObject(addr string) (*object, error) {
if false {
return nil, err
}
return &object{}, nil
}
func TestObject(t *testing.T) {
obj, err := newObject("localhost")
if err != nil {
t.Fatal(err)
}
_ = obj
}
|
You would have:
1
2
3
4
5
6
7
8
9
10
11
|
func newObject(t testing.TB, addr string) *object {
if false {
t.Fatal(err)
}
return &object{}
}
func TestObject(t *testing.T) {
obj := newObject(t, "localhost")
_ = obj
}
|
Parallel Benchmark
It is very easy in Go to write parallel benchmarks:
From the Go documentation
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package main
import (
"bytes"
"testing"
"text/template"
)
func BenchmarkTemplate(b *testing.B) {
// Parallel benchmark for text/template.Template.Execute on a single object.
templ := template.Must(template.New("test").Parse("Hello, {{.}}!"))
// RunParallel will create GOMAXPROCS goroutines
// and distribute work among them.
b.RunParallel(func(pb *testing.PB) {
// Each goroutine has its own bytes.Buffer.
var buf bytes.Buffer
for pb.Next() {
// The loop body is executed b.N times total across all goroutines.
buf.Reset()
templ.Execute(&buf, "World")
}
})
}
|
So, instead of the “regular” benchmark where you do something like
1
2
3
4
5
|
func BenchmarkSomething(b *testing.B) {
for i := 0; i < b.N; i++ {
doSomething()
}
}
|
you simply do:
1
2
3
4
5
6
7
|
func BenchmarkSomething(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
doSomething()
}
}
}
|
You will note that we do note use b.N
anymore.
Good to know as well: b.SetParallel(int)
will specify the amount of goroutines allowed.
Write benchmark for nsqd
Nothing is better than an example: Playground
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
package main
import (
"io/ioutil"
"log"
"os"
"runtime"
"testing"
)
import (
nsq "github.com/bitly/go-nsq"
"github.com/bitly/nsq/nsqd"
)
// nopLogger simply discard any logs it receives
type nopLogger struct{}
func (*nopLogger) Output(int, string) error {
return nil
}
// newDaemon creates a quiet, stripped down daemon and start it
func newDaemon() *nsqd.NSQD {
opts := nsqd.NewNSQDOptions()
// Disable http/https
opts.HTTPAddress = ""
opts.HTTPSAddress = ""
// Disable logging
opts.Logger = &nopLogger{}
// Do not create on disc queue
opts.DataPath = "/dev/null"
nsqd := nsqd.NewNSQD(opts)
nsqd.Main()
return nsqd
}
// Wrap nsq.Consumer so we have control over Stop behavior
type consumer struct{ *nsq.Consumer }
func (c *consumer) Stop() {
c.Consumer.Stop()
<-c.Consumer.StopChan
}
// newConsumer creates a quiet connected Consumer
func newConsumer(t testing.TB, tcpAddr, topicName, channelName string, hdlr nsq.HandlerFunc) *consumer {
// Create the configuration object and set the maxInFlight
cfg := nsq.NewConfig()
cfg.MaxInFlight = 8
// Create the consumer with the given topic and chanel names
r, err := nsq.NewConsumer(topicName, channelName, cfg)
if err != nil {
t.Fatal(err)
}
// Disable logging
r.SetLogger(&nopLogger{}, 0)
// Set the handler
r.AddHandler(hdlr)
// Connect to the NSQ daemon
if err := r.ConnectToNSQD(tcpAddr); err != nil {
t.Fatal(err)
}
return &consumer{Consumer: r}
}
// newProducer creates a quiet connected Producer
func newProducer(t testing.TB, tcpAddr string) *nsq.Producer {
// Create the configuration object and set the maxInFlight
cfg := nsq.NewConfig()
cfg.MaxInFlight = 8
// Create the producer
p, err := nsq.NewProducer(tcpAddr, cfg)
if err != nil {
t.Fatal(err)
}
// Disable logging
p.SetLogger(&nopLogger{}, 0)
return p
}
func BenchmarkPubSub(b *testing.B) {
// Disable general logging.
log.SetOutput(ioutil.Discard)
defer func() { log.SetOutput(os.Stderr) }()
// Start NSQD and make sure to shut it down when leaving.
nsqd := newDaemon()
defer nsqd.Exit()
// Create the consumer and send every message to the chan.
msgs := make(chan []byte)
hdlr := func(msg *nsq.Message) error { msgs <- msg.Body; return nil }
consumer := newConsumer(b, "localhost:4150", "mytopic", "mychan1", hdlr)
defer consumer.Stop()
// Create producer.
producer := newProducer(b, "localhost:4150")
defer producer.Stop()
// Tell Go to use as many cores as available.
b.SetParallelism(runtime.NumCPU())
// reset Go's timer.
b.ResetTimer()
// Run in Parallel
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Send "hello world" to NSQ and wait for it to arrive.
if err := producer.Publish("mytopic", []byte("hello world")); err != nil {
b.Fatal(err)
}
if msg, ok := <-msgs; !ok {
b.Fatal("Message chan closed.")
} else if expect, got := "hello world", string(msg); expect != got {
b.Fatalf("Unexpected message. Expected: %s, Got: %s", expect, got)
}
}
})
}
|
Results
On my MacbookPro i7 8 cores 2Ghz:
1
2
3
4
|
$> GOMAXPROCS=8 go test -v -bench . .
PASS
BenchmarkPubSub-8 50000 37167 ns/op
ok github.com/bitly/nsq/apps/test/bench 3.425s
|
So this is roughly 27K messages per seconds or 0.0004ms per send/receive operation.
Now let's see the impact of GOMAXPROCS:
1
2
3
4
5
6
7
8
9
|
$> for i in {1..8}; do GOMAXPROCS=$i go test -v -bench . .; done
BenchmarkPubSub 5 271720061 ns/op
BenchmarkPubSub-2 10 135813612 ns/op
BenchmarkPubSub-3 20 92693480 ns/op
BenchmarkPubSub-4 20 67964795 ns/op
BenchmarkPubSub-5 50 52089680 ns/op
BenchmarkPubSub-6 50 47146900 ns/op
BenchmarkPubSub-7 50 32083808 ns/op
BenchmarkPubSub-8 50000 37167 ns/op
|
Conclusion
It is the first queueing service I am trying so I don't really know if it is good or bad, it would be interesting to compare with 0mq, redis, rabbitmq to see the difference.
In any case, it is a nice example for self-contained test leveraging the parallel capabilities of Go.