从一条到多条并行创建边时出现并发错误

问题描述 投票:0回答:1

当尝试同时创建从给定顶点到其他各种唯一顶点的数百条边时,我收到以下错误。

{
    "requestId": "b90671d4-d8f0-4cca-b0c3-c908ff91d022",
    "code": "ConcurrentModificationException",
    "detailedMessage": "Failed to complete Insert operation for a VertexProperty due to conflicting concurrent operations. Please retry. 0 transactions are currently rolling back.",
    "message": "Failed to complete Insert operation for a VertexProperty due to conflicting concurrent operations. Please retry. 0 transactions are currently rolling back."
}

这让我特别困惑,因为据我所知,我只是创建边属性,并且没有边被修改两次,也没有任何两个顶点 a->b 具有多个边同时写入。我想知道这是否只是一个写得不好的错误消息,并且 gremlin 在从给定顶点创建许多边并向它们写入属性方面存在一些问题?我能想到的另一种可能性是我误解了我的查询,我实际上正在编写顶点属性而不是边属性,但我测试了类似的查询,但情况似乎并非如此。

我正在调试的代码:

router.post(
  '/users/contacts',
  asyncHandler(async (req: Request, res: Response) => {
    var successfulPhoneNumbers: string[] = [];
    var failedPhoneNumbers: string[] = [];
    try {
      const g = req.g;

      let { userId, contacts }: { userId: string; contacts: Contact[] } = req.body;

      if (!userId || !contacts) {
        return res.status(400).end('Required userId or contacts not supplied');
      }

      if (contacts.length > 300) {
        return res.status(400).end('Maximum 300 contacts allowed');
      }

      let limit = pLimit(300);

      var now = new Date(Date.now()).toISOString();

      // filter out duplicates
      contacts = contacts.filter(
        (contact, index, self) =>
          index === self.findIndex((t) => t.phoneNumber === contact.phoneNumber),
      );

      let contactsPromises = contacts.map(async (contact) => {
        let addContactProperties = (
          query: gremlin.process.GraphTraversal,
        ): gremlin.process.GraphTraversal => {
          query = query.property('createdAt', now);

          if (contact.firstName) {
            query = query.property('firstName', contact.firstName);
          }
          if (contact.lastName) {
            query = query.property('lastName', contact.lastName);
          }
          if (contact.email) {
            query = query.property('email', contact.email);
          }
          if (contact.birthday) {
            query = query.property('birthday', contact.birthday);
          }
          if (contact.address) {
            query = query.property('address', contact.address);
          }

          return query;
        };

        return limit(async () => {
          try {
            let phoneNumber = contact.phoneNumber;
            if (!isPhoneNumberValid(phoneNumber)) {
              failedPhoneNumbers.push(phoneNumber);
              return;
            }

            // if the contact is on the system, add an edge to their user node

            var contactAsUser = await g.V().hasLabel('user').has('phoneNumber', phoneNumber).next();
            var contactAsContact = await g
              .V()
              .hasLabel('contact')
              .has('phoneNumber', phoneNumber)
              .next();

            if (!contactAsUser.done) {
              let query = g
                .V(userId)
                .coalesce(
                  __.outE('HAS_CONTACT').where(__.inV().hasId(contactAsUser.value.id)),
                  __.addE('HAS_CONTACT').to(__.V(contactAsUser.value.id)),
                );
              query = addContactProperties(query);
              await query.iterate();
            } else if (!contactAsContact.done) {
              let query = g
                .V(userId)
                .coalesce(
                  __.outE('HAS_CONTACT').where(__.inV().hasId(contactAsContact.value.id)),
                  __.addE('HAS_CONTACT').to(__.V(contactAsContact.value.id)),
                );
              query = addContactProperties(query);
              await query.iterate();
            } else {
              // if the contact is not on the system, add an edge to a new contact node
              let query = g
                .V(userId)
                .coalesce(
                  __.outE('HAS_CONTACT').where(__.inV().has('phoneNumber', phoneNumber)),
                  __.addV('contact')
                    .as('contact')
                    .property(single, 'phoneNumber', phoneNumber)
                    .addE('HAS_CONTACT')
                    .from_(__.V(userId))
                    .to(__.select('contact')),
                );
              query = addContactProperties(query);
              await query.iterate();
            }
            successfulPhoneNumbers.push(phoneNumber);
          } catch (err: any) {
            console.log(err.toString());
            failedPhoneNumbers.push(contact.phoneNumber);
          }
        });
      });

      await Promise.all(contactsPromises);

      return res.json({
        successfulPhoneNumbers: successfulPhoneNumbers,
        failedPhoneNumbers: failedPhoneNumbers,
      });
    } catch (err: any) {
      return res.status(500).json({
        error: err.message ?? err,
        successfulPhoneNumbers: successfulPhoneNumbers,
        failedPhoneNumbers: failedPhoneNumbers,
      });
    } finally {
      await req.dc.close();
    }
  }),
);

node.js gremlin amazon-neptune
1个回答
0
投票

问题就在这里:

let query = g
                .V(userId)
                .coalesce(
                  __.outE('HAS_CONTACT').where(__.inV().has('phoneNumber', phoneNumber)),
                  __.addV('contact')
                    .as('contact')
                    .property(single, 'phoneNumber', phoneNumber)
                    .addE('HAS_CONTACT')
                    .from_(__.V(userId))
                    .to(__.select('contact')),
                );
              query = addContactProperties(query);
              await query.iterate();

2个问题:首先,合并是多余的,是以前版本的遗留物。其次,由于某种原因,删除 g.V(userId).addV().property() 清除了并发错误。为什么?也许与从另一个顶点添加一个顶点的行为有关?一个小谜团。

© www.soinside.com 2019 - 2024. All rights reserved.