426 lines
9.8 KiB
Go
426 lines
9.8 KiB
Go
package i2c
|
|
|
|
import (
|
|
"errors"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"gobot.io/x/gobot/v2"
|
|
)
|
|
|
|
// this ensures that the implementation is based on i2c.Driver, which implements the gobot.Driver
|
|
// and tests all implementations, so no further tests needed here for gobot.Driver interface
|
|
var _ gobot.Driver = (*PCA9501Driver)(nil)
|
|
|
|
var (
|
|
pinVal = map[string]interface{}{
|
|
"pin": uint8(7),
|
|
"val": uint8(0),
|
|
}
|
|
pin = map[string]interface{}{
|
|
"pin": uint8(7),
|
|
}
|
|
addressVal = map[string]interface{}{
|
|
"address": uint8(15),
|
|
"val": uint8(7),
|
|
}
|
|
address = map[string]interface{}{
|
|
"address": uint8(15),
|
|
}
|
|
)
|
|
|
|
func initPCA9501WithStubbedAdaptor() (*PCA9501Driver, *i2cTestAdaptor) {
|
|
a := newI2cTestAdaptor()
|
|
d := NewPCA9501Driver(a)
|
|
if err := d.Start(); err != nil {
|
|
panic(err)
|
|
}
|
|
return d, a
|
|
}
|
|
|
|
func TestNewPCA9501Driver(t *testing.T) {
|
|
// arrange, act
|
|
var di interface{} = NewPCA9501Driver(newI2cTestAdaptor())
|
|
// assert
|
|
d, ok := di.(*PCA9501Driver)
|
|
if !ok {
|
|
t.Errorf("NewPCA9501Driver() should have returned a *PCA9501Driver")
|
|
}
|
|
assert.NotNil(t, d.Driver)
|
|
assert.True(t, strings.HasPrefix(d.Name(), "PCA9501"))
|
|
assert.Equal(t, 0x3f, d.defaultAddress)
|
|
}
|
|
|
|
func TestPCA9501Options(t *testing.T) {
|
|
// This is a general test, that options are applied in constructor by using the common WithBus() option and
|
|
// least one of this driver. Further tests for options can also be done by call of "WithOption(val)(d)".
|
|
d := NewPCA9501Driver(newI2cTestAdaptor(), WithBus(2))
|
|
assert.Equal(t, 2, d.GetBusOrDefault(1))
|
|
}
|
|
|
|
func TestPCA9501CommandsWriteGPIO(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
return len(b), nil
|
|
}
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
return 0, nil
|
|
}
|
|
// act
|
|
result := d.Command("WriteGPIO")(pinVal)
|
|
// assert
|
|
assert.Nil(t, result.(map[string]interface{})["err"])
|
|
}
|
|
|
|
func TestPCA9501CommandsReadGPIO(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
return len(b), nil
|
|
}
|
|
// act
|
|
result := d.Command("ReadGPIO")(pin)
|
|
// assert
|
|
assert.Nil(t, result.(map[string]interface{})["err"])
|
|
}
|
|
|
|
func TestPCA9501CommandsWriteEEPROM(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
return 0, nil
|
|
}
|
|
// act
|
|
result := d.Command("WriteEEPROM")(addressVal)
|
|
// assert
|
|
assert.Nil(t, result.(map[string]interface{})["err"])
|
|
}
|
|
|
|
func TestPCA9501CommandsReadEEPROM(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
return 0, nil
|
|
}
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
return len(b), nil
|
|
}
|
|
// act
|
|
result := d.Command("ReadEEPROM")(address)
|
|
// assert
|
|
assert.Nil(t, result.(map[string]interface{})["err"])
|
|
}
|
|
|
|
func TestPCA9501WriteGPIO(t *testing.T) {
|
|
tests := map[string]struct {
|
|
setVal uint8
|
|
ioDirAllInput uint8
|
|
ioStateAllInput uint8
|
|
pin uint8
|
|
wantPin uint8
|
|
wantState uint8
|
|
}{
|
|
"clear_bit": {
|
|
setVal: 0,
|
|
ioDirAllInput: 0xF1,
|
|
ioStateAllInput: 0xF2,
|
|
pin: 6,
|
|
wantPin: 0xB1,
|
|
wantState: 0xB2,
|
|
},
|
|
"set_bit": {
|
|
setVal: 2,
|
|
ioDirAllInput: 0x1F,
|
|
ioStateAllInput: 0x20,
|
|
pin: 3,
|
|
wantPin: 0x17,
|
|
wantState: 0x28,
|
|
},
|
|
}
|
|
for name, tc := range tests {
|
|
t.Run(name, func(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
// prepare all reads
|
|
numCallsRead := 0
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
numCallsRead++
|
|
if numCallsRead == 1 {
|
|
// first call read current io direction of all pins
|
|
b[0] = tc.ioDirAllInput
|
|
}
|
|
if numCallsRead == 2 {
|
|
// second call read current state of all pins
|
|
b[0] = tc.ioStateAllInput
|
|
}
|
|
return len(b), nil
|
|
}
|
|
// act
|
|
err := d.WriteGPIO(tc.pin, tc.setVal)
|
|
// assert
|
|
require.NoError(t, err)
|
|
assert.Equal(t, 2, numCallsRead)
|
|
assert.Len(t, a.written, 2)
|
|
assert.Equal(t, tc.wantPin, a.written[0])
|
|
assert.Equal(t, tc.wantState, a.written[1])
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestPCA9501WriteGPIOErrorAtWriteDirection(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
wantErr := errors.New("write error")
|
|
// prepare all reads
|
|
numCallsRead := 0
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
numCallsRead++
|
|
return len(b), nil
|
|
}
|
|
// prepare all writes
|
|
numCallsWrite := 0
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
numCallsWrite++
|
|
if numCallsWrite == 1 {
|
|
// first call writes the CTRL register for port direction
|
|
return 0, wantErr
|
|
}
|
|
return 0, nil
|
|
}
|
|
// act
|
|
err := d.WriteGPIO(7, 0)
|
|
// assert
|
|
assert.Equal(t, wantErr, err)
|
|
assert.Less(t, numCallsRead, 2)
|
|
assert.Equal(t, 1, numCallsWrite)
|
|
}
|
|
|
|
func TestPCA9501WriteGPIOErrorAtWriteValue(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
wantErr := errors.New("write error")
|
|
// prepare all reads
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
return len(b), nil
|
|
}
|
|
// prepare all writes
|
|
numCallsWrite := 0
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
numCallsWrite++
|
|
if numCallsWrite == 2 {
|
|
// second call writes the value to IO port
|
|
return 0, wantErr
|
|
}
|
|
return 0, nil
|
|
}
|
|
// act
|
|
err := d.WriteGPIO(7, 0)
|
|
// assert
|
|
assert.Equal(t, wantErr, err)
|
|
assert.Equal(t, 2, numCallsWrite)
|
|
}
|
|
|
|
func TestPCA9501ReadGPIO(t *testing.T) {
|
|
tests := map[string]struct {
|
|
ctrlState uint8
|
|
want uint8
|
|
}{
|
|
"pin_is_set": {ctrlState: 0x80, want: 1},
|
|
"pin_is_not_set": {ctrlState: 0x7F, want: 0},
|
|
}
|
|
for name, tc := range tests {
|
|
t.Run(name, func(t *testing.T) {
|
|
// arrange
|
|
const (
|
|
pin = uint8(7)
|
|
wantCtrlState = uint8(0x80)
|
|
)
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
// prepare all reads
|
|
numCallsRead := 0
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
numCallsRead++
|
|
if numCallsRead == 1 {
|
|
// current state of io
|
|
b[0] = 0x00
|
|
}
|
|
if numCallsRead == 2 {
|
|
b[0] = tc.ctrlState
|
|
}
|
|
return len(b), nil
|
|
}
|
|
// act
|
|
got, err := d.ReadGPIO(pin)
|
|
// assert
|
|
require.NoError(t, err)
|
|
assert.Equal(t, tc.want, got)
|
|
assert.Equal(t, 2, numCallsRead)
|
|
assert.Len(t, a.written, 1)
|
|
assert.Equal(t, wantCtrlState, a.written[0])
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestPCA9501ReadGPIOErrorAtReadDirection(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
wantErr := errors.New("read error")
|
|
// prepare all reads
|
|
numCallsRead := 0
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
numCallsRead++
|
|
if numCallsRead == 1 {
|
|
// first read gets the CTRL register for pin direction
|
|
return 0, wantErr
|
|
}
|
|
return len(b), nil
|
|
}
|
|
// prepare all writes
|
|
numCallsWrite := 0
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
numCallsWrite++
|
|
return 0, nil
|
|
}
|
|
// act
|
|
_, err := d.ReadGPIO(1)
|
|
// assert
|
|
assert.Equal(t, wantErr, err)
|
|
assert.Equal(t, 1, numCallsRead)
|
|
assert.Equal(t, 0, numCallsWrite)
|
|
}
|
|
|
|
func TestPCA9501ReadGPIOErrorAtReadValue(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
wantErr := errors.New("read error")
|
|
// prepare all reads
|
|
numCallsRead := 0
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
numCallsRead++
|
|
if numCallsRead == 2 {
|
|
// second read gets the value from IO port
|
|
return 0, wantErr
|
|
}
|
|
return len(b), nil
|
|
}
|
|
// prepare all writes
|
|
numCallsWrite := 0
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
numCallsWrite++
|
|
return 0, nil
|
|
}
|
|
// act
|
|
_, err := d.ReadGPIO(2)
|
|
// assert
|
|
assert.Equal(t, wantErr, err)
|
|
assert.Equal(t, 1, numCallsWrite)
|
|
}
|
|
|
|
func TestPCA9501WriteEEPROM(t *testing.T) {
|
|
// arrange
|
|
const (
|
|
addressEEPROM = uint8(0x52)
|
|
want = uint8(0x25)
|
|
)
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
// prepare all writes
|
|
numCallsWrite := 0
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
numCallsWrite++
|
|
return 0, nil
|
|
}
|
|
// act
|
|
err := d.WriteEEPROM(addressEEPROM, want)
|
|
// assert
|
|
require.NoError(t, err)
|
|
assert.Equal(t, 1, numCallsWrite)
|
|
assert.Equal(t, addressEEPROM, a.written[0])
|
|
assert.Equal(t, want, a.written[1])
|
|
}
|
|
|
|
func TestPCA9501ReadEEPROM(t *testing.T) {
|
|
// arrange
|
|
const (
|
|
addressEEPROM = uint8(51)
|
|
want = uint8(0x44)
|
|
)
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
// prepare all writes
|
|
numCallsWrite := 0
|
|
a.i2cWriteImpl = func(b []byte) (int, error) {
|
|
numCallsWrite++
|
|
return 0, nil
|
|
}
|
|
// prepare all reads
|
|
numCallsRead := 0
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
numCallsRead++
|
|
b[0] = want
|
|
return len(b), nil
|
|
}
|
|
// act
|
|
val, err := d.ReadEEPROM(addressEEPROM)
|
|
// assert
|
|
require.NoError(t, err)
|
|
assert.Equal(t, want, val)
|
|
assert.Equal(t, 1, numCallsWrite)
|
|
assert.Equal(t, addressEEPROM, a.written[0])
|
|
assert.Equal(t, 1, numCallsRead)
|
|
}
|
|
|
|
func TestPCA9501ReadEEPROMErrorWhileWriteAddress(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
wantErr := errors.New("error while write")
|
|
// prepare all writes
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
return 0, wantErr
|
|
}
|
|
// prepare all reads
|
|
numCallsRead := 0
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
numCallsRead++
|
|
return len(b), nil
|
|
}
|
|
// act
|
|
_, err := d.ReadEEPROM(15)
|
|
// assert
|
|
assert.Equal(t, wantErr, err)
|
|
assert.Equal(t, 0, numCallsRead)
|
|
}
|
|
|
|
func TestPCA9501ReadEEPROMErrorWhileReadValue(t *testing.T) {
|
|
// arrange
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
wantErr := errors.New("error while read")
|
|
// prepare all writes
|
|
numCallsWrite := 0
|
|
a.i2cWriteImpl = func([]byte) (int, error) {
|
|
numCallsWrite++
|
|
return 0, nil
|
|
}
|
|
// prepare all reads
|
|
a.i2cReadImpl = func(b []byte) (int, error) {
|
|
return len(b), wantErr
|
|
}
|
|
// act
|
|
_, err := d.ReadEEPROM(15)
|
|
// assert
|
|
assert.Equal(t, 1, numCallsWrite)
|
|
assert.Equal(t, wantErr, err)
|
|
}
|
|
|
|
func TestPCA9501_initialize(t *testing.T) {
|
|
// arrange
|
|
const want = 0x7f
|
|
d, a := initPCA9501WithStubbedAdaptor()
|
|
// act
|
|
err := d.initialize()
|
|
// assert
|
|
require.NoError(t, err)
|
|
assert.Equal(t, want, a.address)
|
|
}
|