Asyncio - Reading from the Keyboard
Continuing the series on Python 3.4’s asyncio module, we will see how to create a simple text-based game. The goal of the game is to display a maze and let the player move around it using the numeric keypad (4 - left, 6 - right, 8 - up, 2 - down, and an uppercase S to exit). To exercise our old DOS programming muscles with asynchronous programming, we will also display a clock on the last line of the screen. The final result should look like this image:
You need to install colorconsole and Python 3.4.1 to run this program.
from random import shuffle, randrange
from colorconsole import terminal
from concurrent.futures import ThreadPoolExecutor
import datetime
import asyncio
# Colorconsole: https://github.com/lskbr/colorconsole
# make_maze: http://rosettacode.org/wiki/Maze_generation#Python
def make_maze(w=16, h=8):
    vis = [[0] * w + [1] for _ in range(h)] + [[1] * (w + 1)]
    ver = [["|  "] * w + ['|'] for _ in range(h)] + [[]]
    hor = [["+--"] * w + ['+'] for _ in range(h + 1)]

    def walk(x, y):
        vis[y][x] = 1
        d = [(x - 1, y), (x, y + 1), (x + 1, y), (x, y - 1)]
        shuffle(d)
        for (xx, yy) in d:
            if vis[yy][xx]: continue
            if xx == x: hor[max(y, yy)][x] = "+  "
            if yy == y: ver[y][max(x, xx)] = "   "
            walk(xx, yy)

    walk(randrange(w), randrange(h))
    maze = []
    for (a, b) in zip(hor, ver):
        maze.append("".join(a))
        maze.append("".join(b))
    return maze

class Jogo:
    LARGURA = 24
    ALTURA = 11

    def __init__(self):
        self.tela = terminal.get_terminal(conEmu=False)
        self.tela.enable_unbuffered_input_mode()
        self.labirinto_cores = (terminal.colors["RED"], terminal.colors["BLACK"])
        self.jogador_carac = (terminal.colors["WHITE"], terminal.colors["BLUE"], '*')
        self.labirinto = make_maze(Jogo.LARGURA, Jogo.ALTURA)
        self.loop = asyncio.get_event_loop()
        self.tpool = ThreadPoolExecutor(max_workers=2)
        while True:
            self.x = randrange(Jogo.LARGURA*3)
            self.y = randrange(Jogo.ALTURA*2)
            if self.pode_mover(self.x, self.y):
                break
        self.jogando = True

    def fim_do_jogo(self):
        self.jogando = False
        self.loop.stop()

    @asyncio.coroutine
    def le_teclado(self):
        while self.jogando:
            key = yield from self.loop.run_in_executor(self.tpool, self.tela.getch)
            if(key!=None):
                nx, ny = self.x, self.y
                if key == b'4':
                    if nx > 1:
                        nx-=1
                elif key == b'6':
                    if nx < Jogo.LARGURA*3-1:
                        nx+=1
                elif key == b'8':
                    if ny > 0:
                        ny -=1
                elif key == b'2':
                    if ny < Jogo.ALTURA*2:
                        ny +=1
                elif key == b"S":
                    self.fim_do_jogo()
                    break
                if self.pode_mover(nx,ny) and (nx, ny) != (self.x,self.y):
                    self.x, self.y = nx, ny
                    self.desenha()

    def pode_mover(self, x,y):
        return self.labirinto[y][x]==' '

    def desenha(self):
        self.tela.set_color(*self.labirinto_cores)
        #self.tela.clear()
        self.tela.gotoXY(0,0)
        self.tela.set_title("Labirinto")
        for linha in self.labirinto:
            print(linha)
        self.tela.gotoXY(self.x, self.y)
        self.tela.cprint(*self.jogador_carac)

    @asyncio.coroutine
    def relogio(self):
        while self.jogando:
            self.tela.print_at(10,23,datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S"))
            yield from asyncio.sleep(1)

    def execute(self):
        self.tela.clear()
        self.desenha()
        try:
            asyncio.async(self.le_teclado())
            asyncio.async(self.relogio())
            self.loop.run_forever()
        except KeyboardInterrupt:
            print("exit")
        finally:
            self.tpool.shutdown()
            self.loop.close()
            self.tela.restore_buffered_mode()

jogo = Jogo()
jogo.execute()

To add some variation to the game, I used a maze-generating function from Rosetta Code (linked in the comment at the top of the listing). The function was modified to return a list of strings, which the game uses to detect the walls of the maze.
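To make the idea concrete, here is a minimal sketch of a list of strings used as a wall map. The maze below is written by hand for illustration (it is not output from make_maze), and can_move and open_cells are hypothetical helper names; can_move does the same check as the game's pode_mover.

```python
# Hand-written maze in the same style as the generated one:
# '+', '-' and '|' are walls, a space is an open cell.
MAZE = [
    "+--+--+",
    "|     |",
    "+  +--+",
    "|     |",
    "+--+--+",
]


def can_move(maze, x, y):
    """Return True if column x of row y is an open cell."""
    return maze[y][x] == " "


def open_cells(maze):
    """List every (x, y) position the player could stand on."""
    return [(x, y)
            for y, row in enumerate(maze)
            for x, char in enumerate(row)
            if char == " "]


if __name__ == "__main__":
    print(can_move(MAZE, 1, 1))   # an open cell
    print(can_move(MAZE, 0, 0))   # the top-left corner, a wall
    print(len(open_cells(MAZE)))
```

Because each row is a plain string, the first index selects the row (y) and the second the column (x), which is why the game reads self.labirinto[y][x].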
    @asyncio.coroutine
    def le_teclado(self):
        while self.jogando:
            key = yield from self.loop.run_in_executor(self.tpool, self.tela.getch)
            if(key!=None):
                nx, ny = self.x, self.y
                if key == b'4':
                    if nx > 1:
                        nx-=1
                elif key == b'6':
                    if nx < Jogo.LARGURA*3-1:
                        nx+=1
                elif key == b'8':
                    if ny > 0:
                        ny -=1
                elif key == b'2':
                    if ny < Jogo.ALTURA*2:
                        ny +=1
                elif key == b"S":
                    self.fim_do_jogo()
                    break
                if self.pode_mover(nx,ny) and (nx, ny) != (self.x,self.y):
                    self.x, self.y = nx, ny
                    self.desenha()

The le_teclado method is a coroutine, but since the colorconsole module wasn’t designed to work with coroutines, we have to call self.tela.getch through an executor, in this case our ThreadPoolExecutor. The blocking call then runs on one of the pool's threads while our event loop keeps running normally.
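The same pattern can be tried without a terminal. The sketch below is only an illustration: blocking_read is a hypothetical stand-in for self.tela.getch (it just sleeps and returns a key), heartbeat is an invented coroutine that shows the loop is still alive, and the code uses the async/await spelling of newer Python versions (3.7 or later) instead of @asyncio.coroutine and yield from.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_read():
    """Stand-in for a blocking call such as getch (assumption: 0.3 s wait)."""
    time.sleep(0.3)
    return b"6"


async def heartbeat(ticks):
    """Runs on the event loop while the blocking call sits in a worker thread."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    beat = asyncio.ensure_future(heartbeat(ticks))
    with ThreadPoolExecutor(max_workers=1) as pool:
        # This coroutine is suspended here, but the loop keeps running heartbeat.
        key = await loop.run_in_executor(pool, blocking_read)
    beat.cancel()
    return key, len(ticks)


if __name__ == "__main__":
    key, tick_count = asyncio.run(main())
    print(key, tick_count)
```

If the blocking call ran directly on the event loop, heartbeat would record a single tick at most; with the executor it keeps ticking while the read is pending.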
When we press a key, self.tela.getch returns and its result is delivered to the coroutine through the yield from. The rest of the method checks whether the key is a movement or exit command. Finally, it checks that the player actually moved and that the new position is not a wall. If the position changed, the screen is redrawn so that the new position of the player is visible.
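The movement check can also be written as a small table-driven function, which is easier to test in isolation. This is an alternative sketch, not the code used in the game: MOVES, next_position and the hand-written MAZE are hypothetical names introduced here for illustration.

```python
# Hypothetical key table: key byte -> (dx, dy), mirroring the numeric keypad.
MOVES = {
    b"4": (-1, 0),   # left
    b"6": (1, 0),    # right
    b"8": (0, -1),   # up
    b"2": (0, 1),    # down
}

# Hand-written one-corridor maze, for illustration only.
MAZE = [
    "+--+--+",
    "|     |",
    "+--+--+",
]


def next_position(maze, x, y, key):
    """Return the position after pressing key, or (x, y) if the move is blocked."""
    dx, dy = MOVES.get(key, (0, 0))
    nx, ny = x + dx, y + dy
    inside = 0 <= ny < len(maze) and 0 <= nx < len(maze[ny])
    if inside and maze[ny][nx] == " ":
        return nx, ny
    return x, y


if __name__ == "__main__":
    print(next_position(MAZE, 1, 1, b"6"))  # moves right along the corridor
    print(next_position(MAZE, 1, 1, b"4"))  # blocked by the left wall
```

The caller only needs to redraw when the returned position differs from the current one, which matches the (nx, ny) != (self.x, self.y) test in le_teclado.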
    @asyncio.coroutine
    def relogio(self):
        while self.jogando:
            self.tela.print_at(10,23,datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S"))
            yield from asyncio.sleep(1)

The relogio method displays the current date and time on the screen once per second. This happens even when we don’t press any keys, which confirms that our event loop is running correctly.
This program can be extended to contain enemies and move them in another coroutine, similar to the clock.
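One way such an enemy coroutine might look is sketched below. Everything here is an assumption made for illustration: the Enemy class, the move_enemy coroutine, the hand-written MAZE and the fixed number of steps are not part of the original game, and the code uses the async/await spelling of newer Python versions. In the game itself the loop would more likely test self.jogando and redraw the screen after each move.

```python
import asyncio
import random

# Hand-written corridor, for illustration only: a space is an open cell.
MAZE = [
    "+--+--+--+",
    "|        |",
    "+--+--+--+",
]


class Enemy:
    """Hypothetical enemy that only stores its position."""

    def __init__(self, x, y):
        self.x = x
        self.y = y


async def move_enemy(enemy, maze, steps, delay=0.01):
    """Try one random step per iteration, pausing like the clock does."""
    for _ in range(steps):
        dx, dy = random.choice([(-1, 0), (1, 0), (0, -1), (0, 1)])
        nx, ny = enemy.x + dx, enemy.y + dy
        inside = 0 <= ny < len(maze) and 0 <= nx < len(maze[ny])
        if inside and maze[ny][nx] == " ":
            enemy.x, enemy.y = nx, ny
        # Yield to the event loop so keyboard and clock coroutines can run.
        await asyncio.sleep(delay)


if __name__ == "__main__":
    enemy = Enemy(1, 1)
    asyncio.run(move_enemy(enemy, MAZE, steps=20))
    print(enemy.x, enemy.y)
```

Since the enemy only moves into open cells, it can reuse the same wall test as the player.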