进程同步锁:
当运行程序的时候,有可能你的程序同时开多个进程,开进程的时候会将多个执行结果打印出来,这样的话打印的信息都是错乱的,怎么保证打印信息是有序的呢?
其实也就是相当于让进程独享资源。
1 from multiprocessing import Process,Lock #引用函数 2 import time 3 def work(name,mutex): 4 mutex.acquire() #在这里加入锁 5 print('task <%s> is runing' %name) 6 time.sleep(2) 7 print('task <%s> is done' % name) 8 mutex.release() #加完锁以后必须需要解锁 9 10 if __name__ == '__main__': 11 mutex=Lock() 12 p1=Process(target=work,args=('egon',mutex)) 13 p2=Process(target=work,args=('alex',mutex)) 14 p1.start() 15 p2.start() 16 print('主')
比如说模拟抢票的功能:
要先写一个文本 ("count":1) 就记个数就行
1 import json 2 import os 3 import time 4 from multiprocessing import Process,Lock 5 def search(): 6 dic=json.load(open('db.txt')) 7 print('\033[32m[%s] 看到剩余票数<%s>\033[0m' %(os.getpid(),dic['count'])) 8 def get_ticket(): 9 dic = json.load(open('db.txt')) 10 time.sleep(0.5) #模拟读数据库的网络延迟 11 if dic['count'] > 0: 12 dic['count']-=1 13 time.sleep(0.5) # 模拟写数据库的网络延迟 14 json.dump(dic,open('db.txt','w')) 15 print('\033[31m%s 购票成功\033[0m' %os.getpid()) 16 def task(mutex): 17 search() 18 mutex.acquire() 19 get_ticket() 20 mutex.release() 21 if __name__ == '__main__': 22 mutex=Lock() 23 for i in range(10): 24 p=Process(target=task,args=(mutex,)) 25 p.start()
进程队列:
共享内存的方式:
1 from multiprocessing import Process,Manager,Lock #Manager共享内存函数 2 3 def task(dic,mutex): 4 with mutex: 5 dic['count']-=1 6 7 if __name__ == '__main__': 8 mutex=Lock() 9 m=Manager() 10 dic=m.dict({'count':100}) 11 p_l=[] 12 for i in range(100): 13 p=Process(target=task,args=(dic,mutex)) 14 p_l.append(p) 15 p.start() 16 17 for p in p_l: 18 p.join() 19 print(dic)
队列:
进程彼此之间隔离,要实现进程间通信
1 from multiprocessing import Queue #引用函数 2 q=Queue(3) #意味着你队列长队最大为三 3 4 q.put('first') 5 q.put('second') 6 q.put('third') 7 # q.put('fourth') #满了的话会一直卡住 8 9 print(q.get()) 10 print(q.get()) 11 print(q.get()) 12 print(q.get()) 13 14 #了解 15 # q=Queue(3) 16 # 17 # q.put('first',block=False) 18 # q.put('second',block=False) 19 # q.put('third',block=False) #这样的话队列满了就会抛出异常,不会卡在这里 20 # # q.put_nowait('fourth')
21 #q.put('fourth',block=False) 22 # q.put('fourth',timeout=3) #指定抛出时间,如果3秒后队列还是满的抛出异常
生产者消费者模型:
正常情况下,一般都是生产者预先生产出商品,然后等着消费者来买。
(实际情况可能是有多个生产者,多个消费者)
from multiprocessing import Process, JoinableQueue import time, os def producer(q, name): for i in range(3): time.sleep(1) res = '%s%s' % (name, i) q.put(res) print('\033[45m<%s> 生产了 [%s]\033[0m' % (os.getpid(), res)) q.join() #对应下边的task_done def consumer(q): while True: res = q.get() time.sleep(1.5) print('\033[34m<%s> 吃了 [%s]\033[0m' % (os.getpid(), res)) q.task_done() #代表我这个进程我已经取走了,发给生产者,对应生产者要有个jion() if __name__ == '__main__': q = JoinableQueue() # 生产者们:即厨师们 p1 = Process(target=producer, args=(q, '包子')) p2 = Process(target=producer, args=(q, '饺子')) p3 = Process(target=producer, args=(q, '馄饨')) # 消费者们:即吃货们 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) c1.daemon=True #如果消费者不取完的话,程序无法结束 c2.daemon=True #这里主进程运行完,子进程要结束掉 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() print('主')
进程池:
1 from multiprocessing import Pool 2 import os,time 3 4 def work(n): 5 print('task <%s> is runing' %os.getpid()) 6 time.sleep(2) 7 return n**2 8 if __name__ == '__main__': 9 # print(os.cpu_count()) 10 p=Pool(4) #定义进程池的大小,如果不定义一般是内核数 11 # for i in range(10): 12 # res=p.apply(work,args=(i,)) #向进程池里边添加进程,同步提交 13 # print(res) 14 15 res_l=[] 16 for i in range(10): 17 res=p.apply_async(work,args=(i,)) #向进程池异步提交,只管扔进去,不管是否执行完成 18 res_l.append(res) 19 20 p.close() 21 p.join() 22 # 23 # for res in res_l: 24 # print(res.get())
进程池之回调函数:
需要回调函数的场景,进程池中任何一个任务一旦处理完成,就立即告诉主进程,我好了,你可以处理我的结果了,主进程调用一个函数去处理该结果,该函数即回调函数。
1 import requests #pip3 install requests 2 import os,time 3 from multiprocessing import Pool 4 def get_page(url): 5 print('<%s> get :%s' %(os.getpid(),url)) 6 respone = requests.get(url) 7 if respone.status_code == 200: 8 return {'url':url,'text':respone.text} 9 10 def parse_page(dic): 11 print('<%s> parse :%s' %(os.getpid(),dic['url'])) 12 time.sleep(0.5) 13 res='url:%s size:%s\n' %(dic['url'],len(dic['text'])) #模拟解析网页内容 14 with open('db.txt','a') as f: 15 f.write(res) 16 17 18 if __name__ == '__main__': 19 p=Pool(4) 20 urls = [ 21 'http://www.baidu.com', 22 'http://www.baidu.com', 23 'http://www.baidu.com', 24 'http://www.baidu.com', 25 'http://www.baidu.com', 26 'http://www.baidu.com', 27 'http://www.baidu.com', 28 ] 29 30 31 for url in urls: 32 p.apply_async(get_page,args=(url,),callback=parse_page) 33 34 35 p.close() 36 p.join() 37 print('主进程pid:',os.getpid())
paramike模块:
paramike模块是一个用作做远程控制的模块,使用该模块可以对远程服务器进行命令或者文件操作。值得一提的是,fabric和ansible内部的远程管理就是使用的paramike模块。
#需要下载安装
#pip3 install paramiko
远程连接功能:
1 import paramiko 2 3 # 创建SSH对象 4 ssh = paramiko.SSHClient() 5 # 允许连接不在know_hosts文件中的主机 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 连接服务器 8 ssh.connect(hostname='120.92.84.249', port=22, username='root', password='123QWEasd') 9 10 # 执行命令 11 stdin, stdout, stderr = ssh.exec_command('df') 12 # 获取命令结果 13 result = stdout.read() 14 print(result.decode('utf-8')) 15 # 关闭连接 16 ssh.close()
这是基于账号密码来访问客户端的。
另外一种方式是基于秘钥的,
现在服务端制作一个秘钥,ssh-keygen制作秘钥,sz可以下载到window桌面。
客户端要用,肯定要基于服务端有认证文件,利用 ssh-copy-id -i 用户@ip
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('id_rsa') #秘钥的文件位置 4 5 # 创建SSH对象 6 ssh = paramiko.SSHClient() 7 # 允许连接不在know_hosts文件中的主机 8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 9 # 连接服务器 10 ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key) 11 12 # 执行命令 13 stdin, stdout, stderr = ssh.exec_command('df') 14 # 获取命令结果 15 result = stdout.read() 16 print(result.decode('utf-8')) 17 # 关闭连接 18 ssh.close()
上传下载:
1 import paramiko 2 3 transport = paramiko.Transport(('120.92.84.249', 22)) 4 transport.connect(username='root', password='123QWEasd') 5 6 sftp = paramiko.SFTPClient.from_transport(transport) 7 # 将location.py 上传至服务器 /tmp/test.py 8 sftp.put('id_rsa', '/tmp/test.rsa') 9 # 将remove_path 下载到本地 local_path 10 # sftp.get('remove_path', 'local_path') 11 12 transport.close()