| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | /* | 
					
						
							| 
									
										
										
										
											2019-04-10 02:39:42 +08:00
										 |  |  |  * MinIO Cloud Storage, (C) 2018, 2019 MinIO, Inc. | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  |  * | 
					
						
							|  |  |  |  * Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  |  * you may not use this file except in compliance with the License. | 
					
						
							|  |  |  |  * You may obtain a copy of the License at | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  *     http://www.apache.org/licenses/LICENSE-2.0
 | 
					
						
							|  |  |  |  * | 
					
						
							|  |  |  |  * Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  |  * distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  |  * See the License for the specific language governing permissions and | 
					
						
							|  |  |  |  * limitations under the License. | 
					
						
							|  |  |  |  */ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package cmd | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2018-10-10 05:00:01 +08:00
										 |  |  | 	"bytes" | 
					
						
							| 
									
										
										
										
											2018-03-16 04:27:16 +08:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2020-04-28 01:06:21 +08:00
										 |  |  | 	"encoding/json" | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	"fmt" | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 	"io" | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	"net/url" | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | 	"sort" | 
					
						
							| 
									
										
										
										
											2018-08-24 14:31:14 +08:00
										 |  |  | 	"strings" | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2018-06-06 16:51:56 +08:00
										 |  |  | 	"time" | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-09-26 14:08:24 +08:00
										 |  |  | 	"github.com/klauspost/compress/zip" | 
					
						
							| 
									
										
										
										
											2020-07-15 00:38:05 +08:00
										 |  |  | 	"github.com/minio/minio-go/v7/pkg/set" | 
					
						
							| 
									
										
										
										
											2019-05-30 13:29:37 +08:00
										 |  |  | 	"github.com/minio/minio/cmd/crypto" | 
					
						
							| 
									
										
										
										
											2018-04-06 06:04:40 +08:00
										 |  |  | 	"github.com/minio/minio/cmd/logger" | 
					
						
							| 
									
										
										
										
											2020-01-28 06:12:34 +08:00
										 |  |  | 	"github.com/minio/minio/pkg/bucket/policy" | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	"github.com/minio/minio/pkg/event" | 
					
						
							| 
									
										
										
										
											2019-06-26 07:42:24 +08:00
										 |  |  | 	"github.com/minio/minio/pkg/madmin" | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	xnet "github.com/minio/minio/pkg/net" | 
					
						
							| 
									
										
										
										
											2019-10-15 00:44:51 +08:00
										 |  |  | 	"github.com/minio/minio/pkg/sync/errgroup" | 
					
						
							| 
									
										
										
										
											2020-04-28 01:06:21 +08:00
										 |  |  | 	"github.com/willf/bloom" | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NotificationSys - notification system.
 | 
					
						
							|  |  |  | type NotificationSys struct { | 
					
						
							|  |  |  | 	sync.RWMutex | 
					
						
							|  |  |  | 	targetList                 *event.TargetList | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 	targetResCh                chan event.TargetIDResult | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	bucketRulesMap             map[string]event.RulesMap | 
					
						
							|  |  |  | 	bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 	peerClients                []*peerRESTClient | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // GetARNList - returns available ARNs.
 | 
					
						
							| 
									
										
										
										
											2020-04-22 00:38:32 +08:00
										 |  |  | func (sys *NotificationSys) GetARNList(onlyActive bool) []string { | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	arns := []string{} | 
					
						
							| 
									
										
										
										
											2019-11-22 14:08:41 +08:00
										 |  |  | 	if sys == nil { | 
					
						
							|  |  |  | 		return arns | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-10-23 13:59:13 +08:00
										 |  |  | 	region := globalServerRegion | 
					
						
							| 
									
										
										
										
											2020-04-22 00:38:32 +08:00
										 |  |  | 	for targetID, target := range sys.targetList.TargetMap() { | 
					
						
							| 
									
										
										
										
											2020-07-21 03:52:49 +08:00
										 |  |  | 		// httpclient target is part of ListenNotification
 | 
					
						
							| 
									
										
										
										
											2018-08-24 14:31:14 +08:00
										 |  |  | 		// which doesn't need to be listed as part of the ARN list
 | 
					
						
							|  |  |  | 		// This list is only meant for external targets, filter
 | 
					
						
							|  |  |  | 		// this out pro-actively.
 | 
					
						
							|  |  |  | 		if !strings.HasPrefix(targetID.ID, "httpclient+") { | 
					
						
							| 
									
										
										
										
											2020-04-22 00:38:32 +08:00
										 |  |  | 			if onlyActive && !target.HasQueueStore() { | 
					
						
							|  |  |  | 				if _, err := target.IsActive(); err != nil { | 
					
						
							|  |  |  | 					continue | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-08-24 14:31:14 +08:00
										 |  |  | 			arns = append(arns, targetID.ToARN(region).String()) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return arns | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-05-10 02:25:38 +08:00
										 |  |  | // NotificationPeerErr returns error associated for a remote peer.
 | 
					
						
							|  |  |  | type NotificationPeerErr struct { | 
					
						
							|  |  |  | 	Host xnet.Host // Remote host on which the rpc call was initiated
 | 
					
						
							|  |  |  | 	Err  error     // Error returned by the remote peer for an rpc call
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | // A NotificationGroup is a collection of goroutines working on subtasks that are part of
 | 
					
						
							|  |  |  | // the same overall task.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // A zero NotificationGroup is valid and does not cancel on error.
 | 
					
						
							|  |  |  | type NotificationGroup struct { | 
					
						
							|  |  |  | 	wg   sync.WaitGroup | 
					
						
							|  |  |  | 	errs []NotificationPeerErr | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // WithNPeers returns a new NotificationGroup with length of errs slice upto nerrs,
 | 
					
						
							|  |  |  | // upon Wait() errors are returned collected from all tasks.
 | 
					
						
							|  |  |  | func WithNPeers(nerrs int) *NotificationGroup { | 
					
						
							|  |  |  | 	return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs)} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Wait blocks until all function calls from the Go method have returned, then
 | 
					
						
							|  |  |  | // returns the slice of errors from all function calls.
 | 
					
						
							|  |  |  | func (g *NotificationGroup) Wait() []NotificationPeerErr { | 
					
						
							|  |  |  | 	g.wg.Wait() | 
					
						
							|  |  |  | 	return g.errs | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Go calls the given function in a new goroutine.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // The first call to return a non-nil error will be
 | 
					
						
							|  |  |  | // collected in errs slice and returned by Wait().
 | 
					
						
							|  |  |  | func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, addr xnet.Host) { | 
					
						
							|  |  |  | 	g.wg.Add(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		defer g.wg.Done() | 
					
						
							|  |  |  | 		g.errs[index] = NotificationPeerErr{ | 
					
						
							|  |  |  | 			Host: addr, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		for i := 0; i < 3; i++ { | 
					
						
							|  |  |  | 			if err := f(); err != nil { | 
					
						
							|  |  |  | 				g.errs[index].Err = err | 
					
						
							|  |  |  | 				// Last iteration log the error.
 | 
					
						
							|  |  |  | 				if i == 2 { | 
					
						
							|  |  |  | 					reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) | 
					
						
							|  |  |  | 					ctx := logger.SetReqInfo(ctx, reqInfo) | 
					
						
							|  |  |  | 					logger.LogIf(ctx, err) | 
					
						
							| 
									
										
										
										
											2018-12-19 06:39:21 +08:00
										 |  |  | 				} | 
					
						
							|  |  |  | 				// Wait for one second and no need wait after last attempt.
 | 
					
						
							|  |  |  | 				if i < 2 { | 
					
						
							|  |  |  | 					time.Sleep(1 * time.Second) | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 				continue | 
					
						
							| 
									
										
										
										
											2018-12-19 06:39:21 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 			break | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2018-12-19 06:39:21 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | // ReloadFormat - calls ReloadFormat REST call on all peers.
 | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | func (sys *NotificationSys) ReloadFormat(dryRun bool) []NotificationPeerErr { | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 			return client.ReloadFormat(dryRun) | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 		}, idx, *client.host) | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							| 
									
										
										
										
											2018-12-19 06:39:21 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-07 08:46:22 +08:00
										 |  |  | // DeletePolicy - deletes policy across all peers.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerErr { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							| 
									
										
										
										
											2019-06-07 08:46:22 +08:00
										 |  |  | 			return client.DeletePolicy(policyName) | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // LoadPolicy - reloads a specific modified policy across all peers
 | 
					
						
							|  |  |  | func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							| 
									
										
										
										
											2019-06-07 08:46:22 +08:00
										 |  |  | 			return client.LoadPolicy(policyName) | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-08-14 04:41:06 +08:00
										 |  |  | // LoadPolicyMapping - reloads a policy mapping across all peers
 | 
					
						
							|  |  |  | func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, isGroup bool) []NotificationPeerErr { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							| 
									
										
										
										
											2019-08-14 04:41:06 +08:00
										 |  |  | 			return client.LoadPolicyMapping(userOrGroup, isGroup) | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-07 08:46:22 +08:00
										 |  |  | // DeleteUser - deletes a specific user across all peers
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							| 
									
										
										
										
											2019-06-07 08:46:22 +08:00
										 |  |  | 			return client.DeleteUser(accessKey) | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // LoadUser - reloads a specific user across all peers
 | 
					
						
							|  |  |  | func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []NotificationPeerErr { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							| 
									
										
										
										
											2019-06-07 08:46:22 +08:00
										 |  |  | 			return client.LoadUser(accessKey, temp) | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-08-03 05:25:00 +08:00
										 |  |  | // LoadGroup - loads a specific group on all peers.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ng.Go(GlobalContext, func() error { return client.LoadGroup(group) }, idx, *client.host) | 
					
						
							| 
									
										
										
										
											2019-08-03 05:25:00 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-25 03:10:09 +08:00
										 |  |  | // DeleteServiceAccount - deletes a specific service account across all peers
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []NotificationPeerErr { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							|  |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							|  |  |  | 			return client.DeleteServiceAccount(accessKey) | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // LoadServiceAccount - reloads a specific service account across all peers
 | 
					
						
							|  |  |  | func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationPeerErr { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							|  |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							|  |  |  | 			return client.LoadServiceAccount(accessKey) | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-26 07:42:24 +08:00
										 |  |  | // BackgroundHealStatus - returns background heal status of all peers
 | 
					
						
							| 
									
										
										
										
											2020-08-08 04:22:53 +08:00
										 |  |  | func (sys *NotificationSys) BackgroundHealStatus() ([]madmin.BgHealState, []NotificationPeerErr) { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							| 
									
										
										
										
											2019-06-26 07:42:24 +08:00
										 |  |  | 	states := make([]madmin.BgHealState, len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-08-08 10:43:06 +08:00
										 |  |  | 		idx := idx | 
					
						
							| 
									
										
										
										
											2020-08-08 04:22:53 +08:00
										 |  |  | 		client := client | 
					
						
							|  |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							|  |  |  | 			st, err := client.BackgroundHealStatus() | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2019-06-26 07:42:24 +08:00
										 |  |  | 			states[idx] = st | 
					
						
							| 
									
										
										
										
											2020-08-08 04:22:53 +08:00
										 |  |  | 			return nil | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							| 
									
										
										
										
											2019-06-26 07:42:24 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-08-08 04:22:53 +08:00
										 |  |  | 	return states, ng.Wait() | 
					
						
							| 
									
										
										
										
											2019-06-26 07:42:24 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | // StartProfiling - start profiling on remote peers, by initiating a remote RPC.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr { | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 			return client.StartProfiling(profiler) | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 		}, idx, *client.host) | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // DownloadProfilingData - download profiling data from all remote peers.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io.Writer) bool { | 
					
						
							|  |  |  | 	profilingDataFound := false | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Initialize a zip writer which will provide a zipped content
 | 
					
						
							|  |  |  | 	// of profiling data of all nodes
 | 
					
						
							|  |  |  | 	zipWriter := zip.NewWriter(writer) | 
					
						
							|  |  |  | 	defer zipWriter.Close() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 	for _, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		data, err := client.DownloadProfileData() | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 			reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 			ctx := logger.SetReqInfo(ctx, reqInfo) | 
					
						
							|  |  |  | 			logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		profilingDataFound = true | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-01-11 09:19:58 +08:00
										 |  |  | 		for typ, data := range data { | 
					
						
							|  |  |  | 			// Send profiling data to zip as file
 | 
					
						
							|  |  |  | 			header, zerr := zip.FileInfoHeader(dummyFileInfo{ | 
					
						
							| 
									
										
										
										
											2020-03-17 02:39:53 +08:00
										 |  |  | 				name:    fmt.Sprintf("profile-%s-%s", client.host.String(), typ), | 
					
						
							| 
									
										
										
										
											2020-01-11 09:19:58 +08:00
										 |  |  | 				size:    int64(len(data)), | 
					
						
							|  |  |  | 				mode:    0600, | 
					
						
							|  |  |  | 				modTime: UTCNow(), | 
					
						
							|  |  |  | 				isDir:   false, | 
					
						
							|  |  |  | 				sys:     nil, | 
					
						
							|  |  |  | 			}) | 
					
						
							|  |  |  | 			if zerr != nil { | 
					
						
							|  |  |  | 				reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) | 
					
						
							|  |  |  | 				ctx := logger.SetReqInfo(ctx, reqInfo) | 
					
						
							|  |  |  | 				logger.LogIf(ctx, zerr) | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			zwriter, zerr := zipWriter.CreateHeader(header) | 
					
						
							|  |  |  | 			if zerr != nil { | 
					
						
							|  |  |  | 				reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) | 
					
						
							|  |  |  | 				ctx := logger.SetReqInfo(ctx, reqInfo) | 
					
						
							|  |  |  | 				logger.LogIf(ctx, zerr) | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { | 
					
						
							|  |  |  | 				reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) | 
					
						
							|  |  |  | 				ctx := logger.SetReqInfo(ctx, reqInfo) | 
					
						
							|  |  |  | 				logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-01-11 09:19:58 +08:00
										 |  |  | 	// Local host
 | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 	thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints)) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 		return profilingDataFound | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	data, err := getProfileData() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", thisAddr.String()) | 
					
						
							|  |  |  | 		ctx := logger.SetReqInfo(ctx, reqInfo) | 
					
						
							|  |  |  | 		logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 		return profilingDataFound | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	profilingDataFound = true | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Send profiling data to zip as file
 | 
					
						
							| 
									
										
										
										
											2020-01-11 09:19:58 +08:00
										 |  |  | 	for typ, data := range data { | 
					
						
							|  |  |  | 		header, zerr := zip.FileInfoHeader(dummyFileInfo{ | 
					
						
							| 
									
										
										
										
											2020-03-04 22:58:12 +08:00
										 |  |  | 			name:    fmt.Sprintf("profile-%s-%s", thisAddr, typ), | 
					
						
							| 
									
										
										
										
											2020-01-11 09:19:58 +08:00
										 |  |  | 			size:    int64(len(data)), | 
					
						
							|  |  |  | 			mode:    0600, | 
					
						
							|  |  |  | 			modTime: UTCNow(), | 
					
						
							|  |  |  | 			isDir:   false, | 
					
						
							|  |  |  | 			sys:     nil, | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 		if zerr != nil { | 
					
						
							|  |  |  | 			return profilingDataFound | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-01-11 09:19:58 +08:00
										 |  |  | 		zwriter, zerr := zipWriter.CreateHeader(header) | 
					
						
							|  |  |  | 		if zerr != nil { | 
					
						
							|  |  |  | 			return profilingDataFound | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-01-11 09:19:58 +08:00
										 |  |  | 		if _, err = io.Copy(zwriter, bytes.NewBuffer(data)); err != nil { | 
					
						
							|  |  |  | 			return profilingDataFound | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return profilingDataFound | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-08-29 06:04:43 +08:00
										 |  |  | // ServerUpdate - updates remote peers.
 | 
					
						
							| 
									
										
										
										
											2020-07-23 23:03:31 +08:00
										 |  |  | func (sys *NotificationSys) ServerUpdate(ctx context.Context, u *url.URL, sha256Sum []byte, lrTime time.Time) []NotificationPeerErr { | 
					
						
							| 
									
										
										
										
											2019-08-29 06:04:43 +08:00
										 |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-07-23 23:03:31 +08:00
										 |  |  | 		ng.Go(ctx, func() error { | 
					
						
							|  |  |  | 			return client.ServerUpdate(ctx, u, sha256Sum, lrTime) | 
					
						
							| 
									
										
										
										
											2019-08-29 06:04:43 +08:00
										 |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | // SignalService - calls signal service RPC call on all peers.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerErr { | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 		client := client | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ng.Go(GlobalContext, func() error { | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 			return client.SignalService(sig) | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 		}, idx, *client.host) | 
					
						
							| 
									
										
										
										
											2019-01-14 14:44:20 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return ng.Wait() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-28 01:06:21 +08:00
										 |  |  | // updateBloomFilter will cycle all servers to the current index and
 | 
					
						
							|  |  |  | // return a merged bloom filter if a complete one can be retrieved.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint64) (*bloomFilter, error) { | 
					
						
							|  |  |  | 	var req = bloomFilterRequest{ | 
					
						
							|  |  |  | 		Current: current, | 
					
						
							|  |  |  | 		Oldest:  current - dataUsageUpdateDirCycles, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if current < dataUsageUpdateDirCycles { | 
					
						
							|  |  |  | 		req.Oldest = 0 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Load initial state from local...
 | 
					
						
							|  |  |  | 	var bf *bloomFilter | 
					
						
							|  |  |  | 	bfr, err := intDataUpdateTracker.cycleFilter(ctx, req.Oldest, req.Current) | 
					
						
							|  |  |  | 	logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 	if err == nil && bfr.Complete { | 
					
						
							|  |  |  | 		nbf := intDataUpdateTracker.newBloomFilter() | 
					
						
							|  |  |  | 		bf = &nbf | 
					
						
							|  |  |  | 		_, err = bf.ReadFrom(bytes.NewBuffer(bfr.Filter)) | 
					
						
							|  |  |  | 		logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var mu sync.Mutex | 
					
						
							|  |  |  | 	g := errgroup.WithNErrs(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		client := client | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			serverBF, err := client.cycleServerBloomFilter(ctx, req) | 
					
						
							|  |  |  | 			if false && intDataUpdateTracker.debug { | 
					
						
							|  |  |  | 				b, _ := json.MarshalIndent(serverBF, "", "  ") | 
					
						
							|  |  |  | 				logger.Info("Disk %v, Bloom filter: %v", client.host.Name, string(b)) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			// Keep lock while checking result.
 | 
					
						
							|  |  |  | 			mu.Lock() | 
					
						
							|  |  |  | 			defer mu.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			if err != nil || !serverBF.Complete || bf == nil { | 
					
						
							|  |  |  | 				logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 				bf = nil | 
					
						
							|  |  |  | 				return nil | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			var tmp bloom.BloomFilter | 
					
						
							|  |  |  | 			_, err = tmp.ReadFrom(bytes.NewBuffer(serverBF.Filter)) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 				bf = nil | 
					
						
							|  |  |  | 				return nil | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			if bf.BloomFilter == nil { | 
					
						
							|  |  |  | 				bf.BloomFilter = &tmp | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				err = bf.Merge(&tmp) | 
					
						
							|  |  |  | 				if err != nil { | 
					
						
							|  |  |  | 					logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 					bf = nil | 
					
						
							|  |  |  | 					return nil | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		}, idx) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	g.Wait() | 
					
						
							|  |  |  | 	return bf, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-01-24 23:22:14 +08:00
										 |  |  | // GetLocks - makes GetLocks RPC call on all peers.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks { | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 	locksResp := make([]*PeerLocks, len(sys.peerClients)) | 
					
						
							| 
									
										
										
										
											2019-10-15 00:44:51 +08:00
										 |  |  | 	g := errgroup.WithNErrs(len(sys.peerClients)) | 
					
						
							| 
									
										
										
										
											2019-03-15 07:27:31 +08:00
										 |  |  | 	for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2019-10-15 00:44:51 +08:00
										 |  |  | 		index := index | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							| 
									
										
										
										
											2019-01-24 23:22:14 +08:00
										 |  |  | 			// Try to fetch serverInfo remotely in three attempts.
 | 
					
						
							|  |  |  | 			for i := 0; i < 3; i++ { | 
					
						
							| 
									
										
										
										
											2019-10-15 00:44:51 +08:00
										 |  |  | 				serverLocksResp, err := sys.peerClients[index].GetLocks() | 
					
						
							| 
									
										
										
										
											2019-01-24 23:22:14 +08:00
										 |  |  | 				if err == nil { | 
					
						
							| 
									
										
										
										
											2019-10-15 00:44:51 +08:00
										 |  |  | 					locksResp[index] = &PeerLocks{ | 
					
						
							|  |  |  | 						Addr:  sys.peerClients[index].host.String(), | 
					
						
							| 
									
										
										
										
											2019-01-24 23:22:14 +08:00
										 |  |  | 						Locks: serverLocksResp, | 
					
						
							|  |  |  | 					} | 
					
						
							| 
									
										
										
										
											2019-10-15 00:44:51 +08:00
										 |  |  | 					return nil | 
					
						
							| 
									
										
										
										
											2019-01-24 23:22:14 +08:00
										 |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				// Last iteration log the error.
 | 
					
						
							|  |  |  | 				if i == 2 { | 
					
						
							| 
									
										
										
										
											2019-10-15 00:44:51 +08:00
										 |  |  | 					return err | 
					
						
							| 
									
										
										
										
											2019-01-24 23:22:14 +08:00
										 |  |  | 				} | 
					
						
							|  |  |  | 				// Wait for one second and no need wait after last attempt.
 | 
					
						
							|  |  |  | 				if i < 2 { | 
					
						
							|  |  |  | 					time.Sleep(1 * time.Second) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2019-10-15 00:44:51 +08:00
										 |  |  | 			return nil | 
					
						
							|  |  |  | 		}, index) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for index, err := range g.Wait() { | 
					
						
							|  |  |  | 		reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", | 
					
						
							|  |  |  | 			sys.peerClients[index].host.String()) | 
					
						
							|  |  |  | 		ctx := logger.SetReqInfo(ctx, reqInfo) | 
					
						
							|  |  |  | 		logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String()) | 
					
						
							| 
									
										
										
										
											2019-01-24 23:22:14 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return locksResp | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-05-20 04:53:54 +08:00
										 |  |  | // LoadBucketMetadata - calls LoadBucketMetadata call on all peers
 | 
					
						
							|  |  |  | func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName string) { | 
					
						
							|  |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							| 
									
										
										
										
											2019-07-20 04:20:33 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-05-20 04:53:54 +08:00
										 |  |  | 		client := client | 
					
						
							|  |  |  | 		ng.Go(ctx, func() error { | 
					
						
							|  |  |  | 			return client.LoadBucketMetadata(bucketName) | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for _, nErr := range ng.Wait() { | 
					
						
							|  |  |  | 		reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String()) | 
					
						
							|  |  |  | 		if nErr.Err != nil { | 
					
						
							|  |  |  | 			logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err) | 
					
						
							| 
									
										
										
										
											2019-07-20 04:20:33 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-05-20 04:53:54 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-07-20 04:20:33 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-05-20 04:53:54 +08:00
										 |  |  | // DeleteBucketMetadata - calls DeleteBucketMetadata call on all peers
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName string) { | 
					
						
							|  |  |  | 	globalBucketMetadataSys.Remove(bucketName) | 
					
						
							| 
									
										
										
										
											2020-02-05 17:42:34 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-05-20 04:53:54 +08:00
										 |  |  | 	ng := WithNPeers(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							| 
									
										
										
										
											2020-02-05 17:42:34 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-05-20 04:53:54 +08:00
										 |  |  | 		client := client | 
					
						
							|  |  |  | 		ng.Go(ctx, func() error { | 
					
						
							|  |  |  | 			return client.DeleteBucketMetadata(bucketName) | 
					
						
							|  |  |  | 		}, idx, *client.host) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for _, nErr := range ng.Wait() { | 
					
						
							|  |  |  | 		reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String()) | 
					
						
							|  |  |  | 		if nErr.Err != nil { | 
					
						
							|  |  |  | 			logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err) | 
					
						
							| 
									
										
										
										
											2018-05-10 06:11:51 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-05-20 04:53:54 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-02 08:05:02 +08:00
										 |  |  | // Loads notification policies for all buckets into NotificationSys.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error { | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	for _, bucket := range buckets { | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name}) | 
					
						
							| 
									
										
										
										
											2020-05-21 01:18:15 +08:00
										 |  |  | 		config, err := globalBucketMetadataSys.GetNotificationConfig(bucket.Name) | 
					
						
							| 
									
										
										
										
											2020-05-20 04:53:54 +08:00
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2019-06-19 00:23:33 +08:00
										 |  |  | 			return err | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-05-22 02:03:59 +08:00
										 |  |  | 		config.SetRegion(globalServerRegion) | 
					
						
							| 
									
										
										
										
											2020-05-21 01:18:15 +08:00
										 |  |  | 		if err = config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil { | 
					
						
							| 
									
										
										
										
											2020-05-20 04:53:54 +08:00
										 |  |  | 			if _, ok := err.(*event.ErrARNNotFound); !ok { | 
					
						
							|  |  |  | 				logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-10-09 06:47:13 +08:00
										 |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		sys.AddRulesMap(bucket.Name, config.ToRulesMap()) | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-06-13 11:04:01 +08:00
										 |  |  | // Init - initializes notification system from notification.xml and listenxl.meta of all buckets.
 | 
					
						
							| 
									
										
										
										
											2020-08-21 01:38:53 +08:00
										 |  |  | func (sys *NotificationSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { | 
					
						
							| 
									
										
										
										
											2018-10-09 06:47:13 +08:00
										 |  |  | 	if objAPI == nil { | 
					
						
							| 
									
										
										
										
											2020-05-05 00:42:58 +08:00
										 |  |  | 		return errServerNotInitialized | 
					
						
							| 
									
										
										
										
											2018-10-09 06:47:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-02 08:05:02 +08:00
										 |  |  | 	// In gateway mode, notifications are not supported.
 | 
					
						
							| 
									
										
										
										
											2020-02-02 17:52:07 +08:00
										 |  |  | 	if globalIsGateway && !objAPI.IsNotificationSupported() { | 
					
						
							| 
									
										
										
										
											2019-10-02 08:05:02 +08:00
										 |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-22 14:35:50 +08:00
										 |  |  | 	if globalConfigTargetList != nil { | 
					
						
							|  |  |  | 		for _, target := range globalConfigTargetList.Targets() { | 
					
						
							|  |  |  | 			if err := sys.targetList.Add(target); err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 	go func() { | 
					
						
							|  |  |  | 		for res := range sys.targetResCh { | 
					
						
							|  |  |  | 			if res.Err != nil { | 
					
						
							|  |  |  | 				reqInfo := &logger.ReqInfo{} | 
					
						
							|  |  |  | 				reqInfo.AppendTags("targetID", res.ID.Name) | 
					
						
							| 
									
										
										
										
											2020-08-21 01:38:53 +08:00
										 |  |  | 				logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID) | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-08-21 01:38:53 +08:00
										 |  |  | 	go logger.LogIf(ctx, sys.load(buckets, objAPI)) | 
					
						
							|  |  |  | 	return nil | 
					
						
							| 
									
										
										
										
											2018-10-09 06:47:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | // AddRulesMap - adds rules map for bucket name.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) { | 
					
						
							|  |  |  | 	sys.Lock() | 
					
						
							|  |  |  | 	defer sys.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rulesMap = rulesMap.Clone() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for _, targetRulesMap := range sys.bucketRemoteTargetRulesMap[bucketName] { | 
					
						
							|  |  |  | 		rulesMap.Add(targetRulesMap) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-08-24 01:23:18 +08:00
										 |  |  | 	// Do not add for an empty rulesMap.
 | 
					
						
							|  |  |  | 	if len(rulesMap) == 0 { | 
					
						
							|  |  |  | 		delete(sys.bucketRulesMap, bucketName) | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		sys.bucketRulesMap[bucketName] = rulesMap | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // RemoveRulesMap - removes rules map for bucket name.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) { | 
					
						
							|  |  |  | 	sys.Lock() | 
					
						
							|  |  |  | 	defer sys.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	sys.bucketRulesMap[bucketName].Remove(rulesMap) | 
					
						
							|  |  |  | 	if len(sys.bucketRulesMap[bucketName]) == 0 { | 
					
						
							|  |  |  | 		delete(sys.bucketRulesMap, bucketName) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-14 04:36:45 +08:00
										 |  |  | // ConfiguredTargetIDs - returns list of configured target id's
 | 
					
						
							|  |  |  | func (sys *NotificationSys) ConfiguredTargetIDs() []event.TargetID { | 
					
						
							| 
									
										
										
										
											2019-12-20 05:45:56 +08:00
										 |  |  | 	if sys == nil { | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-14 04:36:45 +08:00
										 |  |  | 	sys.RLock() | 
					
						
							|  |  |  | 	defer sys.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var targetIDs []event.TargetID | 
					
						
							|  |  |  | 	for _, rmap := range sys.bucketRulesMap { | 
					
						
							|  |  |  | 		for _, rules := range rmap { | 
					
						
							|  |  |  | 			for _, targetSet := range rules { | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 				for id := range targetSet { | 
					
						
							|  |  |  | 					targetIDs = append(targetIDs, id) | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2019-12-14 04:36:45 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2020-03-14 15:01:15 +08:00
										 |  |  | 	// Filter out targets configured via env
 | 
					
						
							|  |  |  | 	var tIDs []event.TargetID | 
					
						
							|  |  |  | 	for _, targetID := range targetIDs { | 
					
						
							|  |  |  | 		if !globalEnvTargetList.Exists(targetID) { | 
					
						
							|  |  |  | 			tIDs = append(tIDs, targetID) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return tIDs | 
					
						
							| 
									
										
										
										
											2019-12-14 04:36:45 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | // RemoveNotification - removes all notification configuration for bucket name.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) RemoveNotification(bucketName string) { | 
					
						
							|  |  |  | 	sys.Lock() | 
					
						
							|  |  |  | 	defer sys.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	delete(sys.bucketRulesMap, bucketName) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 	targetIDSet := event.NewTargetIDSet() | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	for targetID := range sys.bucketRemoteTargetRulesMap[bucketName] { | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 		targetIDSet[targetID] = struct{}{} | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 		delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 	sys.targetList.Remove(targetIDSet) | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	delete(sys.bucketRemoteTargetRulesMap, bucketName) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | // RemoveAllRemoteTargets - closes and removes all notification targets.
 | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | func (sys *NotificationSys) RemoveAllRemoteTargets() { | 
					
						
							| 
									
										
										
										
											2020-03-22 13:10:13 +08:00
										 |  |  | 	sys.Lock() | 
					
						
							|  |  |  | 	defer sys.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	for _, targetMap := range sys.bucketRemoteTargetRulesMap { | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 		targetIDSet := event.NewTargetIDSet() | 
					
						
							|  |  |  | 		for k := range targetMap { | 
					
						
							|  |  |  | 			targetIDSet[k] = struct{}{} | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 		sys.targetList.Remove(targetIDSet) | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Send - sends event data to all matching targets.
 | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | func (sys *NotificationSys) Send(args eventArgs) { | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	sys.RLock() | 
					
						
							|  |  |  | 	targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name) | 
					
						
							|  |  |  | 	sys.RUnlock() | 
					
						
							| 
									
										
										
										
											2018-10-13 03:25:59 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	if len(targetIDSet) == 0 { | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 		return | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 	sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh) | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | // NetOBDInfo - Net OBD information
 | 
					
						
							|  |  |  | func (sys *NotificationSys) NetOBDInfo(ctx context.Context) madmin.ServerNetOBDInfo { | 
					
						
							|  |  |  | 	var sortedGlobalEndpoints []string | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	/* | 
					
						
							|  |  |  | 			Ensure that only untraversed links are visited by this server | 
					
						
							|  |  |  | 		        i.e. if netOBD tests have been performed between a -> b, then do | 
					
						
							|  |  |  | 			not run it between b -> a | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		        The graph of tests looks like this | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		            a   b   c   d | 
					
						
							|  |  |  | 		        a | o | x | x | x | | 
					
						
							|  |  |  | 		        b | o | o | x | x | | 
					
						
							|  |  |  | 		        c | o | o | o | x | | 
					
						
							|  |  |  | 		        d | o | o | o | o | | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		        'x's should be tested, and 'o's should be skipped | 
					
						
							|  |  |  | 	*/ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	hostSet := set.NewStringSet() | 
					
						
							|  |  |  | 	for _, ez := range globalEndpoints { | 
					
						
							|  |  |  | 		for _, e := range ez.Endpoints { | 
					
						
							| 
									
										
										
										
											2020-03-31 16:15:21 +08:00
										 |  |  | 			if !hostSet.Contains(e.Host) { | 
					
						
							|  |  |  | 				sortedGlobalEndpoints = append(sortedGlobalEndpoints, e.Host) | 
					
						
							|  |  |  | 				hostSet.Add(e.Host) | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	sort.Strings(sortedGlobalEndpoints) | 
					
						
							|  |  |  | 	var remoteTargets []*peerRESTClient | 
					
						
							|  |  |  | 	search := func(host string) *peerRESTClient { | 
					
						
							|  |  |  | 		for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 			if client == nil { | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			if sys.peerClients[index].host.String() == host { | 
					
						
							|  |  |  | 				return client | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for i := 0; i < len(sortedGlobalEndpoints); i++ { | 
					
						
							|  |  |  | 		if sortedGlobalEndpoints[i] != GetLocalPeer(globalEndpoints) { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		for j := 0; j < len(sortedGlobalEndpoints); j++ { | 
					
						
							|  |  |  | 			if j > i { | 
					
						
							|  |  |  | 				remoteTarget := search(sortedGlobalEndpoints[j]) | 
					
						
							|  |  |  | 				if remoteTarget != nil { | 
					
						
							|  |  |  | 					remoteTargets = append(remoteTargets, remoteTarget) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	netOBDs := make([]madmin.NetOBDInfo, len(remoteTargets)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for index, client := range remoteTargets { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		var err error | 
					
						
							|  |  |  | 		netOBDs[index], err = client.NetOBDInfo(ctx) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		addr := client.host.String() | 
					
						
							|  |  |  | 		reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 		ctx := logger.SetReqInfo(GlobalContext, reqInfo) | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | 		logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 		netOBDs[index].Addr = addr | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			netOBDs[index].Error = err.Error() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return madmin.ServerNetOBDInfo{ | 
					
						
							|  |  |  | 		Net:  netOBDs, | 
					
						
							|  |  |  | 		Addr: GetLocalPeer(globalEndpoints), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // DispatchNetOBDInfo - Net OBD information from other nodes
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DispatchNetOBDInfo(ctx context.Context) []madmin.ServerNetOBDInfo { | 
					
						
							|  |  |  | 	serverNetOBDs := []madmin.ServerNetOBDInfo{} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		serverNetOBD, err := sys.peerClients[index].DispatchNetOBDInfo(ctx) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			serverNetOBD.Addr = client.host.String() | 
					
						
							|  |  |  | 			serverNetOBD.Error = err.Error() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		serverNetOBDs = append(serverNetOBDs, serverNetOBD) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return serverNetOBDs | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-05-23 08:56:45 +08:00
										 |  |  | // DispatchNetOBDChan - Net OBD information from other nodes
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DispatchNetOBDChan(ctx context.Context) chan madmin.ServerNetOBDInfo { | 
					
						
							|  |  |  | 	serverNetOBDs := make(chan madmin.ServerNetOBDInfo) | 
					
						
							|  |  |  | 	wg := sync.WaitGroup{} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	wg.Add(1) | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		for _, client := range sys.peerClients { | 
					
						
							|  |  |  | 			if client == nil { | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			serverNetOBD, err := client.DispatchNetOBDInfo(ctx) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				serverNetOBD.Addr = client.host.String() | 
					
						
							|  |  |  | 				serverNetOBD.Error = err.Error() | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			serverNetOBDs <- serverNetOBD | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		wg.Done() | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		wg.Wait() | 
					
						
							|  |  |  | 		close(serverNetOBDs) | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return serverNetOBDs | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-01 08:08:28 +08:00
										 |  |  | // NetOBDParallelInfo - Performs NetOBD tests
 | 
					
						
							|  |  |  | func (sys *NotificationSys) NetOBDParallelInfo(ctx context.Context) madmin.ServerNetOBDInfo { | 
					
						
							|  |  |  | 	netOBDs := []madmin.NetOBDInfo{} | 
					
						
							|  |  |  | 	wg := sync.WaitGroup{} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		wg.Add(1) | 
					
						
							|  |  |  | 		go func(index int) { | 
					
						
							|  |  |  | 			netOBD, err := sys.peerClients[index].NetOBDInfo(ctx) | 
					
						
							|  |  |  | 			netOBD.Addr = sys.peerClients[index].host.String() | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				netOBD.Error = err.Error() | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			netOBDs = append(netOBDs, netOBD) | 
					
						
							|  |  |  | 			wg.Done() | 
					
						
							|  |  |  | 		}(index) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	wg.Wait() | 
					
						
							|  |  |  | 	return madmin.ServerNetOBDInfo{ | 
					
						
							|  |  |  | 		Net:  netOBDs, | 
					
						
							|  |  |  | 		Addr: GetLocalPeer(globalEndpoints), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | // DriveOBDInfo - Drive OBD information
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DriveOBDInfo(ctx context.Context) []madmin.ServerDrivesOBDInfo { | 
					
						
							|  |  |  | 	reply := make([]madmin.ServerDrivesOBDInfo, len(sys.peerClients)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	g := errgroup.WithNErrs(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		index := index | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			var err error | 
					
						
							|  |  |  | 			reply[index], err = sys.peerClients[index].DriveOBDInfo(ctx) | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		}, index) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for index, err := range g.Wait() { | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			addr := sys.peerClients[index].host.String() | 
					
						
							|  |  |  | 			reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 			ctx := logger.SetReqInfo(GlobalContext, reqInfo) | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | 			logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 			reply[index].Addr = addr | 
					
						
							|  |  |  | 			reply[index].Error = err.Error() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return reply | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-05-23 08:56:45 +08:00
										 |  |  | // DriveOBDInfoChan - Drive OBD information
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DriveOBDInfoChan(ctx context.Context) chan madmin.ServerDrivesOBDInfo { | 
					
						
							|  |  |  | 	updateChan := make(chan madmin.ServerDrivesOBDInfo) | 
					
						
							|  |  |  | 	wg := sync.WaitGroup{} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for _, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		wg.Add(1) | 
					
						
							|  |  |  | 		go func(client *peerRESTClient) { | 
					
						
							|  |  |  | 			reply, err := client.DriveOBDInfo(ctx) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			addr := client.host.String() | 
					
						
							|  |  |  | 			reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) | 
					
						
							|  |  |  | 			ctx := logger.SetReqInfo(GlobalContext, reqInfo) | 
					
						
							|  |  |  | 			logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			reply.Addr = addr | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				reply.Error = err.Error() | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			updateChan <- reply | 
					
						
							|  |  |  | 			wg.Done() | 
					
						
							|  |  |  | 		}(client) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		wg.Wait() | 
					
						
							|  |  |  | 		close(updateChan) | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return updateChan | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | // CPUOBDInfo - CPU OBD information
 | 
					
						
							|  |  |  | func (sys *NotificationSys) CPUOBDInfo(ctx context.Context) []madmin.ServerCPUOBDInfo { | 
					
						
							|  |  |  | 	reply := make([]madmin.ServerCPUOBDInfo, len(sys.peerClients)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	g := errgroup.WithNErrs(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		index := index | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			var err error | 
					
						
							|  |  |  | 			reply[index], err = sys.peerClients[index].CPUOBDInfo(ctx) | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		}, index) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for index, err := range g.Wait() { | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			addr := sys.peerClients[index].host.String() | 
					
						
							|  |  |  | 			reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 			ctx := logger.SetReqInfo(GlobalContext, reqInfo) | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | 			logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 			reply[index].Addr = addr | 
					
						
							|  |  |  | 			reply[index].Error = err.Error() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return reply | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // DiskHwOBDInfo - Disk HW OBD information
 | 
					
						
							|  |  |  | func (sys *NotificationSys) DiskHwOBDInfo(ctx context.Context) []madmin.ServerDiskHwOBDInfo { | 
					
						
							|  |  |  | 	reply := make([]madmin.ServerDiskHwOBDInfo, len(sys.peerClients)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	g := errgroup.WithNErrs(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		index := index | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			var err error | 
					
						
							|  |  |  | 			reply[index], err = sys.peerClients[index].DiskHwOBDInfo(ctx) | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		}, index) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for index, err := range g.Wait() { | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			addr := sys.peerClients[index].host.String() | 
					
						
							|  |  |  | 			reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 			ctx := logger.SetReqInfo(GlobalContext, reqInfo) | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | 			logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 			reply[index].Addr = addr | 
					
						
							|  |  |  | 			reply[index].Error = err.Error() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return reply | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // OsOBDInfo - Os OBD information
 | 
					
						
							|  |  |  | func (sys *NotificationSys) OsOBDInfo(ctx context.Context) []madmin.ServerOsOBDInfo { | 
					
						
							|  |  |  | 	reply := make([]madmin.ServerOsOBDInfo, len(sys.peerClients)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	g := errgroup.WithNErrs(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		index := index | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			var err error | 
					
						
							|  |  |  | 			reply[index], err = sys.peerClients[index].OsOBDInfo(ctx) | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		}, index) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for index, err := range g.Wait() { | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			addr := sys.peerClients[index].host.String() | 
					
						
							|  |  |  | 			reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 			ctx := logger.SetReqInfo(GlobalContext, reqInfo) | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | 			logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 			reply[index].Addr = addr | 
					
						
							|  |  |  | 			reply[index].Error = err.Error() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return reply | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // MemOBDInfo - Mem OBD information
 | 
					
						
							|  |  |  | func (sys *NotificationSys) MemOBDInfo(ctx context.Context) []madmin.ServerMemOBDInfo { | 
					
						
							|  |  |  | 	reply := make([]madmin.ServerMemOBDInfo, len(sys.peerClients)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	g := errgroup.WithNErrs(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		index := index | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			var err error | 
					
						
							|  |  |  | 			reply[index], err = sys.peerClients[index].MemOBDInfo(ctx) | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		}, index) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for index, err := range g.Wait() { | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			addr := sys.peerClients[index].host.String() | 
					
						
							|  |  |  | 			reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 			ctx := logger.SetReqInfo(GlobalContext, reqInfo) | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | 			logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 			reply[index].Addr = addr | 
					
						
							|  |  |  | 			reply[index].Error = err.Error() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return reply | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ProcOBDInfo - Process OBD information
 | 
					
						
							|  |  |  | func (sys *NotificationSys) ProcOBDInfo(ctx context.Context) []madmin.ServerProcOBDInfo { | 
					
						
							|  |  |  | 	reply := make([]madmin.ServerProcOBDInfo, len(sys.peerClients)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	g := errgroup.WithNErrs(len(sys.peerClients)) | 
					
						
							|  |  |  | 	for index, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		index := index | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			var err error | 
					
						
							|  |  |  | 			reply[index], err = sys.peerClients[index].ProcOBDInfo(ctx) | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		}, index) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for index, err := range g.Wait() { | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			addr := sys.peerClients[index].host.String() | 
					
						
							|  |  |  | 			reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) | 
					
						
							| 
									
										
										
										
											2020-04-10 00:30:02 +08:00
										 |  |  | 			ctx := logger.SetReqInfo(GlobalContext, reqInfo) | 
					
						
							| 
									
										
										
										
											2020-03-27 12:07:39 +08:00
										 |  |  | 			logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 			reply[index].Addr = addr | 
					
						
							|  |  |  | 			reply[index].Error = err.Error() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return reply | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-12 06:27:03 +08:00
										 |  |  | // ServerInfo - calls ServerInfo RPC call on all peers.
 | 
					
						
							|  |  |  | func (sys *NotificationSys) ServerInfo() []madmin.ServerProperties { | 
					
						
							|  |  |  | 	reply := make([]madmin.ServerProperties, len(sys.peerClients)) | 
					
						
							|  |  |  | 	var wg sync.WaitGroup | 
					
						
							|  |  |  | 	for i, client := range sys.peerClients { | 
					
						
							|  |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		wg.Add(1) | 
					
						
							|  |  |  | 		go func(client *peerRESTClient, idx int) { | 
					
						
							|  |  |  | 			defer wg.Done() | 
					
						
							|  |  |  | 			info, err := client.ServerInfo() | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				info.Endpoint = client.host.String() | 
					
						
							|  |  |  | 				info.State = "offline" | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				info.State = "ok" | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			reply[idx] = info | 
					
						
							|  |  |  | 		}(client, i) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	wg.Wait() | 
					
						
							|  |  |  | 	return reply | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-05-24 08:38:39 +08:00
										 |  |  | // GetLocalDiskIDs - return disk ids of the local disks of the peers.
 | 
					
						
							| 
									
										
										
										
											2020-07-21 09:31:22 +08:00
										 |  |  | func (sys *NotificationSys) GetLocalDiskIDs(ctx context.Context) (localDiskIDs [][]string) { | 
					
						
							|  |  |  | 	localDiskIDs = make([][]string, len(sys.peerClients)) | 
					
						
							| 
									
										
										
										
											2020-05-24 08:38:39 +08:00
										 |  |  | 	var wg sync.WaitGroup | 
					
						
							| 
									
										
										
										
											2020-07-21 09:31:22 +08:00
										 |  |  | 	for idx, client := range sys.peerClients { | 
					
						
							| 
									
										
										
										
											2020-05-24 08:38:39 +08:00
										 |  |  | 		if client == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		wg.Add(1) | 
					
						
							| 
									
										
										
										
											2020-07-21 09:31:22 +08:00
										 |  |  | 		go func(idx int, client *peerRESTClient) { | 
					
						
							| 
									
										
										
										
											2020-05-24 08:38:39 +08:00
										 |  |  | 			defer wg.Done() | 
					
						
							| 
									
										
										
										
											2020-07-21 09:31:22 +08:00
										 |  |  | 			localDiskIDs[idx] = client.GetLocalDiskIDs(ctx) | 
					
						
							|  |  |  | 		}(idx, client) | 
					
						
							| 
									
										
										
										
											2020-05-24 08:38:39 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	wg.Wait() | 
					
						
							| 
									
										
										
										
											2020-07-21 09:31:22 +08:00
										 |  |  | 	return localDiskIDs | 
					
						
							| 
									
										
										
										
											2020-05-24 08:38:39 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | // NewNotificationSys - creates new notification system object.
 | 
					
						
							| 
									
										
										
										
											2019-11-20 09:42:27 +08:00
										 |  |  | func NewNotificationSys(endpoints EndpointZones) *NotificationSys { | 
					
						
							| 
									
										
										
										
											2020-08-20 05:24:58 +08:00
										 |  |  | 	// targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init()
 | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	return &NotificationSys{ | 
					
						
							| 
									
										
										
										
											2019-11-10 01:27:23 +08:00
										 |  |  | 		targetList:                 event.NewTargetList(), | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 		targetResCh:                make(chan event.TargetIDResult), | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 		bucketRulesMap:             make(map[string]event.RulesMap), | 
					
						
							|  |  |  | 		bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), | 
					
						
							| 
									
										
										
										
											2020-04-18 02:20:56 +08:00
										 |  |  | 		peerClients:                newPeerRestClients(endpoints), | 
					
						
							| 
									
										
										
										
											2019-11-10 01:27:23 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type eventArgs struct { | 
					
						
							| 
									
										
										
										
											2018-08-24 05:40:54 +08:00
										 |  |  | 	EventName    event.Name | 
					
						
							|  |  |  | 	BucketName   string | 
					
						
							|  |  |  | 	Object       ObjectInfo | 
					
						
							|  |  |  | 	ReqParams    map[string]string | 
					
						
							|  |  |  | 	RespElements map[string]string | 
					
						
							|  |  |  | 	Host         string | 
					
						
							|  |  |  | 	UserAgent    string | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ToEvent - converts to notification event.
 | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | func (args eventArgs) ToEvent(escape bool) event.Event { | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	eventTime := UTCNow() | 
					
						
							|  |  |  | 	uniqueID := fmt.Sprintf("%X", eventTime.UnixNano()) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-08-24 05:40:54 +08:00
										 |  |  | 	respElements := map[string]string{ | 
					
						
							| 
									
										
										
										
											2018-11-03 09:40:08 +08:00
										 |  |  | 		"x-amz-request-id":        args.RespElements["requestId"], | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 		"x-minio-origin-endpoint": globalMinioEndpoint, // MinIO specific custom elements.
 | 
					
						
							| 
									
										
										
										
											2018-08-24 05:40:54 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-12-19 02:05:26 +08:00
										 |  |  | 	// Add deployment as part of
 | 
					
						
							|  |  |  | 	if globalDeploymentID != "" { | 
					
						
							|  |  |  | 		respElements["x-minio-deployment-id"] = globalDeploymentID | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-08-24 05:40:54 +08:00
										 |  |  | 	if args.RespElements["content-length"] != "" { | 
					
						
							|  |  |  | 		respElements["content-length"] = args.RespElements["content-length"] | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 	keyName := args.Object.Name | 
					
						
							|  |  |  | 	if escape { | 
					
						
							|  |  |  | 		keyName = url.QueryEscape(args.Object.Name) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	newEvent := event.Event{ | 
					
						
							|  |  |  | 		EventVersion:      "2.0", | 
					
						
							|  |  |  | 		EventSource:       "minio:s3", | 
					
						
							| 
									
										
										
										
											2018-11-03 09:40:08 +08:00
										 |  |  | 		AwsRegion:         args.ReqParams["region"], | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 		EventTime:         eventTime.Format(event.AMZTimeFormat), | 
					
						
							|  |  |  | 		EventName:         args.EventName, | 
					
						
							| 
									
										
										
										
											2018-11-15 02:23:44 +08:00
										 |  |  | 		UserIdentity:      event.Identity{PrincipalID: args.ReqParams["accessKey"]}, | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 		RequestParameters: args.ReqParams, | 
					
						
							| 
									
										
										
										
											2018-08-24 05:40:54 +08:00
										 |  |  | 		ResponseElements:  respElements, | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 		S3: event.Metadata{ | 
					
						
							|  |  |  | 			SchemaVersion:   "1.0", | 
					
						
							|  |  |  | 			ConfigurationID: "Config", | 
					
						
							|  |  |  | 			Bucket: event.Bucket{ | 
					
						
							|  |  |  | 				Name:          args.BucketName, | 
					
						
							| 
									
										
										
										
											2018-11-15 02:23:44 +08:00
										 |  |  | 				OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["accessKey"]}, | 
					
						
							| 
									
										
										
										
											2018-04-25 06:53:30 +08:00
										 |  |  | 				ARN:           policy.ResourceARNPrefix + args.BucketName, | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 			}, | 
					
						
							|  |  |  | 			Object: event.Object{ | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 				Key:       keyName, | 
					
						
							| 
									
										
										
										
											2020-06-13 11:04:01 +08:00
										 |  |  | 				VersionID: args.Object.VersionID, | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 				Sequencer: uniqueID, | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}, | 
					
						
							|  |  |  | 		Source: event.Source{ | 
					
						
							|  |  |  | 			Host:      args.Host, | 
					
						
							|  |  |  | 			UserAgent: args.UserAgent, | 
					
						
							|  |  |  | 		}, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if args.EventName != event.ObjectRemovedDelete { | 
					
						
							|  |  |  | 		newEvent.S3.Object.ETag = args.Object.ETag | 
					
						
							|  |  |  | 		newEvent.S3.Object.Size = args.Object.Size | 
					
						
							|  |  |  | 		newEvent.S3.Object.ContentType = args.Object.ContentType | 
					
						
							|  |  |  | 		newEvent.S3.Object.UserMetadata = args.Object.UserDefined | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return newEvent | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func sendEvent(args eventArgs) { | 
					
						
							| 
									
										
										
										
											2020-05-25 02:19:17 +08:00
										 |  |  | 	args.Object.Size, _ = args.Object.GetActualSize() | 
					
						
							| 
									
										
										
										
											2019-10-21 14:48:19 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-05-25 02:19:17 +08:00
										 |  |  | 	// remove sensitive encryption entries in metadata.
 | 
					
						
							| 
									
										
										
										
											2019-05-30 13:29:37 +08:00
										 |  |  | 	crypto.RemoveSensitiveEntries(args.Object.UserDefined) | 
					
						
							|  |  |  | 	crypto.RemoveInternalEntries(args.Object.UserDefined) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | 	// globalNotificationSys is not initialized in gateway mode.
 | 
					
						
							|  |  |  | 	if globalNotificationSys == nil { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-14 03:33:11 +08:00
										 |  |  | 	if globalHTTPListen.HasSubscribers() { | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 		globalHTTPListen.Publish(args.ToEvent(false)) | 
					
						
							| 
									
										
										
										
											2019-12-14 03:33:11 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-27 21:25:05 +08:00
										 |  |  | 	globalNotificationSys.Send(args) | 
					
						
							| 
									
										
										
										
											2018-03-16 04:03:41 +08:00
										 |  |  | } |