本文主要涉及圖遊走算法DeepWalk的代碼實現。
1. DeepWalk採樣算法
對於給定的節點,DeepWalk會等概率地選取下一個相鄰節點加入路徑,直至達到最大路徑長度,或者沒有下一個節點可選。Graph類的實現可參考 https://github.com/PaddlePaddle/PGL/blob/main/pgl/graph.py,DeepWalk的代碼詳見 ./deepwalk.py
安裝依賴
構建一張圖,其中包含了10個節點以及14條邊
請實現Graph類的random_walk函數
%%writefile userdef_graph.pyfrom pgl.graph import Graph
import numpy as np
from pgl.utils.logger import log


class UserDefGraph(Graph):
    """pgl ``Graph`` subclass providing a hand-written DeepWalk ``random_walk``."""

    def random_walk(self, nodes, walk_len):
        """Sample one uniform random walk starting from each node in ``nodes``.

        At every step each unfinished walk moves to one successor of its
        current node, chosen with equal probability. A walk stops early when
        its current node has no successor. Sampling ends after ``walk_len``
        steps, or as soon as no walk can be extended.

        Helpers used (inherited from ``Graph``):
            self.successor(nodes): list of successor-id lists, one per node.
            self.outdegree(nodes): out-degree of each node.

        Args:
            nodes: starting node ids, one walk per entry (length batch_size).
            walk_len: maximum number of steps taken from each start node.

        Returns:
            A list of ``len(nodes)`` walks (lists of node ids). Walks are
            ragged, not a (batch_size, walk_len) matrix. Each walk starts
            with its start node and holds between 1 and ``walk_len + 1``
            nodes.
        """
        walks = [[node] for node in nodes]
        # Indices into `walks` of the walks that are still being extended.
        walks_ids = np.arange(0, len(nodes))
        cur_nodes = np.array(nodes)
        for _ in range(walk_len):
            # Keep only walks whose current node has a successor; stop when none do.
            outdegree = self.outdegree(cur_nodes)
            walk_mask = (outdegree != 0)
            if not np.any(walk_mask):
                break
            cur_nodes = cur_nodes[walk_mask]
            walks_ids = walks_ids[walk_mask]
            outdegree = outdegree[walk_mask]

            succ_nodes = self.successor(cur_nodes)
            # floor(U[0, 1) * deg) is a uniform index in [0, deg).
            sample_index = np.floor(
                np.random.rand(outdegree.shape[0]) * outdegree).astype("int64")

            next_nodes = []
            for succ, ind, walk_id in zip(succ_nodes, sample_index, walks_ids):
                walks[walk_id].append(succ[ind])
                next_nodes.append(succ[ind])
            cur_nodes = np.array(next_nodes)
        # Logs every sampled batch of walks (handy on the toy graph; very
        # verbose on large graphs).
        log.info(walks)
        return walks

# PGL官網的random_walk原始碼:
# Reference implementation of Graph.random_walk from PGL, shown for
# comparison with UserDefGraph.random_walk above. It is a method: `self` is
# the pgl Graph being walked.
def random_walk(self, nodes, max_depth):
    """Implement of random walk.

    This function get random walks path for given nodes and depth.

    Args:
        nodes: Walk starting from nodes
        max_depth: Max walking depth

    Return:
        A list of walks.
    """
    # Each walk starts as [start_node]; the loop takes at most max_depth
    # steps, so a walk ends up with between 1 and max_depth + 1 nodes.
    walk = []
    for node in nodes:
        walk.append([node])

    # Indices into `walk` of the walks that are still being extended.
    cur_walk_ids = np.arange(0, len(nodes))
    cur_nodes = np.array(nodes)
    for l in range(max_depth):
        # Drop walks whose current node has no successor; stop when none remain.
        outdegree = self.outdegree(cur_nodes)
        mask = (outdegree != 0)
        if np.any(mask):
            cur_walk_ids = cur_walk_ids[mask]
            cur_nodes = cur_nodes[mask]
            outdegree = outdegree[mask]
        else:
            break
        succ = self.successor(cur_nodes)
        # floor(U[0, 1) * outdegree) picks a uniform index into each successor list.
        sample_index = np.floor(
            np.random.rand(outdegree.shape[0]) * outdegree).astype("int64")

        nxt_cur_nodes = []
        for s, ind, walk_id in zip(succ, sample_index, cur_walk_ids):
            walk[walk_id].append(s[ind])
            nxt_cur_nodes.append(s[ind])
        cur_nodes = np.array(nxt_cur_nodes)
    return walk

# 運行腳本:
!python my_deepwalk.py --use_my_random_walk --epoch 5
運行結果:
[INFO] 2021-02-13 17:27:03,835 [my_deepwalk.py: 302]: Namespace(batch_size=512, epoch=5, hidden_size=128, neg_num=20, processes=2, save_path='./tmp/deepwalk', use_my_random_walk=True, walk_len=5, win_size=5)
[INFO] 2021-02-13 17:27:04,651 [my_deepwalk.py: 219]: Start random walk on disk...
[INFO] 2021-02-13 17:27:04,675 [userdef_graph.py: 51]: [[9, 7, 2, 0], [8, 0], [6, 5, 0], [0], [2, 1], [3, 1], [1], [5, 0], [7, 3, 1], [4, 0]]
[INFO] 2021-02-13 17:27:04,675 [userdef_graph.py: 51]: [[4, 0], [0], [2, 1], [7, 1], [5, 0], [8, 0], [1], [6, 5, 0], [9, 7, 3, 1], [3, 1]]
[INFO] 2021-02-13 17:27:04,677 [userdef_graph.py: 51]: [[6, 0], [7, 3, 1], [5, 0], [4, 0], [3, 1], [8, 0], [2, 0], [1], [0], [9, 7, 0]]
[INFO] 2021-02-13 17:27:04,677 [userdef_graph.py: 51]: [[9, 7, 1], [5, 0], [3, 1], [7, 2, 1], [2, 1], [0], [8, 0], [4, 0], [1], [6, 5, 0]]
[INFO] 2021-02-13 17:27:04,678 [userdef_graph.py: 51]: [[9, 7, 0], [8, 0], [0], [4, 0], [1], [2, 1], [5, 0], [3, 1], [7, 3, 1], [6, 4, 0]]
[INFO] 2021-02-13 17:27:04,679 [my_deepwalk.py: 230]: Random walk on disk Done.
2021-02-13 17:27:04,680-WARNING: paddle.fluid.layers.py_reader() may be deprecated in the near future. Please use paddle.fluid.io.DataLoader.from_generator() instead.
[INFO] 2021-02-13 17:27:04,730 [my_deepwalk.py: 278]: Step 1 DeepWalk Loss: 0.718020 0.020003 s/step.

附錄:my_deepwalk.py
"""DeepWalk代碼文件"""
import argparse
import glob
import io
import math
import os
import time
from multiprocessing import Pool

import numpy as np
import paddle.fluid as fluid
import paddle.fluid.layers as l
from pgl import data_loader
from pgl.utils.logger import log
def deepwalk_model(graph, hidden_size=16, neg_num=5):
    """Build the DeepWalk training network: skip-gram with negative sampling.

    A ``py_reader`` named ``'train'`` supplies three int64 inputs: centre node
    ids shaped ``[-1, 1, 1]``, positive context ids shaped ``[-1, 1, 1]`` and
    negative sample ids shaped ``[-1, neg_num, 1]``. Centre ids are looked up
    in the parameter named ``'content'``. Positive and negative ids are both
    looked up in the parameter named ``'weight'``. Every lookup table has size
    ``[graph.num_nodes, hidden_size]``.

    Args:
        graph: graph whose ``num_nodes`` sizes the embedding tables.
        hidden_size: embedding dimension.
        neg_num: number of negative samples per positive pair.

    Returns:
        ``(pyreader, loss)``, where ``loss`` is the average of the mean
        positive and the mean negative sigmoid cross-entropy terms.
    """
    reader = l.py_reader(
        capacity=70,
        shapes=[[-1, 1, 1], [-1, 1, 1], [-1, neg_num, 1]],
        dtypes=['int64'] * 3,
        lod_levels=[0] * 3,
        name='train',
        use_double_buffer=True)

    content_init = fluid.initializer.UniformInitializer(low=-1.0, high=1.0)
    context_init = fluid.initializer.TruncatedNormal(
        scale=1.0 / math.sqrt(hidden_size))

    def lookup(ids, table_name, init):
        # A new ParamAttr is built for every lookup; the two context lookups
        # both use the name 'weight'.
        return l.embedding(
            input=ids,
            size=[graph.num_nodes, hidden_size],
            param_attr=fluid.ParamAttr(name=table_name, initializer=init))

    center_ids, context_ids, negative_ids = l.read_file(reader)

    center_vec = lookup(center_ids, 'content', content_init)
    pos_vec = lookup(context_ids, 'weight', context_init)
    neg_vec = lookup(negative_ids, 'weight', context_init)

    pos_score = l.matmul(center_vec, pos_vec, transpose_y=True)
    neg_score = l.matmul(center_vec, neg_vec, transpose_y=True)

    # Targets share the logits' shape (all ones / all zeros); stop_gradient
    # keeps them out of back-propagation.
    pos_target = pos_score * 0. + 1.
    pos_target.stop_gradient = True
    pos_term = l.sigmoid_cross_entropy_with_logits(pos_score, pos_target)

    neg_target = neg_score * 0.
    neg_target.stop_gradient = True
    neg_term = l.sigmoid_cross_entropy_with_logits(neg_score, neg_target)

    total = (l.reduce_mean(pos_term) + l.reduce_mean(neg_term)) / 2
    return reader, total
def gen_pair(walks, left_win_size=2, right_win_size=2):
    """Build (centre, context) node-id pairs from random walks for skip-gram.

    For each walk, first every offset ``k`` in ``1..left_win_size`` emits the
    pairs ``(walk[i + k], walk[i])`` (context before the centre). Then every
    offset ``k`` in ``1..right_win_size`` emits ``(walk[i], walk[i + k])``
    (context after the centre).

    Args:
        walks: iterable of node-id sequences.
        left_win_size: largest offset of context nodes preceding the centre.
        right_win_size: largest offset of context nodes following the centre.

    Returns:
        ``(src, pos)``: int64 arrays of shape ``(num_pairs, 1, 1)`` holding
        centre ids and their context ids.
    """
    centres, contexts = [], []
    for walk in walks:
        for offset in range(1, left_win_size + 1):
            centres += walk[offset:]
            contexts += walk[:-offset]
        for offset in range(1, right_win_size + 1):
            centres += walk[:-offset]
            contexts += walk[offset:]
    src = np.array(centres, dtype=np.int64).reshape(-1, 1, 1)
    pos = np.array(contexts, dtype=np.int64).reshape(-1, 1, 1)
    return src, pos
def deepwalk_generator(graph,
                       batch_size=512,
                       walk_len=5,
                       win_size=2,
                       neg_num=5,
                       epoch=200,
                       filelist=None):
    """Return a factory whose generator yields (centre, positive, negatives) batches.

    Walks come from one of two sources:
      * ``filelist`` given: read from those files (one walk per line, node ids
        separated by whitespace) and grouped into batches of ``batch_size``
        walks. ``walk_len`` and ``epoch`` are not used in this case.
      * ``filelist`` is None: sampled on the fly with
        ``graph.random_walk(nodes, walk_len)`` for ``epoch`` passes over
        ``graph.node_batch_iter(batch_size)``.

    Each batch of walks is turned into pairs with ``gen_pair(walks, win_size,
    win_size)``. Batches that produce no pairs are skipped. Negatives are
    drawn with ``graph.sample_nodes`` for shape ``[len(src), neg_num, 1]``.

    Args:
        graph: graph used for on-the-fly walks and negative sampling.
        batch_size: number of walks per batch.
        walk_len: walk length passed to ``graph.random_walk``.
        win_size: left and right skip-gram window size.
        neg_num: number of negative samples per pair.
        epoch: number of passes when sampling walks on the fly.
        filelist: optional list of walk files to read instead of sampling.

    Returns:
        A zero-argument callable producing a generator of ``[src, pos, negs]``
        int64 arrays.
    """
    def walks_generator():
        if filelist is not None:
            bucket = []
            for filename in filelist:
                with io.open(filename) as inf:
                    for line in inf:
                        # split() tolerates '\r\n' and repeated spaces; blank
                        # lines are skipped instead of crashing on int('').
                        tokens = line.split()
                        if not tokens:
                            continue
                        bucket.append([int(x) for x in tokens])
                        if len(bucket) == batch_size:
                            yield bucket
                            bucket = []
            if bucket:
                yield bucket
        else:
            for _ in range(epoch):
                for nodes in graph.node_batch_iter(batch_size):
                    yield graph.random_walk(nodes, walk_len)

    def wrapper():
        for walks in walks_generator():
            src, pos = gen_pair(walks, win_size, win_size)
            if src.shape[0] == 0:
                continue
            negs = graph.sample_nodes([len(src), neg_num, 1]).astype(np.int64)
            yield [src, pos, negs]

    return wrapper
def process(args):
    """Write the random walks of one worker task to ``<save_path>/<idx>``.

    Args:
        args: tuple ``(idx, graph, save_path, epoch, batch_size, walk_len,
            seed)``, packed into one argument so the function can be used
            with ``Pool.map``. ``graph`` must provide ``node_batch_iter`` and
            ``random_walk``. ``seed`` seeds numpy's global RNG for this task.

    Each of the ``epoch`` passes walks every node batch once and writes one
    walk per line, as space-separated node ids.
    """
    idx, graph, save_path, epoch, batch_size, walk_len, seed = args
    # Seed once per task. Re-seeding inside the epoch loop made every epoch
    # replay exactly the same walks.
    np.random.seed(seed)
    with open(os.path.join(save_path, str(idx)), 'w') as outf:
        for _ in range(epoch):
            for nodes in graph.node_batch_iter(batch_size):
                walks = graph.random_walk(nodes, walk_len)
                outf.writelines(
                    ' '.join(str(token) for token in walk) + '\n'
                    for walk in walks)
def main(args):
    """Run the full DeepWalk pipeline: pick a graph, walk, train, save.

    1. Graph: with ``--use_my_random_walk``, a hand-built 10-node, 14-edge toy
       graph is wrapped in ``UserDefGraph`` (from ``userdef_graph.py``).
       Otherwise the ArXiv graph from ``pgl.data_loader`` is used.
    2. Walks: ``epoch`` worker tasks each write one pass of random walks over
       all nodes to ``<save_path>/walks/<task index>``.
    3. Training: a skip-gram + negative-sampling program reads those files
       and is optimized with Adam until the reader is exhausted.
    4. The persistable variables are saved to ``<save_path>/paddle_model``.

    Args:
        args: parsed command-line namespace providing hidden_size, neg_num,
            epoch, save_path, batch_size, walk_len, win_size, processes and
            use_my_random_walk.
    """
    hidden_size = args.hidden_size
    neg_num = args.neg_num
    epoch = args.epoch
    save_path = args.save_path
    batch_size = args.batch_size
    walk_len = args.walk_len
    win_size = args.win_size

    if not os.path.isdir(save_path):
        os.makedirs(save_path)

    if args.use_my_random_walk:
        from userdef_graph import UserDefGraph
        # Toy graph: 10 nodes, 14 edges, random node/edge features.
        # To run the custom walker on ArXiv instead, wrap its graph:
        #     pgl_graph = data_loader.ArXivDataset().graph
        #     graph = UserDefGraph(num_nodes=pgl_graph.num_nodes,
        #                          edges=pgl_graph.edges,
        #                          node_feat=pgl_graph.node_feat,
        #                          edge_feat=pgl_graph.edge_feat)
        num_nodes = 10
        edge_list = [(2, 0), (2, 1), (3, 1), (4, 0), (5, 0), (6, 0), (6, 4),
                     (6, 5), (7, 0), (7, 1), (7, 2), (7, 3), (8, 0), (9, 7)]
        d = 16
        feature = np.random.randn(num_nodes, d).astype("float32")
        edge_feature = np.random.randn(len(edge_list), 1).astype("float32")
        graph = UserDefGraph(
            num_nodes=num_nodes,
            edges=edge_list,
            node_feat={'feature': feature},
            edge_feat={'edge_feature': edge_feature})
    else:
        # Load ArXiv only when it is actually used; the toy-graph path no
        # longer pays for (or depends on) fetching this dataset.
        graph = data_loader.ArXivDataset().graph

    log.info("Start random walk on disk...")
    walk_save_path = os.path.join(save_path, "walks")
    if not os.path.isdir(walk_save_path):
        os.makedirs(walk_save_path)
    # One task per epoch; each writes a single pass over all nodes to its own
    # file. dtype=np.int64 keeps the bound 2**32 valid where the default
    # integer type is 32-bit.
    args_list = [(x, graph, walk_save_path, 1, batch_size, walk_len,
                  np.random.randint(2**32, dtype=np.int64))
                 for x in range(epoch)]
    # The context manager releases the worker processes once map() returns.
    with Pool(args.processes) as pool:
        pool.map(process, args_list)
    # Use exactly the files written by this run. A glob over the directory
    # would also pick up stale files left by an earlier run with more epochs.
    filelist = [os.path.join(walk_save_path, str(x)) for x in range(epoch)]
    log.info("Random walk on disk Done.")

    place = fluid.CPUPlace()
    deepwalk_prog = fluid.Program()
    startup_prog = fluid.Program()

    with fluid.program_guard(deepwalk_prog, startup_prog):
        with fluid.unique_name.guard():
            deepwalk_pyreader, deepwalk_loss = deepwalk_model(
                graph, hidden_size=hidden_size, neg_num=neg_num)
            lr = 0.0001
            adam = fluid.optimizer.Adam(lr)
            adam.minimize(deepwalk_loss)

    deepwalk_pyreader.decorate_tensor_provider(
        deepwalk_generator(
            graph,
            batch_size=batch_size,
            walk_len=walk_len,
            win_size=win_size,
            epoch=epoch,
            neg_num=neg_num,
            filelist=filelist))

    deepwalk_pyreader.start()

    exe = fluid.Executor(place)
    exe.run(startup_prog)

    prev_time = time.time()
    step = 0

    while True:
        try:
            deepwalk_loss_val = exe.run(deepwalk_prog,
                                        fetch_list=[deepwalk_loss],
                                        return_numpy=True)[0]
        except fluid.core.EOFException:
            # The reader is exhausted: every walk file has been consumed.
            deepwalk_pyreader.reset()
            break
        cur_time = time.time()
        use_time = cur_time - prev_time
        prev_time = cur_time
        step += 1
        if step == 1 or step % 10 == 0:
            log.info("Step %d " % step + "DeepWalk Loss: %f " %
                     deepwalk_loss_val + " %f s/step." % use_time)

    fluid.io.save_persistables(exe,
                               os.path.join(save_path, "paddle_model"),
                               deepwalk_prog)
if __name__ == '__main__':
    # Command-line entry point: parse options, log them, run training.
    cli = argparse.ArgumentParser(description='deepwalk')
    cli.add_argument("--use_my_random_walk", action='store_true',
                     help="use_my_random_walk")
    int_options = [
        ("--hidden_size", 128),
        ("--neg_num", 20),
        ("--epoch", 1),
        ("--batch_size", 512),
        ("--walk_len", 5),
        ("--win_size", 5),
    ]
    for flag, default in int_options:
        cli.add_argument(flag, type=int, default=default)
    cli.add_argument("--save_path", type=str, default="./tmp/deepwalk")
    cli.add_argument("--processes", type=int, default=2)
    parsed = cli.parse_args()
    log.info(parsed)
    main(parsed)