Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EC / BN client logging and better request timeouts #13

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
104 changes: 75 additions & 29 deletions beacon/client/http-provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"net/http/httptrace"
"strconv"
"strings"
"sync"
"time"

"github.com/goccy/go-json"
"github.com/rocket-pool/node-manager-core/beacon"
"github.com/rocket-pool/node-manager-core/log"
)

const (
Expand All @@ -36,24 +39,35 @@ const (
RequestWithdrawalCredentialsChangePath = "/eth/v1/beacon/pool/bls_to_execution_changes"

MaxRequestValidatorsCount = 600

fastGetMethod string = "Fast GET"
slowGetMethod string = "Slow GET"
postMethod string = "POST"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of postMethod there's http.MethodPost.

Instead of multiple clients, use https://pkg.go.dev/net/http#NewRequestWithContext which gives you control over the timeout via a context with a timeout

Pass the resulting Request to client.Do and pick a large timeout for client.Timeout (at least the max of the timeouts specified in context.WithTimeout)

Better yet, use the provided context.Context in each of the getters unless nil, and default to an appropriate-lengthed timeout. It would be an inflexible abstraction if it prescribed a two-tier timeout system instead of just using context.Context

)

type BeaconHttpProvider struct {
providerAddress string
client http.Client
fastClient http.Client
slowClient http.Client
}

func NewBeaconHttpProvider(providerAddress string, timeout time.Duration) *BeaconHttpProvider {
// Creates a new HTTP provider for the Beacon API
// Most calls will use the fast timeout, but queries to validator status will use the slow timeout since they can be very large.
// Set a timeout of 0 to disable it.
func NewBeaconHttpProvider(providerAddress string, fastTimeout time.Duration, slowTimeout time.Duration) *BeaconHttpProvider {
return &BeaconHttpProvider{
providerAddress: providerAddress,
client: http.Client{
Timeout: timeout,
fastClient: http.Client{
Timeout: fastTimeout,
},
slowClient: http.Client{
Timeout: slowTimeout,
},
}
}

func (p *BeaconHttpProvider) Beacon_Attestations(ctx context.Context, blockId string) (AttestationsResponse, bool, error) {
responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestAttestationsPath, blockId))
responseBody, status, err := p.getFastRequest(ctx, fmt.Sprintf(RequestAttestationsPath, blockId))
if err != nil {
return AttestationsResponse{}, false, fmt.Errorf("error getting attestations data for slot %s: %w", blockId, err)
}
Expand All @@ -71,7 +85,7 @@ func (p *BeaconHttpProvider) Beacon_Attestations(ctx context.Context, blockId st
}

func (p *BeaconHttpProvider) Beacon_Block(ctx context.Context, blockId string) (BeaconBlockResponse, bool, error) {
responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestBeaconBlockPath, blockId))
responseBody, status, err := p.getFastRequest(ctx, fmt.Sprintf(RequestBeaconBlockPath, blockId))
if err != nil {
return BeaconBlockResponse{}, false, fmt.Errorf("error getting beacon block data: %w", err)
}
Expand Down Expand Up @@ -109,8 +123,7 @@ func (p *BeaconHttpProvider) Beacon_Committees(ctx context.Context, stateId stri
}

// Committees responses are large, so let the json decoder read it in a buffered fashion
clientWithoutTimeout := http.Client{}
reader, status, err := getRequestReader(ctx, fmt.Sprintf(RequestCommitteePath, stateId)+query, p.providerAddress, clientWithoutTimeout)
reader, status, err := getRequestReader(ctx, slowGetMethod, fmt.Sprintf(RequestCommitteePath, stateId)+query, p.providerAddress, p.slowClient)
if err != nil {
return CommitteesResponse{}, fmt.Errorf("error getting committees: %w", err)
}
Expand Down Expand Up @@ -140,7 +153,7 @@ func (p *BeaconHttpProvider) Beacon_Committees(ctx context.Context, stateId stri
}

func (p *BeaconHttpProvider) Beacon_FinalityCheckpoints(ctx context.Context, stateId string) (FinalityCheckpointsResponse, error) {
responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestFinalityCheckpointsPath, stateId))
responseBody, status, err := p.getFastRequest(ctx, fmt.Sprintf(RequestFinalityCheckpointsPath, stateId))
if err != nil {
return FinalityCheckpointsResponse{}, fmt.Errorf("error getting finality checkpoints: %w", err)
}
Expand All @@ -155,7 +168,7 @@ func (p *BeaconHttpProvider) Beacon_FinalityCheckpoints(ctx context.Context, sta
}

func (p *BeaconHttpProvider) Beacon_Genesis(ctx context.Context) (GenesisResponse, error) {
responseBody, status, err := p.getRequest(ctx, RequestGenesisPath)
responseBody, status, err := p.getFastRequest(ctx, RequestGenesisPath)
if err != nil {
return GenesisResponse{}, fmt.Errorf("error getting genesis data: %w", err)
}
Expand All @@ -170,7 +183,7 @@ func (p *BeaconHttpProvider) Beacon_Genesis(ctx context.Context) (GenesisRespons
}

func (p *BeaconHttpProvider) Beacon_Header(ctx context.Context, blockId string) (BeaconBlockHeaderResponse, bool, error) {
responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestBeaconBlockHeaderPath, blockId))
responseBody, status, err := p.getFastRequest(ctx, fmt.Sprintf(RequestBeaconBlockHeaderPath, blockId))
if err != nil {
return BeaconBlockHeaderResponse{}, false, fmt.Errorf("error getting beacon block header data: %w", err)
}
Expand All @@ -192,7 +205,7 @@ func (p *BeaconHttpProvider) Beacon_Validators(ctx context.Context, stateId stri
if len(ids) > 0 {
query = fmt.Sprintf("?id=%s", strings.Join(ids, ","))
}
responseBody, status, err := p.getRequestWithoutTimeout(ctx, fmt.Sprintf(RequestValidatorsPath, stateId)+query)
responseBody, status, err := p.getSlowRequest(ctx, fmt.Sprintf(RequestValidatorsPath, stateId)+query)
if err != nil {
return ValidatorsResponse{}, fmt.Errorf("error getting validators: %w", err)
}
Expand All @@ -218,7 +231,7 @@ func (p *BeaconHttpProvider) Beacon_VoluntaryExits_Post(ctx context.Context, req
}

func (p *BeaconHttpProvider) Config_DepositContract(ctx context.Context) (Eth2DepositContractResponse, error) {
responseBody, status, err := p.getRequest(ctx, RequestEth2DepositContractMethod)
responseBody, status, err := p.getFastRequest(ctx, RequestEth2DepositContractMethod)
if err != nil {
return Eth2DepositContractResponse{}, fmt.Errorf("error getting eth2 deposit contract: %w", err)
}
Expand All @@ -233,13 +246,16 @@ func (p *BeaconHttpProvider) Config_DepositContract(ctx context.Context) (Eth2De
}

func (p *BeaconHttpProvider) Config_Spec(ctx context.Context) (Eth2ConfigResponse, error) {
responseBody, status, err := p.getRequest(ctx, RequestEth2ConfigPath)
// Run the request
responseBody, status, err := p.getFastRequest(ctx, RequestEth2ConfigPath)
if err != nil {
return Eth2ConfigResponse{}, fmt.Errorf("error getting eth2 config: %w", err)
}
if status != http.StatusOK {
return Eth2ConfigResponse{}, fmt.Errorf("error getting eth2 config: HTTP status %d; response body: '%s'", status, string(responseBody))
}

// Unmarshal the response
var eth2Config Eth2ConfigResponse
if err := json.Unmarshal(responseBody, &eth2Config); err != nil {
return Eth2ConfigResponse{}, fmt.Errorf("error decoding eth2 config: %w", err)
Expand All @@ -248,13 +264,16 @@ func (p *BeaconHttpProvider) Config_Spec(ctx context.Context) (Eth2ConfigRespons
}

func (p *BeaconHttpProvider) Node_Syncing(ctx context.Context) (SyncStatusResponse, error) {
responseBody, status, err := p.getRequest(ctx, RequestSyncStatusPath)
// Run the request
responseBody, status, err := p.getFastRequest(ctx, RequestSyncStatusPath)
if err != nil {
return SyncStatusResponse{}, fmt.Errorf("error getting node sync status: %w", err)
}
if status != http.StatusOK {
return SyncStatusResponse{}, fmt.Errorf("error getting node sync status: HTTP status %d; response body: '%s'", status, string(responseBody))
}

// Unmarshal the response
var syncStatus SyncStatusResponse
if err := json.Unmarshal(responseBody, &syncStatus); err != nil {
return SyncStatusResponse{}, fmt.Errorf("error decoding node sync status: %w", err)
Expand All @@ -263,14 +282,16 @@ func (p *BeaconHttpProvider) Node_Syncing(ctx context.Context) (SyncStatusRespon
}

func (p *BeaconHttpProvider) Validator_DutiesProposer(ctx context.Context, indices []string, epoch uint64) (ProposerDutiesResponse, error) {
responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestValidatorProposerDuties, strconv.FormatUint(epoch, 10)))
// Run the request
responseBody, status, err := p.getFastRequest(ctx, fmt.Sprintf(RequestValidatorProposerDuties, strconv.FormatUint(epoch, 10)))
if err != nil {
return ProposerDutiesResponse{}, fmt.Errorf("error getting validator proposer duties: %w", err)
}
if status != http.StatusOK {
return ProposerDutiesResponse{}, fmt.Errorf("error getting validator proposer duties: HTTP status %d; response body: '%s'", status, string(responseBody))
}

// Unmarshal the response
var syncDuties ProposerDutiesResponse
if err := json.Unmarshal(responseBody, &syncDuties); err != nil {
return ProposerDutiesResponse{}, fmt.Errorf("error decoding validator proposer duties data: %w", err)
Expand All @@ -281,14 +302,14 @@ func (p *BeaconHttpProvider) Validator_DutiesProposer(ctx context.Context, indic
func (p *BeaconHttpProvider) Validator_DutiesSync_Post(ctx context.Context, indices []string, epoch uint64) (SyncDutiesResponse, error) {
// Perform the post request
responseBody, status, err := p.postRequest(ctx, fmt.Sprintf(RequestValidatorSyncDuties, strconv.FormatUint(epoch, 10)), indices)

if err != nil {
return SyncDutiesResponse{}, fmt.Errorf("error getting validator sync duties: %w", err)
}
if status != http.StatusOK {
return SyncDutiesResponse{}, fmt.Errorf("error getting validator sync duties: HTTP status %d; response body: '%s'", status, string(responseBody))
}

// Unmarshal the response
var syncDuties SyncDutiesResponse
if err := json.Unmarshal(responseBody, &syncDuties); err != nil {
return SyncDutiesResponse{}, fmt.Errorf("error decoding validator sync duties data: %w", err)
Expand All @@ -301,20 +322,19 @@ func (p *BeaconHttpProvider) Validator_DutiesSync_Post(ctx context.Context, indi
// ==========================

// Make a GET request to the beacon node and read the body of the response
func (p *BeaconHttpProvider) getRequest(ctx context.Context, requestPath string) ([]byte, int, error) {
return getRequestImpl(ctx, requestPath, p.providerAddress, p.client)
func (p *BeaconHttpProvider) getFastRequest(ctx context.Context, requestPath string) ([]byte, int, error) {
return getRequestImpl(ctx, fastGetMethod, requestPath, p.providerAddress, p.fastClient)
}

// Make a GET request to the beacon node and read the body of the response
func (p *BeaconHttpProvider) getRequestWithoutTimeout(ctx context.Context, requestPath string) ([]byte, int, error) {
clientWithoutTimeout := http.Client{}
return getRequestImpl(ctx, requestPath, p.providerAddress, clientWithoutTimeout)
func (p *BeaconHttpProvider) getSlowRequest(ctx context.Context, requestPath string) ([]byte, int, error) {
return getRequestImpl(ctx, slowGetMethod, requestPath, p.providerAddress, p.slowClient)
}

// Make a GET request to the beacon node and read the body of the response
func getRequestImpl(ctx context.Context, requestPath string, providerAddress string, client http.Client) ([]byte, int, error) {
func getRequestImpl(ctx context.Context, methodName string, requestPath string, providerAddress string, client http.Client) ([]byte, int, error) {
// Send request
reader, status, err := getRequestReader(ctx, requestPath, providerAddress, client)
reader, status, err := getRequestReader(ctx, methodName, requestPath, providerAddress, client)
if err != nil {
return []byte{}, 0, err
}
Expand All @@ -334,6 +354,10 @@ func getRequestImpl(ctx context.Context, requestPath string, providerAddress str

// Make a POST request to the beacon node
func (p *BeaconHttpProvider) postRequest(ctx context.Context, requestPath string, requestBody any) ([]byte, int, error) {
// Log the request and add tracing if enabled
path := fmt.Sprintf(RequestUrlFormat, p.providerAddress, requestPath)
ctx = logRequest(ctx, postMethod, path)

// Get request body
requestBodyBytes, err := json.Marshal(requestBody)
if err != nil {
Expand All @@ -342,15 +366,14 @@ func (p *BeaconHttpProvider) postRequest(ctx context.Context, requestPath string
requestBodyReader := bytes.NewReader(requestBodyBytes)

// Create the request
path := fmt.Sprintf(RequestUrlFormat, p.providerAddress, requestPath)
request, err := http.NewRequestWithContext(ctx, http.MethodPost, path, requestBodyReader)
if err != nil {
return nil, 0, fmt.Errorf("error creating POST request to [%s]: %w", path, err)
}
request.Header.Set("Content-Type", RequestContentType)

// Submit the request
response, err := p.client.Do(request)
response, err := p.fastClient.Do(request)
if err != nil {
return []byte{}, 0, fmt.Errorf("error running POST request to [%s]: %w", path, err)
}
Expand All @@ -374,9 +397,13 @@ func epochAt(config beacon.Eth2Config, time uint64) uint64 {
}

// Make a GET request but do not read its body yet (allows buffered decoding)
func getRequestReader(ctx context.Context, requestPath string, providerAddress string, client http.Client) (io.ReadCloser, int, error) {
// Make the request
func getRequestReader(ctx context.Context, methodName string, requestPath string, providerAddress string, client http.Client) (io.ReadCloser, int, error) {
// Log the request and add tracing if enabled
path := fmt.Sprintf(RequestUrlFormat, providerAddress, requestPath)
trimmedPath, _, _ := strings.Cut(path, "?")
ctx = logRequest(ctx, methodName, trimmedPath)

// Make the request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, 0, fmt.Errorf("error creating GET request to [%s]: %w", path, err)
Expand All @@ -387,12 +414,31 @@ func getRequestReader(ctx context.Context, requestPath string, providerAddress s
response, err := client.Do(req)
if err != nil {
// Remove the query for readability
trimmedPath, _, _ := strings.Cut(path, "?")
return nil, 0, fmt.Errorf("error running GET request to [%s]: %w", trimmedPath, err)
}
return response.Body, response.StatusCode, nil
}

// Log a request and add HTTP tracing to the context if the logger has it enabled
func logRequest(ctx context.Context, methodName string, path string) context.Context {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this should be a receiver on BeaconHttpProvider so it can log info specific to the instance

logger, _ := log.FromContext(ctx)
if logger == nil {
return ctx
}

logger.Debug("Calling BN request",
slog.String(log.MethodKey, methodName),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems way more practical to log ctx.Deadline() btw

slog.String(log.PathKey, path),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to log the host, too...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The host is included in the path; it's the whole URL minus the query because queries can get unwieldy for certain calls.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well i leave it up to you but i would probably parse it using net/url so that we can leverage structured logging fully

)
tracer := logger.GetHttpTracer()
if tracer != nil {
// Enable HTTP tracing if requested
ctx = httptrace.WithClientTrace(ctx, tracer)
}

return ctx
}

// ==========================
// === Committees Decoder ===
// ==========================
Expand Down
8 changes: 5 additions & 3 deletions beacon/client/std-http-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ type StandardHttpClient struct {
*StandardClient
}

// Create a new client instance
func NewStandardHttpClient(providerAddress string, timeout time.Duration) *StandardHttpClient {
provider := NewBeaconHttpProvider(providerAddress, timeout)
// Create a new client instance.
// Most calls will use the fast timeout, but queries to validator status will use the slow timeout since they can be very large.
// Set a timeout of 0 to disable it.
func NewStandardHttpClient(providerAddress string, fastTimeout time.Duration, slowTimeout time.Duration) *StandardHttpClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add a type StandardHttpClientOpts struct with two time.Duration fields. if non-0 they can be used as the defaults to context.WithTImeout. If 0, default to something sensible.

NewStandardHttpClient("http://blah", nil) would just use all defaults.

provider := NewBeaconHttpProvider(providerAddress, fastTimeout, slowTimeout)
return &StandardHttpClient{
StandardClient: NewStandardClient(provider),
}
Expand Down
36 changes: 36 additions & 0 deletions config/external-beacon-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ type ExternalBeaconConfig struct {

// The URL of the Prysm gRPC endpoint (only needed if using Prysm VCs)
PrysmRpcUrl Parameter[string]

// Number of seconds to wait for a fast request to complete
FastTimeout Parameter[uint64]

// Number of seconds to wait for a slow request to complete
SlowTimeout Parameter[uint64]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add units to these field names? ie FastTimeoutMS

Seconds seems too rough of a granularity

}

// Generates a new ExternalBeaconConfig configuration
Expand Down Expand Up @@ -92,6 +98,34 @@ func NewExternalBeaconConfig() *ExternalBeaconConfig {
Network_All: "",
},
},

FastTimeout: Parameter[uint64]{
ParameterCommon: &ParameterCommon{
ID: ids.FastTimeoutID,
Name: "Fast Timeout",
Description: "Number of seconds to wait for a request to complete that is expected to be fast and light before timing out the request.",
AffectsContainers: []ContainerID{ContainerID_Daemon},
CanBeBlank: false,
OverwriteOnUpgrade: false,
},
Default: map[Network]uint64{
Network_All: 5,
},
},

SlowTimeout: Parameter[uint64]{
ParameterCommon: &ParameterCommon{
ID: ids.SlowTimeoutID,
Name: "Slow Timeout",
Description: "Number of seconds to wait for a request to complete that is expected to be slow and heavy, either taking a long time to process or returning a large amount of data, before timing out the request. Examples include querying the Beacon Node for the state of a large number of validators.",
AffectsContainers: []ContainerID{ContainerID_Daemon},
CanBeBlank: false,
OverwriteOnUpgrade: false,
},
Default: map[Network]uint64{
Network_All: 30,
},
},
}
}

Expand All @@ -106,6 +140,8 @@ func (cfg *ExternalBeaconConfig) GetParameters() []IParameter {
&cfg.BeaconNode,
&cfg.HttpUrl,
&cfg.PrysmRpcUrl,
&cfg.FastTimeout,
&cfg.SlowTimeout,
}
}

Expand Down
Loading
Loading