我的团队一直试图让我们的一些WebJobs custom queue processors的有单身的行为,但我们还没有真正得到与[Singleton(Mode = SingletonMode.Listener)]
属性,也不符合设置QueueProcessorFactoryContext.BatchSize = 1
这种行为。这是造成夜间进程一下子踩住数据库 - 与许多的他们超时 - 并已成为一个有点头疼。
这是多了还是少了什么我们CustomQueueProcessorFactory样子:
public class CustomQueueProcessorFactory : IQueueProcessorFactory
{
public QueueProcessor Create(QueueProcessorFactoryContext context)
{
if (context == null)
throw new ArgumentNullException(nameof(context));
if (context.Queue.Name == Constants.UploadQueueName
|| context.Queue.Name == Constants.BuildQueueName)
{
context.BatchSize = 1;
}
return new QueueProcessor(context);
}
}
这是配置我们JobHost时参考:
var config = new JobHostConfiguration();
config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
我们还设立了一些功能与像QueueTrigger
s:
public static async Task ExecuteBackgroundRequest([QueueTrigger(Constants.BackgroundQueueName)] BackgroundRequest background, TextWriter logger)
{
await ExecuteRequest(background, logger);
}
[Singleton(Mode = SingletonMode.Listener)]
public static async Task ExecuteUploadRequest([QueueTrigger(Constants.UploadQueueName)] BackgroundRequest background, TextWriter logger)
{
await ExecuteRequest(background, logger);
}
[Singleton(Mode = SingletonMode.Listener)]
public static async Task ExecuteBuildRequest([QueueTrigger(Constants.BuildQueueName)] BackgroundRequest background, TextWriter logger)
{
await ExecuteRequest(background, logger);
}
我们使用的是包Microsoft.Azure.WebJobs
(+ .Core
,.Extensions
)V2.0.0和WindowsAzure.Storage
V8.0.0,这是有些过时,所以我一直在探索一个潜在的解决方案是更新到WebJobs的最新稳定版(v3.0.4) 。这开启蠕虫的一个全新的就可以了,因为配置已经完全重做,所有的类都被搬来搬去。该文件似乎稀疏/分散,所以我还没有决定在哪里(或者即使)我可以在每QueueProcessor基础自定义属性,如对一些队列,而较高的其他BATCHSIZE设置为1。
有WebJobs在那里我可以使用上述CustomQueueProcessorFactory逻辑限制BATCHSIZE的一些版本?或者辛格尔顿属性实际上将确保只有一个后台进程是在同一时间访问特定的队列?可以QueueProcessorFactories在最新版本WebJobs的配置?
帮助上对这些问题将非常感激!
该WebJobs SDK通过其SingletonAttribute促进共同分布式锁定场景。你可以简单地套用到SingletonAttribute工作职能,以确保该函数的所有调用将被序列化,甚至是跨横向扩展的情况。如果你的函数需要访问其他分布式资源或者执行不应该/不能同时进行其他操作,这非常有用。
[Singleton]
public static async Task ProcessImage([BlobTrigger("images")] Stream image)
{
// Process the image
}
如在此例中,只有processImage来功能的单一实例会在任何给定时间运行。当该功能是由被添加到图像容器的新图像触发时,运行时将首先尝试获取锁(BLOB租赁)。一旦收购,锁的功能执行期间举行(和BLOB租约到期),以确保没有其他情况下运行。如果在此功能运行的另一个功能实例被触发时,它会等待锁,定期轮询它。
辛格尔顿使用Azure Blob Leases在幕后实现分布式锁。所有管理团块租赁,续租等的复杂性是由SDK处理。
辛格尔顿锁细节也显示在仪表盘WebJobs,包括一个正在进行中的功能执行当前的锁定状态,以及功能等了多久了锁定取得前。您可以使用这些信息来查看和管理锁争用。
不并发场景需要使用辛格尔顿的。一些触发器必须通过其配置设置为并发管理的内在支持。在这种情况下,它可能会更有效的为您使用的内置支持。您可以使用这些设置,以确保您的功能运行在单个实例单。为了保证仅功能的单个实例横跨向外扩展正在运行的实例,除了可以在功能上适用的监听电平的Singleton锁(例如,[单例(模式= SingletonMode.Listener)])。对于一些触发器的配置旋钮是:
QueueTrigger - 您可以设置JobHostConfiguration.Queues.BatchSize 1个ServiceBusTrigger - 您可以设置ServiceBusConfiguration.MessageOptions.MaxConcurrentCalls 1 FileTrigger - 您可以设置FileProcessor.MaxDegreeOfParallelism 1但是,对于那些不支持并发控制固有的触发器,或者如果你希望通过辛格尔顿范围做更高级的锁定(见下文),辛格尔顿是正确的道路要走。
另外我想说的是实现像下面采用双锁你的CustomQueueProcessorFactory: -
if (_instance == null)
{
lock (SyncObject)
{
if (_instance == null)
{
_instance = new CustomQueueProcessor();
}
}
}
return _instance;