NOISSUE - Improve VerneMQ plugin code, add configurable gRPC pool size (#836)

* Enable gRPC support

Signed-off-by: drasko <drasko.draskovic@gmail.com>

* Add poolboy process pool

Signed-off-by: drasko <drasko.draskovic@gmail.com>

* Add Verne docker-compose

Signed-off-by: drasko <drasko.draskovic@gmail.com>

* Improve code, add settings for pool size

Signed-off-by: drasko <drasko.draskovic@gmail.com>
This commit is contained in:
Drasko DRASKOVIC 2019-09-03 01:05:36 +02:00 committed by GitHub
parent 38d85ae03b
commit ec4b46b669
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 27 additions and 7 deletions

View File

@ -233,6 +233,8 @@ services:
DOCKER_VERNEMQ_PLUGINS__VMQ_ACL: "off"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH: "on"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH__PATH: /mainflux/_build/default
DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL: warning
MF_MQTT_VERNEMQ_GRPC_POOL_SIZE: 1000
ports:
- ${MF_MQTT_ADAPTER_PORT}:${MF_MQTT_ADAPTER_PORT}
- ${MF_MQTT_ADAPTER_WS_PORT}:${MF_MQTT_ADAPTER_WS_PORT}

View File

@ -23,7 +23,15 @@ start_link() ->
%% ===================================================================
init([]) ->
SizeArgs = [{size, 10}, {max_overflow, 10}],
PoolSize = case os:getenv("MF_MQTT_VERNEMQ_GRPC_POOL_SIZE") of
false ->
10;
PoolSizeEnv ->
{PoolSizeInt, _PoolSizeRest} = string:to_integer(PoolSizeEnv),
PoolSizeInt
end,
SizeArgs = [{size, PoolSize}, {max_overflow, PoolSize * 1.5}],
PoolArgs = [{name, {local, grpc_pool}}, {worker_module, mfx_grpc}],
WorkerArgs = [],
PoolSpec = poolboy:child_spec(grpc_pool, PoolArgs ++ SizeArgs, WorkerArgs),

View File

@ -16,7 +16,6 @@
init(_Args) ->
error_logger:info_msg("mfx_grpc genserver has started (~w)~n", [self()]),
[{_, GrpcUrl}] = ets:lookup(mfx_cfg, grpc_url),
{ok, {_, _, GrpcHost, GrpcPort, _, _}} = http_uri:parse(GrpcUrl),
error_logger:info_msg("grpc host: ~p, port: ~p", [GrpcHost, GrpcPort]),

View File

@ -4,6 +4,7 @@
start_link/0,
init/1,
publish/2,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
@ -35,6 +36,11 @@ publish(Subject, Message) ->
error_logger:info_msg("mfx_nats genserver publish ~p ~p", [Subject, Message]),
gen_server:cast(?MODULE, {publish, Subject, Message}).
% Currently unused, but kept to avoid compiler warnings (it expects handle_call/3 in the gen_server)
handle_call(Name, _From, _State) ->
Reply = lists:flatten(io_lib:format("Hello ~s from mfx_nats genserver", [Name])),
{reply, Reply, _State}.
handle_cast({publish, Subject, Message}, #state{conn = NatsConn} = State) ->
error_logger:info_msg("mfx_nats genserver cast ~p ~p ~p", [Subject, NatsConn, Message]),
nats:pub(NatsConn, Subject, #{payload => Message}),

View File

@ -5,6 +5,7 @@
start_link/0,
init/1,
publish/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
@ -28,10 +29,14 @@ init(_Args) ->
publish(Message) ->
gen_server:cast(?MODULE, {publish, Message}).
% Currently unused, but kept to avoid compiler warnings (it expects handle_call/3 in the gen_server)
handle_call(Name, _From, _State) ->
Reply = lists:flatten(io_lib:format("Hello ~s from mfx_redis genserver", [Name])),
{reply, Reply, _State}.
handle_cast({publish, Message}, #state{conn = RedisConn} = State) ->
[{redis_conn, Conn}] = ets:lookup(mfx_cfg, redis_conn),
error_logger:info_msg("mfx_redis genserver cast ~p ~p", [RedisConn, Message]),
eredis:q(Conn, ["XADD" | Message]),
eredis:q(RedisConn, ["XADD" | Message]),
{noreply, State}.
handle_info(_Info, State) ->

View File

@ -6,7 +6,7 @@
size = 100
format = "text"
qos = 2
retain = true
retain = false
[mqtt.tls]
mtls = false
@ -14,8 +14,8 @@
ca = "ca.crt"
[test]
pubs = 3
subs = 1
pubs = 100
subs = 30
count = 100
[log]