当尝试同时创建从给定顶点到其他各种唯一顶点的数百条边时,我收到以下错误。
{
"requestId": "b90671d4-d8f0-4cca-b0c3-c908ff91d022",
"code": "ConcurrentModificationException",
"detailedMessage": "Failed to complete Insert operation for a VertexProperty due to conflicting concurrent operations. Please retry. 0 transactions are currently rolling back.",
"message": "Failed to complete Insert operation for a VertexProperty due to conflicting concurrent operations. Please retry. 0 transactions are currently rolling back."
}
这让我特别困惑,因为据我所知,我只是创建边属性,并且没有边被修改两次,也没有任何两个顶点 a->b 具有多个边同时写入。我想知道这是否只是一个写得不好的错误消息,并且 gremlin 在从给定顶点创建许多边并向它们写入属性方面存在一些问题?我能想到的另一种可能性是我误解了我的查询,我实际上正在编写顶点属性而不是边属性,但我测试了类似的查询,但情况似乎并非如此。
我正在调试的代码:
router.post(
'/users/contacts',
asyncHandler(async (req: Request, res: Response) => {
var successfulPhoneNumbers: string[] = [];
var failedPhoneNumbers: string[] = [];
try {
const g = req.g;
let { userId, contacts }: { userId: string; contacts: Contact[] } = req.body;
if (!userId || !contacts) {
return res.status(400).end('Required userId or contacts not supplied');
}
if (contacts.length > 300) {
return res.status(400).end('Maximum 300 contacts allowed');
}
let limit = pLimit(300);
var now = new Date(Date.now()).toISOString();
// filter out duplicates
contacts = contacts.filter(
(contact, index, self) =>
index === self.findIndex((t) => t.phoneNumber === contact.phoneNumber),
);
let contactsPromises = contacts.map(async (contact) => {
let addContactProperties = (
query: gremlin.process.GraphTraversal,
): gremlin.process.GraphTraversal => {
query = query.property('createdAt', now);
if (contact.firstName) {
query = query.property('firstName', contact.firstName);
}
if (contact.lastName) {
query = query.property('lastName', contact.lastName);
}
if (contact.email) {
query = query.property('email', contact.email);
}
if (contact.birthday) {
query = query.property('birthday', contact.birthday);
}
if (contact.address) {
query = query.property('address', contact.address);
}
return query;
};
return limit(async () => {
try {
let phoneNumber = contact.phoneNumber;
if (!isPhoneNumberValid(phoneNumber)) {
failedPhoneNumbers.push(phoneNumber);
return;
}
// if the contact is on the system, add an edge to their user node
var contactAsUser = await g.V().hasLabel('user').has('phoneNumber', phoneNumber).next();
var contactAsContact = await g
.V()
.hasLabel('contact')
.has('phoneNumber', phoneNumber)
.next();
if (!contactAsUser.done) {
let query = g
.V(userId)
.coalesce(
__.outE('HAS_CONTACT').where(__.inV().hasId(contactAsUser.value.id)),
__.addE('HAS_CONTACT').to(__.V(contactAsUser.value.id)),
);
query = addContactProperties(query);
await query.iterate();
} else if (!contactAsContact.done) {
let query = g
.V(userId)
.coalesce(
__.outE('HAS_CONTACT').where(__.inV().hasId(contactAsContact.value.id)),
__.addE('HAS_CONTACT').to(__.V(contactAsContact.value.id)),
);
query = addContactProperties(query);
await query.iterate();
} else {
// if the contact is not on the system, add an edge to a new contact node
let query = g
.V(userId)
.coalesce(
__.outE('HAS_CONTACT').where(__.inV().has('phoneNumber', phoneNumber)),
__.addV('contact')
.as('contact')
.property(single, 'phoneNumber', phoneNumber)
.addE('HAS_CONTACT')
.from_(__.V(userId))
.to(__.select('contact')),
);
query = addContactProperties(query);
await query.iterate();
}
successfulPhoneNumbers.push(phoneNumber);
} catch (err: any) {
console.log(err.toString());
failedPhoneNumbers.push(contact.phoneNumber);
}
});
});
await Promise.all(contactsPromises);
return res.json({
successfulPhoneNumbers: successfulPhoneNumbers,
failedPhoneNumbers: failedPhoneNumbers,
});
} catch (err: any) {
return res.status(500).json({
error: err.message ?? err,
successfulPhoneNumbers: successfulPhoneNumbers,
failedPhoneNumbers: failedPhoneNumbers,
});
} finally {
await req.dc.close();
}
}),
);
问题就在这里:
let query = g
.V(userId)
.coalesce(
__.outE('HAS_CONTACT').where(__.inV().has('phoneNumber', phoneNumber)),
__.addV('contact')
.as('contact')
.property(single, 'phoneNumber', phoneNumber)
.addE('HAS_CONTACT')
.from_(__.V(userId))
.to(__.select('contact')),
);
query = addContactProperties(query);
await query.iterate();
2个问题:首先,合并是多余的,是以前版本的遗留物。其次,由于某种原因,删除 g.V(userId).addV().property() 清除了并发错误。为什么?也许与从另一个顶点添加一个顶点的行为有关?一个小谜团。