进程
进程简介
- 进程(任务)
- 在计算机中,其实进程就是一个任务。
- 在操作系统中,进程是程序执行和资源分配的基本单元。
- 单核CPU实现多任务
- 只是将CPU的时间快速的切换和分配到不同的任务上。
- 主频足够高,切换足够快,人的肉眼无法分辨而已。
- 多核CPU实现多任务
- 如果任务的数量不超过CPU的核心数,完全可以实现一个核心只做一个任务。
- 在操作系统中几乎是可能的,任务的数量往往远远大于核心数。
- 同样的采用轮询的方式,轮流执行不同的任务,只是做任务的“人”有多个而已。
进程管理
-
简单示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33import os
import time
from multiprocessing import Process
def do_some_thing():
print('子进程开始 :', os.getpid())
print('父进程 :', os.getppid())
time.sleep(1)
print('子进程结束')
# 启动子进程,会将父进程所在的文件再加载一次,将会造成无限循环下去,造成错误
# 因此,通常将执行的代码放到下面的结构中
if __name__ == '__main__':
# 获取当前进程号
print('主进程:', os.getpid())
# 创建一个进程,指定任务(通过函数)
# 参数介绍:
# target:指定任务,一个函数
# name:进程名
# args和kwargs:传递给子进程任务函数的参数
p = Process(target=do_some_thing)
# 当主进程结束后子进程任然在运行,这样的进程叫僵尸进程(有风险)
# 设置:当主进程结束时,结束子进程
p.daemon = True
# 启动进程
p.start()
# 等待主进程结束,在结束主进程,可以指定等待时间
p.join()
# 终止子进程
# p.terminate()
print('主进程结束')
进程锁
- **说明:**当多个进程操作同一资源时,可能会造成混乱,甚至错误。如:写文件等
- **解决:**通常我们可以通过加锁的方式解决
- 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31import os
import time
import multiprocessing
def loop(label, lock):
# 获取锁
lock.acquire()
time.sleep(1)
# 中间执行的任务,不可能同时多个进程执行
print(label, os.getpid())
# 释放锁
lock.release()
if __name__ == '__main__':
print('主进程:', os.getpid())
# 创建进程锁
lock = multiprocessing.Lock()
# 创建子进程
# 用于存储所有的进程
recode = []
for i in range(5):
p = multiprocessing.Process(target=loop, args=('子进程:', lock))
p.start()
recode.append(p)
# 等待进程结束
for p in recode:
p.join()
进程池
-
**说明:**创建少量的进程可以通过创建Process对象完成。如果需要大量的进程创建和管理时就比较费劲了。
-
**解决:**可以通过进程池加已解决,而且可以通过参数控制进程池中进程的并发数,提高CPU的利用率。
-
操作:
1
2
3
4
51.创建进程池
2.添加进程
3.关闭进程池
4.等待进程池结束
5.设置回调 -
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38import time
import random
import multiprocessing
# 进程任务
def task(num):
print(num, '开始')
start = time.time()
time.sleep(random.random() * 5)
end = time.time()
print(num, '执行了:', (end - start))
return num
# 进程函数结后会调用,参数是进程函数的返回值
def callback(s):
print(s, '结束')
if __name__ == '__main__':
# 获取CPU核心数
print('核心:', multiprocessing.cpu_count())
# 创建进程池,一般进程池中的进程数不超过CPU的核心数
pool = multiprocessing.Pool(4)
# 循环创建进程并添加到进程池中
for i in range(5):
# 参数:
# func:任务函数
# args:任务函数参数
# callback:回调函数,进程结束时调用,参数是进程函数的返回值
pool.apply_async(func=task, args=(i,), callback=callback)
# 关闭进程池,关闭后就不能再添加进程了
pool.close()
# 等待进程池结束
pool.join()
print('主进程结束')
数据共享
-
全局变量:全局变量不能共享
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import multiprocessing
num = 250
lt = ['hello']
# 进程任务
def run():
global num, lt
print('子进程开始')
num += 10
lt.append('world')
print('子进程:', num, lt)
print('子进程结束')
if __name__ == '__main__':
print('主进程开始:', num, lt)
p = multiprocessing.Process(target=run)
p.start()
p.join()
print('主进程结束:', num, lt) -
管道(pipe)
-
**说明:**创建管道时,得到两个链接
1
2创建管道,默认是全双工的,两边都可以收发
duplex=False,是半双工,p_a只能收,p_b只能发 -
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import multiprocessing
# 进程任务
def run(p_a):
# 子进程给主进程发数据
# p_a.send(['a', 'b', 'c', 'd', 'e'])
recv = p_a.recv()
print('子进程接收到:', recv)
if __name__ == '__main__':
# 创建管道,默认是全双工的,两边都可以收发
# duplex=False,是半双工,p_a只能收,p_b只能发
p_a, p_b = multiprocessing.Pipe(duplex=False)
p = multiprocessing.Process(target=run, args=(p_a,))
p.start()
# 主进程向子进程发数据
p_b.send([1, 2, 3, 4, 5])
p.join()
# print('主进程接收到:', p_b.recv())
print('主进程结束')
-
-
对列(queue)
-
**示例:**介绍
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import multiprocessing
if __name__ == '__main__':
# 创建对列,先进先出
q = multiprocessing.Queue(3)
# 判断对列是否为空
print(q.empty())
# 判断对列是否已满
print(q.full())
# 压入数据
q.put('hello')
q.put('world')
q.put('xxx')
# 对列已满时,再添加数据会阻塞,设置不会阻塞会报错
# q.put('yyy', False)
# 获取对列长度
print(q.qsize())
# 读取数据
print(q.get())
print(q.get())
print(q.get())
# 对列为空时,读取也会阻塞
# print(q.get())
print('over')
# 关闭对列
q.close() -
**示例:**演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41import os
import time
import multiprocessing
# 获取数据
def get_data(queue):
data = queue.get()
print('读取数据:', data)
# 写入数据
def put_data(queue):
# 拼接数据
data = str(os.getpid()) + ' ' + str(time.time())
print('压入数据:', data)
queue.put(data)
if __name__ == '__main__':
# 创建对列
q = multiprocessing.Queue(3)
# 创建5个进程用于写数据
record1 = []
for i in range(5):
p = multiprocessing.Process(target=put_data, args=(q,))
p.start()
record1.append(p)
# 创建5个进程用于读数据
record2 = []
for i in range(5):
p = multiprocessing.Process(target=get_data, args=(q,))
p.start()
record2.append(p)
for p in record1:
p.join()
for p in record2:
p.join()
-
-
共享内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39import multiprocessing
from ctypes import c_char_p
def run(v, s, a, l, d):
print('子进程:', v.value)
v.value += 10
print('子进程:', s.value)
s.value = b'world'
print('子进程:', a[0])
a[0] = 5
l.append('大哥,您收好!')
d['name'] = 'xiaoming'
if __name__ == '__main__':
# 共享内存,可以共享不同类型的数据
server = multiprocessing.Manager()
# 整数 i,小数 f
v = server.Value('i', 250)
# 字符串
s = server.Value(c_char_p, b'hello')
# 数组,相当于列表
a = server.Array('i', range(5))
# 列表
l = server.list()
# 字典
d = server.dict()
p = multiprocessing.Process(target=run, args=(v, s, a, l, d))
p.start()
p.join()
print('主进程:', v.value)
print('主进程:', s.value)
print('主进程:', a[0])
print('主进程:', l)
print('主进程:', d)
自定义进程类
- **说明:**继承自Process类,实现run方法,start后自动执行run方法
- 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, delay):
super().__init__()
self.delay = delay
# 实现该方法,进程start后自动调用
def run(self):
for i in range(3):
print('子进程运行中...')
time.sleep(self.delay)
if __name__ == '__main__':
p = MyProcess(1)
p.start()
p.join()
print('子进程结束')