MF-820 - Fetch messages for a particular device (#843)
* feat: Add Support for Publisher Query in Postgres Signed-off-by: Parham Alvani <parham.alvani@gmail.com> * chore: Remove Redundant Case Signed-off-by: Parham Alvani <parham.alvani@gmail.com> * chore: Add Test for Postgres Query Signed-off-by: Parham Alvani <parham.alvani@gmail.com>
This commit is contained in:
parent
b83439fcc2
commit
c8cb2655c0
|
@ -34,19 +34,18 @@ func New(db *sqlx.DB) readers.MessageRepository {
|
|||
}
|
||||
|
||||
func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) {
|
||||
subtopicQuery := ""
|
||||
if query["subtopic"] != "" {
|
||||
subtopicQuery = `AND subtopic = :subtopic`
|
||||
}
|
||||
q := fmt.Sprintf(`SELECT * FROM messages
|
||||
WHERE channel = :channel %s ORDER BY time DESC
|
||||
LIMIT :limit OFFSET :offset;`, subtopicQuery)
|
||||
WHERE %s ORDER BY time DESC
|
||||
LIMIT :limit OFFSET :offset;`, fmtCondition(chanID, query))
|
||||
|
||||
params := map[string]interface{}{
|
||||
"channel": chanID,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
"subtopic": query["subtopic"],
|
||||
"channel": chanID,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
"subtopic": query["subtopic"],
|
||||
"publisher": query["publisher"],
|
||||
"name": query["name"],
|
||||
"protocol": query["protocol"],
|
||||
}
|
||||
|
||||
rows, err := tr.db.NamedQuery(q, params)
|
||||
|
@ -89,6 +88,21 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query
|
|||
return page, nil
|
||||
}
|
||||
|
||||
func fmtCondition(chanID string, query map[string]string) string {
|
||||
condition := `channel = :channel`
|
||||
for name := range query {
|
||||
switch name {
|
||||
case
|
||||
"subtopic",
|
||||
"publisher",
|
||||
"name",
|
||||
"protocol":
|
||||
condition = fmt.Sprintf(`%s AND %s = :%s`, condition, name, name)
|
||||
}
|
||||
}
|
||||
return condition
|
||||
}
|
||||
|
||||
type dbMessage struct {
|
||||
ID string `db:"id"`
|
||||
Channel string `db:"channel"`
|
||||
|
|
|
@ -142,6 +142,18 @@ func TestMessageReadAll(t *testing.T) {
|
|||
Messages: subtopicMsgs,
|
||||
},
|
||||
},
|
||||
"read message with publisher/protocols": {
|
||||
chanID: chanID.String(),
|
||||
offset: 0,
|
||||
limit: msgsNum,
|
||||
query: map[string]string{"publisher": pubID.String(), "protocol": "mqtt"},
|
||||
page: readers.MessagesPage{
|
||||
Total: msgsNum,
|
||||
Offset: 0,
|
||||
Limit: msgsNum,
|
||||
Messages: messages,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for desc, tc := range cases {
|
||||
|
|
Loading…
Reference in New Issue