nats: support wildcard subjects by changing the handler to take

nats.Message.

Also update the docs and examples.

Signed-off-by: Michael Hope <mlhx@google.com>
This commit is contained in:
Michael Hope 2017-04-21 20:46:45 +02:00
parent fa0b82d84c
commit 412fa0a3b1
6 changed files with 19 additions and 18 deletions

View File

@ -16,10 +16,10 @@ func main() {
natsAdaptor := nats.NewAdaptorWithAuth("localhost:4222", 1234, "user", "pass")
work := func() {
natsAdaptor.On("hello", func(data []byte) {
natsAdaptor.On("hello", func(msg nats.Message) {
fmt.Println("hello")
})
natsAdaptor.On("hola", func(data []byte) {
natsAdaptor.On("hola", func(msg nats.Message) {
fmt.Println("hola")
})
data := []byte("o")

View File

@ -26,11 +26,11 @@ func main() {
helloDriver := nats.NewDriver(natsAdaptor, "hello")
work := func() {
helloDriver.On(nats.Data, func(data interface{}) {
helloDriver.On(nats.Data, func(msg nats.Message) {
fmt.Println("hello")
})
holaDriver.On(nats.Data, func(data interface{}) {
holaDriver.On(nats.Data, func(msg nats.Message) {
fmt.Println("hola")
})

View File

@ -33,10 +33,10 @@ func main() {
natsAdaptor := nats.NewNatsAdaptor("nats", "localhost:4222", 1234)
work := func() {
natsAdaptor.On("hello", func(subject string, data []byte) {
natsAdaptor.On("hello", func(msg nats.Message) {
fmt.Println(subject)
})
natsAdaptor.On("hola", func(subject string, data []byte) {
natsAdaptor.On("hola", func(msg nats.Message) {
fmt.Println(subject)
})
data := []byte("o")
@ -74,10 +74,10 @@ func main() {
natsAdaptor := nats.NewNatsAdaptor("tls://localhost:4222", 1234, natsio.RootCAs("certs/ca.pem"))
work := func() {
natsAdaptor.On("hello", func(subject string, data []byte) {
natsAdaptor.On("hello", func(msg nats.Message) {
fmt.Println(subject)
})
natsAdaptor.On("hola", func(subject string, data []byte) {
natsAdaptor.On("hola", func(msg nats.Message) {
fmt.Println(subject)
})
data := []byte("o")

View File

@ -21,6 +21,9 @@ type Adaptor struct {
connect func() (*nats.Conn, error)
}
// Message is a message received from the server.
type Message *nats.Msg
// NewAdaptor populates a new NATS Adaptor.
func NewAdaptor(host string, clientID int, options ...nats.Option) *Adaptor {
hosts, err := processHostString(host)
@ -112,12 +115,13 @@ func (a *Adaptor) Publish(topic string, message []byte) bool {
// On is an event-handler style subscriber to a particular topic (named event).
// Supply a handler function to use the bytes returned by the server.
func (a *Adaptor) On(event string, f func(subject string, s []byte)) bool {
func (a *Adaptor) On(event string, f func(msg Message)) bool {
if a.client == nil {
return false
}
a.client.Subscribe(event, func(msg *nats.Msg) {
f(msg.Subject, msg.Data)
f(msg)
})
return true
}

View File

@ -96,7 +96,7 @@ func TestNatsAdaptorOnWhenConnected(t *testing.T) {
t.Skip("TODO: implement this test without requiring actual server connection")
a := initTestNatsAdaptor()
a.Connect()
gobottest.Assert(t, a.On("hola", func(subject string, data []byte) {
gobottest.Assert(t, a.On("hola", func(msg Message) {
fmt.Println("hola")
}), true)
}
@ -115,7 +115,7 @@ func TestNatsAdaptorOnWhenConnectedWithAuth(t *testing.T) {
t.Skip("TODO: implement this test without requiring actual server connection")
a := NewAdaptorWithAuth("localhost:4222", 9999, "test", "testwd")
a.Connect()
gobottest.Assert(t, a.On("hola", func(subject string, data []byte) {
gobottest.Assert(t, a.On("hola", func(msg Message) {
fmt.Println("hola")
}), true)
}
@ -138,7 +138,7 @@ func TestNatsAdaptorCannotPublishUnlessConnected(t *testing.T) {
func TestNatsAdaptorCannotOnUnlessConnected(t *testing.T) {
a := NewAdaptor("localhost:9999", 9999)
gobottest.Assert(t, a.On("hola", func(subject string, data []byte) {
gobottest.Assert(t, a.On("hola", func(msg Message) {
fmt.Println("hola")
}), false)
}

View File

@ -71,11 +71,8 @@ func (m *Driver) Publish(data interface{}) bool {
// On subscribes to data updates for the current device topic,
// and then calls the message handler function when data is received
func (m *Driver) On(n string, f func(subject string, d interface{})) error {
func (m *Driver) On(n string, f func(msg Message)) error {
// TODO: also be able to subscribe to Error updates
f1 := func(subject string, d []byte) {
f(subject, d)
}
m.adaptor().On(m.topic, f1)
m.adaptor().On(m.topic, f)
return nil
}