Tornado - 同步与异步以及使用 WebSocket 实现在线聊天

同步与异步

同步

  • 按部就班的依次执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import time


    # 一个客户单的请求
    def reqA():
    print("开始处理reqA")
    print("结束处理reqA")


    # 另一个客户端的请求
    def reqB():
    print("开始处理reqB")
    print("结束处理reqB")


    # tornado服务
    def main():
    reqA()
    reqB()
    while 1:
    time.sleep(0.5)
    pass


    if __name__ == "__main__":
    main()

  • 在请求中添加一个耗时操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import time


    # handler获取数据(数据库、其他服务器、循环耗时)
    def longIo():
    print("开始耗时操作")
    time.sleep(5)
    print("结束耗时操作")
    return "sunck is a good man"


    # 一个客户单的请求
    def reqA():
    print("开始处理reqA")
    res = longIo()
    print("接收到longIo的响应数据:", res)
    print("结束处理reqA")


    # 另一个客户端的请求
    def reqB():
    print("开始处理reqB")
    time.sleep(2)
    print("结束处理reqB")


    # tornado服务
    def main():
    reqA()
    reqB()
    while 1:
    time.sleep(0.5)
    pass


    if __name__ == "__main__":
    main()

异步(2 种实现方法)

  • 概述

    对于耗时的操作,会交给别人 (另一个线程) 去处理,我们继续向下去执行,当比人结束耗时操作后再将结果反馈给我们

  1. 回调函数实现异步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    import time
    import threading


    # handler获取数据(数据库、其他服务器、循环耗时)
    def longIo(callback):
    def run(cb):
    print("开始耗时操作")
    time.sleep(5)
    print("结束耗时操作")
    cb("sunck is a good man")

    threading.Thread(target=run, args=(callback,)).start()


    # 函数(回调函数)
    def finish(data):
    print("开始处理回调函数")
    print("接收到longIo的响应数据:", data)
    print("结束处理回调函数")


    # 一个客户单的请求
    def reqA():
    print("开始处理reqA")
    longIo(finish)
    print("结束处理reqA")


    # 另一个客户端的请求
    def reqB():
    print("开始处理reqB")
    time.sleep(2)
    print("结束处理reqB")


    # tornado服务
    def main():
    reqA()
    reqB()
    while 1:
    time.sleep(0.5)
    pass


    if __name__ == "__main__":
    main()

  2. 协程实现异步

    • 版本 1

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      import time
      import threading

      gen = None


      # handler获取数据(数据库、其他服务器、循环耗时)
      def longIo():
      def run():
      print("开始耗时操作")
      time.sleep(5)
      try:
      global gen
      gen.send("sunck is a good man")
      except StopIteration as e:
      pass

      threading.Thread(target=run).start()


      # 一个客户单的请求
      def reqA():
      print("开始处理reqA")
      res = yield longIo()
      print("接收到longIo的响应数据:", res)
      print("结束处理reqA")


      # 另一个客户端的请求
      def reqB():
      print("开始处理reqB")
      time.sleep(2)
      print("结束处理reqB")


      # tornado服务
      def main():
      global gen
      gen = reqA() # 生成一个生成器
      next(gen) # 执行reqA

      reqB()
      while 1:
      time.sleep(0.5)
      pass


      if __name__ == "__main__":
      main()

      问题:版本 1 中在调用 reqA 的时候不能将其视为一个简单的函数,而是需要作为生成器来用

    • 版本 2

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
         import time
      import threading

      gen = None


      # handler获取数据(数据库、其他服务器、循环耗时)
      def longIo():
      def run():
      print("开始耗时操作")
      time.sleep(5)
      try:
      global gen
      gen.send("sunck is a good man")
      except StopIteration as e:
      pass

      threading.Thread(target=run).start()


      def genCoroutine(func):
      def wrapper(*args, **kwargs):
      global gen
      gen = func()
      next(gen)

      return wrapper


      # 一个客户单的请求
      @genCoroutine
      def reqA():
      print("开始处理reqA")
      res = yield longIo()
      print("接收到longIo的响应数据:", res)
      print("结束处理reqA")


      # 另一个客户端的请求
      def reqB():
      print("开始处理reqB")
      time.sleep(2)
      print("结束处理reqB")


      # tornado服务
      def main():
      # global gen
      # gen = reqA() #生成一个生成器
      # next(gen) #执行reqA
      reqA()
      reqB()
      while 1:
      time.sleep(0.5)
      pass


      if __name__ == "__main__":
      main()

      问题:版本 2 中存在一个全局的 gen 变量,需要消除

    • 版本 3

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      import time
      import threading


      def genCoroutine(func):
      def wrapper(*args, **kwargs):
      gen1 = func() # reqA的生成器
      gen2 = next(gen1) # longIo的生成器

      def run(g):
      res = next(g)
      try:
      gen1.send(res) # 返回给reqA数据
      except StopIteration as e:
      pass

      threading.Thread(target=run, args=(gen2,)).start()

      return wrapper


      # handler获取数据(数据库、其他服务器、循环耗时)
      def longIo():
      print("开始耗时操作")
      time.sleep(5)
      print("结束耗时操作")
      # 返回数据
      yield "sunck is a good man"


      # 一个客户单的请求
      @genCoroutine
      def reqA():
      print("开始处理reqA")
      res = yield longIo()
      print("接收到longIo的响应数据:", res)
      print("结束处理reqA")


      # 另一个客户端的请求
      def reqB():
      print("开始处理reqB")
      time.sleep(2)
      print("结束处理reqB")


      # tornado服务
      def main():
      reqA()
      reqB()
      while 1:
      time.sleep(0.1)
      pass


      if __name__ == "__main__":
      main()

Tornado 中的异步

概述

因为 epoll 主要是用来解决网络 IO 的并发问题,所以 Tornado 的异步也是主要体现在网络的 IO 异步上,即异步 Web 请求

tornado.httpclient.AsyncHTTPClient

Tornado 提供的异步 Web 请求客户端,用来进行异步 Web 请求

fetch(request, callback=None)

  • 用于执行一个 Web 请求,并异步响应返回一个 tornado.httpclient.HttpResponse
  • request 可以是一个 url,也可以是一个 tornado.httpclient.HTTPRequest 对象,如果插入的是 url,会自动生成一个 request 对象

HTTPRequest

  • HTTP 请求类,该类的构造函数可以接收参数
  • 参数
    • url:字符串类型,要访问的网址,必传
    • method:字符串类型,HTTP 请求方式
    • headers:字典或者 HTTPHeaders(附加协议头)
    • body:HTTP 请求体

HTTPResponse

  • HTTP 响应类
  • 属性
    • code:状态码
    • reason:状态码的描述
    • body:响应的数据
    • error:异常

@tornado.web.asynchronous

不关闭通信的通道

示例

  • 测试接口

    1
    https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0
  • 回调函数实现异步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    class Students1Handler(RequestHandler):
    def on_response(self, response):
    if response.error:
    self.send_error(500)
    else:
    data = json.loads(response.body)
    self.write(data)
    self.finish()

    @tornado.web.asynchronous # 不关闭通信的通道
    def get(self, *args, **kwargs):
    # 获取所有的信息
    url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0'
    # 创建客户端
    client = AsyncHTTPClient()
    client.fetch(url, self.on_response)

    如果运行报错:AttributeError: module 'tornado.web' has no attribute 'asynchronous',请卸载原来的 tornado,然后安装 pip install tornado==5.1.1

  • 协程实现异步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    class Students2Handler(RequestHandler):
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
    url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0'
    client = AsyncHTTPClient()
    res = yield client.fetch(url)
    if res.error:
    self.send_error(500)
    else:
    data = json.loads(res.body)
    self.write(data)
  • 将异步 Web 请求单独出来

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    class Students3Handler(RequestHandler):
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
    res = yield self.getData()
    self.write(res)

    @tornado.gen.coroutine
    def getData(self):
    url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=50&page_start=0'
    client = AsyncHTTPClient()
    res = yield client.fetch(url)
    if res.error:
    ret = {"ret": 0}
    else:
    ret = json.loads(res.body)
    raise tornado.gen.Return(ret)
  • 异步 mysql 图示

    异步MySQL

WebSocket

实时获取消息

  • 前端轮询:有数据立即回复,没数据就回复没数据
  • 长轮询:没有数据改变时不做任何响应
  • WebSocket

概述

  • WebSocket 是 HTML5 规范中提出新的客户端 - 服务器通信协议,该协议本身使用新的 ws://url
  • WebSocket 是独立的、创建在 TCP 协议之上的协议,和 HTTP 的唯一的关系是使用了 HTTP 协议的 101 状态码进行协议切换。使用 TCP 的默认端口 80,可以绕过大多数防火墙
  • WebSocket 使客户端与服务端之间的数据交互变得更加简单,允许服务器直接向客户端推送数据而不需要客户端的请求。两者可以建立持久链接,并且数据可以双向通信
  • 目前大多数主流浏览器都已经支持 WebSocket

Tornado 的 WebSocket 模块

  • WebSocketHandle:处理通信
  • open():当一个 WebSocket 连接建立后被调用
  • on_message(message):当客户端发送消息过来时调用
  • on_close():当 WebSocket 连接关闭后调用
  • write_message(message, binary=False):主动向客户端发送 message 消息,message 可以是字符串或者字典 (自动转为 Json 字符串)。如果 binary 为 False,则 message 会以 UT-8 编码发送。如果为 True,可以发送二进制模式,字节码。
  • close():关闭 WebSocket 连接
  • check_origin(origin):判断源 origin,对于符合条件的请求源允许连接

前端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>聊天界面</title>
<script type="text/javascript" charset="utf-8" src="{{static_url('js/jquery.min.js')}}"></script>
</head>
<body>
<div id="contents" style="width:500px;height:500px;overflow:auto"></div>
<div>
<input type="text" id="message"/>
<button onclick="sendMessage()">发送</button>
</div>
<script>
//建立WebSocket链接
var ws = new WebSocket("ws://127.0.0.1:8000/chat")
//接收服务器的消息
ws.onmessage = function (e) {
$('#contents').append("<p>" + e.data + "</p>")
}

//向服务器发送消息
function sendMessage() {
var message = $('#message').val()
ws.send(message)
$('#message').val("")
}
</script>
</body>
</html>

服务器代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import tornado.web
from tornado.web import RequestHandler
from tornado.websocket import WebSocketHandler


class StaticFileHandler(tornado.web.StaticFileHandler):
def __init__(self, *args, **kwargs):
super(StaticFileHandler, self).__init__(*args, **kwargs)
self.xsrf_token


class HomeHandler(RequestHandler):
def get(self, *args, **kwargs):
self.render('home.html')


class ChatHandler(WebSocketHandler):
users = []

def open(self):
self.users.append(self)
for user in self.users:
user.write_message(u"[%s]登陆了" % (self.request.remote_ip))

def on_message(self, message):
for user in self.users:
user.write_message(u"[%s]说:%s" % (self.request.remote_ip, message))

def on_close(self):
self.users.remove(self)
for user in self.users:
user.write_message(u"[%s]退出了" % (self.request.remote_ip))

def check_origin(self, origin):
return True

注册界面

注册界面

登录界面

登录界面

聊天界面

聊天界面