4
0
Fork 0

Deleted EventListener and EventPoller, replaced with simple get_changes method as part of Contract module

new-events
hswick 6 years ago
parent e9498bce69
commit 0aad3bcecd
  1. 241
      lib/exw3.ex
  2. 88
      test/exw3_test.exs

@ -384,138 +384,6 @@ defmodule ExW3 do
end
end
defmodule Poller do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, [], name: EventPoller)
end
def filter(filter_id) do
GenServer.cast(EventPoller, {:filter, filter_id})
end
@impl true
def init(state) do
# Schedule work to be performed on start
schedule_work()
{:ok, state}
end
@impl true
def handle_cast({:filter, filter_id}, state) do
{:noreply, [filter_id | state]}
end
@impl true
def handle_info(:work, state) do
# Do the desired work here
Enum.each(state, fn filter_id ->
send(Listener, {:event, filter_id, ExW3.get_filter_changes(filter_id)})
end)
# Reschedule once more
schedule_work()
{:noreply, state}
end
defp schedule_work() do
# In 1/2 sec
Process.send_after(self(), :work, 500)
end
end
defmodule EventListener do
def start_link do
Poller.start_link()
{:ok, pid} = Task.start_link(fn -> loop(%{}) end)
Process.register(pid, Listener)
:ok
end
def filter(filter_id, event_fields, pid) do
Poller.filter(filter_id)
send(Listener, {:filter, filter_id, event_fields, pid})
end
def listen(callback) do
receive do
{:event, result} -> apply(callback, [result])
end
listen(callback)
end
defp extract_non_indexed_fields(data, names, signature) do
Enum.zip(names, ExW3.decode_event(data, signature)) |> Enum.into(%{})
end
defp format_log_data(log, event_attributes) do
non_indexed_fields =
extract_non_indexed_fields(
Map.get(log, "data"),
event_attributes[:non_indexed_names],
event_attributes[:signature]
)
indexed_fields =
if length(log["topics"]) > 1 do
[_head | tail] = log["topics"]
decoded_topics =
Enum.map(0..(length(tail) - 1), fn i ->
topic_type = Enum.at(event_attributes[:topic_types], i)
topic_data = Enum.at(tail, i)
{decoded} = ExW3.decode_data(topic_type, topic_data)
decoded
end)
Enum.zip(event_attributes[:topic_names], decoded_topics) |> Enum.into(%{})
else
%{}
end
new_data = Map.merge(indexed_fields, non_indexed_fields)
Map.put(log, "data", new_data)
end
defp loop(state) do
receive do
{:filter, filter_id, event_attributes, pid} ->
loop(Map.put(state, filter_id, %{pid: pid, event_attributes: event_attributes}))
{:event, filter_id, logs} ->
filter_attributes = Map.get(state, filter_id)
event_attributes = filter_attributes[:event_attributes]
unless logs == [] do
Enum.each(logs, fn log ->
formatted_log =
Enum.reduce(
[
ExW3.keys_to_decimal(log, [
"blockNumber",
"logIndex",
"transactionIndex",
"transactionLogIndex"
]),
format_log_data(log, event_attributes)
],
&Map.merge/2
)
send(filter_attributes[:pid], {:event, {filter_id, formatted_log}})
end)
end
loop(state)
end
end
end
defmodule Contract do
use GenServer
@ -524,7 +392,7 @@ defmodule ExW3 do
@spec start_link() :: {:ok, pid()}
@doc "Begins the Contract process to manage all interactions with smart contracts"
def start_link() do
GenServer.start_link(__MODULE__, %{}, name: ContractManager)
GenServer.start_link(__MODULE__, %{filters: %{}}, name: ContractManager)
end
@spec deploy(keyword(), []) :: {:ok, binary(), []}
@ -585,10 +453,19 @@ defmodule ExW3 do
GenServer.call(ContractManager, {:tx_receipt, {contract_name, tx_hash}})
end
def filter(contract_name, event_name, other_pid, event_data \\ %{}) do
@spec filter(keyword(), binary(), %{}) :: {:ok, binary()}
def filter(contract_name, event_name, event_data \\ %{}) do
GenServer.call(
ContractManager,
{:filter, {contract_name, event_name, other_pid, event_data}}
{:filter, {contract_name, event_name, event_data}}
)
end
@spec get_changes(binary(), integer()) :: {:ok, []}
def get_changes(filter_id, seconds \\ 0) do
GenServer.call(
ContractManager,
{:get_changes, {filter_id, seconds}}
)
end
@ -668,7 +545,10 @@ defmodule ExW3 do
names
end)
[events: Enum.into(signature_types_map, %{}), event_names: Enum.into(names_map, %{})]
[
events: Enum.into(signature_types_map, %{}),
event_names: Enum.into(names_map, %{})
]
end
# Helpers
@ -746,9 +626,9 @@ defmodule ExW3 do
)
end
defp add_helper(contract_info) do
defp register_helper(contract_info) do
if contract_info[:abi] do
contract_info ++ init_events(contract_info[:abi])
contract_info ++ init_events(contract_info[:abi])
else
raise "ABI not provided upon initialization"
end
@ -770,7 +650,7 @@ defmodule ExW3 do
end
def handle_cast({:register, {name, contract_info}}, state) do
{:noreply, Map.put(state, name, add_helper(contract_info))}
{:noreply, Map.put(state, name, register_helper(contract_info))}
end
# Calls
@ -850,12 +730,49 @@ defmodule ExW3 do
|> Map.delete(:topics)
end
def handle_call({:filter, {contract_name, event_name, other_pid, event_data}}, _from, state) do
def get_event_attributes(state, contract_name, event_name) do
contract_info = state[contract_name]
contract_info[:events][contract_info[:event_names][event_name]]
end
unless Process.whereis(Listener) do
raise "EventListener process not alive. Call ExW3.EventListener.start_link before using ExW3.Contract.subscribe"
end
defp extract_non_indexed_fields(data, names, signature) do
Enum.zip(names, ExW3.decode_event(data, signature)) |> Enum.into(%{})
end
defp format_log_data(log, event_attributes) do
non_indexed_fields =
extract_non_indexed_fields(
Map.get(log, "data"),
event_attributes[:non_indexed_names],
event_attributes[:signature]
)
indexed_fields =
if length(log["topics"]) > 1 do
[_head | tail] = log["topics"]
decoded_topics =
Enum.map(0..(length(tail) - 1), fn i ->
topic_type = Enum.at(event_attributes[:topic_types], i)
topic_data = Enum.at(tail, i)
{decoded} = ExW3.decode_data(topic_type, topic_data)
decoded
end)
Enum.zip(event_attributes[:topic_names], decoded_topics) |> Enum.into(%{})
else
%{}
end
new_data = Map.merge(indexed_fields, non_indexed_fields)
Map.put(log, "data", new_data)
end
def handle_call({:filter, {contract_name, event_name, event_data}}, _from, state) do
contract_info = state[contract_name]
event_signature = contract_info[:event_names][event_name]
topic_types = contract_info[:events][event_signature][:topic_types]
@ -870,11 +787,43 @@ defmodule ExW3 do
)
filter_id = ExW3.new_filter(payload)
event_attributes = contract_info[:events][contract_info[:event_names][event_name]]
EventListener.filter(filter_id, event_attributes, other_pid)
{:reply, filter_id, Map.put(state, :filters, Map.put(state[:filters], filter_id, %{contract_name: contract_name, event_name: event_name}))}
end
def handle_call({:get_changes, {filter_id, seconds}}, _from, state) do
:timer.sleep(1000 * seconds)
filter_info = Map.get(state[:filters], filter_id)
event_attributes = get_event_attributes(state, filter_info[:contract_name], filter_info[:event_name])
logs = ExW3.get_filter_changes(filter_id)
formatted_logs =
if logs != [] do
Enum.map(logs, fn log ->
formatted_log =
Enum.reduce(
[
ExW3.keys_to_decimal(log, [
"blockNumber",
"logIndex",
"transactionIndex",
"transactionLogIndex"
]),
format_log_data(log, event_attributes)
],
&Map.merge/2
)
formatted_log
end)
else
logs
end
{:reply, filter_id, Map.put(state, contract_name, contract_info ++ [event_name, filter_id])}
{:reply, {:ok, formatted_logs}, state}
end
def handle_call({:deploy, {name, args}}, _from, state) do

@ -185,7 +185,7 @@ defmodule EXW3Test do
assert data == "Hello, World!"
end
test "starts a Contract GenServer and uses the event listener", context do
test "Testing formatted get filter changes", context do
ExW3.Contract.register(:EventTester, abi: context[:event_tester_abi])
{:ok, address, _} =
@ -200,13 +200,9 @@ defmodule EXW3Test do
ExW3.Contract.at(:EventTester, address)
{:ok, agent} = Agent.start_link(fn -> [] end)
ExW3.EventListener.start_link()
# Test non indexed events
filter_id = ExW3.Contract.filter(:EventTester, "Simple", self())
filter_id = ExW3.Contract.filter(:EventTester, "Simple")
{:ok, _tx_hash} =
ExW3.Contract.send(
@ -216,16 +212,10 @@ defmodule EXW3Test do
%{from: Enum.at(context[:accounts], 0), gas: 30_000}
)
receive do
{:event, {_filter_id, data}} ->
Agent.update(agent, fn list -> [data | list] end)
after
3_000 ->
raise "Never received event"
end
{:ok, change_logs} = ExW3.Contract.get_changes(filter_id)
state = Agent.get(agent, fn list -> list end)
event_log = Enum.at(state, 0)
event_log = Enum.at(change_logs, 0)
assert event_log |> is_map
log_data = Map.get(event_log, "data")
@ -237,9 +227,7 @@ defmodule EXW3Test do
# Test indexed events
{:ok, agent} = Agent.start_link(fn -> [] end)
indexed_filter_id = ExW3.Contract.filter(:EventTester, "SimpleIndex", self())
indexed_filter_id = ExW3.Contract.filter(:EventTester, "SimpleIndex")
{:ok, _tx_hash} =
ExW3.Contract.send(
@ -249,16 +237,10 @@ defmodule EXW3Test do
%{from: Enum.at(context[:accounts], 0), gas: 30_000}
)
receive do
{:event, {_filter_id, data}} ->
Agent.update(agent, fn list -> [data | list] end)
after
3_000 ->
raise "Never received event"
end
{:ok, change_logs} = ExW3.Contract.get_changes(indexed_filter_id)
state = Agent.get(agent, fn list -> list end)
event_log = Enum.at(state, 0)
event_log = Enum.at(change_logs, 0)
assert event_log |> is_map
log_data = Map.get(event_log, "data")
assert log_data |> is_map
@ -269,13 +251,10 @@ defmodule EXW3Test do
# Test Indexing Indexed Events
{:ok, agent} = Agent.start_link(fn -> [] end)
indexed_filter_id =
ExW3.Contract.filter(
:EventTester,
"SimpleIndex",
self(),
%{
topics: [nil, ["Hello, World", "Hello, World!"]],
fromBlock: 1,
@ -291,51 +270,24 @@ defmodule EXW3Test do
%{from: Enum.at(context[:accounts], 0), gas: 30_000}
)
receive do
{:event, {_filter_id, data}} ->
Agent.update(agent, fn list -> [data | list] end)
after
3_000 ->
raise "Never received event"
end
{:ok, change_logs} = ExW3.Contract.get_changes(indexed_filter_id)
state = Agent.get(agent, fn list -> list end)
event_log = Enum.at(state, 0)
event_log = Enum.at(change_logs, 0)
assert event_log |> is_map
log_data = Map.get(event_log, "data")
assert log_data |> is_map
assert Map.get(log_data, "num") == 46
assert ExW3.bytes_to_string(Map.get(log_data, "data")) == "Hello, World!"
assert Map.get(log_data, "otherNum") == 42
ExW3.uninstall_filter(indexed_filter_id)
end
test "starts a EventTester", context do
ExW3.Contract.register(:EventTester, abi: context[:event_tester_abi])
{:ok, address, _} =
ExW3.Contract.deploy(
:EventTester,
bin: ExW3.load_bin("test/examples/build/EventTester.bin"),
options: %{
gas: 300_000,
from: Enum.at(context[:accounts], 0)
}
)
ExW3.Contract.at(:EventTester, address)
# Test Indexing Indexed Events with Map params
ExW3.EventListener.start_link()
{:ok, agent} = Agent.start_link(fn -> [] end)
# Tests filter with map params
indexed_filter_id =
ExW3.Contract.filter(
:EventTester,
"SimpleIndex",
self(),
%{
topics: %{num: 46, data: "Hello, World!"}
}
@ -349,23 +301,19 @@ defmodule EXW3Test do
%{from: Enum.at(context[:accounts], 0), gas: 30_000}
)
receive do
{:event, {_filter_id, data}} ->
Agent.update(agent, fn list -> [data | list] end)
after
3_000 ->
raise "Never received event"
end
# Demonstrating the delay capability
{:ok, change_logs} = ExW3.Contract.get_changes(indexed_filter_id, 1)
state = Agent.get(agent, fn list -> list end)
event_log = Enum.at(state, 0)
event_log = Enum.at(change_logs, 0)
assert event_log |> is_map
log_data = Map.get(event_log, "data")
assert log_data |> is_map
assert Map.get(log_data, "num") == 46
assert ExW3.bytes_to_string(Map.get(log_data, "data")) == "Hello, World!"
assert Map.get(log_data, "otherNum") == 42
ExW3.uninstall_filter(indexed_filter_id)
end
test "starts a Contract GenServer for Complex contract", context do

Loading…
Cancel
Save