hybridgroup.gobot/system/i2c_device_test.go

595 lines
16 KiB
Go

//nolint:gosec // ok for test
package system
import (
"os"
"testing"
"unsafe"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gobot.io/x/gobot/v2"
)
const dev = "/dev/i2c-1"
func getSyscallFuncImpl(
errorMask byte,
) func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno) {
// bit 0: error on function query
// bit 1: error on set address
// bit 2: error on command
//nolint:nonamedreturns // useful here
return func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno) {
// function query
if (trap == Syscall_SYS_IOCTL) && (a2 == I2C_FUNCS) {
if errorMask&0x01 == 0x01 {
return 0, 0, 1
}
var funcPtr *uint64 = (*uint64)(a3)
*funcPtr = I2C_FUNC_SMBUS_READ_BYTE | I2C_FUNC_SMBUS_READ_BYTE_DATA |
I2C_FUNC_SMBUS_READ_WORD_DATA |
I2C_FUNC_SMBUS_WRITE_BYTE | I2C_FUNC_SMBUS_WRITE_BYTE_DATA |
I2C_FUNC_SMBUS_WRITE_WORD_DATA
}
// set address
if (trap == Syscall_SYS_IOCTL) && (a2 == I2C_TARGET) {
if errorMask&0x02 == 0x02 {
return 0, 0, 1
}
}
// command
if (trap == Syscall_SYS_IOCTL) && (a2 == I2C_SMBUS) {
if errorMask&0x04 == 0x04 {
return 0, 0, 1
}
}
// Let all operations succeed
return 0, 0, 0
}
}
func initTestI2cDeviceWithMockedSys() (*i2cDevice, *mockSyscall) {
a := NewAccesser()
msc := a.UseMockSyscall()
a.UseMockFilesystem([]string{dev})
d, err := a.NewI2cDevice(dev)
if err != nil {
panic(err)
}
return d, msc
}
func TestNewI2cDevice(t *testing.T) {
tests := map[string]struct {
dev string
wantErr string
}{
"ok": {
dev: dev,
},
"empty": {
dev: "",
wantErr: "the given character device location is empty",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
a := NewAccesser()
// act
d, err := a.NewI2cDevice(tc.dev)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
assert.Equal(t, (*i2cDevice)(nil), d)
} else {
var _ gobot.I2cSystemDevicer = d
require.NoError(t, err)
}
})
}
}
func TestClose(t *testing.T) {
// arrange
d, _ := initTestI2cDeviceWithMockedSys()
// act & assert
require.NoError(t, d.Close())
}
func TestWriteRead(t *testing.T) {
// arrange
d, _ := initTestI2cDeviceWithMockedSys()
wbuf := []byte{0x01, 0x02, 0x03}
rbuf := make([]byte, 4)
// act
wn, werr := d.Write(1, wbuf)
rn, rerr := d.Read(1, rbuf)
// assert
require.NoError(t, werr)
require.NoError(t, rerr)
assert.Len(t, wbuf, wn)
assert.Len(t, wbuf, rn) // will read only the written values
assert.Equal(t, rbuf[:len(wbuf)], wbuf)
}
func TestReadByte(t *testing.T) {
tests := map[string]struct {
funcs uint64
syscallImpl func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno)
wantErr string
}{
"read_byte_ok": {
funcs: I2C_FUNC_SMBUS_READ_BYTE,
},
"error_syscall": {
funcs: I2C_FUNC_SMBUS_READ_BYTE,
syscallImpl: getSyscallFuncImpl(0x04),
wantErr: "SMBus access r/w: 1, command: 0, protocol: 1, address: 2 " +
"failed with syscall.Errno operation not permitted",
},
"error_not_supported": {
wantErr: "SMBus read byte not supported",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
msc.Impl = tc.syscallImpl
d.funcs = tc.funcs
const want = byte(5)
msc.dataSlice = []byte{want}
// act
got, err := d.ReadByte(2)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
} else {
require.NoError(t, err)
assert.Equal(t, want, got)
assert.Equal(t, d.file, msc.lastFile)
assert.Equal(t, uintptr(I2C_SMBUS), msc.lastSignal)
assert.Equal(t, byte(I2C_SMBUS_READ), msc.smbus.readWrite)
assert.Equal(t, byte(0), msc.smbus.command) // register is set to 0 in that case
assert.Equal(t, uint32(I2C_SMBUS_BYTE), msc.smbus.protocol)
}
})
}
}
func TestReadByteData(t *testing.T) {
tests := map[string]struct {
funcs uint64
syscallImpl func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno)
wantErr string
}{
"read_byte_data_ok": {
funcs: I2C_FUNC_SMBUS_READ_BYTE_DATA,
},
"error_syscall": {
funcs: I2C_FUNC_SMBUS_READ_BYTE_DATA,
syscallImpl: getSyscallFuncImpl(0x04),
wantErr: "SMBus access r/w: 1, command: 1, protocol: 2, address: 3 " +
"failed with syscall.Errno operation not permitted",
},
"error_not_supported": {
wantErr: "SMBus read byte data not supported",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
msc.Impl = tc.syscallImpl
d.funcs = tc.funcs
const (
reg = byte(0x01)
want = byte(0x02)
)
msc.dataSlice = []byte{want}
// act
got, err := d.ReadByteData(3, reg)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
} else {
require.NoError(t, err)
assert.Equal(t, want, got)
assert.Equal(t, d.file, msc.lastFile)
assert.Equal(t, uintptr(I2C_SMBUS), msc.lastSignal)
assert.Equal(t, byte(I2C_SMBUS_READ), msc.smbus.readWrite)
assert.Equal(t, reg, msc.smbus.command)
assert.Equal(t, uint32(I2C_SMBUS_BYTE_DATA), msc.smbus.protocol)
}
})
}
}
func TestReadWordData(t *testing.T) {
tests := map[string]struct {
funcs uint64
syscallImpl func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno)
wantErr string
}{
"read_word_data_ok": {
funcs: I2C_FUNC_SMBUS_READ_WORD_DATA,
},
"error_syscall": {
funcs: I2C_FUNC_SMBUS_READ_WORD_DATA,
syscallImpl: getSyscallFuncImpl(0x04),
wantErr: "SMBus access r/w: 1, command: 2, protocol: 3, address: 4 " +
"failed with syscall.Errno operation not permitted",
},
"error_not_supported": {
wantErr: "SMBus read word data not supported",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
msc.Impl = tc.syscallImpl
d.funcs = tc.funcs
const (
reg = byte(0x02)
msbyte = byte(0xD4)
lsbyte = byte(0x31)
want = uint16(54321)
)
// all common drivers read LSByte first
msc.dataSlice = []byte{lsbyte, msbyte}
// act
got, err := d.ReadWordData(4, reg)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
} else {
require.NoError(t, err)
assert.Equal(t, want, got)
assert.Equal(t, d.file, msc.lastFile)
assert.Equal(t, uintptr(I2C_SMBUS), msc.lastSignal)
assert.Equal(t, byte(I2C_SMBUS_READ), msc.smbus.readWrite)
assert.Equal(t, reg, msc.smbus.command)
assert.Equal(t, uint32(I2C_SMBUS_WORD_DATA), msc.smbus.protocol)
}
})
}
}
func TestReadBlockData(t *testing.T) {
// arrange
const (
reg = byte(0x03)
wantB0 = byte(11)
wantB1 = byte(22)
wantB2 = byte(33)
wantB3 = byte(44)
wantB4 = byte(55)
wantB5 = byte(66)
wantB6 = byte(77)
wantB7 = byte(88)
wantB8 = byte(99)
wantB9 = byte(111)
)
tests := map[string]struct {
funcs uint64
syscallImpl func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno)
wantErr string
}{
"read_block_data_ok": {
funcs: I2C_FUNC_SMBUS_READ_I2C_BLOCK,
},
"error_syscall": {
funcs: I2C_FUNC_SMBUS_READ_I2C_BLOCK,
syscallImpl: getSyscallFuncImpl(0x04),
wantErr: "SMBus access r/w: 1, command: 3, protocol: 8, address: 5 " +
"failed with syscall.Errno operation not permitted",
},
"error_from_used_fallback_if_not_supported": {
wantErr: "Read 1 bytes from device by sysfs, expected 10",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
msc.Impl = tc.syscallImpl
d.funcs = tc.funcs
msc.dataSlice = []byte{wantB0, wantB1, wantB2, wantB3, wantB4, wantB5, wantB6, wantB7, wantB8, wantB9}
buf := []byte{12, 23, 34, 45, 56, 67, 78, 89, 98, 87}
// act
err := d.ReadBlockData(5, reg, buf)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
} else {
require.NoError(t, err)
assert.Equal(t, msc.dataSlice, buf)
assert.Equal(t, d.file, msc.lastFile)
assert.Equal(t, uintptr(I2C_SMBUS), msc.lastSignal)
assert.Equal(t, uint8(len(buf)+1), msc.sliceSize)
assert.Equal(t, byte(I2C_SMBUS_READ), msc.smbus.readWrite)
assert.Equal(t, reg, msc.smbus.command)
assert.Equal(t, uint32(I2C_SMBUS_I2C_BLOCK_DATA), msc.smbus.protocol)
}
})
}
}
func TestWriteByte(t *testing.T) {
tests := map[string]struct {
funcs uint64
syscallImpl func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno)
wantErr string
}{
"write_byte_ok": {
funcs: I2C_FUNC_SMBUS_WRITE_BYTE,
},
"error_syscall": {
funcs: I2C_FUNC_SMBUS_WRITE_BYTE,
syscallImpl: getSyscallFuncImpl(0x04),
wantErr: "SMBus access r/w: 0, command: 68, protocol: 1, address: 6 " +
"failed with syscall.Errno operation not permitted",
},
"error_not_supported": {
wantErr: "SMBus write byte not supported",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
msc.Impl = tc.syscallImpl
d.funcs = tc.funcs
const val = byte(0x44)
// act
err := d.WriteByte(6, val)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
} else {
require.NoError(t, err)
assert.Equal(t, d.file, msc.lastFile)
assert.Equal(t, uintptr(I2C_SMBUS), msc.lastSignal)
assert.Equal(t, byte(I2C_SMBUS_WRITE), msc.smbus.readWrite)
assert.Equal(t, val, msc.smbus.command) // in byte write, the register/command is used for the value
assert.Equal(t, uint32(I2C_SMBUS_BYTE), msc.smbus.protocol)
}
})
}
}
func TestWriteByteData(t *testing.T) {
tests := map[string]struct {
funcs uint64
syscallImpl func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno)
wantErr string
}{
"write_byte_data_ok": {
funcs: I2C_FUNC_SMBUS_WRITE_BYTE_DATA,
},
"error_syscall": {
funcs: I2C_FUNC_SMBUS_WRITE_BYTE_DATA,
syscallImpl: getSyscallFuncImpl(0x04),
wantErr: "SMBus access r/w: 0, command: 4, protocol: 2, address: 7 " +
"failed with syscall.Errno operation not permitted",
},
"error_not_supported": {
wantErr: "SMBus write byte data not supported",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
msc.Impl = tc.syscallImpl
d.funcs = tc.funcs
const (
reg = byte(0x04)
val = byte(0x55)
)
// act
err := d.WriteByteData(7, reg, val)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
} else {
require.NoError(t, err)
assert.Equal(t, d.file, msc.lastFile)
assert.Equal(t, uintptr(I2C_SMBUS), msc.lastSignal)
assert.Equal(t, byte(I2C_SMBUS_WRITE), msc.smbus.readWrite)
assert.Equal(t, reg, msc.smbus.command)
assert.Equal(t, uint32(I2C_SMBUS_BYTE_DATA), msc.smbus.protocol)
assert.Len(t, msc.dataSlice, 1)
assert.Equal(t, val, msc.dataSlice[0])
}
})
}
}
func TestWriteWordData(t *testing.T) {
tests := map[string]struct {
funcs uint64
syscallImpl func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno)
wantErr string
}{
"write_word_data_ok": {
funcs: I2C_FUNC_SMBUS_WRITE_WORD_DATA,
},
"error_syscall": {
funcs: I2C_FUNC_SMBUS_WRITE_WORD_DATA,
syscallImpl: getSyscallFuncImpl(0x04),
wantErr: "SMBus access r/w: 0, command: 5, protocol: 3, address: 8 " +
"failed with syscall.Errno operation not permitted",
},
"error_not_supported": {
wantErr: "SMBus write word data not supported",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
msc.Impl = tc.syscallImpl
d.funcs = tc.funcs
const (
reg = byte(0x05)
val = uint16(54321)
wantLSByte = byte(0x31)
wantMSByte = byte(0xD4)
)
// act
err := d.WriteWordData(8, reg, val)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
} else {
require.NoError(t, err)
assert.Equal(t, d.file, msc.lastFile)
assert.Equal(t, uintptr(I2C_SMBUS), msc.lastSignal)
assert.Equal(t, byte(I2C_SMBUS_WRITE), msc.smbus.readWrite)
assert.Equal(t, reg, msc.smbus.command)
assert.Equal(t, uint32(I2C_SMBUS_WORD_DATA), msc.smbus.protocol)
assert.Len(t, msc.dataSlice, 2)
// all common drivers write LSByte first
assert.Equal(t, wantLSByte, msc.dataSlice[0])
assert.Equal(t, wantMSByte, msc.dataSlice[1])
}
})
}
}
func TestWriteBlockData(t *testing.T) {
// arrange
const (
reg = byte(0x06)
b0 = byte(0x09)
b1 = byte(0x11)
b2 = byte(0x22)
b3 = byte(0x33)
b4 = byte(0x44)
b5 = byte(0x55)
b6 = byte(0x66)
b7 = byte(0x77)
b8 = byte(0x88)
b9 = byte(0x99)
)
tests := map[string]struct {
funcs uint64
syscallImpl func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno)
wantErr string
}{
"write_block_data_ok": {
funcs: I2C_FUNC_SMBUS_WRITE_I2C_BLOCK,
},
"error_syscall": {
funcs: I2C_FUNC_SMBUS_WRITE_I2C_BLOCK,
syscallImpl: getSyscallFuncImpl(0x04),
wantErr: "SMBus access r/w: 0, command: 6, protocol: 8, address: 9 " +
"failed with syscall.Errno operation not permitted",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
msc.Impl = tc.syscallImpl
d.funcs = tc.funcs
data := []byte{b0, b1, b2, b3, b4, b5, b6, b7, b8, b9}
// act
err := d.WriteBlockData(9, reg, data)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
} else {
require.NoError(t, err)
assert.Equal(t, d.file, msc.lastFile)
assert.Equal(t, uintptr(I2C_SMBUS), msc.lastSignal)
assert.Equal(t, uint8(len(data)+1), msc.sliceSize) // including size element
assert.Equal(t, byte(I2C_SMBUS_WRITE), msc.smbus.readWrite)
assert.Equal(t, reg, msc.smbus.command)
assert.Equal(t, uint32(I2C_SMBUS_I2C_BLOCK_DATA), msc.smbus.protocol)
assert.Equal(t, uint8(len(data)), msc.dataSlice[0]) // data size
assert.Equal(t, data, msc.dataSlice[1:])
}
})
}
}
func TestWriteBlockDataTooMuch(t *testing.T) {
// arrange
d, _ := initTestI2cDeviceWithMockedSys()
// act
err := d.WriteBlockData(10, 0x01, make([]byte, 33))
// assert
require.ErrorContains(t, err, "Writing blocks larger than 32 bytes (33) not supported")
}
func Test_setAddress(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
// act
err := d.setAddress(0xff)
// assert
require.NoError(t, err)
assert.Equal(t, uintptr(0xff), msc.devAddress)
}
func Test_queryFunctionality(t *testing.T) {
tests := map[string]struct {
requested uint64
dev string
syscallImpl func(trap, a1, a2 uintptr, a3 unsafe.Pointer) (r1, r2 uintptr, err SyscallErrno)
wantErr string
wantFile bool
wantFuncs uint64
}{
"ok": {
requested: I2C_FUNC_SMBUS_READ_BYTE,
dev: dev,
syscallImpl: getSyscallFuncImpl(0x00),
wantFile: true,
wantFuncs: 0x7E0000,
},
"dev_null_error": {
dev: os.DevNull,
syscallImpl: getSyscallFuncImpl(0x00),
wantErr: " : /dev/null: no such file",
},
"query_funcs_error": {
dev: dev,
syscallImpl: getSyscallFuncImpl(0x01),
wantErr: "Querying functionality failed with syscall.Errno operation not permitted",
wantFile: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
d, msc := initTestI2cDeviceWithMockedSys()
d.location = tc.dev
msc.Impl = tc.syscallImpl
// act
err := d.queryFunctionality(tc.requested, "test"+name)
// assert
if tc.wantErr != "" {
require.ErrorContains(t, err, tc.wantErr)
} else {
require.NoError(t, err)
}
if tc.wantFile {
assert.NotNil(t, d.file)
} else {
assert.Equal(t, (*MockFile)(nil), d.file)
}
assert.Equal(t, tc.wantFuncs, d.funcs)
})
}
}