RabbitMQ RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统,他遵循Mozilla Public License开源协议,MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法,应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过队列来通信,队列的使用除去了接收和发送应用程序同时执行的要求,说的笼统点是queue+socket实现.
◆MQ的基础应用◆ 如果启动了多个消费者,那么他们之间是串行获取数据的,也就是说如果1号消费者收不到数据,那么MQ将默认发送给2号消费者,以此类推,如果全部消费者不在线,那么MQ会默认存储这个消息,直到有消费者上线,MQ就会将消息发送给指定的消费者.
生产者:
import pikaconn = pika.BlockingConnection(pika.ConnectionParameters (host="192.168.1.5" ,port="5672" ) ) print ("链接消息:" ,conn)channel = conn.channel() channel.queue_declare(queue="lyshark" ) while True : temp =input ("发送数据:" ).strip() channel.basic_publish(exchange="" ,routing_key="lyshark" ,body=temp) conn.close()
消费者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters (host='192.168.1.5' ,port="5672" ) ) channel = connection.channel() channel.queue_declare(queue='lyshark' ) def callback (ch,method,properties,body ): print ("接收的数据: %r" %body) channel.basic_consume(callback, queue='lyshark' , no_ack=True ) print ("==========准备接收消息==========" )channel.start_consuming()
◆消息的持久化◆ 如果服务器端被强制关闭了,我们的消息就丢失了,那就需要我们对服务器端的数据做一个持久化处理.
在每次声明队列的时候加上durable=True 队列持久化
,delivery_mode =2 消息持久化
也就是开启持久化的意思,必须客户端服务端都要写上.
生产者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.5' )) channel = connection.channel() channel.queue_declare(queue='hello' , durable=True ) channel.basic_publish(exchange='' , routing_key='hello' , body='Hello World!' , properties=pika.BasicProperties(delivery_mode=2 , )) print (" [x] Sent 'Hello World!'" )connection.close()
消费者:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.5' )) channel = connection.channel() channel.queue_declare(queue='hello' , durable=True ) def callback (ch, method, properties, body ): print ("返回数据: %r" % body) import time print ("完成..." ) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='hello' , no_ack=False ) print (' [*] Waiting for messages. To exit press CTRL+C' )channel.start_consuming()
◆消息发布订阅◆ 如上的配置方式,MQ只能将消息发送给一个消费者手里,有时候我们想给所有的消费者发送消息,那就需要使用广播的方式给所有的客户端发送消息的分发,MQ支持消息的公平分发,之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息.
发布者(fanout广播模式): 指定发布者为广播模式,所有bind到此exchange的queue都可以接收到服务端发送的消息.
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.5" )) channel = connection.channel() channel.exchange_declare(exchange="logs" , exchange_type="fanout" ) message = "info:hello lyshark" channel.basic_publish(exchange="logs" , routing_key="" , body = message ) print ("发送消息: %r" %message)connection.close()
订阅者(fanout广播模式): 订阅者修改让其随机生成队列名称,你可以启动多个订阅者来看其执行效果.
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.5" )) channel = connection.channel() channel.exchange_declare(exchange="logs" ,exchange_type="fanout" ) result = channel.queue_declare(exclusive=True ) queue_name = result.method.queue channel.queue_bind(exchange="logs" ,queue=queue_name) print ("==========接收数据==========" )def callback (ch, method, properties, body ): print ("收到的数据: %r" %body) channel.basic_consume(callback,queue=queue_name,no_ack=True ) channel.start_consuming()
◆选择发布订阅◆ RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字判定应该将数据发送至指定队列,direct模式通过routingKey和exchange决定的那个唯一的queue可以接收消息.
发布者(direct模式):
import pikaimport sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' )) channel = connection.channel() channel.exchange_declare(exchange='direct_logs' , type ='direct' ) severity = sys.argv[1 ] if len (sys.argv) > 1 else 'info' message = ' ' .join(sys.argv[2 :]) or 'Hello World!' channel.basic_publish(exchange='direct_logs' , routing_key=severity, body=message) print (" [x] Sent %r:%r" % (severity, message))connection.close()
发布者(direct模式):
import pikaimport sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost' )) channel = connection.channel() channel.exchange_declare(exchange='direct_logs' , type ='direct' ) result = channel.queue_declare(exclusive=True ) queue_name = result.method.queue severities = sys.argv[1 :] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0 ]) sys.exit(1 ) for severity in severities: channel.queue_bind(exchange='direct_logs' , queue=queue_name, routing_key=severity) print (' [*] Waiting for logs. To exit press CTRL+C' ) def callback (ch, method, properties, body ): print (" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True ) channel.start_consuming()
## MariaDB
MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可MariaDB的目的是完全兼容MySQL,包括API和命令行,使之能轻松成为MySQL的代替品,MariaDB由MySQL的创始人Michael Widenius主导开发,他早前曾以10亿美元的价格,将自己创建的公司MySQL AB卖给了SUN,本小结内容将介绍如何使用pymysql模块在程序中使用数据库应用.
安装环境: 先安装MariaDB数据库,并配置好以下环境.
[root@localhost ~] [root@localhost ~] [root@localhost ~] MariaDB> CREATE DATABASE IF NOT EXISTS testdb DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci; MariaDB> use testdb; MariaDB> CREATE TABLE hosts (id int not null auto_increment primary key,ip varchar(20) not null,port int not null); MariaDB> INSERT INTO hosts (ip,port) values ('1.1.1.1' ,22),('1.1.1.2' ,22), ('1.1.1.3' ,22),('1.1.1.4' ,22), ('1.1.1.5' ,22); MariaDB> grant all on *.* to root@'%' identified by '123123' ;
连接数据库:
import pymysqlconn = pymysql.connect( host="192.168.1.5" , port=3306 , user="root" , passwd="123123" , db="testdb" , charset="utf8" ) cursor = conn.cursor() print ("返回连接对象: " ,conn)print ("返回游标对象: " ,cursor)
## Paramiko
paramiko 是一个用于做远程SSH控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实,其实它的底层是对ssh的上层代码的一个封装,值得注意的是,由于paramiko模块内部依赖pycrypto,所以先下载安装pycrypto模块.
◆基于密码认证◆ SSHClient:
import paramiko ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname='192.168.1.5' ,port=22 ,username='root' ,password='1233' ) stdin,stdout,stderr = ssh.exec_command('ls -lh' ) result = stdout.read() ssh.close()
Transport:
import paramiko transport = paramiko.Transport(('192.168.1.5' ,22 )) transport.connect(username='root' ,password='1233' ) ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('ls -lh' ) print stdout.read() transport.close()
◆基于公钥认证◆ SSHClient:
import paramikoprivate_key = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa' ) ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname='192.168.1.5' ,port=22 ,username='root' ,key=private_key) stdin,stdout,stderr = ssh.exec_command('ls -lh' ) result = stdout.read() ssh.close()
Transport:
import paramikoprivate_key = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa' ) transport = paramiko.Transport(('192.168.1.5' ,22 )) transport.connect(username='root' ,pkey=private_key) ssh = paramiko.SSHClient() ssh._transport = transport stdin,stdout,stderr = ssh.exec_command('ls -lh' ) transport.close()
◆远程传输文件◆ SFTPClient:
import paramikotransport = paramiko.Transport(('192.168.1.5' ,22 )) transport.connect(username='root' ,password='1233' ) sftp = paramiko.SFTPClient.from_transport(transport) sftp.put('./location.py' , '/tmp/lyshark.py' ) sftp.get('remove_path' ,'local_path' ) transport.close()
SFTPTransport:
import paramikoprivate_key = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa' ) transport = paramiko.Transport(('192.168.1.5' , 22 )) transport.connect(username='root' , pkey=private_key ) sftp = paramiko.SFTPClient.from_transport(transport) sftp.put('/tmp/location.py' , '/tmp/test.py' ) sftp.get('remove_path' , 'local_path' ) transport.close()
SQLAchemy , pyecharts