diff --git a/lib/exw3.ex b/lib/exw3.ex index 621bf42..aac35b4 100644 --- a/lib/exw3.ex +++ b/lib/exw3.ex @@ -295,8 +295,8 @@ defmodule ExW3 do GenServer.start_link(__MODULE__, [], name: EventPoller) end - def subscribe(filter_id) do - GenServer.cast(EventPoller, {:subscribe, filter_id}) + def filter(filter_id) do + GenServer.cast(EventPoller, {:filter, filter_id}) end @impl true @@ -306,7 +306,7 @@ defmodule ExW3 do end @impl true - def handle_cast({:subscribe, filter_id}, state) do + def handle_cast({:filter, filter_id}, state) do {:noreply, [filter_id | state]} end @@ -314,7 +314,7 @@ defmodule ExW3 do def handle_info(:work, state) do # Do the desired work here Enum.each state, fn filter_id -> - send EventListener, {:event, filter_id, ExW3.get_filter_changes(filter_id)} + send Listener, {:event, filter_id, ExW3.get_filter_changes(filter_id)} end schedule_work() # Reschedule once more @@ -322,21 +322,21 @@ defmodule ExW3 do end defp schedule_work() do - Process.send_after(self(), :work, 1000) # In 1 sec + Process.send_after(self(), :work, 500) # In 1/2 sec end end - defmodule Listener do + defmodule EventListener do def start_link do Poller.start_link() {:ok, pid} = Task.start_link(fn -> loop(%{}) end) - Process.register(pid, EventListener) + Process.register(pid, Listener) :ok end - def subscribe(filter_id, pid) do - Poller.subscribe(filter_id) - send EventListener, {:subscribe, filter_id, pid} + def filter(filter_id, event_signature, pid) do + Poller.filter(filter_id) + send Listener, {:filter, filter_id, event_signature, pid} end def listen(callback) do @@ -346,13 +346,19 @@ defmodule ExW3 do listen(callback) end - defp loop(map) do + defp loop(state) do receive do - {:subscribe, filter_id, pid} -> - loop(Map.put(map, filter_id, pid)) - {:event, filter_id, data} -> - send Map.get(map, filter_id), {:event, {filter_id, data}} - loop(map) + {:filter, filter_id, event_signature, pid} -> + loop(Map.put(state, filter_id, %{pid: pid, signature: event_signature})) + {:event, filter_id, logs} -> + filter_attributes = Map.get(state, filter_id) + Enum.each(logs, fn log -> + data = Map.get(log, "data") + new_data = ExW3.decode_event(data, filter_attributes[:signature]) + new_log = Map.put(log, :data, new_data) + send filter_attributes[:pid], {:event, {filter_id, new_log}} + end) + loop(state) end end end @@ -404,8 +410,8 @@ defmodule ExW3 do GenServer.call(pid, {:tx_receipt, tx_hash}) end - def subscribe(pid, event_name, other_pid, event_data \\ %{}) do - GenServer.call(pid, {:subscribe, {event_name, other_pid, event_data}}) + def filter(pid, event_name, other_pid, event_data \\ %{}) do + GenServer.call(pid, {:filter, {event_name, other_pid, event_data}}) end # Server @@ -514,10 +520,14 @@ defmodule ExW3 do {:noreply, [{:address, address} | state]} end - def handle_call({:subscribe, {event_name, other_pid, event_data}}, _from, state) do + def handle_call({:filter, {event_name, other_pid, event_data}}, _from, state) do + unless Process.whereis(Listener) do + raise "EventListener process not alive. Call ExW3.EventListener.start_link before using ExW3.Contract.subscribe" + end payload = Map.merge(%{address: state[:address], topics: [state[:event_names][event_name]]}, event_data) filter_id = ExW3.new_filter(payload) - Listener.subscribe(filter_id, other_pid) + event_signature = state[:events][state[:event_names][event_name]][:signature] + EventListener.filter(filter_id, event_signature, other_pid) {:reply, filter_id, state ++ [event_name, filter_id]} end diff --git a/test/exw3_test.exs b/test/exw3_test.exs index 158553c..c3f5cda 100644 --- a/test/exw3_test.exs +++ b/test/exw3_test.exs @@ -147,10 +147,12 @@ defmodule EXW3Test do ) ExW3.Contract.at(EventTester, address) - - ExW3.Listener.start_link() - filter_id = ExW3.Contract.subscribe(EventTester, "Simple", self()) + {:ok, agent} = Agent.start_link(fn -> [] end) + + ExW3.EventListener.start_link() + + filter_id = ExW3.Contract.filter(EventTester, "Simple", self()) {:ok, tx_hash} = ExW3.Contract.send( @@ -159,14 +161,17 @@ defmodule EXW3Test do ["Hello, World!"], %{from: Enum.at(context[:accounts], 0)} ) - + receive do {:event, {filter_id, data}} -> - IO.inspect data + Agent.update(agent, fn list -> [data | list] end) after 3_000 -> raise "Never received event" end + state = Agent.get(agent, fn list -> list end) + assert Enum.at(state, 0) |> is_map + ExW3.uninstall_filter(filter_id) end