上一篇介绍tornado使用的基本知识点,这篇文章介绍tornado关键概念,以及目前经过几个项目优化后所采用的项目结构。
1. 基础概念
1.1异步
即将进行的操作需要请求其它系统或其它函数去执行,而且当前主进程并不(立即)关心它们的执行结果,只是提供一个处理返回结果的函数入口给它们使用
( eg. 去书店买书,请老板查找所需书是否存在,因为所有书的信息都已经录入电脑了,所以查找这个过程比较快,此时我就采用同步方式等待老板的查询结果,如果有的话就继续后面的买书过程;如果老板查询没有这本书,但是告诉我可以留下电话号码,等书进货到了,他会打电话通知我,我再选择购买方式,这个就是异步)
异步通信(前后端请求响应)和异步处理(系统内部处理)是不一样的,当描述前后端通信的时候,异步指的是前段非阻塞方式,同步指的是前端阻塞方式。异步可以是非阻塞或阻塞的。
1.1.1 回调函数 def buyBook():
def buyBookWay():
***do buy book on line***
if queryBookIsExist():
***do buy book off line***
else:
BookStore.stockBooks(tel,callback=buyBookWay)
1.1.2 协程 def buyBook():
def buyBookWay():
***do buy book on line***
if queryBookIsExist():
***do buy book off line***
else:
yield BookStore.stockBooks(tel)
buyBookWay()
1.2 阻塞
当所需要的资源(如cpu,数据库,IO等)被其它事件占用时,就会造成当前处理过程被阻塞。
tornado默认是单线程,在linux中tornado是基于epoll事件驱动框架,所以在网络时间上是无阻塞的,但是执行其它一些耗时操作的时候还是会阻塞其他请求
2. 项目结构
|--apps #应用目录
|--main #子应用
|--dao #数据库操作目录
|--dbCURD.py
|--service #基础功能服务目录
|--bussiness.py
|--handlers.py #url请求处理类目录(RequestHandler及其子类)
|--models.py #数据库模型
|--tests.py #测试用例
|--urls.py #url路由定义
|--admin
|--app_x
|--exts #应用需要的其他外部文件放到此目录
|--data.dat
|--private.pem
|--public.pem
|--libs #应用的第三方库放这里
|--logs #日志文件目录
|--plugin #公共组件服务目录
|--base.py
|--logger.py
|--testSuit #测试集目录
|--static #静态文件目录,setting中配置static_path指定,引用时使用{{static_url('相对路径')}}。前后端分离应用可删除该目录
|--templates #模板文件目录,setting中配置template_path指定。前后端分离应用可删除该目录
|--config.yml #配置文件
|--server.py #应用入口,主执行文件
|--urls.py #主路由文件
|--setting.py #应用配置文件
|--requirements.txt #依赖包文件
|--initDB.py #数据库初始化文件,创建数据库表
3. 文件
file: /server.py
#coding = utff-8
import tornado.ioloop
import tornado.httpserver
import tornado.options
import tornado.web
from urls import urls
from setting import settings
from tornado.options import define,options
application = tornado.web.Application(
handlers=urls,
**settings
)
define("port",default=8000,help="run on the given prot",type = int)
def main():
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(application,xheaders=True)
# http_server.listen(options.port)
#上面修改成下面可以使用多进程运行,windows下os.fork()无法执行,cpu数量为1
http_server.bind(options.port)
http_server.start(1) #1指的是1个CPU进程,0表示根据CPU核心数量自动决定进程数量
print "Development server is running at http://127.0.0.1:%s"%options.port
print "Quit the server with Control-C"
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
main()
file: /urls.py
# coding=utf-8
from importlib import import_module
from tornado.web import URLSpec
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
def include(module):
res = import_module(module)
urls = getattr(res, 'urls', res)
return urls
def url_wrapper(urls):
"""
接受一个数组格式的参数,数组元素类型可以是以下几种情况:
1、 (r'/a/', include('apps.main.urls'),"main"),
2、 (r'/x',TestHandler,"test"),
3、 URLSpec(r'/x',TestHandler,name="test"),
"""
wrapper_list = []
for url in urls:
if isinstance(url,URLSpec):
path, handles, name = url.regex.pattern,url.handler_class,url.name
else:
path, handles = url[0],url[1]
name = None if len(url)<3 else url[2]
if isinstance(handles, (tuple, list)): #如果是include,则包含的handles是元组或列表形式
for handle in handles:
if isinstance(handle, URLSpec):
pattern, handle_class, url_name = handle.regex.pattern, handle.handler_class, handle.name
else:
pattern, handle_class = handle[0],handle[1]
url_name = None if len(handle)<3 else handle[2]
if name==None:
retname = url_name #如果外层name为None,则reverse_url直接使用内层name
else:
retname = name+'_'+url_name if url_name else None #如果外层name不为None,内层name为空则不定义name,即无法被reverse_url使用,内层不为空则reverse_url调用名称为【外层name_内层name】
wrapper_list.append(URLSpec('{0}{1}'.format(path, pattern), handle_class,name=retname))
else:
wrapper_list.append(URLSpec(path, handles,name=name))
return wrapper_list
urls = url_wrapper([
(r'/main/',include('apps.main.urls'),"api"),
(r'/admin/',include('apps.admin.urls'),"admin")
])
file: setting.py
#coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import os,base64,uuid
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from plugin.base import getCfgValue
INSTALLED_APPS=[
"apps.main",
"apps.admin"
]
DATABASE = {
'DBDRIVER':'mysqldb',
'NAME':getCfgValue('mysql','dbname'),
'USER':getCfgValue('mysql','user'),
'PASSWORD':getCfgValue('mysql','password'),
'HOST':getCfgValue('mysql','host'),
'PORT':getCfgValue('mysql','port')
}
Base = declarative_base()
engine = create_engine('mysql+{DBDRIVER}://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}?charset=utf8'.format(**DATABASE),pool_recycle=3600)
settings=dict(
template_path = os.path.join(os.path.dirname(__file__),"templates"),
static_path = os.path.join(os.path.dirname(__file__),"statics"),
debug = False,
cookie_secret = base64.b64encode(uuid.uuid4().bytes + uuid.uuid4().bytes), #用来使用get_secure_cookie方法
xsrf_cookies = False,
login_url = '/mockServer/login',
)
file: /plugin/logger.py
#coding=utf-8
import os,datetime,sys
log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)),"logs")
def log(level,content,where):
now = str(datetime.datetime.today())
today = str(datetime.datetime.today().date())
filePath = os.path.join(log_path,'log_'+today+'.log')
isErr = False
try:
content = content.decode("utf-8")
except:
isErr = True
if isErr:
try:
content = content.decode(sys.getfilesystemencoding())
except:
pass
with open(filePath,'a') as f:
f.write(('[%s][%s]%s [IN %s]\n'%(level,now,content,where)).encode(sys.getfilesystemencoding()))
def error(msg,where='unknown'):
log('ERROR',msg,where)
def info(msg,where='unknown'):
log('INFO', msg, where)
def warning(msg,where='unknown'):
log('WARNING', msg, where)
if __name__=='__main__':
error('你的文件没有访问权限!','line 15-line 19')
info('访问http://www.baidu.com')
warning('删除/x/y/z成功!')
file: /apps/xxx/models.py
#coding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from sqlalchemy import Table
from sqlalchemy import Column,String,Integer,Text,ForeignKey,DateTime
#常用字段类型有String,Integer,Text,Boolean,SmallInteger,DateTime
from sqlalchemy.orm import relationship
from setting import Base
from datetime import datetime
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(64), nullable=False, index=True)
password = Column(String(64), nullable=False)
email = Column(String(64), nullable=False, index=True)
articles = relationship('Article', backref='author') #relationship只描述关系,不定义字段
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.username)
class UserInfo(Base):
__tablename__ = 'userinfos'
id = Column(Integer, primary_key=True)
name = Column(String(64))
qq = Column(String(11))
phone = Column(String(11))
link = Column(String(64))
user_id = Column(Integer, ForeignKey('users.id')) #多方需要定义外键字段,指定唯一值(为什么不在一对多的唯一值方定义?因为那样子就无法从一查找到具体值)
user = relationship('User', backref='userinfo', uselist=False) #定义一对一关系
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer, primary_key=True)
title = Column(String(255), nullable=False, index=True)
content = Column(Text)
user_id = Column(Integer, ForeignKey('users.id'))
cate_id = Column(Integer, ForeignKey('categories.id'))
tags = relationship('Tag', secondary='article_tag', backref='articles')
createtime = Column(DateTime, default=datetime.now)
updatetime = Column(DateTime, default=datetime.now, onupdate=datetime.now)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.title)
class Category(Base):
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(64), nullable=False, index=True)
articles = relationship('Article', backref='category')
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.name)
## 额外表,用来定义多对多关系 ###
article_tag = Table(
'article_tag', Base.metadata,
Column('article_id', Integer, ForeignKey('articles.id')),
Column('tag_id', Integer, ForeignKey('tags.id'))
)
class Tag(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String(64), nullable=False, index=True)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.name)
if __name__ == "__main__":
pass
file: /apps/xxx/urls.py
# coding=utf-8
from handlers import IndexHandler, LoginHandler, NewProjectHandler, ListAllProjectsHandler, NewModuleHandler, \
GetProjectNameHandler, ListProjectModulesHandler, NewApiHandler, ListProjectApis, GetProjectDescHandler, \
ListApisByMidHandler, GetApiDataHandler, SaveApiDataHandler, ListAllResponseTypesHandler, GetConsulEnvInfoHandler, \
SaveConsulEnvInfoHandler,GetAuthInfoHandler,LogoutHandler
from tornado.web import url
urls = [
url(r'', IndexHandler, name="index"),
url(r'login', LoginHandler, name='login'),
url(r'logout', LogoutHandler, name='logout'),
url(r'getauthinfo', GetAuthInfoHandler, 'getauthinfo'),
url(r'newProject', NewProjectHandler, name='newProject'),
url(r'listAllProjects', ListAllProjectsHandler, name='listAllProjects'),
url(r'newModule', NewModuleHandler, name='newModule'),
url(r'getProjectName', GetProjectNameHandler, name='getProjectName'),
url(r'listProjectModules', ListProjectModulesHandler, name='listProjectModules'),
url(r'newApi', NewApiHandler, name='newApi'),
url(r'listProjectApis', ListProjectApis, name='listProjectApis'),
url(r'getProjectDesc', GetProjectDescHandler, name='getProjectDesc'),
url(r'listApisByMid', ListApisByMidHandler, name='listApisByMid'),
url(r'getApiData', GetApiDataHandler, name='getApiData'),
url(r'saveApiData', SaveApiDataHandler, name='saveApiData'),
url(r'listAllResponseTypes', ListAllResponseTypesHandler, name='listAllResponseTypes'),
url(r'getConsulEnvInfo', GetConsulEnvInfoHandler, name='getConsulEnvInfo'),
url(r'saveConsulEnvInfo', SaveConsulEnvInfoHandler, name='saveConsulEnvInfo'),
]
file: /apps/xxx/handlers.py
# coding=utf-8
import tornado.web
import tornado.concurrent
import tornado.gen
from concurrent.futures import ThreadPoolExecutor
from plugin import logger
from dao.main_curd import *
import sys, time, json
from tornado.httpclient import HTTPRequest
try:
from tornado.curl_httpclient import CurlAsyncHTTPClient as AsyncHTTPClient
except ImportError:
from tornado.simple_httpclient import SimpleAsyncHTTPClient as AsyncHTTPClient
reload(sys)
sys.setdefaultencoding('utf8')
_result = {} # 存储格式为:_result[tid]={'status': 'success', 'msg': context}
TIMEOUT = 30
MAX_WORKERS = 50
class BaseHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
def get_current_user(self):
return self.get_secure_cookie("user")
######### handler类异步处理编写方法 ##############
####前端同步获取结果####
class Test1Handler(BaseHandler):
@tornado.concurrent.run_on_executor
def background_task(self):
# do some thing asynchronously
res = 'hello,world'
return res
@tornado.gen.coroutine
def get(self):
res = yield self.background_task()
self.write(res)
####前端异步获取结果####
##第一步,通知服务器执行处理,并生成、存储tid,并返回tid到前端
class Test2Handler(BaseHandler):
@tornado.concurrent.run_on_executor
def background_task(self, tid):
try:
# do some thing asynchronously
res = {'status': 'success', 'msg': ''}
except Exception, e:
res = {'status': 'failed', 'msg': e.message}
_result[tid] = res
@tornado.gen.coroutine
def get(self):
tid = str(int(time.time() * 10000))
yield self.background_task(tid)
self.write(tid)
@tornado.gen.coroutine
def post(self):
tid = str(int(time.time() * 10000))
yield self.background_task(tid)
self.write(tid)
# 第二步,根据tid查询结果内容
class AsynGetResultHandler(BaseHandler):
@tornado.concurrent.run_on_executor
def background_task(self, tid, timeout):
start = time.time()
while not tid in _result.keys():
if time.time() - start > timeout:
break
time.sleep(0.2)
if tid in _result.keys():
out = _result[tid] # 结果
del _result[tid] # 删除tid的数据。
return out
else:
return "timeout."
@tornado.gen.coroutine
def get(self, timeout=TIMEOUT):
tid = self.get_argument("tid")
res = yield self.background_task(tid, timeout)
self.write(res)
file: /apps/xxx/dao/dbCURD.py
# coding=utf-8
import sys, json
reload(sys)
sys.setdefaultencoding('utf-8')
from setting import engine
from sqlalchemy.orm import sessionmaker
from apps.main.service.consulOPT import getService,getAllService,newAgent
from apps.main.models import *
Session = sessionmaker(bind=engine)
#############新增、创建#####################
def addUser(account, passwd, email, role, name='', status=True):
session = Session()
user = User(account=account, name=name, passwd=passwd, email=email, role=role, status=status)
session.add(user)
try:
session.commit()
except Exception, e:
session.rollback()
raise e
finally:
ret = user.id
session.close()
return ret