C语言基础学习PYTHON——基础学习D11
20180908内容纲要:
1、RabbitMQ消息队列
(1)RabbitMQ安装
(2)Rabbits示例
模式一:fanout
模式二:direct
模式三:topic
Remote Procedure Call(RPC)
2、Redis缓存
(1)连接方式
(2)连接池
(3)操作
1、string操作
2、hash操作
3、list操作
4、set集合操作
5、其他常用操作
(4)发布订阅
3、小结
4、练习
1 RabbitMQ消息队列
MQ全称为MessageQueue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
MQ的消费-生产者模型的一个典型的代表,一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息。
RabbitMQ的结构图如下:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,
exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
当然,也有其他消息队列,比如ZeroMQ、ActiveMQ等。
(1)RabbitMQ的安装
首先,因为RabbitMQ由ERLANG实现,下载ERLANG 源代码。
安装 http://www.rabbitmq.com/install-standalone-mac.html
然后,下载RabbitMQ。
安装 python rabbitMQ module
安装教程:https://www.cnblogs.com/lykbk/p/erewererewr32434343.html
在windows安装中可能会出现这样的问题:RabbitMQ报错Error: unable to connect to node rabbit@xxx: nodedown
解决办法:https://blog.csdn.net/u012930316/article/details/76841025
(2)RabbitMQ基本示例
a、消息分发轮询
RabbitMQ_send
RabbitMQ_receive
在实现一个消费者对应一个生产者,那能不能一对多呢?
在一个生产者对应多个消费者的时候,生产者发出的消息会根据启动顺序依次被消费者接收。即消息分发轮询。
如果no_ack存在,当消息正在处理的过程中接受端断开,消息就会丢失。发送端发送的数据就会删除;
如果no_ack不存在,发送端就会等待接受端处理完毕的指令。如果此时接受端1断开,那么就由2接收,如果都断开;
那么数据会一直存储在发送端,直至有新的接受端出现,并且发送端收到接受端数据处理完毕的指令,数据才会在发送端被删除。
Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag.
消息确认在默认情况下是打开的。在前面的示例中,我们通过no_ack=True标志显式地关闭了它们。
It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.
当我们完成一项任务时,是时候移除此标志并从工作人员发送适当的确认信息了。
如果客户端突然断开,数据则会丢失,为了防止数据丢失需要下面一段代码:
ch.basic_ack(delivery_tag=method.delivery_tag)
b、消息持久化
在RabbitMQ的安装路径下C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.5\sbin>在sbin文件夹中rabbit的一些指令
比如rabbitmqctl.bat list_queues可以查询存在的队列消息等。
那么客户端的断开会发生数据丢失,通过确认函数来保证数据不会丢失。那么如果服务器端断开呢?数据一样会丢失。
为了防止服务器端断开导致数据丢失,我们需要以下两步:
durable队列持久化 delievery消息持久化
1、首先要进行声明。
channel.queue_declare(queue='hello', durable=True)
2、虽然在这里声明,但并不能真的持久,还需这样。
channel.queue_declare(queue='task_queue', durable=True)
这样,即使RabbitMQ重新启动,task_queue队列也不会丢失。
消息持久化
c、消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。那么解决办法?
channel.basic_qos(prefetch_count=1)
这行代码只需加入到消费者端即可。
持久化+消息分发
d、消息发布/订阅
如果服务器发出一条消息,是否所有的客户端都能收到消息呢?
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。
fanout: 所有bind到此exchange的queue都可以接收消息;
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息;
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息;
headers: 通过headers 来决定把消息发给哪些queue 这个不常用。
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
模式一:fanout广播
fanout_publisher
fanout_subscriber
模式二:direct有选择的接收消息
direct_publisher
direct_subscriber
模式三:topic更细致的消息过滤
*.orange.*所有中间包含orange
*.*.rabbit所有以rabbit结尾的
lazy.#所有以lazy开头的
topic_publisher
topic_subscriber
To receive all the logs run:
python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
python emit_log_topic.py "kern.critical" "A critical kernel error"
Remote procedure call (RPC)远程过程调用
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
.
rpc_server
rpc_client
2 Redis缓存
Redis是一个Key-Value存储系统。和Memcached类似,它支持存储的value类型相对更多。
包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。
这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。
与memcached一样,为了保证效率,数据都是缓存在内存中。
区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
memcached 的使用:
http://www.cnblogs.com/wupeiqi/articles/5132791.html
Windows下安装Redis教程:
https://jingyan.baidu.com/article/0f5fb099045b056d8334ea97.html
(1)连接方式
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令.
Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
redis_connection
(2) 连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。
可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。
redis_connection pool
(3) 操作
1、string操作
redis中的String在在内存中按照一个name对应一个value来存储。如图:
set(name, value, ex=None, px=None, nx=False, xx=False)
set(name, value, ex=None, px=None, nx=False, xx=False)
setnx(name, value)
setnx(name, value)
setex(name, value, time)
setex(name, value, time)
psetex(name, time_ms, value)
psetex(name, time_ms, value)
mset(*args, **kwargs)
mset(*args, **kwargs)
get(name)
get(name)
mget(keys, *args)
mget(keys, *args)
getset(name, value)
getset(name, value)
getrange(key, start, end)
getrange(key, start, end)
setrange(name, offset, value)
setrange(name, offset, value)
setbit(name, offset, value)
setbit(name, offset, value)
getbit(name, offset)
getbit(name, offset)
bitcount(key, start=None, end=None)
bitcount(key, start=None, end=None)
strlen(name)
strlen(name)
incr(self, name, amount=1)
incr(self, name, amount=1)
incrbyfloat(self, name, amount=1.0)
incrbyfloat(self, name, amount=1.0)
decr(self, name, amount=1)
decr(self, name, amount=1)
append(key, value)
append(key, value)
2、hash操作
hash表现形式上有些像pyhton中的dict,可以存储一组关联性较强的数据 , redis中Hash在内存中的存储格式如下图:
hset(name, key, value)
hset(name, key, value)
hmset(name, mapping)
hmset(name, mapping)
hget(name,key)
hget(name,key)
hmget(name, keys, *args)
hmget(name, keys, *args)
hgetall(name)
hgetall(name)
hlen(name)
hlen(name)
hkeys(name)
hkeys(name)
hvals(name)
hvals(name)
hexists(name, key)
hexists(name, key)
hdel(name,*keys)
hdel(name,*keys)
hincrby(name, key, amount=1)
hincrby(name, key, amount=1)
hincrbyfloat(name, key, amount=1.0)
hincrbyfloat(name, key, amount=1.0)
hscan(name, cursor=0, match=None, count=None)
Start a full hash scan with:
HSCAN myhash 0
Start a hash scan with fields matching a pattern with:
HSCAN myhash 0 MATCH order_*
Start a hash scan with fields matching a pattern and forcing the scan command to do more scanning with:
HSCAN myhash 0 MATCH order_* COUNT 1000
hscan(name, cursor=0, match=None, count=None)
hscan_iter(name, match=None, count=None)
hscan_iter(name, match=None, count=None)
3、list操作
List操作,redis中的List在在内存中按照一个name对应一个List来存储。如图:
lpush(name,values)
lpush(name,values)
lpushx(name,value)
lpushx(name,value)
llen(name)
llen(name)
linsert(name, where, refvalue, value))
linsert(name, where, refvalue, value))
r.lset(name, index, value)
r.lset(name, index, value)
r.lrem(name, value, num)
r.lrem(name, value, num)
lpop(name)
lpop(name)
lindex(name, index)
lindex(name, index)
lrange(name, start, end)
lrange(name, start, end)
ltrim(name, start, end)
ltrim(name, start, end)
rpoplpush(src, dst)
rpoplpush(src, dst)
blpop(keys, timeout)
blpop(keys, timeout)
brpoplpush(src, dst, timeout=0)
brpoplpush(src, dst, timeout=0)
3、set集合操作
Set操作,Set集合就是不允许重复的列表
sadd(name,values)
sadd(name,values)
scard(name)
scard(name)
sdiff(keys, *args)
sdiff(keys, *args)
sdiffstore(dest, keys, *args)
sdiffstore(dest, keys, *args)
sinter(keys, *args)
sinter(keys, *args)
sinterstore(dest, keys, *args)
sinterstore(dest, keys, *args)
sismember(name, value)
sismember(name, value)
smembers(name)
smembers(name)
smove(src, dst, value)
smove(src, dst, value)
spop(name)
spop(name)
srandmember(name, numbers)
srandmember(name, numbers)
srem(name, values)
srem(name, values)
sunion(keys, *args)
sunion(keys, *args)
sunionstore(dest,keys, *args)
sunionstore(dest,keys, *args)
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
sscan(name, cursor=0, match=None, count=None)
有序集合,在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较
所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。
zadd(name, *args, **kwargs)
zadd(name, *args, **kwargs)
zcard(name)
zcard(name)
zcount(name, min, max)
zcount(name, min, max)
zincrby(name, value, amount)
zincrby(name, value, amount)
r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
r.zrange
zrank(name, value)
zrank(name, value)
zrem(name, values)
zrem(name, values)
zremrangebyrank(name, min, max)
zremrangebyrank(name, min, max)
zremrangebyscore(name, min, max)
zremrangebyscore(name, min, max)
zscore(name, value)
zscore(name, value)
zinterstore(dest, keys, aggregate=None)
zinterstore(dest, keys, aggregate=None)
zunionstore(dest, keys, aggregate=None)
zunionstore(dest, keys, aggregate=None)
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
zscan
4、常用其他操作
delete(*names)
delete(*names)
exists(name)
exists(name)
keys(pattern='*')
keys(pattern='*')
expire(name ,time)
expire(name ,time)
rename(src, dst)
rename(src, dst)
move(name, db))
move(name, db))
randomkey()
randomkey()
type(name)
type(name)
scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)
scan(cursor=0, match=None, count=None)
(4)发布订阅
发布者
订阅者
3 小结
路漫漫其修远兮,吾将上下而求索~!
真的是这样,有好多事情,你不经历过,真的是不知道有多niux!
间歇性踌躇满志,持续性混吃等死。
4 练习
题目:rpc命令端
需求:
可以异步的执行多个命令
对多台机器
作业是真的不会做。有机会有勇气可以点进来看看:
https://www.cnblogs.com/catepython/p/9051490.html
我是尾巴~
这次推荐:如何用几个简单的命令改善你的Linux安全
https://mp.weixin.qq.com/s/aSabNfaCla4Y4WZSTjLn3Q
虽不才,才要坚持。https://www.cnblogs.com/zhangkanghui/p/9609589.html