4
0
Fork 0

Added event filtering, with formatted data

event-listener
hswick 7 years ago
parent dbd6262a40
commit f19651ef4d
  1. 50
      lib/exw3.ex
  2. 15
      test/exw3_test.exs

@ -295,8 +295,8 @@ defmodule ExW3 do
GenServer.start_link(__MODULE__, [], name: EventPoller) GenServer.start_link(__MODULE__, [], name: EventPoller)
end end
def subscribe(filter_id) do def filter(filter_id) do
GenServer.cast(EventPoller, {:subscribe, filter_id}) GenServer.cast(EventPoller, {:filter, filter_id})
end end
@impl true @impl true
@ -306,7 +306,7 @@ defmodule ExW3 do
end end
@impl true @impl true
def handle_cast({:subscribe, filter_id}, state) do def handle_cast({:filter, filter_id}, state) do
{:noreply, [filter_id | state]} {:noreply, [filter_id | state]}
end end
@ -314,7 +314,7 @@ defmodule ExW3 do
def handle_info(:work, state) do def handle_info(:work, state) do
# Do the desired work here # Do the desired work here
Enum.each state, fn filter_id -> Enum.each state, fn filter_id ->
send EventListener, {:event, filter_id, ExW3.get_filter_changes(filter_id)} send Listener, {:event, filter_id, ExW3.get_filter_changes(filter_id)}
end end
schedule_work() # Reschedule once more schedule_work() # Reschedule once more
@ -322,21 +322,21 @@ defmodule ExW3 do
end end
defp schedule_work() do defp schedule_work() do
Process.send_after(self(), :work, 1000) # In 1 sec Process.send_after(self(), :work, 500) # In 1/2 sec
end end
end end
defmodule Listener do defmodule EventListener do
def start_link do def start_link do
Poller.start_link() Poller.start_link()
{:ok, pid} = Task.start_link(fn -> loop(%{}) end) {:ok, pid} = Task.start_link(fn -> loop(%{}) end)
Process.register(pid, EventListener) Process.register(pid, Listener)
:ok :ok
end end
def subscribe(filter_id, pid) do def filter(filter_id, event_signature, pid) do
Poller.subscribe(filter_id) Poller.filter(filter_id)
send EventListener, {:subscribe, filter_id, pid} send Listener, {:filter, filter_id, event_signature, pid}
end end
def listen(callback) do def listen(callback) do
@ -346,13 +346,19 @@ defmodule ExW3 do
listen(callback) listen(callback)
end end
defp loop(map) do defp loop(state) do
receive do receive do
{:subscribe, filter_id, pid} -> {:filter, filter_id, event_signature, pid} ->
loop(Map.put(map, filter_id, pid)) loop(Map.put(state, filter_id, %{pid: pid, signature: event_signature}))
{:event, filter_id, data} -> {:event, filter_id, logs} ->
send Map.get(map, filter_id), {:event, {filter_id, data}} filter_attributes = Map.get(state, filter_id)
loop(map) Enum.each(logs, fn log ->
data = Map.get(log, "data")
new_data = ExW3.decode_event(data, filter_attributes[:signature])
new_log = Map.put(log, :data, new_data)
send filter_attributes[:pid], {:event, {filter_id, new_log}}
end)
loop(state)
end end
end end
end end
@ -404,8 +410,8 @@ defmodule ExW3 do
GenServer.call(pid, {:tx_receipt, tx_hash}) GenServer.call(pid, {:tx_receipt, tx_hash})
end end
def subscribe(pid, event_name, other_pid, event_data \\ %{}) do def filter(pid, event_name, other_pid, event_data \\ %{}) do
GenServer.call(pid, {:subscribe, {event_name, other_pid, event_data}}) GenServer.call(pid, {:filter, {event_name, other_pid, event_data}})
end end
# Server # Server
@ -514,10 +520,14 @@ defmodule ExW3 do
{:noreply, [{:address, address} | state]} {:noreply, [{:address, address} | state]}
end end
def handle_call({:subscribe, {event_name, other_pid, event_data}}, _from, state) do def handle_call({:filter, {event_name, other_pid, event_data}}, _from, state) do
unless Process.whereis(Listener) do
raise "EventListener process not alive. Call ExW3.EventListener.start_link before using ExW3.Contract.subscribe"
end
payload = Map.merge(%{address: state[:address], topics: [state[:event_names][event_name]]}, event_data) payload = Map.merge(%{address: state[:address], topics: [state[:event_names][event_name]]}, event_data)
filter_id = ExW3.new_filter(payload) filter_id = ExW3.new_filter(payload)
Listener.subscribe(filter_id, other_pid) event_signature = state[:events][state[:event_names][event_name]][:signature]
EventListener.filter(filter_id, event_signature, other_pid)
{:reply, filter_id, state ++ [event_name, filter_id]} {:reply, filter_id, state ++ [event_name, filter_id]}
end end

@ -147,10 +147,12 @@ defmodule EXW3Test do
) )
ExW3.Contract.at(EventTester, address) ExW3.Contract.at(EventTester, address)
ExW3.Listener.start_link()
filter_id = ExW3.Contract.subscribe(EventTester, "Simple", self()) {:ok, agent} = Agent.start_link(fn -> [] end)
ExW3.EventListener.start_link()
filter_id = ExW3.Contract.filter(EventTester, "Simple", self())
{:ok, tx_hash} = {:ok, tx_hash} =
ExW3.Contract.send( ExW3.Contract.send(
@ -159,14 +161,17 @@ defmodule EXW3Test do
["Hello, World!"], ["Hello, World!"],
%{from: Enum.at(context[:accounts], 0)} %{from: Enum.at(context[:accounts], 0)}
) )
receive do receive do
{:event, {filter_id, data}} -> {:event, {filter_id, data}} ->
IO.inspect data Agent.update(agent, fn list -> [data | list] end)
after 3_000 -> after 3_000 ->
raise "Never received event" raise "Never received event"
end end
state = Agent.get(agent, fn list -> list end)
assert Enum.at(state, 0) |> is_map
ExW3.uninstall_filter(filter_id) ExW3.uninstall_filter(filter_id)
end end

Loading…
Cancel
Save