NOISSUE - Fix proto files in VerneMQ (#834)
* Add socket pool and fix pattern matching Signed-off-by: drasko <drasko.draskovic@gmail.com> * Fix hackney req Signed-off-by: drasko <drasko.draskovic@gmail.com> * Change docker-compose Signed-off-by: drasko <drasko.draskovic@gmail.com> * Correct protobuf and add MQTT loback ignore Signed-off-by: drasko <drasko.draskovic@gmail.com> * Bring back docker-compose Signed-off-by: drasko <drasko.draskovic@gmail.com>
This commit is contained in:
parent
9ccc37c4b1
commit
655f421ca9
|
@ -269,4 +269,4 @@ services:
|
|||
networks:
|
||||
- mainflux-base-net
|
||||
environment:
|
||||
MF_UI_PORT: ${MF_UI_PORT}
|
||||
MF_UI_PORT: ${MF_UI_PORT}
|
|
@ -82,7 +82,7 @@ make -j 16
|
|||
Then generate Erlang proto files:
|
||||
```
|
||||
mkdir -p ./src/proto
|
||||
./gpb/bin/protoc-erl -I ./gpb/ ../*.proto -o ./src/proto
|
||||
./gpb/bin/protoc-erl -pkgs -I ./gpb/ ../../*.proto -o ./src/proto
|
||||
cp ./gpb/include/gpb.hrl ./src/proto/
|
||||
```
|
||||
|
||||
|
|
|
@ -110,7 +110,7 @@ auth_on_register({_IpAddr, _Port} = Peer, {_MountPoint, _ClientId} = SubscriberI
|
|||
parseTopic(Topic) when length(Topic) == 3 ->
|
||||
ChannelId = lists:nth(2, Topic),
|
||||
NatsSubject = [<<"channel.">>, ChannelId],
|
||||
[{chanel_id, ChannelId}, {content_type, ""}, {nats_subject, NatsSubject}];
|
||||
[{chanel_id, ChannelId}, {content_type, ""}, {subtopic, <<>>}, {nats_subject, NatsSubject}];
|
||||
parseTopic(Topic) when length(Topic) > 3 ->
|
||||
ChannelId = lists:nth(2, Topic),
|
||||
case lists:nth(length(Topic) - 1, Topic) of
|
||||
|
@ -124,7 +124,8 @@ parseTopic(Topic) when length(Topic) > 3 ->
|
|||
_ ->
|
||||
Subtopic = lists:sublist(Topic, 4, length(Topic) - 3),
|
||||
NatsSubject = [<<"channel.">>, ChannelId, <<".">>, string:join([[X] || X <- Subtopic], ".")],
|
||||
[{chanel_id, ChannelId}, {content_type, ""}, {nats_subject, NatsSubject}]
|
||||
Subtopic2 = string:join([[X] || X <- Subtopic], "/"),
|
||||
[{chanel_id, ChannelId}, {content_type, ""}, {subtopic, Subtopic2}, {nats_subject, NatsSubject}]
|
||||
end.
|
||||
|
||||
auth_on_publish(UserName, {_MountPoint, _ClientId} = SubscriberId, QoS, Topic, Payload, IsRetain) ->
|
||||
|
@ -144,12 +145,13 @@ auth_on_publish(UserName, {_MountPoint, _ClientId} = SubscriberId, QoS, Topic, P
|
|||
%%
|
||||
|
||||
% Topic is list of binaries, ex: [<<"channels">>, <<"1">>, <<"messages">>, <<"subtopic_1">>, ...]
|
||||
[{chanel_id, ChannelId}, {content_type, ContentType}, {nats_subject, NatsSubject}] = parseTopic(Topic),
|
||||
[{chanel_id, ChannelId}, {content_type, ContentType}, {subtopic, Subtopic}, {nats_subject, NatsSubject}] = parseTopic(Topic),
|
||||
case access(UserName, ChannelId) of
|
||||
ok ->
|
||||
RawMessage = #'RawMessage'{
|
||||
RawMessage = #'mainflux.RawMessage'{
|
||||
'channel' = ChannelId,
|
||||
'publisher' = UserName,
|
||||
'subtopic' = Subtopic,
|
||||
'protocol' = "mqtt",
|
||||
'contentType' = ContentType,
|
||||
'payload' = Payload
|
||||
|
@ -170,7 +172,7 @@ auth_on_subscribe(UserName, ClientId, [{Topic, _QoS}|_] = Topics) ->
|
|||
%% 2. return 'next' -> leave it to other plugins to decide
|
||||
%% 3. return {error, whatever} -> auth chain is stopped, and no SUBACK is sent
|
||||
|
||||
[{chanel_id, ChannelId}, _, _] = parseTopic(Topic),
|
||||
[{chanel_id, ChannelId}, _, _, _] = parseTopic(Topic),
|
||||
access(UserName, ChannelId).
|
||||
|
||||
%%% Redis ES
|
||||
|
|
|
@ -32,7 +32,7 @@ init(_Args) ->
|
|||
{ok, []}.
|
||||
|
||||
publish(Subject, Message) ->
|
||||
error_logger:info_msg("mfx_nats genserver publish ~p ~p ~p", [Subject, Message]),
|
||||
error_logger:info_msg("mfx_nats genserver publish ~p ~p", [Subject, Message]),
|
||||
gen_server:cast(?MODULE, {publish, Subject, Message}).
|
||||
|
||||
handle_call(Name, _From, _State) ->
|
||||
|
@ -64,34 +64,41 @@ loop(Conn) ->
|
|||
{Conn, {msg, <<"teacup.control">>, _, <<"exit">>}} ->
|
||||
error_logger:info_msg("NATS received exit msg", []);
|
||||
{Conn, {msg, Subject, _ReplyTo, NatsMsg}} ->
|
||||
#'RawMessage'{'contentType' = ContentType, 'payload' = Payload} = message:decode_msg(NatsMsg, 'RawMessage'),
|
||||
#'mainflux.RawMessage'{'protocol' = Protocol, 'contentType' = ContentType, 'payload' = Payload} = message:decode_msg(NatsMsg, 'mainflux.RawMessage'),
|
||||
error_logger:info_msg("Received NATS protobuf msg with payload: ~p and ContentType: ~p~n", [Payload, ContentType]),
|
||||
ContentType2 = re:replace(ContentType, "/","_",[global,{return,list}]),
|
||||
ContentType3 = re:replace(ContentType2, "\\+","-",[global,{return,binary}]),
|
||||
{_, PublishFun, {_, _}} = vmq_reg:direct_plugin_exports(?MODULE),
|
||||
% Topic needs to be in the form of the list, like [<<"channel">>,<<"6def78cd-b441-4fd8-8680-af7e3bbea187">>]
|
||||
Topic = case re:split(Subject, <<"\\.">>) of
|
||||
[<<"channel">>, ChannelId] ->
|
||||
case ContentType of
|
||||
<<"">> ->
|
||||
[<<"channels">>, ChannelId, <<"messages">>];
|
||||
_ ->
|
||||
[<<"channels">>, ChannelId, <<"messages">>, <<"ct">>, ContentType3]
|
||||
end;
|
||||
[<<"channel">>, ChannelId, Subtopic] ->
|
||||
case ContentType of
|
||||
<<"">> ->
|
||||
[<<"channels">>, ChannelId, <<"messages">>, Subtopic];
|
||||
_ ->
|
||||
[<<"channels">>, ChannelId, <<"messages">>, Subtopic, <<"ct">>, ContentType3]
|
||||
end;
|
||||
Other ->
|
||||
error_logger:info_msg("Could not match topic: ~p~n", [Other]),
|
||||
error
|
||||
end,
|
||||
error_logger:info_msg("Subject: ~p, Topic: ~p, PublishFunction: ~p~n", [Subject, Topic, PublishFun]),
|
||||
PublishFun(Topic, Payload, #{qos => 0, retain => false}),
|
||||
loop(Conn);
|
||||
case Protocol of
|
||||
"mqtt" ->
|
||||
error_logger:info_msg("Ignoring MQTT message loopback", []),
|
||||
loop(Conn);
|
||||
_ ->
|
||||
error_logger:info_msg("Re-publishing on MQTT broker", []),
|
||||
ContentType2 = re:replace(ContentType, "/","_",[global,{return,list}]),
|
||||
ContentType3 = re:replace(ContentType2, "\\+","-",[global,{return,binary}]),
|
||||
{_, PublishFun, {_, _}} = vmq_reg:direct_plugin_exports(?MODULE),
|
||||
% Topic needs to be in the form of the list, like [<<"channel">>,<<"6def78cd-b441-4fd8-8680-af7e3bbea187">>]
|
||||
Topic = case re:split(Subject, <<"\\.">>) of
|
||||
[<<"channel">>, ChannelId] ->
|
||||
case ContentType of
|
||||
<<"">> ->
|
||||
[<<"channels">>, ChannelId, <<"messages">>];
|
||||
_ ->
|
||||
[<<"channels">>, ChannelId, <<"messages">>, <<"ct">>, ContentType3]
|
||||
end;
|
||||
[<<"channel">>, ChannelId, Subtopic] ->
|
||||
case ContentType of
|
||||
<<"">> ->
|
||||
[<<"channels">>, ChannelId, <<"messages">>, Subtopic];
|
||||
_ ->
|
||||
[<<"channels">>, ChannelId, <<"messages">>, Subtopic, <<"ct">>, ContentType3]
|
||||
end;
|
||||
Other ->
|
||||
error_logger:info_msg("Could not match topic: ~p~n", [Other]),
|
||||
error
|
||||
end,
|
||||
error_logger:info_msg("Subject: ~p, Topic: ~p, PublishFunction: ~p~n", [Subject, Topic, PublishFun]),
|
||||
PublishFun(Topic, Payload, #{qos => 0, retain => false}),
|
||||
loop(Conn)
|
||||
end;
|
||||
Other ->
|
||||
error_logger:info_msg("Received other msg: ~p~n", [Other]),
|
||||
loop(Conn)
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,54 @@
|
|||
%% -*- coding: utf-8 -*-
|
||||
%% Automatically generated, do not edit
|
||||
%% Generated by gpb_compile version 4.9.0
|
||||
|
||||
-ifndef(internal).
|
||||
-define(internal, true).
|
||||
|
||||
-define(internal_gpb_version, "4.9.0").
|
||||
|
||||
-ifndef('MAINFLUX.ACCESSREQ_PB_H').
|
||||
-define('MAINFLUX.ACCESSREQ_PB_H', true).
|
||||
-record('mainflux.AccessReq',
|
||||
{token = [] :: iodata() | undefined, % = 1
|
||||
chanID = [] :: iodata() | undefined % = 2
|
||||
}).
|
||||
-endif.
|
||||
|
||||
-ifndef('MAINFLUX.THINGID_PB_H').
|
||||
-define('MAINFLUX.THINGID_PB_H', true).
|
||||
-record('mainflux.ThingID',
|
||||
{value = [] :: iodata() | undefined % = 1
|
||||
}).
|
||||
-endif.
|
||||
|
||||
-ifndef('MAINFLUX.ACCESSBYIDREQ_PB_H').
|
||||
-define('MAINFLUX.ACCESSBYIDREQ_PB_H', true).
|
||||
-record('mainflux.AccessByIDReq',
|
||||
{thingID = [] :: iodata() | undefined, % = 1
|
||||
chanID = [] :: iodata() | undefined % = 2
|
||||
}).
|
||||
-endif.
|
||||
|
||||
-ifndef('MAINFLUX.TOKEN_PB_H').
|
||||
-define('MAINFLUX.TOKEN_PB_H', true).
|
||||
-record('mainflux.Token',
|
||||
{value = [] :: iodata() | undefined % = 1
|
||||
}).
|
||||
-endif.
|
||||
|
||||
-ifndef('MAINFLUX.USERID_PB_H').
|
||||
-define('MAINFLUX.USERID_PB_H', true).
|
||||
-record('mainflux.UserID',
|
||||
{value = [] :: iodata() | undefined % = 1
|
||||
}).
|
||||
-endif.
|
||||
|
||||
-ifndef('GOOGLE.PROTOBUF.EMPTY_PB_H').
|
||||
-define('GOOGLE.PROTOBUF.EMPTY_PB_H', true).
|
||||
-record('google.protobuf.Empty',
|
||||
{
|
||||
}).
|
||||
-endif.
|
||||
|
||||
-endif.
|
File diff suppressed because it is too large
Load Diff
|
@ -7,9 +7,9 @@
|
|||
|
||||
-define(message_gpb_version, "4.9.0").
|
||||
|
||||
-ifndef('RAWMESSAGE_PB_H').
|
||||
-define('RAWMESSAGE_PB_H', true).
|
||||
-record('RawMessage',
|
||||
-ifndef('MAINFLUX.RAWMESSAGE_PB_H').
|
||||
-define('MAINFLUX.RAWMESSAGE_PB_H', true).
|
||||
-record('mainflux.RawMessage',
|
||||
{channel = [] :: iodata() | undefined, % = 1
|
||||
subtopic = [] :: iodata() | undefined, % = 2
|
||||
publisher = [] :: iodata() | undefined, % = 3
|
||||
|
@ -19,9 +19,9 @@
|
|||
}).
|
||||
-endif.
|
||||
|
||||
-ifndef('MESSAGE_PB_H').
|
||||
-define('MESSAGE_PB_H', true).
|
||||
-record('Message',
|
||||
-ifndef('MAINFLUX.MESSAGE_PB_H').
|
||||
-define('MAINFLUX.MESSAGE_PB_H', true).
|
||||
-record('mainflux.Message',
|
||||
{channel = [] :: iodata() | undefined, % = 1
|
||||
subtopic = [] :: iodata() | undefined, % = 2
|
||||
publisher = [] :: iodata() | undefined, % = 3
|
||||
|
@ -29,16 +29,16 @@
|
|||
name = [] :: iodata() | undefined, % = 5
|
||||
unit = [] :: iodata() | undefined, % = 6
|
||||
value :: {floatValue, float() | integer() | infinity | '-infinity' | nan} | {stringValue, iodata()} | {boolValue, boolean() | 0 | 1} | {dataValue, iodata()} | undefined, % oneof
|
||||
valueSum = undefined :: message:'SumValue'() | undefined, % = 11
|
||||
valueSum = undefined :: message:'mainflux.SumValue'() | undefined, % = 11
|
||||
time = 0.0 :: float() | integer() | infinity | '-infinity' | nan | undefined, % = 12
|
||||
updateTime = 0.0 :: float() | integer() | infinity | '-infinity' | nan | undefined, % = 13
|
||||
link = [] :: iodata() | undefined % = 14
|
||||
}).
|
||||
-endif.
|
||||
|
||||
-ifndef('SUMVALUE_PB_H').
|
||||
-define('SUMVALUE_PB_H', true).
|
||||
-record('SumValue',
|
||||
-ifndef('MAINFLUX.SUMVALUE_PB_H').
|
||||
-define('MAINFLUX.SUMVALUE_PB_H', true).
|
||||
-record('mainflux.SumValue',
|
||||
{value = 0.0 :: float() | integer() | infinity | '-infinity' | nan | undefined % = 1
|
||||
}).
|
||||
-endif.
|
||||
|
|
|
@ -22,4 +22,4 @@ count = 100
|
|||
quiet = false
|
||||
|
||||
[mainflux]
|
||||
connections_file = "../provision/connections.toml"
|
||||
connections_file = "../provision/mfconn.toml"
|
Loading…
Reference in New Issue