把外部数据写入 Neo4j 是项目落地的第一步。本文按数据规模与在线/离线需求,梳理常见存储格式、导入方式与对接工具——从小文件 CSV 到亿级边 bulk import,再到 Python 驱动与 CDC 流式同步。
段末注释:ETL(extract, transform, load,抽取、转换、加载)指将源系统数据清洗后载入目标库的流程;CDC(change data capture,变更数据捕获)指捕获源库增量变更并同步到下游。
一、导入方式选型
1 | flowchart TD |
| 方式 | 规模 | 在线 | 速度 |
|---|---|---|---|
| LOAD CSV | 小~中 | 是 | 中 |
| 官方驱动批写 | 小~中 | 是 | 中 |
| neo4j-admin import | 大 | 否(空库) | 极快 |
| APOC | 中 | 是 | 中~高 |
| Spark Connector | 大 | 离线 | 高 |
二、常见数据格式
2.1 边列表 CSV(最常见)
edges.csv:
1 | src_id,dst_id,rel_type,score |
nodes.csv(可选,节点属性单独维护):
1 | id,label,name,organism |
2.2 单文件「长表」
1 | start_label,start_key,end_label,end_key,relationship,prop_key,prop_value |
2.3 JSON
1 | { |
2.4 GraphML / GML
图工具(yEd、Cytoscape、NetworkX)常用交换格式;通常经 NetworkX 或 Java 工具 转为 CSV 再导入。
2.5 RDF / Turtle
语义网三元组;Neo4j 侧可用 Neosemantics (n10s) 插件导入 RDF,或 ETL 转为属性图。
2.6 SQL 关系库
外键表 → 边;实体表 → 节点。工具:自研脚本、SQL 导出 CSV、Debezium CDC。
三、LOAD CSV(在线中小规模)
3.1 准备文件
将 CSV 放入 Neo4j 的 import/ 目录(Docker 挂载 -v ./import:/var/lib/neo4j/import)。
3.2 导入节点
1 | LOAD CSV WITH HEADERS FROM 'file:///nodes.csv' AS row |
3.3 导入关系
1 | LOAD CSV WITH HEADERS FROM 'file:///edges.csv' AS row |
3.4 远程 CSV
1 | LOAD CSV WITH HEADERS FROM 'https://example.com/data.csv' AS row |
需在 neo4j.conf 开启:
1 | dbms.security.allow_csv_import_from_file_urls=true |
3.5 注意事项
| 点 | 建议 |
|---|---|
| 大文件 | 用 USING PERIODIC COMMIT 1000(Neo4j 4.x)或 IN TRANSACTIONS(5.x) |
| 类型转换 | toInteger()、toFloat()、datetime() |
| 空值 | CASE WHEN row.score = '' THEN null ELSE toFloat(row.score) END |
| 先导节点再导边 | 避免 MATCH 找不到端点 |
Neo4j 5 分批示例:
1 | LOAD CSV WITH HEADERS FROM 'file:///edges.csv' AS row |
四、neo4j-admin database import(离线海量)
适用于初始化空库、节点/边达千万~百亿级。速度比在线 LOAD CSV 快一个数量级以上。
4.1 准备 Neo4j 原生 CSV 格式
nodes.csv(每行一个节点,:ID 与 :LABEL 为特殊列):
1 | nodeId:ID, name,:LABEL |
relationships.csv:
1 | :START_ID,:END_ID,:TYPE,score:float |
4.2 执行导入(数据库停止)
1 | neo4j stop |
Docker 环境:
1 | docker exec -it neo4j neo4j-admin database import full \ |
限制:目标库须为空或新建;导入期间库不可用。
五、APOC 扩展导入
APOC(Awesome Procedures on Cypher)提供大量过程函数,需安装插件(Docker 环境变量 NEO4J_PLUGINS='["apoc"]')。
5.1 从 JSON 文件
1 | CALL apoc.load.json('file:///graph.json') YIELD value |
5.2 从 JDBC(关系库直读)
1 | CALL apoc.load.jdbc('jdbc:postgresql://host:5432/mydb', 'SELECT id, name FROM users') |
5.3 周期性增量
1 | CALL apoc.periodic.iterate( |
六、官方驱动(应用层对接)
6.1 Python:批量 MERGE
1 | from neo4j import GraphDatabase |
6.2 Java / Spring Data Neo4j
1 |
|
Repository 层 @Query 写 Cypher,或 Neo4jClient 执行原生语句。
6.3 JavaScript(Node)
1 | const session = driver.session({ database: 'neo4j' }); |
七、Python 生态:pandas / NetworkX
7.1 pandas → 驱动
1 | import pandas as pd |
7.2 NetworkX → CSV → Neo4j
1 | import networkx as nx |
7.3 neo4j-graphdatascience
Graph Data Science 库的 Python 客户端可读写投影图(见 Neo4j-07),适合算法流水线而非初始全量导入。
八、流式与 CDC 同步
| 工具 | 模式 | 说明 |
|---|---|---|
| Kafka + Neo4j Connector | 流式 | 官方/社区 Kafka Connect sink |
| Debezium | CDC | 捕获 PG/MySQL binlog → Kafka → Neo4j |
| Airbyte | 批/增量 | 可视化 ETL,有 Neo4j destination |
| 自研定时任务 | 批 | 读 watermark,MERGE 增量 |
典型架构:
1 | flowchart LR |
增量 MERGE 关键:源表主键 = 图节点唯一键,保证幂等。
九、导出(迁出与备份前置)
1 | // CSV 导出(APOC) |
Neo4j 5 原生:
1 | neo4j-admin database dump neo4j --to-path=/backups/ |
详见 Neo4j-06.运维迁移与备份。
十、导入 checklist
- 建模确认:节点唯一键、关系类型、属性类型
- 先建约束/索引,再导数据
- 先导节点,再导边
- 百万级以上评估 bulk import 或 IN TRANSACTIONS
- 导入后
MATCH (n) RETURN count(n)与源表对账 - 监控堆内存与事务日志空间
十一、小结
| 场景 | 推荐 |
|---|---|
| 学习 / 小样本 | Browser + LOAD CSV |
| 应用内写图 | 官方驱动 + UNWIND 批写 |
| 初始化大图 | neo4j-admin import |
| 异构源持续同步 | CDC + MERGE 或 Airbyte |
| JSON / JDBC | APOC |
| 下一篇 | 内容 |
|---|---|
| Neo4j-05.查询进阶与性能调优 | 路径、索引、PROFILE |
| Neo4j-06.运维迁移与备份 | dump/restore |