技術情報

TechReport - テックレポート

MongoDB de GraphDB(松本陽介)

概要

MongoDBでNeo4jやOrientDBの欠点を補ったGraphDBとして利用するClientをJavaで作成しました。

目次

序論

コミュニティサービスやソーシャルゲームにおいて、ユーザ同士のつながり情報は非常に重要な要素です。
つながり情報はRelationalDBで持つよりもGraphDBで持ちたい所ですが、現在オープンソースで公開されているGraphDBを検証してみると、
1ユーザに万単位でつなげていくと極端に遅くなり、1ユーザに10万単位でつながることがあるアメーバのサービスでは採用することができません。
そこで、速くてスキーマレスで、社内で実績もあるMongoDBをGraphDBとして利用してみます。

内容

1. GraphDBとは

GraphDBは、人や物を表すNodeと、つながりを表すEdgeという2種類の要素を持ち、それぞれが隣接する要素に対してのポインタを保持しているデータベースです。

GraphDBには無向グラフと有向グラフの2種類があり、分かりやすい例ではFacebookの友達は必ず互いにつながるため無向グラフ、
Twitterのフォローは片方から一方的につながるため有向グラフとなっています。

 


また、NodeとEdgeにそれぞれPropertyを持つものをプロパティグラフといい、つながった日付やステータスをEdgeのPropertyに持つ使い方が多いです。
アメーバのサービスでは有向グラフかつプロパティグラフのものがほとんどだと思います。


GraphDBは隣接する要素のポインタを持つことによって、インデックスを参照せずにダイレクトに次の要素に行くことができます。
これはIndex-Freeと呼ばれるもので、グラフ全体が大きくなっていってもNodeから次のNodeへ行くコストが変わらないという利点があります。

2. GraphDBの欠点

GraphDBは利点だらけのように見えますが、実際は欠点もあります。

Index-Freeはグラフ全体の量には影響を受けなくても、隣接する要素のポインタの量には影響を受けてしまいます。
GraphDBのオープンソースで有名所のNeo4jとOrientDBも、1つのNodeに対してEdgeポインタが増えていくと実用できないレベルまで遅くなっていってしまいました。

プロパティグラフではNodeだけでなく、ステータスやフラグなどEdgeのPropertyも抽出条件としてつながり情報を取得することが多いと思います。
GraphDBで行おうとすると隣接するポインタだけでは処理することができず、基本的にはインデックスを利用することになります。

そこで、スキーマレスでインデックスを自由に作成できるMongoDBをGraphDBとして利用できるようにするClientをJavaで作成します。

3. MongoDB de GraphDBの設計

MongoDBではカラム名が全てのデータに書かれるため、カラム名が長いとデータ量が増えるほど無駄に容量を使ってしまいます。
したがって、カラム名を1文字にしてデータ圧縮をしつつ、Javaからは利用しやすい名前になっていることが望ましいです。
MorphiaのEntityではそれが可能なため、Morphiaを利用して実装します。

DB名は「graph」とし、以下のテーブルを使用します。
テーブルやインデックスはMorphiaで自動生成しますが、そのままではクラス名をカラムに入れてしまうため、「noClassnameStored」オプションを「true」にします。


■Node

カラム名 説明
_id ObjectId
t Nodeの種類
n Nodeの名前
c Nodeの作成日
u Nodeの更新日

・Nodeを特定する複合インデックス (t, n)


■Edge

カラム名 説明
_id ObjectId
r Edgeの種類
i つながり元NodeのObjectId
o つながり先NodeのObjectId
s Edgeのステータス
c Edgeの作成日
u Edgeの更新日

・外向きのつながりを取得する複合インデックス (r, i, -c)
・内向きのつながりを取得する複合インデックス (r, o, -c)
・Edgeを特定する複合インデックス (r, i, o)


インデックスを張っていても、つながっている数を取得するには全件みることになるため、Countテーブルを使用します。


■Count

カラム名 説明
_id ObjectId
n NodeのObjectId
d つながりの方向
r つながりの種類
c つながりの数

・Countを特定する複合インデックス (n, d, r)


つながり情報はNodeとEdgeの両方の情報を合わせる必要があるため、以下のDTOを用意します。


■Relational

フィールド名 説明
type Nodeの種類
name Nodeの名前
relation Edgeの種類
status Edgeのステータス

4. ソースコード


jp.ameba.mongo.graph
 |
 ---- bean
 |  |
 |  ---- Relational.java
 |
 ---- config
 |  |
 |  ---- MongoGraphConfig.java
 |
 ---- constant
 |  |
 |  ---- Direction.java
 |
 ---- dao
 |  |
 |  ---- CountDAO.java
 |  |
 |  ---- EdgeDAO.java
 |  |
 |  ---- NodeDAO.java
 |
 ---- entity
 |  |
 |  ---- Count.java
 |  |
 |  ---- Edge.java
 |  |
 |  ---- Node.java
 |
 ---- exception
 |  |
 |  ---- MongoGraphException.java
 |
 ---- service
 |  |
 |  ---- GraphService.java
 |
 ---- MongoGraphClient.java


Relational.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package jp.ameba.mongo.graph.bean;
 
/**
 * つながり情報
 * @author matsumoto_yosuke
 *
 */
public class Relational {
 
    private final String type;
    private final String name;
    private final String relation;
    private final int status;
 
    public Relational(String type, String name, String relation, int status) {
        this.type = type;
        this.name = name;
        this.relation = relation;
        this.status = status;
    }
 
    /**
     * つながっている対象の種類を返す
     * @return 種類
     */
    public String getType() {
        return type;
    }
    /**
     * つながっている対象の名前を返す
     * @return 名前
     */
    public String getName() {
        return name;
    }
    /**
     * つながりの種類を返す
     * @return 種類
     */
    public String getRelation() {
        return relation;
    }
    /**
     * つながりのステータスを返す
     * @return ステータス
     */
    public int getStatus() {
        return status;
    }
 
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((type == null) ? 0 : type.hashCode());
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        result = prime * result + ((relation == null) ? 0 : relation.hashCode());
        result = prime * result + status;
        return result;
    }
 
    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        Relational other = (Relational) obj;
        if (type == null) {
            if (other.type != null) return false;
        } else if (!type.equals(other.type)) return false;
        if (name == null) {
            if (other.name != null) return false;
        } else if (!name.equals(other.name)) return false;
        if (relation == null) {
            if (other.relation != null) return false;
        } else if (!relation.equals(other.relation)) return false;
        if (status != other.status) return false;
        return true;
    }
 
    @Override
    public String toString() {
        return "Relational [type=" + type + ", name=" + name
                + ", relation=" + relation+ ", status=" + status + "]";
    }
}
MongoGraphConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package jp.ameba.mongo.graph.config;
 
/**
 * MongoGraph設定情報
 * @author matsumoto_yosuke
 *
 */
public class MongoGraphConfig {
 
    private final String host;
    private final int port;
    private final int traverseLimit;
    private final int deleteLimit;
    private final long deleteSleepTime;
 
    public MongoGraphConfig(String host, int port) {
        this(host, port, 500, 500, 50);
    }
    public MongoGraphConfig(String host, int port,
            int traverseLimit, int deleteLimit, long deleteSleepTime) {
        this.host = host;
        this.port = port;
        this.traverseLimit = traverseLimit;
        this.deleteLimit = deleteLimit;
        this.deleteSleepTime = deleteSleepTime;
    }
 
    /**
     * MongoDBのホストを返す
     * @return ホスト
     */
    public String getHost() {
        return host;
    }
    /**
     * MongoDBのポートを返す
     * @return ポート
     */
    public int getPort() {
        return port;
    }
    /**
     * Node探索時の一度に処理する件数を返す
     * @return 処理件数
     */
    public int getTraverseLimit() {
        return traverseLimit;
    }
    /**
     * Node削除時の一度に処理する件数を返す
     * @return 処理件数
     */
    public int getDeleteLimit() {
        return deleteLimit;
    }
    /**
     * Node削除時のスリープ時間(ミリ秒)を返す
     * @return スリープ時間(ミリ秒)
     */
    public long getDeleteSleepTime() {
        return deleteSleepTime;
    }
}
Direction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
package jp.ameba.mongo.graph.constant;
 
/**
 * つながりの方向
 * @author matsumoto_yosuke
 *
 */
public enum Direction {
    /** 内向き(例:フォロワー) */
    INCOMING,
    /** 外向き(例:フォロー) */
    OUTGOING;
}
CountDAO.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package jp.ameba.mongo.graph.dao;
 
import org.bson.types.ObjectId;
 
import jp.ameba.mongo.graph.entity.Count;
 
import com.google.code.morphia.Datastore;
import com.google.code.morphia.dao.BasicDAO;
 
/**
 * CountテーブルのDAO
 * @author matsumoto_yosuke
 *
 */
public class CountDAO extends BasicDAO<Count, ObjectId> {
 
    public CountDAO(Datastore ds) {
        super(ds);
    }
}
EdgeDAO.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package jp.ameba.mongo.graph.dao;
 
import org.bson.types.ObjectId;
 
import jp.ameba.mongo.graph.entity.Edge;
 
import com.google.code.morphia.Datastore;
import com.google.code.morphia.dao.BasicDAO;
 
/**
 * EdgeテーブルのDAO
 * @author matsumoto_yosuke
 *
 */
public class EdgeDAO extends BasicDAO<Edge, ObjectId> {
 
    public EdgeDAO(Datastore ds) {
        super(ds);
    }
}
NodeDAO.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package jp.ameba.mongo.graph.dao;
 
import jp.ameba.mongo.graph.entity.Node;
 
import org.bson.types.ObjectId;
 
import com.google.code.morphia.Datastore;
import com.google.code.morphia.dao.BasicDAO;
 
/**
 * NodeテーブルのDAO
 * @author matsumoto_yosuke
 *
 */
public class NodeDAO extends BasicDAO<Node, ObjectId> {
 
    public NodeDAO(Datastore ds) {
        super(ds);
    }
}
Count.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package jp.ameba.mongo.graph.entity;
 
import org.bson.types.ObjectId;
 
import com.google.code.morphia.annotations.Entity;
import com.google.code.morphia.annotations.Id;
import com.google.code.morphia.annotations.Index;
import com.google.code.morphia.annotations.Indexes;
import com.google.code.morphia.annotations.Property;
 
/**
 * Countテーブルのエンティティ
 * @author matsumoto_yosuke
 *
 */
@Entity(noClassnameStored=true)
@Indexes({@Index("n, d, r")})
public class Count {
 
    public static final String NODE = "n";
    public static final String DIRECTION = "d";
    public static final String RELATION = "r";
    public static final String COUNT = "c";
 
    @Id
    private ObjectId id;
 
    @Property(NODE)
    private ObjectId node;
    @Property(DIRECTION)
    private String direction;
    @Property(RELATION)
    private String relation;
    @Property(COUNT)
    private long count;
 
    public Count() { }
 
    public Count(ObjectId node, String direction, String relation, long count) {
        this.node = node;
        this.direction = direction;
        this.relation = relation;
        this.count = count;
    }
 
    /**
     * IDを返す
     * @return ID
     */
    public ObjectId getId() {
        return id;
    }
    /**
     * IDをセット
     * @param id ID
     */
    public void setId(ObjectId id) {
        this.id = id;
    }
    /**
     * NodeのIDを返す
     * @return NodeのID
     */
    public ObjectId getNode() {
        return node;
    }
    /**
     * NodeのIDをセット
     * @param node NodeのID
     */
    public void setNode(ObjectId node) {
        this.node = node;
    }
    /**
     * つながりの方向を返す
     * @return つながりの方向
     */
    public String getDirection() {
        return direction;
    }
    /**
     * つながりの方向をセット
     * @param direction つながりの方向
     */
    public void setDirection(String direction) {
        this.direction = direction;
    }
    /**
     * つながりの種類を返す
     * @return つながりの種類
     */
    public String getRelation() {
        return relation;
    }
    /**
     * つながりの種類をセット
     * @param relation つながりの種類
     */
    public void setRelation(String relation) {
        this.relation = relation;
    }
    /**
     * つながりの数を返す
     * @return つながりの数
     */
    public long getCount() {
        return count;
    }
    /**
     * つながりの数をセット
     * @param count つながりの数
     */
    public void setCount(long count) {
        this.count = count;
    }
 
    @Override
    public String toString() {
        return "Meta [id=" + id + ", node=" + node + ", direction=" + direction
                + ", relation=" + relation + ", count=" + count + "]";
    }
}
Edge.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package jp.ameba.mongo.graph.entity;
 
import org.bson.types.ObjectId;
 
import com.google.code.morphia.annotations.Entity;
import com.google.code.morphia.annotations.Id;
import com.google.code.morphia.annotations.Index;
import com.google.code.morphia.annotations.Indexes;
import com.google.code.morphia.annotations.Property;
 
/**
 * Edgeテーブルのエンティティ
 * @author matsumoto_yosuke
 *
 */
@Entity(noClassnameStored=true)
@Indexes({@Index("r, i, -c"),@Index("r, o, -c"),@Index("r, i, o")})
public class Edge {
 
    public static final String RELATION = "r";
    public static final String IN_NODE = "i";
    public static final String OUT_NODE = "o";
    public static final String STATUS = "s";
    public static final String CREATE_DATE = "c";
    public static final String UPDATE_DATE = "u";
 
    @Id
    private ObjectId id;
 
    @Property(RELATION)
    private String relation;
    @Property(IN_NODE)
    private ObjectId inNode;
    @Property(OUT_NODE)
    private ObjectId outNode;
    @Property(STATUS)
    private int status;
    @Property(CREATE_DATE)
    private long createDate;
    @Property(UPDATE_DATE)
    private long updateDate;
 
    public Edge() { }
 
    public Edge(String relation, ObjectId inNode, ObjectId outNode, int status) {
        this.relation = relation;
        this.inNode = inNode;
        this.outNode = outNode;
        this.status = status;
    }
 
    /**
     * IDを返す
     * @return ID
     */
    public ObjectId getId() {
        return id;
    }
    /**
     * IDをセット
     * @param id ID
     */
    public void setId(ObjectId id) {
        this.id = id;
    }
    /**
     * 種類を返す
     * @return 種類
     */
    public String getRelation() {
        return relation;
    }
    /**
     * 種類をセット
     * @param relation 種類
     */
    public void setRelation(String relation) {
        this.relation = relation;
    }
    /**
     * つながり元のIDを返す
     * @return ID
     */
    public ObjectId getInNode() {
        return inNode;
    }
    /**
     * つながり元のIDをセット
     * @param inNode ID
     */
    public void setInNode(ObjectId inNode) {
        this.inNode = inNode;
    }
    /**
     * つながり先のIDを返す
     * @return ID
     */
    public ObjectId getOutNode() {
        return outNode;
    }
    /**
     * つながり先のIDをセット
     * @param outNode ID
     */
    public void setOutNode(ObjectId outNode) {
        this.outNode = outNode;
    }
    /**
     * ステータスを返す
     * @return ステータス
     */
    public int getStatus() {
        return status;
    }
    /**
     * ステータスをセット
     * @param status ステータス
     */
    public void setStatus(int status) {
        this.status = status;
    }
    /**
     * 作成日を返す
     * @return 作成日
     */
    public long getCreateDate() {
        return createDate;
    }
    /**
     * 作成日をセット
     * @param createDate 作成日
     */
    public void setCreateDate(long createDate) {
        this.createDate = createDate;
    }
    /**
     * 更新日を返す
     * @return 更新日
     */
    public long getUpdateDate() {
        return updateDate;
    }
    /**
     * 更新日をセット
     * @param updateDate 更新日
     */
    public void setUpdateDate(long updateDate) {
        this.updateDate = updateDate;
    }
 
    @Override
    public String toString() {
        return "Relationship [id=" + id + ", relation=" + relation
                + ", inNode=" + inNode + ", outNode=" + outNode + ", status=" + status
                + ", createDate=" + createDate + ", updateDate=" + updateDate + "]";
    }
}
Node.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package jp.ameba.mongo.graph.entity;
 
import org.bson.types.ObjectId;
 
import com.google.code.morphia.annotations.Entity;
import com.google.code.morphia.annotations.Id;
import com.google.code.morphia.annotations.Index;
import com.google.code.morphia.annotations.Indexes;
import com.google.code.morphia.annotations.Property;
 
/**
 * Nodeテーブルのエンティティ
 * @author matsumoto_yosuke
 *
 */
@Entity(noClassnameStored=true)
@Indexes({@Index("t, n")})
public class Node {
 
    public static final String TYPE = "t";
    public static final String NAME = "n";
    public static final String CREATE_DATE = "c";
    public static final String UPDATE_DATE = "u";
 
    @Id
    private ObjectId id;
 
    @Property(TYPE)
    private String type;
    @Property(NAME)
    private String name;
    @Property(CREATE_DATE)
    private long createDate;
    @Property(UPDATE_DATE)
    private long updateDate;
 
    public Node() { }
 
    public Node(String type, String name) {
        this.type = type;
        this.name = name;
    }
 
    /**
     * IDを返す
     * @return ID
     */
    public ObjectId getId() {
        return id;
    }
    /**
     * IDをセット
     * @param id ID
     */
    public void setId(ObjectId id) {
        this.id = id;
    }
    /**
     * 種類を返す
     * @return 種類
     */
    public String getType() {
        return type;
    }
    /**
     * 種類をセット
     * @param type 種類
     */
    public void setType(String type) {
        this.type = type;
    }
    /**
     * 名前を返す
     * @return 名前
     */
    public String getName() {
        return name;
    }
    /**
     * 名前をセット
     * @param name 名前
     */
    public void setName(String name) {
        this.name = name;
    }
    /**
     * 作成日を返す
     * @return 作成日
     */
    public long getCreateDate() {
        return createDate;
    }
    /**
     * 作成日をセット
     * @param createDate 作成日
     */
    public void setCreateDate(long createDate) {
        this.createDate = createDate;
    }
    /**
     * 更新日を返す
     * @return 更新日
     */
    public long getUpdateDate() {
        return updateDate;
    }
    /**
     * 更新日をセット
     * @param updateDate 更新日
     */
    public void setUpdateDate(long updateDate) {
        this.updateDate = updateDate;
    }
 
    @Override
    public String toString() {
        return "Node [id=" + id + ", type=" + type + ", name=" + name
                + ", createDate=" + createDate + ", updateDate=" + updateDate + "]";
    }
}
MongoGraphException.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package jp.ameba.mongo.graph.exception;
 
/**
 * MongoGraphで発生した例外
 * @author matsumoto_yosuke
 *
 */
public class MongoGraphException extends Exception {
 
    private static final long serialVersionUID = -5048702885014247176L;
 
    public MongoGraphException() {
        super();
    }
 
    public MongoGraphException(String message, Throwable cause) {
        super(message, cause);
    }
 
    public MongoGraphException(String message) {
        super(message);
    }
 
    public MongoGraphException(Throwable cause) {
        super(cause);
    }
 
    public static MongoGraphException wrap(Throwable cause) throws MongoGraphException {
        throw (cause instanceof MongoGraphException) ? (MongoGraphException) cause : new MongoGraphException(cause);
    }
}
GraphService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
package jp.ameba.mongo.graph.service;
 
import java.net.UnknownHostException;
import java.util.List;
 
import jp.ameba.mongo.graph.config.MongoGraphConfig;
import jp.ameba.mongo.graph.constant.Direction;
import jp.ameba.mongo.graph.dao.CountDAO;
import jp.ameba.mongo.graph.dao.EdgeDAO;
import jp.ameba.mongo.graph.dao.NodeDAO;
import jp.ameba.mongo.graph.entity.Count;
import jp.ameba.mongo.graph.entity.Edge;
import jp.ameba.mongo.graph.entity.Node;
import jp.ameba.mongo.graph.exception.MongoGraphException;
 
import org.bson.types.ObjectId;
 
import com.google.code.morphia.Datastore;
import com.google.code.morphia.Key;
import com.google.code.morphia.Morphia;
import com.google.code.morphia.mapping.Mapper;
import com.google.code.morphia.query.Query;
import com.google.code.morphia.query.UpdateOperations;
import com.google.code.morphia.query.UpdateResults;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
import com.mongodb.WriteResult;
 
/**
 * グラフサービス<br>
 * インスタンス化するとMongoDBと接続し、<br>
 * テーブルやインデックスがない場合は作成する。<br>
 * 終了時に{@link #close()}を実行すること。
 *
 * @author matsumoto_yosuke
 *
 */
public class GraphService {
 
    private static final String DB_NAME = "graph";
    private static final String QUERY_DESC = "-";
 
    private final Mongo mongo;
    private final Morphia morphia;
    private final Datastore datastore;
    private final NodeDAO nodeDAO;
    private final EdgeDAO edgeDAO;
    private final CountDAO countDAO;
    private final int deleteLimit;
    private final long deleteSleepTime;
 
    public GraphService(MongoGraphConfig mongoConfig) throws UnknownHostException, MongoException {
        mongo = new Mongo(mongoConfig.getHost(), mongoConfig.getPort());
        morphia = new Morphia();
        morphia.map(Node.class).map(Edge.class);
        datastore = morphia.createDatastore(mongo, DB_NAME);
        nodeDAO = new NodeDAO(datastore);
        nodeDAO.ensureIndexes();
        edgeDAO = new EdgeDAO(datastore);
        edgeDAO.ensureIndexes();
        countDAO = new CountDAO(datastore);
        countDAO.ensureIndexes();
        deleteLimit = mongoConfig.getDeleteLimit();
        deleteSleepTime = mongoConfig.getDeleteSleepTime();
    }
 
    /**
     * Nodeを取得して返す
     * @param id ID
     * @return Node
     */
    public Node selectNode(ObjectId id) {
        return nodeDAO.get(id);
    }
 
    /**
     * Nodeを取得して返す
     * @param type 種類
     * @param name 名前
     * @return Node
     */
    public Node selectNode(String type, String name) {
        return nodeDAO.createQuery().filter(Node.TYPE, type).filter(Node.NAME, name).get();
    }
 
    /**
     * Nodeを生成して返す
     * @param type 種類
     * @param name 名前
     * @return Node
     */
    public Node saveNode(String type, String name) {
        long now = System.currentTimeMillis();
        Node node = new Node(type, name);
        node.setCreateDate(now);
        node.setUpdateDate(now);
        Key<Node> key = nodeDAO.save(node);
        node.setId((ObjectId)key.getId());
        return node;
    }
 
    /**
     * Nodeを削除する
     * @param id ID
     * @throws MongoGraphException
     */
    public void deleteNode(ObjectId id) throws MongoGraphException {
        WriteResult writeResult = nodeDAO.deleteById(id);
        checkWriteResult(writeResult);
    }
 
    /**
     * Edgeを取得して返す
     * @param relation 種類
     * @param inNode つながり元
     * @param outNode つながり先
     * @return Edge
     */
    public Edge selectEdge(String relation, ObjectId inNode, ObjectId outNode) {
        return edgeDAO.createQuery().filter(Edge.RELATION, relation)
                .filter(Edge.IN_NODE, inNode).filter(Edge.OUT_NODE, outNode).get();
    }
 
    /**
     * NodeにつながっているEdgeを取得して返す
     * @param relation 種類
     * @param direction つながりの方向
     * @param id ID
     * @param limit 取得数
     * @param offset 位置
     * @return Edgeリスト
     */
    public List<Edge> selectEdges(String relation, Direction direction, ObjectId id, int limit, int offset) {
        Query<Edge> query = edgeDAO.createQuery();
        if (relation != null) query.filter(Edge.RELATION, relation);
 
        if (direction == Direction.INCOMING) query.filter(Edge.OUT_NODE, id);
        else query.filter(Edge.IN_NODE, id);
 
        return query.order(QUERY_DESC + Edge.CREATE_DATE).limit(limit).offset(offset).asList();
    }
 
    /**
     * Edgeを生成して返す
     * @param relation 種類
     * @param inNode つながり元
     * @param outNode つながり先
     * @param status ステータス
     * @return Edge
     */
    public Edge saveEdge(String relation, ObjectId inNode, ObjectId outNode, int status) {
        long now = System.currentTimeMillis();
        Edge edge = new Edge(relation, inNode, outNode, status);
        edge.setCreateDate(now);
        edge.setUpdateDate(now);
        Key<Edge> key = edgeDAO.save(edge);
        edge.setId((ObjectId)key.getId());
        return edge;
    }
 
    /**
     * Edgeを更新する
     * @param id ID
     * @param status ステータス
     * @throws MongoGraphException
     */
    public void updateEdge(ObjectId id, int status) throws MongoGraphException {
        Query<Edge> query = edgeDAO.createQuery().filter(Mapper.ID_KEY, id);
        UpdateOperations<Edge> operations = edgeDAO.createUpdateOperations();
        operations.set(Edge.STATUS, status);
        operations.set(Edge.UPDATE_DATE, System.currentTimeMillis()).isolated();
 
        UpdateResults<Edge> updateResult = edgeDAO.updateFirst(query, operations);
        checkWriteResult(updateResult.getWriteResult());
    }
 
    /**
     * Edgeを削除する
     * @param id ID
     * @throws MongoGraphException
     */
    public void deleteEdge(ObjectId id) throws MongoGraphException {
        WriteResult writeResult = edgeDAO.deleteById(id);
        checkWriteResult(writeResult);
    }
 
    /**
     * NodeにつながっているEdgeを全て削除する
     * @param id ID
     * @throws MongoGraphException
     */
    public void deleteAllEdge(ObjectId id) throws MongoGraphException {
        Direction[] directions = Direction.values();
        for (Direction direction : directions) {
            long count = getCount(id, direction, null);
            int offset = 0;
            while (offset < count) {
                List<Edge> edges = selectEdges(null, direction, id, deleteLimit, offset);
                if (edges == null || edges.size() < 1) break;
                for (Edge edge : edges) {
                    deleteEdge(edge.getId());
                    decrCount(edge.getInNode(), Direction.OUTGOING, edge.getRelation());
                    decrCount(edge.getOutNode(), Direction.INCOMING, edge.getRelation());
                }
                try { Thread.sleep(deleteSleepTime); } catch (InterruptedException ignore) { }
                offset = offset + deleteLimit;
            }
        }
    }
 
    /**
     * NodeにつながっているEdgeの数を取得して返す
     * @param nodeId NodeのID
     * @param direction つながりの方向
     * @param relation 種類
     * @return Edge数
     */
    public long getCount(ObjectId nodeId, Direction direction, String relation) {
        Query<Count> query = countDAO.createQuery()
                .filter(Count.NODE, nodeId).filter(Count.DIRECTION, direction.name());
        if (relation != null) query.filter(Count.RELATION, relation);
 
        List<Count> countList = query.asList();
        if (countList == null) return 0;
 
        long total = 0L;
        for (Count count : countList) total = total + count.getCount();
        return total;
    }
 
    /**
     * NodeにつながっているEdgeの数を加算する
     * @param nodeId NodeのID
     * @param direction つながりの方向
     * @param relation 種類
     * @throws MongoGraphException
     */
    public void incrCount(ObjectId nodeId, Direction direction, String relation) throws MongoGraphException {
        Query<Count> query = countDAO.createQuery().filter(Count.NODE, nodeId)
                .filter(Count.DIRECTION, direction.name()).filter(Count.RELATION, relation);
        UpdateOperations<Count> operations = countDAO.createUpdateOperations().inc(Count.COUNT);
 
        UpdateResults<Count> updateResult = countDAO.updateFirst(query, operations);
        checkWriteResult(updateResult.getWriteResult());
    }
 
    /**
     * NodeにつながっているEdgeの数を減算する
     * @param nodeId NodeのID
     * @param direction つながりの方向
     * @param relation 種類
     * @throws MongoGraphException
     */
    public void decrCount(ObjectId nodeId, Direction direction, String relation) throws MongoGraphException {
        Query<Count> query = countDAO.createQuery().filter(Count.NODE, nodeId)
                .filter(Count.DIRECTION, direction.name()).filter(Count.RELATION, relation);
        UpdateOperations<Count> operations = countDAO.createUpdateOperations().dec(Count.COUNT);
 
        UpdateResults<Count> updateResult = countDAO.updateFirst(query, operations);
        checkWriteResult(updateResult.getWriteResult());
    }
 
    /**
     * Countを取得して返す
     * @param nodeId NodeのID
     * @param direction つながりの方向
     * @param relation 種類
     * @return Count
     */
    public Count selectCount(ObjectId nodeId, Direction direction, String relation) {
        return countDAO.createQuery().filter(Count.NODE, nodeId)
                .filter(Count.DIRECTION, direction.name()).filter(Count.RELATION, relation).get();
    }
 
    /**
     * Countを全て取得して返す
     * @param nodeId NodeのID
     * @return Countリスト
     */
    public List<Count> selectCounts(ObjectId nodeId) {
        return countDAO.createQuery().filter(Count.NODE, nodeId).asList();
    }
 
    /**
     * Countを生成して返す
     * @param nodeId NodeのID
     * @param direction つながりの方向
     * @param relation 種類
     * @return Count
     */
    public Count saveCount(ObjectId nodeId, Direction direction, String relation) {
        Count count = new Count(nodeId, direction.name(), relation, 1);
        Key<Count> key = countDAO.save(count);
        count.setId((ObjectId)key.getId());
        return count;
    }
 
    /**
     * Countを削除する
     * @param id ID
     * @throws MongoGraphException
     */
    public void deleteCount(ObjectId id) throws MongoGraphException {
        WriteResult writeResult = countDAO.deleteById(id);
        checkWriteResult(writeResult);
    }
 
    /**
     * Countを全て削除する
     * @param id NodeのID
     * @throws MongoGraphException
     */
    public void deleteAllCount(ObjectId nodeId) throws MongoGraphException {
        List<Count> countList = countDAO.createQuery().filter(Count.NODE, nodeId).asList();
        if (countList == null) return;
        for (Count count : countList) deleteCount(count.getId());
    }
 
    /**
     * 書き込み結果をチェックする<br>
     * 失敗していたら{@link MongoGraphException}をthrowする
     * @param result 書き込み結果
     * @throws MongoGraphException
     */
    private void checkWriteResult(WriteResult result) throws MongoGraphException {
        if (result == null || result.getLastError() == null || result.getLastError().ok()) return;
        MongoGraphException.wrap(result.getLastError().getException());
    }
 
    /**
     * MongoDBとの接続を閉じる
     */
    public void close() {
        if (mongo != null) mongo.close();
    }
}
MongoGraphClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
package jp.ameba.mongo.graph;
 
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
 
import org.bson.types.ObjectId;
 
import com.mongodb.MongoException;
 
import jp.ameba.mongo.graph.bean.Relational;
import jp.ameba.mongo.graph.config.MongoGraphConfig;
import jp.ameba.mongo.graph.constant.Direction;
import jp.ameba.mongo.graph.entity.Count;
import jp.ameba.mongo.graph.entity.Edge;
import jp.ameba.mongo.graph.entity.Node;
import jp.ameba.mongo.graph.exception.MongoGraphException;
import jp.ameba.mongo.graph.service.GraphService;
 
/**
 * MongoDBでグラフを操作するクライアント<br>
 * 終了時に{@link #close()}を実行すること。
 * @author matsumoto_yosuke
 *
 */
public class MongoGraphClient {
 
    private final GraphService service;
    private final int traverseLimit;
 
    public MongoGraphClient(MongoGraphConfig mongoConfig) throws UnknownHostException, MongoException {
        service = new GraphService(mongoConfig);
        traverseLimit = mongoConfig.getTraverseLimit();
    }
 
    /**
     * つながり情報を取得して返す
     * @param type 対象の種類
     * @param name 対象の名前
     * @param relation つながりの種類
     * @param direction つながりの方向
     * @param limit 取得数
     * @param offset 位置
     * @return つながり情報
     * @throws MongoGraphException
     */
    public List<Relational> selectRelational(String type, String name, String relation,
            Direction direction, int limit, int offset) throws MongoGraphException {
        List<Relational> relationals = new ArrayList<Relational>();
 
        Node root = service.selectNode(type, name);
        if (root == null) throw new MongoGraphException("root node not found.");
 
        List<Edge> edges = service.selectEdges(relation, direction, root.getId(), limit, offset);
        if (edges == null) return relationals;
 
        for (Edge edge : edges) {
            ObjectId id;
            if (direction == Direction.INCOMING) id = edge.getInNode();
            else id = edge.getOutNode();
            Node node = service.selectNode(id);
            relationals.add(new Relational(node.getType(), node.getName(), edge.getRelation(), edge.getStatus()));
        }
        return relationals;
    }
 
    /**
     * つながっている数を取得して返す
     * @param type 対象の種類
     * @param name 対象の名前
     * @param relation つながりの種類
     * @param direction つながりの方向
     * @return つながっている数
     * @throws MongoGraphException
     */
    public long countRelational(String type, String name, String relation, Direction direction)
            throws MongoGraphException {
        Node root = service.selectNode(type, name);
        if (root == null) throw new MongoGraphException("root node not found.");
 
        return service.getCount(root.getId(), direction, relation);
    }
 
    /**
     * つながり情報を追加する
     * @param inType つながり元の種類
     * @param inName つながり元の名前
     * @param outType つながり先の種類
     * @param outName つながり先の名前
     * @param relation つながりの種類
     * @param status つながりのステータス
     * @throws MongoGraphException
     */
    public void addRelational(String inType, String inName, String outType, String outName,
            String relation, int status) throws MongoGraphException {
        Node inNode = service.selectNode(inType, inName);
        if (inNode == null) inNode = service.saveNode(inType, inName);
 
        Node outNode = service.selectNode(outType, outName);
        if (outNode == null) outNode = service.saveNode(outType, outName);
 
        Edge edge = service.selectEdge(relation, inNode.getId(), outNode.getId());
        if (edge != null) throw new MongoGraphException("edge already exists.");
 
        Count inCount = service.selectCount(inNode.getId(), Direction.OUTGOING, relation);
        if (inCount == null) service.saveCount(inNode.getId(), Direction.OUTGOING, relation);
        else service.incrCount(inNode.getId(), Direction.OUTGOING, relation);
 
        Count outCount = service.selectCount(outNode.getId(), Direction.INCOMING, relation);
        if (outCount == null) service.saveCount(outNode.getId(), Direction.INCOMING, relation);
        else service.incrCount(outNode.getId(), Direction.INCOMING, relation);
 
        service.saveEdge(relation, inNode.getId(), outNode.getId(), status);
    }
 
    /**
     * つながり情報を更新する
     * @param inType つながり元の種類
     * @param inName つながり元の名前
     * @param outType つながり先の種類
     * @param outName つながり先の名前
     * @param relation つながりの種類
     * @param status つながりのステータス
     * @throws MongoGraphException
     */
    public void updateRelational(String inType, String inName, String outType, String outName,
            String relation, int status) throws MongoGraphException {
        Node inNode = service.selectNode(inType, inName);
        if (inNode == null) throw new MongoGraphException("in node not found.");
 
        Node outNode = service.selectNode(outType, outName);
        if (outNode == null) throw new MongoGraphException("out node not found.");
 
        Edge edge = service.selectEdge(relation, inNode.getId(), outNode.getId());
        if (edge == null) throw new MongoGraphException("edge not found.");
 
        service.updateEdge(edge.getId(), status);
    }
 
    /**
     * つながり情報を削除する
     * @param inType つながり元の種類
     * @param inName つながり元の名前
     * @param outType つながり先の種類
     * @param outName つながり先の名前
     * @param relation つながりの種類
     * @throws MongoGraphException
     */
    public void deleteRelational(String inType, String inName, String outType, String outName,
            String relation) throws MongoGraphException {
        Node inNode = service.selectNode(inType, inName);
        if (inNode == null) throw new MongoGraphException("in node not found.");
 
        Node outNode = service.selectNode(outType, outName);
        if (outNode == null) throw new MongoGraphException("out node not found.");
 
        Edge edge = service.selectEdge(relation, inNode.getId(), outNode.getId());
        if (edge == null) throw new MongoGraphException("edge not found.");
 
        service.deleteEdge(edge.getId());
 
        service.decrCount(inNode.getId(), Direction.OUTGOING, relation);
        service.decrCount(outNode.getId(), Direction.INCOMING, relation);
    }
 
    /**
     * つながり情報を探索して返す
     * @param type 対象の種類
     * @param name 対象の名前
     * @param relation つながりの種類
     * @param direction つながりの方向
     * @param depth 探索する深さ
     * @return つながり情報
     * @throws MongoGraphException
     */
    public List<Relational> traverseRelational(String type, String name, String relation,
            int status, Direction direction, int depth) throws MongoGraphException {
        Node root = service.selectNode(type, name);
        if (root == null) throw new MongoGraphException("root node not found.");
 
        List<Relational> relationalList = new ArrayList<Relational>();
 
        long count = service.getCount(root.getId(), direction, relation);
        int offset = 0;
        while (offset < count) {
            List<Edge> edges = service.selectEdges(relation, direction, root.getId(), traverseLimit, offset);
            if (edges == null || edges.size() < 1) break;
            for (Edge edge : edges) {
                if (edge.getStatus() != status) continue;
 
                ObjectId id;
                if (direction == Direction.INCOMING) id = edge.getInNode();
                else id = edge.getOutNode();
                Node node = service.selectNode(id);
                if (depth > 1) {
                    List<Relational> tmpList = traverseRelational(node.getType(), node.getName(),
                            relation, status, direction, depth - 1);
                    for (Relational relational : tmpList) {
                        if (!relationalList.contains(relational)) relationalList.add(relational);
                    }
                } else {
                    relationalList.add(new Relational(node.getType(), node.getName(), edge.getRelation(), edge.getStatus()));
                }
            }
            offset = offset + traverseLimit;
        }
 
        return relationalList;
    }
 
    /**
     * 対象のつながり情報を全て削除する
     * @param type 対象の種類
     * @param name 対象の名前
     * @throws MongoGraphException
     */
    public void deleteAllRelational(String type, String name) throws MongoGraphException {
        Node root = service.selectNode(type, name);
        if (root == null) throw new MongoGraphException("root node not found.");
 
        service.deleteAllEdge(root.getId());
        service.deleteNode(root.getId());
        service.deleteAllCount(root.getId());
    }
 
    /**
     * MongoDBとの接続を閉じる
     */
    public void close() {
        if (service != null) service.close();
    }
}

5. Client利用方法

MongoGraphConfigにホストやポートを設定し、MongoGraphClientを生成してMongoDBと接続します。

MongoGraphClient
1
2
MongoGraphConfig config = new MongoGraphConfig("127.0.0.1", 27017);
MongoGraphClient client = new MongoGraphClient(config);

つながり元のNodeとつながり先のNode、つながりの種類、つながりのステータスを指定してつながりを追加します。

addRelational
1
2
3
// アメーバID"ameba-1"さんがアメーバID"ameba-2"さんにアメンバー申請
client.addRelational("amebaId", "ameba-1", "amebaId", "ameba-2", "amember", 0);

つながり元のNodeとつながり先のNode、つながりの種類、つながりのステータスを指定してつながりのステータスを更新します。

updateRelational
1
2
3
4
// アメーバID"ameba-2"さんがアメーバID"ameba-1"さんのアメンバー申請を承認
client.updateRelational("amebaId", "ameba-1", "amebaId", "ameba-2", "amember", 1);

つながり元のNodeとつながり先のNode、つながりの種類を指定してつながりを削除します。

deleteRelational
1
2
3
// アメーバID"ameba-2"さんがアメーバID"ameba-3"さんをアメンバーから削除
client.deleteRelational("amebaId", "ameba-3", "amebaId", "ameba-2", "amember");

取得元のNodeとつながりの種類、つながりの方向を指定してつながりを取得します。

selectRelational
1
2
3
// アメーバID"ameba-1"さんのフォロワーを取得
List<Relational> followers = client.selectRelational("amebaId", "ameba-1", "now", Direction.INCOMING, 100, 0);

取得元のNodeとつながりの種類、つながりの方向を指定してつながっている数を取得します。

countRelational
1
2
3
// アメーバID"ameba-1"さんのフォロー数を取得
long followCount = client.countRelational("amebaId", "ameba-1", "now", Direction.OUTGOING);

取得元のNodeとつながりの種類、つながりの方向、探索する深さを指定してつながりを取得します。

traverseRelational
1
2
3
4
// アメーバID"ameba-1"さんのフォローしている人がフォローしている人を取得
List<Relational> follows = client.traverseRelational("amebaId", "ameba-1", "now", 1, Direction.OUTGOING, 2);

Nodeのつながりを全て削除します。

deleteAllRelational
1
2
// アメーバID"ameba-1"さんのつながりを全て削除
client.deleteAllRelational("amebaId", "ameba-1");

MongoDBとの接続を閉じます。

close
1
client.close();

6. パフォーマンス検証

■更新系
つながりの追加、更新、削除は1ユーザに対してつながりが何万件あっても2〜3msと高速で、Neo4jやOrientDBでは増えるにどんどん遅くなっていたのでとても良い結果と言えます。


■参照系
つながりの取得はつながりの数には影響されず、offsetが0は34ms、1万は52ms、10万は215ms、20万は402ms、30万は603msとなり、offsetの数のみ影響されていました。
Neo4jやOrientDBではつながりの数の影響も受けていたので、改善されていることが分かります。
また、つながりをページングで最後まで辿るのはユーザの操作ではまずありえず、バッチ等のサービス側の処理であることを考慮すると、速度も良好と言えます。

まとめ

MongoDBでNeo4jやOrientDBの欠点を補ったGraphDBとして利用するClientを作成しました。
NodeにはアメーバIDやasIDなどのユーザ、グルっぽやサークルなどのグループなど、大抵のものは入れられるようになっているので、かなり利用可能範囲は広いと思います。

今回の設計ではNeo4jやOrientDBでボトルネックとなっていたNodeのEdgeポインタを削除しましたが、
これはRedisのソート済みセットでEdgeポインタを用意すれば、つながりの数とoffsetの数どちらにも影響されず、10万単位でつながっていても常に爆速で引くことができます。
ただし、MongoDBとRedisどちらもメモリに乗り切る量しか使えないためサーバ台数がとても多くなってしまうのと、データが分かれることによって不整合が起こるリスクがあります。

設計はそのままでoffsetの数に影響されないようにする場合は、ページングの位置を持ったインデックスを用意するなどの工夫をすれば改善できると思います。

参考文献

GraphDB徹底入門 - http://www.slideshare.net/doryokujin/graphdbgraphdb
MongoDB - http://www.mongodb.org/
Morphia - http://code.google.com/p/morphia/

TOP