本来还以为挺有成就感了,结果,奶奶,竟然有个pkcs7规范,使用 PyCrypto 进行 AES/ECB/PKCS#5(7) 加密

import struct, zlib, StringIO
from Crypto.Cipher import AES
def zip_data(data):
    '''压缩数据的方法'''
    return zlib.compress(data)
def unzip_data(data):
    '''解压缩数据'''
    return zlib.decompress(data)
def pkcs7_padding(s):
    BS = AES.block_size
    return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
def pkcs7_unpadding(s):
    return s[0:-ord(s[-1])]
def aes_encrypt(text, key, mode=AES.MODE_CBC):
    '''AES加密算法'''
    cryptor = AES.new(key, mode, key)
    #这里密钥key 长度必须为16(AES-128)
    text = pkcs7_padding(text)
    ciphertext = cryptor.encrypt(text)
    return ciphertext
def aes_decrypt(text, key, mode=AES.MODE_CBC):
    '''AES解密算法'''
    cryptor = AES.new(key, mode, key)
    plain_text  = cryptor.decrypt(text)
    plain_text = pkcs7_unpadding(plain_text)
    return plain_text

================更新于2014-11-24 13:00===================
      在测试python的AES加密时候发现网上的一个算法有缺陷,由于AES要求加密的字符串为16的倍数,所以不足需要不足长度,那么来看网上的算法

def aes_encrypt(text, key, mode=AES.MODE_CBC):
    PADDING = '\0' #不足填充空
    cryptor = AES.new(key, mode, b'0000000000000000')
    #这里密钥key 长度必须为16(AES-128)
    length = 16
    count = len(text)
    if count < length:
        add = (length-count)
        #\0 backspace
        text = text + (PADDING * add)
    elif count > length:
        add = (length-(count % length))
        text = text + (PADDING * add)
    ciphertext = cryptor.encrypt(text)
    return ciphertext
def aes_decrypt(text, key, mode=AES.MODE_CBC):
    PADDING = '{'
    cryptor = AES.new(key, mode, b'0000000000000000')
    plain_text  = cryptor.decrypt(text)
    return plain_text.rstrip(PADDING)

      粗看没有问题,但内容先gzip后,再做aes解密,gzip后的\0会被去掉,结果导致ungzip失败,真是个悲剧。再来看这位博主用”{“来填充: Creating 128 bit AES Ciphertext and Key with Python PyCrypto,但是要是{也冲突如何是好呢,那么解决方法来了:Python Encrypting with PyCrypto AES。解决方法为在内容结尾填充文本长度,代码如下:

def aes_encrypt(text, key, mode=AES.MODE_CBC):
    cryptor = AES.new(key, mode, b'0000000000000000')
    #这里密钥key 长度必须为16(AES-128)
    length = 16
    count = len(text)
    lengx = length - (count % length)
    text += chr(count)*lengx
    ciphertext = cryptor.encrypt(text)
    return ciphertext
def aes_decrypt(text, key, mode=AES.MODE_CBC):
    cryptor = AES.new(key, mode, b'0000000000000000')
    plain_text  = cryptor.decrypt(text)
    return plain_text[:-ord(plain_text[-1])]

      当然代码还是有问题,chr只能支持0-255,那么解决方法可以根据需要定义结尾为比如最后4个字节为文本长度,那么代码如下:

def aes_encrypt(text, key, mode=AES.MODE_CBC):
    cryptor = AES.new(key, mode, b'0000000000000000')
    #这里密钥key 长度必须为16(AES-128)
    length = 16
    count = len(text)
    lengx = length - (count % length)
    countbin = struct.pack('>L', count) #四个字节的int
    intlen = len(countbin)
    padding = ''
    if lengxL', plain_text[-4:])[0]
    return plain_text[:dlen]

测试方法如下:

def test_aes():
    seckey = 'asdf'*4
    text = 'testasdsdfdsdfsd1'*200
    atxt = aes_encrypt(text, seckey)
    text = aes_decrypt(atxt, seckey)
    print text

       python压缩和解压缩的方法,解压缩很容易找到,压缩的方法可以参考这里:http://stackoverflow.com/questions/8506897/how-do-i-gzip-compress-a-string-in-python

def gzip_data(data):
    '''压缩数据的方法'''
    buf = StringIO.StringIO()
    f = gzip.GzipFile(fileobj=buf, mode="w")
    f.write(data)
    f.close()
    return buf.getvalue()
def ungzip_data(data):
    '''解压缩数据'''
    buf = StringIO.StringIO(data)
    f = gzip.GzipFile(fileobj=buf)
    return f.read()
def test_gzip():
    '''测试'''
    istr = 'asdfasd'
    gdata  = gzip_data(istr)
    print ungzip_data(gdata)

      第一次测试的时候出现错误:
File "D:\Python26\lib\gzip.py", line 212, in read
self._read(readsize)
File "D:\Python26\lib\gzip.py", line 267, in _read
self._read_eof()
File "D:\Python26\lib\gzip.py", line 304, in _read_eof
hex(self.crc)))
IOError: CRC check failed 0x28d60008 != 0x0L

是因为在gzip_data中write后没有close掉,看官方文档如下:Calling a GzipFile object’s close() method does not close fileobj, since you might wish to append more material after the compressed data. This also allows you to pass a StringIO object opened for writing as fileobj, and retrieve the resulting memory buffer using the StringIO object’s getvalue() method.

25 / 08 / 2014 buling

      之前做了一个单机游戏,由于记录的游戏大小是下载器的大小,现在想还原为安装包大小,被迫从excel中解析大小出来,刚开始使用 xlrd模块,在解析某些excel文件时出现:xlrd.compdoc.CompDocError: Workbook corruption: seen[2] == 4错误信息,被迫改为别的方案,当然,最好的方法使用win32com包调用excel提供的开放接口,刚开始还是报错,错误原因没有记录下来,后来搜索得知是安装的office软件有问题,之前安装的是wps,卸载后安装了微软office2010,测试正常,下面对两个模块读取excel的方法做记录

ipmort xlrd
def test_xlrd():
    xfile = 'xxx/1399864038_0.xls'
    xls = xlrd.open_workbook(xfile)
    table = xls.sheets()[0]
    nrow = table.nrows
    for i in range(nrow):
        row = table.row_values(i)
        #print cell 5 value
        print row[5]

如果上面的方法会出错,请选择win32com模块的方法,如下

def test_win32com():
    xfile = 'F:/wamp/www/softadmin/pcmgr_app/python/bin/xxx/1399864038_0.xls'
    xlApp = win32com.client.Dispatch('Excel.Application')
    xlApp.Visible = False
    xls = xlApp.Workbooks.Open(xfile)
    #index from 1
    table = xls.Worksheets(1)
    #get used info
    info = table.UsedRange
    nrows = info.Rows.Count
    for i in range(nrow):
        cell = table.Rows[i].Cells
        #print cell 5 value
        print cell[5].Value
    #close without save
    xls.Close(SaveChanges=0)

o…

无标签信息 0 条

今天在对python的扩展做压力测试时发现内存一直在猛涨,结果发现原来是在返回PyDict_New的时候出现了问题,原来的代码如下:

    PyObject *d = PyDict_New();
    map::iterator it=result.begin();
    for(; it!=result.end(); ++it){
        PyDict_SetItem(d, Py_BuildValue("s", it->first.c_str()), Py_BuildValue("s", it->second.c_str()));
    }

后来google发现一篇文章写的很不错:http://blog.csdn.net/littlegrizzly/article/details/7701096,就不多说了,直接上修正后的代码:

    for(; it!=result.end(); ++it){
        PyObject *key = Py_BuildValue("s", it->first.c_str());
        PyObject *val = Py_BuildValue("s", it->second.c_str());
        PyDict_SetItem(d, key, val);
        Py_XDECREF(key);
        Py_XDECREF(val);
    }
无标签信息 0 条

      这两天一直在忙着配置中心的开发,现在终于到了python的扩展开发部分,其它都好说,主要是在当c++线程检测到配置更新时回调通知python的问题上,下面来看示例
在python端是这样:

import ConfigApp
def callback(name):
    print 'update_____________', name
config = ConfigApp.Config()
config.register_modify(callback)

上面的register_modify在c++扩展开发中怎么实现呢,且看代码

//回调句柄
static PyObject *update_callback = NULL;
//配置更新通知,参考:http://docs.python.org/2/extending/extending.html 官方demo
static PyObject* Config_method_register_modify(Config *self, PyObject *pArgs, PyObject *kwds){
    PyObject *temp;
    int ret = -1;
    if (PyArg_ParseTuple(pArgs, "O:set_callback", &temp)) {
        if (!PyCallable_Check(temp)) {
            PyErr_SetString(PyExc_TypeError, "parameter must be callable");
            return NULL;
        }
        Py_XINCREF(temp);             /* Add a reference to new callback */
        Py_XDECREF(update_callback);  /* Dispose of previous callback */
        update_callback = temp;       /* Remember new callback */
        ret = AppApi::register_modify(_modify_callback);
    }
    return Py_BuildValue("i", ret);
}

接下来就看我们注册的c++函数_modify_callback的实现过程:

static void _modify_callback(const char *module){
    if (!PyCallable_Check(update_callback)) {
        PyErr_SetString(PyExc_TypeError, "callable have some problem!");
        return ;
    }
    PyObject *arglist = Py_BuildValue("(s)", module);
    PyObject_CallObject(update_callback, arglist);
    Py_DECREF(arglist);
}

看完啊,千万不要把上面代码copy走了!在异步回调的时候出现 :python2.6: line 4: 16251 Segmentation fault
      真要命,这如何是好,主要是我咋搜索呢,百般尝试发现搜索“python extension callback thread”是很靠谱的,最终确认了这样的一篇文章“C++调用PythonAPI线程状态和全局解释器锁 ” ,Py_BEGIN_ALLOW_THREADS这个尝试失败,我就不扯蛋了,直接说官方文档“Initialization, Finalization, and Threads”,按官方现在的代码应该是下面这样的:

//....上面一样,省略
    PyGILState_STATE gstate = PyGILState_Ensure();
    PyObject *arglist = Py_BuildValue("(s)", module);
    PyObject_CallObject(update_callback, arglist);
    Py_DECREF(arglist);
    PyGILState_Release(gstate);
//....上面一样,省略

      啊,回调终于成功了!如果你这个时候走了,那你又悲剧了,因为回调完成后就“Segmentation fault”,然后就继续google,得到结论:http://stackoverflow.com/questions/5140998/why-does-pygilstate-release-segfault-in-this-case,改代码如下:

    //下面这句话很重要
    PyEval_InitThreads();
    //########################
    PyGILState_STATE gstate = PyGILState_Ensure();
    PyObject *arglist = Py_BuildValue("(s)", module);
    PyObject_CallObject(update_callback, arglist);
    Py_DECREF(arglist);
    PyGILState_Release(gstate);

      恭喜恭喜

无标签信息 1 条

      发现好久没有写过日志了,突然懒起来拽都拽不动,看着要过年了,又开售,天天成堆的群发短信弄的我焦头烂额,一晃春节马上就过年了。年前的时候接到一个33e9的短信网关开发需求,这个网关不同是,统一了三个网关上下行短信号码。其实说来也没啥,但他们要求用java开发,只有java提供的jar包是用socket开发,其它语言只能使用webservice,这么大用户量,用webservice不太现实,还好以前也写过java。
      之前有一套python开发的短信网关,主要分这么几块:1、http的api进程接受请求,短信入库,并插入日志库;2、3网关进程接收到来自http进程的信号通知,取数据发送,等待状态报告、上行消息等;3、日志进程,整理数据、发送状态、状态报告、上行消息等;4、上行服务负责把上行消息归类并通知各个上行处理插件进程。
      为了快速开发,把以前的python脚本升级了一下(主要是进程代码处理上),把网关进程换成了33e9的java代码,在python中有个threading.Event的模块非常好用,所以在java中就自造了一个

public class ThreadEvent {
    private boolean isset=false;
    private Thread _thread;
    /**
     * 事件是否被设置
     * @return
     */
    public Boolean isSet(){
        return isset;
    }
    /**
     * 设置事件
     * @return
     */
    public synchronized boolean set(){
        isset = true;
        notify();
        return isset;
    }
    /**
     * 事件等待
     * @param timeout wait in milliseconds
     * @throws Exception
     */
    public synchronized void xwait(int timeout) throws Exception{
        wait(timeout);
    }
    /**
     * 清除事件设置
     * @return
     */
    public synchronized boolean clear(){
        isset = false;
        return !isset;
    }
}

接下来是信号通知的代码,感知到数据更新后快速响应,而非sleep轮询

    @SuppressWarnings("sun.misc.Signal")
    private synchronized void init(){
        dataEvent = new ThreadEvent();
        dataEvent.set();
        SignalHandler dataSig = new SignalHandler() {
            @Override
            public void handle(Signal signal) {
                dataEvent.set();
            }
        };
        //捕捉收到短信信号
        Signal sig = new Signal("USR2");
        Signal.handle(sig, dataSig);
    }
    public void run(){
        smsBean = new SmsBean();
        msgSend = new MsgSend();
        msgSend.start();
        int num = 0;
        while(!Service.END){
            try{
                if(!dataEvent.isSet()){
                    dataEvent.xwait(20000);
                }
                //System.out.println("check test");
                num = sendSms();
                if(num<1){
                    dataEvent.clear();
                }
            }catch (Exception ex){
                dataEvent.clear();
                ex.printStackTrace();
                Logging.error("smsMain while error:%s", ex.getMessage());
            }
        }
    }

      io信号通知事件更新的做法除了实现简单,而且可以减少不必要的轮询,而且快速响应,当然缺点是只能在单机部署代码。(或者能多机,但还不知道实现),其余代码都基本可以忽略不计了。
      开发完成后,写了一个小上行处理app,用户上行短信,下发一条对用户的回答,采用的是时下最火的小黄鸡代码啦,很简单,但很好用,代码上

    def get_content(self, cont):
        url = 'http://api.simsimi.com/request.p'
        cont = cont.decode('gbk').encode('utf8')
        params = 'key=your-key&lc=ch&ft=1.00&text='+cont
        f = urllib.urlopen(url, params)
        ret = f.read()
        f.close()
        if ret:
            xret = JsonUtil.read(ret)
            ret = xret.has_key('response') and xret.get('response').decode('utf8').encode('gbk') or '谢谢你发来短信'
        else:
            ret = '谢谢你发来短信'
        return ret

      之前系统出了一次问题,主要是一个服务开启了太多句柄,导致最后socket超出最大值,这次增加进程句柄数量监控:

def check_pid_handle_num(maxnum = 500):
    try:
        #获取当前所有进程打开句柄数
        cmd = "/usr/sbin/lsof -n|awk '{print $2}'|sort|uniq -c|sort -nr"
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
        #轮询检查当前所有进程
        ret = {}
        for i in p.stdout.readlines():
            d = i.strip(' ').strip('\n').split(' ')
            if d[0] > maxnum:
                cmdd = "ps -C -p "+d[1]+"|awk '{print $6}'"
                pp = subprocess.Popen(cmdd, shell=True, stdout=subprocess.PIPE)
                #放入当前进程和句柄数量
                ret[d[1]] = {'serv':pp.stdout.readlines()[1], 'num':d[0]}
                pp.terminate()
        return ret
    except:
        logging.error('error in check_pid_handle_num:%s', traceback.format_exc())
        return []

      如果超出了最大数,方法返回的是个字典{‘进程id’:{‘serv’:’服务名’, ‘num’:’打开句柄数’}}。查看系统当前最大句柄数:ulimit -n

      虽然在这次的德州demo中我只是把消息服务作为了一个线程处理,但并不影响实际的消息管理,其实也没有什么要说的,消息格式:消息长度+版本号+事件名+事件对应参数详情。之前写短信网关时候对SocketServer没有理解太深,没有设置socket的超时时间,结果导致大量请求挂起,导致最后服务挂起。
      今天一连写了三篇博客,大部分是对最近这段时间的所作描述,因为表述不怎么好,所以就贴了比较多的代码。以后再回过头来看曾经写的代码会是什么样的心情呢。在flash上我也写了一些模块,本本快没电了,现在也有点晚了,所以就放在下一篇文章中在描述吧。

#coding=gbk
import time
import struct
import socket
import logging
import traceback
import threading
import binascii
import Object
import Config
import ClientEvent
import SocketServer
import MsgPackage
import Connection
import Game.M as M
import Game.I as I
class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
    '''线程请求管理'''
    def setup(self):
        self.request.settimeout(Config.CLIENT_OUTTIME)
    def send(self, command, info):
        _data = MsgPackage.pack(command, info)
        _len = len(_data)
        _data = struct.pack('>L', _len) + _data
        self.request.send(_data)
        logging.info('send[%s] data[%s]:%s', M.get(command), _len, info)
    def handle(self):
        try:
            while True:
                ctime = time.time()
                buf = self.request.recv(4)
                if not buf:
                    break
                cmd_len = struct.unpack('>L', buf)[0]
                data = ''
                while len(data) < cmd_len:
                    obuf = self.request.recv(cmd_len - len(data))
                    if not obuf:
                        raise Exception('not receive anything!')
                    data += obuf
                _data = MsgPackage.unpack(data)
                logging.info('get[%s] data[%s]:%s', M.get(_data['c']), cmd_len, data)
                ret = ClientEvent.deal(_data['c'], _data['i'], self)
                if ret: #有返回消息
                    self.send(*ret)
        except socket.timeout:#连接超时
            cur_thread = threading.currentThread()
            emsg = cur_thread.getName() + ' timeout connect!'
            logging.info(emsg)
        except:
            logging.error('error in ThreadedTCPRequestHandler :%s, res:', traceback.format_exc())
        self.request.close()
        Connection.unbind(self.request)
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    pass
class Server:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.server = None
        ThreadedTCPServer.allow_reuse_address = True
    def run(self):
        server_check = threading.Thread(target=self.check)
        server_check.setDaemon(True)
        server_check.start()
        self.server = ThreadedTCPServer((self.host, self.port), ThreadedTCPRequestHandler)
        self.server.serve_forever()
    def check(self):
        while not Object.END_EVENT.isSet():
            Object.END_EVENT.wait(2)
        self.shutdown()
    def stop(self):
        self.shutdown()
if __name__ == '__main__':
    Server('0.0.0.0', 25525).run()

      下面这个是上文提到的GameServer代码,主要是轮询赛桌当前应该处理的事件和自处理事件。没有什么技术含量,主要是补充上篇博客的阅读:

#coding=gbk
import time
import traceback
import logging
import threading
import Config
import Queue
import Object
_INSTANCE_ = None
class GameServer:
    def __init__(self):
        self._table_conf = {}
        self._table_msgq = {}
    def join_table(self, table):
        if self._table_conf.has_key(table.table_id):
            return -1
        self._table_conf[table.table_id] = table
        self._table_msgq[table.table_id] = Queue.Queue()
        logging.info('table[%s] created!', table.table_id)
    def do_event(self, table_id, uid, command, info):
        '''处理一个事件'''
        t_queue = self._table_msgq.get(table_id, None)
        if not t_queue:
            return -1
        t_queue.put({'uid':uid, 'event':command, 'info':info})
        return True
    def get_event(self, table_id):
        '''获取一个事件'''
        t_queue = self._table_msgq.get(table_id, None)
        if not t_queue:
            return {}
        try:
            return t_queue.get_nowait()
        except:
            return {}
    def start(self):
        t = threading.Thread(target = self.run)
        t.setDaemon(True)
        t.start()
        return self
    def run(self):
        while not Object.END_EVENT.isSet():
            try:
                for tid, table in self._table_conf.items():
                    #logging.info('%s, %s, %s', table.status, table.wait_next_uid, table.wait_end_uid)
                    if table.status != Config.GAMEOVER_STATUS:
                        table.check(self.get_event(tid))
                time.sleep(1)
            except:
                logging.error('error in GameServer.run:%s', traceback.format_exc())
def getInstance():
    '''获取游戏服务进程'''
    global _INSTANCE_
    if not _INSTANCE_:
        _INSTANCE_ = GameServer()
    return _INSTANCE_

      上周末写了一个德州牌桌demo,桌子有点大,所以我把一个桌子分成了两半,一半是处理用户请求的,包括:加注、看牌、弃牌、跟注、all in等。一半是牌桌自己事件,包括:抽取庄家位,发底牌,发翻牌,发转牌,发河牌,算奖等。其实大部分方法来说都是一两句代码,分出方法是为了维护方便,也是为了能好意思贴出来。
      先看这个牌桌这半个类吧,写了这么几年代码,总结来说:1、代码应该简洁;2、代码要有思想;3、代码逻辑性强;4、代码模块要独立。虽然我不敢说自己的代码真能做到,但我一直是往这些方面靠拢。感觉这个牌桌有这么几点自我感觉还行:1、游戏每个事件都有不同方法;2、游戏中几乎没有跟游戏无关的代码(除了事件监听和发送)。其实做事件的目的是为了通知用户赛桌的变化和对赛事包括牌谱持久化,牌谱分析统计等等。
      代码中存在一些如M、I的模块,还有Config等涉及公司机密,就不方便贴出来了,M是定义的事件类型;I是定义的消息中的参数名。Config中包括一些状态定义等等,缺少也不会影响代码阅读,自己补充一点点就能跑起来。
      先看一下,牌桌测试程序吧,这个就是常说的测试案例,有这么一个代码就好了,你可以随意修改服务,修改完成运行一次就可以知道服务有么有什么问题:

    gserv = Game.GameServer.getInstance()
    t = Game.Table.Table('id123', 6, 5)
    t.match_type = Game.Config.IMMEDIATE
    t.init_event()
    t.user_join({'u1' : [1000, 1],
                 'u2' : [1000, 2],
                 'u3' : [1000, 3],
                 'u4' : [1000, 4],
                 'u5' : [1000, 5]})
    gserv.start().join_table(t)
    n, j = 2, 1
    while not Object.END_EVENT.isSet():
        if t.wait_next_uid != 0 and n>0:
            gserv.do_event(t.table_id, t.wait_next_uid, M.CALL, {})
            n -= 1
        if t.status == 'turn' and j == 1:
            gserv.do_event(t.table_id, t.wait_next_uid, M.RAISE, {'chips':100})
            j = 0
        time.sleep(1)

      代码贴的实在是有点多了,关于GameServer我就再写一篇来描述消息接收和游戏轮询做介绍吧

#coding=gbk
import time
import logging
import M, I
import Code
import Event
import Config
import Poker
import TableUser
import EventHelper
class Table(TableUser.TableUser):
    def __init__(self, table_id, seat_num = 6, action_time = 20):
        TableUser.TableUser.__init__(self)
        self.table_id = table_id     #桌子id
        self.table_name = ''         #桌子名称
        self.table_desc = ''         #桌子简介
        self.table_poker = []        #桌上当前牌
        self.match_id = None         #牌桌所属赛事id
        self.match_type = None       #牌桌所属赛事类型
        self.seat_num = seat_num     #当前桌子座位数
        self.action_time = action_time #玩家行动超时时间
        self._poker = None           #扑克牌队列
        self.table_info = {          #牌桌相关信息
                           'bblind':100,   'sblind':50, 'ante':0, 'dealer_uid':0,
                           'bblind_uid':0, 'sblind_uid':0, 'dealer_uid':0}
        self.status_list = [Config.START_STATUS, Config.BLIND_STATUS, Config.PREFLOP_STATUS,
                            Config.FLOP_STATUS, Config.TURN_STATUS, Config.DRIVE_STATUS,
                            Config.PRIZE_STATUS, Config.GAMEOVER_STATUS]
        self.free_seat = range(1, seat_num+1) #空座位列表[座位号]
        self.pot_info = []           #奖池信息
        self.pot_uinfo = []          #奖池对应的用户信息
        self.status = Config.INIT_STATUS #初始化状态
    def init_event(self):
        '''初始化事件'''
        TableUser.TableUser.init_event(self)
        self.event.add_event_listener(Event.PREFLOP_COMPLETE, EventHelper.on_preflop_msg)
        self.event.add_event_listener(Event.FLOP_COMPLETE, EventHelper.on_flop_msg)
        self.event.add_event_listener(Event.TURN_COMPLETE, EventHelper.on_turn_msg)
        self.event.add_event_listener(Event.DRIVE_COMPLETE, EventHelper.on_drive_msg)
        self.event.add_event_listener(Event.BLIND_COMPLETE, EventHelper.on_blind_msg)
        self.event.add_event_listener(Event.READY_COMPLETE, EventHelper.on_table_ready)
        return self
    def reset(self):
        '''重置桌子信息'''
        self.table_poker = []
        self.wait_next_uid = 0
        self.user_max_chips = 0
        self._poker = Poker.get_poker_queue()
    def get_dealer_uid(self):
        '''获取庄家位子'''
        import random
        return random.choice(self.cplayer_list)
    def first_blind_uid(self):
        '''第一次设置大小盲'''
        dealer_uid = self.get_dealer_uid()
        plen = len(self.cplayer_list)
        dindex = self.cplayer_list.index(dealer_uid)
        bindex = (dindex+2) % plen
        self.table_info['sblind_uid'] = self.cplayer_list[(dindex+1) % plen]
        self.table_info['bblind_uid'] = self.cplayer_list[(dindex+2) % plen]
        self.table_info['dealer_uid'] = dealer_uid
        self.iplayer_list = self.cplayer_list[bindex+1:] + self.cplayer_list[0:bindex+1]
    def set_blind_uid(self):
        '''设置盲注相关信息'''
        lbb = self.table_info['bblind_uid']
        if not lbb: #如果是第一手
            self.first_blind_uid()
            return None
        bblind_uid = sblind_uid = 0
        if lbb in self.cplayer_list:
            sblind_uid = lbb
        else: #如果上一手的大盲离开
            sblind_uid = 0
        bindex = (self.cplayer_list.index(lbb) + 1) % len(self.cplayer_list)
        bblind_uid = self.cplayer_list[bindex]
        self.table_info['sblind_uid'] = sblind_uid
        self.table_info['bblind_uid'] = bblind_uid
        dindex = (self.cplayer_list.index(sblind_uid) - 1) % len(self.cplayer_list)
        self.table_info['dealer_uid'] = self.cplayer_list[dindex]
        self.iplayer_list = self.cplayer_list[bindex+1:] + self.cplayer_list[0:bindex+1]
    def start(self):
        '''开始游戏'''
        #检查是否可以开始比赛
        self.cplayer_list = self.get_player_list()
        if len(self.cplayer_list) < self.min_player_num:
            self.status = Config.INIT_STATUS
            return False
        logging.info('[%s] start game!', self.table_id)
        self.reset()
        self.status = Config.BLIND_STATUS
        event_content = { 'cplayer_list':self.cplayer_list,
                          'player_dict' :self.player_dict,
                          'table_id'    :self.table_id}
        self.event.dispatch_event(Event.READY_COMPLETE, content = event_content)
        return False
    def blind(self):
        '''设置大小盲注'''
        self.set_blind_uid()
        logging.info('[%s] set blind over!:%s', self.table_id, self.table_info)
        return False
    def cut_card(self):
        '''切牌'''
        ret = self._poker.get()
        logging.debug('[cut_card]:%s', ret)
        return ret
    def pre_flop(self):
        '''底牌圈'''
        self.cut_card()
        for pos in self.cplayer_list:
            self.player_dict[pos].poker_list = [self._poker.get(), self._poker.get()]
        event_content = {'cplayer_list':self.cplayer_list,
                         'player_dict' :self.player_dict,
                         'table_id'    :self.table_id}
        self.event.dispatch_event(Event.PREFLOP_COMPLETE, content = event_content)
        self.do_blind_chips() #扣除大小盲
        return True
    def flop(self):
        '''翻牌圈'''
        self.cut_card()
        self.table_poker.extend([self._poker.get(), self._poker.get(), self._poker.get()])
        event_content = {'cplayer_list':self.cplayer_list,
                         'table_poker':self.table_poker,
                         'table_id'   :self.table_id}
        self.event.dispatch_event(Event.FLOP_COMPLETE, content = event_content)
        return True
    def turn(self):
        '''转牌圈'''
        self.cut_card()
        self.table_poker.append(self._poker.get())
        event_content = {'cplayer_list':self.cplayer_list,
                         'table_poker':self.table_poker,
                         'table_id'   :self.table_id}
        self.event.dispatch_event(Event.TURN_COMPLETE, content = event_content)
        return True
    def drive(self):
        '''河牌圈'''
        self.cut_card()
        self.table_poker.append(self._poker.get())
        event_content = {'cplayer_list':self.cplayer_list,
                         'table_poker':self.table_poker,
                         'table_id'   :self.table_id}
        self.event.dispatch_event(Event.DRIVE_COMPLETE, content = event_content)
        return True
    def prize(self):
        '''派奖'''
        _uinfo = {}
        for uid in self.cplayer_list:
            _player = self.player_dict[uid]
            if _player.status != Config.FOLD:
                _uinfo[uid] = _player.poker_list + self.table_poker
        ret = None
        if len(_uinfo) > 1:
            ret = Poker.compare(_uinfo)
        print _uinfo
        logging.info('[%s] prize :%s', self.table_id, ret)
        return False
    def game_over(self):
        '''游戏结束'''
        for i in self.cplayer_list:
            player = self.player_dict[i]
            player.poker_list = []
        logging.info('game_over!')
        return False
    def _do_blind_chips(self, type, uchips = None):
        '''扣除盲注'''
        player = self.player_dict[self.table_info[type+'_uid']]
        bchips = uchips and uchips or self.table_info[type]
        info = {'chips':bchips}
        self.user_raise(self.table_info[type+'_uid'], info)
        return bchips
    def do_blind_chips(self):
        '''扣除大小盲注'''
        schips = self._do_blind_chips('sblind') #扣除小盲注
        splayer = self.player_dict[self.table_info['sblind_uid']]
        #当前只有大小盲,且小盲all in
        if len(self.cplayer_list) == 2 and splayer.status == Config.ALL_IN:
            bchips = self._do_blind_chips('bblind', schips) #扣除大盲注
        else:
            bchips = self._do_blind_chips('bblind') #扣除大盲注
        event_content = {'cplayer_list':self.cplayer_list,
                         'table_info'  :self.table_info,
                         'blind_info'  :{self.table_info['bblind_uid']:bchips,
                                         self.table_info['sblind_uid']:schips},
                         'table_id'    :self.table_id}
        self.event.dispatch_event(Event.BLIND_COMPLETE, content = event_content)
    def check(self, data = {}):
        '''服务检查'''
        if self.status == Config.INIT_STATUS: #初始态
            return None
        if data.get('uid', None): #用户有请求
            self.player_bet(data['uid'], data['event'], data['info'])
        if self.wait_next_uid == 0 : #做游戏事情
            if self._check_game_over():
                self.status = Config.PRIZE_STATUS
            if not hasattr(self, self.status):
                self.error('error status attr:%s', self.status)
                return None
            if getattr(self, self.status)():
                self._turn_player(self._get_wait_next())
            self.status = self._get_next_status() #跳转到下一状态
        else: #判断用户状态
            player = self.player_dict[self.wait_next_uid]
            if time.time() > player.out_time: #超时
                player.status = Config.NURSE
            if player.status == Config.NURSE: #托管中
                self.user_nurse(self.wait_next_uid)
    def error(self, msg = ''):
        logging.info('error:%s', msg)
    ####################################################################################
    def _check_game_over(self):
        '''判断游戏是否完成'''
        if len(self.iplayer_list) - self.fold_unum == 1: #只剩下一个用户
            return True
        return False
    def _get_next_status(self):
        '''获取下一个游戏状态'''
        if self.status == self.status_list[-1]:
            return self.status
        sindex = self.status_list.index(self.status)
        return self.status_list[sindex + 1]

      下面在来看一下,上面类中继承的TableUser.py模块,该模块主要为了响应用户事件,包括用户超时自动设为托管状态等。

#coding=gbk
import time
import logging
import M, I
import Code
import Event
import Player
import Config
import EventHelper
class TableUser:
    '''桌上玩家动作'''
    def __init__(self):
        self.player_dict = {}        #当前牌桌用户列表 {玩家uid:player}
        self.cplayer_list = []       #当前玩家列表[玩家uid]
        self.nplayer_list = []       #新加入玩家list[玩家uid]
        self.iplayer_list = []       #当前说话玩家顺序
        self.fold_unum = 0           #弃牌用户数
        self.wait_next_uid = 0       #等待的出价玩家uid
        self.user_max_chips = 0      #用户最大出价
        self.min_player_num = 2      #最小玩家数
        self.free_seat = []          #空座位列表[座位号]
        self.event = Event.Event()   #相关事件句柄
        self.user_method_conf = {M.CALL:self.user_call, M.RAISE:self.user_raise,
                                 M.FOLD:self.user_fold, M.CHECK:self.user_check,
                                 M.ALL_IN:self.user_all_in}
    def init_event(self):
        '''初始化事件'''
        self.event.add_event_listener(Event.USER_JOIN, EventHelper.on_user_join)
    def get_player_list(self):
        '''初始化当前玩家列表'''
        _cplayer_list = []
        for uid, player in self.player_dict.items():
            if player.chips > 0 : #取出有筹码的用户
                player.unplay_times = 0
                _cplayer_list.append(uid)
            else:
                player.unplay_times += 1
        if self.cplayer_list: #不是第一手,就有新手列表
            self.nplayer_list = set(_cplayer_list) - set(self.cplayer_list)
        _cplayer_list.sort(cmp=lambda x,y:self.player_dict[x].seat_pos-self.player_dict[y].seat_pos)
        return _cplayer_list
    def _user_join(self, uid, chips=0, seat_pos=None):
        '''玩家加入牌桌'''
        if not self.free_seat: #没有空座
            return Code.NOFREE_SEAT
        if not seat_pos: #分配座位号
            seat_pos = self.free_seat[0]
        elif seat_pos not in self.free_seat: #座位已经被占用
            return Code.EXIST_PLAYER
        player = Player.Player(uid, chips, Config.OK)
        player.seat_pos = seat_pos
        self.player_dict[uid] = player
        self.free_seat.remove(seat_pos)
        event_content = {'info'        :player,
                         'player_dict':self.player_dict,
                         'player_dict' :self.player_dict,
                         'table_id'    :self.table_id}
        self.event.dispatch_event(Event.USER_JOIN, content = event_content)
        return Code.OK
    def user_join(self, userdict = {'uid':['chips', 'seat_pos']}):
        '''用户加入牌桌'''
        code = Code.OK
        for uid, uinfo in userdict.items():
            code = self._user_join(uid, uinfo[0], uinfo[1])
        if self.status == Config.INIT_STATUS and self.match_type == Config.IMMEDIATE:
            self.start() #开始比赛
        return code
    def player_bet(self, uid, event, info):
        '''玩家下注'''
        if self.wait_next_uid != uid:
            return Code.NOTURN_YOU #还没轮到该用户
        if uid not in self.cplayer_list:
            return Code.NOPLAYER #用户非法
        if self.player_dict[uid].status in [Config.FOLD, Config.ALL_IN]:
            return Code.NOPRIM_DO #没有说话权利
        if not self.user_method_conf.has_key(event):
            return Code.DO_ERROR #没有该玩家事件
        ret = self.user_method_conf[event](uid, info)
        if ret is not True:
            return ret
        self.check_game(uid)
        return True
    def check_game(self, uid):
        '''做系统判断'''
        if self._check_street(uid):
            self._reset_street() #重置这一街情况
            self.wait_next_uid = 0
            return None
        next_uid = self._get_wait_next()
        self._turn_player(next_uid)
    def user_call(self, uid, info = None):
        '''玩家跟注'''
        player = self.player_dict[uid]
        uchip = sum(player.bid_list)
        ichips = 0
        if player.chips+uchip > self.user_max_chips:
            ichips = self.user_max_chips - uchip
            player.chips -= ichips
            player.bid_list.append(ichips)
            logging.info('[%s][%s] call :%s', self.table_id, uid, ichips)
        else: #筹码不够,all in
           self.user_all_in(uid)
        return True
    def user_raise(self, uid, info = None):
        '''玩家加注'''
        player = self.player_dict[uid]
        chips = info.get('chips')
        if player.chips > chips:
            player.chips -= chips
            player.bid_list.append(chips)
            self.user_max_chips = max(chips, self.user_max_chips)
            logging.info('[%s][%s] raise :%s', self.table_id, uid, chips)
        else: #筹码不够,all in
            self.user_all_in(uid)
        return True
    def user_fold(self, uid, info = None):
        '''玩家弃牌'''
        self.player_dict[uid].status = Config.FOLD
        self.fold_unum += 1
        logging.info('[%s][%s] fold!', self.table_id, uid)
        return True
    def user_check(self, uid, info = None):
        '''玩家看牌'''
        player = self.player_dict[uid]
        uchip = sum(player.bid_list)
        print 'user_check test[%s, %s, %s]' % (uid, uchip, self.user_max_chips)
        if uchip >= self.user_max_chips:
            logging.info('[%s][%s] check!', self.table_id, uid)
            return True
        return Code.NOPRIM_CHECK #没有权限看牌
    def user_all_in(self, uid, info = None):
        '''玩家ALL IN'''
        player = self.player_dict[uid]
        chips = player.chips
        player.chips = 0
        player.status = Config.ALL_IN
        player.bid_list.append(chips)
        self.user_max_chips = max(chips, self.user_max_chips)
        logging.info('[%s][%s] all in :%s', self.table_id, uid, chips)
        return True
    def user_nurse(self, uid):
        '''玩家智能托管'''
        player = self.player_dict[uid]
        if self.user_check(uid) is not True: #看牌失败
            self.user_fold(uid)
        self.check_game(uid)
####################################################################
    def _turn_player(self, uid):
        '''轮到某玩家说话'''
        logging.info('status:%s, wait:%s', self.status, uid)
        self.wait_next_uid = uid
        if uid == 0:
            return uid
        player = self.player_dict[uid]
        player.out_time = time.time()+self.action_time
    def _reset_street(self):
        '''一街完成重置'''
        self.user_max_chips = 0
        uchips = 0 #统计底池
        for i in self.cplayer_list:
            player = self.player_dict[i]
            uchips += sum(player.bid_list)
            player.bid_list = []
        self.pot_info.append(uchips)
    def _check_street(self, uid):
        '''检查一街是否完成'''
        if uid != self.iplayer_list[-1]:
            return False
        for i in self.cplayer_list:
            player = self.player_dict[i]
            if player.status in [Config.FOLD, Config.ALL_IN]:
                continue
            if sum(player.bid_list) < self.user_max_chips:
                return False
        return True
    def _get_wait_next(self):
        '''设置当前轮到下一玩家'''
        index = 0
        if self.wait_next_uid:
            index = (self.iplayer_list.index(self.wait_next_uid)+1) % len(self.iplayer_list)
        next_uid = 0
        print 'iplayer_list:%s, index:%s, nuid:%s ' % (self.iplayer_list, index, self.wait_next_uid)
        for uid in self.iplayer_list[index:]+self.iplayer_list[0:index]:
            player = self.player_dict[uid]
            if player.status not in [Config.FOLD, Config.ALL_IN]:
                next_uid = uid
                break
        if self.wait_next_uid == next_uid:
            next_uid = 0
        return next_uid

      上次说到元旦的时候coding了一个德州的服务,这段时间刚好插不上什么手就调试了一下服务的通信问题。到目前这个服务都是一个进程,分三个块:1、消息连接管理线程(负责接收客户端消息,并做调度);2、赛事管理线程(目前还没有独立成线程,还没有做到锦标赛一步);3、赛桌管理线程(主要负责管理当前开启的赛桌正常运行)。
      为了跟客户端代码靠齐,我diy了一个python版event模块。其实就是以前写的插件方法上做了一点修饰。先看用法吧:

self.event = Event.Event()   #相关事件句柄
# 监听用户加入牌桌事件,EventHelper是事件方法集合模块
self.event.add_event_listener(Event.USER_JOIN, EventHelper.on_user_join)
#在处理完用户加入牌桌后加上如下代码:
event_content = {'uid':123, 'seat_pos':2}
self.event.dispatch_event(Event.USER_JOIN, content = event_content)

      下面这个是Event.py的全部代码,写的不怎么的,就凑合着看吧,稍后我把游戏逻辑Table.py代码贴出来,基于状态,虽然这个想法我是听来的,我也没有找到更好的方法。

#coding=gbk
import logging
import traceback
READY_COMPLETE        = 'ready_over'         #准备就绪
BLIND_COMPLETE        = 'blind_over'         #扣除盲注完
PREFLOP_COMPLETE      = 'send_preflop_over'  #发底牌
FLOP_COMPLETE         = 'send_flop_over'     #发翻牌完
TURN_COMPLETE         = 'send_turn_over'     #发转牌完
DRIVE_COMPLETE        = 'send_drive_over'    #发河牌完
PRIZE_COMPLETE        = 'prize_over'         #派奖完成
USER_JOIN             = 'user_join'          #用户加入
USER_TURN             = 'user_turn'          #轮转用户
USER_CHECK            = 'user_check'         #玩家看牌
USER_CALL             = 'user_call'          #玩家跟注
USER_RAISE            = 'user_raise'         #玩家加注
USER_FOLD             = 'user_fold'          #玩家弃牌
USER_ALLIN            = 'user_allin'         #玩家all in
class Event:
    '''事件侦听和派发模块'''
    def __init__(self):
        self._event_config = {}
    def _get_unique_id(self, target, function, priority):
        return '%s_%s_%s' % (target, function.__hash__(), priority)
    def add_event_listener(self, target, function, priority=10):
        '''添加事件监听'''
        idx = self._get_unique_id(target, function, priority)
        if not self._event_config.has_key(target):
            self._event_config[target] = {}
        if not self._event_config[target].has_key(priority):
            self._event_config[target][priority] = {}
        self._event_config[target][priority][idx] = {'function' : function}
    def dispatch_event(self, target, content = None, *args, **kwargs):
        '''分发事件'''
        if not self._event_config.has_key(target):
            return None
        actions = self._event_config[target]
        prikey = actions.keys()
        prikey.sort() #按优先级排序
        for k in prikey:
            acs = actions[k]
            for ac in acs:
                a = acs[ac]['function']
                try:
                    a(content, *args, **kwargs)
                except:
                    logging.error('some error in %s-do_action:%s', target, traceback.format_exc())
无标签信息 0 条