Tornado-同步与异步以及使用WebSocket实现在线聊天
同步与异步
同步
-
按部就班的依次执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import time
# 一个客户单的请求
def reqA():
print("开始处理reqA")
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
print("结束处理reqB")
# tornado服务
def main():
reqA()
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main()
-
在请求中添加一个耗时操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38import time
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo():
print("开始耗时操作")
time.sleep(5)
print("结束耗时操作")
return "sunck is a good man"
# 一个客户单的请求
def reqA():
print("开始处理reqA")
res = longIo()
print("接收到longIo的响应数据:", res)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
reqA()
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main()
异步(2种实现方法)
-
概述
对于耗时的操作,会交给别人(另一个线程)去处理,我们继续向下去执行,当比人结束耗时操作后再将结果反馈给我们
-
回调函数实现异步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48import time
import threading
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo(callback):
def run(cb):
print("开始耗时操作")
time.sleep(5)
print("结束耗时操作")
cb("sunck is a good man")
threading.Thread(target=run, args=(callback,)).start()
# 函数(回调函数)
def finish(data):
print("开始处理回调函数")
print("接收到longIo的响应数据:", data)
print("结束处理回调函数")
# 一个客户单的请求
def reqA():
print("开始处理reqA")
longIo(finish)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
reqA()
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main() -
协程实现异步
-
版本1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50import time
import threading
gen = None
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo():
def run():
print("开始耗时操作")
time.sleep(5)
try:
global gen
gen.send("sunck is a good man")
except StopIteration as e:
pass
threading.Thread(target=run).start()
# 一个客户单的请求
def reqA():
print("开始处理reqA")
res = yield longIo()
print("接收到longIo的响应数据:", res)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
global gen
gen = reqA() # 生成一个生成器
next(gen) # 执行reqA
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main()问题:版本1中在调用reqA的时候不能将其视为一个简单的函数,而是需要作为生成器来用
-
版本2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60import time
import threading
gen = None
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo():
def run():
print("开始耗时操作")
time.sleep(5)
try:
global gen
gen.send("sunck is a good man")
except StopIteration as e:
pass
threading.Thread(target=run).start()
def genCoroutine(func):
def wrapper(*args, **kwargs):
global gen
gen = func()
next(gen)
return wrapper
# 一个客户单的请求
def reqA():
print("开始处理reqA")
res = yield longIo()
print("接收到longIo的响应数据:", res)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
# global gen
# gen = reqA() #生成一个生成器
# next(gen) #执行reqA
reqA()
reqB()
while 1:
time.sleep(0.5)
pass
if __name__ == "__main__":
main()
问题:版本2中存在一个全局的gen变量,需要消除
-
版本3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58import time
import threading
def genCoroutine(func):
def wrapper(*args, **kwargs):
gen1 = func() # reqA的生成器
gen2 = next(gen1) # longIo的生成器
def run(g):
res = next(g)
try:
gen1.send(res) # 返回给reqA数据
except StopIteration as e:
pass
threading.Thread(target=run, args=(gen2,)).start()
return wrapper
# handler获取数据(数据库、其他服务器、循环耗时)
def longIo():
print("开始耗时操作")
time.sleep(5)
print("结束耗时操作")
# 返回数据
yield "sunck is a good man"
# 一个客户单的请求
def reqA():
print("开始处理reqA")
res = yield longIo()
print("接收到longIo的响应数据:", res)
print("结束处理reqA")
# 另一个客户端的请求
def reqB():
print("开始处理reqB")
time.sleep(2)
print("结束处理reqB")
# tornado服务
def main():
reqA()
reqB()
while 1:
time.sleep(0.1)
pass
if __name__ == "__main__":
main()
-
Tornado中的异步
概述
因为epoll主要是用来解决网络IO的并发问题,所以Tornado的异步也是主要体现在网络的IO异步上,即异步Web请求
tornado.httpclient.AsyncHTTPClient
Tornado提供的异步Web请求客户端,用来进行异步Web请求
fetch(request, callback=None)
- 用于执行一个Web请求,并异步响应返回一个
tornado.httpclient.HttpResponse
- request可以是一个url,也可以是一个
tornado.httpclient.HTTPRequest
对象,如果插入的是url,会自动生成一个request对象
HTTPRequest
- HTTP请求类,该类的构造函数可以接收参数
- 参数
url
:字符串类型,要访问的网址,必传method
:字符串类型,HTTP请求方式headers
:字典或者HTTPHeaders(附加协议头)body
:HTTP请求体
HTTPResponse
- HTTP响应类
- 属性
code
:状态码reason
:状态码的描述body
:响应的数据error
:异常
@tornado.web.asynchronous
不关闭通信的通道
示例
-
测试接口
1
https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0
-
回调函数实现异步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class Students1Handler(RequestHandler):
def on_response(self, response):
if response.error:
self.send_error(500)
else:
data = json.loads(response.body)
self.write(data)
self.finish()
# 不关闭通信的通道
def get(self, *args, **kwargs):
# 获取所有的信息
url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0'
# 创建客户端
client = AsyncHTTPClient()
client.fetch(url, self.on_response)如果运行报错:
AttributeError: module 'tornado.web' has no attribute 'asynchronous'
,请卸载原来的tornado,然后安装pip install tornado==5.1.1
-
协程实现异步
1
2
3
4
5
6
7
8
9
10
11class Students2Handler(RequestHandler):
def get(self, *args, **kwargs):
url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0'
client = AsyncHTTPClient()
res = yield client.fetch(url)
if res.error:
self.send_error(500)
else:
data = json.loads(res.body)
self.write(data) -
将异步Web请求单独出来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class Students3Handler(RequestHandler):
def get(self, *args, **kwargs):
res = yield self.getData()
self.write(res)
def getData(self):
url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0'
client = AsyncHTTPClient()
res = yield client.fetch(url)
if res.error:
ret = {"ret": 0}
else:
ret = json.loads(res.body)
raise tornado.gen.Return(ret) -
异步mysql图示
WebSocket
实时获取消息
- 前端轮询:有数据立即回复,没数据就回复没数据
- 长轮询:没有数据改变时不做任何响应
- WebSocket
概述
- WebSocket是HTML5规范中提出新的客户端-服务器通信协议,该协议本身使用新的
ws://url
- WebSocket是独立的、创建在TCP协议之上的协议,和HTTP的唯一的关系是使用了HTTP协议的101状态码进行协议切换。使用TCP的默认端口80,可以绕过大多数防火墙
- WebSocket使客户端与服务端之间的数据交互变得更加简单,允许服务器直接向客户端推送数据而不需要客户端的请求。两者可以建立持久链接,并且数据可以双向通信
- 目前大多数主流浏览器都已经支持WebSocket
Tornado的WebSocket模块
WebSocketHandle
:处理通信open()
:当一个WebSocket连接建立后被调用on_message(message)
:当客户端发送消息过来时调用on_close()
:当WebSocket连接关闭后调用write_message(message, binary=False)
:主动向客户端发送message消息,message可以是字符串或者字典(自动转为Json字符串)。如果binary为False,则message会以UT-8编码发送。如果为True,可以发送二进制模式,字节码。close()
:关闭WebSocket连接check_origin(origin)
:判断源origin,对于符合条件的请求源允许连接
前端代码
1 |
|
服务器代码
1 | import tornado.web |