是否有任何.NET数据结构/类组合允许将字节数据附加到缓冲区的末尾,但是所有的查找和读取都是从一开始,在我读取时缩短缓冲区?
MemoryStream
类似乎也是其中的一部分,但是我需要保持不同的读写位置,并且在读取后不会自动丢弃数据。
答案已经回复this question了,这基本上就是我要做的事情,但是我更喜欢我可以在同一进程的不同组件中进行异步I / O,就像普通管道甚至网络一样流(我需要先过滤/处理数据)。
我将发布一份我曾为工作项目写过的逻辑的剥离副本。这个版本的优点是它可以使用缓冲数据的链接列表,因此您在阅读时不必缓存大量内存和/或复制内存。此外,它的线程安全且行为类似于网络流,即:当没有可用数据时读取:等待直到有可用数据或超时。此外,当读取x个字节数且只有y个字节时,在读取所有字节后返回。我希望这有帮助!
public class SlidingStream : Stream
{
#region Other stream member implementations
...
#endregion Other stream member implementations
public SlidingStream()
{
ReadTimeout = -1;
}
private readonly object _writeSyncRoot = new object();
private readonly object _readSyncRoot = new object();
private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>();
private readonly ManualResetEventSlim _dataAvailableResetEvent = new ManualResetEventSlim();
public int ReadTimeout { get; set; }
public override int Read(byte[] buffer, int offset, int count)
{
if (_dataAvailableResetEvent.Wait(ReadTimeout))
throw new TimeoutException("No data available");
lock (_readSyncRoot)
{
int currentCount = 0;
int currentOffset = 0;
while (currentCount != count)
{
ArraySegment<byte> segment = _pendingSegments.First.Value;
_pendingSegments.RemoveFirst();
int index = segment.Offset;
for (; index < segment.Count; index++)
{
if (currentOffset < offset)
{
currentOffset++;
}
else
{
buffer[currentCount] = segment.Array[index];
currentCount++;
}
}
if (currentCount == count)
{
if (index < segment.Offset + segment.Count)
{
_pendingSegments.AddFirst(new ArraySegment<byte>(segment.Array, index, segment.Offset + segment.Count - index));
}
}
if (_pendingSegments.Count == 0)
{
_dataAvailableResetEvent.Reset();
return currentCount;
}
}
return currentCount;
}
}
public override void Write(byte[] buffer, int offset, int count)
{
lock (_writeSyncRoot)
{
byte[] copy = new byte[count];
Array.Copy(buffer, offset, copy, 0, count);
_pendingSegments.AddLast(new ArraySegment<byte>(copy));
_dataAvailableResetEvent.Set();
}
}
}
代码可以比接受的答案更简单。没有必要使用for
循环:
/// <summary>
/// This class is a very fast and threadsafe FIFO buffer
/// </summary>
public class FastFifo
{
private List<Byte> mi_FifoData = new List<Byte>();
/// <summary>
/// Get the count of bytes in the Fifo buffer
/// </summary>
public int Count
{
get
{
lock (mi_FifoData)
{
return mi_FifoData.Count;
}
}
}
/// <summary>
/// Clears the Fifo buffer
/// </summary>
public void Clear()
{
lock (mi_FifoData)
{
mi_FifoData.Clear();
}
}
/// <summary>
/// Append data to the end of the fifo
/// </summary>
public void Push(Byte[] u8_Data)
{
lock (mi_FifoData)
{
// Internally the .NET framework uses Array.Copy() which is extremely fast
mi_FifoData.AddRange(u8_Data);
}
}
/// <summary>
/// Get data from the beginning of the fifo.
/// returns null if s32_Count bytes are not yet available.
/// </summary>
public Byte[] Pop(int s32_Count)
{
lock (mi_FifoData)
{
if (mi_FifoData.Count < s32_Count)
return null;
// Internally the .NET framework uses Array.Copy() which is extremely fast
Byte[] u8_PopData = new Byte[s32_Count];
mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
mi_FifoData.RemoveRange(0, s32_Count);
return u8_PopData;
}
}
/// <summary>
/// Gets a byte without removing it from the Fifo buffer
/// returns -1 if the index is invalid
/// </summary>
public int PeekAt(int s32_Index)
{
lock (mi_FifoData)
{
if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
return -1;
return mi_FifoData[s32_Index];
}
}
}
我试过抛光Polity的代码。它远非优化,但可能只是有效。
public class SlidingStream : Stream {
public SlidingStream() {
ReadTimeout = -1;
}
private readonly object ReadSync = new object();
private readonly object WriteSync = new object();
private readonly ConcurrentQueue<ArraySegment<byte>> PendingSegments
= new ConcurrentQueue<ArraySegment<byte>>();
private readonly ManualResetEventSlim DataAvailable = new ManualResetEventSlim(false);
private ArraySegment<byte>? PartialSegment;
public new int ReadTimeout;
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotImplementedException();
public override long Position {
get => throw new NotImplementedException();
set => throw new NotImplementedException();
}
private bool Closed;
public override void Close() {
Closed = true;
DataAvailable.Set();
base.Close();
}
public override int Read(byte[] buffer, int offset, int count) {
int msStart = Environment.TickCount;
lock (ReadSync) {
int read = 0;
while (read < count) {
ArraySegment<byte>? seg = TryDequeue(msStart);
if (seg == null) {
return read;
}
ArraySegment<byte> segment = seg.GetValueOrDefault();
int bite = Math.Min(count - read, segment.Count);
if (bite < segment.Count) {
PartialSegment = new ArraySegment<byte>(
segment.Array,
segment.Offset + bite,
segment.Count - bite
);
}
Array.Copy(segment.Array, segment.Offset, buffer, offset + read, bite);
read += bite;
}
return read;
}
}
private ArraySegment<byte>? TryDequeue(int msStart) {
ArraySegment<byte>? ps = PartialSegment;
if (ps.HasValue) {
PartialSegment = null;
return ps;
}
DataAvailable.Reset();
ArraySegment<byte> segment;
while (!PendingSegments.TryDequeue(out segment)) {
if (Closed) {
return null;
}
WaitDataOrTimeout(msStart);
}
return segment;
}
private void WaitDataOrTimeout(int msStart) {
int timeout;
if (ReadTimeout == -1) {
timeout = -1;
}
else {
timeout = msStart + ReadTimeout - Environment.TickCount;
}
if (!DataAvailable.Wait(timeout)) {
throw new TimeoutException("No data available");
}
}
public override void Write(byte[] buffer, int offset, int count) {
lock (WriteSync) {
byte[] copy = new byte[count];
Array.Copy(buffer, offset, copy, 0, count);
PendingSegments.Enqueue(new ArraySegment<byte>(copy));
DataAvailable.Set();
}
}
public override void Flush() {
throw new NotImplementedException();
}
public override long Seek(long offset, SeekOrigin origin) {
throw new NotImplementedException();
}
public override void SetLength(long value) {
throw new NotImplementedException();
}
}