我有一个长期运行的应用程序,它每隔 2~3 秒插入一次数据。大多数情况下运行良好,但有时会出现超时。每次大约插入 50 条记录;我也用更大的负载(例如超过 2000 行)测试过,同样工作正常。一天中只有少数几次会抛出超时异常。
异常信息如下:
Source: Microsoft.WindowsAzure.Storage
TargetSite: T EndExecuteAsync[T]
StackTrace:
   at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result)
   at Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass2`1.b__0(IAsyncResult ar)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at smi.Server.Shared.VehicleHistoryLibrary.ATVehicleHistoryContext.d__4.MoveNext()
这是我的代码
// Writes the incoming track histories to table storage:
//   1. converts each history to a TrackHistoryTableEntity and groups them by PartitionKey,
//   2. merges entities that share a RowKey inside a partition,
//   3. upserts the merged entities in batches, plus one marker row per partition in
//      TrackHistoryTracksTable (and, optionally, TrackHistoryPartitionUpdatesTable),
//   4. awaits all requests together.

// Global thread-pool / HTTP connection tuning.
// NOTE(review): these are static, process-wide settings; presumably they belong in
// one-time startup code rather than in every insert pass — confirm where this runs.
ThreadPool.SetMinThreads(1024, 256);
ServicePointManager.DefaultConnectionLimit = 256;
ServicePointManager.UseNagleAlgorithm = false;
ServicePointManager.Expect100Continue = false;

// A batch is flushed as soon as it holds this many operations
// (Azure Table storage allows at most 100 operations per batch).
const int MaxBatchSize = 100;

var tableEntityGroups = histories
    .Select(h => new TrackHistoryTableEntity(h))
    .GroupBy(e => e.PartitionKey)
    .ToDictionary(g => g.Key, g => g.ToList());

List<Task> tasks = new List<Task>();

foreach (var kvp in tableEntityGroups)
{
    // The dictionary key is the PartitionKey shared by every entity in kvp.Value.
    var partitionKey = kvp.Key;

    // Merge track history records with the same RowKey into one entity:
    // A, N, V and B are averaged, D is summed.
    var mergedHistories = kvp.Value
        .GroupBy(v => v.RowKey)
        .Select(g => new TrackHistoryTableEntity()
        {
            PartitionKey = partitionKey,
            RowKey = g.Key,
            A = g.Average(v => v.A),
            N = g.Average(v => v.N),
            V = g.Average(v => v.V),
            B = g.Average(v => v.B),
            D = g.Sum(v => v.D)
        });

    // All operations in one batch target the same partition; start a new batch
    // every MaxBatchSize operations.
    TableBatchOperation batchOperation = new TableBatchOperation();
    foreach (var v in mergedHistories)
    {
        batchOperation.Add(TableOperation.InsertOrReplace(v));
        if (batchOperation.Count >= MaxBatchSize)
        {
            tasks.Add(TrackHistoryTable.ExecuteBatchAsync(batchOperation));
            batchOperation = new TableBatchOperation();
        }
    }

    // Flush the final, partially filled batch.
    if (batchOperation.Count > 0)
    {
        tasks.Add(TrackHistoryTable.ExecuteBatchAsync(batchOperation));
    }

    // The partition key has the form "<first>_<number>". The numeric part is round-tripped
    // through int so the stored row key is its canonical decimal form; the invariant
    // culture keeps that key identical regardless of the machine's regional settings.
    var splitKey = partitionKey.Split('_');
    var trackRowKey = int
        .Parse(splitKey[1], System.Globalization.CultureInfo.InvariantCulture)
        .ToString(System.Globalization.CultureInfo.InvariantCulture);
    tasks.Add(TrackHistoryTracksTable.ExecuteAsync(
        TableOperation.InsertOrReplace(new TableEntity(splitKey[0], trackRowKey))));

    if (trackPartitionUpdates)
    {
        // Record that this partition was written during the current UTC hour.
        tasks.Add(TrackHistoryPartitionUpdatesTable.ExecuteAsync(
            TableOperation.InsertOrReplace(
                new TableEntity(
                    TrackHistoryTableEntity.GetHourTimestamp(DateTime.UtcNow).ToString(),
                    partitionKey))));
    }
}

// Every request above was started without awaiting; wait for all of them here.
await Task.WhenAll(tasks);
这里有两个注意事项: