进程

进程简介

  • 进程(任务)
    • 在计算机中,其实进程就是一个任务。
    • 在操作系统中,进程是程序执行和资源分配的基本单元。
  • 单核 CPU 实现多任务
    • 只是将 CPU 的时间快速的切换和分配到不同的任务上。
    • 主频足够高,切换足够快,人的肉眼无法分辨而已。
  • 多核 CPU 实现多任务
    • 如果任务的数量不超过 CPU 的核心数,完全可以实现一个核心只做一个任务。
    • 在操作系统中几乎是可能的,任务的数量往往远远大于核心数。
    • 同样的采用轮询的方式,轮流执行不同的任务,只是做任务的 “人” 有多个而已。

进程管理

  • 简单示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import os
    import time
    from multiprocessing import Process


    def do_some_thing():
    print('子进程开始 :', os.getpid())
    print('父进程 :', os.getppid())
    time.sleep(1)
    print('子进程结束')


    # 启动子进程,会将父进程所在的文件再加载一次,将会造成无限循环下去,造成错误
    # 因此,通常将执行的代码放到下面的结构中
    if __name__ == '__main__':
    # 获取当前进程号
    print('主进程:', os.getpid())
    # 创建一个进程,指定任务(通过函数)
    # 参数介绍:
    # target:指定任务,一个函数
    # name:进程名
    # args和kwargs:传递给子进程任务函数的参数
    p = Process(target=do_some_thing)
    # 当主进程结束后子进程任然在运行,这样的进程叫僵尸进程(有风险)
    # 设置:当主进程结束时,结束子进程
    p.daemon = True
    # 启动进程
    p.start()
    # 等待主进程结束,在结束主进程,可以指定等待时间
    p.join()
    # 终止子进程
    # p.terminate()
    print('主进程结束')

进程锁

  • 说明:当多个进程操作同一资源时,可能会造成混乱,甚至错误。如:写文件等
  • 解决:通常我们可以通过加锁的方式解决
  • 示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    import os
    import time
    import multiprocessing


    def loop(label, lock):
    # 获取锁
    lock.acquire()
    time.sleep(1)
    # 中间执行的任务,不可能同时多个进程执行
    print(label, os.getpid())
    # 释放锁
    lock.release()


    if __name__ == '__main__':
    print('主进程:', os.getpid())

    # 创建进程锁
    lock = multiprocessing.Lock()
    # 创建子进程
    # 用于存储所有的进程
    recode = []
    for i in range(5):
    p = multiprocessing.Process(target=loop, args=('子进程:', lock))
    p.start()
    recode.append(p)

    # 等待进程结束
    for p in recode:
    p.join()

进程池

  • 说明:创建少量的进程可以通过创建 Process 对象完成。如果需要大量的进程创建和管理时就比较费劲了。

  • 解决:可以通过进程池加已解决,而且可以通过参数控制进程池中进程的并发数,提高 CPU 的利用率。

  • 操作:

    1
    2
    3
    4
    5
    1.创建进程池
    2.添加进程
    3.关闭进程池
    4.等待进程池结束
    5.设置回调
  • 示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import time
    import random
    import multiprocessing


    # 进程任务
    def task(num):
    print(num, '开始')
    start = time.time()
    time.sleep(random.random() * 5)
    end = time.time()
    print(num, '执行了:', (end - start))
    return num


    # 进程函数结后会调用,参数是进程函数的返回值
    def callback(s):
    print(s, '结束')


    if __name__ == '__main__':
    # 获取CPU核心数
    print('核心:', multiprocessing.cpu_count())
    # 创建进程池,一般进程池中的进程数不超过CPU的核心数
    pool = multiprocessing.Pool(4)
    # 循环创建进程并添加到进程池中
    for i in range(5):
    # 参数:
    # func:任务函数
    # args:任务函数参数
    # callback:回调函数,进程结束时调用,参数是进程函数的返回值
    pool.apply_async(func=task, args=(i,), callback=callback)

    # 关闭进程池,关闭后就不能再添加进程了
    pool.close()
    # 等待进程池结束
    pool.join()
    print('主进程结束')

数据共享

  • 全局变量:全局变量不能共享

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    import multiprocessing

    num = 250
    lt = ['hello']


    # 进程任务
    def run():
    global num, lt
    print('子进程开始')
    num += 10
    lt.append('world')
    print('子进程:', num, lt)
    print('子进程结束')


    if __name__ == '__main__':
    print('主进程开始:', num, lt)
    p = multiprocessing.Process(target=run)
    p.start()
    p.join()
    print('主进程结束:', num, lt)
  • 管道(pipe)

    • 说明:创建管道时,得到两个链接

      1
      2
      创建管道,默认是全双工的,两边都可以收发
      duplex=False,是半双工,p_a只能收,p_b只能发
    • 示例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      import multiprocessing


      # 进程任务
      def run(p_a):
      # 子进程给主进程发数据
      # p_a.send(['a', 'b', 'c', 'd', 'e'])
      recv = p_a.recv()
      print('子进程接收到:', recv)


      if __name__ == '__main__':
      # 创建管道,默认是全双工的,两边都可以收发
      # duplex=False,是半双工,p_a只能收,p_b只能发
      p_a, p_b = multiprocessing.Pipe(duplex=False)
      p = multiprocessing.Process(target=run, args=(p_a,))
      p.start()
      # 主进程向子进程发数据
      p_b.send([1, 2, 3, 4, 5])
      p.join()
      # print('主进程接收到:', p_b.recv())
      print('主进程结束')
  • 对列(queue)

    • 示例:介绍

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      import multiprocessing

      if __name__ == '__main__':
      # 创建对列,先进先出
      q = multiprocessing.Queue(3)
      # 判断对列是否为空
      print(q.empty())
      # 判断对列是否已满
      print(q.full())
      # 压入数据
      q.put('hello')
      q.put('world')
      q.put('xxx')
      # 对列已满时,再添加数据会阻塞,设置不会阻塞会报错
      # q.put('yyy', False)
      # 获取对列长度
      print(q.qsize())
      # 读取数据
      print(q.get())
      print(q.get())
      print(q.get())
      # 对列为空时,读取也会阻塞
      # print(q.get())
      print('over')
      # 关闭对列
      q.close()
    • 示例:演示

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      import os
      import time
      import multiprocessing


      # 获取数据
      def get_data(queue):
      data = queue.get()
      print('读取数据:', data)


      # 写入数据
      def put_data(queue):
      # 拼接数据
      data = str(os.getpid()) + ' ' + str(time.time())
      print('压入数据:', data)
      queue.put(data)


      if __name__ == '__main__':
      # 创建对列
      q = multiprocessing.Queue(3)
      # 创建5个进程用于写数据
      record1 = []
      for i in range(5):
      p = multiprocessing.Process(target=put_data, args=(q,))
      p.start()
      record1.append(p)

      # 创建5个进程用于读数据
      record2 = []
      for i in range(5):
      p = multiprocessing.Process(target=get_data, args=(q,))
      p.start()
      record2.append(p)

      for p in record1:
      p.join()

      for p in record2:
      p.join()
  • 共享内存

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    import multiprocessing
    from ctypes import c_char_p


    def run(v, s, a, l, d):
    print('子进程:', v.value)
    v.value += 10
    print('子进程:', s.value)
    s.value = b'world'
    print('子进程:', a[0])
    a[0] = 5
    l.append('大哥,您收好!')
    d['name'] = 'xiaoming'


    if __name__ == '__main__':
    # 共享内存,可以共享不同类型的数据
    server = multiprocessing.Manager()
    # 整数 i,小数 f
    v = server.Value('i', 250)
    # 字符串
    s = server.Value(c_char_p, b'hello')
    # 数组,相当于列表
    a = server.Array('i', range(5))
    # 列表
    l = server.list()
    # 字典
    d = server.dict()

    p = multiprocessing.Process(target=run, args=(v, s, a, l, d))

    p.start()
    p.join()

    print('主进程:', v.value)
    print('主进程:', s.value)
    print('主进程:', a[0])
    print('主进程:', l)
    print('主进程:', d)

自定义进程类

  • 说明:继承自 Process 类,实现 run 方法,start 后自动执行 run 方法
  • 示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import time
    from multiprocessing import Process


    class MyProcess(Process):
    def __init__(self, delay):
    super().__init__()
    self.delay = delay

    # 实现该方法,进程start后自动调用
    def run(self):
    for i in range(3):
    print('子进程运行中...')
    time.sleep(self.delay)


    if __name__ == '__main__':
    p = MyProcess(1)
    p.start()
    p.join()
    print('子进程结束')