NO_WAIT

主にプログラミング

ツイート中の頻出キーフレーズと連想語から関心事を可視化する実験: Twitter Streaming API→Yahoo API→各種補完API→Gephi

Twitter Streaming APIを使用して自分のツイートを取得し、頻出語を調べることを思い立ちました。 Twitterアカウントを通して見た自分の関心事がどんなものであるかを調べ、 どれくらい当たっているかを見る遊びでしたが、結果は当たらずといえども遠からずといった程度でした。 既存のサービスを呼び出すだけのプログラミングであり、 簡単に実装できると思いきや、コード量は思ったより多くなり苦労しました。 この記事では、使用したライブラリやAPIの備忘録を兼ねて、プログラム例とツイートの解析結果可視化例を紹介します。

実験の概要

下記の流れで実験しました。自分のTweet中のキーフレーズだけでは面白くないので、 ちょっと血迷ってGoogleなどが提供する補完サービス(いわゆるサジェスト)の結果を 「連想語」として利用することを考えました。

  • Twitter Streaming APIを使用して自分のつぶやき(お気に入りも含む)を取得。
  • 自分のTweetをYahoo Keyphrase APIで解析し、キーフレーズのリストを得る。
  • 元となったツイートのIDと取得したキーフレーズを関連づけて適当なデータベースに保存する。
  • さらに、GoogleAmazonなどの検索語句補完サービスを呼び出し、各キーフレーズから連想される語句のリストを得る。
  • キーフレーズと、それから連想される語句を関連づけて、適当なデータベースに保存する。
  • 保存した関連性データをPythonで読み込み、NetworkXライブラリでグラフを作成する。
  • 作成したグラフ中の高次数ノードを調べ、関心の高い語、および潜在的な関心事(補完語句)を調べる。
  • NetworkXを使用してGraphMLデータを出力し、Gephiで可視化する。

可視化のためのグラフ作成ステップがありますが、これは

  • TweetのID → キーフレーズ → 連想語

という参照関係を有向グラフとして作成するものです。 多くのTweetから参照されるキーフレーズは入次数が多くなり、 可視化するときに大きく表示すれば目立ちます。

2015年1月におけるツイートの解析結果

まずは結果から。

頻出語ランキング

被参照回数3回以上のキーフレーズを上位から並べると次のようになりました。

キーフレーズ 被参照回数
11
haskell 10
世界 4
関数型 4
ソースコード 3
3
hs_abc 3
kazu_yamamoto 3
3
コンピュータ 3

プログラマなので、大体それらしい単語が頻出しています。 Haskellには強い関心を持っていますがいまだ使ったことがありません。 Haskell関連でkazu_yamamoto氏のTweetをよくお気に入りに入れていたようです。 猫がトップですが、Twitterで猫画像についつい釣られてお気に入りに入れてしまうため、 この結果となったようです。

続いてGephiによる可視化例です。 Gephiはオープンソースのグラフ表示ソフトです。 GraphML等のグラフデータフォーマットを読み込むことができ、グラフデータを 様々なレイアウトで可視化でき、手軽でそこそこ見栄えもするので気に入っています。

猫好きらしい

f:id:shinaisan:20150302074007p:plain

もっともよく参照されている語「猫」を中心にグラフを表示しました。 下表は配色の説明です:

種類
キーフレーズ
黄色 TwitterのStatus ID …略…
Googleによる補完 猫転送装置
Youtubeによる補完 猫になりたい
Amazonによる補完 猫タワー
水色 Wikipediaによる補完 猫缶

猫になりたいです。 ところで「猫転送装置」ってなんでしょうGoogle先生

Haskell使ったことないのに興味津々らしい

f:id:shinaisan:20150302074028p:plain

プログラムとゲームとバグ…

これらはさすがにつながっています…

f:id:shinaisan:20150302074041p:plain

ストップワード」として除外した語

下記単語は解析結果から除外することにしました。

除外した語 除外理由
英語 Googleがよくサジェストする。検索者がよく英語辞典のつもりで「XX 英語」と検索をするためか。連想語としては不適と判断。
意味 英語と同様。
動画 GoogleYoutubeから。例えば「猫 動画」などと検索されていると思われる。連想語としては不適と判断。
画像 動画と同様。
(曖昧さ回避) Wikipediaから。明らかに不適。
rt TwitterRetweetによく含まれる「RT」の文字がキーフレーズとして抽出されてしまったもの。ほとんどの場合不適。

上の「ストップワード」は実際のデータを目で見て適不適の判断をしています。 文脈や主観の違いによっては解析対象に含めた方がいいと思われる語、例えば「英語」などがあり、 判断が難しい所です。

NetworkXによるGraphMLの出力

グラフ処理のためにNetworkXライブラリをPythonから使用します。

処理対象のデータ

処理対象のデータは下記のJSON形式で取得済みであるとします。 これらのデータは後述する「解析に用いたプログラム」により得ます。

キーフレーズデータ

データ形式: 下記オブジェクトの配列。

{"keyphrases":[文字列1, 文字列2, …], "documentId": TwitterのStatus IDを表す文字列}

データ例(要素):

{"keyphrases":["ICMP TTL Exceeded","tracerouteコマンド","UDPパケット","ホスト","max_ttl"],"documentId":"550463345485045761"}

データ取得例(Python):

def iter_keyphrases(json):
    return ((ent["documentId"], kp.lower())
            for ent in json for kp in ent["keyphrases"])

連想語データ

データ形式: 下記オブジェクトの配列。

{"word": キーフレーズ(文字列)
 "associations":[
   {"completions":
     [連想語11, 連想語12, …], "service":サービス名1(文字列)},
   {"completions":
     [連想語21, 連想語22, …],"service":サービス名2(文字列)},
   … ]}

データ例(要素):

{"word":"ICMP",
 "associations":[
   {"completions":
     ["icmp","icmpとは", …], "service":"google"},
   {"completions":
     ["icmp","icmp error xbox 360", …],"service":"youtube"},
   {"completions":
     ["icm"],"service":"amazon"},
   {"completions":
     ["ICMPv6"],"service":"wikipedia"}]}

データ取得例(Python):

def iter_word_assoc(json):
    return ((ent["word"].lower(), assoc["service"], comp.lower())
            for ent in json for assoc in ent["associations"] for comp in assoc["completions"])

グラフの作成

  • import networkx as nxとNetworkXライブラリをインポートしておきます。
  • TwitterのStatus ID → キーフレーズ → 連想語 という風に参照・被参照の方向性があるため、有向グラフをnx.DiGraph()で作成します。
  • 作成したグラフに対してadd_nodeメソッドでノードを、add_edgeメソッドで辺を追加します。add_nodeにはキーワード引数が指定でき、指定したキーと値の組がそのまま追加するノードの属性となります。(Gephiで可視化する際に属性の値に応じて色を変える等が可能です。)
# -*- coding: utf-8-unix -*-
import networkx as nx
import json
from operator import itemgetter
from itertools import islice

# 【略】上記 iter_keyphrases, iter_word_assocをここで定義

# 有向グラフを作成
g = nx.DiGraph()

# ストップワードの集合
stopwords = set(['rt', u'(曖昧さ回避)', u'英語', u'意味', u'動画', u'画像'])

# 【略】キーフレーズデータベースから前述した形式のJSONデータを変数jsonに取得する

# グラフを作成(ID -> キーフレーズの辺を登録していく)
for (id, w) in iter_keyphrases(json):
    g.add_node(id, type = "id")
    if not w in stopwords:
        g.add_node(w, type = "keyphrase")
        g.add_edge(id, w)

# 【略】連想語データベースから前述した形式のJSONデータを変数jsonに取得する

# グラフを作成(キーフレーズ->連想語の辺を登録していく)
for (w, serv, comp) in iter_word_assoc(json):
    if (comp != w) and (not w in stopwords) and (not comp in stopwords):
        if not comp in g.nodes():
            g.add_node(comp, type = serv)
        g.add_edge(w, comp)

# 作成したグラフをコピーしてから低次数ノードを削除していく
# キーフレーズについてのみ入次数が3以上のノードのみ残す
# (それ以上残すと多すぎてGephiによる可視化動作が重くなる)
r = g.copy()
for n, d in r.in_degree().items():
    r.node[n]['marked'] = (r.node[n]['type'] != 'keyphrase' or d > 2) # 残すノードのみをマーク
r.remove_nodes_from(n for n in r.nodes() if not r.node[n]['marked'])
r.remove_nodes_from(n for (n, d) in r.degree().items() if d == 0)
nx.write_graphml(r, "result.graphml")

頻出語のランキング

頻出キーフレーズのランキングを得る場合、ID → キーフレーズの関連データだけからなる 有向グラフを作成し、入次数(in_degree())の大きなノードのラベルを列挙すればよいでしょう。

from operator import itemgetter
from itertools import islice
# 上位10フレーズ
for n, d in islice(sorted(g.in_degree().items(), key = itemgetter(1), reverse = True), 10):
    print n + " " + str(d)

Gephiによるグラフの可視化

GraphMLの読み込み

メニュー File → Open からファイルを選択するだけです。 GraphMLファイルの他GMLなど様々なフォーマットをサポートしているようです。

ノードの大きさを入次数に比例させる

Ranking Windowで赤い逆三角形のようなアイコンを選択すると、ノードの大きさを自動計算させるための設定画面を呼び出せます。 パラメータをDegree(次数)、InDegree(入次数)、OutDegree(出次数)のいずれかから選択し、大きさの範囲をMin size、Max sizeで指定します。Applyボタンを押すことで グラフのノードサイズが選択したパラメータに依存して変化します。デフォルトでは比例関係で大きさを決定しますが、 Spline Editorによる細かな特性設定も可能です。

今回の例では入次数に比例させることで、 より多く言及された語句を大きく表示させることができています。

f:id:shinaisan:20150302074526p:plain

ノードの属性による色分け

Partition Windowで属性による色分けが可能です。 更新ボタンを押すことで、ドロップダウンメニューに、現在選択可能な属性の一覧が表示されます。 いずれかを選択してApplyすれば、Gephiが各属性値に応じた色分けを自動的に行ってくれます。

f:id:shinaisan:20150302074540p:plain

解析に用いたプログラム

Twitter Streaming APIによるTweetの監視

RubyスクリプトTwitterのつぶやきを監視します。 あらかじめTwitterにアプリケーションを登録し、必要なConsumer Key/Secret、OAuth Token/Secretを取得しておきました。 Twitter Streaming APIを使用するためtweetstream Gemを導入しておきます(gem install tweetstream)。

スクリプトは下記のような3部構成となっています。

  • 設定: TweetStreamの設定。TwitterのOAuth設定。
  • TwitterUserTrackerクラス定義: スクリプトの骨子。TweetStreamがつぶやきを取得するたびにハンドラを呼び出す。
  • 実行部: 上のクラスをインスタンス化。つぶやきを検出すると別途定義する解析実行関数を呼び出す。

twitter_user_tracker.rb

require 'tweetstream'

TweetStream.configure do |config|
  config.consumer_key       = Twitter_Consumer_Key
  config.consumer_secret    = Twitter_Consumer_Secret
  config.oauth_token        = Twitter_Oauth_Token
  config.oauth_token_secret = Twitter_Oauth_Token_Secret
  config.auth_method        = :oauth
end

class TwitterUserTracker
  def initialize(&handler)
    client = TweetStream::Client.new
    client.on_anything do |status|
      event = status[:event]
      obj = nil
      if event.nil? # つぶやき
        obj = {
          id: status[:id_str],
          text: status[:text]
        }
        obj = nil if obj.values.any? &:nil?
      elsif event == "favorite" # お気に入り
        target_object = status[:target_object]
        obj = {
          id: target_object[:id_str],
          text: target_object[:text]
        }
      end
      handler.call(obj) unless obj.nil?
    end
    # Ctrl-Cで終了
    trap(:INT) do
      client.stop
    end
    # 自アカウントの監視を開始
    # (with: "user"抜きで自アカウントのタイムラインを監視できる)
    # (with: "user"付きだと自アカウントの行動のみが監視対象となる)
    client.userstream({with: "user"})
  end
end

if __FILE__ == $0
  tracker = TwitterUserTracker.new do |obj|
    id = obj[:id]
    sentence = obj[:text]
    [別途定義する解析用の関数](id, sentence) # 後述
  end
end

上のコードで、「別途定義する解析用の関数」とぼかしてありますが、 この部分はRubyで実装した関数を直接呼び出すのが普通でしょう。 今回の実験では、趣味に走ってAmazonが提供するAWS Lambdaの関数として実装し、 これをRubyコードから呼び出してみました。 このような構成にした理由は、解析処理をAWS Lambdaに外部化することで、 Twitterの監視に専念させるためです。 もちろんLambdaに興味があってとにかく試してみたかったというのが一番大きな動機ではあります… 今回の例ではLambdaを経由せず直接呼び出した方がコードははるかに簡単になったのではと思います。

上のif __FILE__ == $0ブロックは実際には下記のようになっています。 実行にはrequire aws-sdkが必要です。

require aws-sdk
if __FILE__ == $0
  lamb = Aws::Lambda::Client.new(region: REGION,
                                 access_key_id: AWS_ACCESS_KEY,
                                 secret_access_key: AWS_ACCESS_SECRET)
  # esc: 本来は必要ない関数だが止むを得ず定義
  def esc(s); s.unpack("U*").map{|u| "%u" + ("%04x" % u)}.join; end
  tracker = TwitterUserTracker.new do |obj|
    id = obj[:id]
    sentence = obj[:text]
    lamb.invoke_async(function_name: "analyze-sentence", invoke_args: {id: id, sentence: esc(sentence)}.to_json)
  end
end

sentence_analysis.js

下記仕様のLambda関数analyze-sentenceを実装しています。

項目 内容
Function Name analyze-sentence
Handler Name analyze
Arguments sentence: escapeされた解析対象の文。
id: 文の識別子(TwitterのIDが渡される)。

sentenceに与えられた文をYahoo Keyphrase APIに渡し、得られたキーフレーズとIDを保存します。 続いて、各キーフレーズに対してもう一つのLambda関数suggestを呼び出します(invokeSuggest関数)。 (下記コード例ではデータベースの保存処理は非本質的なので省略しています。)

var async = require('async');
var Aws = require('aws-sdk');
var y = require('./lib/yahoo_keyphrase'); // 自作ライブラリ(後述)

// analyze関数をexportする。
// この関数名をManagement ConsoleでのLambda関数定義時に使用することになる。
exports.analyze = function(event, context) {
    var id = event.id;
    var sentence = unescape(event.sentence);
    console.log("Sending request to Yahoo ... " + sentence);
    // async.waterfallは、配列に指定された順序で関数を呼び出す。
    // 次の関数へは前の関数がcallbackを呼んでから移る。
    // 前の関数がcallbackに渡した引数は次の関数の第1引数となる。
    async.waterfall([
        function(callback) {
            y.keyphrases(sentence, callback);
        },
        function(keyphrases, callback) {
            console.log("Keyphrases: " + JSON.stringify(keyphrases));
            // async.parallelを使用すると、配列に含まれる関数を同時並行的に呼び出し、
            // すべてが完了した後、結果をまとめてコールバックで受け取れる。
            // ここではキーフレーズの保存と、
            // 各キーフレーズについて別途定義するsuggest関数の呼び出しを
            // 並行的に実行している。
            async.parallel([
                function(callback) {
                    /* 【略】idとkeyphrasesを紐づけてデータベースに保存 */
                }
            ].concat(keyphrases.map(function(kp) {
                return function(callback) {
                    // 連鎖的に別のLambda関数を呼び出す
                    invokeSuggest(kp, callback);
                };
            })), function(err, r) {
                callback(err, r);
            });
        }
    ], function(err) {
        console.log("async.waterfall done.");
        if (err) {
            console.log("ERROR: " + err);
        }
        context.done(null, "Bye.");
    });
    console.log("Sending request to Yahoo ... done");
};

// 別途作成するLambda関数'suggest'を呼び出す
var invokeSuggest = function(word, callback) {
    word = escape(word);
    Aws.config.loadFromPath("./credential/aws_config.json"); // AWSのキーをjson形式で用意しておく
    var lambda = new Aws.Lambda();
    lambda.invokeAsync({FunctionName: "suggest",
                        InvokeArgs: JSON.stringify({
                            word: word,
                            services: ["google", "youtube", "amazon", "wikipedia"]
                        })}, callback);
};

lib/yahoo_keyphrase.js

この自作ライブラリがexportするkeyphrases関数は、Yahoo APIへのアクセスをラップします。 結果はコールバックで取得します。コールバックにはエラー情報とキーフレーズの配列がこの順に引数として渡されます。

var http = require('http');
var querystring = require('querystring');
var xml2json = require('xml2json');

var appid = YAHOO_APP_ID;

exports.keyphrases = function(s, callback) {
    var q = querystring.stringify({sentence: s, appid: appid});
    var options = {
        hostname: 'jlp.yahooapis.jp',
        port: 80,
        path: "/KeyphraseService/V1/extract?" + q
    };
    var req = http.get(options, function(res) {
        var buffer = null;
        if (res.statusCode != 200) {
            callback(res.statusCode, []);
            return;
        }
        res.on('data', function(chunk) {
            if (buffer) {
                buffer = Buffer.concat([buffer, chunk]);
            } else {
                buffer = chunk;
            }
        });
        res.on('end', function() {
            var json = xml2json.toJson(buffer, {object: true, arrayNotation: true});
            callback(null, json.ResultSet[0].Result.map(function(o) {return o.Keyphrase[0]}));
        });
    }).on("error", function(e) {
        callback(e.message, []);
    });
    console.log("HTTP Request sent.");
};

suggest.js

Lambda関数analyze-sentenceから連鎖的に呼ばれるもう1つのLambda関数です。 この関数はword引数を受け取ると、services引数に指定されたサービスで wordに対する検索語句補完を行います。 async.waterfallに渡した2目の関数で補完結果を受け取って、それをデータベースに保存します。 補完サービス呼び出しには別途定義するweb_completionライブラリを使用しています。

var async = require('async');
var wc = require('./lib/web_completion'); // 自作ライブラリ(後述)
exports.suggest = function(event, context) {
    var services = event.services;
    var word = unescape(event.word);
    async.waterfall([
        function(callback) {
            wc.complete.all(services, word, callback);
        },
        function(suggestions, callback) {
            /* 【略】 wordとsuggestionsを紐づけてデータベースに登録。 */
        }
    ], function(err) {
        if (err) {
            console.log("ERROR: " + err);
        }
        context.done(null, "Bye.");
    });
    console.log("async.waterfall has started.");
};

lib/web_completion.js

GoogleAmazonYoutubeWikipediaの提供する検索語句補完APIを呼び出すライブラリです。 下記関数を公開しています:

関数 処理内容
complete.single 与えられた語句で単一のサービスによる補完を行う。
complete.all 語句とサービスのリストが与えられ、サービスごとにsingleを呼び出す。これをリスト中の全サービスについてasync.parallelで同時並行的に行う。

コード中、request関数ではレスポンスの切れ端を受け取るたびBuffer.concatしていますが、 もう少しスマートな書き方はないものでしょうか…

var async = require('async');
var http = require('http');
var querystring = require('querystring');
var iconv = require('iconv');

var request = function(hostname, path, params, callback) {
    var q = querystring.stringify(params);
    var options = {
        hostname: hostname,
        port: 80,
        path: path + "?" + q
    };
    var req = http.request(options, function(res) {
        var buffer = null;
        if (res.statusCode != 200) {
            callback(res.statusCode, []);
            return;
        }
        res.on('data', function(chunk) {
            if (buffer) {
                buffer = Buffer.concat([buffer, chunk]);
            } else {
                buffer = chunk;
            }
        });
        res.on("end", function() {
            callback(null, buffer);
        });
    }).on("error", function(e) {
        callback(e.message, []);
    });
    req.end();
};

exports.complete = {
    single: function(service, word, callback) {
        var response = function(err, res, sourceEncoding) {
            if (err) {
                callback(err, []);
            } else {
                if (sourceEncoding) {
                    var conv = new iconv.Iconv(sourceEncoding, 'UTF-8//TRANSLIT//IGNORE');
                    res = conv.convert(res);
                }
                var json = JSON.parse(res);
                // json[1]には、表記揺れによる重複の除去などの処理を通すべきだが、ここでは省略
                var completions = json[1];
                callback(null, completions);
            }
        };
        var googleResponse = function(err, res) {
            return response(err, res, "sjis"); // Googleはなぜかsjisを返す
        };
        var services = {
            google: function(word) {
                request("www.google.co.jp", "/complete/search", {hl: "ja", output: "firefox", q: word}, googleResponse);
            },
            youtube: function(word) {
                request("www.google.com", "/complete/search", {hl: "ja", ds: "yt", output: "firefox", q: word}, googleResponse);
            },
            wikipedia: function(word) {
                request("ja.wikipedia.org", "/w/api.php", {action: "opensearch", format: "json", search: word}, response);
            },
            amazon: function(word) {
                request("completion.amazon.co.jp", "/search/complete", {mkt: 6, "search-alias": "aps", q: word}, response);
            }
        };
        return services[service](word);
    },
    all: function(services, word, callback) {
        async.parallel(services.map(function(service) {
            return function(callback) {
                console.log("Begin completion of " + word + " by " + service);
                exports.complete.single(service, word, function(err, res) {
                    console.log("Finish completion of " + word + " by " + service + " : " + res.slice(0, 3));
                    callback(err, {service: service, completions: res});
                });
            };
        }), callback);
    }
};

反省

Twitterにおける自分のつぶやきとお気に入りをYahoo Keyphrase APIにかけ、 頻出語を調べるだけで関心事がわかるか…一般的には無理でしょう。 下記のようなツイートの場合、情報が断片的であったり、テキスト情報が全くなかったりするためです:

  • 会話の途中である場合。
  • 画像つきのツイートで、本文がほとんどないか、あっても画像と全く関係がない場合。
  • 他サイトへのリンク付きのツイートで、本文がほとんどない場合。

それにしても、ツイートをかき集めて、外部のWeb APIを呼び出すだけのプログラミングです。 RubyJavaScript、そしてPythonと言語もいろいろ、使用したツールやライブラリもいろいろです。 こんな単純なことをするのにずいぶん多くの技術を使用し、思ったよりコードを書いてしまいました。 プログラミングとはこういうものでしょうか。 どうせ書き捨て前提、遊びのプログラミングですので、短くかつ意図は明解なように書きたいものです。 なんだか無駄に長くなってしまいます。

グラフ可視化については、Gephi以外に、Cytoscapeが使えそうなので、暇なときに試してみたいと思います。

参考