是否可以使用Firebase实时数据库来实现分布式互斥体?

问题描述 投票:0回答:2

我正在考虑使用这样的事务来实现一种分布式锁:

const lockId = 'myLock';
const lockRef = firebaseAdmin.database().ref(`/locks/${lockId}`);
lockRef.transaction(function(current) {
  if (current === null) {
    return '1';
  }
}, function(error, committed) {
  if (committed) {
    // .... Do the synchronized work I need ...
    lockRef.remove();
  }
});

我的问题是:只有数据不存在时才会用null调用update函数吗?

一般来说,这是实现分布式锁的有效方法吗?

javascript firebase firebase-realtime-database transactions mutex
2个回答
2
投票

由于 @FrankvanPuffelen 在他们的答案中所述的原因,您最初的尝试将不起作用。

但实现这一点是可能的(尽管并不是那么简单)。我与不同的边缘情况进行了相当长的斗争,最后想出了这个解决方案,它通过了无数不同的测试,验证了这可以防止所有可能的竞争条件和死锁:

import crypto from 'crypto';
import { promisify } from 'util';

import * as firebaseAdmin from 'firebase-admin';

const randomBytes = promisify(crypto.randomBytes);

// A string which is stored in the place of the value to signal that the mutex holder has
// encountered an error. This must be unique value for each mutex so that we can distinguish old,
// stale rejection states from the failures of the mutex that we are currently waiting for.
const rejectionSignal = (mutexId: string): string => `rejected${mutexId}`;

const isValidValue = (value: unknown): boolean => {
  // `value` could be string in the form `rejected...` which signals failure,
  // using this function makes sure we don't return that as a "valid" value.
  return !!value && (typeof value !== 'string' || !value.startsWith('rejected'));
};

export const getOrSetValueWithLocking = async <T>(id: string, value: T): Promise<T> => {
  const ref = firebaseAdmin.database().ref(`/myValues/${id}`);

  const mutexRef = firebaseAdmin.database().ref(`/mutexes/myValues/${id}`);

  const attemptingMutexId = (await randomBytes(16)).toString('hex');

  const mutexTransaction = await mutexRef.transaction((data) => {
    if (data === null) {
      return attemptingMutexId;
    }
  });

  const owningMutexId = mutexTransaction.snapshot.val();

  if (mutexTransaction.committed) {
    // We own the mutex (meaning that `attemptingMutexId` equals `owningMutexId`).
    try {
      const existing = (await ref.once('value')).val();
      if (isValidValue(existing)) {
        return existing;
      }
      /*
        --- YOU CAN DO ANYTHING HERE ---
        E.g. create `value` here instead of passing it as an argument.
      */
      await ref.set(value);
      return value;
    } catch (e) {
      await ref.set(rejectionSignal(owningMutexId));
      throw e;
    } finally {
      // Since we own the mutex, we MUST make sure to release it, no matter what happens.
      await mutexRef.remove();
    }
  } else {
    // Some other caller owns the mutex -> if the value is not yet
    // available, wait for them to insert it or to signal a failure.
    return new Promise((resolve, reject) => {
      ref.on('value', (snapshot) => {
        const val = snapshot.val();
        if (isValidValue(val)) {
          resolve(val);
        } else if (val === rejectionSignal(owningMutexId)) {
          reject(new Error('Mutex holder encountered an error and was not able to set a value.'));
        } // else: Wait for a new value.
      });
    });
  }
};

我的用例是在 Vercel 中运行 Next.js API 路由,其中并行执行无服务器函数的唯一共享状态是 Firebase 实时数据库。


0
投票

最初将使用客户端对当前值的最佳猜测来调用交易。如果客户端内存中没有当前值,则最好的猜测是没有当前值。

这意味着,如果您得到

null
,则无法保证数据库中实际不存在任何值。

另请参阅:

© www.soinside.com 2019 - 2024. All rights reserved.