4
0
Fork 0

Event pub sub working, with filtering, and using a registry

contracts
hswick 7 years ago
parent 6cf9295896
commit e19440c94d
  1. 51
      lib/exw3.ex
  2. 10
      test/exw3_test.exs

@ -176,17 +176,17 @@ defmodule ExW3 do
use GenServer use GenServer
def start_link do def start_link do
GenServer.start_link(__MODULE__, %{block_number: ExW3.block_number()}) GenServer.start_link(__MODULE__, %{block_number: ExW3.block_number(), subscribers: %MapSet{}}, name: ExW3.EventPublisher)
end end
def init(state) do def init(state) do
PubSub.start_link() Registry.start_link(keys: :unique, name: Registry.ExW3.EventPubSub)
schedule_block() schedule_block()
{:ok, state} {:ok, state}
end end
def subscribe(subscriber, event_signature) do def filter_unsubscribed(logs, state) do
PubSub.subscribe(subscriber, event_signature) Enum.filter(logs, fn log -> MapSet.member?(state[:subscribers], log["address"]) end)
end end
def handle_info(:block, state) do def handle_info(:block, state) do
@ -196,9 +196,11 @@ defmodule ExW3 do
tx_receipts = Enum.map(block["transactions"], fn tx -> ExW3.tx_receipt(tx["hash"]) end) tx_receipts = Enum.map(block["transactions"], fn tx -> ExW3.tx_receipt(tx["hash"]) end)
for logs <- Enum.map(tx_receipts, fn receipt -> receipt["logs"] end) do for logs <- Enum.map(tx_receipts, fn receipt -> receipt["logs"] end) do
for log <- logs do for log <- filter_unsubscribed(logs, state) do
for topic <- log["topics"] do for topic <- log["topics"] do
PubSub.publish(String.slice(topic, 2..-1), log["data"]) Registry.dispatch(Registry.ExW3.EventPubSub, String.slice(topic, 2..-1), fn entries ->
for {pid, _} <- entries, do: send(pid, {:eth_event, log["data"]})
end)
end end
end end
end end
@ -207,6 +209,13 @@ defmodule ExW3 do
{:noreply, Map.merge(state, %{block_number: block_number})} {:noreply, Map.merge(state, %{block_number: block_number})}
end end
def handle_cast({:new_subscriber, {:address, address}}, state) do
{_, new_state} = Map.get_and_update(state, :subscribers, fn subscribers ->
{subscribers, MapSet.put(subscribers, address)}
end)
{:noreply, new_state}
end
defp schedule_block() do defp schedule_block() do
Process.send_after(self(), :block, 1000) Process.send_after(self(), :block, 1000)
end end
@ -215,27 +224,29 @@ defmodule ExW3 do
def decode_event(data, signature) do def decode_event(data, signature) do
fs = ABI.FunctionSelector.decode(signature) fs = ABI.FunctionSelector.decode(signature)
#IO.inspect fs
data data
|> Base.decode16!(case: :lower) #|> ABI.TypeDecoder.decode(fs)
|> ABI.TypeDecoder.decode(fs)
end end
defmodule EventSubscriber do defmodule EventSubscriber do
def start_link(signature, callback) do use GenServer
pid = spawn(fn -> loop(%{callback: callback, signature: signature}) end)
ExW3.EventPublisher.subscribe(pid, ExW3.encode_event(signature)) def start_link(topic, address, callback) do
{:ok, pid} GenServer.start_link(__MODULE__, %{callback: callback, topic: topic, address: address})
end end
def loop(state) do def init(state) do
receive do encoded_event = ExW3.encode_event(state[:topic])
message -> Registry.register(Registry.ExW3.EventPubSub, encoded_event, [])
apply(state[:callback], [ GenServer.cast(ExW3.EventPublisher, {:new_subscriber, {:address, state[:address]}})
ExW3.decode_event(String.slice(message, 2..-1), state[:signature]) {:ok, state}
]) end
loop(state) def handle_info({:eth_event, message}, state) do
end apply state[:callback], [ExW3.decode_event(message, state[:topic])]
{:noreply, state}
end end
end end
end end

@ -58,6 +58,8 @@ defmodule EXW3Test do
receipt = ExW3.tx_receipt tx_hash receipt = ExW3.tx_receipt tx_hash
#IO.inspect receipt
#IO.inspect ExW3.block receipt["blockNumber"] #IO.inspect ExW3.block receipt["blockNumber"]
{:ok, result} = ExW3.Contract.method(storage, :get) {:ok, result} = ExW3.Contract.method(storage, :get)
@ -80,14 +82,14 @@ defmodule EXW3Test do
{:ok, event_pub} = ExW3.EventPublisher.start_link {:ok, event_pub} = ExW3.EventPublisher.start_link
{:ok, pid} = ExW3.EventSubscriber.start_link( {:ok, pid} = ExW3.EventSubscriber.start_link(
"Simple(uint256,bytes32)", "Simple(uint256,bytes32)",
contract_address,
fn event_data -> fn event_data ->
str = str =
event_data event_data
|> Enum.at(1)
|> ExW3.bytes_to_string
assert str == "Hello, World!" IO.inspect str
end end
) )

Loading…
Cancel
Save