|
|
@ -2199,27 +2199,31 @@ Cache::firstPendingMessage(const std::string &room_id) |
|
|
|
auto txn = lmdb::txn::begin(env_); |
|
|
|
auto txn = lmdb::txn::begin(env_); |
|
|
|
auto pending = getPendingMessagesDb(txn, room_id); |
|
|
|
auto pending = getPendingMessagesDb(txn, room_id); |
|
|
|
|
|
|
|
|
|
|
|
auto pendingCursor = lmdb::cursor::open(txn, pending); |
|
|
|
{ |
|
|
|
lmdb::val tsIgnored, pendingTxn; |
|
|
|
auto pendingCursor = lmdb::cursor::open(txn, pending); |
|
|
|
while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) { |
|
|
|
lmdb::val tsIgnored, pendingTxn; |
|
|
|
auto eventsDb = getEventsDb(txn, room_id); |
|
|
|
while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) { |
|
|
|
lmdb::val event; |
|
|
|
auto eventsDb = getEventsDb(txn, room_id); |
|
|
|
if (!lmdb::dbi_get(txn, eventsDb, pendingTxn, event)) { |
|
|
|
lmdb::val event; |
|
|
|
lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn); |
|
|
|
if (!lmdb::dbi_get(txn, eventsDb, pendingTxn, event)) { |
|
|
|
continue; |
|
|
|
lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn); |
|
|
|
} |
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
try { |
|
|
|
mtx::events::collections::TimelineEvent te; |
|
|
|
mtx::events::collections::TimelineEvent te; |
|
|
|
mtx::events::collections::from_json( |
|
|
|
mtx::events::collections::from_json( |
|
|
|
json::parse(std::string_view(event.data(), event.size())), te); |
|
|
|
json::parse(std::string_view(event.data(), event.size())), te); |
|
|
|
|
|
|
|
|
|
|
|
txn.commit(); |
|
|
|
pendingCursor.close(); |
|
|
|
return te; |
|
|
|
txn.commit(); |
|
|
|
} catch (std::exception &e) { |
|
|
|
return te; |
|
|
|
nhlog::db()->error("Failed to parse message from cache {}", e.what()); |
|
|
|
} catch (std::exception &e) { |
|
|
|
lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn); |
|
|
|
nhlog::db()->error("Failed to parse message from cache {}", |
|
|
|
continue; |
|
|
|
e.what()); |
|
|
|
|
|
|
|
lmdb::dbi_del(txn, pending, tsIgnored, pendingTxn); |
|
|
|
|
|
|
|
continue; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -2231,13 +2235,16 @@ Cache::firstPendingMessage(const std::string &room_id) |
|
|
|
void |
|
|
|
void |
|
|
|
Cache::removePendingStatus(const std::string &room_id, const std::string &txn_id) |
|
|
|
Cache::removePendingStatus(const std::string &room_id, const std::string &txn_id) |
|
|
|
{ |
|
|
|
{ |
|
|
|
auto txn = lmdb::txn::begin(env_); |
|
|
|
auto txn = lmdb::txn::begin(env_); |
|
|
|
auto pending = getPendingMessagesDb(txn, room_id); |
|
|
|
auto pending = getPendingMessagesDb(txn, room_id); |
|
|
|
auto pendingCursor = lmdb::cursor::open(txn, pending); |
|
|
|
|
|
|
|
lmdb::val tsIgnored, pendingTxn; |
|
|
|
{ |
|
|
|
while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) { |
|
|
|
auto pendingCursor = lmdb::cursor::open(txn, pending); |
|
|
|
if (std::string_view(pendingTxn.data(), pendingTxn.size()) == txn_id) |
|
|
|
lmdb::val tsIgnored, pendingTxn; |
|
|
|
lmdb::cursor_del(pendingCursor); |
|
|
|
while (pendingCursor.get(tsIgnored, pendingTxn, MDB_NEXT)) { |
|
|
|
|
|
|
|
if (std::string_view(pendingTxn.data(), pendingTxn.size()) == txn_id) |
|
|
|
|
|
|
|
lmdb::cursor_del(pendingCursor); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
txn.commit(); |
|
|
|
txn.commit(); |
|
|
|