From e19440c94dbf26edf0259a421431787fbb1bef6f Mon Sep 17 00:00:00 2001 From: hswick Date: Mon, 16 Apr 2018 10:50:34 -0500 Subject: [PATCH] Event pub sub working, with filtering, and using a registry --- lib/exw3.ex | 51 ++++++++++++++++++++++++++++------------------ test/exw3_test.exs | 10 +++++---- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/lib/exw3.ex b/lib/exw3.ex index 10dc99c..3d7d3e8 100644 --- a/lib/exw3.ex +++ b/lib/exw3.ex @@ -176,17 +176,17 @@ defmodule ExW3 do use GenServer def start_link do - GenServer.start_link(__MODULE__, %{block_number: ExW3.block_number()}) + GenServer.start_link(__MODULE__, %{block_number: ExW3.block_number(), subscribers: %MapSet{}}, name: ExW3.EventPublisher) end def init(state) do - PubSub.start_link() + Registry.start_link(keys: :unique, name: Registry.ExW3.EventPubSub) schedule_block() {:ok, state} end - def subscribe(subscriber, event_signature) do - PubSub.subscribe(subscriber, event_signature) + def filter_unsubscribed(logs, state) do + Enum.filter(logs, fn log -> MapSet.member?(state[:subscribers], log["address"]) end) end def handle_info(:block, state) do @@ -196,9 +196,11 @@ defmodule ExW3 do tx_receipts = Enum.map(block["transactions"], fn tx -> ExW3.tx_receipt(tx["hash"]) end) for logs <- Enum.map(tx_receipts, fn receipt -> receipt["logs"] end) do - for log <- logs do + for log <- filter_unsubscribed(logs, state) do for topic <- log["topics"] do - PubSub.publish(String.slice(topic, 2..-1), log["data"]) + Registry.dispatch(Registry.ExW3.EventPubSub, String.slice(topic, 2..-1), fn entries -> + for {pid, _} <- entries, do: send(pid, {:eth_event, log["data"]}) + end) end end end @@ -207,6 +209,13 @@ defmodule ExW3 do {:noreply, Map.merge(state, %{block_number: block_number})} end + def handle_cast({:new_subscriber, {:address, address}}, state) do + {_, new_state} = Map.get_and_update(state, :subscribers, fn subscribers -> + {subscribers, MapSet.put(subscribers, address)} + end) + {:noreply, new_state} + end + defp schedule_block() do Process.send_after(self(), :block, 1000) end @@ -215,27 +224,29 @@ defmodule ExW3 do def decode_event(data, signature) do fs = ABI.FunctionSelector.decode(signature) + #IO.inspect fs + data - |> Base.decode16!(case: :lower) - |> ABI.TypeDecoder.decode(fs) + #|> ABI.TypeDecoder.decode(fs) end defmodule EventSubscriber do - def start_link(signature, callback) do - pid = spawn(fn -> loop(%{callback: callback, signature: signature}) end) - ExW3.EventPublisher.subscribe(pid, ExW3.encode_event(signature)) - {:ok, pid} + use GenServer + + def start_link(topic, address, callback) do + GenServer.start_link(__MODULE__, %{callback: callback, topic: topic, address: address}) end - def loop(state) do - receive do - message -> - apply(state[:callback], [ - ExW3.decode_event(String.slice(message, 2..-1), state[:signature]) - ]) + def init(state) do + encoded_event = ExW3.encode_event(state[:topic]) + Registry.register(Registry.ExW3.EventPubSub, encoded_event, []) + GenServer.cast(ExW3.EventPublisher, {:new_subscriber, {:address, state[:address]}}) + {:ok, state} + end - loop(state) - end + def handle_info({:eth_event, message}, state) do + apply state[:callback], [ExW3.decode_event(message, state[:topic])] + {:noreply, state} end end end diff --git a/test/exw3_test.exs b/test/exw3_test.exs index ea7fc02..a5d9715 100644 --- a/test/exw3_test.exs +++ b/test/exw3_test.exs @@ -58,6 +58,8 @@ defmodule EXW3Test do receipt = ExW3.tx_receipt tx_hash + #IO.inspect receipt + #IO.inspect ExW3.block receipt["blockNumber"] {:ok, result} = ExW3.Contract.method(storage, :get) @@ -80,14 +82,14 @@ defmodule EXW3Test do {:ok, event_pub} = ExW3.EventPublisher.start_link {:ok, pid} = ExW3.EventSubscriber.start_link( - "Simple(uint256,bytes32)", + "Simple(uint256,bytes32)", + contract_address, fn event_data -> str = event_data - |> Enum.at(1) - |> ExW3.bytes_to_string - assert str == "Hello, World!" + IO.inspect str + end )