Day9 进程同步锁 进程队列 进程池 生产消费模型 进程池 paramike模块...

news/2024/7/8 12:27:38 标签: 运维, json, python

进程同步锁:

当运行程序的时候,有可能你的程序同时开多个进程,开进程的时候会将多个执行结果打印出来,这样的话打印的信息都是错乱的,怎么保证打印信息是有序的呢?

其实也就是相当于让进程独享资源。

 

 1 from multiprocessing import Process,Lock    #引用函数
 2 import time
 3 def work(name,mutex):    
 4     mutex.acquire()    #在这里加入锁
 5     print('task <%s> is runing' %name)
 6     time.sleep(2)
 7     print('task <%s> is done' % name)
 8     mutex.release()    #加完锁以后必须需要解锁
 9 
10 if __name__ == '__main__':
11     mutex=Lock()
12     p1=Process(target=work,args=('egon',mutex))
13     p2=Process(target=work,args=('alex',mutex))
14     p1.start()
15     p2.start()
16     print('')

 

比如说模拟抢票的功能:

要先写一个文本   ("count":1)   就记个数就行

 1 import json
 2 import os
 3 import time
 4 from multiprocessing import Process,Lock
 5 def search():
 6     dic=json.load(open('db.txt'))
 7     print('\033[32m[%s] 看到剩余票数<%s>\033[0m' %(os.getpid(),dic['count']))
 8 def get_ticket():
 9     dic = json.load(open('db.txt'))
10     time.sleep(0.5) #模拟读数据库的网络延迟
11     if dic['count'] > 0:
12         dic['count']-=1
13         time.sleep(0.5)  # 模拟写数据库的网络延迟
14         json.dump(dic,open('db.txt','w'))
15         print('\033[31m%s 购票成功\033[0m' %os.getpid())
16 def task(mutex):
17     search()
18     mutex.acquire()
19     get_ticket()
20     mutex.release()
21 if __name__ == '__main__':
22     mutex=Lock()
23     for i in range(10):
24         p=Process(target=task,args=(mutex,))
25         p.start()

 


进程队列:

共享内存的方式:

 1 from multiprocessing import Process,Manager,Lock     #Manager共享内存函数
 2 
 3 def task(dic,mutex):
 4     with mutex:
 5         dic['count']-=1
 6 
 7 if __name__ == '__main__':
 8     mutex=Lock()
 9     m=Manager()
10     dic=m.dict({'count':100})
11     p_l=[]
12     for i in range(100):
13         p=Process(target=task,args=(dic,mutex))
14         p_l.append(p)
15         p.start()
16 
17     for p in p_l:
18         p.join()
19     print(dic)


队列:

进程彼此之间隔离,要实现进程间通信

 1 from multiprocessing import Queue    #引用函数
 2 q=Queue(3)      #意味着你队列长队最大为三
 3
 4 q.put('first')
 5 q.put('second')
 6 q.put('third')
 7 # q.put('fourth')    #满了的话会一直卡住
 8 
 9 print(q.get())
10 print(q.get())
11 print(q.get())
12 print(q.get())
13 
14 #了解
15 # q=Queue(3)
16 # 
17 # q.put('first',block=False)
18 # q.put('second',block=False)
19 # q.put('third',block=False)    #这样的话队列满了就会抛出异常,不会卡在这里
20 # # q.put_nowait('fourth') 
21 #q.put('fourth',block=False)
22 # q.put('fourth',timeout=3) #指定抛出时间,如果3秒后队列还是满的抛出异常

 


生产者消费者模型:

正常情况下,一般都是生产者预先生产出商品,然后等着消费者来买。

(实际情况可能是有多个生产者,多个消费者)

from multiprocessing import Process, JoinableQueue
import time, os


def producer(q, name):
    for i in range(3):
        time.sleep(1)
        res = '%s%s' % (name, i)
        q.put(res)
        print('\033[45m<%s> 生产了 [%s]\033[0m' % (os.getpid(), res))
    q.join()  #对应下边的task_done

def consumer(q):
    while True:
        res = q.get()
        time.sleep(1.5)
        print('\033[34m<%s> 吃了 [%s]\033[0m' % (os.getpid(), res))
        q.task_done()    #代表我这个进程我已经取走了,发给生产者,对应生产者要有个jion()

if __name__ == '__main__':
    q = JoinableQueue()

    # 生产者们:即厨师们
    p1 = Process(target=producer, args=(q, '包子'))
    p2 = Process(target=producer, args=(q, '饺子'))
    p3 = Process(target=producer, args=(q, '馄饨'))

    # 消费者们:即吃货们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    c1.daemon=True    #如果消费者不取完的话,程序无法结束
    c2.daemon=True   #这里主进程运行完,子进程要结束掉
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()


    p1.join()

    print('')

 

 


进程池:

 1 from multiprocessing import Pool
 2 import os,time
 3 
 4 def work(n):
 5     print('task <%s> is runing' %os.getpid())
 6     time.sleep(2)
 7     return n**2
 8 if __name__ == '__main__':
 9     # print(os.cpu_count())
10     p=Pool(4)    #定义进程池的大小,如果不定义一般是内核数
11     # for i in range(10):
12     #     res=p.apply(work,args=(i,))    #向进程池里边添加进程,同步提交
13     #     print(res)
14 
15     res_l=[]
16     for i in range(10):
17         res=p.apply_async(work,args=(i,))    #向进程池异步提交,只管扔进去,不管是否执行完成
18         res_l.append(res)
19 
20     p.close()
21     p.join()
22     #
23     # for res in res_l:
24     #     print(res.get())

 

进程池之回调函数:

需要回调函数的场景,进程池中任何一个任务一旦处理完成,就立即告诉主进程,我好了,你可以处理我的结果了,主进程调用一个函数去处理该结果,该函数即回调函数。

 1 import requests #pip3 install requests
 2 import os,time
 3 from multiprocessing import Pool
 4 def get_page(url):
 5     print('<%s> get :%s' %(os.getpid(),url))
 6     respone = requests.get(url)
 7     if respone.status_code == 200:
 8         return {'url':url,'text':respone.text}
 9 
10 def parse_page(dic):
11     print('<%s> parse :%s' %(os.getpid(),dic['url']))
12     time.sleep(0.5)
13     res='url:%s size:%s\n' %(dic['url'],len(dic['text'])) #模拟解析网页内容
14     with open('db.txt','a') as f:
15         f.write(res)
16 
17 
18 if __name__ == '__main__':
19     p=Pool(4)
20     urls = [
21         'http://www.baidu.com',
22         'http://www.baidu.com',
23         'http://www.baidu.com',
24         'http://www.baidu.com',
25         'http://www.baidu.com',
26         'http://www.baidu.com',
27         'http://www.baidu.com',
28     ]
29 
30 
31     for url in urls:
32         p.apply_async(get_page,args=(url,),callback=parse_page)
33 
34 
35     p.close()
36     p.join()
37     print('主进程pid:',os.getpid())

 

 

 


paramike模块:

paramike模块是一个用作做远程控制的模块,使用该模块可以对远程服务器进行命令或者文件操作。值得一提的是,fabric和ansible内部的远程管理就是使用的paramike模块。

#需要下载安装

#pip3 install paramiko

 

远程连接功能:

 1 import paramiko
 2 
 3 # 创建SSH对象
 4 ssh = paramiko.SSHClient()
 5 # 允许连接不在know_hosts文件中的主机
 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 7 # 连接服务器
 8 ssh.connect(hostname='120.92.84.249', port=22, username='root', password='123QWEasd')
 9 
10 # 执行命令
11 stdin, stdout, stderr = ssh.exec_command('df')
12 # 获取命令结果
13 result = stdout.read()
14 print(result.decode('utf-8'))
15 # 关闭连接
16 ssh.close()

这是基于账号密码来访问客户端的。

 

 

另外一种方式是基于秘钥的,

现在服务端制作一个秘钥,ssh-keygen制作秘钥,sz可以下载到window桌面。

客户端要用,肯定要基于服务端有认证文件,利用 ssh-copy-id -i 用户@ip

 1 import paramiko
 2 
 3 private_key = paramiko.RSAKey.from_private_key_file('id_rsa')   #秘钥的文件位置
 4 
 5 # 创建SSH对象
 6 ssh = paramiko.SSHClient()
 7 # 允许连接不在know_hosts文件中的主机
 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 9 # 连接服务器
10 ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)
11 
12 # 执行命令
13 stdin, stdout, stderr = ssh.exec_command('df')
14 # 获取命令结果
15 result = stdout.read()
16 print(result.decode('utf-8'))
17 # 关闭连接
18 ssh.close()

 

 

 上传下载:

 1 import paramiko
 2 
 3 transport = paramiko.Transport(('120.92.84.249', 22))
 4 transport.connect(username='root', password='123QWEasd')
 5 
 6 sftp = paramiko.SFTPClient.from_transport(transport)
 7 # 将location.py 上传至服务器 /tmp/test.py
 8 sftp.put('id_rsa', '/tmp/test.rsa')
 9 # 将remove_path 下载到本地 local_path
10 # sftp.get('remove_path', 'local_path')
11 
12 transport.close()

 

转载于:https://www.cnblogs.com/sexiaoshuai/p/7452241.html


http://www.niftyadmin.cn/n/1119846.html

相关文章

vue 引入canvas_Vue中使用canvas方法总结

下面就是小编带给大家的Vue中怎么使用canvas方法操作&#xff0c;希望能够给你们带来一定的帮助&#xff0c;谢谢大家的观看。1、如果数组中都是canvas对象, vue如何能吧canvas对象渲染到页面。v-html只能渲染出一个字符串, 没办法像appendChild那样插入canvas对象。2、项目架构…

反射,方法的调用

2019独角兽企业重金招聘Python工程师标准>>> 一、方法对象&#xff0c;Method 类型的对象 (1)获得方法对象数组 //Method[] m class.getMethods()方法获取是该类的所有public方法&#xff0c;包括从父类继承的方法&#xff1b; //Method[] m class.getDeclaredMetho…

TCP报文大小

链路层&#xff08;二层&#xff09;MTU最大传输单元&#xff1a;1500KByte。每个以太网帧64bytes-1518bytes&#xff0c;减去帧头&#xff08;DMAC目的MAC地址48bit6BytesSMAC源MAC地址48bit6BytesType域2bytes&#xff09;14Bytes和帧尾CRC校验部分4Bytes&#xff0c;即为MTU…

微信语音麦克风静音_微信语音关闭麦克风对方知道吗

大家好&#xff0c;我是时间财富网智能客服时间君&#xff0c;上述问题将由我为大家进行解答。微信语音关闭麦克风对方不会知道&#xff0c;如果一方关闭了麦克风&#xff0c;另一方是不会收到任何提示&#xff0c;只是对方无法听到声音而已。微信语音&#xff0c;是微信为开发…

ListView的Item动画

1.效果图 2.需求就是点击item歌曲时&#xff0c;实现一种飞入到预约按钮处的效果 3.思路&#xff1a;在布局文件中加入了一个条目布局&#xff0c;和listView的item一样&#xff0c;点击listView的item时&#xff0c;使用给条目布局执行动画。 code: xml布局 <RelativeLayou…

产融结合的七个进阶形态(上)

产融结合并不是简单地产业资本和金融资本结合就可以产生有效的经济效益&#xff0c;如何产生有效的产融结合&#xff0c;我们应当使金融资本去促使产业资本提高&#xff0c;产生经济效益&#xff0c;让产业资本的提高去带动金融资本的升值&#xff0c;两者属于螺旋上升的过程。…

关于FlagsAttribute

最近在看C#本质论&#xff0c;有介绍FlagsAttribute的特性&#xff0c;看了下源码&#xff0c;发现只是一个简单的特性class和一个构造函数。调试了一下.NET的源码&#xff0c;发现在console.writeline(***); ***是带有flags特性的Enum&#xff0c; 实际上&#xff0c;这个时候…

[CP1804]最短路

题目大意&#xff1a;  一个$n(n\le10^5)$个点的图&#xff0c;给定一个常数$c$&#xff0c;每对点$i,j$之间有权值为$(i\oplus j)\times c$的边。另有$m(m\le5\times10^5)$条指定权值的单向边&#xff0c;求指定点对间的最短路。 思路&#xff1a;  显然当异或值二进制下只…