|
|
|
@ -1241,8 +1241,25 @@ Cache::getTimelineMentions() |
|
|
|
|
return notifs; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::string |
|
|
|
|
Cache::previousBatchToken(const std::string &room_id) |
|
|
|
|
{ |
|
|
|
|
auto txn = lmdb::txn::begin(env_, nullptr); |
|
|
|
|
auto orderDb = getEventOrderDb(txn, room_id); |
|
|
|
|
|
|
|
|
|
auto cursor = lmdb::cursor::open(txn, orderDb); |
|
|
|
|
lmdb::val indexVal, val; |
|
|
|
|
if (!cursor.get(indexVal, val, MDB_FIRST)) { |
|
|
|
|
return ""; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
auto j = json::parse(std::string_view(val.data(), val.size())); |
|
|
|
|
|
|
|
|
|
return j.value("prev_batch", ""); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Cache::Messages |
|
|
|
|
Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id, int64_t index, bool forward) |
|
|
|
|
Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id, uint64_t index, bool forward) |
|
|
|
|
{ |
|
|
|
|
// TODO(nico): Limit the messages returned by this maybe?
|
|
|
|
|
auto orderDb = getOrderToMessageDb(txn, room_id); |
|
|
|
@ -1253,16 +1270,16 @@ Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id, int64_t i |
|
|
|
|
lmdb::val indexVal, event_id; |
|
|
|
|
|
|
|
|
|
auto cursor = lmdb::cursor::open(txn, orderDb); |
|
|
|
|
if (index == std::numeric_limits<int64_t>::max()) { |
|
|
|
|
if (index == std::numeric_limits<uint64_t>::max()) { |
|
|
|
|
if (cursor.get(indexVal, event_id, forward ? MDB_FIRST : MDB_LAST)) { |
|
|
|
|
index = *indexVal.data<int64_t>(); |
|
|
|
|
index = *indexVal.data<uint64_t>(); |
|
|
|
|
} else { |
|
|
|
|
messages.end_of_cache = true; |
|
|
|
|
return messages; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (cursor.get(indexVal, event_id, MDB_SET)) { |
|
|
|
|
index = *indexVal.data<int64_t>(); |
|
|
|
|
index = *indexVal.data<uint64_t>(); |
|
|
|
|
} else { |
|
|
|
|
messages.end_of_cache = true; |
|
|
|
|
return messages; |
|
|
|
@ -1296,7 +1313,7 @@ Cache::getTimelineMessages(lmdb::txn &txn, const std::string &room_id, int64_t i |
|
|
|
|
cursor.close(); |
|
|
|
|
|
|
|
|
|
// std::reverse(timeline.events.begin(), timeline.events.end());
|
|
|
|
|
messages.next_index = *indexVal.data<int64_t>(); |
|
|
|
|
messages.next_index = *indexVal.data<uint64_t>(); |
|
|
|
|
messages.end_of_cache = !ret; |
|
|
|
|
|
|
|
|
|
return messages; |
|
|
|
@ -1402,16 +1419,16 @@ Cache::getTimelineRange(const std::string &room_id) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TimelineRange range{}; |
|
|
|
|
range.last = *indexVal.data<int64_t>(); |
|
|
|
|
range.last = *indexVal.data<uint64_t>(); |
|
|
|
|
|
|
|
|
|
if (!cursor.get(indexVal, val, MDB_FIRST)) { |
|
|
|
|
return {}; |
|
|
|
|
} |
|
|
|
|
range.first = *indexVal.data<int64_t>(); |
|
|
|
|
range.first = *indexVal.data<uint64_t>(); |
|
|
|
|
|
|
|
|
|
return range; |
|
|
|
|
} |
|
|
|
|
std::optional<int64_t> |
|
|
|
|
std::optional<uint64_t> |
|
|
|
|
Cache::getTimelineIndex(const std::string &room_id, std::string_view event_id) |
|
|
|
|
{ |
|
|
|
|
auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY); |
|
|
|
@ -1424,11 +1441,11 @@ Cache::getTimelineIndex(const std::string &room_id, std::string_view event_id) |
|
|
|
|
return {}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return *val.data<int64_t>(); |
|
|
|
|
return *val.data<uint64_t>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::optional<std::string> |
|
|
|
|
Cache::getTimelineEventId(const std::string &room_id, int64_t index) |
|
|
|
|
Cache::getTimelineEventId(const std::string &room_id, uint64_t index) |
|
|
|
|
{ |
|
|
|
|
auto txn = lmdb::txn::begin(env_, nullptr, MDB_RDONLY); |
|
|
|
|
auto orderDb = getOrderToMessageDb(txn, room_id); |
|
|
|
@ -2074,6 +2091,9 @@ Cache::saveTimelineMessages(lmdb::txn &txn, |
|
|
|
|
const std::string &room_id, |
|
|
|
|
const mtx::responses::Timeline &res) |
|
|
|
|
{ |
|
|
|
|
if (res.events.empty()) |
|
|
|
|
return; |
|
|
|
|
|
|
|
|
|
auto eventsDb = getEventsDb(txn, room_id); |
|
|
|
|
auto relationsDb = getRelationsDb(txn, room_id); |
|
|
|
|
|
|
|
|
@ -2090,16 +2110,16 @@ Cache::saveTimelineMessages(lmdb::txn &txn, |
|
|
|
|
using namespace mtx::events::state; |
|
|
|
|
|
|
|
|
|
lmdb::val indexVal, val; |
|
|
|
|
int64_t index = 0; |
|
|
|
|
auto cursor = lmdb::cursor::open(txn, orderDb); |
|
|
|
|
uint64_t index = std::numeric_limits<uint64_t>::max() / 2; |
|
|
|
|
auto cursor = lmdb::cursor::open(txn, orderDb); |
|
|
|
|
if (cursor.get(indexVal, val, MDB_LAST)) { |
|
|
|
|
index = *indexVal.data<int64_t>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int64_t msgIndex = 0; |
|
|
|
|
auto msgCursor = lmdb::cursor::open(txn, order2msgDb); |
|
|
|
|
uint64_t msgIndex = std::numeric_limits<uint64_t>::max() / 2; |
|
|
|
|
auto msgCursor = lmdb::cursor::open(txn, order2msgDb); |
|
|
|
|
if (msgCursor.get(indexVal, val, MDB_LAST)) { |
|
|
|
|
msgIndex = *indexVal.data<int64_t>(); |
|
|
|
|
msgIndex = *indexVal.data<uint64_t>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool first = true; |
|
|
|
@ -2111,39 +2131,19 @@ Cache::saveTimelineMessages(lmdb::txn &txn, |
|
|
|
|
continue; |
|
|
|
|
|
|
|
|
|
lmdb::val ev{}; |
|
|
|
|
bool success = |
|
|
|
|
lmdb::dbi_get(txn, eventsDb, lmdb::val(redaction->redacts), ev); |
|
|
|
|
if (!success) |
|
|
|
|
continue; |
|
|
|
|
|
|
|
|
|
mtx::events::collections::TimelineEvent te; |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
mtx::events::collections::from_json( |
|
|
|
|
json::parse(std::string_view(ev.data(), ev.size())), te); |
|
|
|
|
} catch (std::exception &e) { |
|
|
|
|
nhlog::db()->error("Failed to parse message from cache {}", |
|
|
|
|
e.what()); |
|
|
|
|
continue; |
|
|
|
|
lmdb::dbi_put( |
|
|
|
|
txn, eventsDb, lmdb::val(redaction->redacts), lmdb::val(event.dump())); |
|
|
|
|
lmdb::dbi_put( |
|
|
|
|
txn, eventsDb, lmdb::val(redaction->event_id), lmdb::val(event.dump())); |
|
|
|
|
|
|
|
|
|
lmdb::val oldIndex{}; |
|
|
|
|
if (lmdb::dbi_get( |
|
|
|
|
txn, msg2orderDb, lmdb::val(redaction->redacts), oldIndex)) { |
|
|
|
|
lmdb::dbi_put( |
|
|
|
|
txn, order2msgDb, oldIndex, lmdb::val(redaction->event_id)); |
|
|
|
|
lmdb::dbi_put( |
|
|
|
|
txn, msg2orderDb, lmdb::val(redaction->event_id), oldIndex); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
auto redactedEvent = std::visit( |
|
|
|
|
[](const auto &ev) -> mtx::events::RoomEvent<mtx::events::msg::Redacted> { |
|
|
|
|
mtx::events::RoomEvent<mtx::events::msg::Redacted> replacement = |
|
|
|
|
{}; |
|
|
|
|
replacement.event_id = ev.event_id; |
|
|
|
|
replacement.room_id = ev.room_id; |
|
|
|
|
replacement.sender = ev.sender; |
|
|
|
|
replacement.origin_server_ts = ev.origin_server_ts; |
|
|
|
|
replacement.type = ev.type; |
|
|
|
|
return replacement; |
|
|
|
|
}, |
|
|
|
|
te.data); |
|
|
|
|
|
|
|
|
|
lmdb::dbi_put(txn, |
|
|
|
|
eventsDb, |
|
|
|
|
lmdb::val(redaction->redacts), |
|
|
|
|
lmdb::val(json(redactedEvent).dump())); |
|
|
|
|
} else { |
|
|
|
|
std::string event_id_val = event["event_id"].get<std::string>(); |
|
|
|
|
lmdb::val event_id = event_id_val; |
|
|
|
@ -2193,6 +2193,83 @@ Cache::saveTimelineMessages(lmdb::txn &txn, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
uint64_t |
|
|
|
|
Cache::saveOldMessages(const std::string &room_id, const mtx::responses::Messages &res) |
|
|
|
|
{ |
|
|
|
|
auto txn = lmdb::txn::begin(env_); |
|
|
|
|
auto eventsDb = getEventsDb(txn, room_id); |
|
|
|
|
auto relationsDb = getRelationsDb(txn, room_id); |
|
|
|
|
|
|
|
|
|
auto orderDb = getEventOrderDb(txn, room_id); |
|
|
|
|
auto msg2orderDb = getMessageToOrderDb(txn, room_id); |
|
|
|
|
auto order2msgDb = getOrderToMessageDb(txn, room_id); |
|
|
|
|
|
|
|
|
|
lmdb::val indexVal, val; |
|
|
|
|
uint64_t index = std::numeric_limits<uint64_t>::max() / 2; |
|
|
|
|
auto cursor = lmdb::cursor::open(txn, orderDb); |
|
|
|
|
if (cursor.get(indexVal, val, MDB_FIRST)) { |
|
|
|
|
index = *indexVal.data<uint64_t>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
uint64_t msgIndex = std::numeric_limits<uint64_t>::max() / 2; |
|
|
|
|
auto msgCursor = lmdb::cursor::open(txn, order2msgDb); |
|
|
|
|
if (msgCursor.get(indexVal, val, MDB_FIRST)) { |
|
|
|
|
msgIndex = *indexVal.data<uint64_t>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (res.chunk.empty()) |
|
|
|
|
return index; |
|
|
|
|
|
|
|
|
|
std::string event_id_val; |
|
|
|
|
for (const auto &e : res.chunk) { |
|
|
|
|
auto event = mtx::accessors::serialize_event(e); |
|
|
|
|
event_id_val = event["event_id"].get<std::string>(); |
|
|
|
|
lmdb::val event_id = event_id_val; |
|
|
|
|
lmdb::dbi_put(txn, eventsDb, event_id, lmdb::val(event.dump())); |
|
|
|
|
|
|
|
|
|
--index; |
|
|
|
|
|
|
|
|
|
json orderEntry = json::object(); |
|
|
|
|
orderEntry["event_id"] = event_id_val; |
|
|
|
|
|
|
|
|
|
nhlog::db()->debug("saving '{}'", orderEntry.dump()); |
|
|
|
|
|
|
|
|
|
lmdb::dbi_put( |
|
|
|
|
txn, orderDb, lmdb::val(&index, sizeof(index)), lmdb::val(orderEntry.dump())); |
|
|
|
|
|
|
|
|
|
// TODO(Nico): Allow blacklisting more event types in UI
|
|
|
|
|
if (event["type"] != "m.reaction" && event["type"] != "m.dummy") { |
|
|
|
|
--msgIndex; |
|
|
|
|
lmdb::dbi_put( |
|
|
|
|
txn, order2msgDb, lmdb::val(&msgIndex, sizeof(msgIndex)), event_id); |
|
|
|
|
|
|
|
|
|
lmdb::dbi_put( |
|
|
|
|
txn, msg2orderDb, event_id, lmdb::val(&msgIndex, sizeof(msgIndex))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (event.contains("content") && event["content"].contains("m.relates_to")) { |
|
|
|
|
auto temp = event["content"]["m.relates_to"]; |
|
|
|
|
std::string relates_to = temp.contains("m.in_reply_to") |
|
|
|
|
? temp["m.in_reply_to"]["event_id"] |
|
|
|
|
: temp["event_id"]; |
|
|
|
|
|
|
|
|
|
if (!relates_to.empty()) |
|
|
|
|
lmdb::dbi_put(txn, relationsDb, lmdb::val(relates_to), event_id); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
json orderEntry = json::object(); |
|
|
|
|
orderEntry["event_id"] = event_id_val; |
|
|
|
|
orderEntry["prev_batch"] = res.end; |
|
|
|
|
lmdb::cursor_put( |
|
|
|
|
cursor.handle(), lmdb::val(&index, sizeof(index)), lmdb::val(orderEntry.dump())); |
|
|
|
|
nhlog::db()->debug("saving '{}'", orderEntry.dump()); |
|
|
|
|
|
|
|
|
|
txn.commit(); |
|
|
|
|
|
|
|
|
|
return msgIndex; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
mtx::responses::Notifications |
|
|
|
|
Cache::getTimelineMentionsForRoom(lmdb::txn &txn, const std::string &room_id) |
|
|
|
|
{ |
|
|
|
@ -2337,14 +2414,14 @@ Cache::deleteOldMessages() |
|
|
|
|
auto eventsDb = getEventsDb(txn, room_id); |
|
|
|
|
auto cursor = lmdb::cursor::open(txn, orderDb); |
|
|
|
|
|
|
|
|
|
int64_t first, last; |
|
|
|
|
uint64_t first, last; |
|
|
|
|
if (cursor.get(indexVal, val, MDB_LAST)) { |
|
|
|
|
last = *indexVal.data<int64_t>(); |
|
|
|
|
last = *indexVal.data<uint64_t>(); |
|
|
|
|
} else { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
if (cursor.get(indexVal, val, MDB_FIRST)) { |
|
|
|
|
first = *indexVal.data<int64_t>(); |
|
|
|
|
first = *indexVal.data<uint64_t>(); |
|
|
|
|
} else { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|