Source code for schroedingerchess.server

from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet.defer import inlineCallbacks, Deferred

from chess import ChessBoard, LightBoard, IllegalMove

import json
import time


[docs]class ChessServerProtocol(Protocol): """ Class to handles the communication protocol with a game client. """ def __init__(self): self.player_id = None self.game_id = None self.game = None self.color = None
[docs] def connectionMade(self): """ Sends the game information when a new client is connected. """ self.state = "GREETING" self.player_id = self.factory.addWaitingPlayer(self) print("Player {} connected".format(self.player_id)) msg = {"type": "greetings"} msg["player_id"] = self.player_id msg["greetings"] = "looking forward hearing from you" self.sendMessage(msg) self.state = "WAITING_FOR_GREETINGS"
[docs] def sendMessage(self, msg): """ Sends a message to the client. :param msg: A dictionary representing a message. """ self.transport.write(json.dumps(msg).encode())
[docs] def sendMessageToAll(self, msg): """ Sends a message to the client and the other player's client. :param msg: A dictionary representing a message. """ if self.game is not None: self.game.sendMessageToAll(msg) else: raise RuntimeError("trying to send a message when the game is not created / over")
[docs] def sendMessageToOther(self, msg): """ Sends a message to the client and the other player's client. :param msg: A dictionary representing a message. """ if self.game is not None: self.game.sendMessageTo(msg, 0 if self.color else 1) else: raise RuntimeError("trying to send a message when the game is not created / over")
[docs] def dataReceived(self, data): """ Handles the reception of data by the server :param data: A byte representing a JSON-encoded object (default encoding UTF-8) """ # msg = json.loads(data.decode()) messages = self.split_messages(data) update = False for msg_json in messages: msg = json.loads(msg_json) if self.state == "GREETING": raise RuntimeError("Should no happen.") elif self.state == "WAITING_FOR_GREETINGS": if msg["type"] == "player-info": self.handleGreetings(msg) elif self.state == "PLAYING": if msg["type"] == "move": self.handleMove(msg) if msg["type"] == "automove": self.handleMove(msg, auto=True) elif msg["type"] == "chat": self.sendMessageToOther({"type": "chat", "content": msg["content"]}) elif msg["type"] == "endgame": self.handleEndGame(msg["color"]) else: self.refuseMessage(msg)
# TODO : reduce lightboard update delay
[docs] def handleGreetings(self, msg): self.color = msg["color"] self.factory.assignColor(self.player_id, self.color)
[docs] def refuseMessage(self, msg): """ Handles the reception of messages when waiting for second player. All instructions are illegal. :param msg: A dictionary representing a message. """ self.sendMessage({"type": "illegal-request", "request": msg})
[docs] def handleMove(self, msg, auto=False): try: player = msg["color"] if auto: move = self.game.autoMove() else: move = msg["description"] x1, y1, x2, y2 = [int(x) for x in move] self.game.move(x1, y1, x2, y2, player) msg = {"type": "move", "description": (x1, y1, x2, y2)} self.sendMessageToAll(msg) self.game.updateLightBoard() except IllegalMove as e: msg = {"type": "illegal-move", "description": str(e)} self.sendMessage(msg)
[docs] def handleEndGame(self, color): if self.game is not None: reactor.callLater(0, self.game.checkEnd, color)
[docs] def connectionLost(self, reason): """ Handles the connection lost with the client (executes automatically). :param reason: Reason of the disconnection """ if self.game is None: self.factory.removePlayerFromWaitingList(self.player_id) else: # Disconnect other player and remove the game self.sendMessageToOther({"type": "chat", "content": "Other player disconnected"}) time.sleep(0.5) self.game.disconnectPlayers() self.factory.removeGame(self.game_id) print("Player {} disconnected".format(self.player_id))
[docs] def disconnect(self): """ Disconnect current player / protocol """ self.transport.loseConnection()
[docs] def split_messages(self, data): messages = [] split = data.decode().split("}{") if len(split) == 1: return split for i, sp in enumerate(split): m = sp if i > 0: m = "{" + m if i < len(split)-1: m = m + "}" messages.append(m) return messages
[docs]class ChessServer(Factory): """ Game server : produces a new ChessServerProtocol each time a client arrives """ # This will be used by the default buildProtocol to create new protocols: protocol = ChessServerProtocol def __init__(self): """ Constructor. """ self.games = {} # list of games self.waitingBlackPlayers = ([], {}) # waiting white players self.waitingWhitePlayers = ([], {}) # waiting black players self.waitingPlayers = {} # list of waiting players self.gameIndex = 0 # total number of games ever launched self.playerIndex = 0 # total number of players ever connected
[docs] def addWaitingPlayer(self, client): self.waitingPlayers[self.playerIndex] = client self.playerIndex += 1 return self.playerIndex - 1
[docs] def assignColor(self, player_id, color): client = self.waitingPlayers.pop(player_id) if color == 0: self.waitingWhitePlayers[0].append(player_id) self.waitingWhitePlayers[1][player_id] = client elif color == 1: self.waitingBlackPlayers[0].append(player_id) self.waitingBlackPlayers[1][player_id] = client else: if len(self.waitingBlackPlayers) < len(self.waitingWhitePlayers): self.waitingWhitePlayers[0].append(player_id) self.waitingBlackPlayers[1][player_id] = client else: self.waitingWhitePlayers[0].append(player_id) self.waitingWhitePlayers[1][player_id] = client self.tryToMatchPlayers()
[docs] def tryToMatchPlayers(self): while len(self.waitingBlackPlayers[0]) >= 1 and len(self.waitingWhitePlayers[0]) >= 1: whites_id = self.waitingWhitePlayers[0].pop(0) blacks_id = self.waitingBlackPlayers[0].pop(0) whites_client = self.waitingWhitePlayers[1].pop(whites_id) blacks_client = self.waitingBlackPlayers[1].pop(blacks_id) game = Game(whites_client, blacks_client, self.gameIndex) self.games[self.gameIndex] = game print("Matched player {} and {} into game {}".format(whites_id, blacks_id, self.gameIndex)) self.gameIndex += 1
[docs] def removePlayerFromWaitingList(self, player_id): if player_id in self.waitingPlayers: del self.waitingPlayers[player_id] elif player_id in self.waitingWhitePlayers[0]: self.waitingWhitePlayers[0].remove(player_id) del self.waitingWhitePlayers[1][player_id] elif player_id in self.waitingBlackPlayers[0]: self.waitingBlackPlayers[0].remove(player_id) del self.waitingBlackPlayers[1][player_id]
[docs] def removeGame(self, game_id): try: del self.games[game_id] print("Terminated game {}".format(game_id)) except KeyError: # The other player already deleted the game pass
[docs]class Game: """ Class to represent a game instance. """ def __init__(self, whites, blacks, gameIndex): self.game_id = gameIndex self.validMovesCounter = 0 self.white = whites self.white.game_id = self.game_id self.white.game = self self.black = blacks self.black.game_id = self.game_id self.black.game = self self.chessBoard = ChessBoard() self.lightBoard = LightBoard() self.notifyReady()
[docs] def notifyReady(self): self.sendMessageToAll({"type": "status", "status": "ready"}) self.white.state = "PLAYING" self.black.state = "PLAYING"
[docs] def move(self, x1, y1, x2, y2, color): piece = self.chessBoard.grid[x1][y1] if piece is not None and piece.color is not color: raise IllegalMove("Trying to move a place that does not belong to the player.") self.chessBoard.move(x1, y1, x2, y2, disp=False) self.lightBoard.move(x1, y1, x2, y2) self.validMovesCounter += 1
[docs] def autoMove(self): return self.chessBoard.auto_move()
[docs] def checkEnd(self, color): outcome = self.chessBoard.end_game() msg = {"type": "chat", "content": outcome} self.sendMessageTo(msg, color)
[docs] def updateLightBoardTask(self): for col in [0, 1]: for i, piece in enumerate(self.chessBoard.pieces[col]): if piece is not None: color = piece.color position = piece.position natures = self.chessBoard.all_legal_natures(piece) pieceIndex = i + col * 24 self.lightBoard.setPiece(pieceIndex, color, position, natures) msg = {"type": "lightboard", "description": self.lightBoard.wrapUp()} self.sendMessageToAll(msg)
[docs] def updateLightBoard(self): """ Schedules an update board task. """ reactor.callLater(0.5, self.updateLightBoardTask)
[docs] def sendMessageToAll(self, msg): self.white.sendMessage(msg) self.black.sendMessage(msg)
[docs] def sendMessageTo(self, msg, color): if color == 0: self.white.sendMessage(msg) elif color == 1: self.black.sendMessage(msg) else: pass
[docs] def disconnectPlayers(self): self.white.disconnect() self.black.disconnect()
if __name__ == '__main__': address = input("Host:Port (localhost:6000): ") if not address: host, port = "localhost", 6000 else: host, port = address.split(":") endpoint = TCP4ServerEndpoint(reactor, int(port)) endpoint.listen(ChessServer()) reactor.run()