まずは使用例を見て欲しい。この画像は携帯カメラで撮った自作の簡単な迷路だ(画像上)。それに対して指定した2点間の最短経路を赤線で示してみた(画像下)。ピクセル単位で計測しているので赤線が若干ガタガタしていて完全な最短経路ではないがほぼ最短と考えていいだろう。迷路画像(画像上)をmaze01.jpgとし、スタート地点の座標が(240, 160)、ゴール地点の座標が(210, 400)の場合、コマンドラインで以下のように実行する。
maze_solver.py maze01.jpg -s 240 160 -g 210 400
maze_solver.py maze01.jpg -s 240 160 -g 210 400 -o maze01out.jpg
このプログラムは2つのパートで成り立っていて、一つは画像認識を使った画像データの領域分けであり、もう一つはその領域内の2点間の経路探索である。それぞれ、連結成分ラベル付け(connected component labeling, CCL)アルゴリズムとA*探索アルゴリズムを利用して処理している。詳細についてはそれぞれのリンク先と最後に示したソースコードを参照して欲しい。

maze_solver.py maze02.png -s 270 130 -g 0 0
maze_solver.py maze02.png -s 270 130 -g 490 268
Usage: maze_solver.py [options] imagefile

Options:
  -h, --help            show this help message and exit
  -o OUT_IMAGE, --output-image=OUT_IMAGE
                        output-imagefile
  -s START, --start=START
                        start position [default: (0, 0)]
  -g GOAL, --goal=GOAL  goal position [default: (0, 0)]
  -a AREA, --area-size=AREA
                        area size [default: 60000]
  -t THRESHOLD, --threshold=THRESHOLD
                        threshold of the CCL [default: 30]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Find a route between two points of a maze image.

The image is first split into regions of similar colour with a connected
component labeling (CCL) pass; the route is then searched with A* inside the
region that contains the start point.  The code runs on Python 2.7 and 3.
"""
import heapq
import math
import os
import sys
from optparse import OptionParser


def _load_pil():
    """Import and return the ``(Image, ImageDraw)`` modules.

    The Pillow package layout is tried first, then the legacy top-level PIL
    modules.  The import is done lazily so that the pure-Python classes of
    this file stay importable when no imaging library is installed.
    """
    try:
        from PIL import Image, ImageDraw
    except ImportError:
        import Image
        import ImageDraw
    return Image, ImageDraw


# Connected Component Labeling - Label Equivalence method
class CCL(object):
    """Label 4-connected regions of similar colour in an image.

    Attributes:
        orig_size: (width, height) of the image file.
        size: (width, height) actually processed (after optional shrinking).
        size_rate: linear scale factor from original to processed size.
        th: maximum colour difference (see ``diff``) for two neighbouring
            pixels to belong to the same region.
        W: processed width, N: number of processed pixels.
        D: flat, row-major list of RGB pixels.
        L: current label of every pixel (flat, row-major).
        R: label equivalence table (``R[label]`` is a smaller equivalent).
    """

    def __init__(self, imagefile, threshold, area):
        """Load ``imagefile`` and shrink it to at most ``area`` pixels.

        Raises:
            ValueError: if ``area`` is not positive.
        """
        if area <= 0:
            raise ValueError("area must be positive, got %r" % (area,))
        Image, _ = _load_pil()
        im = Image.open(imagefile).convert("RGB")
        self.orig_size = im.size
        orig_area = self.orig_size[0] * self.orig_size[1]
        if orig_area <= area:
            self.size_rate = 1.0
        else:
            self.size_rate = math.sqrt(float(area) / orig_area)
            # max(1, ...) keeps very elongated images from collapsing to a
            # zero-sized dimension.
            im = im.resize((max(1, int(self.orig_size[0] * self.size_rate)),
                            max(1, int(self.orig_size[1] * self.size_rate))))
        self.size = im.size
        self.th = threshold
        self.W = im.size[0]
        # Materialise once: plain list indexing is what the scan loops need.
        self.D = list(im.getdata())
        self.N = len(self.D)
        # Every pixel starts as its own label / its own equivalence root.
        self.L = list(range(self.N))
        self.R = list(range(self.N))

    def diff(self, d1, d2):
        """Return the sum of absolute per-channel differences of two pixels."""
        return abs(d1[0] - d2[0]) + abs(d1[1] - d2[1]) + abs(d1[2] - d2[2])

    def scanning(self):
        """Record, per label, a smaller label seen on a similar neighbour.

        Returns:
            True if at least one equivalence was recorded, i.e. another
            analysis/labeling round is needed.
        """
        pixels, labels, refs = self.D, self.L, self.R
        width, count, limit = self.W, self.N, self.th
        diff = self.diff
        changed = False
        for i in range(count):
            here = pixels[i]
            best = count  # larger than any valid label
            if i >= width and diff(here, pixels[i - width]) <= limit:
                best = min(best, labels[i - width])      # pixel above
            if i + width < count and diff(here, pixels[i + width]) <= limit:
                best = min(best, labels[i + width])      # pixel below
            if i % width != 0 and diff(here, pixels[i - 1]) <= limit:
                best = min(best, labels[i - 1])          # pixel to the left
            if (i + 1) % width != 0 and diff(here, pixels[i + 1]) <= limit:
                best = min(best, labels[i + 1])          # pixel to the right
            if best < labels[i]:
                refs[labels[i]] = best
                changed = True
        return changed

    def analysis(self):
        """Resolve equivalence chains so root labels point at their final root."""
        labels, refs = self.L, self.R
        for i in range(self.N):
            if labels[i] != i:
                continue  # only pixels that still own their label are roots
            root = refs[i]
            while refs[root] != root:
                root = refs[root]
            refs[i] = root

    def labeling(self):
        """Rewrite every pixel label through the resolved equivalence table."""
        labels, refs = self.L, self.R
        for i in range(self.N):
            labels[i] = refs[refs[labels[i]]]

    def calculate(self):
        """Iterate scan/analyse/relabel until no label changes.

        Returns:
            ``(L, W, size_rate, orig_size, size)`` where ``L`` is the flat
            label list; pixels of one region share one label.
        """
        while self.scanning():
            self.analysis()
            self.labeling()
        return self.L, self.W, self.size_rate, self.orig_size, self.size


# A* Search Algorithm
class AStar(object):
    """A* search on an 8-connected grid of region labels.

    ``data`` is a list of rows of labels; only cells whose label equals the
    label of the start cell are walkable.  Positions are ``(row, col)``.
    """

    # Same neighbour order as the original implementation:
    # up, down, left, right, then the four diagonals.
    _STEPS = ((-1, 0), (1, 0), (0, -1), (0, 1),
              (-1, -1), (1, -1), (-1, 1), (1, 1))

    def __init__(self, data, start, goal):
        """Validate the end points and remember the walkable label.

        Prints "No route!" and exits the process (status 0) when start and
        goal lie in different regions, as the original script did.

        Raises:
            IndexError: if start or goal is outside the grid (negative
                coordinates are rejected instead of wrapping around).
        """
        self.data = data
        self.nrow, self.ncol = len(data), len(data[0])
        # Tuples so that positions are hashable and compare equal to the
        # tuples produced by neighborhood().
        self.start = tuple(start)
        self.goal = tuple(goal)
        for name, pos in (("start", self.start), ("goal", self.goal)):
            if not (0 <= pos[0] < self.nrow and 0 <= pos[1] < self.ncol):
                raise IndexError("%s position %r is outside the %dx%d grid"
                                 % (name, pos, self.nrow, self.ncol))
        start_label = data[self.start[0]][self.start[1]]
        if start_label != data[self.goal[0]][self.goal[1]]:
            print("No route!")
            sys.exit(0)
        self.passage = start_label

    def heuristic(self, pos):
        """Return the straight-line distance from ``pos`` to the goal."""
        return math.sqrt((pos[0] - self.goal[0]) ** 2 +
                         (pos[1] - self.goal[1]) ** 2)

    def distance(self, path):
        """Return the length of ``path`` measured from the start, as an int.

        Kept for backward compatibility; ``search`` tracks exact float costs
        itself instead of re-measuring whole paths.
        """
        total = 0
        prev = self.start
        for cur in path:
            total += math.sqrt((prev[0] - cur[0]) ** 2 +
                               (prev[1] - cur[1]) ** 2)
            prev = cur
        return int(total)

    def neighborhood(self, pos):
        """Return the walkable cells among the 8 neighbours of ``pos``."""
        row, col = pos
        neighbors = []
        for d_row, d_col in self._STEPS:
            n_row, n_col = row + d_row, col + d_col
            if (0 <= n_row < self.nrow and 0 <= n_col < self.ncol
                    and self.data[n_row][n_col] == self.passage):
                neighbors.append((n_row, n_col))
        return neighbors

    def search(self):
        """Return the shortest path from start to goal.

        Returns:
            A list of ``(row, col)`` positions including both end points, or
            ``[]`` when the goal cannot be reached.
        """
        start, goal = self.start, self.goal
        infinity = float("inf")
        diagonal = math.sqrt(2.0)
        cost = {start: 0.0}   # best known cost from start to each cell
        parent = {}           # predecessor of each cell on its best path
        done = set()          # cells whose best cost is final
        frontier = [(self.heuristic(start), start)]
        while frontier:
            _, current = heapq.heappop(frontier)
            if current == goal:
                path = [current]
                while current in parent:
                    current = parent[current]
                    path.append(current)
                path.reverse()
                return path
            if current in done:
                continue  # stale heap entry superseded by a cheaper one
            done.add(current)
            base = cost[current]
            for nb in self.neighborhood(current):
                if nb in done:
                    continue
                straight = nb[0] == current[0] or nb[1] == current[1]
                candidate = base + (1.0 if straight else diagonal)
                if candidate < cost.get(nb, infinity):
                    cost[nb] = candidate
                    parent[nb] = current
                    heapq.heappush(frontier,
                                   (candidate + self.heuristic(nb), nb))
        return []


def draw_path(in_image, out_image, path, SR):
    """Draw ``path`` in red on the original image.

    Args:
        in_image: path of the image the route was computed for.
        out_image: file to save to; when empty the image is shown instead.
        path: list of ``(row, col)`` positions in the scaled-down grid.
        SR: scale factor that was applied to the image (grid = image * SR).
    """
    Image, ImageDraw = _load_pil()
    im = Image.open(in_image).convert("RGB")
    # Grid positions are (row, col); image coordinates are (x, y).
    points = [(int(col / SR), int(row / SR)) for row, col in path]
    if len(points) > 1:  # a line needs at least two points
        ImageDraw.Draw(im).line(points, fill=(255, 0, 0))
    if out_image:
        im.save(out_image)
    else:
        im.show()


def _to_grid(position, rate, size):
    """Convert an image ``(x, y)`` position to a grid ``(row, col)``.

    The result is clamped to the grid because rounding of the resized
    dimensions can otherwise put the last image pixel one past the edge.
    """
    x, y = position
    return (min(int(y * rate), size[1] - 1),
            min(int(x * rate), size[0] - 1))


def main(args):
    """Command line entry point; ``args`` is an argv-style list."""
    usage = "usage: %prog [options] imagefile"
    parser = OptionParser(usage=usage)
    parser.add_option("-o", "--output-image", type="string",
                      help="output-imagefile",
                      dest="out_image", default="")
    parser.add_option("-s", "--start", type="int", nargs=2,
                      help="start position [default: %default]",
                      dest="start", default=(0, 0))
    parser.add_option("-g", "--goal", type="int", nargs=2,
                      help="goal position [default: %default]",
                      dest="goal", default=(0, 0))
    parser.add_option("-a", "--area-size", type="int",
                      help="area size [default: %default]",
                      dest="area", default=60000)
    parser.add_option("-t", "--threshold", type="int",
                      help="threshold of the CCL [default: %default]",
                      dest="threshold", default=30)
    # Parse the list we were given rather than silently re-reading sys.argv.
    options, positional = parser.parse_args(args[1:])
    if len(positional) == 0:
        parser.print_help()
        parser.exit()
    if options.area <= 0:
        parser.error("area size must be positive")
    in_image = positional[0]

    print("Connected component labeling phase...")
    ccl = CCL(in_image, options.threshold, options.area)
    labels, width, rate, orig_size, size = ccl.calculate()
    print(" Width: %d (%d), Height: %d (%d)"
          % (orig_size[0], size[0], orig_size[1], size[1]))
    data = [labels[i:i + width] for i in range(0, len(labels), width)]

    print("A* searching phase...")
    print(" Start: (%d, %d), Goal: (%d, %d)"
          % (tuple(options.start) + tuple(options.goal)))
    for name, (x, y) in (("start", options.start), ("goal", options.goal)):
        if not (0 <= x < orig_size[0] and 0 <= y < orig_size[1]):
            parser.error("%s position (%d, %d) is outside the %dx%d image"
                         % (name, x, y, orig_size[0], orig_size[1]))
    start = _to_grid(options.start, rate, size)
    goal = _to_grid(options.goal, rate, size)
    astar = AStar(data, start, goal)
    path = astar.search()

    print("Drawing path on the imagefile...")
    draw_path(in_image, options.out_image, path, rate)


if __name__ == "__main__":
    main(sys.argv)