From ec4b46b6698d58925fb96c8d9bf0761629d502f6 Mon Sep 17 00:00:00 2001 From: Drasko DRASKOVIC Date: Tue, 3 Sep 2019 01:05:36 +0200 Subject: [PATCH] NOISSUE - Improve VerneMQ plugin code, add configurable gRPC pool size (#836) * Enable gRPC support Signed-off-by: drasko * Add poolboy process pool Signed-off-by: drasko * Add Verne docker-compose Signed-off-by: drasko * Improve code, add settings for pool size Signed-off-by: drasko --- docker/docker-compose-verne.yml | 2 ++ mqtt/verne/src/mfx_auth_sup.erl | 10 +++++++++- mqtt/verne/src/mfx_grpc.erl | 1 - mqtt/verne/src/mfx_nats.erl | 6 ++++++ mqtt/verne/src/mfx_redis.erl | 9 +++++++-- tools/mqtt-bench/templates/config.toml | 6 +++--- 6 files changed, 27 insertions(+), 7 deletions(-) diff --git a/docker/docker-compose-verne.yml b/docker/docker-compose-verne.yml index 00f815f4..84923b02 100644 --- a/docker/docker-compose-verne.yml +++ b/docker/docker-compose-verne.yml @@ -233,6 +233,8 @@ services: DOCKER_VERNEMQ_PLUGINS__VMQ_ACL: "off" DOCKER_VERNEMQ_PLUGINS__MFX_AUTH: "on" DOCKER_VERNEMQ_PLUGINS__MFX_AUTH__PATH: /mainflux/_build/default + DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL: warning + MF_MQTT_VERNEMQ_GRPC_POOL_SIZE: 1000 ports: - ${MF_MQTT_ADAPTER_PORT}:${MF_MQTT_ADAPTER_PORT} - ${MF_MQTT_ADAPTER_WS_PORT}:${MF_MQTT_ADAPTER_WS_PORT} diff --git a/mqtt/verne/src/mfx_auth_sup.erl b/mqtt/verne/src/mfx_auth_sup.erl index 95675e53..d7c6eddb 100644 --- a/mqtt/verne/src/mfx_auth_sup.erl +++ b/mqtt/verne/src/mfx_auth_sup.erl @@ -23,7 +23,15 @@ start_link() -> %% =================================================================== init([]) -> - SizeArgs = [{size, 10}, {max_overflow, 10}], + PoolSize = case os:getenv("MF_MQTT_VERNEMQ_GRPC_POOL_SIZE") of + false -> + 10; + PoolSizeEnv -> + {PoolSizeInt, _PoolSizeRest} = string:to_integer(PoolSizeEnv), + PoolSizeInt + end, + + SizeArgs = [{size, PoolSize}, {max_overflow, PoolSize * 1.5}], PoolArgs = [{name, {local, grpc_pool}}, {worker_module, mfx_grpc}], WorkerArgs = [], PoolSpec = poolboy:child_spec(grpc_pool, PoolArgs ++ SizeArgs, WorkerArgs), diff --git a/mqtt/verne/src/mfx_grpc.erl b/mqtt/verne/src/mfx_grpc.erl index e4fa38ce..69819123 100644 --- a/mqtt/verne/src/mfx_grpc.erl +++ b/mqtt/verne/src/mfx_grpc.erl @@ -16,7 +16,6 @@ init(_Args) -> error_logger:info_msg("mfx_grpc genserver has started (~w)~n", [self()]), - [{_, GrpcUrl}] = ets:lookup(mfx_cfg, grpc_url), {ok, {_, _, GrpcHost, GrpcPort, _, _}} = http_uri:parse(GrpcUrl), error_logger:info_msg("grpc host: ~p, port: ~p", [GrpcHost, GrpcPort]), diff --git a/mqtt/verne/src/mfx_nats.erl b/mqtt/verne/src/mfx_nats.erl index def0b82c..5248b564 100644 --- a/mqtt/verne/src/mfx_nats.erl +++ b/mqtt/verne/src/mfx_nats.erl @@ -4,6 +4,7 @@ start_link/0, init/1, publish/2, + handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -35,6 +36,11 @@ publish(Subject, Message) -> error_logger:info_msg("mfx_nats genserver publish ~p ~p", [Subject, Message]), gen_server:cast(?MODULE, {publish, Subject, Message}). +% Currently unused, but kept to avoid compiler warnings (it expects handle_call/3 in the gen_server) +handle_call(Name, _From, _State) -> + Reply = lists:flatten(io_lib:format("Hello ~s from mfx_nats genserver", [Name])), + {reply, Reply, _State}. + handle_cast({publish, Subject, Message}, #state{conn = NatsConn} = State) -> error_logger:info_msg("mfx_nats genserver cast ~p ~p ~p", [Subject, NatsConn, Message]), nats:pub(NatsConn, Subject, #{payload => Message}), diff --git a/mqtt/verne/src/mfx_redis.erl b/mqtt/verne/src/mfx_redis.erl index 0688f4a0..74bbcb60 100644 --- a/mqtt/verne/src/mfx_redis.erl +++ b/mqtt/verne/src/mfx_redis.erl @@ -5,6 +5,7 @@ start_link/0, init/1, publish/1, + handle_call/3, handle_cast/2, handle_info/2, terminate/2 @@ -28,10 +29,14 @@ init(_Args) -> publish(Message) -> gen_server:cast(?MODULE, {publish, Message}). +% Currently unused, but kept to avoid compiler warnings (it expects handle_call/3 in the gen_server) +handle_call(Name, _From, _State) -> + Reply = lists:flatten(io_lib:format("Hello ~s from mfx_redis genserver", [Name])), + {reply, Reply, _State}. + handle_cast({publish, Message}, #state{conn = RedisConn} = State) -> - [{redis_conn, Conn}] = ets:lookup(mfx_cfg, redis_conn), error_logger:info_msg("mfx_redis genserver cast ~p ~p", [RedisConn, Message]), - eredis:q(Conn, ["XADD" | Message]), + eredis:q(RedisConn, ["XADD" | Message]), {noreply, State}. handle_info(_Info, State) -> diff --git a/tools/mqtt-bench/templates/config.toml b/tools/mqtt-bench/templates/config.toml index d64a1e27..11895618 100644 --- a/tools/mqtt-bench/templates/config.toml +++ b/tools/mqtt-bench/templates/config.toml @@ -6,7 +6,7 @@ size = 100 format = "text" qos = 2 - retain = true + retain = false [mqtt.tls] mtls = false @@ -14,8 +14,8 @@ ca = "ca.crt" [test] -pubs = 3 -subs = 1 +pubs = 100 +subs = 30 count = 100 [log]