From d1fa980f11ca31c50f030a099947705eb69683ac Mon Sep 17 00:00:00 2001 From: Maciej Szlosarczyk Date: Mon, 27 May 2019 10:34:29 +0300 Subject: [PATCH] Create OTP structure for TCP side of the module --- apps/epp_proxy/src/epp_tcp_acceptor.erl | 44 ++++++++++++ apps/epp_proxy/src/epp_tcp_worker.erl | 95 +++++++++++++++++++++++++ apps/epp_proxy/src/pool_supervisor.erl | 16 +++++ 3 files changed, 155 insertions(+) create mode 100644 apps/epp_proxy/src/epp_tcp_acceptor.erl create mode 100644 apps/epp_proxy/src/epp_tcp_worker.erl create mode 100644 apps/epp_proxy/src/pool_supervisor.erl diff --git a/apps/epp_proxy/src/epp_tcp_acceptor.erl b/apps/epp_proxy/src/epp_tcp_acceptor.erl new file mode 100644 index 0000000..757106e --- /dev/null +++ b/apps/epp_proxy/src/epp_tcp_acceptor.erl @@ -0,0 +1,44 @@ +-module(epp_tcp_acceptor). + +-behaviour(gen_server). +-define(SERVER, ?MODULE). +-define(POOL_SUPERVISOR, pool_supervisor). +-define(WORKER, epp_tcp_worker). + + +%% gen_server callbacks +-export([init/1, handle_cast/2, handle_call/3, start_link/1]). + +-record(state, {socket, port, options}). + +start_link(Port) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, Port, []). + +init(Port) -> + Options = [binary, + {packet, raw}, + {active, false}, + {reuseaddr, true}], + + {ok, ListenSocket} = gen_tcp:listen(Port, Options), + gen_server:cast(self(), accept), + + {ok, #state{socket=ListenSocket, port=Port, options=Options}}. + +handle_cast(accept, State = #state{socket=ListenSocket, port=Port, options=Options}) -> + {ok, AcceptSocket} = gen_tcp:accept(ListenSocket), + {ok, NewOwner} = create_worker(AcceptSocket), + ok = gen_tcp:controlling_process(AcceptSocket, NewOwner), + gen_server:cast(NewOwner, greeting), + gen_server:cast(self(), accept), + {noreply, State#state{socket=ListenSocket, port=Port, options=Options}}. + +handle_call(_E, _From, State) -> {noreply, State}. + +create_worker(Socket) -> + ChildSpec = #{id => rand:uniform(), + type => worker, + modules => [?WORKER], + restart => temporary, + start => {?WORKER, start_link, [Socket]}}, + supervisor:start_child(?POOL_SUPERVISOR, ChildSpec). diff --git a/apps/epp_proxy/src/epp_tcp_worker.erl b/apps/epp_proxy/src/epp_tcp_worker.erl new file mode 100644 index 0000000..da3980f --- /dev/null +++ b/apps/epp_proxy/src/epp_tcp_worker.erl @@ -0,0 +1,95 @@ +-module(epp_tcp_worker). + +-behaviour(gen_server). +-define(SERVER, ?MODULE). + +%% gen_server callbacks +-export([init/1, handle_cast/2, handle_call/3, start_link/1]). +-export([terminate/2, code_change/3]). + +-record(state,{socket, length, session_id}). + +init(Socket) -> + logger:info("Created a test process"), + {ok, #state{socket=Socket}}. + +start_link(Socket) -> + gen_server:start_link(?MODULE, Socket, []). + +handle_cast(serve, State = #state{socket=Socket}) -> + {noreply, State#state{socket=Socket}}; +handle_cast(greeting, State = #state{socket=Socket}) -> + SessionId = session_id(), + + {Status, StatusCode, Headers, ClientRef} = + hackney:request(get, router:route_request("hello"), [], "", + [{cookie, [<<"session=">>, SessionId]}, insecure]), + + Body = <<"Some BODY">>, + + Length = byte_size(Body) + 4, + ByteSize = << Length:32/big >>, + io:format("State: ~p~n", [State]), + write_line(Socket, ByteSize), + write_line(Socket, Body), + gen_server:cast(self(), read_length), + {noreply, State#state{socket=Socket}}; +handle_cast(read_length, State = #state{socket=Socket}) -> + case read_length(Socket) of + {ok, Data} -> + NewState = State#state{length=Data}, + gen_server:cast(self(), read_frame), + {noreply, NewState}; + {error, _Details} -> + {stop, normal, State} + end; +handle_cast(read_frame, State = #state{socket=Socket, length=Length}) -> + case read_frame(Socket, Length) of + {ok, Data} -> + ByteLength = byte_size(Data) + 4, + ByteSize = << ByteLength:32/big >>, + write_line(Socket, ByteSize), + write_line(Socket, Data), + gen_server:cast(self(), read_length), + {noreply, State}; + {error, _Details} -> + {stop, normal, State} + end; +handle_cast(Message, State) -> + logger:info("Received message ~p.~n", [Message]), + {noreply, State}. + +handle_call(_E, _From, State) -> {noreply, State}. +terminate(_Reason, _State) -> + ok. +code_change(_OldVersion, State, _Extra) -> {ok, State}. + +%% Private function +write_line(Socket, Line) -> + gen_tcp:send(Socket, Line). + +read_length(Socket) -> + case gen_tcp:recv(Socket, 4) of + {ok, Data} -> + Length = binary:decode_unsigned(Data, big), + io:format("Preparing to receive: ~p~n", [Length]), + {ok, Length - 4}; + {error, Reason} -> + io:format("Error: ~p~n", [Reason]), + {error, Reason} + end. + +read_frame(Socket, FrameLength) -> + case gen_tcp:recv(Socket, FrameLength) of + {ok, Data} -> + io:format("Frame: ~p~n", [Data]), + {ok, Data}; + {error, Reason} -> + io:format("Error: ~p~n", [Reason]), + {error, Reason} + end. + +session_id() -> + UniqueMap = epp_util:create_map(self()), + BinaryHash = epp_util:create_session_id(UniqueMap), + BinaryHash. diff --git a/apps/epp_proxy/src/pool_supervisor.erl b/apps/epp_proxy/src/pool_supervisor.erl new file mode 100644 index 0000000..cb617b4 --- /dev/null +++ b/apps/epp_proxy/src/pool_supervisor.erl @@ -0,0 +1,16 @@ +-module(pool_supervisor). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 3, period => 60}, + {ok, {SupFlags, []}}.