Refactor and add tests
This commit is contained in:
parent
541761b98e
commit
78436285ec
|
@ -9,24 +9,21 @@ import (
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
gbot := gobot.NewGobot()
|
gbot := gobot.NewGobot()
|
||||||
|
|
||||||
sparkCore := spark.NewSparkCoreAdaptor("spark", "DEVICE_ID", "ACCESS_TOKEN")
|
sparkCore := spark.NewSparkCoreAdaptor("spark", "DEVICE_ID", "ACCESS_TOKEN")
|
||||||
|
|
||||||
work := func() {
|
work := func() {
|
||||||
stream, err := sparkCore.EventStream("all", "")
|
if stream, err := sparkCore.EventStream("all", ""); err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err.Error())
|
|
||||||
} else {
|
} else {
|
||||||
for {
|
gobot.On(stream, func(data interface{}) {
|
||||||
ev := <-stream.Events
|
fmt.Println(data.(spark.Event))
|
||||||
fmt.Println(ev.Event(), ev.Data())
|
})
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
robot := gobot.NewRobot("spark",
|
robot := gobot.NewRobot("spark",
|
||||||
[]gobot.Connection{sparkCore},
|
[]gobot.Connection{sparkCore},
|
||||||
[]gobot.Device{},
|
|
||||||
work,
|
work,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,20 @@ type SparkCoreAdaptor struct {
|
||||||
APIServer string
|
APIServer string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Event struct {
|
||||||
|
Name string
|
||||||
|
Data string
|
||||||
|
Error error
|
||||||
|
}
|
||||||
|
|
||||||
|
var eventSource = func(url string) (chan eventsource.Event, chan error, error) {
|
||||||
|
stream, err := eventsource.Subscribe(url, "")
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return stream.Events, stream.Errors, nil
|
||||||
|
}
|
||||||
|
|
||||||
// NewSparkCoreAdaptor creates new spark core adaptor with deviceId and accessToken
|
// NewSparkCoreAdaptor creates new spark core adaptor with deviceId and accessToken
|
||||||
// using api.spark.io server as default
|
// using api.spark.io server as default
|
||||||
func NewSparkCoreAdaptor(name string, deviceID string, accessToken string) *SparkCoreAdaptor {
|
func NewSparkCoreAdaptor(name string, deviceID string, accessToken string) *SparkCoreAdaptor {
|
||||||
|
@ -110,26 +124,13 @@ func (s *SparkCoreAdaptor) DigitalRead(pin string) (val int, err error) {
|
||||||
return -1, err
|
return -1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventStream returns an event stream based on the following params:
|
// EventStream returns a gobot.Event based on the following params:
|
||||||
//
|
//
|
||||||
// * source - "all"/"devices"/"device" (More info at: http://docs.spark.io/api/#reading-data-from-a-core-events)
|
// * source - "all"/"devices"/"device" (More info at: http://docs.spark.io/api/#reading-data-from-a-core-events)
|
||||||
// * name - Event name to subscribe for, leave blank to subscribe to all events.
|
// * name - Event name to subscribe for, leave blank to subscribe to all events.
|
||||||
//
|
//
|
||||||
// A stream returned contains an Event chan that can be used to process received
|
// A new event is emitted as a spark.Event struct
|
||||||
// information. Each event has Id(), Data() and Event() methods.
|
func (s *SparkCoreAdaptor) EventStream(source string, name string) (event *gobot.Event, err error) {
|
||||||
//
|
|
||||||
// Example:
|
|
||||||
//
|
|
||||||
// stream, err := sparkCore.EventStream("all", "")
|
|
||||||
// if err != nil {
|
|
||||||
// fmt.Println(err.Error())
|
|
||||||
// } else {
|
|
||||||
// for {
|
|
||||||
// ev := <-stream.Events
|
|
||||||
// fmt.Println(ev.Event(), ev.Data())
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
func (s *SparkCoreAdaptor) EventStream(source string, name string) (stream *eventsource.Stream, err error) {
|
|
||||||
var url string
|
var url string
|
||||||
|
|
||||||
switch source {
|
switch source {
|
||||||
|
@ -144,7 +145,25 @@ func (s *SparkCoreAdaptor) EventStream(source string, name string) (stream *even
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
stream, err = eventsource.Subscribe(url, "")
|
events, errors, err := eventSource(url)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
event = gobot.NewEvent()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ev := <-events:
|
||||||
|
if ev.Event() != "" && ev.Data() != "" {
|
||||||
|
gobot.Publish(event, Event{Name: ev.Event(), Data: ev.Data()})
|
||||||
|
}
|
||||||
|
case ev := <-errors:
|
||||||
|
gobot.Publish(event, Event{Error: ev})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
package spark
|
package spark
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/donovanhide/eventsource"
|
||||||
"github.com/hybridgroup/gobot"
|
"github.com/hybridgroup/gobot"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -332,3 +334,72 @@ func TestSparkCoreAdaptorPostToSpark(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testEventSource struct {
|
||||||
|
event string
|
||||||
|
data string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (testEventSource) Id() string { return "" }
|
||||||
|
func (t testEventSource) Event() string { return t.event }
|
||||||
|
func (t testEventSource) Data() string { return t.data }
|
||||||
|
|
||||||
|
func TestSparkCoreAdaptorEventStream(t *testing.T) {
|
||||||
|
a := initTestSparkCoreAdaptor()
|
||||||
|
var url string
|
||||||
|
eventSource = func(u string) (chan eventsource.Event, chan error, error) {
|
||||||
|
url = u
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
a.EventStream("all", "ping")
|
||||||
|
gobot.Assert(t, url, "https://api.spark.io/v1/events/ping?access_token=token")
|
||||||
|
a.EventStream("devices", "ping")
|
||||||
|
gobot.Assert(t, url, "https://api.spark.io/v1/devices/events/ping?access_token=token")
|
||||||
|
a.EventStream("device", "ping")
|
||||||
|
gobot.Assert(t, url, "https://api.spark.io/v1/devices/myDevice/events/ping?access_token=token")
|
||||||
|
_, err := a.EventStream("nothing", "ping")
|
||||||
|
gobot.Assert(t, err.Error(), "source param should be: all, devices or device")
|
||||||
|
|
||||||
|
eventSource = func(u string) (chan eventsource.Event, chan error, error) {
|
||||||
|
return nil, nil, errors.New("error connecting sse")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = a.EventStream("devices", "")
|
||||||
|
gobot.Assert(t, err.Error(), "error connecting sse")
|
||||||
|
|
||||||
|
eventChan := make(chan eventsource.Event, 0)
|
||||||
|
errorChan := make(chan error, 0)
|
||||||
|
|
||||||
|
eventSource = func(u string) (chan eventsource.Event, chan error, error) {
|
||||||
|
return eventChan, errorChan, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sem := make(chan bool, 0)
|
||||||
|
stream, err := a.EventStream("devices", "")
|
||||||
|
gobot.Assert(t, err, nil)
|
||||||
|
|
||||||
|
eventChan <- testEventSource{event: "event", data: "sse event"}
|
||||||
|
|
||||||
|
gobot.Once(stream, func(data interface{}) {
|
||||||
|
e := data.(Event)
|
||||||
|
gobot.Assert(t, e.Name, "event")
|
||||||
|
gobot.Assert(t, e.Data, "sse event")
|
||||||
|
gobot.Assert(t, e.Error, nil)
|
||||||
|
sem <- true
|
||||||
|
})
|
||||||
|
|
||||||
|
<-sem
|
||||||
|
|
||||||
|
errorChan <- errors.New("stream error")
|
||||||
|
|
||||||
|
gobot.Once(stream, func(data interface{}) {
|
||||||
|
e := data.(Event)
|
||||||
|
gobot.Assert(t, e.Name, "")
|
||||||
|
gobot.Assert(t, e.Data, "")
|
||||||
|
gobot.Assert(t, e.Error.Error(), "stream error")
|
||||||
|
sem <- true
|
||||||
|
})
|
||||||
|
|
||||||
|
<-sem
|
||||||
|
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue