Tornado - 同步与异步以及使用 WebSocket 实现在线聊天
同步
按部就班的依次执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import time
# 一个客户单的请求
def reqA():
print("开始处理reqA")
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
print("结束处理reqB")
# tornado服务
def main():
reqA()
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main()
在请求中添加一个耗时操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38import time
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo():
print("开始耗时操作")
time.sleep(5)
print("结束耗时操作")
return "sunck is a good man"
# 一个客户单的请求
def reqA():
print("开始处理reqA")
res = longIo()
print("接收到longIo的响应数据:", res)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
reqA()
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main()
异步(2 种实现方法)
概述
对于耗时的操作,会交给别人 (另一个线程) 去处理,我们继续向下去执行,当比人结束耗时操作后再将结果反馈给我们
回调函数实现异步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48import time
import threading
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo(callback):
def run(cb):
print("开始耗时操作")
time.sleep(5)
print("结束耗时操作")
cb("sunck is a good man")
threading.Thread(target=run, args=(callback,)).start()
# 函数(回调函数)
def finish(data):
print("开始处理回调函数")
print("接收到longIo的响应数据:", data)
print("结束处理回调函数")
# 一个客户单的请求
def reqA():
print("开始处理reqA")
longIo(finish)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
reqA()
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main()协程实现异步
版本 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50import time
import threading
gen = None
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo():
def run():
print("开始耗时操作")
time.sleep(5)
try:
global gen
gen.send("sunck is a good man")
except StopIteration as e:
pass
threading.Thread(target=run).start()
# 一个客户单的请求
def reqA():
print("开始处理reqA")
res = yield longIo()
print("接收到longIo的响应数据:", res)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
global gen
gen = reqA() # 生成一个生成器
next(gen) # 执行reqA
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main()问题:版本 1 中在调用 reqA 的时候不能将其视为一个简单的函数,而是需要作为生成器来用
版本 2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60import time
import threading
gen = None
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo():
def run():
print("开始耗时操作")
time.sleep(5)
try:
global gen
gen.send("sunck is a good man")
except StopIteration as e:
pass
threading.Thread(target=run).start()
def genCoroutine(func):
def wrapper(*args, **kwargs):
global gen
gen = func()
next(gen)
return wrapper
# 一个客户单的请求
def reqA():
print("开始处理reqA")
res = yield longIo()
print("接收到longIo的响应数据:", res)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
# global gen
# gen = reqA() #生成一个生成器
# next(gen) #执行reqA
reqA()
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main()
问题:版本 2 中存在一个全局的 gen 变量,需要消除
版本 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58import time
import threading
def genCoroutine(func):
def wrapper(*args, **kwargs):
gen1 = func() # reqA的生成器
gen2 = next(gen1) # longIo的生成器
def run(g):
res = next(g)
try:
gen1.send(res) # 返回给reqA数据
except StopIteration as e:
pass
threading.Thread(target=run, args=(gen2,)).start()
return wrapper
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo():
print("开始耗时操作")
time.sleep(5)
print("结束耗时操作")
# 返回数据
yield "sunck is a good man"
# 一个客户单的请求
def reqA():
print("开始处理reqA")
res = yield longIo()
print("接收到longIo的响应数据:", res)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
reqA()
reqB()
while 1:
time.sleep(0.1)
pass
if __name__ == "__main__":
main()
概述
因为 epoll 主要是用来解决网络 IO 的并发问题,所以 Tornado 的异步也是主要体现在网络的 IO 异步上,即异步 Web 请求
tornado.httpclient.AsyncHTTPClient
Tornado 提供的异步 Web 请求客户端,用来进行异步 Web 请求
fetch(request, callback=None)
- 用于执行一个 Web 请求,并异步响应返回一个
tornado.httpclient.HttpResponse
- request 可以是一个 url,也可以是一个
tornado.httpclient.HTTPRequest
对象,如果插入的是 url,会自动生成一个 request 对象
HTTPRequest
- HTTP 请求类,该类的构造函数可以接收参数
- 参数
url
:字符串类型,要访问的网址,必传method
:字符串类型,HTTP 请求方式headers
:字典或者 HTTPHeaders(附加协议头)body
:HTTP 请求体
HTTPResponse
- HTTP 响应类
- 属性
code
:状态码reason
:状态码的描述body
:响应的数据error
:异常
@tornado.web.asynchronous
不关闭通信的通道
示例
测试接口
1
https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0
回调函数实现异步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class Students1Handler(RequestHandler):
def on_response(self, response):
if response.error:
self.send_error(500)
else:
data = json.loads(response.body)
self.write(data)
self.finish()
# 不关闭通信的通道
def get(self, *args, **kwargs):
# 获取所有的信息
url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0'
# 创建客户端
client = AsyncHTTPClient()
client.fetch(url, self.on_response)如果运行报错:
AttributeError: module 'tornado.web' has no attribute 'asynchronous'
,请卸载原来的 tornado,然后安装pip install tornado==5.1.1
协程实现异步
1
2
3
4
5
6
7
8
9
10
11class Students2Handler(RequestHandler):
def get(self, *args, **kwargs):
url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0'
client = AsyncHTTPClient()
res = yield client.fetch(url)
if res.error:
self.send_error(500)
else:
data = json.loads(res.body)
self.write(data)将异步 Web 请求单独出来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class Students3Handler(RequestHandler):
def get(self, *args, **kwargs):
res = yield self.getData()
self.write(res)
def getData(self):
url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0'
client = AsyncHTTPClient()
res = yield client.fetch(url)
if res.error:
ret = {"ret": 0}
else:
ret = json.loads(res.body)
raise tornado.gen.Return(ret)异步 mysql 图示
实时获取消息
- 前端轮询:有数据立即回复,没数据就回复没数据
- 长轮询:没有数据改变时不做任何响应
- WebSocket
概述
- WebSocket 是 HTML5 规范中提出新的客户端 - 服务器通信协议,该协议本身使用新的
ws://url
- WebSocket 是独立的、创建在 TCP 协议之上的协议,和 HTTP 的唯一的关系是使用了 HTTP 协议的 101 状态码进行协议切换。使用 TCP 的默认端口 80,可以绕过大多数防火墙
- WebSocket 使客户端与服务端之间的数据交互变得更加简单,允许服务器直接向客户端推送数据而不需要客户端的请求。两者可以建立持久链接,并且数据可以双向通信
- 目前大多数主流浏览器都已经支持 WebSocket
Tornado 的 WebSocket 模块
WebSocketHandle
:处理通信open()
:当一个 WebSocket 连接建立后被调用on_message(message)
:当客户端发送消息过来时调用on_close()
:当 WebSocket 连接关闭后调用write_message(message, binary=False)
:主动向客户端发送 message 消息,message 可以是字符串或者字典 (自动转为 Json 字符串)。如果 binary 为 False,则 message 会以 UT-8 编码发送。如果为 True,可以发送二进制模式,字节码。close()
:关闭 WebSocket 连接check_origin(origin)
:判断源 origin,对于符合条件的请求源允许连接
前端代码
1 |
|
服务器代码
1 | import tornado.web |