类:Mongo::Cluster::SdamFlow Private
Overview
此类是私有 API 的一部分。 应尽可能避免使用此类,因为它将来可能会被删除或更改。
处理服务器描述更改事件的 SDAM 流。
更新服务器描述、拓扑结构描述并发布 SDAM 事件。
SdamFlow 旨在为需要处理的每个服务器描述更改事件实例化一次。
实例属性摘要折叠
- # cluster ⇒ 对象 只读 private
- #original_desc ⇒ 对象 只读 private
- # previous_desc ⇒ 对象 只读 private
-
#拓扑结构⇒ 对象
只读
private
存储在该属性中的拓扑结构可以在单个 sdam 流中多次更改(例如 未知 -> RS 无主节点 -> RS 有主节点)。
- #updated_desc ⇒ Object 只读 private
实例方法摘要折叠
-
# add_servers_from_desc (updated_desc) ⇒ Array<Server>
private
将给定描述中引用的尚未在集群中的所有服务器(应该来自良好的主节点)添加到集群中。
- #等待? ⇒ 布尔 private
-
#成为_未知? ⇒ 布尔
private
返回此流程处理其描述的服务器以前是否为未知状态,现在是否为未知状态。
-
# check_if_has_primary ⇒ 对象
private
检查集群是否有主节点,如果没有,则将拓扑结构转换为 ReplicaSetNoPrimary。
-
# commit_changes ⇒ 对象
private
发布服务器描述更改事件,更新集群上的拓扑结构结构,并根据 SDAM 流处理期间执行的操作按需发布拓扑结构更改事件。
- #disconnect_servers ⇒ 对象 private
-
# do_remove (address_str) ⇒ 对象
private
从拓扑结构中删除指定的服务器,并在拓扑结构最终出现空服务器列表时发出警告。
-
#initialize (cluster, previous_desc, updated_desc, awaited: false) ⇒ SdamFlow
构造函数
private
SdamFlow 的新实例。
- # publish_description_change_event ⇒ 对象 private
-
# remove ⇒ 对象
private
从拓扑结构中删除正在处理其描述的服务器。
-
# remove_servers_not_in_desc (updated_desc) ⇒ 对象
private
从拓扑结构中删除给定服务器描述中不存在的服务器(该描述应该来自良好的主节点)。
- # server_description_changed ⇒ 对象 private
-
# stale_primary? ⇒ 布尔
private
updated_desc 是否适用于过时的主节点。
-
# start_pool_if_data_ Bearing ⇒ 对象
private
如果正在处理的服务器被识别为承载数据,则创建服务器的连接池,以便开始填充。
-
#拓扑结构_有效地_已更改? ⇒ 布尔
private
返回拓扑结构是否因运行SDAM 流而发生有意义的更改。
-
# update_rs_from_primary ⇒ 对象
private
使用主节点服务器描述中的信息更新拓扑,该拓扑必须是 ReplicaSetWithPrimary。
-
#update_rs_with_primary_from_member ⇒ Object
private
从非主节点更新 ReplicaSetWithPrimary 拓扑。
-
# update_rs_Without_primary ⇒ 对象
private
从非主节点更新 ReplicaSetNoPrimary 拓扑。
-
# update_server_descriptions ⇒ 对象
private
更新地址与 updated_desc 地址匹配的所有服务器上的描述。
-
# update_unknown_with_standalone ⇒ 对象
private
发现独立服务器时,从未知拓扑类型转换为单一拓扑类型。
- # verify_invariants ⇒ 对象 private
构造函数详情
#initialize (集群, previous_desc, updated_desc, awaited: false) ⇒ SdamFlow
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
返回 SdamFlow 的新实例。
31 32 33 34 35 36 37 38 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 31 def 初始化(集群, previous_desc, updated_desc, 等待: false) @cluster = 集群 @topology = 集群.拓扑结构 @original_desc = @previous_desc = previous_desc @updated_desc = updated_desc @servers_to_disconnect = [] @awaited = !!等待 end |
实例属性详细信息
# cluster ⇒对象(只读)
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
40 41 42 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 40 def 集群 @cluster end |
#original_desc ⇒对象(只读)
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
56 57 58 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 56 def ORIGIN_DESC @original_desc end |
# previous_desc ⇒对象(只读)
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
54 55 56 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 54 def previous_desc @previous_desc end |
#拓扑结构⇒对象(只读)
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
存储在该属性中的拓扑结构可以在单个 sdam 流中多次更改(例如 未知 -> RS 无主节点 -> RS 有主节点)。 拓扑更改事件在流处理结束时发送,因此上面的示例仅向应用程序发布具有主事件的未知 RS。
52 53 54 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 52 def 拓扑结构 @topology end |
# updated_desc ⇒对象(只读)
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
55 56 57 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 55 def updated_desc @updated_desc end |
实例方法详细信息
# add_servers_from_desc (updated_desc) ⇒ Array< MongoDB Server >
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
添加的服务器不受监控。 必须稍后启动监控
将给定描述中引用的尚未在集群中的所有服务器(应该来自良好的主节点)添加到集群中。
分别。
387 388 389 390 391 392 393 394 395 396 397 398 399 400 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 387 def add_servers_from_desc(updated_desc) completed_servers = [] %w(主机 被动语态 仲裁节点).每 do |m| updated_desc.发送(m).每 do |Address_str| if server = 集群.添加(Address_str, 监控: false) completed_servers << server end end end verify_invariants completed_servers end |
#等待? ⇒布尔
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
58 59 60 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 58 def 等待? @awaited end |
#成为_未知? ⇒布尔
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
返回此流程处理其描述的服务器以前是否为未知状态,现在是否为未知状态。 特别是用于决定是否清除服务器的连接池。
631 632 633 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 631 def completed_unknown? updated_desc.未知? && !ORIGIN_DESC.未知? end |
# check_if_has_primary ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
检查集群是否有主节点,如果没有,则将拓扑结构转换为 ReplicaSetNoPrimary。调用此方法时,拓扑结构必须是 ReplicaSetWithPrimary。
583 584 585 586 587 588 589 590 591 592 593 594 595 596 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 583 def check_if_has_primary 除非 拓扑结构.replica_set? 提高 ArgumentError, " check_if_has_primary 应仅在拓扑结构为副本集时调用,但#{ 拓扑结构 . class . name . sub ( / .*:: / , ' ' ) } " end 主节点 = server_list.检测 do |server| # 集名称错误的主节点不是主节点 server.主节点? && server.描述.replica_set_name == 拓扑结构.replica_set_name end 除非 主节点 @topology = 拓扑结构::ReplicaSetNoPrimary.new( 拓扑结构., 拓扑结构.监控, self) end end |
# commit_changes ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
发布服务器描述更改事件,更新集群上的拓扑结构结构,并根据 SDAM 流处理期间执行的操作按需发布拓扑结构更改事件。
507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 507 def commit_changes # 应用程序可见的事件顺序应如下所示: # # 1 . 我们正在处理的服务器的描述更改; # 2 . 拓扑结构更改(如有); # 3 . 其他服务器的描述更改(如有)。 # # 这里棘手的部分是服务器描述更改是 # 并非全部一起处理。 publish_description_change_event start_pool_if_data_ Bearing topology_changed_event_published = false if !拓扑结构.等于?(集群.拓扑结构) || @need_topology_changed_event # 我们即将发布拓扑更改事件。 # 重新创建拓扑结构实例以获取其服务器描述 # 是最新的。 @topology = 拓扑结构.class.new(拓扑结构., 拓扑结构.监控, 集群) # 这会发送 SDAM 事件 集群.update_topology(拓扑结构) topology_changed_event_published = true @need_topology_changed_event = false end # 如果服务器描述发生更改,则发送拓扑描述更改事件 # 必须与上一个和下一个拓扑一起发布 # 相同类型,除非我们已经发布了拓扑结构更改事件 if topology_changed_event_published return end if updated_desc.未知? && previous_desc.未知? return end if updated_desc.object_id == previous_desc.object_id return end 除非 拓扑结构_有效地_已更改? return end # 如果我们在这里,服务器描述发生了变化 # 在我们的拓扑中,但拓扑类没有改变。 # 发布拓扑更改事件并重新创建拓扑 # 将新的服务器描述列表放入其中。 @topology = 拓扑结构.class.new(拓扑结构., 拓扑结构.监控, 集群) # 这会发送 SDAM 事件 集群.update_topology(拓扑结构) end |
#disconnect_servers ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
559 560 561 562 563 564 565 566 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 559 def disconnect_servers while server = @servers_to_disconnect.转变 if server.已连接? # 不要发布服务器关闭事件,因为这已经完成 server.断开连接! end end end |
# do_remove (address_str) ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
从拓扑结构中删除指定的服务器,并在拓扑结构最终出现空服务器列表时发出警告
434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 434 def do_remove(Address_str) 服务器 = 集群.删除(Address_str, 断开连接: false) 服务器.每 do |server| # 清除描述,将服务器标记为未知。 server.clear_description # 我们需要在此处发布服务器关闭事件,但无法关闭 # 服务器,因为它可能是拥有监视器的服务器 # 此流程当前正在执行哪个线程,在这种情况下,关闭 # 服务器可以终止线程并让 SDAM 处理 # incomplete. 因此,我们必须从集群中删除服务器, # 发布事件,但不要在服务器上调用断开连接,直到 # 所有处理完成时的最后时刻。 publish_sdam_event( mongo::监控::SERVER_CLOSED, mongo::监控::事件::ServerClosed.new(server.地址, 集群.拓扑结构) ) end @servers_to_disconnect += 服务器 if server_list.空? log_warn( “拓扑现在没有服务器 - 这可能是集群和/或应用程序配置错误” ) end end |
# publish_description_change_event ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 460 def publish_description_change_event # 当服务器描述确实发生变化时,可以调用此方法 # 但在拓扑更新之前。 因此,我们检查 # 服务器描述更改和整体拓扑更改。 当这个 # 在 SDAM 流程结束时调用方法,作为“提交更改”的一部分 # 步骤,将服务器描述更改合并到拓扑结构 # change. 除非 @server_description_changed || 拓扑结构_有效地_已更改? return end # 这里的 updated_desc 可能不是我们收到的描述 # 服务器 - 如果主节点过时,服务器会自行报告 # 作为主节点,但此处的 updated_desc 未知。 # 我们过去不会通知未知 -> 未知的服务器更改。 # 从技术上讲,这些都是有效的状态更改(或至少与 # 其他服务器描述更改时,描述尚未 # 进行了有意义的更改,但事件仍会发布)。 # 当前版本的驾驶员会在“未知”->“未知”时发出通知 # 转换。 # 避免在更新后的描述与以下内容相同时分派事件: # 之前的描述。 这允许在多个 # 次流程中应发布事件的时间,而无需 # 关注是否有任何未发布的更改。 if updated_desc.object_id == previous_desc.object_id return end publish_sdam_event( ::mongo::监控::SERVER_DESCRIPTION_CHANGED, ::mongo::监控::事件::ServerDescriptionChanged.new( updated_desc.地址, 拓扑结构, previous_desc, updated_desc, 等待: 等待?, ) ) @previous_desc = updated_desc @need_topology_changed_event = true end |
# remove ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
从拓扑结构中删除正在处理其描述的服务器。
427 428 429 430 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 427 def 删除 publish_description_change_event do_remove(updated_desc.地址.to_s) end |
# remove_servers_not_in_desc (updated_desc) ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
从拓扑结构中删除给定服务器描述中不存在的服务器(该描述应该来自良好的主节点)。
405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 405 def remove_servers_not_in_desc(updated_desc) updated_desc_address_strs = %w(主机 被动语态 仲裁节点).map do |m| updated_desc.发送(m) end.展平 server_list.每 do |server| 除非 updated_desc_address_strs.包括?(Address_str = server.地址.to_s) updated_host = updated_desc.地址.to_s if updated_desc.me && updated_desc.me != updated_host updated_host += " (自我标识为#{ updated_desc . me } ) " end log_warn( "正在删除服务器#{ Address_str } ,因为它不在主节点报告的主机中" + " #{ updated_host } 。报告的主机包括: " + updated_desc.主机.连接 (JOIN)(' , ') ) do_remove(Address_str) end end end |
# server_description_changed ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 93 def server_description_changed @previous_server_descriptions = server_list.map do |server| [server.地址.to_s, server.描述] end 除非 update_server_descriptions # 所有转换都要求使用其 updated_desc 的服务器 # 处理仍在集群中(即,未被删除) # 处理另一个响应的数量,可能是并发的)。 # 如果 update_server_descriptions 返回 false,则说明没有服务器 # 在我们正在处理的描述的拓扑中,停止。 return end 案例 拓扑结构 when 拓扑结构::LoadBalanced @updated_desc = ::mongo::服务器::说明::负载均衡器.new( updated_desc.地址, ) update_server_descriptions when 拓扑结构::Single if 拓扑结构.replica_set_name if updated_desc.replica_set_name != 拓扑结构.replica_set_name log_warn( " MongoDB Server #{ updated_desc . Address . to_s }有错误的副本集名称 ' #{ updated_desc . replica_set_name } ';预期 ' #{ Topology . replica_set_name } ' " ) @updated_desc = ::mongo::服务器::说明.new( updated_desc.地址, {}, average_round_trip_time: updated_desc.average_round_trip_time, minimum_round_trip_time: updated_desc.minimum_round_trip_time ) update_server_descriptions end end when 拓扑结构::未知 if updated_desc.独立运行? update_unknown_with_standalone elsif updated_desc.mongos? @topology = 拓扑结构::分片.new(拓扑结构., 拓扑结构.监控, self) elsif updated_desc.主节点? @topology = 拓扑结构::ReplicaSetWithPrimary.new( 拓扑结构..合并(merge)(replica_set_name: updated_desc.replica_set_name), 拓扑结构.监控, self) update_rs_from_primary elsif updated_desc.从节点(secondary node from replica set)? || updated_desc.仲裁节点? || updated_desc.其他? @topology = 拓扑结构::ReplicaSetNoPrimary.new( 拓扑结构..合并(merge)(replica_set_name: updated_desc.replica_set_name), 拓扑结构.监控, self) update_rs_Without_primary end when 拓扑结构::分片 除非 updated_desc.未知? || updated_desc.mongos? log_warn( "正在删除服务器#{ updated_desc . Address . to_s } ,因为它的类型错误 ( #{ updated_desc . server_type . to_s . upcase } ) - 预期为 SHARDED " ) 删除 end when 拓扑结构::ReplicaSetWithPrimary if updated_desc.独立运行? || updated_desc.mongos? log_warn( "删除服务器#{ updated_desc . 解决 . to_s } ,因为它的类型错误 ( #{ updated_desc . server_type . to_s . upcase } ) — 预期是副本集成员" ) 删除 check_if_has_primary elsif updated_desc.主节点? update_rs_from_primary elsif updated_desc.从节点(secondary node from replica set)? || updated_desc.仲裁节点? || updated_desc.其他? update_rs_with_primary_from_member else check_if_has_primary end when 拓扑结构::ReplicaSetNoPrimary if updated_desc.独立运行? || updated_desc.mongos? log_warn( "删除服务器#{ updated_desc . 解决 . to_s } ,因为它的类型错误 ( #{ updated_desc . server_type . to_s . upcase } ) — 预期是副本集成员" ) 删除 elsif updated_desc.主节点? # 在这里,我们将拓扑类型更改为具有主节点的 RS,但是 # 在处理 updated_desc 时,我们可能会发现其 RS 名称 # 与我们现有的 RS 名称不匹配。 为此 # 一定不能将 updated_desc 的 RS 名称传递给 # 拓扑构造函数。 # 在处理过程中,我们可能会删除其 updated_desc # 我们正在处理(例如再次出现 RS 名称不匹配的情况), # 在这种情况下,拓扑类型将Go回没有主节点的 RS # 在 check_if_has_primary 步骤中。 @topology = 拓扑结构::ReplicaSetWithPrimary.new( # 不要在此处传递 updated_desc 的 RS 名称 拓扑结构., 拓扑结构.监控, self) update_rs_from_primary elsif updated_desc.从节点(secondary node from replica set)? || updated_desc.仲裁节点? || updated_desc.其他? update_rs_Without_primary end else 提高 ArgumentError, "未知拓扑#{ topology . class } " end verify_invariants commit_changes disconnect_servers end |
# stale_primary? ⇒布尔
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
updated_desc 是否适用于过时的主节点。
599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 599 def stale_primary? if updated_desc.max_wire_version >= 17 if updated_desc.election_id.nil? && !拓扑结构.max_election_id.nil? return true end if updated_desc.election_id && 拓扑结构.max_election_id && updated_desc.election_id < 拓扑结构.max_election_id return true end if updated_desc.election_id == 拓扑结构.max_election_id if updated_desc.set_version.nil? && !拓扑结构.max_set_version.nil? return true end if updated_desc.set_version && 拓扑结构.max_set_version && updated_desc.set_version < 拓扑结构.max_set_version return true end end else if updated_desc.election_id && updated_desc.set_version if 拓扑结构.max_set_version && 拓扑结构.max_election_id && (updated_desc.set_version < 拓扑结构.max_set_version || (updated_desc.set_version == 拓扑结构.max_set_version && updated_desc.election_id < 拓扑结构.max_election_id)) return true end end end false end |
# start_pool_if_data_ Bearing ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
如果正在处理的服务器被识别为承载数据,则创建服务器的连接池,以便它可以开始填充
570 571 572 573 574 575 576 577 578 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 570 def start_pool_if_data_ Bearing return if !updated_desc.data_ Bearing? server_list.每 do |server| if server.地址 == @updated_desc.地址 server.池 end end end |
#拓扑结构_有效地_已更改? ⇒布尔
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
返回拓扑结构是否因运行SDAM 流而发生有意义的更改。
该规范通过拓扑类型和每个拓扑中服务器描述的相等性来定义拓扑相等性;这个定义对我们不可用,因为我们的拓扑对象不保存服务器描述,而是“实时”的。 因此,我们必须在 SDAM 流开始时存储服务器描述的完整列表,并将它们与当前的列表进行比较。
644 645 646 647 648 649 650 651 652 653 654 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 644 def 拓扑结构_有效地_已更改? 除非 拓扑结构.等于?(集群.拓扑结构) return true end server_descriptions = server_list.map do |server| [server.地址.to_s, server.描述] end @previous_server_descriptions != server_descriptions end |
# update_rs_from_primary ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
使用主节点服务器描述中的信息更新拓扑,该拓扑必须是 ReplicaSetWithPrimary。
此方法不会将拓扑结构类型更改为 ReplicaSetWithPrimary — 这需要在调用此方法之前完成。
如果其描述正在处理的主节点 (primary node in the replica set)节点被确定为过时,此方法会将服务器描述和拓扑结构类型更改为未知。
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 221 def update_rs_from_primary if 拓扑结构.replica_set_name.nil? @topology = 拓扑结构::ReplicaSetWithPrimary.new( 拓扑结构..合并(merge)(replica_set_name: updated_desc.replica_set_name), 拓扑结构.监控, self) end if 拓扑结构.replica_set_name != updated_desc.replica_set_name log_warn( " 正在删除服务器 #{ updated_desc.address.to_s} , 因为 它 有一个 " + "不正确的副本集名称 ' #{ updated_desc . replica_set_name } '; " + "预期' #{拓扑。replica_set_name } } ' " ) 删除 check_if_has_primary return end if stale_primary? @updated_desc = ::mongo::服务器::说明.new( updated_desc.地址, {}, average_round_trip_time: updated_desc.average_round_trip_time, minimum_round_trip_time: updated_desc.minimum_round_trip_time ) update_server_descriptions check_if_has_primary return end if updated_desc.max_wire_version >= 17 @topology = 拓扑结构::ReplicaSetWithPrimary.new( 拓扑结构..合并(merge)( max_election_id: updated_desc.election_id, max_set_version: updated_desc.set_version ), 拓扑结构.监控, self) else max_election_id = 拓扑结构.new_max_election_id(updated_desc) max_set_version = 拓扑结构.new_max_set_version(updated_desc) if max_election_id != 拓扑结构.max_election_id || max_set_version != 拓扑结构.max_set_version then @topology = 拓扑结构::ReplicaSetWithPrimary.new( 拓扑结构..合并(merge)( max_election_id: max_election_id, max_set_version: max_set_version ), 拓扑结构.监控, self) end end # 此时我们已接受更新的服务器描述 # 和拓扑(均为主节点)。 提交这些更改,以便 # 它们各自的 SDAM 事件在 SDAM 事件之前发布 # 个服务器添加/删除 publish_description_change_event server_list.每 do |server| if server.地址 != updated_desc.地址 if server.主节点? server.update_description( ::mongo::服务器::说明.new( server.地址, {}, average_round_trip_time: server.描述.average_round_trip_time, minimum_round_trip_time: updated_desc.minimum_round_trip_time ) ) end end end 服务器 = add_servers_from_desc(updated_desc) remove_servers_not_in_desc(updated_desc) check_if_has_primary 服务器.每 do |server| server.start_monitoring end end |
#update_rs_with_primary_from_member ⇒ Object
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
从非主节点更新 ReplicaSetWithPrimary 拓扑。
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 304 def update_rs_with_primary_from_member if 拓扑结构.replica_set_name != updated_desc.replica_set_name log_warn( " 正在删除服务器 #{ updated_desc.address.to_s} , 因为 它 有一个 " + "不正确的副本集名称 ( #{ updated_desc.replica_set_name } } ) ; " + " current set name is #{ Topology . replica_set_name } " ) 删除 check_if_has_primary return end if updated_desc.me_mismatch? log_warn( " 正在删除服务器 #{ updated_desc .address.to_s } } , 因为 它 " + "将自身报告为#{ updated_desc . me } " ) 删除 check_if_has_primary return end have_primary = false server_list.每 do |server| if server.主节点? have_primary = true 中断 end end 除非 have_primary @topology = 拓扑结构::ReplicaSetNoPrimary.new( 拓扑结构., 拓扑结构.监控, self) end end |
# update_rs_Without_primary ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
从非主节点更新 ReplicaSetNoPrimary 拓扑。
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 341 def update_rs_Without_primary if 拓扑结构.replica_set_name.nil? @topology = 拓扑结构::ReplicaSetNoPrimary.new( 拓扑结构..合并(merge)(replica_set_name: updated_desc.replica_set_name), 拓扑结构.监控, self) end if 拓扑结构.replica_set_name != updated_desc.replica_set_name log_warn( " 正在删除服务器 #{ updated_desc.address.to_s} , 因为 它 有一个 " + "不正确的副本集名称 ( #{ updated_desc.replica_set_name } } ) ; " + " current set name is #{ Topology . replica_set_name } " ) 删除 return end publish_description_change_event 服务器 = add_servers_from_desc(updated_desc) commit_changes 服务器.每 do |server| server.start_monitoring end if updated_desc.me_mismatch? log_warn( " 正在删除服务器 #{ updated_desc .address.to_s } } , 因为 它 " + "将自身报告为#{ updated_desc . me } " ) 删除 return end end |
# update_server_descriptions ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
更新地址与 updated_desc 地址匹配的所有服务器上的描述。
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 66 def update_server_descriptions server_list.每 do |server| if server.地址 == updated_desc.地址 # 当新描述中的拓扑版本出现时,必须运行 SDAM 流 # 等于当前拓扑版本,参见 # http://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.md#what-is-the-purpose-of-topologyversion 除非 updated_desc.topology_version_gte?(server.描述) return false end @server_description_changed = server.描述 != updated_desc # 始终更新服务器描述,以便未更新的字段 # 会影响描述相等性比较,但属于 # 描述已更新。 server.update_description(updated_desc) server.update_last_scan # 如果描述之间没有内容差异,我们 # 仍然需要运行sdam flow,但如果该流没有产生任何变化 # 在拓扑结构,我们将省略发送事件。 return true end end false end |
# update_unknown_with_standalone ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
发现独立服务器时,从未知拓扑类型转换为单一拓扑类型。
200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 200 def update_unknown_with_standalone if 种子.长度 == 1 @topology = 拓扑结构::Single.new( 拓扑结构., 拓扑结构.监控, self) else log_warn( " 删除服务器 #{ updated_desc . Address . to_s } ,因为它是独立的,并且我们有多个种子 ( # { SEED . length } ) " ) 删除 end end |
# verify_invariants ⇒对象
此方法是私有 API 的一部分。 您应尽可能避免使用此方法,因为它将来可能会被删除或更改。
656 657 658 659 660 661 662 663 664 |
# File 'lib/ Mongo/ 集群/sdam_flow.rb', line 656 def verify_invariants if mongo::Lint.已启用? if 集群.拓扑结构.单身? if 集群.server_list.长度 > 1 提高 mongo::错误::LintError, "尝试创建具有多个服务器的单个拓扑结构: #{ 集群 .servers_list } } " end end end end |