Skip to content

Commit

Permalink
server: Rework config peer resolving and connection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
xdustinface committed Apr 11, 2023
1 parent c1e2068 commit cb34771
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 82 deletions.
29 changes: 0 additions & 29 deletions chia/server/reconnect_task.py

This file was deleted.

9 changes: 2 additions & 7 deletions chia/server/start_farmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
from chia.rpc.farmer_rpc_api import FarmerRpcApi
from chia.server.outbound_message import NodeType
from chia.server.start_service import RpcInfo, Service, async_run
from chia.types.peer_info import PeerInfo
from chia.types.peer_info import UnresolvedPeerInfo
from chia.util.chia_logging import initialize_service_logging
from chia.util.config import load_config, load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.keychain import Keychain
from chia.util.network import get_host_addr

# See: https://bugs.python.org/issue29288
"".encode("idna")
Expand All @@ -34,12 +33,8 @@ def create_farmer_service(
) -> Service[Farmer]:
service_config = config[SERVICE_NAME]

connect_peers = []
fnp = service_config.get("full_node_peer")
if fnp is not None:
connect_peers.append(
PeerInfo(str(get_host_addr(fnp["host"], prefer_ipv6=config.get("prefer_ipv6", False))), fnp["port"])
)
connect_peers = set() if fnp is None else {UnresolvedPeerInfo(fnp["host"], fnp["port"])}

overrides = service_config["network_overrides"]["constants"][service_config["selected_network"]]
updated_constants = consensus_constants.replace_str_to_bytes(**overrides)
Expand Down
11 changes: 4 additions & 7 deletions chia/server/start_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
from chia.rpc.harvester_rpc_api import HarvesterRpcApi
from chia.server.outbound_message import NodeType
from chia.server.start_service import RpcInfo, Service, async_run
from chia.types.peer_info import PeerInfo
from chia.types.peer_info import UnresolvedPeerInfo
from chia.util.chia_logging import initialize_service_logging
from chia.util.config import load_config, load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.network import get_host_addr

# See: https://bugs.python.org/issue29288
"".encode("idna")
Expand All @@ -27,7 +26,7 @@ def create_harvester_service(
root_path: pathlib.Path,
config: Dict[str, Any],
consensus_constants: ConsensusConstants,
farmer_peer: Optional[PeerInfo],
farmer_peer: Optional[UnresolvedPeerInfo],
connect_to_daemon: bool = True,
) -> Service[Harvester]:
service_config = config[SERVICE_NAME]
Expand All @@ -49,7 +48,7 @@ def create_harvester_service(
node_type=NodeType.HARVESTER,
advertised_port=service_config["port"],
service_name=SERVICE_NAME,
connect_peers=[] if farmer_peer is None else [farmer_peer],
connect_peers=set() if farmer_peer is None else {farmer_peer},
network_id=network_id,
rpc_info=rpc_info,
connect_to_daemon=connect_to_daemon,
Expand All @@ -63,9 +62,7 @@ async def async_main() -> int:
service_config = load_config_cli(DEFAULT_ROOT_PATH, "config.yaml", SERVICE_NAME)
config[SERVICE_NAME] = service_config
initialize_service_logging(service_name=SERVICE_NAME, config=config)
farmer_peer = PeerInfo(
str(get_host_addr(service_config["farmer_peer"]["host"])), service_config["farmer_peer"]["port"]
)
farmer_peer = UnresolvedPeerInfo(service_config["farmer_peer"]["host"], service_config["farmer_peer"]["port"])
service = create_harvester_service(DEFAULT_ROOT_PATH, config, DEFAULT_CONSTANTS, farmer_peer)
await service.setup_process_global_state()
await service.run()
Expand Down
62 changes: 46 additions & 16 deletions chia/server/start_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sys
from pathlib import Path
from types import FrameType
from typing import Any, Awaitable, Callable, Coroutine, Dict, Generic, List, Optional, Tuple, Type, TypeVar
from typing import Any, Awaitable, Callable, Coroutine, Dict, Generic, List, Optional, Set, Tuple, Type, TypeVar

from chia.cmds.init_funcs import chia_full_version_str
from chia.daemon.server import service_launch_lock_path
Expand All @@ -20,13 +20,13 @@
from chia.server.ssl_context import chia_ssl_ca_paths, private_ssl_ca_paths
from chia.server.upnp import UPnP
from chia.server.ws_connection import WSChiaConnection
from chia.types.peer_info import PeerInfo
from chia.types.peer_info import PeerInfo, UnresolvedPeerInfo
from chia.util.ints import uint16
from chia.util.lock import Lockfile, LockfileError
from chia.util.network import get_host_addr
from chia.util.setproctitle import setproctitle

from ..protocols.shared_protocol import capabilities
from .reconnect_task import start_reconnect_task

# this is used to detect whether we are running in the main process or not, in
# signal handlers. We need to ignore signals in the sub processes.
Expand Down Expand Up @@ -55,7 +55,7 @@ def __init__(
*,
config: Dict[str, Any],
upnp_ports: List[int] = [],
connect_peers: List[PeerInfo] = [],
connect_peers: Set[UnresolvedPeerInfo] = set(),
on_connect_callback: Optional[Callable[[WSChiaConnection], Awaitable[None]]] = None,
rpc_info: Optional[RpcInfo] = None,
connect_to_daemon: bool = True,
Expand All @@ -77,6 +77,7 @@ def __init__(
self._network_id: str = network_id
self.max_request_body_size = max_request_body_size
self._listen = listen
self.reconnect_retry_seconds: int = 3

self._log = logging.getLogger(service_name)
self._log.info(f"Starting service {self._service_name} ...")
Expand Down Expand Up @@ -129,9 +130,44 @@ def __init__(

self._on_connect_callback = on_connect_callback
self._advertised_port = advertised_port
self._reconnect_tasks: Dict[PeerInfo, Optional[asyncio.Task[None]]] = {peer: None for peer in connect_peers}
self._connect_peers = connect_peers
self._connect_peers_task: Optional[asyncio.Task[None]] = None
self.upnp: UPnP = UPnP()

async def _connect_peers_task_handler(self) -> None:
resolved_peers: Dict[UnresolvedPeerInfo, PeerInfo] = {}
prefer_ipv6 = self.config.get("prefer_ipv6", False)
while True:
for unresolved in self._connect_peers:
resolved = resolved_peers.get(unresolved)
if resolved is None:
try:
resolved = PeerInfo(get_host_addr(unresolved.host, prefer_ipv6=prefer_ipv6), unresolved.port)
except Exception as e:
self._log.warning(f"Failed to resolve {unresolved.host}: {e}")
continue
self._log.info(f"Add resolved {resolved}")
resolved_peers[unresolved] = resolved

if any(connection.get_peer_info() == resolved for connection in self._server.all_connections.values()):
continue

if not await self._server.start_client(resolved, None):
self._log.info(f"Failed to connect to {resolved}")
# Re-resolve to make sure the IP didn't change, this helps for example to keep dyndns hostnames
# up to date.
try:
resolved_new = PeerInfo(
get_host_addr(unresolved.host, prefer_ipv6=prefer_ipv6), unresolved.port
)
except Exception as e:
self._log.warning(f"Failed to resolve after connection failure {unresolved.host}: {e}")
continue
if resolved_new != resolved:
self._log.info(f"Host {unresolved.host} changed from {resolved} to {resolved_new}")
resolved_peers[unresolved] = resolved_new
await asyncio.sleep(self.reconnect_retry_seconds)

async def start(self) -> None:
# TODO: move those parameters to `__init__`
if self._did_start:
Expand All @@ -158,8 +194,7 @@ async def start(self) -> None:
)
self._advertised_port = self._server.get_port()

for peer in self._reconnect_tasks.keys():
self.add_peer(peer)
self._connect_peers_task = asyncio.create_task(self._connect_peers_task_handler())

self._log.info(
f"Started {self._service_name} service on network_id: {self._network_id} "
Expand Down Expand Up @@ -190,11 +225,8 @@ async def run(self) -> None:
self._log.error(f"{self._service_name}: already running")
raise ValueError(f"{self._service_name}: already running") from e

def add_peer(self, peer: PeerInfo) -> None:
if self._reconnect_tasks.get(peer) is not None:
raise ServiceException(f"Peer {peer} already added")

self._reconnect_tasks[peer] = start_reconnect_task(self._server, peer, self._log)
def add_peer(self, peer: UnresolvedPeerInfo) -> None:
self._connect_peers.add(peer)

async def setup_process_global_state(self) -> None:
# Being async forces this to be run from within an active event loop as is
Expand Down Expand Up @@ -242,10 +274,8 @@ def stop(self) -> None:
self.upnp.release(port)

self._log.info("Cancelling reconnect task")
for task in self._reconnect_tasks.values():
if task is not None:
task.cancel()
self._reconnect_tasks.clear()
if self._connect_peers_task is not None:
self._connect_peers_task.cancel()
self._log.info("Closing connections")
self._server.close_all()
self._node._close()
Expand Down
9 changes: 4 additions & 5 deletions chia/server/start_timelord.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
from chia.server.start_service import RpcInfo, Service, async_run
from chia.timelord.timelord import Timelord
from chia.timelord.timelord_api import TimelordAPI
from chia.types.peer_info import PeerInfo
from chia.types.peer_info import UnresolvedPeerInfo
from chia.util.chia_logging import initialize_service_logging
from chia.util.config import load_config, load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.network import get_host_addr

# See: https://bugs.python.org/issue29288
"".encode("idna")
Expand All @@ -35,9 +34,9 @@ def create_timelord_service(
) -> Service[Timelord]:
service_config = config[SERVICE_NAME]

connect_peers = [
PeerInfo(str(get_host_addr(service_config["full_node_peer"]["host"])), service_config["full_node_peer"]["port"])
]
connect_peers = {
UnresolvedPeerInfo(service_config["full_node_peer"]["host"], service_config["full_node_peer"]["port"])
}
overrides = service_config["network_overrides"]["constants"][service_config["selected_network"]]
updated_constants = constants.replace_str_to_bytes(**overrides)

Expand Down
10 changes: 2 additions & 8 deletions chia/server/start_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
from chia.rpc.wallet_rpc_api import WalletRpcApi
from chia.server.outbound_message import NodeType
from chia.server.start_service import RpcInfo, Service, async_run
from chia.types.peer_info import PeerInfo
from chia.types.peer_info import UnresolvedPeerInfo
from chia.util.chia_logging import initialize_service_logging
from chia.util.config import load_config, load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.keychain import Keychain
from chia.util.network import get_host_addr
from chia.util.task_timing import maybe_manage_task_instrumentation
from chia.wallet.wallet_node import WalletNode

Expand Down Expand Up @@ -50,13 +49,8 @@ def create_wallet_service(
)
peer_api = WalletNodeAPI(node)
fnp = service_config.get("full_node_peer")
connect_peers = set() if fnp is None else {UnresolvedPeerInfo(fnp["host"], fnp["port"])}

if fnp:
connect_peers = [
PeerInfo(str(get_host_addr(fnp["host"], prefer_ipv6=config.get("prefer_ipv6", False))), fnp["port"])
]
else:
connect_peers = []
network_id = service_config["selected_network"]
rpc_port = service_config.get("rpc_port")
rpc_info: Optional[RpcInfo] = None
Expand Down
6 changes: 3 additions & 3 deletions chia/simulator/setup_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from chia.simulator.time_out_assert import time_out_assert_custom_interval
from chia.timelord.timelord import Timelord
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.peer_info import PeerInfo
from chia.types.peer_info import UnresolvedPeerInfo
from chia.util.hash import std_hash
from chia.util.ints import uint16, uint32
from chia.util.keychain import Keychain
Expand Down Expand Up @@ -302,7 +302,7 @@ async def setup_farmer_multi_harvester(
]
farmer_service = await farmer_node_iterators[0].__anext__()
if start_services:
farmer_peer = PeerInfo(block_tools.config["self_hostname"], uint16(farmer_service._server._port))
farmer_peer = UnresolvedPeerInfo(block_tools.config["self_hostname"], uint16(farmer_service._server._port))
else:
farmer_peer = None
harvester_node_iterators = []
Expand Down Expand Up @@ -432,7 +432,7 @@ async def setup_full_system_inner(
harvester_iter = setup_harvester(
shared_b_tools,
shared_b_tools.root_path / "harvester",
PeerInfo(shared_b_tools.config["self_hostname"], farmer_service._server.get_port()),
UnresolvedPeerInfo(shared_b_tools.config["self_hostname"], farmer_service._server.get_port()),
consensus_constants,
)
vdf1_port = uint16(find_available_listen_port("vdf1"))
Expand Down
4 changes: 2 additions & 2 deletions chia/simulator/setup_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from chia.simulator.start_simulator import create_full_node_simulator_service
from chia.timelord.timelord import Timelord
from chia.timelord.timelord_launcher import kill_processes, spawn_process
from chia.types.peer_info import PeerInfo
from chia.types.peer_info import UnresolvedPeerInfo
from chia.util.bech32m import encode_puzzle_hash
from chia.util.config import config_path_for_filename, lock_and_load_config, save_config
from chia.util.ints import uint16
Expand Down Expand Up @@ -242,7 +242,7 @@ async def setup_wallet_node(
async def setup_harvester(
b_tools: BlockTools,
root_path: Path,
farmer_peer: Optional[PeerInfo],
farmer_peer: Optional[UnresolvedPeerInfo],
consensus_constants: ConsensusConstants,
start_service: bool = True,
) -> AsyncGenerator[Service[Harvester], None]:
Expand Down
6 changes: 6 additions & 0 deletions chia/types/peer_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
from chia.util.streamable import Streamable, streamable


@dataclass(frozen=True)
class UnresolvedPeerInfo:
host: str
port: uint16


# TODO, Replace unsafe_hash with frozen and drop the __init__ as soon as all PeerInfo call sites pass in an IPAddress.
@dataclass(unsafe_hash=True)
class PeerInfo:
Expand Down
10 changes: 7 additions & 3 deletions tests/farmer_harvester/test_farmer_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from chia.farmer.farmer import Farmer
from chia.simulator.time_out_assert import time_out_assert
from chia.types.peer_info import PeerInfo
from chia.types.peer_info import UnresolvedPeerInfo
from chia.util.keychain import generate_mnemonic


Expand All @@ -18,6 +18,7 @@ def farmer_is_started(farmer):
async def test_start_with_empty_keychain(farmer_one_harvester_not_started):
_, farmer_service, bt = farmer_one_harvester_not_started
farmer: Farmer = farmer_service._node
farmer_service.reconnect_retry_seconds = 1
# First remove all keys from the keychain
bt.local_keychain.delete_all_keys()
# Make sure the farmer service is not initialized yet
Expand All @@ -42,6 +43,9 @@ async def test_harvester_handshake(farmer_one_harvester_not_started):
harvester = harvester_service._node
farmer = farmer_service._node

farmer_service.reconnect_retry_seconds = 1
harvester_service.reconnect_retry_seconds = 1

def farmer_has_connections():
return len(farmer.server.get_connections()) > 0

Expand All @@ -60,7 +64,7 @@ async def handshake_done() -> bool:
# Start both services and wait a bit
await farmer_service.start()
await harvester_service.start()
harvester_service.add_peer(PeerInfo(str(farmer_service.self_hostname), farmer_service._server.get_port()))
harvester_service.add_peer(UnresolvedPeerInfo(str(farmer_service.self_hostname), farmer_service._server.get_port()))
# Handshake task should be started but the handshake should not be done
await time_out_assert(5, handshake_task_active, True)
assert not await handshake_done()
Expand All @@ -76,7 +80,7 @@ async def handshake_done() -> bool:
assert len(harvester.plot_manager.farmer_public_keys) == 0
# Re-start the harvester and make sure the handshake task gets started but the handshake still doesn't go through
await harvester_service.start()
harvester_service.add_peer(PeerInfo(str(farmer_service.self_hostname), farmer_service._server.get_port()))
harvester_service.add_peer(UnresolvedPeerInfo(str(farmer_service.self_hostname), farmer_service._server.get_port()))
await time_out_assert(5, handshake_task_active, True)
assert not await handshake_done()
# Stop the farmer and make sure the handshake_task doesn't block the shutdown
Expand Down
1 change: 1 addition & 0 deletions tests/plot_sync/test_plot_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ def new_test_dir(name: str, plot_list: List[Path]) -> Directory:
file.write(bytes(100))

harvester_services, farmer_service, bt = farmer_two_harvester_not_started
farmer_service.reconnect_retry_seconds = 1
farmer: Farmer = farmer_service._node
await farmer_service.start()
harvesters: List[Harvester] = [
Expand Down
Loading

0 comments on commit cb34771

Please sign in to comment.