|
@@ -102,10 +102,10 @@ namespace Abc.Zebus.Persistence.RocksDb
|
|
|
// Message
|
|
|
if (!_outOfOrderAcks.TryRemove(entry.MessageId, out _))
|
|
|
// Message before ack
|
|
|
- _db.Put(key, entry.MessageBytes, _messagesColumnFamily);
|
|
|
+ _db.Put(key, entry.MessageBytes, _messagesColumnFamily);
|
|
|
else
|
|
|
// Otherwise ignore the message and remove the ack as it has already been acked
|
|
|
- _db.Remove(key, _acksColumnFamily);
|
|
|
+ _db.Remove(key, _acksColumnFamily);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -190,7 +190,9 @@ namespace Abc.Zebus.Persistence.RocksDb
|
|
|
Buffer.BlockCopy(peerPart, 0, key, 0, peerPart.Length);
|
|
|
|
|
|
var tickPart = BitConverter.GetBytes(ticks);
|
|
|
- Array.Reverse(tickPart); // change endianness
|
|
|
+ if (BitConverter.IsLittleEndian)
|
|
|
+ Array.Reverse(tickPart); // change endianness so sorting will be correct
|
|
|
+
|
|
|
Buffer.BlockCopy(tickPart, 0, key, peerPart.Length, sizeof(long));
|
|
|
|
|
|
var messageIdPart = messageId.ToByteArray();
|
|
@@ -215,14 +217,14 @@ namespace Abc.Zebus.Persistence.RocksDb
|
|
|
|
|
|
private static Guid ReadMessageIdFromKey(byte[] keyBytes)
|
|
|
{
|
|
|
- var messageIdBytes = new byte [_guidLength];
|
|
|
+ var messageIdBytes = new byte[_guidLength];
|
|
|
Buffer.BlockCopy(keyBytes, keyBytes.Length - _guidLength, messageIdBytes, 0, _guidLength);
|
|
|
var messageId = new Guid(messageIdBytes);
|
|
|
return messageId;
|
|
|
}
|
|
|
|
|
|
- private static PeerId ReadPeerKey(byte[] keyBytes) => new PeerId(Encoding.UTF8.GetString(keyBytes));
|
|
|
- private static byte[] GetPeerKey(PeerId peerId) => Encoding.UTF8.GetBytes(peerId.ToString());
|
|
|
+ private static PeerId ReadPeerKey(byte[] keyBytes) => new PeerId(Encoding.UTF8.GetString(keyBytes));
|
|
|
+ private static byte[] GetPeerKey(PeerId peerId) => Encoding.UTF8.GetBytes(peerId.ToString());
|
|
|
public static byte[] CreateKeyBuffer(PeerId entryPeerId) => new byte[Encoding.UTF8.GetByteCount(entryPeerId.ToString()) + sizeof(long) + _guidLength];
|
|
|
|
|
|
public static bool CompareStart(byte[] x, byte[] y, int length)
|