我在 Azure 函数中遇到线程重复问题。
该函数做了一些基本的事情:访问 SQL 数据库,调用一些 API 并完成执行。问题是:同一个 API 被多次调用。
行为如下:Azure 函数由队列触发并开始运行。起初它执行正常,但过了一会儿,它会生成一个新的线程(Thread)来重复自身的执行,导致其余所有代码都被执行两次。
我可以通过将函数设置为单例来解决它。这会起作用,但我需要并行执行,所以这不是一个选项。我需要的是让函数触发多次,但在内部它不会在两个线程中重复相同的执行。
我最初认为这是一个扩展问题:当流量很高时,它会重复执行以获得更好的性能。但事实并非如此:并没有触发任何横向扩展。
一些事实:
/// <summary>
/// Queue-triggered entry point (queue "testtest", connection "storageCnn").
/// Deserializes the message, looks the order line up, and either runs the
/// dry-run shortcut or registers the order in System A and then starts the
/// QR reading sequence.
/// </summary>
/// <param name="orderData">JSON payload deserialized into an <c>ExternalOrderDTO</c>.</param>
/// <exception cref="Exception">
/// Thrown when System A does not return a positive integer order reference.
/// Every exception raised inside the method is logged and rethrown.
/// </exception>
public void Run([QueueTrigger("testtest", Connection = "storageCnn")] string orderData)
{
    try
    {
        _logger?.LogDebug($"{nameof(FunctionNAme)} created. DryRun = {_inDryRun}");
        _logger?.LogInformation($"Running {nameof(FunctionNAme)}...");

        // Deserialize the queue message and load the matching order data.
        var message = JsonConvert.DeserializeObject<ExternalOrderDTO>(orderData);
        dynamic lookup = LookupOrder(message.OrderLineId, message.ProductID);
        dynamic orderLine = lookup.OrderLine;
        // Explicit string declarations force the dynamic members to be converted here.
        string amount = lookup.Amount;
        string deviceId = lookup.DeviceId;
        string paymentReference = lookup.PaymentReference;
        _logger?.LogInformation($"loaded order from database = {orderLine != null}...");

        if (_inDryRun)
        {
            // Dry run: simulate one minute of work, then mark the order line complete.
            Thread.Sleep(TimeSpan.FromMinutes(1));
            UpdateDB(orderLine, OrderStatus.Complete);
            // Inform the payment web service to check payment of the parent order.
            _wsHandler.InvokePaymentWS(orderLine.VOBEOrderLineId);
            return;
        }

        _logger?.LogInformation("We are NOT in test-mode, continuing normal flow");

        // Add the order to System A.
        var articles = GetArticlesFromSystema();
        var matchedArticleId = MatchAndGetArticleId(articles, orderLine);
        string orderRefText = AddOrderToSystemA(orderLine, matchedArticleId, amount, deviceId, paymentReference);
        _logger?.LogInformation($"systemA order Ref: {orderRefText}");

        // Guard clause: anything other than a positive integer reference is a failure.
        if (!int.TryParse(orderRefText, out int orderRef) || orderRef <= 0)
        {
            throw new Exception($"failed to get orderreference from system for VOBEOrder: {orderLine.VOBEOrderLineId}");
        }

        // Update the status in the database.
        _logger?.LogInformation($"adding in DB : systemRef '{orderLine.ExternalOrderLineReference}', OrderLine: '{orderLine.Id}'");
        orderLine.ExternalOrderReference = orderRef.ToString();
        UpdateDB(orderLine, OrderStatus.Processing);

        // Wait for two seconds before reading the QR code.
        _logger?.LogInformation("sleeping for 2 sec before reading the QR...");
        Thread.Sleep(TimeSpan.FromSeconds(2));

        _logger?.LogInformation($"Reading the QR code of orderLine {orderLine.Id} on system...");
        RunSystemReadingQRSequence(orderLine);
    }
    catch (Exception ex)
    {
        _logger?.LogError($"Exception while executing {nameof(FunctionNAme)}. \n{ex.Message}");
        _logger?.LogError(ex.ToString());
        throw;
    }
}
/// <summary>
/// Reads the QR code for the given order line from System A, stores it on the
/// order line, marks the order line complete in the database, and then asks the
/// payment web service to check payment of the parent order.
/// </summary>
/// <param name="orderLine">Order line whose QR code is read and persisted.</param>
/// <exception cref="Exception">
/// Thrown when no QR code is returned or when any step inside the sequence fails.
/// The message is always "failed to load the QR code. Returning msg to queue";
/// the underlying failure, if any, is available through <c>InnerException</c>.
/// </exception>
private void RunSystemReadingQRSequence(Entity orderLine)
{
    const string failureMessage = "failed to load the QR code. Returning msg to queue";

    try
    {
        _logger?.LogInformation($"asking System A for QR code of orderLine {orderLine.VOBEOrderLineId}");

        // Call the QR endpoint.
        var qr = ReadQrFromSystemA(orderLine);

        // If no QR is received, something went wrong.
        if (string.IsNullOrEmpty(qr))
        {
            throw new Exception(failureMessage);
        }

        // The value is present: store it in the DB.
        _logger?.LogInformation("saving the Qr code into the DB...");
        orderLine.QRCode = qr;
        UpdateDB(orderLine, OrderStatus.Complete);

        // Inform the payment web service to check payment of the parent order.
        _logger?.LogInformation("informing payment webservice to check payment of parent order...");
        _wsHandler.InvokePaymentWS(orderLine.VOBEOrderLineId);
    }
    catch (Exception e)
    {
        _logger?.LogError($"Exception while asking System A QR code: {e.Message}");
        _logger?.LogError(e.ToString());
        // Keep the caught exception as InnerException so the root cause and its
        // stack trace are not lost; type and message seen by callers are unchanged.
        throw new Exception(failureMessage, e);
    }
}
/// <summary>
/// Sends a POST request with the JSON body <c>{ "orderLineId": N }</c> to the
/// payment web service at <c>_url</c>, authenticated with a bearer token built
/// from <c>_key</c>.
/// </summary>
/// <param name="orderLineId">Id of the order line reported to the web service.</param>
/// <remarks>
/// Failures while sending the request or reading the response are logged and
/// not rethrown.
/// </remarks>
public void InvokePaymentWS(int orderLineId)
{
    var req = WebRequest.CreateHttp(_url);
    req.AuthenticationLevel = AuthenticationLevel.None;
    req.Method = HttpMethods.Post;
    // Content-Type is set through the dedicated property, which is the
    // documented way to set it on HttpWebRequest.
    req.ContentType = "application/json";
    // The string must be interpolated ('$' prefix); without it the literal text
    // "{_key.Trim()}" was sent as the token. The null-conditional avoids adding
    // a new failure path outside the try block when the key is missing.
    req.Headers.Add("Authorization", $"bearer {_key?.Trim()}");
    try
    {
        var bytes = Encoding.UTF8.GetBytes($"{{ \"orderLineId\": {orderLineId} }}");

        // Close the request stream before asking for the response.
        using (var stream = req.GetRequestStream())
        {
            stream.Write(bytes, 0, bytes.Length);
        }

        // Dispose the response so the underlying connection is released.
        using WebResponse rawResponse = req.GetResponse();
        var response = rawResponse as HttpWebResponse;
        _logger?.LogInformation($"response from web service = {response?.StatusCode}");
        if (response?.StatusCode == HttpStatusCode.Accepted)
        {
            _logger?.LogInformation("all good from WebsService calls");
        }
    }
    catch (Exception ex)
    {
        _logger?.LogError(ex, ex.Message);
    }
}
我已经尝试过避免横向扩展,但问题并不在于此。