当前位置:网络安全 > 并发编程

并发编程

  • 发布:2023-09-29 20:17

1。基本流程操作

os.getpid 获取进程id:

导入操作系统
导入时间
打印('开始')
time.sleep(40) #配合time.sleep()
print(os.getpid(),os.getppid(),'end') #获取进程id os.getpid() os.getppid() 父进程id
 -  -  -  -  -  - 结果:
开始
5028 1960 end #5028 process id 当前文件执行的进程id,程序结束,进程停止; 1960 pycharm父进程id
#pid进程号
#ppid 父进程ID
# 子进程
# 父进程在父进程中创建子进程
# pycharm中启动的所有py程序都是pycharm的子进程

流程创建,执行此文件中的函数:

导入操作系统
导入时间
from multiprocessing import Process #[1] 导入进程 P Process 类 from multiprocessing
def 函数():
print('start',os.getpid()) #[3]打开的进程函数的操作:获取进程id,因为这是子进程执行的程序内容
time.sleep(1) #所以这里的os.getpid()代表的是启动进程的pid
print('结束',os.getpid())
如果 __name__ == '__main__':
p = Process(target=func) #[2]创建进程对象 Process(target=函数名) #target=xx#查看源码可以看到Process的init方法中有一个参数target班级
p.start() # 异步调用该方法来启动进程,但不等待进程实际启动。
print('主:',os.getpid())
 -  -  -  -  -  -  -  - 结果:main: 3012 #[4]先执行start,但先打印main。因为是异步的,所以调用了启动进程的方法。 p.start执行后会创建进程,传入的目标会用括号()执行。无需等待该子进程启动执行,当前程序继续执行。
start 5748 #[5] 获取到的两个ID来自正在运行的子进程程序,获取到的子进程pid值相同。
结束 5748
>tasklist|findstr "py" #查看进程的Windows命令

进程是并发的:

导入操作系统
导入时间
来自多处理导入过程
打败():
print('开始吃饭',os.getpid())
时间.睡眠(1)
print('结束进食',os.getpid())
def 睡眠():
print('开始睡觉',os.getpid())
时间.睡眠(1)
prinst('结束睡眠',os.getpid())
如果 __name__ == '__main__':
p1 = Process(target=eat) # 【1】创建一个即将执行eat函数的进程对象
p1.start() # [2] 启动一个进程
p2 = Process(target=sleep) # 【3】创建一个即将执行sleep函数的进程对象
p2.start() # [4] 启动进程
print('主:',os.getpid())
 -  -  -  -  -  -  -  -  -  -  -  -  - 结果:
main: 6216 #[5] 异步,父进程不会等到子进程结束才执行。由于进程创建需要时间,所以在创建子进程的过程中,主进程已经执行完毕并打印出了主进程的pid。 。
开始吃6468 #[6] 从这里可以看出,代码首先创建并执行p1进程,然后创建并执行p2进程,但是两者并没有说先执行p1。

两个文件打印__name__,分别执行后再打印__main__。功能:

1)在执行文件中,在该文件中打印__name__,结果显示__main__;在执行文件中,import导入其他文件,其他文件中有打印 __name__ 即 print(__name__) 显示该文件中该文件的文件名,即是,模块名称。

2) 因此,当有 if __name__ == '__main__': 时,由于等式成立,所以会执行 if 判断下的程序。其他导入的文件中存在if __name__ == '__main__':,因为导入此文件时,__name__不等于' __main __':,但模块名称与之相同,则 if __name__ == '__main__': 其它文件中的内容不会被执行

3) 从这里可以看出:如果想让其他文件导入时也执行该文件中的程序,就不要写在 if __name__ == '__main__ 下':;相反,如果希望导入其他文件时不执行该文件中的程序,则在下面写 if __name__ == '__main__':;

4)基于上述:在Windows中,在文件中打开一个子进程,通过导入的方式在子进程内存空间中执行。如果启动进程的命令没有写在if __name__ == '__main__':下,那么启动子进程后,子进程再启动一个子进程,递归会发生。创建子进程,所有错误都会报;

如果是在 Linux 或 Mac 操作系统下使用 fork 创建进程,则不需要写 if __name__ == '__main__': 不会报错被举报。创建进程的方法不同。它们将父进程内存中的数据复制到子进程中。

  • 打印([__名称__])如果 __name__ == '__main__':
    #控制当这个py文件直接作为脚本执行时,会执行里面的代码。
    # 当这个py文件作为模块导入时,里面的代码不会被执行。
    打印('你好你好')
    # __name__ == '__main__'
    #执行的文件是__name__所在的文件
    # __name__ == '文件名'
    # 当__name__所在文件被导入并执行时
    
  • 主进程与子进程的关系

    #主进程与子进程的关系
    导入操作系统
    导入时间
    来自多处理导入过程
    def 函数():
    print('开始',os.getpid())
    时间.睡眠(10)
    print('结束',os.getpid())
    如果 __name__ == '__main__':
    p = 进程(目标=函数)
    p.start() # 异步调用该方法来启动进程,但不等待进程实际启动。
    print('主:',os.getpid())
     -  -  -  -  -  -  -  -  - -结果:
    主线:6636
    开始 6752
    结束 6752
    # 主进程尚未结束:等待子进程结束
    # 主进程负责回收子进程的资源
    # 如果子进程结束,父进程不回收资源,子进程就会成为僵尸进程。
    #主进程结束逻辑
    #主进程代码结束
    # 结束所有子进程
    #为子进程回收资源
    # 主进程结束
    导入操作系统
    导入时间
    来自多处理导入过程
    def 函数():
    print('开始',os.getpid())
    时间.睡眠(1)
    print('结束',os.getpid())
    打印(“MCW”)
    如果 __name__ == '__main__':
    p = 进程(目标=函数)
    p.start() # 异步调用该方法来启动进程,但不等待进程实际启动。#[4]如果没有object.join,由于没有阻塞,父进程和子进程异步执行,创建进程的时间会更长。程序打印“加入开始”后,会先打印“5000封邮件已发送”。由于程序中遇到对象.join(),会阻塞等待子进程结束,父进程才继续执行程序。
    
    #启动5个流程并向公司5000人发送电子邮件。发送电子邮件后,打印一条消息“已发送 5,000 封电子邮件”。
    导入时间
    随机导入
    来自多处理导入过程
    def send_mail(a):
    时间.睡眠(随机.随机())
    print('发送了一封电子邮件',a)
    如果 __name__ == '__main__':
    l = []
    对于范围(5)内的 i:
    p = 进程(目标=send_mail,args=(i,))
    p.start()
    l.追加(p)
    打印(l)
    for p in l:p.join() #[1]join块,将之前的异步非阻塞变成同步阻塞
    # 阻塞直到以上五个进程结束
    print('已发送 5000 封电子邮件')
     -  -  -  -  -  -  -  -  -  -  - -结果:
    []
    已发送电子邮件 3
    已发送电子邮件 4
    已发送电子邮件 0
    已发送电子邮件 1
    发送了电子邮件2
    已发送 5000 封电子邮件
    

    分解分析上面的if __name__ == '__main__':如下代码:

    要求应该是:同时发送多封邮件,直到所有邮件发送完才发送“已发送5000封邮件”。
    (一)情况一:
    创建了五个进程,且没有加入阻塞。打印和发送是在电子邮件发送之前完成的,因此出现了问题。
    如果 __name__ == '__main__':
    对于范围(5)内的 i:
    p = 进程(目标=send_mail,args=(i,))
    p.start()
    print('已发送 5000 封电子邮件')
     -  -  -  -  -  - 结果:已发送 5000 封电子邮件
    已发送电子邮件 0
    已发送电子邮件 1
    已发送电子邮件 3
    发送了电子邮件2
    已发送电子邮件 4
    (2)情况2:
    由于每次加入都会被阻止,并且电子邮件会发送五次,因此每次发送前一封电子邮件后,才能发送下一封电子邮件。串行效率低,达不到要求。
    如果 __name__ == '__main__':
    对于范围(5)内的 i:
    p = 进程(目标=send_mail,args=(i,))
    p.start()
    p.join()
    print('已发送 5000 封电子邮件')
     -  -  -  -  -  - -结果:
    已发送电子邮件 0
    已发送电子邮件 1
    发送了电子邮件2
    已发送电子邮件 3
    已发送电子邮件 4
    已发送 5000 封电子邮件
    (3)情况三:
    五个进程同时执行,但只有最后一个进程被阻塞。因为 p 代表 for 循环五次后的最后一个,所以 p.join() 只阻塞最后一个。因此,打印已发送的 5000 封电子邮件时,每次都会打印最后创建的进程。但是,如果其他进程执行速度较慢,则在所有电子邮件发送之前,所有电子邮件都已打印并发送。完全的。这有一个问题。
    如果 __name__ == '__main__':
    对于范围(5)内的 i:
    p = 进程(目标=send_mail,args=(i,))
    p.start()
    p.join()
    print('已发送 5000 封电子邮件')
     -  -  -  -  -  - -结果:
    发送了电子邮件2
    已发送电子邮件 1
    已发送电子邮件 4
    已发送 5000 封电子邮件
    已发送电子邮件 3
    已发送电子邮件 0
    (4)情况四:
    [1]将每次创建并启动的进程对象追加到链表中,循环链表对每个进程执行object.join()阻塞。
    [2] 相当于对已完成的p.join进行pass,没有任何作用。
    [3]未完成的子进程会被阻塞,主程序会等待未完成的子进程结束。子进程并发执行,并且每个子进程都是阻塞的
    [4]已经完成的阻塞就可以了,未完成的阻塞,所有阻塞结束,程序继续执行,即所有子进程都完成了,父进程从阻塞后面重新开始执行。所有子进程的总结束时间以最长的子进程时间作为所有子进程的结束时间。
    假设运行时间为: 2 4 1 3 2所有子进程结束所需时间为4秒,所花费的时间为持续时间最长的子进程所花费的时间。
    如果 __name__ == '__main__':
    l = []
    对于范围(5)内的 i:
    p = 进程(目标=send_mail,args=(i,))
    p.start()
    l.追加(p)
    for p in l:p.join() #join块,将之前的异步非阻塞变成同步阻塞
    # 阻塞直到以上五个进程结束
    print('已发送 5000 封电子邮件')
    #总结一下:如果想实现并发,就得等到所有子进程结束后,父进程才结束。然后将每个进程附加到列表中,并循环遍历阻塞每个列表元素的列表。
    

    本程序中创建的子流程对象也可以是其他导入模块中的函数

    www.sychzs.cn------------------------
    导入操作系统、时间
    def 函数():
    print('开始',os.getpid())
    时间.睡眠(1)
    print('结束',os.getpid())
    测试.py------------------
    导入操作系统
    导入xx
    来自多处理导入过程
    如果 __name__ == '__main__':
    p = Process(target=xx.func) #这可以是导入函数,也可以使用 module.function 执行。估计类里的方法也是可以的。
    p.start() #
    print('主:',os.getpid())
    -----------------------www.sychzs.cn 打印结果:
    主要:5768
    开始 5896
    结束 5896
    

    向子进程传递参数(想想参数字典等特殊参数?):

    来自多处理导入流程
    def func(arg1,arg2):
    打印(参数1,参数2)
    如果 __name__ == '__main__':
    p = 进程(目标=func,args=(1,2))
    p.start() -  -  -  -  -  - -结果:
    1 2
    #向进程要执行的函数传递参数,创建进程对象时添加args=(1,)。如果是单个参数,则添加逗号组成元组。元组中的元素和形式参数的位置之间存在一一对应的关系。
    来自多处理导入过程
    def 函数(arg1):
    打印(arg1)
    如果 __name__ == '__main__':
    p = 进程(目标=func,args=(1,))
    p.start()
     -  -  -  -  -  - 结果:
    1
    传递参数总结:
    [1]向进程要执行的函数传递参数,创建进程对象时添加args=(1,)。如果是单个参数,则添加逗号组成元组。元组中的元素和形式参数的位置之间存在一一对应的关系。
    
    #总结
    # 1.启动一个进程
    # 函数名称(参数1、参数2)
    # 来自多处理导入进程
    # p = Process(target=函数名, args=(参数1,参数2))
    #p.start()
    #注意:函数名称可以从其他模块导入。
    # 2.父进程和子进程
    # 3.父进程会等待所有子进程结束才结束。
    # 回收资源
    # 4、进程启动时windows和linux/ios的区别
    # 启动进程的进程需要放在 if __name__ == '__main__' 下
    # 在Windows中,相当于在子进程中从头到尾执行主进程文件。
    # 除了 if __name__ == '__main__' 下的代码
    # 在linux中,代码不被执行,而是直接执行被调用的func函数。
    # 5.join方法
    # 将进程的结束事件封装成join方法
    # 执行join方法的作用是阻塞,直到子进程执行结束,则阻塞结束。
    # 在多个子进程中使用join
    #p_l=[]
    # 对于范围(10)内的 i:
    # p = Process(target=函数名, args=(参数1,参数2))
    #p.start()
    # p_l.append(p)
    # for p in p_l:p.join()
    #这里写的是所有子进程终止后要执行的代码。[2]一定要在p.start()之前设置,将p设置为守护进程,禁止p创建子进程,当父进程代码执行结束时,p将终止运行。
    # 当主进程的代码结束时,守护进程也结束
    # 使用生产者-消费者模型时
    # 与守护线程比较时
    # 所有子进程必须在主进程结束之前结束,主进程负责回收资源。
    关于守护进程需要强调两点:
    一:主进程代码执行结束后,守护进程就会终止。
    第二:守护进程中不能打开子进程,否则会抛出异常:AssertionError: daemonic paths are not allowed to have child
    如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别执行就可以了。如果主进程任务结束后不再需要子进程的任务,那么应该在启动之前将子进程设置为守护进程。主进程代码结束,守护进程终止;
    使用场景:当主进程的代码结束时使用守护进程。
    

    进程is_alive()的其他方法terminate():

    导入时间
    来自多处理导入过程
    def son1():
    而真实:
    print('还活着')
    睡眠时间(0.5)
    如果 __name__ == '__main__':
    p = 进程(目标=son1)
    p.start()      # 异步 非阻塞
    print(www.sychzs.cn_alive())
    time.sleep(1)
    p.terminate()   # 异步的 非阻塞
    print(www.sychzs.cn_alive())   # 进程还活着 因为操作系统还没来得及关闭进程
    time.sleep(0.01)
    print(www.sychzs.cn_alive())   # 操作系统已经响应了我们要关闭进程的需求,再去检测的时候,得到的结果是进程已经结束了
    --------------结果:
    True
    is alive
    is alive
    True
    False
    # 什么是异步非阻塞?
    # terminate
    [1]查看进程是否运行状态  对象.is_alive
    [2]终止进程操作 对象.终止(terminate())   终止之后判断是否还活着,结果是还活着一段时间,即终止也是需要时间终止的
    终止进程操作 对象.终止   终止之后判断是否还活着,结果是还活着一段时间。异步,不等他  非阻塞
    start异步非阻塞
    

    面向对象创建进程

    import os
    import time
    from multiprocessing import Process
    class MyProcecss2(Process):
    def run(self):
    while True:
    print('is alive')
    time.sleep(0.5)
    class MyProcecss1(Process):
    def __init__(self,x,y):
    self.x = x
    self.y = y
    super().__init__()
    def run(self):
    print(self.x,self.y,os.getpid())
    for i in range(5):
    print('in son2')
    time.sleep(1)
    if __name__ == '__main__':
    mp = MyProcecss1(1,2)
    mp.daemon = True
    mp.start()
    print(www.sychzs.cn_alive())
    mp.terminate()
    mp2 = MyProcecss2()
    mp2.start()
    print('main :',os.getpid())
    time.sleep(1)
    ----------------结果:
    True
    main : 5808
    is alive
    is alive
    

    面向对象编程简写:

    class MyProcecss1(Process):  #
    def run(self):
    for i in range(5):
    print('jincheng1')
    time.sleep(1)
    if __name__ == '__main__':
    mp = MyProcecss1()
    mp.start()
    ----------------结果:
    jincheng1
    jincheng1
    jincheng1
    jincheng1
    jincheng1
    
    class MyProcecss1(Process): #1)创建一个类,继承Process。
    def __init__(self,x,y): #4)创建init初始化方法,往里面传参,然后run里面就可以使用传进去的参数了。怎么传进去?创建实例的时候传入参数。
    self.x = x
    self.y = y
    super().__init__()#5)如果子进程不需要传参,init方法可以不写。因为父类中init中有很多创建进程的步骤自定义没有,无法创建进程,所以super().无法创建时报错如下结果:
    def run(self): #2)必须写一个run方法,run方法里面写子进程的运行代码
    for i in range(5):
    print('jincheng%s',self.x,self.y)
    time.sleep(1)
    if __name__ == '__main__':
    mp = MyProcecss1(1,2)   # #3)创建这个类的对象,往对象中传参实例变量。不能往run方法里面传参,因为没有调用run,是start开启进程类中封装了的运行的run。那么可以将参数放到init
    mp.start()
    -----------------结果:
    # assert self._popen is None, 'cannot start a process twice'
    #AttributeError: 'MyProcecss1' object has no attribute '_popen'
    jincheng%s 1 2
    jincheng%s 1 2
    jincheng%s 1 2
    jincheng%s 1 2
    jincheng%s 1 2
    

    面向对象创建进程。
    1)创建一个类,继承Process。
    2)必须写一个run方法,run方法里面写子进程的运行代码
    3)创建这个类的对象,往对象中传参实例变量。不能忘run方法里面传参,因为没有调用run,是start开启进程类中封装了的运行的run。那么可以放到init
    4)创建init初始化方法,往里面传参,然后run里面就可以使用传进去的参数了。怎么传进去?创建实例的时候传入参数。
    5)如果子进程不需要传参,init方法可以不写。因为父类中init中有很多创建进程的步骤自定义没有,无法创建进程,所以super()
    

    同步阻塞 比如p.join 等待子进程结束父进程才往下执行

    # Process类总结:
    # 开启进程的方式
    # 面向函数
    # def 函数名:要在子进程中执行的代码
    # p = Process(target= 函数名,args=(参数1,))
    # 面向对象
    # class 类名(Process):
    # def __init__(self,参数1,参数2):   # 如果子进程不需要参数可以不写
    # self.a = 参数1
    # self.b = 参数2
    # super().__init__()
    # def run(self):
    # 要在子进程中执行的代码
    # p = 类名(参数1,参数2)
    # Process提供的操作进程的方法
    # p.start() 开启进程      异步非阻塞
    # p.terminate() 结束进程  异步非阻塞
    # p.join()     同步阻塞
    # p.isalive()  获取当前进程的状态
    # daemon = True 设置为守护进程,守护进程永远在主进程的代码结束之后自动结束
    

    进程直接通信:

    并发编程 买票系统 锁的概念:

    并发进程做什么事。
    并发编程结合网络编程

    一:并发编程结合网络编程
    1)并发编程创建子进程。子进程有自己的独立空间
    2)网络编程tcp编程,多个客户端连接服务端,一个客户端在服务端收发信息的部分就创建一个进程,这样每个客户端对应服务端一个进程,实现多个客户端连接服务端。

    二:车票购票系统:
    购票多个人买票,一个一个排队购买,多人同时购买(创建子进程,实现并发)
    用户太多,一台服务器处理并发子进程数量有限,再来一台服务器响应处理。多台服务器同时响应请求就要出现调度的服务器,负载均衡服务器。
    买票信息存在数据库。没台处理服务器一个数据库,信息不一致。公用一台数据库

    处理服务器,处理一条买票信息,数据库就要减少一个票,二者通信,网络通信。通信有延迟。
    处理服务器和数据库的通信:
    10个人抢一张票

    • 车票购票系统分析:

      1)查票一个函数

      2)10个人抢就是10个进程来执行这个函数,谁查询的解决,谁要传参进去,传参 args,传的是元组

      3)买票了:买票一个函数 time模拟网络延迟

      4)这时,1张票结果多个人都买到了,这时是有问题的,

      5)问题分析,因为多个人是有并发的,一个人读取到一张票,网络延迟,写入减少一张票需要0.1秒,在0.1秒内多个人又来并发读到一张票,做同样的操作。于是多个人在0.1秒内成功执行了操作.一张票多人都读取到买到了

      6)多个人来数据库读取请求。有个锁,只有一个人成功执行之后才能下一个人进来操作。只有一个进程对同一个文件操作完之后别的进程才能进来操作这个文件,保证数据一致性

      7)lock.acquire()加锁,没有抢到锁的进程被阻塞了,等待释放锁。代码前加锁,一个进程执行完释放锁,suo.释放()

      8)查票不加锁,买票必须加锁,不加锁数据会出错,买票是串行,非并行。
      所以:
      1、如果在一个并发的场景下涉及到某部分内容时需要修改一些所有进程共享的数据资源 需要加锁来维护数据的安全
      2、在数据安全的基础上,才考虑效率问题。
      这里查票函数并发,买票函数涉及到修改共享数据所以需要加锁串行,保证数据一致性。并发是多进程实现,一个网络连接(网络编程)创建一个子进程。
      3、同步存在的意义:

      10)进程与进程间数据隔离,任何对共享数据资源修改都存在数据安全问题,加锁解决

      11)加锁还可以上下文管理,用with封装了异常处理的,不会出现release不能解锁的问题 release是放在finamly的
      12)with加锁可以放在买票函数里,也可以放在task任务里。在task任务里,加锁然后执行买票函数

      13)在子进程中 对需要加锁的代码 进行with lock:

      14)查票函数 买票函数 任务函数(查看不需加锁 并发,加锁买票 串行 目的 保证数据一致性,不会混乱(如果修改共享数据也并发,每个人都读到一张票,然后做买票,再修改共享数据减去一张票,就是多个人买到票了))

    # 并发 能够做的事儿
    # 1.实现能够响应多个client端的server
    # 2.抢票系统
    #'ticket_count.txt'文本内容:
    {"count": 0}
    import time
    import json
    from multiprocessing import Process,Lock
    def search_ticket(user):
    with open('ticket_count.txt') as f:
    dic = json.load(f)
    print('%s查询结果  : %s张余票'%(user,dic['count']))
    def buy_ticket(user,lock):
    # with lock:
    # lock.acquire()   # 给这段代码加上一把锁
    time.sleep(0.02)
    with open('ticket_count.txt') as f:
    dic = json.load(f)
    if dic['count'] > 0:
    print('%s买到票了'%(user))
    dic['count'] -= 1
    else:
    print('%s没买到票' % (user))
    time.sleep(0.02)
    with open('ticket_count.txt','w') as f:
    json.dump(dic,f)
    # lock.release()   # 给这段代码解锁
    def task(user, lock):
    search_ticket(user)
    with lock:
    buy_ticket(user, lock)
    if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
    p = Process(target=task,args=('user%s'%i,lock))
    p.start()
    -------------结果:
    user0查询结果  : 3张余票
    user0买到票了
    user1查询结果  : 3张余票
    user1买到票了
    user3查询结果  : 1张余票
    user3买到票了
    user6查询结果  : 0张余票
    --------------------------
    # 1.如果在一个并发的场景下,涉及到某部分内容
    # 是需要修改一些所有进程共享数据资源
    # 需要加锁来维护数据的安全
    # 2.在数据安全的基础上,才考虑效率问题
    # 3.同步存在的意义
    # 数据的安全性
    # 在主进程中实例化 lock = Lock()
    # 把这把锁传递给子进程
    # 在子进程中 对需要加锁的代码 进行 with lock:
    # with lock相当于lock.acquire()和lock.release()
    # 在进程中需要加锁的场景
    # 共享的数据资源(文件、数据库)
    # 对资源进行修改、删除操作
    # 加锁之后能够保证数据的安全性 但是也降低了程序的执行效率
    加锁的场景:
    加锁的优缺点:
    加锁方法:
    加锁的原因:
    什么是锁:
    多把锁:
    锁的简单理解:
    场景:修改共享数据资源
    方法:
    从多进程模块导入Lock类
    实例化类
    with lock:
    函数执行(传参lock进入)
    def 函数(lock)不需要操作对lock
    lock = Lock()
    with lock:
    需要加锁解锁的函数执行,将锁对象传进去
    

    进程之间的数据隔离*

    from multiprocessing import Process
    n = 100
    def func():
    global n
    n -= 1
    if __name__ == '__main__':
    p_l = []
    for i in range(10):
    p = Process(target=func)
    p.start()
    p_l.append(p)
    for p in p_l:p.join()
    print(n)
    -------------结果:
    100
    

    队列的使用:

    • 两个隔离的进程间进行通信:
      一个父进程中多个子进程,一个子进程一个任务。子进程完成任务进程间需要通信。假设是计算求和,就需要返回计算结果。
      进程间通信方式: 队列 文件 管道 (数据表似乎也是文件)

    • 队列:

      1)导入模块

      2)创建队列,

      3)队列传到函数

      4)函数内调用,将数据put

      5)外 get

    • 队列原理:

    1)相当于第三个文件,在内存中的

    2)子进程put 父进程get

    3)先进先出

    • 队列:基于
      文件家族的sockct实现,不是基于网络的。存取数据基于piclkle,lock
      两个进程往同一个队列里面存取数据,有可能两个进程往同一个地方存取到数据,所有要保证数据安全,队列加锁了,串行

    • 队列 sockct piclkle,lock

      管道 sockct piclkle

      队列基于管道+锁实现

      管道基于sockct piclkle实现

      (重写队列方法,会报错缺少东西,说明内部使用了)

    • 管道 :一个发一个收 效率高,多个的话那么数据不安全,它本身没有锁,所有效率高

      队列:多个收发

    • q.get在队列为空时发生阻塞。这样就会等待有值再往下执行
      创建队列传一个参数 为队列大小 。 q = Queue(5)
      put多个,满了阻塞
      put_nowait 当队列为满的时候再向队列中放数据 会报错并且会丢失数据
      get_nowait # 在队列为空的时候 直接报错

      get 阻塞 get_nowait非阻塞

    • 三个方法不准确,有隐患
      原因:在并发中获取三个值的路途中就已经被别的进程修改了,
      比如:获取的到是空的,值在返回的途中,有个进程已经放入值,这时空的结果就是有问题的。

      q.empty

      q.qsize

      q.full

      初始化里面初始化方法

    from multiprocessing import Queue,Process
    # 先进先出
    def func(exp,q):
    ret = eval(exp)
    q.put({ret,2,3})
    q.put(ret*2)
    q.put(ret*4)
    if __name__ == '__main__':
    q = Queue()
    Process(target=func,args=('1+2+3',q)).start()
    print(q.get())
    print(q.get())
    print(q.get())
    -----------------结果:
    {2, 3, 6}
    12
    24
    
    # Queue基于 天生就是数据安全的
    # 文件家族的socket pickle lock
    # pipe 管道(不安全的) = 文件家族的socket pickle
    # 队列 = 管道 + 锁
    # from multiprocessing import Pipe
    # pip = Pipe()
    # pip.send()
    # pip.recv()
    
    import queue
    from multiprocessing import Queue
    q = Queue(5)
    q.put(1)
    q.put(2)
    q.put(3)
    q.put(4)
    q.put(5)   # 当队列为满的时候再向队列中放数据 队列会阻塞
    print('5555555')
    try:
    q.put_nowait(6)  # 当队列为满的时候再向队列中放数据 会报错并且会丢失数据
    except queue.Full:
    pass
    print('6666666')
    --------------结果:
    5555555
    6666666import queue
    from multiprocessing import Queue
    q = Queue(5)
    q.put(1)
    q.put(2)
    q.put(3)
    q.put(4)
    q.put(5)   # 当队列为满的时候再向队列中放数据 队列会阻塞
    print('5555555')
    try:
    q.put_nowait(6)  # 当队列为满的时候再向队列中放数据 会报错并且会丢失数据
    except queue.Full:
    pass
    print('6666666')
    print(q.get())
    print(q.get())
    print(q.get())   # 在队列为空的时候会发生阻塞
    print(q.get())   # 在队列为空的时候会发生阻塞
    print(q.get())   # 在队列为空的时候会发生阻塞
    try:
    print(q.get_nowait())   # 在队列为空的时候 直接报错
    except queue.Empty:pass
    -----------------结果:
    5555555
    6666666
    1
    2
    3
    4
    5
    
    from multiprocessing import Queue
    q = Queue(5) #从多进程导入Queue 队列, Queue(n)队列创建一个进程队列对象。最大存放n个数据
    q.put("mcw") #put将数据放入进程队列
    q.put("xiao")
    aa=q.get() #get 将数据拿出进程队列 mcw 先进先出
    bb=q.get() #xiao
    print(aa)
    print(bb)
    ----------------结果:
    mcw
    xiao
    

    相关文章

    最新资讯