I'm using the neo4j driver in Python, and in my Jupyter notebook I have the following code:
import os

from neo4j import GraphDatabase
from tqdm import tqdm

driver = GraphDatabase.driver('bolt://localhost:7687',
                              auth=(os.getenv('NEO_USERNAME'),
                                    os.getenv('NEO_PASSWORD')))

def create_constraint(tx):
    tx.run("CREATE CONSTRAINT entityIndex IF NOT EXISTS "
           "FOR (e:Entity) REQUIRE e.EntityId IS UNIQUE")

def create_entity(tx, row):
    tx.run("MERGE (e:Entity {EntityId: $entity_id}) "
           "ON CREATE SET e.LastAccess = timestamp() "
           "ON MATCH SET e.LastAccess = timestamp()",
           entity_id=row["entity_id"])

with driver.session() as session:
    session.execute_write(create_constraint)

# df is a pandas DataFrame loaded earlier in the notebook
with driver.session() as session:
    for idx, row in tqdm(df.iterrows(), total=len(df)):
        try:
            session.execute_write(create_entity, row)
        except Exception as e:
            print(e)
I have to say it works fine for small dataframes, but once I go past about 200,000 rows, performance starts to drop off dramatically.
I'd like to know whether I'm using the driver correctly, and of course whether there is a more efficient way to accomplish the same task.
You'll want to use UNWIND and batch the rows. Here's the Python snippet I use in my Python training:
import csv

from neo4j import GraphDatabase

URI = 'bolt://localhost:7687'
AUTH = ('neo4j', 'password')

driver = GraphDatabase.driver(URI, auth=AUTH)

query = '''
UNWIND $trackInfos AS trackInfo
CREATE (track:Track {id: trackInfo.id})
SET track.name = trackInfo.name
'''

BATCH_SIZE = 10_000
current_batch = []

def commit_batch(current_batch):
    # send the whole batch to the server in a single query
    records, _, _ = driver.execute_query(
        query,
        trackInfos=current_batch,
        database_='testload'
    )
    # reset current_batch to an empty list
    current_batch.clear()

def process_row(row, current_batch):
    current_batch.append(row)
    if len(current_batch) >= BATCH_SIZE:
        commit_batch(current_batch)

with open('./files/sample_tracks_medium.csv', 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        process_row(row, current_batch)
    # flush any rows left over after the last full batch
    if current_batch:
        commit_batch(current_batch)
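Applied to your case, the same pattern might look like the following. This is a minimal sketch, assuming the df and the driver from your question are still in scope and that you're on a 5.x driver (which provides execute_query). One extra simplification: since your ON CREATE SET and ON MATCH SET assign the same value, they collapse into a single SET:

merge_query = '''
UNWIND $rows AS row
MERGE (e:Entity {EntityId: row.entity_id})
SET e.LastAccess = timestamp()
'''

BATCH_SIZE = 10_000  # starting point; tune for your data and hardware

# convert the relevant column into a list of {'entity_id': ...} dicts
rows = df[['entity_id']].to_dict('records')

# send one query (one transaction) per batch instead of one per row
for start in range(0, len(rows), BATCH_SIZE):
    driver.execute_query(merge_query, rows=rows[start:start + BATCH_SIZE])

Because each batch is a single transaction instead of one transaction per row, the round-trip and commit overhead drops by roughly a factor of the batch size, which is where your slowdown past 200,000 rows is coming from.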
And here are some additional resources: