diff --git a/examples/spark_core_events.go b/examples/spark_core_events.go index b1b40486..65c5d5e3 100644 --- a/examples/spark_core_events.go +++ b/examples/spark_core_events.go @@ -9,24 +9,21 @@ import ( func main() { gbot := gobot.NewGobot() + sparkCore := spark.NewSparkCoreAdaptor("spark", "DEVICE_ID", "ACCESS_TOKEN") work := func() { - stream, err := sparkCore.EventStream("all", "") - - if err != nil { - fmt.Println(err.Error()) + if stream, err := sparkCore.EventStream("all", ""); err != nil { + fmt.Println(err) } else { - for { - ev := <-stream.Events - fmt.Println(ev.Event(), ev.Data()) - } + gobot.On(stream, func(data interface{}) { + fmt.Println(data.(spark.Event)) + }) } } robot := gobot.NewRobot("spark", []gobot.Connection{sparkCore}, - []gobot.Device{}, work, ) diff --git a/platforms/spark/spark_core_adaptor.go b/platforms/spark/spark_core_adaptor.go index d61ccdeb..2e4e9c12 100644 --- a/platforms/spark/spark_core_adaptor.go +++ b/platforms/spark/spark_core_adaptor.go @@ -28,6 +28,20 @@ type SparkCoreAdaptor struct { APIServer string } +type Event struct { + Name string + Data string + Error error +} + +var eventSource = func(url string) (chan eventsource.Event, chan error, error) { + stream, err := eventsource.Subscribe(url, "") + if err != nil { + return nil, nil, err + } + return stream.Events, stream.Errors, nil +} + // NewSparkCoreAdaptor creates new spark core adaptor with deviceId and accessToken // using api.spark.io server as default func NewSparkCoreAdaptor(name string, deviceID string, accessToken string) *SparkCoreAdaptor { @@ -110,26 +124,13 @@ func (s *SparkCoreAdaptor) DigitalRead(pin string) (val int, err error) { return -1, err } -// EventStream returns an event stream based on the following params: +// EventStream returns a gobot.Event based on the following params: // // * source - "all"/"devices"/"device" (More info at: http://docs.spark.io/api/#reading-data-from-a-core-events) // * name - Event name to subscribe for, leave blank to subscribe to all events. // -// A stream returned contains an Event chan that can be used to process received -// information. Each event has Id(), Data() and Event() methods. -// -// Example: -// -// stream, err := sparkCore.EventStream("all", "") -// if err != nil { -// fmt.Println(err.Error()) -// } else { -// for { -// ev := <-stream.Events -// fmt.Println(ev.Event(), ev.Data()) -// } -// } -func (s *SparkCoreAdaptor) EventStream(source string, name string) (stream *eventsource.Stream, err error) { +// A new event is emitted as a spark.Event struct +func (s *SparkCoreAdaptor) EventStream(source string, name string) (event *gobot.Event, err error) { var url string switch source { @@ -144,7 +145,25 @@ func (s *SparkCoreAdaptor) EventStream(source string, name string) (stream *even return } - stream, err = eventsource.Subscribe(url, "") + events, errors, err := eventSource(url) + if err != nil { + return + } + + event = gobot.NewEvent() + + go func() { + for { + select { + case ev := <-events: + if ev.Event() != "" && ev.Data() != "" { + gobot.Publish(event, Event{Name: ev.Event(), Data: ev.Data()}) + } + case ev := <-errors: + gobot.Publish(event, Event{Error: ev}) + } + } + }() return } diff --git a/platforms/spark/spark_core_adaptor_test.go b/platforms/spark/spark_core_adaptor_test.go index 246132ef..e0b2b5fe 100644 --- a/platforms/spark/spark_core_adaptor_test.go +++ b/platforms/spark/spark_core_adaptor_test.go @@ -1,11 +1,13 @@ package spark import ( + "errors" "net/http" "net/http/httptest" "net/url" "testing" + "github.com/donovanhide/eventsource" "github.com/hybridgroup/gobot" ) @@ -332,3 +334,72 @@ func TestSparkCoreAdaptorPostToSpark(t *testing.T) { } } + +type testEventSource struct { + event string + data string +} + +func (testEventSource) Id() string { return "" } +func (t testEventSource) Event() string { return t.event } +func (t testEventSource) Data() string { return t.data } + +func TestSparkCoreAdaptorEventStream(t *testing.T) { + a := initTestSparkCoreAdaptor() + var url string + eventSource = func(u string) (chan eventsource.Event, chan error, error) { + url = u + return nil, nil, nil + } + a.EventStream("all", "ping") + gobot.Assert(t, url, "https://api.spark.io/v1/events/ping?access_token=token") + a.EventStream("devices", "ping") + gobot.Assert(t, url, "https://api.spark.io/v1/devices/events/ping?access_token=token") + a.EventStream("device", "ping") + gobot.Assert(t, url, "https://api.spark.io/v1/devices/myDevice/events/ping?access_token=token") + _, err := a.EventStream("nothing", "ping") + gobot.Assert(t, err.Error(), "source param should be: all, devices or device") + + eventSource = func(u string) (chan eventsource.Event, chan error, error) { + return nil, nil, errors.New("error connecting sse") + } + + _, err = a.EventStream("devices", "") + gobot.Assert(t, err.Error(), "error connecting sse") + + eventChan := make(chan eventsource.Event, 0) + errorChan := make(chan error, 0) + + eventSource = func(u string) (chan eventsource.Event, chan error, error) { + return eventChan, errorChan, nil + } + + sem := make(chan bool, 0) + stream, err := a.EventStream("devices", "") + gobot.Assert(t, err, nil) + + eventChan <- testEventSource{event: "event", data: "sse event"} + + gobot.Once(stream, func(data interface{}) { + e := data.(Event) + gobot.Assert(t, e.Name, "event") + gobot.Assert(t, e.Data, "sse event") + gobot.Assert(t, e.Error, nil) + sem <- true + }) + + <-sem + + errorChan <- errors.New("stream error") + + gobot.Once(stream, func(data interface{}) { + e := data.(Event) + gobot.Assert(t, e.Name, "") + gobot.Assert(t, e.Data, "") + gobot.Assert(t, e.Error.Error(), "stream error") + sem <- true + }) + + <-sem + +}