2010年4月29日木曜日

Google App EngineとSafe Browsing APIを利用して短縮URLチェッカーを作ってみた

このブログ記事をはてなブックマークに追加

最近、Twitterやはてなブックマークなどを使うことが多くなり、字数制限付きのコメントを書くことが増えた。コメント中にURLを入れたいこともままあるのだが、長いURLだと字数をオーバーしてしまう。そのため短縮URLを使っているが、コメント内に直接短縮URLを入れるのがやっかいだった。そこで、Google App Engine (GAE)を使ってシンプルで高速な短縮URL変換ツールを作成してみた。

しかし、短縮URLは悪意のあるサイトの判別が難しいという欠点がある。見た目で判断できないので開くのを躊躇してしまう。そこで、今回作成したツールでは、短縮URLに変換するだけではなく、それを元のURLに戻す機能も追加した。さらに、Google Safe Browsing APIを利用してフィッシングサイトなどの悪意あるサイトの判別も試みている。

短縮URLチェッカー

使い方はシンプルで、テキストボックスに変換したいURLを入力すればbit.lyの短縮URLがテキストボックス表示され、bit.lyの短縮URLを入力すればもとのURLが表示される。テキストボックスをクリックすれば表示されているURLが選択されるのでクリップボードへのコピーも簡単だ。テキストボックスの下には展開されたURLがリンク付きで表示されるが、もしフィッシングサイトだと疑われる場合は、以下のように警告してくれる。


さて、今回のコード作成だが、bit.lyによる短縮URLの変換は簡単だった。というのも以前にPythonスクリプトで作ったことがあるからだ。それをそのままGAEに載せればいい。しかし、Safe Browsing APIによる悪意あるサイトの検出は思ったよりも面倒だった。まず、APIの使い方がよく分からない。使う前は任意のURLをAPIで確認するのかと思っていたのだが、そうではなく、MD5でハッシュ化したリストをダウンロードして、調べたいURLをハッシュ化してそれと照らし合わせなくてはならない。今回問題になったのはそのダウンロード容量で、フィッシングサイト用データが700KB程、マルウェア用データが10MBを超えていた。GAEでは一回のダウンロード容量の上限が1MBと決まっており、フィッシングサイト用データは何とかダウンロードできても、マルウェア用データは途中で切れてしまった。仕方がないので今回はフィッシングサイト用のみを使うことにした。ただし現在、Safe Browsing APIの新しいバージョンが開発中であり、それを使えば何とかなるかもしれない。自分がちょっと試したときには何故か新しいAPIが使えなかったので、今回は見送ることにした。気が向いたら試してみるかも。

APIによりダウンロードされるリストはMD5でハッシュ化されているが、ハッシュ化される前のURLは標準化された参照URLだ。そこで、調べるURLについても標準化して参照URLを作成しなくてはならない。あまり時間を掛けたくなかったので、今回は標準化については実装しなかった。参照URLの作成についてはThejaswi Puthraya氏が作成したコードを利用させてもらった。URLの標準化についてはGoogleのサイトにも参考例が載っているので、時間ができたら作ってみたい。

ダウンロードされたリストはデータストアに格納している。ハッシュ化されたURLを個別にデータストアに入れるとデータ数が非常に多くなってしまうので、ハッシュ化リストの先頭の2桁(16進数)をインデックスにし、256分割のデータとして格納した。また、リスト更新中にデータストア上のURLの参照が行われても問題が発生しないように、データモデルを2つ用意して更新は交互に行うことにした。加えて、データストアへのアクセスを減らすためMemcacheを利用している。

フィッシングサイト用リストはcronを使って30分毎に更新している。しかし、リスト更新の際に処理が30秒を超えることがあり、そのため更新に失敗してしまうことがある。これはGAEの制限なのだが、前述のデータ取得の容量制限と合わせて、このあたりが改善されるとより素晴らしくなるのだけどなぁ。無料で使わせてもらっているのに贅沢かもしれないけど。

追記(2010/5/2): リスト更新の半分以上が30秒を超えてエラーになったので、TaskQueueによる処理に変更した。

最後に今回作成したソースコード(実装したクラスのみ)を示しておく。

class Bitly(): def __init__(self): self.apiurl = "http://api.bit.ly/%s?version=2.0.1&%s=%s&login=BITLY_ID&apiKey=BITLY_API_KEY" self.api_index = { "shorten": "longUrl", "expand": "shortUrl" } self.info_index = { "shorten": "shortUrl", "expand": "longUrl" } def is_error(self, url, url_info): return url_info["statusCode"] != "OK" or (url_info["results"][url].has_key("statusCode") and url_info["results"][url]["statusCode"] != "OK") def bitly(self, url, api): url_data = urllib2.urlopen(self.apiurl % (api, self.api_index[api], urllib.quote(url))).read() url_info = simplejson.loads(url_data) if api == "expand": url = url.replace("http://bit.ly/", "") if self.is_error(url, url_info): return "" return url_info["results"][url][self.info_index[api]] def get(self, url): if url.lower().split(":")[0] not in ("http", "https", "ftp"): url = "http://" + url bitly_url = "" if re.match(r"http://bit.ly/", url) and not re.search("[^a-z^A-Z^0-9]", url.replace("http://bit.ly/", "")): self.shorten_url = url bitly_url = self.bitly(url, "expand") self.long_url = bitly_url if not bitly_url: self.long_url = url bitly_url = self.bitly(url, "shorten") self.shorten_url = bitly_url return bitly_url class DataProperty(db.Model): phishing = db.IntegerProperty() malware = db.IntegerProperty() class PhishingDataA(db.Model): index = db.IntegerProperty() hash_urls = db.TextProperty() class PhishingDataB(db.Model): index = db.IntegerProperty() hash_urls = db.TextProperty() class MalwareDataA(db.Model): index = db.IntegerProperty() hash_urls = db.TextProperty() class MalwareDataB(db.Model): index = db.IntegerProperty() hash_urls = db.TextProperty() class ShortUrl(webapp.RequestHandler): def __init__(self): self.html = u"""<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="ja" lang="ja"> <head> <meta http-equiv="content-type" content="text/html; charset=utf-8" /> <meta http-equiv="content-script-type" content="text/javascript" /> <meta http-equiv="content-style-type" content="text/css" /> <link type="text/css" rel="stylesheet" href="/stylesheets/main.css" /> <title>短縮URLチェッカー</title> </head> <body> <form action="/short_url" method="post"> <div>URL: <input type="text" name="content" size="31" maxlength="1024" value="%s" onclick="this.select()" /> <input type="submit" value="変換" /></div> </form> %s <div style="font-size:xx-small;"><p>短縮URLの変換およびフィッシングサイトの判別を行います。[ver.20100502]<br /><a href="http://code.google.com/apis/safebrowsing/safebrowsing_faq.html#whyAdvisory">Advisory provided by Google.</a></p></div> </body> </html>""" def lookup_by_url(self, url): """from http://github.com/theju/safebrowsing-python""" url_re = re.compile("^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?") url = url.lower() url_components = url_re.match(url).groups() lookup_list = set() hostname = url_components[3] hostname_comp = hostname.split(".") if not hostname_comp: raise AttributeError("Invalid URL.") for i in xrange(len(hostname_comp) - 1): filtered_hostname_comp = ".".join(hostname_comp[i:]) lookup_list.add(filtered_hostname_comp + "/") if url_components[4]: path = url_components[4].split('/') for j in xrange(len(path) + 1): filtered_paths = '/'.join(path[:j]) if not '.' in filtered_paths: lookup_list.add(filtered_hostname_comp + "%s/" % filtered_paths) lookup_list.add(filtered_hostname_comp + url_components[4]) if url_components[5]: lookup_list.add(filtered_hostname_comp + ''.join(url_components[4:6])) if url_components[7]: lookup_list.add(filtered_hostname_comp + ''.join(url_components[4:6]) + url_components[7]) return lookup_list def get(self): self.response.out.write(self.html % ("", "")) def post(self): content = self.request.get('content') bitly = Bitly() bitly_url = bitly.get(content) dp = memcache.get("dp") if not dp: dp = DataProperty().all().get() memcache.add("dp", dp) phishing, malware = dp.phishing, dp.malware P = PhishingDataA if phishing == 0 else PhishingDataB M = MalwareDataA if malware == 0 else MalwareDataB urls = self.lookup_by_url(bitly.long_url) hash_urls = [hashlib.md5(x).hexdigest() for x in urls] is_phishing, is_malware = False, False for hash_url in hash_urls: idx = int(hash_url[:2], 16) phishing_list = memcache.get("phishing_%d" % idx) if not phishing_list: phishing_list = P().all().filter("index =", idx).get().hash_urls memcache.add("phishing_%d" % idx, phishing_list) if re.search(hash_url, phishing_list): is_phishing = True """ malware_list = memcache.get("malware_%d" % idx) if not malware_list: malware_list = M().all().filter("index =", idx).get().hash_urls memcache.add("malware_%d" % idx, malware_list) if re.search(hash_url, malware_list): is_malware = True """ link = '<a href="%s">%s</a>' % (bitly.long_url, bitly.long_url) if is_phishing: link += u'<br />フィッシングサイトの疑いがあります。<span style="font-size:xx-small;color:gray;">詳しくは<a href="http://www.antiphishing.org">AntiPhishing.org</a>を参照してください。</span>' if bitly_url: self.response.out.write(self.html % (bitly_url, "<div><p>%s</p></div>" % link)) else: self.response.out.write(self.html % ("", u"<div><p>変換できませんでした。</p></div>")) class SafeBrowsing(webapp.RequestHandler): def __init__(self): self.urls = { "phishing": "http://sb.google.com/safebrowsing/update?client=api&apikey=SAFE_BROWSING_API_KEY&version=goog-black-hash:1:-1", "malware": "http://sb.google.com/safebrowsing/update?client=api&apikey=SAFE_BROWSING_API_KEY&version=goog-malware-hash:1:-1" } def get_data(self, target, selection): idx = 0 txt = "" all_data = urllib2.urlopen(self.urls[target]).read().split() for l in all_data: l = l.strip() if len(l) == 33 and l[0] == '+': current_idx = int(l[1:3], 16) if idx != current_idx: taskqueue.add(url="/short_url/safe_browsing/process", params={"target": target, "selection": selection, "idx": idx, "txt": txt}) txt = "" idx = current_idx txt += l taskqueue.add(url="/short_url/safe_browsing/process", params={"target": target, "selection": selection, "idx": idx, "txt": txt}) def get(self): target = self.request.get("target") if target in ("phishing", "malware"): dp = DataProperty().all().get() if not dp: dp = DataProperty(phishing=0, malware=0) dp.put() dp = dp.all().get() if target == "phishing": selection = abs(dp.phishing - 1) self.get_data(target, selection) dp.phishing = selection else: selection = abs(dp.malware - 1) self.get_data(target, selection) dp.malware = selection self.response.out.write("Done: %s, %d" % (target, selection)) dp.put() memcache.delete("dp") else: self.response.out.write("Incorrect: %s" % target) class SafeBrowsingProcess(webapp.RequestHandler): def __init__(self): self.data = { "phishing": lambda (idx): PhishingDataA if idx == 0 else PhishingDataB, "malware": lambda (idx): MalwareDataA if idx == 0 else MalwareDataB } def post(self): target = self.request.get("target") selection = int(self.request.get("selection")) idx = int(self.request.get("idx")) txt = self.request.get("txt") M = self.data[target](selection) model = M().all().filter("index =", idx).get() if not model: model = M(index=idx, hash_urls="") model.hash_urls = txt model.put() memcache.delete("phishing_%d" % idx)

1 コメント:

Blogger さんのコメント...

Did you know you can create short urls with AdFocus and make dollars from every visitor to your short links.