进程
进程简介
- 进程(任务)
- 在计算机中,其实进程就是一个任务。
- 在操作系统中,进程是程序执行和资源分配的基本单元。
 
- 单核CPU实现多任务
- 只是将CPU的时间快速的切换和分配到不同的任务上。
- 主频足够高,切换足够快,人的肉眼无法分辨而已。
 
- 多核CPU实现多任务
- 如果任务的数量不超过CPU的核心数,完全可以实现一个核心只做一个任务。
- 在操作系统中几乎是可能的,任务的数量往往远远大于核心数。
- 同样的采用轮询的方式,轮流执行不同的任务,只是做任务的“人”有多个而已。
 
进程管理
- 
简单示例: 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33import os 
 import time
 from multiprocessing import Process
 def do_some_thing():
 print('子进程开始 :', os.getpid())
 print('父进程 :', os.getppid())
 time.sleep(1)
 print('子进程结束')
 # 启动子进程,会将父进程所在的文件再加载一次,将会造成无限循环下去,造成错误
 # 因此,通常将执行的代码放到下面的结构中
 if __name__ == '__main__':
 # 获取当前进程号
 print('主进程:', os.getpid())
 # 创建一个进程,指定任务(通过函数)
 # 参数介绍:
 # target:指定任务,一个函数
 # name:进程名
 # args和kwargs:传递给子进程任务函数的参数
 p = Process(target=do_some_thing)
 # 当主进程结束后子进程任然在运行,这样的进程叫僵尸进程(有风险)
 # 设置:当主进程结束时,结束子进程
 p.daemon = True
 # 启动进程
 p.start()
 # 等待主进程结束,在结束主进程,可以指定等待时间
 p.join()
 # 终止子进程
 # p.terminate()
 print('主进程结束')
进程锁
- **说明:**当多个进程操作同一资源时,可能会造成混乱,甚至错误。如:写文件等
- **解决:**通常我们可以通过加锁的方式解决
- 示例:1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31import os 
 import time
 import multiprocessing
 def loop(label, lock):
 # 获取锁
 lock.acquire()
 time.sleep(1)
 # 中间执行的任务,不可能同时多个进程执行
 print(label, os.getpid())
 # 释放锁
 lock.release()
 if __name__ == '__main__':
 print('主进程:', os.getpid())
 # 创建进程锁
 lock = multiprocessing.Lock()
 # 创建子进程
 # 用于存储所有的进程
 recode = []
 for i in range(5):
 p = multiprocessing.Process(target=loop, args=('子进程:', lock))
 p.start()
 recode.append(p)
 # 等待进程结束
 for p in recode:
 p.join()
进程池
- 
**说明:**创建少量的进程可以通过创建Process对象完成。如果需要大量的进程创建和管理时就比较费劲了。 
- 
**解决:**可以通过进程池加已解决,而且可以通过参数控制进程池中进程的并发数,提高CPU的利用率。 
- 
操作: 1 
 2
 3
 4
 51.创建进程池 
 2.添加进程
 3.关闭进程池
 4.等待进程池结束
 5.设置回调
- 
示例: 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38import time 
 import random
 import multiprocessing
 # 进程任务
 def task(num):
 print(num, '开始')
 start = time.time()
 time.sleep(random.random() * 5)
 end = time.time()
 print(num, '执行了:', (end - start))
 return num
 # 进程函数结后会调用,参数是进程函数的返回值
 def callback(s):
 print(s, '结束')
 if __name__ == '__main__':
 # 获取CPU核心数
 print('核心:', multiprocessing.cpu_count())
 # 创建进程池,一般进程池中的进程数不超过CPU的核心数
 pool = multiprocessing.Pool(4)
 # 循环创建进程并添加到进程池中
 for i in range(5):
 # 参数:
 # func:任务函数
 # args:任务函数参数
 # callback:回调函数,进程结束时调用,参数是进程函数的返回值
 pool.apply_async(func=task, args=(i,), callback=callback)
 # 关闭进程池,关闭后就不能再添加进程了
 pool.close()
 # 等待进程池结束
 pool.join()
 print('主进程结束')
数据共享
- 
全局变量:全局变量不能共享1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22import multiprocessing 
 num = 250
 lt = ['hello']
 # 进程任务
 def run():
 global num, lt
 print('子进程开始')
 num += 10
 lt.append('world')
 print('子进程:', num, lt)
 print('子进程结束')
 if __name__ == '__main__':
 print('主进程开始:', num, lt)
 p = multiprocessing.Process(target=run)
 p.start()
 p.join()
 print('主进程结束:', num, lt)
- 
管道(pipe)- 
**说明:**创建管道时,得到两个链接 1 
 2创建管道,默认是全双工的,两边都可以收发 
 duplex=False,是半双工,p_a只能收,p_b只能发
- 
示例: 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22import multiprocessing 
 # 进程任务
 def run(p_a):
 # 子进程给主进程发数据
 # p_a.send(['a', 'b', 'c', 'd', 'e'])
 recv = p_a.recv()
 print('子进程接收到:', recv)
 if __name__ == '__main__':
 # 创建管道,默认是全双工的,两边都可以收发
 # duplex=False,是半双工,p_a只能收,p_b只能发
 p_a, p_b = multiprocessing.Pipe(duplex=False)
 p = multiprocessing.Process(target=run, args=(p_a,))
 p.start()
 # 主进程向子进程发数据
 p_b.send([1, 2, 3, 4, 5])
 p.join()
 # print('主进程接收到:', p_b.recv())
 print('主进程结束')
 
- 
- 
对列(queue)- 
**示例:**介绍 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26import multiprocessing 
 if __name__ == '__main__':
 # 创建对列,先进先出
 q = multiprocessing.Queue(3)
 # 判断对列是否为空
 print(q.empty())
 # 判断对列是否已满
 print(q.full())
 # 压入数据
 q.put('hello')
 q.put('world')
 q.put('xxx')
 # 对列已满时,再添加数据会阻塞,设置不会阻塞会报错
 # q.put('yyy', False)
 # 获取对列长度
 print(q.qsize())
 # 读取数据
 print(q.get())
 print(q.get())
 print(q.get())
 # 对列为空时,读取也会阻塞
 # print(q.get())
 print('over')
 # 关闭对列
 q.close()
- 
**示例:**演示 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41import os 
 import time
 import multiprocessing
 # 获取数据
 def get_data(queue):
 data = queue.get()
 print('读取数据:', data)
 # 写入数据
 def put_data(queue):
 # 拼接数据
 data = str(os.getpid()) + ' ' + str(time.time())
 print('压入数据:', data)
 queue.put(data)
 if __name__ == '__main__':
 # 创建对列
 q = multiprocessing.Queue(3)
 # 创建5个进程用于写数据
 record1 = []
 for i in range(5):
 p = multiprocessing.Process(target=put_data, args=(q,))
 p.start()
 record1.append(p)
 # 创建5个进程用于读数据
 record2 = []
 for i in range(5):
 p = multiprocessing.Process(target=get_data, args=(q,))
 p.start()
 record2.append(p)
 for p in record1:
 p.join()
 for p in record2:
 p.join()
 
- 
- 
共享内存1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39import multiprocessing 
 from ctypes import c_char_p
 def run(v, s, a, l, d):
 print('子进程:', v.value)
 v.value += 10
 print('子进程:', s.value)
 s.value = b'world'
 print('子进程:', a[0])
 a[0] = 5
 l.append('大哥,您收好!')
 d['name'] = 'xiaoming'
 if __name__ == '__main__':
 # 共享内存,可以共享不同类型的数据
 server = multiprocessing.Manager()
 # 整数 i,小数 f
 v = server.Value('i', 250)
 # 字符串
 s = server.Value(c_char_p, b'hello')
 # 数组,相当于列表
 a = server.Array('i', range(5))
 # 列表
 l = server.list()
 # 字典
 d = server.dict()
 p = multiprocessing.Process(target=run, args=(v, s, a, l, d))
 p.start()
 p.join()
 print('主进程:', v.value)
 print('主进程:', s.value)
 print('主进程:', a[0])
 print('主进程:', l)
 print('主进程:', d)
自定义进程类
- **说明:**继承自Process类,实现run方法,start后自动执行run方法
- 示例:1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21import time 
 from multiprocessing import Process
 class MyProcess(Process):
 def __init__(self, delay):
 super().__init__()
 self.delay = delay
 # 实现该方法,进程start后自动调用
 def run(self):
 for i in range(3):
 print('子进程运行中...')
 time.sleep(self.delay)
 if __name__ == '__main__':
 p = MyProcess(1)
 p.start()
 p.join()
 print('子进程结束')