tornado学习总结(2)

  上一篇介绍tornado使用的基本知识点,这篇文章介绍tornado关键概念,以及目前经过几个项目优化后所采用的项目结构。

1. 基础概念

1.1异步

  即将进行的操作需要请求其它系统或其它函数去执行,而且当前主进程并不(立即)关心它们的执行结果,只是提供一个处理返回结果的函数入口给它们使用
  ( eg. 去书店买书,请老板查找所需书是否存在,因为所有书的信息都已经录入电脑了,所以查找这个过程比较快,此时我就采用同步方式等待老板的查询结果,如果有的话就继续后面的买书过程;如果老板查询没有这本书,但是告诉我可以留下电话号码,等书进货到了,他会打电话通知我,我再选择购买方式,这个就是异步)

  异步通信(前后端请求响应)和异步处理(系统内部处理)是不一样的,当描述前后端通信的时候,异步指的是前段非阻塞方式,同步指的是前端阻塞方式。异步可以是非阻塞或阻塞的。

1.1.1 回调函数 def buyBook():
def buyBookWay():
    ***do buy book on line***
if queryBookIsExist():
    ***do buy book off line***
else:
    BookStore.stockBooks(tel,callback=buyBookWay)
1.1.2 协程 def buyBook():
def buyBookWay():
    ***do buy book on line***
if queryBookIsExist():
    ***do buy book off line***
else:
    yield BookStore.stockBooks(tel)
    buyBookWay()

1.2 阻塞

  当所需要的资源(如cpu,数据库,IO等)被其它事件占用时,就会造成当前处理过程被阻塞。
  tornado默认是单线程,在linux中tornado是基于epoll事件驱动框架,所以在网络时间上是无阻塞的,但是执行其它一些耗时操作的时候还是会阻塞其他请求

2. 项目结构

|--apps                          #应用目录
    |--main                        #子应用
        |--dao                    #数据库操作目录
            |--dbCURD.py
        |--service                #基础功能服务目录
            |--bussiness.py
        |--handlers.py            #url请求处理类目录(RequestHandler及其子类)
        |--models.py             #数据库模型
        |--tests.py                #测试用例
        |--urls.py                #url路由定义
    |--admin
    |--app_x
|--exts                          #应用需要的其他外部文件放到此目录
    |--data.dat  
    |--private.pem  
    |--public.pem  
|--libs                          #应用的第三方库放这里
|--logs                          #日志文件目录
|--plugin                          #公共组件服务目录
    |--base.py
    |--logger.py
|--testSuit                        #测试集目录
|--static                        #静态文件目录,setting中配置static_path指定,引用时使用{{static_url('相对路径')}}。前后端分离应用可删除该目录                      
|--templates                    #模板文件目录,setting中配置template_path指定。前后端分离应用可删除该目录
|--config.yml                    #配置文件
|--server.py                    #应用入口,主执行文件
|--urls.py                        #主路由文件
|--setting.py                    #应用配置文件
|--requirements.txt                #依赖包文件
|--initDB.py                    #数据库初始化文件,创建数据库表

3. 文件

file: /server.py

#coding = utff-8
import tornado.ioloop
import tornado.httpserver
import tornado.options
import tornado.web
from urls import urls
from setting import settings
from tornado.options import define,options

application = tornado.web.Application(
    handlers=urls,
    **settings
)

define("port",default=8000,help="run on the given prot",type = int)

def main():
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(application,xheaders=True)

    # http_server.listen(options.port)
    #上面修改成下面可以使用多进程运行,windows下os.fork()无法执行,cpu数量为1
    http_server.bind(options.port)
    http_server.start(1)            #1指的是1个CPU进程,0表示根据CPU核心数量自动决定进程数量

    print "Development server is running at http://127.0.0.1:%s"%options.port
    print "Quit the server with Control-C"
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    main()

file: /urls.py

# coding=utf-8
from importlib import import_module
from tornado.web import URLSpec
import sys
reload(sys)
sys.setdefaultencoding("utf-8")

def include(module):
    res = import_module(module)
    urls = getattr(res, 'urls', res)
    return urls

def url_wrapper(urls):
    """
    接受一个数组格式的参数,数组元素类型可以是以下几种情况:
    1、   (r'/a/', include('apps.main.urls'),"main"),
    2、   (r'/x',TestHandler,"test"),
    3、   URLSpec(r'/x',TestHandler,name="test"),
    """
    wrapper_list = []
    for url in urls:
        if isinstance(url,URLSpec):
            path, handles, name = url.regex.pattern,url.handler_class,url.name
        else:
            path, handles = url[0],url[1]
            name = None if len(url)<3 else url[2]

        if isinstance(handles, (tuple, list)):             #如果是include,则包含的handles是元组或列表形式
            for handle in handles:
                if isinstance(handle, URLSpec):
                    pattern, handle_class, url_name = handle.regex.pattern, handle.handler_class, handle.name
                else:
                    pattern, handle_class = handle[0],handle[1]
                    url_name = None if len(handle)<3 else handle[2]
                if name==None:
                    retname = url_name                      #如果外层name为None,则reverse_url直接使用内层name
                else:
                    retname = name+'_'+url_name if url_name else None           #如果外层name不为None,内层name为空则不定义name,即无法被reverse_url使用,内层不为空则reverse_url调用名称为【外层name_内层name】
                wrapper_list.append(URLSpec('{0}{1}'.format(path, pattern), handle_class,name=retname))
        else:
            wrapper_list.append(URLSpec(path, handles,name=name))
    return wrapper_list

urls = url_wrapper([
    (r'/main/',include('apps.main.urls'),"api"),
    (r'/admin/',include('apps.admin.urls'),"admin")    
])

file: setting.py

#coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')


import os,base64,uuid
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from plugin.base import getCfgValue

INSTALLED_APPS=[
    "apps.main",
    "apps.admin"
]

DATABASE = {
    'DBDRIVER':'mysqldb',
    'NAME':getCfgValue('mysql','dbname'),
    'USER':getCfgValue('mysql','user'),
    'PASSWORD':getCfgValue('mysql','password'),
    'HOST':getCfgValue('mysql','host'),
    'PORT':getCfgValue('mysql','port')
}

Base = declarative_base()
engine = create_engine('mysql+{DBDRIVER}://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}?charset=utf8'.format(**DATABASE),pool_recycle=3600)

settings=dict(
    template_path = os.path.join(os.path.dirname(__file__),"templates"),
    static_path = os.path.join(os.path.dirname(__file__),"statics"),
    debug = False,
    cookie_secret = base64.b64encode(uuid.uuid4().bytes + uuid.uuid4().bytes),              #用来使用get_secure_cookie方法
    xsrf_cookies = False,
    login_url = '/mockServer/login',
)

file: /plugin/logger.py

#coding=utf-8
import os,datetime,sys

log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)),"logs")

def log(level,content,where):
    now = str(datetime.datetime.today())
    today = str(datetime.datetime.today().date())
    filePath = os.path.join(log_path,'log_'+today+'.log')
    isErr = False
    try:
        content = content.decode("utf-8")
    except:
        isErr = True
    if isErr:
        try:
            content = content.decode(sys.getfilesystemencoding())
        except:
            pass
    with open(filePath,'a') as f:
        f.write(('[%s][%s]%s [IN %s]\n'%(level,now,content,where)).encode(sys.getfilesystemencoding()))

def error(msg,where='unknown'):
    log('ERROR',msg,where)

def info(msg,where='unknown'):
    log('INFO', msg, where)

def warning(msg,where='unknown'):
    log('WARNING', msg, where)


if __name__=='__main__':
    error('你的文件没有访问权限!','line 15-line 19')
    info('访问http://www.baidu.com')
    warning('删除/x/y/z成功!')

file: /apps/xxx/models.py

#coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

from sqlalchemy import Table
from sqlalchemy import Column,String,Integer,Text,ForeignKey,DateTime
#常用字段类型有String,Integer,Text,Boolean,SmallInteger,DateTime
from sqlalchemy.orm import relationship
from setting import Base
from datetime import datetime


class User(Base):

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(64), nullable=False, index=True)
    password = Column(String(64), nullable=False)
    email = Column(String(64), nullable=False, index=True)
    articles = relationship('Article', backref='author')                #relationship只描述关系,不定义字段


    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.username)


class UserInfo(Base):

    __tablename__ = 'userinfos'

    id = Column(Integer, primary_key=True)
    name = Column(String(64))
    qq = Column(String(11))
    phone = Column(String(11))
    link = Column(String(64))
    user_id = Column(Integer, ForeignKey('users.id'))           #多方需要定义外键字段,指定唯一值(为什么不在一对多的唯一值方定义?因为那样子就无法从一查找到具体值)
    user = relationship('User', backref='userinfo', uselist=False)  #定义一对一关系

class Article(Base):

    __tablename__ = 'articles'

    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False, index=True)
    content = Column(Text)
    user_id = Column(Integer, ForeignKey('users.id'))
    cate_id = Column(Integer, ForeignKey('categories.id'))
    tags = relationship('Tag', secondary='article_tag', backref='articles')
    createtime = Column(DateTime, default=datetime.now)
    updatetime = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.title)


class Category(Base):

    __tablename__ = 'categories'

    id = Column(Integer, primary_key=True)
    name = Column(String(64), nullable=False, index=True)
    articles = relationship('Article', backref='category')

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.name)


##  额外表,用来定义多对多关系  ###
article_tag = Table(
    'article_tag', Base.metadata,
    Column('article_id', Integer, ForeignKey('articles.id')),
    Column('tag_id', Integer, ForeignKey('tags.id'))
)


class Tag(Base):

    __tablename__ = 'tags'

    id = Column(Integer, primary_key=True)
    name = Column(String(64), nullable=False, index=True)

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self.name)




if __name__ == "__main__":
    pass

file: /apps/xxx/urls.py

# coding=utf-8

from handlers import IndexHandler, LoginHandler, NewProjectHandler, ListAllProjectsHandler, NewModuleHandler, \
    GetProjectNameHandler, ListProjectModulesHandler, NewApiHandler, ListProjectApis, GetProjectDescHandler, \
    ListApisByMidHandler, GetApiDataHandler, SaveApiDataHandler, ListAllResponseTypesHandler, GetConsulEnvInfoHandler, \
    SaveConsulEnvInfoHandler,GetAuthInfoHandler,LogoutHandler
from tornado.web import url

urls = [
    url(r'', IndexHandler, name="index"),
    url(r'login', LoginHandler, name='login'),
    url(r'logout', LogoutHandler, name='logout'),
    url(r'getauthinfo', GetAuthInfoHandler, 'getauthinfo'),
    url(r'newProject', NewProjectHandler, name='newProject'),
    url(r'listAllProjects', ListAllProjectsHandler, name='listAllProjects'),
    url(r'newModule', NewModuleHandler, name='newModule'),
    url(r'getProjectName', GetProjectNameHandler, name='getProjectName'),
    url(r'listProjectModules', ListProjectModulesHandler, name='listProjectModules'),
    url(r'newApi', NewApiHandler, name='newApi'),
    url(r'listProjectApis', ListProjectApis, name='listProjectApis'),
    url(r'getProjectDesc', GetProjectDescHandler, name='getProjectDesc'),
    url(r'listApisByMid', ListApisByMidHandler, name='listApisByMid'),
    url(r'getApiData', GetApiDataHandler, name='getApiData'),
    url(r'saveApiData', SaveApiDataHandler, name='saveApiData'),
    url(r'listAllResponseTypes', ListAllResponseTypesHandler, name='listAllResponseTypes'),
    url(r'getConsulEnvInfo', GetConsulEnvInfoHandler, name='getConsulEnvInfo'),
    url(r'saveConsulEnvInfo', SaveConsulEnvInfoHandler, name='saveConsulEnvInfo'),
]

file: /apps/xxx/handlers.py

# coding=utf-8
import tornado.web
import tornado.concurrent
import tornado.gen
from concurrent.futures import ThreadPoolExecutor
from plugin import logger
from dao.main_curd import *
import sys, time, json
from tornado.httpclient import HTTPRequest
try:
    from tornado.curl_httpclient import CurlAsyncHTTPClient as AsyncHTTPClient
except ImportError:
    from tornado.simple_httpclient import SimpleAsyncHTTPClient as AsyncHTTPClient

reload(sys)
sys.setdefaultencoding('utf8')

_result = {}  # 存储格式为:_result[tid]={'status': 'success', 'msg': context}
TIMEOUT = 30
MAX_WORKERS = 50


class BaseHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

    def get_current_user(self):
        return self.get_secure_cookie("user")


######### handler类异步处理编写方法 ##############
####前端同步获取结果####
class Test1Handler(BaseHandler):
    @tornado.concurrent.run_on_executor
    def background_task(self):
        # do some thing asynchronously
        res = 'hello,world'
        return res

    @tornado.gen.coroutine
    def get(self):
        res = yield self.background_task()
        self.write(res)

####前端异步获取结果####
##第一步,通知服务器执行处理,并生成、存储tid,并返回tid到前端
class Test2Handler(BaseHandler):
    @tornado.concurrent.run_on_executor
    def background_task(self, tid):
        try:
            # do some thing asynchronously
            res = {'status': 'success', 'msg': ''}
        except Exception, e:
            res = {'status': 'failed', 'msg': e.message}
        _result[tid] = res

    @tornado.gen.coroutine
    def get(self):
        tid = str(int(time.time() * 10000))
        yield self.background_task(tid)
        self.write(tid)

    @tornado.gen.coroutine
    def post(self):
        tid = str(int(time.time() * 10000))
        yield self.background_task(tid)
        self.write(tid)

# 第二步,根据tid查询结果内容
class AsynGetResultHandler(BaseHandler):
    @tornado.concurrent.run_on_executor
    def background_task(self, tid, timeout):
        start = time.time()
        while not tid in _result.keys():
            if time.time() - start > timeout:
                break
            time.sleep(0.2)
        if tid in _result.keys():
            out = _result[tid]  # 结果
            del _result[tid]  # 删除tid的数据。
            return out
        else:
            return "timeout."

    @tornado.gen.coroutine
    def get(self, timeout=TIMEOUT):
        tid = self.get_argument("tid")
        res = yield self.background_task(tid, timeout)
        self.write(res)

file: /apps/xxx/dao/dbCURD.py

# coding=utf-8
import sys, json

reload(sys)
sys.setdefaultencoding('utf-8')
from setting import engine
from sqlalchemy.orm import sessionmaker
from apps.main.service.consulOPT import getService,getAllService,newAgent
from apps.main.models import *

Session = sessionmaker(bind=engine)

#############新增、创建#####################
def addUser(account, passwd, email, role, name='', status=True):
    session = Session()
    user = User(account=account, name=name, passwd=passwd, email=email, role=role, status=status)
    session.add(user)
    try:
        session.commit()
    except Exception, e:
        session.rollback()
        raise e
    finally:
        ret = user.id
        session.close()
    return ret