结合Neo4j和Hadoop(第1部分)


为什么要把这两种不同的东西结合起来?

Hadoop非常适合数据处理,但是平面文件的最终结果不能很好地呈现给客户,而且很难在excel中可视化您的网络数据。

Neo4J非常适合处理我们的网络数据。当可视化我们不同的数据集时,我们经常使用它。
因此,我们用Hadoop准备数据集,并将其导入Neo4J(图形数据库),以便能够查询和可视化数据。
我们有很多不同的方法来查看数据集,所以我们倾向于创建一个新的数据提取,每隔几天查看一些新的属性。

这个博客是关于我们如何结合Hadoop和Neo4J,并描述了我们在寻找最佳解决方案的过程中所经历的阶段。

我们就是这样开始的。

第一阶段:

-使用蜂巢准备数据。对于那些不熟悉Hadoop生态系统的人来说,Hive是一个工具,它使您能够使用SQL编写查询,这些查询被转换成映射/缩减作业。我们用它从我们的数据中创建一个节点表和一个边表。

这一系列查询的最终结果是两组文件,我们可以将它们从Hadoop集群中取出,放到本地机器上。

节点表/文件如下所示:

NodeId  Property1    Property2    PropertyN
AAAÂ Â Â Â  nameOfA Â Â Â  Â amountOfAÂ Â  Â someAThing
BBBÂ Â  Â Â nameOfBÂ Â Â Â  Â amountOfBÂ Â  Â someBThing
CCCÂ Â Â  Â nameOfCÂ Â  Â  Â amountOfCÂ Â  Â someCThing
DDD Â Â Â  nameOfDÂ  Â Â Â Â amountOfDÂ Â  Â someDThing

边缘表/文件如下所示:

fromNodeId    ToNodeId    EdgeProperty1    EdgePropertyN
AAAÂ Â  Â Â Â  Â Â Â  BBBÂ Â  Â Â Â Â  Â someDate1Â Â  Â Â Â  Â someNumber1
AAAÂ Â  Â Â Â  Â Â Â Â DDDÂ Â Â  Â Â Â  Â someDate2Â Â  Â Â Â  Â someNumber2
BBBÂ Â  Â Â Â  Â Â  Â DDDÂ  Â  Â Â Â  Â someDate3Â Â  Â Â Â  Â someNumber3
CCCÂ Â  Â Â Â  Â Â  Â BBBÂ Â Â  Â Â Â  Â someDate4Â Â  Â Â Â  Â someNumber4
DDDÂ Â  Â Â Â  Â Â  Â BBBÂ Â Â  Â Â Â  Â someDate5Â Â  Â Â Â  Â someNumber5
DDDÂ Â  Â Â Â Â Â Â  Â CCCÂ Â Â  Â Â Â  Â someDate6Â Â  Â Â Â  Â someNumber6

-为了将这些套件载入Neo4J,我们使用了batchinserter。
在我们建立第一个进口商时,Neo4J的版本只有1.6左右。所以我们写了一些代码并开始导入。
我们正在讨论的数据集大约有3000万个节点,每个节点有9个属性,大约有6.5亿条边,每个边有4个属性。

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.neo4j.kernel.impl.batchinsert.BatchInserter;
import org.neo4j.kernel.impl.batchinsert.BatchInserterImpl;
 
BatchInserter db = new BatchInserterImpl(<outputPath>, <config>)
 
long[] idCache = new long[<nrOfNodes>];
 
BufferedReader reader = new BufferedReader(new InputStreamReader(<InputStreamThingy>), 100 * 1024 *1024)
String line;
while ((line = reader.readLine()) != null) {
    String[] parts = line.split('\t');
    int myOwnId = Integer.parseInt(parts[0]);
 
    //some property magic goes here
    idCache[myOwnId] = db.createNode(<propertiesMap>);
}
reader.close();
 
//Edges
reader = new BufferedReader(newInputStreamReader(<InputStreamThingyforEdges>), 100 * 1024 *1024)
while ((line = reader.readLine()) != null) {
    String[] parts = line.split('\t');
    int fromNodeOwnId = Integer.parseInt(parts[0]);
    int toNodeOwnId = Integer.parseInt(parts[1]);
 
    //some property magic goes here
    db.createRelationship(idCache[fromNodeOwnId], idCache[toNodeOwnId], <RelationshipType>, <propertiesMap>);
}
reader.close();

我们在16Gb内存的台式机上导入这些节点和边缘,大约需要20个小时才能完成。

第二阶段:

所以我们需要加快速度。在这一阶段,我们将从hadoop集群中获取节点和边缘文件的部分移至本地机器,并直接从集群中读取它们。

还是用蜂巢来准备数据
让导入程序直接从集群中读取文件(不再需要副本)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
 
FileSystem fs = FileSystem.get(<hadoop configuration>);
 
FileStatus[] files = fs.globStatus(new Path(<pathToNodesTableOrFile>));
for (FileStatus file : files) {
    DataInputStream dis = new DataInputstream(fs.open(file.getPath()));
    BufferedReader reader  = new BufferedReader(new InputStreamReader(dis), 100 * 1024 * 1024);
 
    while ((line = reader.readLine()) != null) {
    }
}

在Hadoop集群的一个工作节点(有32Gb)上运行导入程序(确保没有Hadoop进程正在运行,以便我们可以获得全部32GB)
这大约需要16个小时才能完成,我们需要将数据从那台机器传送到运行Neo4J数据库的机器上(大约80 GB需要2个小时)

第三阶段:

在创建完整的Neo4J数据库所需的总时间方面,我们几乎没有取得什么进展,因此我们需要尝试更多。
在邮件列表中,有一些传言说最新版本的Neo4J (1.8)在batchinserter的性能上有了一些重大改进
因此,我们升级了导入程序代码,以使用1.8版的Neo4J(版本1.6)

我们自己的代码中唯一的代码变化是我们创建BatchInserter类的方式。其他优化在Neo4J代码中,主要是持久性窗口池中的分页子系统的工作方式。

import org.neo4j.unsafe.batchinsert.BatchInserter;
import org.neo4j.unsafe.batchinsert.BatchInserters;
 
BatchInserter db = BatchInserters.inserter(<outputPath>, <config>)
仍然使用蜂巢来准备数据
在Hadoop集群的一个工作节点上运行导入程序

大约需要3个小时才能完成,我们需要将数据从那台机器传送到运行Neo4J数据库的机器上(大约80 GB需要2个小时)

这就是我们现在的样子。但是我们还是有锦囊妙计。
在我们等待这些导入完成的时候,我们决定看看能否让batchimporter以分布式方式工作,这样我们就可以利用我们的16节点Hadoop集群更快地创建Neo4J初始数据库。

在下一篇博客中,我将详细介绍这一探索。