feat: enhance context cancellation handling

This commit is contained in:
Fu Diwei 2025-04-23 19:32:21 +08:00
parent a90b6a8589
commit 97f102533c
31 changed files with 287 additions and 57 deletions

View File

@ -137,6 +137,12 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
listListenersLimit := int32(100)
var listListenersToken *string = nil
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
listListenersReq := &alialb.ListListenersRequest{
MaxResults: tea.Int32(listListenersLimit),
NextToken: listListenersToken,
@ -166,6 +172,12 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
// REF: https://help.aliyun.com/zh/slb/application-load-balancer/developer-reference/api-alb-2020-06-16-listlisteners
listListenersToken = nil
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
listListenersReq := &alialb.ListListenersRequest{
MaxResults: tea.Int32(listListenersLimit),
NextToken: listListenersToken,
@ -262,6 +274,12 @@ func (d *DeployerProvider) updateListenerCertificate(ctx context.Context, cloudL
listListenerCertificatesLimit := int32(100)
var listListenerCertificatesToken *string = nil
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
listListenerCertificatesReq := &alialb.ListListenerCertificatesRequest{
NextToken: listListenerCertificatesToken,
MaxResults: tea.Int32(listListenerCertificatesLimit),

View File

@ -142,6 +142,12 @@ func (d *DeployerProvider) deployToCloudNative(ctx context.Context, certPEM stri
listDomainsPageNumber := int32(1)
listDomainsPageSize := int32(10)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
listDomainsReq := &aliapig.ListDomainsRequest{
GatewayId: tea.String(d.config.GatewayId),
NameLike: tea.String(d.config.Domain),

View File

@ -126,8 +126,10 @@ func (d *DeployerProvider) Deploy(ctx context.Context, certPEM string, privkeyPE
// 循环获取部署任务详情,等待任务状态变更
// REF: https://help.aliyun.com/zh/ssl-certificate/developer-reference/api-cas-2020-04-07-describedeploymentjob
for {
if ctx.Err() != nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
describeDeploymentJobReq := &alicas.DescribeDeploymentJobRequest{

View File

@ -132,6 +132,12 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
describeLoadBalancerListenersLimit := int32(100)
var describeLoadBalancerListenersToken *string = nil
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
describeLoadBalancerListenersReq := &alislb.DescribeLoadBalancerListenersRequest{
RegionId: tea.String(d.config.Region),
MaxResults: tea.Int32(describeLoadBalancerListenersLimit),
@ -166,8 +172,14 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
var errs []error
for _, listenerPort := range listenerPorts {
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listenerPort, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listenerPort, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}

View File

@ -125,6 +125,12 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
listListenersLimit := int32(100)
var listListenersToken *string = nil
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
listListenersReq := &alinlb.ListListenersRequest{
MaxResults: tea.Int32(listListenersLimit),
NextToken: listListenersToken,
@ -158,8 +164,14 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
var errs []error
for _, listenerId := range listenerIds {
if err := d.updateListenerCertificate(ctx, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.updateListenerCertificate(ctx, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}

View File

@ -152,8 +152,14 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
var errs []error
for _, listener := range listeners {
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listener.Type, listener.Port, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listener.Type, listener.Port, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}
@ -209,8 +215,14 @@ func (d *DeployerProvider) deployToListener(ctx context.Context, cloudCertId str
var errs []error
for _, listener := range listeners {
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listener.Type, listener.Port, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listener.Type, listener.Port, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}

View File

@ -152,8 +152,14 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
var errs []error
for _, listener := range listeners {
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listener.Type, listener.Port, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listener.Type, listener.Port, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}
@ -209,8 +215,14 @@ func (d *DeployerProvider) deployToListener(ctx context.Context, cloudCertId str
var errs []error
for _, listener := range listeners {
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listener.Type, listener.Port, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.updateListenerCertificate(ctx, d.config.LoadbalancerId, listener.Type, listener.Port, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}

View File

@ -117,16 +117,22 @@ func (d *DeployerProvider) Deploy(ctx context.Context, certPEM string, privkeyPE
var errs []error
for _, domain := range domains {
// 关联证书与加速域名
// REF: https://docs.byteplus.com/en/docs/byteplus-cdn/reference-batchdeploycert
batchDeployCertReq := &bpcdn.BatchDeployCertRequest{
CertId: upres.CertId,
Domain: domain,
}
batchDeployCertResp, err := d.sdkClient.BatchDeployCert(batchDeployCertReq)
d.logger.Debug("sdk request 'cdn.BatchDeployCert'", slog.Any("request", batchDeployCertReq), slog.Any("response", batchDeployCertResp))
if err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// 关联证书与加速域名
// REF: https://docs.byteplus.com/en/docs/byteplus-cdn/reference-batchdeploycert
batchDeployCertReq := &bpcdn.BatchDeployCertRequest{
CertId: upres.CertId,
Domain: domain,
}
batchDeployCertResp, err := d.sdkClient.BatchDeployCert(batchDeployCertReq)
d.logger.Debug("sdk request 'cdn.BatchDeployCert'", slog.Any("request", batchDeployCertReq), slog.Any("response", batchDeployCertResp))
if err != nil {
errs = append(errs, err)
}
}
}

View File

@ -160,6 +160,12 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, certPEM str
listListenersLimit := int32(2000)
var listListenersMarker *string = nil
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
listListenersReq := &hcelbmodel.ListListenersRequest{
Limit: typeutil.ToPtr(listListenersLimit),
Marker: listListenersMarker,
@ -201,8 +207,14 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, certPEM str
var errs []error
for _, listenerId := range listenerIds {
if err := d.modifyListenerCertificate(ctx, listenerId, upres.CertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.modifyListenerCertificate(ctx, listenerId, upres.CertId); err != nil {
errs = append(errs, err)
}
}
}

View File

@ -172,6 +172,12 @@ func (d *DeployerProvider) deployToCloudServer(ctx context.Context, certPEM stri
listHostPage := int32(1)
listHostPageSize := int32(100)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
listHostReq := &hcwafmodel.ListHostRequest{
Hostname: typeutil.ToPtr(strings.TrimPrefix(d.config.Domain, "*")),
Page: typeutil.ToPtr(listHostPage),
@ -239,6 +245,12 @@ func (d *DeployerProvider) deployToPremiumHost(ctx context.Context, certPEM stri
listPremiumHostPage := int32(1)
listPremiumHostPageSize := int32(100)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
listPremiumHostReq := &hcwafmodel.ListPremiumHostRequest{
Hostname: typeutil.ToPtr(strings.TrimPrefix(d.config.Domain, "*")),
Page: typeutil.ToPtr(fmt.Sprintf("%d", listPremiumHostPage)),

View File

@ -132,6 +132,12 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
describeListenersPageNumber := 1
describeListenersPageSize := 100
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
describeListenersReq := jdlbapi.NewDescribeListenersRequest(d.config.RegionId)
describeListenersReq.SetFilters([]jdcommon.Filter{{Name: "loadBalancerId", Values: []string{d.config.LoadbalancerId}}})
describeListenersReq.SetPageSize(describeListenersPageNumber)
@ -164,8 +170,13 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
var errs []error
for _, listenerId := range listenerIds {
if err := d.updateListenerCertificate(ctx, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.updateListenerCertificate(ctx, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}

View File

@ -65,6 +65,12 @@ func (d *DeployerProvider) Deploy(ctx context.Context, certPEM string, privkeyPE
listDomainsPageNumber := 1
listDomainsPageSize := 100
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
listDomainsReq := jdvodapi.NewListDomainsRequest()
listDomainsReq.SetPageNumber(1)
listDomainsReq.SetPageSize(100)

View File

@ -188,8 +188,13 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
var errs []error
for _, listenerId := range listenerIds {
if err := d.modifyListenerCertificate(ctx, d.config.LoadbalancerId, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.modifyListenerCertificate(ctx, d.config.LoadbalancerId, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}

View File

@ -108,8 +108,10 @@ func (d *DeployerProvider) Deploy(ctx context.Context, certPEM string, privkeyPE
// 循环获取部署任务详情,等待任务状态变更
// REF: https://cloud.tencent.com.cn/document/api/400/91658
for {
if ctx.Err() != nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
describeHostDeployRecordDetailReq := tcssl.NewDescribeHostDeployRecordDetailRequest()

View File

@ -132,6 +132,12 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
describeListenersPageSize := int64(100)
describeListenersPageNumber := int64(1)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
describeListenersReq := &vealb.DescribeListenersInput{
LoadBalancerId: ve.String(d.config.LoadbalancerId),
Protocol: ve.String("HTTPS"),
@ -163,8 +169,13 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
var errs []error
for _, listenerId := range listenerIds {
if err := d.updateListenerCertificate(ctx, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.updateListenerCertificate(ctx, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}

View File

@ -117,16 +117,21 @@ func (d *DeployerProvider) Deploy(ctx context.Context, certPEM string, privkeyPE
var errs []error
for _, domain := range domains {
// 关联证书与加速域名
// REF: https://www.volcengine.com/docs/6454/125712
batchDeployCertReq := &vecdn.BatchDeployCertRequest{
CertId: upres.CertId,
Domain: domain,
}
batchDeployCertResp, err := d.sdkClient.BatchDeployCert(batchDeployCertReq)
d.logger.Debug("sdk request 'cdn.BatchDeployCert'", slog.Any("request", batchDeployCertReq), slog.Any("response", batchDeployCertResp))
if err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// 关联证书与加速域名
// REF: https://www.volcengine.com/docs/6454/125712
batchDeployCertReq := &vecdn.BatchDeployCertRequest{
CertId: upres.CertId,
Domain: domain,
}
batchDeployCertResp, err := d.sdkClient.BatchDeployCert(batchDeployCertReq)
d.logger.Debug("sdk request 'cdn.BatchDeployCert'", slog.Any("request", batchDeployCertReq), slog.Any("response", batchDeployCertResp))
if err != nil {
errs = append(errs, err)
}
}
}

View File

@ -128,6 +128,12 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
describeListenersPageSize := int64(100)
describeListenersPageNumber := int64(1)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
describeListenersReq := &veclb.DescribeListenersInput{
LoadBalancerId: ve.String(d.config.LoadbalancerId),
Protocol: ve.String("HTTPS"),
@ -159,8 +165,13 @@ func (d *DeployerProvider) deployToLoadbalancer(ctx context.Context, cloudCertId
var errs []error
for _, listenerId := range listenerIds {
if err := d.updateListenerCertificate(ctx, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := d.updateListenerCertificate(ctx, listenerId, cloudCertId); err != nil {
errs = append(errs, err)
}
}
}

View File

@ -125,17 +125,22 @@ func (d *DeployerProvider) Deploy(ctx context.Context, certPEM string, privkeyPE
var errs []error
for _, domain := range domains {
// 绑定证书
// REF: https://www.volcengine.com/docs/6469/1186278#%E7%BB%91%E5%AE%9A%E8%AF%81%E4%B9%A6
bindCertReq := &velive.BindCertBody{
ChainID: upres.CertId,
Domain: domain,
HTTPS: ve.Bool(true),
}
bindCertResp, err := d.sdkClient.BindCert(ctx, bindCertReq)
d.logger.Debug("sdk request 'live.BindCert'", slog.Any("request", bindCertReq), slog.Any("response", bindCertResp))
if err != nil {
errs = append(errs, err)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// 绑定证书
// REF: https://www.volcengine.com/docs/6469/1186278#%E7%BB%91%E5%AE%9A%E8%AF%81%E4%B9%A6
bindCertReq := &velive.BindCertBody{
ChainID: upres.CertId,
Domain: domain,
HTTPS: ve.Bool(true),
}
bindCertResp, err := d.sdkClient.BindCert(ctx, bindCertReq)
d.logger.Debug("sdk request 'live.BindCert'", slog.Any("request", bindCertReq), slog.Any("response", bindCertResp))
if err != nil {
errs = append(errs, err)
}
}
}

View File

@ -199,8 +199,10 @@ func (d *DeployerProvider) Deploy(ctx context.Context, certPEM string, privkeyPE
wangsuTaskId = wangsuTaskMatches[1]
}
for {
if ctx.Err() != nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
getDeploymentTaskDetailResp, err := d.sdkClient.GetDeploymentTaskDetail(wangsuTaskId)

View File

@ -93,6 +93,12 @@ func (u *UploaderProvider) getCertIfExists(ctx context.Context, certPEM string,
searchWebsiteSSLPageNumber := int32(1)
searchWebsiteSSLPageSize := int32(100)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
searchWebsiteSSLReq := &opsdk.SearchWebsiteSSLRequest{
Page: searchWebsiteSSLPageNumber,
PageSize: searchWebsiteSSLPageSize,

View File

@ -71,6 +71,12 @@ func (u *UploaderProvider) Upload(ctx context.Context, certPEM string, privkeyPE
listUserCertificateOrderPage := int64(1)
listUserCertificateOrderLimit := int64(50)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
listUserCertificateOrderReq := &alicas.ListUserCertificateOrderRequest{
CurrentPage: tea.Int64(listUserCertificateOrderPage),
ShowSize: tea.Int64(listUserCertificateOrderLimit),

View File

@ -74,6 +74,12 @@ func (u *UploaderProvider) Upload(ctx context.Context, certPEM string, privkeyPE
var listCertificatesNextToken *string = nil
listCertificatesMaxItems := int32(1000)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
listCertificatesReq := &awsacm.ListCertificatesInput{
NextToken: listCertificatesNextToken,
MaxItems: aws.Int32(listCertificatesMaxItems),

View File

@ -74,6 +74,12 @@ func (u *UploaderProvider) Upload(ctx context.Context, certPEM string, privkeyPE
Source: bytepluscdn.GetStrPtr("cert_center"),
}
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
listCertInfoResp, err := u.sdkClient.ListCertInfo(listCertInfoReq)
u.logger.Debug("sdk request 'cdn.ListCertInfo'", slog.Any("request", listCertInfoReq), slog.Any("response", listCertInfoResp))
if err != nil {

View File

@ -76,6 +76,12 @@ func (u *UploaderProvider) Upload(ctx context.Context, certPEM string, privkeyPE
listCertificatesLimit := int32(2000)
var listCertificatesMarker *string = nil
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
listCertificatesReq := &hcelbmodel.ListCertificatesRequest{
Limit: typeutil.ToPtr(listCertificatesLimit),
Marker: listCertificatesMarker,

View File

@ -72,6 +72,12 @@ func (u *UploaderProvider) Upload(ctx context.Context, certPEM string, privkeyPE
listCertificatesLimit := int32(50)
listCertificatesOffset := int32(0)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
listCertificatesReq := &hcscmmodel.ListCertificatesRequest{
Limit: typeutil.ToPtr(listCertificatesLimit),
Offset: typeutil.ToPtr(listCertificatesOffset),

View File

@ -77,6 +77,12 @@ func (u *UploaderProvider) Upload(ctx context.Context, certPEM string, privkeyPE
listCertificatesPage := int32(1)
listCertificatesPageSize := int32(100)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
listCertificatesReq := &hcwafmodel.ListCertificatesRequest{
Page: typeutil.ToPtr(listCertificatesPage),
Pagesize: typeutil.ToPtr(listCertificatesPageSize),

View File

@ -77,6 +77,12 @@ func (u *UploaderProvider) Upload(ctx context.Context, certPEM string, privkeyPE
describeCertsPageNumber := 1
describeCertsPageSize := 10
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
describeCertsReq := jdsslapi.NewDescribeCertsRequest()
describeCertsReq.SetDomainName(certX509.Subject.CommonName)
describeCertsReq.SetPageNumber(describeCertsPageNumber)

View File

@ -93,6 +93,12 @@ func (u *UploaderProvider) getCertIfExists(ctx context.Context, certPEM string)
sslCenterListPage := int32(1)
sslCenterListPerPage := int32(100)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
sslCenterListReq := &rainyunsdk.SslCenterListRequest{
Filters: &rainyunsdk.SslCenterListFilters{
Domain: &certX509.Subject.CommonName,

View File

@ -124,6 +124,12 @@ func (u *UploaderProvider) getCertIfExists(ctx context.Context, certPEM string)
getCertificateListPage := int(1)
getCertificateListLimit := int(1000)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
getCertificateListReq := u.sdkClient.NewGetCertificateListRequest()
getCertificateListReq.Mode = ucloud.String("trust")
getCertificateListReq.Domain = ucloud.String(certX509.Subject.CommonName)

View File

@ -75,6 +75,12 @@ func (u *UploaderProvider) Upload(ctx context.Context, certPEM string, privkeyPE
Source: "volc_cert_center",
}
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
listCertInfoResp, err := u.sdkClient.ListCertInfo(listCertInfoReq)
u.logger.Debug("sdk request 'cdn.ListCertInfo'", slog.Any("request", listCertInfoReq), slog.Any("response", listCertInfoResp))
if err != nil {

View File

@ -47,8 +47,10 @@ func (w *workflowInvoker) GetLogs() domain.WorkflowLogs {
func (w *workflowInvoker) processNode(ctx context.Context, node *domain.WorkflowNode) error {
current := node
for current != nil {
if ctx.Err() != nil {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if current.Type == domain.WorkflowNodeTypeBranch || current.Type == domain.WorkflowNodeTypeExecuteResultBranch {