上图中实线箭头表示强引用(复合),虚线箭头表示弱引用(聚合)
connect to replica set
PyMongo的文档给出了如何连接到复制集:指定复制集的名字,以及一个或多个该复制集内的节点。如:
MongoClient('localhost', replicaset='foo')
上述操作是non-blocking,立即返回,通过后台线程去连接指定节点,PyMongo连接到节点后,会从mongod节点获取到复制集内其他节点的信息,然后再连接到复制集内的其他节点。
from time import sleep
c = MongoClient('localhost', replicaset='foo'); print(c.nodes); sleep(0.1); print(c.nodes)
frozenset([])
frozenset([(u'localhost', 27019), (u'localhost', 27017), (u'localhost', 27018)])可以看到,刚初始化MongoClient实例时,并没有连接到任何节点(c.nodes)为空;过了一段时间,再查看,那么会发现已经连上了复制集内的三个节点。
那么问题来了,创建MongoClient后,尚未连接到复制集节点之前,能否立即操作数据库?
If you need to do any operation with a MongoClient, such as a find() or an insert_one(), the client waits to discover a suitable member before it attempts the operation.
通过后续的代码分析可以看到,会通过一个条件变量(threading.Condition)去协调。
PyMongo Monitor
上面提到,初始化MongoClient对象的时候,会通过指定的mognod节点去发现复制集内的其他节点,这个就是通过
monitor.Monitor来实现的。从上面的类图可以看到,每一个server(与一个mongod节点对应)都有一个monitor。Monitor的作用在于:- Health: detect when a member goes down or comes up, or if a different member becomes primary
- Configuration: detect when members are added or removed, and detect changes in members’ tags
- Latency: track a moving average of each member’s ping time
Monitor会启动一个后台线程
PeriodExecutor,定时(默认10s)通过socket连接Pool给对应的mongod节点发送 ismaster 消息。核心代码(略作调整)如下def _run(self): self._server_description = self._check_with_retry() self._topology.on_change(self._server_description) def _check_with_retry(self): address = self._server_description.address response, round_trip_time = self._check_with_socket( sock_info, metadata=metadata) self._avg_round_trip_time.add_sample(round_trip_time) # 更新rtt sd = ServerDescription( address=address, ismaster=response, round_trip_time=self._avg_round_trip_time.get()) return sd def _check_with_socket(self, sock_info, metadata=None): """Return (IsMaster, round_trip_time). Can raise ConnectionFailure or OperationFailure. """ cmd = SON([('ismaster', 1)]) if metadata is not None: cmd['client'] = metadata if self._server_description.max_wire_version >= 6: cluster_time = self._topology.max_cluster_time() if cluster_time is not None: cmd['$clusterTime'] = cluster_time start = _time() request_id, msg, max_doc_size = message.query( 0, 'admin.$cmd', 0, -1, cmd, None, DEFAULT_CODEC_OPTIONS) # TODO: use sock_info.command() sock_info.send_message(msg, max_doc_size) reply = sock_info.receive_message(request_id) return IsMaster(reply.command_response()), _time() - start类
IsMaster是对ismaster command reponse的封装,比较核心的属性包括:- replica_set_name:从mongod节点看来,复制集的名字
- primary:从mongod节点看来,谁是Priamry
- all_hosts: 从mongod节点看来,复制集中的所有节点
- last_write_date: mongod节点最后写入数据的时间,用来判断secondary节点的staleness
- set_version:config version
-
关键字:
青岛软件培训
可能你正在寻找一家靠谱的IT培训机构,
渴望突破职业瓶颈,
找一份得体的工作。
恰巧万码学堂正在寻找像你这样不甘平凡的追光者!
我们拒绝纸上谈兵,直接参与真实开发流程!
现在行动,未来可期
立即拨打0532-85025005,预约免费职业规划咨询
前20名咨询者赠送《2025高薪技术岗位白皮书》!
你不是在报名课程,而是在投资五年后的自己!
