# -*- coding: utf-8 -*-
"""Roulette: a small Flask game in which players try to ban each other.

A local SQLite database holds the players, their machines (IP addresses) and
the ban history. ``init_db`` fills it from the MySQL tables ``utilisateurs``
and ``equipements``. A ban lasts ``BAN_DURATION`` seconds. The ``/banned_ip``
route returns the IP addresses of the machines whose owner is currently
banned, one per line.
"""
from flask import Flask, request, session, g, redirect, url_for, \
     abort, render_template, flash
from functools import wraps
from contextlib import closing
import sqlite3
import MySQLdb as mdb
from time import time, localtime, strftime
import locale
import random

# configuration
# NOTE(review): the secret key, the admin password and the MySQL credentials
# are hard-coded below, and DEBUG is on (which also disables the IP check of
# /banned_ip). Confirm this file is never deployed or published as-is.
DEBUG = True
SECRET_KEY = "\xf3'\xd2\xf7\xa4[.h\x8e\x11|\xda\x00\x9fyS\xfe\xb3(!\x91'6\x16"
USERNAME = 'admin'
PASSWORD = 'pipo'
SQLITE_FILENAME = '/var/roulette/players.db'
SQLITE_SCHEMA = 'schema.sql'
MYSQL_HOST = 'mysql.rez'
MYSQL_USER = 'rezo_admin_ro'
MYSQL_PASSWORD = 'rezopaspipo'
MYSQL_DB = 'rezo_admin'
BAN_DURATION = 30. * 60.  # seconds
IMMUNITY_FILE = '/var/www/roulette/immunity'
ASSHOLES_FILE = '/var/www/roulette/assholes'
# Full names ("firstname name") that init_db() leaves out of the players
# table and that cheat() protects from being banned.
IMMUNITY = [
    'Lazare Olivry',
    'Brieuc Lacroix',
    'Elliot Butty',
    'Jean-Christophe Carli',
    'Juliette Tibayrenc',
    'Elise Laurent',
    'Goulven Kermarec',
    'Siqi Liu',
]
# Full names that cheat() always lets others ban, and whose own attempts
# always fail.
ASSHOLES = []

app = Flask(__name__)
app.config.from_object(__name__)
app.secret_key = SECRET_KEY

random.seed(time())
locale.setlocale(locale.LC_ALL, 'fr_FR.utf8')

# Column list shared by the two player+machine lookups below.
_PLAYER_MACHINE_SELECT = (
    "select players.id,players.firstname,players.name, "
    "machines.id,machines.ip,players.ban_end "
    "from players inner join machines on players.id=machines.player_id "
)


# SQLite access
def connect_sqlite():
    """Open and return a new connection to the local SQLite database."""
    return sqlite3.connect(SQLITE_FILENAME)


def _fetch(query, params=(), one=False):
    """Run a read query on a fresh SQLite connection and return the result.

    Returns the first row (or None) when ``one`` is true, otherwise the list
    of all rows. The connection is closed even if the query raises.
    """
    with closing(connect_sqlite()) as con:
        cur = con.cursor()
        cur.execute(query, params)
        return cur.fetchone() if one else cur.fetchall()


def _player_machine_from_row(row):
    """Turn a _PLAYER_MACHINE_SELECT row into a dict, or None if no row."""
    if row is None:
        return None
    return {'id': row[0], 'firstname': row[1], 'name': row[2],
            'machine_id': row[3], 'ip': row[4], 'ban_end': row[5]}


def init_db():
    """Create the SQLite schema and fill it from the MySQL database.

    Players listed in IMMUNITY are not inserted into the players table.
    Both database connections are closed when the function returns, even
    on error.
    """
    # Create the SQLite schema.
    with closing(connect_sqlite()) as con_sqlite:
        with app.open_resource(SQLITE_SCHEMA) as f:
            con_sqlite.cursor().executescript(f.read().decode("utf-8"))
        con_sqlite.commit()

    with closing(connect_sqlite()) as con_sqlite:
        # Connection to the MySQL database.
        with closing(mdb.connect(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD,
                                 MYSQL_DB, charset='utf8',
                                 use_unicode=True)) as con_mysql:
            cur_sqlite = con_sqlite.cursor()
            cur_mysql = con_mysql.cursor(mdb.cursors.DictCursor)

            # Fill the players table from the utilisateurs table.
            cur_mysql.execute(
                "select id,prenom,nom from utilisateurs "
                "where etat='STATE_ACTIVE' and ecole_id=1 and id<>1 "
                "and typeUtilisateur='membre'")
            rows = cur_mysql.fetchall()
            print('players :')
            for row in rows:
                if row['prenom'] + ' ' + row['nom'] not in IMMUNITY:
                    print(row)
                    cur_sqlite.execute(
                        "insert into players values (?,?,?,?)",
                        (row["id"], row["prenom"], row["nom"], 0))

            # Fill the machines table from the equipements table.
            cur_mysql.execute(
                "select equipements.id,utilisateurs.id,equipements.ip "
                "from utilisateurs inner join equipements "
                "on utilisateurs.id=equipements.utilisateur_id "
                "where utilisateurs.ecole_id=1 and utilisateurs.id<>1 "
                "and utilisateurs.etat='STATE_ACTIVE' "
                "and equipements.etat='STATE_ACTIVE' "
                "and utilisateurs.typeUtilisateur='membre'")
            rows = cur_mysql.fetchall()
            print('machines :')
            for row in rows:
                print(row)
                # The second "id" column is read under the key
                # "utilisateurs.id" because "id" is already taken by
                # equipements.id.
                cur_sqlite.execute(
                    "insert into machines values (?,?,?)",
                    (row["id"], row["utilisateurs.id"], row["ip"]))

            con_sqlite.commit()
            cur_sqlite.close()
            cur_mysql.close()


def duration_format(seconds):
    """Format a duration in seconds as French text.

    Examples: ``5 secondes``, ``1 minute et 5 secondes``,
    ``1 heure, 0 minute et 5 secondes``. Negative durations are shown as 0.
    """
    seconds = max(seconds, 0)
    hours, remainder = divmod(seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    s_str = 'seconde' if seconds <= 1 else 'secondes'
    m_str = 'minute' if minutes <= 1 else 'minutes'
    h_str = 'heure' if hours <= 1 else 'heures'

    if hours == 0:
        if minutes == 0:
            return '%01d %s' % (seconds, s_str)
        return '%01d %s et %01d %s' % (minutes, m_str, seconds, s_str)
    return '%01d %s, %01d %s et %01d %s' % (hours, h_str, minutes, m_str,
                                            seconds, s_str)


def get_ip():
    """Return the remote address of the current request."""
    return request.remote_addr


def get_player(player_id):
    """Return the player with this id as a dict, or None if there is none.

    The dict has the keys ``id``, ``firstname``, ``name`` and ``ban_end``.
    """
    row = _fetch("select id,firstname,name,ban_end from players "
                 "where id=(?)", [player_id], one=True)
    if row is None:
        return None
    return {'id': row[0], 'firstname': row[1], 'name': row[2],
            'ban_end': row[3]}


def get_player_from_ip(ip):
    """Return the player owning the machine with this IP, or None.

    The dict has the keys ``id``, ``firstname``, ``name``, ``machine_id``,
    ``ip`` and ``ban_end``.
    """
    row = _fetch(_PLAYER_MACHINE_SELECT + "where machines.ip=(?)",
                 [ip], one=True)
    return _player_machine_from_row(row)


def get_player_from_full_name(firstname, name):
    """Return the player with this first name and name, or None.

    Only players owning at least one machine can be found, because the
    lookup joins the machines table. Same dict keys as get_player_from_ip().
    """
    row = _fetch(_PLAYER_MACHINE_SELECT +
                 "where players.firstname=(?) and players.name=(?)",
                 [firstname, name], one=True)
    return _player_machine_from_row(row)


def is_banned(user_id):
    """Return True if the player exists and their ban has not ended yet."""
    row = _fetch("select ban_end from players where id=(?)",
                 [user_id], one=True)
    return row is not None and time() < row[0]


def playable_required(f):
    """Decorator: only let known, non-banned players reach the view.

    Requests whose IP matches no player get ``not_subscribed.html``; banned
    players get the ban page.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        ip = get_ip()
        user = get_player_from_ip(ip)

        if 'ip' in session:
            # Keep the stored IP current to avoid problems when users move
            # between networks (rez <-> Supelec).
            session['ip'] = ip

        # A subscribed user stays flagged as subscribed for as long as they
        # keep their session cookie. session.get() avoids a KeyError when
        # the cookie carries 'subscribed' but no 'ip'.
        if not session.get('subscribed') or session.get('ip') != ip:
            session['subscribed'] = user is not None
            session['ip'] = ip
            if DEBUG:
                print('New user : ',session)

        # The views dereference the player found from the IP, so a session
        # flagged as subscribed is not enough: the IP must match a player.
        if not session['subscribed'] or user is None:
            return render_template('not_subscribed.html')

        # A banned user cannot play.
        if is_banned(user['id']):
            return banned()
        return f(*args, **kwargs)
    return decorated_function


def get_players_not_banned():
    """Return the players whose ban has ended, as a list of dicts.

    Each dict has the keys ``id``, ``firstname`` and ``name``.
    """
    rows = _fetch("select id,firstname,name from players "
                  "where (?) > ban_end ", [time()])
    return [{'id': row[0], 'firstname': row[1], 'name': row[2]}
            for row in rows]


def _ids_from_full_names(full_names):
    """Return the ids of the listed "firstname name" players that exist."""
    ids = []
    for full_name in full_names:
        firstname, _, name = full_name.strip().partition(' ')
        found = get_player_from_full_name(firstname, name)
        # Names missing from the players table are skipped, so one unknown
        # name no longer disables every rule.
        if found is not None:
            ids.append(found['id'])
    return ids


def cheat(player_id, target_id):
    """Decide whether the ban attempt of ``player_id`` on ``target_id`` works.

    The outcome is random, except that a target listed in ASSHOLES is always
    banned, an attacker listed in ASSHOLES always fails, and a target listed
    in IMMUNITY is never banned (in that order of priority). Ids must have
    the type stored in the database for these rules to match.
    """
    success = random.choice([True, False])
    protected = _ids_from_full_names(IMMUNITY)
    cursed = _ids_from_full_names(ASSHOLES)
    if target_id in cursed:
        success = True
    elif player_id in cursed:
        success = False
    elif target_id in protected:
        success = False
    return success


def ban(player_id, target_id, success):
    """Record a ban attempt and ban the loser for BAN_DURATION seconds.

    The target is banned when ``success`` is true, otherwise the attacker
    is. The ban and the history row are committed together.
    """
    player = get_player(player_id)
    target = get_player(target_id)
    banned_player = target if success else player
    now = time()
    with closing(connect_sqlite()) as con:
        cur = con.cursor()
        cur.execute("update players set ban_end=(?) where id=(?)",
                    [now + BAN_DURATION, banned_player['id']])
        cur.execute("insert into bans (player_id,target_id,success,time) "
                    "values (?,?,?,?)",
                    [player['id'], target['id'], 1 if success else 0, now])
        con.commit()


def unban(player_id):
    """End the ban of a player by moving its end into the past."""
    with closing(connect_sqlite()) as con:
        cur = con.cursor()
        cur.execute("update players set ban_end=(?) where id=(?)",
                    [time() - BAN_DURATION, player_id])
        con.commit()


def get_bans(player_id):
    """Return the ban attempts made by or against a player.

    Each entry is a dict with the keys ``player_id`` (the attacker),
    ``target_id``, ``success`` and ``time``.
    """
    rows = _fetch("select player_id,target_id,success,time from bans "
                  "where target_id=(?) or player_id=(?)",
                  [player_id, player_id])
    return [{'player_id': row[0], 'target_id': row[1],
             'success': row[2], 'time': row[3]} for row in rows]


def banned():
    """Render the ban page for the player making the current request."""
    player = get_player_from_ip(get_ip())
    if DEBUG:
        print(player,'is banned and tries to play')
    # The most recent attempt involving the player explains the ban.
    history = sorted(get_bans(player['id']), key=lambda p: p['time'])
    last_ban = history[-1]
    if last_ban['target_id'] == player['id'] and last_ban['success'] == 1:
        source = get_player(last_ban['player_id'])
        explanation = u'Tu t\'es fait bannir par %s %s.' \
            % (source['firstname'], source['name'])
    else:
        explanation = u'Tu t\'es banni toi-même, pas de chance...'
    timeleft = duration_format(int(player['ban_end'] - time()))
    return render_template('banned.html',
                           explanation=explanation, timeleft=timeleft)


@app.route('/banned_ip')
def banned_ip():
    """Return the IPs of the machines of banned players, one per line.

    Outside DEBUG mode, only 10.7.0.254 may fetch the list; other clients
    get a 403.
    """
    if not DEBUG and get_ip() not in ['10.7.0.254']:
        abort(403)
    rows = _fetch("select machines.ip from players inner join machines "
                  "on players.id=machines.player_id "
                  "where players.ban_end>(?)", [time()])
    return '\n'.join([row[0] for row in rows])


@app.route('/')
@playable_required
def home():
    """Render the home page with the player's ban history, newest first."""
    ip = get_ip()
    player = get_player_from_ip(ip)
    if DEBUG:
        print(player, 'arrived')

    history = sorted(get_bans(player['id']),
                     key=lambda attempt: attempt['time'],
                     reverse=True)
    bans_hist = []
    for attempt in history:
        date = strftime('%Hh%M (%A)', localtime(attempt['time']))
        source = get_player(attempt['player_id'])
        target = get_player(attempt['target_id'])
        if target['id'] == player['id']:
            # Attempts made against the current player.
            if attempt['success']:
                entry = ('ban', u'%s : %s %s a réussi à t\'avoir.'
                         % (date, source['firstname'], source['name']))
            else:
                entry = ('warn', u'%s : %s %s a essayé de te bannir, en vain.'
                         % (date, source['firstname'], source['name']))
        else:
            # Attempts made by the current player.
            if attempt['success']:
                entry = ('ok', u'%s : Tu as banni %s %s avec succès.'
                         % (date, target['firstname'], target['name']))
            else:
                entry = ('ban', u'%s : Tu as échoué en voulant bannir %s %s.'
                         % (date, target['firstname'], target['name']))
        bans_hist.append(entry)
    return render_template('home.html', bans_hist=bans_hist)


@app.route('/jouer', methods=['GET', 'POST'])
@playable_required
def play():
    """Show the list of targets and process a ban attempt on POST.

    A POST naming an unknown target is rejected with a 400.
    """
    ip = get_ip()
    player = get_player_from_ip(ip)

    # Handle the ban request.
    if request.method == 'POST':
        target_id = request.form['target_id']
        if target_id != 'none':
            # Resolve the target first: this rejects forged ids and gives
            # the id as stored in the database, which cheat() needs in
            # order to compare it with other database ids.
            target = get_player(target_id)
            if target is None:
                abort(400)
            if is_banned(target['id']):
                flash(u'Utilisateur déjà banni, il faut en choisir un autre.')
            elif cheat(player['id'], target['id']):
                ban(player['id'], target['id'], True)
                flash(u'Trop cool, %s a été tranché pour un bon moment.'
                      % target['firstname'])
            else:
                ban(player['id'], target['id'], False)
                return banned()

    # Non-banned players, sorted by first name in a random direction...
    candidates = sorted(get_players_not_banned(),
                        key=lambda p: p['firstname'],
                        reverse=random.choice([True, False]))
    # ...without the current player.
    players = [p for p in candidates if p['id'] != player['id']]

    return render_template('play.html', players=players)


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Log the administrator in, or render the login form with an error."""
    error = None
    if request.method == 'POST':
        if request.form['username'] != app.config['USERNAME']:
            error = 'Invalid username'
        elif request.form['password'] != app.config['PASSWORD']:
            error = 'Invalid password'
        else:
            session['logged_in'] = True
            flash('You were logged in')
            return redirect(url_for('home'))
    return render_template('login.html', error=error)


@app.route('/logout')
def logout():
    """Log the administrator out and redirect to the home page."""
    session.pop('logged_in', None)
    flash('You were logged out')
    return redirect(url_for('home'))


if __name__ == '__main__':
    app.run()