mirror of https://github.com/minio/minio.git
				
				
				
			Add admin API to send console log messages (#7784)
Utilized by mc admin console command.
This commit is contained in:
		
							parent
							
								
									0772438125
								
							
						
					
					
						commit
						8a71b0ec5a
					
				| 
						 | 
				
			
			@ -1817,3 +1817,68 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
 | 
			
		|||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// The handler sends console logs to the connected HTTP client.
 | 
			
		||||
func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	ctx := newContext(r, w, "ConsoleLog")
 | 
			
		||||
 | 
			
		||||
	objectAPI := validateAdminReq(ctx, w, r)
 | 
			
		||||
	if objectAPI == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	node := r.URL.Query().Get("node")
 | 
			
		||||
	// limit buffered console entries if client requested it.
 | 
			
		||||
	limitStr := r.URL.Query().Get("limit")
 | 
			
		||||
	limitLines, err := strconv.Atoi(limitStr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		limitLines = 10
 | 
			
		||||
	}
 | 
			
		||||
	// Avoid reusing tcp connection if read timeout is hit
 | 
			
		||||
	// This is needed to make r.Context().Done() work as
 | 
			
		||||
	// expected in case of read timeout
 | 
			
		||||
	w.Header().Add("Connection", "close")
 | 
			
		||||
	w.Header().Set(xhttp.ContentType, "text/event-stream")
 | 
			
		||||
 | 
			
		||||
	doneCh := make(chan struct{})
 | 
			
		||||
	defer close(doneCh)
 | 
			
		||||
	logCh := make(chan interface{}, 4000)
 | 
			
		||||
 | 
			
		||||
	remoteHosts := getRemoteHosts(globalEndpoints)
 | 
			
		||||
	peers, err := getRestClients(remoteHosts)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	globalConsoleSys.Subscribe(logCh, doneCh, node, limitLines, nil)
 | 
			
		||||
 | 
			
		||||
	for _, peer := range peers {
 | 
			
		||||
		if node == "" || strings.ToLower(peer.host.Name) == strings.ToLower(node) {
 | 
			
		||||
			peer.ConsoleLog(logCh, doneCh)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	enc := json.NewEncoder(w)
 | 
			
		||||
 | 
			
		||||
	keepAliveTicker := time.NewTicker(500 * time.Millisecond)
 | 
			
		||||
	defer keepAliveTicker.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case entry := <-logCh:
 | 
			
		||||
			log := entry.(madmin.LogInfo)
 | 
			
		||||
			if log.SendLog(node) {
 | 
			
		||||
				if err := enc.Encode(log); err != nil {
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				w.(http.Flusher).Flush()
 | 
			
		||||
			}
 | 
			
		||||
		case <-keepAliveTicker.C:
 | 
			
		||||
			if _, err := w.Write([]byte(" ")); err != nil {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			w.(http.Flusher).Flush()
 | 
			
		||||
		case <-GlobalServiceDoneCh:
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -135,6 +135,9 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
 | 
			
		|||
 | 
			
		||||
	// HTTP Trace
 | 
			
		||||
	adminV1Router.Methods(http.MethodGet).Path("/trace").HandlerFunc(adminAPI.TraceHandler)
 | 
			
		||||
	// Console Logs
 | 
			
		||||
	adminV1Router.Methods(http.MethodGet).Path("/log").HandlerFunc(httpTraceAll(adminAPI.ConsoleLogHandler))
 | 
			
		||||
 | 
			
		||||
	// If none of the routes match, return error.
 | 
			
		||||
	adminV1Router.NotFoundHandler = http.HandlerFunc(httpTraceHdrs(notFoundHandlerJSON))
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -31,7 +31,6 @@ import (
 | 
			
		|||
	"github.com/minio/cli"
 | 
			
		||||
	"github.com/minio/minio-go/v6/pkg/set"
 | 
			
		||||
	"github.com/minio/minio/cmd/logger"
 | 
			
		||||
	"github.com/minio/minio/cmd/logger/target/console"
 | 
			
		||||
	"github.com/minio/minio/cmd/logger/target/http"
 | 
			
		||||
	"github.com/minio/minio/pkg/auth"
 | 
			
		||||
	"github.com/minio/minio/pkg/dns"
 | 
			
		||||
| 
						 | 
				
			
			@ -97,7 +96,7 @@ func loadLoggers() {
 | 
			
		|||
 | 
			
		||||
	if globalServerConfig.Logger.Console.Enabled {
 | 
			
		||||
		// Enable console logging
 | 
			
		||||
		logger.AddTarget(console.New())
 | 
			
		||||
		logger.AddTarget(globalConsoleSys.Console())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,128 @@
 | 
			
		|||
/*
 | 
			
		||||
 * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
package cmd
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	ring "container/ring"
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"github.com/minio/minio/cmd/logger"
 | 
			
		||||
	"github.com/minio/minio/cmd/logger/message/log"
 | 
			
		||||
	"github.com/minio/minio/cmd/logger/target/console"
 | 
			
		||||
	"github.com/minio/minio/pkg/madmin"
 | 
			
		||||
	xnet "github.com/minio/minio/pkg/net"
 | 
			
		||||
	"github.com/minio/minio/pkg/pubsub"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// number of log messages to buffer
 | 
			
		||||
const defaultLogBufferCount = 10000
 | 
			
		||||
 | 
			
		||||
//HTTPConsoleLoggerSys holds global console logger state
 | 
			
		||||
type HTTPConsoleLoggerSys struct {
 | 
			
		||||
	pubsub   *pubsub.PubSub
 | 
			
		||||
	console  *console.Target
 | 
			
		||||
	nodeName string
 | 
			
		||||
	logBuf   *ring.Ring
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewConsoleLogger - creates new HTTPConsoleLoggerSys with all nodes subscribed to
 | 
			
		||||
// the console logging pub sub system
 | 
			
		||||
func NewConsoleLogger(ctx context.Context, endpoints EndpointList) *HTTPConsoleLoggerSys {
 | 
			
		||||
	host, err := xnet.ParseHost(GetLocalPeer(globalEndpoints))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.FatalIf(err, "Unable to start console logging subsystem")
 | 
			
		||||
	}
 | 
			
		||||
	var nodeName string
 | 
			
		||||
	if globalIsDistXL {
 | 
			
		||||
		nodeName = host.Name
 | 
			
		||||
	}
 | 
			
		||||
	ps := pubsub.New()
 | 
			
		||||
	return &HTTPConsoleLoggerSys{
 | 
			
		||||
		ps, nil, nodeName, ring.New(defaultLogBufferCount),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// HasLogListeners returns true if console log listeners are registered
 | 
			
		||||
// for this node or peers
 | 
			
		||||
func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool {
 | 
			
		||||
	return sys != nil && sys.pubsub.HasSubscribers()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Subscribe starts console logging for this node.
 | 
			
		||||
func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh chan struct{}, node string, last int, filter func(entry interface{}) bool) {
 | 
			
		||||
	// Enable console logging for remote client even if local console logging is disabled in the config.
 | 
			
		||||
	if !globalServerConfig.Logger.Console.Enabled && !sys.pubsub.HasSubscribers() {
 | 
			
		||||
		logger.AddTarget(globalConsoleSys.Console())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cnt := 0
 | 
			
		||||
	// by default send all console logs in the ring buffer unless node or limit query parameters
 | 
			
		||||
	// are set.
 | 
			
		||||
	var lastN []madmin.LogInfo
 | 
			
		||||
	if last > defaultLogBufferCount || last <= 0 {
 | 
			
		||||
		last = defaultLogBufferCount
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	lastN = make([]madmin.LogInfo, last)
 | 
			
		||||
	r := sys.logBuf
 | 
			
		||||
	r.Do(func(p interface{}) {
 | 
			
		||||
		if p != nil && (p.(madmin.LogInfo)).SendLog(node) {
 | 
			
		||||
			lastN[cnt%last] = p.(madmin.LogInfo)
 | 
			
		||||
			cnt++
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
	// send last n console log messages in order filtered by node
 | 
			
		||||
	if cnt > 0 {
 | 
			
		||||
		for i := 0; i < last; i++ {
 | 
			
		||||
			entry := lastN[(cnt+i)%last]
 | 
			
		||||
			if (entry == madmin.LogInfo{}) {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			select {
 | 
			
		||||
			case subCh <- entry:
 | 
			
		||||
			case <-doneCh:
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	sys.pubsub.Subscribe(subCh, doneCh, filter)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Console returns a  console target
 | 
			
		||||
func (sys *HTTPConsoleLoggerSys) Console() *HTTPConsoleLoggerSys {
 | 
			
		||||
	if sys.console == nil {
 | 
			
		||||
		sys.console = console.New()
 | 
			
		||||
	}
 | 
			
		||||
	return sys
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Send log message 'e' to console and publish to console
 | 
			
		||||
// log pubsub system
 | 
			
		||||
func (sys *HTTPConsoleLoggerSys) Send(e interface{}) error {
 | 
			
		||||
	lg := madmin.LogInfo{}
 | 
			
		||||
	lg.Entry = e.(log.Entry)
 | 
			
		||||
	lg.NodeName = sys.nodeName
 | 
			
		||||
	sys.pubsub.Publish(lg)
 | 
			
		||||
	// add log to ring buffer
 | 
			
		||||
	sys.logBuf.Value = lg
 | 
			
		||||
	sys.logBuf = sys.logBuf.Next()
 | 
			
		||||
 | 
			
		||||
	if globalServerConfig.Logger.Console.Enabled {
 | 
			
		||||
		return sys.console.Send(e)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -159,6 +159,9 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
 | 
			
		|||
		registerSTSRouter(router)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// initialize globalConsoleSys system
 | 
			
		||||
	globalConsoleSys = NewConsoleLogger(context.Background(), globalEndpoints)
 | 
			
		||||
 | 
			
		||||
	enableConfigOps := gatewayName == "nas"
 | 
			
		||||
	enableIAMOps := globalEtcdClient != nil
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -174,6 +174,10 @@ var (
 | 
			
		|||
	// registered listeners
 | 
			
		||||
	globalHTTPTrace = pubsub.New()
 | 
			
		||||
 | 
			
		||||
	// global console system to send console logs to
 | 
			
		||||
	// registered listeners
 | 
			
		||||
	globalConsoleSys *HTTPConsoleLoggerSys
 | 
			
		||||
 | 
			
		||||
	globalEndpoints EndpointList
 | 
			
		||||
 | 
			
		||||
	// Global server's network statistics
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -629,6 +629,48 @@ func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh chan struct
 | 
			
		|||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ConsoleLog - sends request to peer nodes to get console logs
 | 
			
		||||
func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh chan struct{}) {
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			// get cancellation context to properly unsubscribe peers
 | 
			
		||||
			ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
			respBody, err := client.callWithContext(ctx, peerRESTMethodLog, nil, nil, -1)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				// Retry the failed request.
 | 
			
		||||
				time.Sleep(5 * time.Second)
 | 
			
		||||
			} else {
 | 
			
		||||
				dec := gob.NewDecoder(respBody)
 | 
			
		||||
 | 
			
		||||
				go func() {
 | 
			
		||||
					<-doneCh
 | 
			
		||||
					cancel()
 | 
			
		||||
				}()
 | 
			
		||||
 | 
			
		||||
				for {
 | 
			
		||||
					var log madmin.LogInfo
 | 
			
		||||
					if err = dec.Decode(&log); err != nil {
 | 
			
		||||
						break
 | 
			
		||||
					}
 | 
			
		||||
					select {
 | 
			
		||||
					case logCh <- log:
 | 
			
		||||
					default:
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			select {
 | 
			
		||||
			case <-doneCh:
 | 
			
		||||
				cancel()
 | 
			
		||||
				http.DrainBody(respBody)
 | 
			
		||||
				return
 | 
			
		||||
			default:
 | 
			
		||||
				// There was error in the REST request, retry.
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getRemoteHosts(endpoints EndpointList) []*xnet.Host {
 | 
			
		||||
	var remoteHosts []*xnet.Host
 | 
			
		||||
	for _, hostStr := range GetRemotePeers(endpoints) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -51,6 +51,7 @@ const (
 | 
			
		|||
	peerRESTMethodTrace                    = "trace"
 | 
			
		||||
	peerRESTMethodBucketLifecycleSet       = "setbucketlifecycle"
 | 
			
		||||
	peerRESTMethodBucketLifecycleRemove    = "removebucketlifecycle"
 | 
			
		||||
	peerRESTMethodLog                      = "log"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -32,6 +32,7 @@ import (
 | 
			
		|||
	"github.com/minio/minio/cmd/logger"
 | 
			
		||||
	"github.com/minio/minio/pkg/event"
 | 
			
		||||
	"github.com/minio/minio/pkg/lifecycle"
 | 
			
		||||
	"github.com/minio/minio/pkg/madmin"
 | 
			
		||||
	xnet "github.com/minio/minio/pkg/net"
 | 
			
		||||
	"github.com/minio/minio/pkg/policy"
 | 
			
		||||
	trace "github.com/minio/minio/pkg/trace"
 | 
			
		||||
| 
						 | 
				
			
			@ -903,6 +904,38 @@ func (s *peerRESTServer) BackgroundOpsStatusHandler(w http.ResponseWriter, r *ht
 | 
			
		|||
	logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ConsoleLogHandler sends console logs of this node back to peer rest client
 | 
			
		||||
func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	if !s.IsValid(w, r) {
 | 
			
		||||
		s.writeErrorResponse(w, errors.New("Invalid request"))
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	w.Header().Set("Connection", "close")
 | 
			
		||||
	w.WriteHeader(http.StatusOK)
 | 
			
		||||
	w.(http.Flusher).Flush()
 | 
			
		||||
 | 
			
		||||
	doneCh := make(chan struct{})
 | 
			
		||||
	defer close(doneCh)
 | 
			
		||||
 | 
			
		||||
	ch := make(chan interface{}, 2000)
 | 
			
		||||
	globalConsoleSys.Subscribe(ch, doneCh, "", 0, nil)
 | 
			
		||||
 | 
			
		||||
	enc := gob.NewEncoder(w)
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case entry := <-ch:
 | 
			
		||||
			log := entry.(madmin.LogInfo)
 | 
			
		||||
			if err := enc.Encode(log); err != nil {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			w.(http.Flusher).Flush()
 | 
			
		||||
		case <-r.Context().Done():
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *peerRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
 | 
			
		||||
	w.WriteHeader(http.StatusForbidden)
 | 
			
		||||
	w.Write([]byte(err.Error()))
 | 
			
		||||
| 
						 | 
				
			
			@ -958,6 +991,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
 | 
			
		|||
 | 
			
		||||
	subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodTrace).HandlerFunc(server.TraceHandler)
 | 
			
		||||
	subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)
 | 
			
		||||
	subrouter.Methods(http.MethodPost).Path(SlashSeparator + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler)
 | 
			
		||||
 | 
			
		||||
	router.NotFoundHandler = http.HandlerFunc(httpTraceAll(notFoundHandler))
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -289,6 +289,8 @@ func serverMain(ctx *cli.Context) {
 | 
			
		|||
		globalSweepHealState = initHealState()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// initialize globalConsoleSys system
 | 
			
		||||
	globalConsoleSys = NewConsoleLogger(context.Background(), globalEndpoints)
 | 
			
		||||
	// Configure server.
 | 
			
		||||
	var handler http.Handler
 | 
			
		||||
	handler, err = configureServerHandler(globalEndpoints)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,85 @@
 | 
			
		|||
/*
 | 
			
		||||
 * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
package madmin
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/url"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"github.com/minio/minio/cmd/logger/message/log"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// LogInfo holds console log messages
 | 
			
		||||
type LogInfo struct {
 | 
			
		||||
	log.Entry
 | 
			
		||||
	NodeName string `json:"node"`
 | 
			
		||||
	Err      error  `json:"-"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SendLog returns true if log pertains to node specified in args.
 | 
			
		||||
func (l LogInfo) SendLog(node string) bool {
 | 
			
		||||
	return node == "" || strings.ToLower(node) == strings.ToLower(l.NodeName)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetLogs - listen on console log messages.
 | 
			
		||||
func (adm AdminClient) GetLogs(node string, lineCnt int, doneCh <-chan struct{}) <-chan LogInfo {
 | 
			
		||||
	logCh := make(chan LogInfo, 1)
 | 
			
		||||
 | 
			
		||||
	// Only success, start a routine to start reading line by line.
 | 
			
		||||
	go func(logCh chan<- LogInfo) {
 | 
			
		||||
		defer close(logCh)
 | 
			
		||||
		urlValues := make(url.Values)
 | 
			
		||||
		urlValues.Set("node", node)
 | 
			
		||||
		urlValues.Set("limit", strconv.Itoa(lineCnt))
 | 
			
		||||
		for {
 | 
			
		||||
			reqData := requestData{
 | 
			
		||||
				relPath:     "/v1/log",
 | 
			
		||||
				queryValues: urlValues,
 | 
			
		||||
			}
 | 
			
		||||
			// Execute GET to call log handler
 | 
			
		||||
			resp, err := adm.executeMethod("GET", reqData)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				closeResponse(resp)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if resp.StatusCode != http.StatusOK {
 | 
			
		||||
				logCh <- LogInfo{Err: httpRespToErrorResponse(resp)}
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			dec := json.NewDecoder(resp.Body)
 | 
			
		||||
			for {
 | 
			
		||||
				var info LogInfo
 | 
			
		||||
				if err = dec.Decode(&info); err != nil {
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
				select {
 | 
			
		||||
				case <-doneCh:
 | 
			
		||||
					return
 | 
			
		||||
				case logCh <- info:
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
	}(logCh)
 | 
			
		||||
 | 
			
		||||
	// Returns the log info channel, for caller to start reading from.
 | 
			
		||||
	return logCh
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -40,7 +40,7 @@ func (ps *PubSub) Publish(item interface{}) {
 | 
			
		|||
	defer ps.RUnlock()
 | 
			
		||||
 | 
			
		||||
	for _, sub := range ps.subs {
 | 
			
		||||
		if sub.filter(item) {
 | 
			
		||||
		if sub.filter == nil || sub.filter(item) {
 | 
			
		||||
			select {
 | 
			
		||||
			case sub.ch <- item:
 | 
			
		||||
			default:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue