// Temporary implementation to do Create, Drop and Refresh operations on a GSI
// cluster. It will eventually be replaced by MetadataProvider.
package client
import "net/http"
import "encoding/json"
import "bytes"
import "fmt"
import "io/ioutil"
import "errors"
import "strings"
import "sync"
import "github.com/couchbase/indexing/secondary/logging"
import "github.com/couchbase/indexing/secondary/common"
import mclient "github.com/couchbase/indexing/secondary/manager/client"
// indexError for a failed index-request.
type indexError struct {
Code string `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
}
// indexRequest message
type indexRequest struct {
Version uint64 `json:"version,omitempty"`
Type string `json:"type,omitempty"`
Index indexInfo `json:"index,omitempty"`
}
// indexMetaResponse for an indexRequest
type indexMetaResponse struct {
Version uint64 `json:"version,omitempty"`
Status string `json:"status,omitempty"`
Indexes []indexInfo `json:"indexes,omitempty"`
Errors []indexError `json:"errors,omitempty"`
}
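// For illustration, a hedged sketch of the wire format carried by the
// request/response types above (field values are assumptions, not captured
// traffic):
//   request:  {"type":"list"}
//   response: {"status":"success",
//              "indexes":[{"name":"idx_age","bucket":"default"}]}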
// cbqClient to access cbq-agent for admin operation on index.
type cbqClient struct {
rw sync.RWMutex // protects `indexes` field
adminport string
queryport string
httpc *http.Client
indexes []*mclient.IndexMetadata
logPrefix string
}
// newCbqClient creates a cbq-cluster client.
func newCbqClient(cluster string) (*cbqClient, error) {
clusterUrl, err := common.ClusterAuthUrl(cluster)
if err != nil {
return nil, err
}
cinfo, err := common.NewClusterInfoCache(clusterUrl, "default" /*pooln*/)
if err != nil {
return nil, err
}
if err = cinfo.Fetch(); err != nil {
return nil, err
}
nodes := cinfo.GetNodesByServiceType("indexAdmin")
if l := len(nodes); l < 1 {
err := fmt.Errorf("cinfo.GetNodesByServiceType() returns %d nodes", l)
return nil, err
}
adminport, err := cinfo.GetServiceAddress(nodes[0], "indexAdmin")
if err != nil {
return nil, err
}
queryport, err := cinfo.GetServiceAddress(nodes[0], "indexScan")
if err != nil {
return nil, err
}
b := &cbqClient{
adminport: "http://" + adminport,
queryport: queryport,
httpc: http.DefaultClient,
}
b.logPrefix = fmt.Sprintf("[cbqClient %v]", b.adminport)
return b, nil
}
// Sync implements the BridgeAccessor{} interface; it is a no-op for cbqClient.
func (b *cbqClient) Sync() error {
return nil
}
// Refresh implements the BridgeAccessor{} interface.
func (b *cbqClient) Refresh() ([]*mclient.IndexMetadata, error) {
var resp *http.Response
var mresp indexMetaResponse
// Construct request body.
req := indexRequest{Type: "list"}
body, err := json.Marshal(req)
if err == nil { // Post HTTP request.
bodybuf := bytes.NewBuffer(body)
url := b.adminport + "/list"
logging.Infof("%v posting %v to URL %v", b.logPrefix, bodybuf, url)
resp, err = b.httpc.Post(url, "application/json", bodybuf)
if err == nil {
defer resp.Body.Close()
mresp, err = b.metaResponse(resp)
if err == nil {
indexes := make([]*mclient.IndexMetadata, 0)
for _, info := range mresp.Indexes {
indexes = append(
indexes, newIndexMetaData(&info, b.queryport))
}
b.rw.Lock()
defer b.rw.Unlock()
b.indexes = indexes
return indexes, nil
}
return nil, err
}
}
return nil, err
}
// Nodes implements the BridgeAccessor{} interface.
func (b *cbqClient) Nodes() ([]*IndexerService, error) {
node := &IndexerService{
Adminport: b.adminport,
Queryport: b.queryport,
Status: "online",
}
return []*IndexerService{node}, nil
}
// CreateIndex implements the BridgeAccessor{} interface.
func (b *cbqClient) CreateIndex(
name, bucket, using, exprType, partnExpr, whereExpr string,
secExprs []string, isPrimary bool,
with []byte) (defnID uint64, err error) {
var resp *http.Response
var mresp indexMetaResponse
// Construct request body.
info := indexInfo{
Name: name,
Bucket: bucket,
Using: using,
ExprType: exprType,
PartnExpr: partnExpr,
WhereExpr: whereExpr,
SecExprs: secExprs,
IsPrimary: isPrimary,
}
req := indexRequest{Type: "create", Index: info}
body, err := json.Marshal(req)
if err == nil { // Post HTTP request.
bodybuf := bytes.NewBuffer(body)
url := b.adminport + "/create"
logging.Infof("%v posting %v to URL %v", b.logPrefix, bodybuf, url)
resp, err = b.httpc.Post(url, "application/json", bodybuf)
if err == nil {
defer resp.Body.Close()
mresp, err = b.metaResponse(resp)
if err == nil {
if len(mresp.Indexes) == 0 { // guard against a malformed response
return 0, errors.New("create response carries no index info")
}
defnID := mresp.Indexes[0].DefnID
b.Refresh()
return defnID, nil
}
return 0, err
}
}
return 0, err
}
// BuildIndexes implements the BridgeAccessor{} interface.
func (b *cbqClient) BuildIndexes(defnID []uint64) error {
panic("cbqClient does not implement build-indexes")
}
// DropIndex implements the BridgeAccessor{} interface.
func (b *cbqClient) DropIndex(defnID uint64) error {
var resp *http.Response
// Construct request body.
req := indexRequest{
Type: "drop", Index: indexInfo{DefnID: uint64(defnID)},
}
body, err := json.Marshal(req)
if err == nil {
// Post HTTP request.
bodybuf := bytes.NewBuffer(body)
url := b.adminport + "/drop"
logging.Infof("%v posting %v to URL %v", b.logPrefix, bodybuf, url)
resp, err = b.httpc.Post(url, "application/json", bodybuf)
if err == nil {
defer resp.Body.Close()
_, err = b.metaResponse(resp)
if err == nil {
b.Refresh()
return nil
}
return err
}
}
return err
}
// GetScanports implements the BridgeAccessor{} interface.
func (b *cbqClient) GetScanports() (queryports []string) {
return []string{b.queryport}
}
// GetScanport implements the BridgeAccessor{} interface.
func (b *cbqClient) GetScanport(
defnID uint64,
retry int) (queryport string, targetDefnID uint64, ok bool) {
return b.queryport, defnID, true
}
// GetIndexDefn implements BridgeAccessor{} interface.
func (b *cbqClient) GetIndexDefn(defnID uint64) *common.IndexDefn {
panic("cbqClient does not implement GetIndexDefn")
}
// Timeit implements the BridgeAccessor{} interface.
func (b *cbqClient) Timeit(defnID uint64, value float64) {
// TODO: do nothing ?
}
// IndexState implements the BridgeAccessor{} interface.
func (b *cbqClient) IndexState(defnID uint64) (common.IndexState, error) {
return common.INDEX_STATE_ACTIVE, nil
}
// Close implements the BridgeAccessor{} interface.
func (b *cbqClient) Close() {
// TODO: do nothing ?
}
// metaResponse gathers the index-meta response from an HTTP response.
func (b *cbqClient) metaResponse(
resp *http.Response) (mresp indexMetaResponse, err error) {
var body []byte
body, err = ioutil.ReadAll(resp.Body)
if err == nil {
if err = json.Unmarshal(body, &mresp); err == nil {
logging.Tracef("%v received raw response %s", b.logPrefix, string(body))
if strings.Contains(mresp.Status, "error") {
if len(mresp.Errors) > 0 { // guard against a malformed error response
err = errors.New(mresp.Errors[0].Msg)
} else {
err = errors.New(mresp.Status)
}
}
}
}
return mresp, err
}
// indexInfo describes an index.
type indexInfo struct {
Name string `json:"name,omitempty"`
Bucket string `json:"bucket,omitempty"`
DefnID uint64 `json:"defnID,omitempty"`
Using string `json:"using,omitempty"`
ExprType string `json:"exprType,omitempty"`
PartnExpr string `json:"partnExpr,omitempty"`
SecExprs []string `json:"secExprs,omitempty"`
WhereExpr string `json:"whereExpr,omitempty"`
IsPrimary bool `json:"isPrimary,omitempty"`
}
func newIndexMetaData(info *indexInfo, queryport string) *mclient.IndexMetadata {
defn := &common.IndexDefn{
DefnId: common.IndexDefnId(info.DefnID),
Name: info.Name,
Using: common.IndexType(info.Using),
Bucket: info.Bucket,
IsPrimary: info.IsPrimary,
ExprType: common.ExprType(info.ExprType),
SecExprs: info.SecExprs,
PartitionKey: info.PartnExpr,
}
instances := []*mclient.InstanceDefn{
&mclient.InstanceDefn{
InstId: common.IndexInstId(info.DefnID), // TODO: defnID as InstID
State: common.INDEX_STATE_READY,
Endpts: []common.Endpoint{common.Endpoint(queryport)},
},
}
imeta := &mclient.IndexMetadata{
Definition: defn,
Instances: instances,
}
return imeta
}
package client
import "errors"
import "time"
import "github.com/couchbase/indexing/secondary/logging"
import "github.com/couchbase/indexing/secondary/common"
import "github.com/golang/protobuf/proto"
import protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
import mclient "github.com/couchbase/indexing/secondary/manager/client"
// TODO:
// - Timeit() uses the wall-clock time instead of process-time to compute
// load. This is very crude.
// ErrorProtocol
var ErrorProtocol = errors.New("queryport.client.protocol")
// ErrorNoHost
var ErrorNoHost = errors.New("queryport.client.noHost")
// ErrorEmptyDeployment
var ErrorEmptyDeployment = errors.New("queryport.client.emptyDeployment")
// ErrorManyDeployment
var ErrorManyDeployment = errors.New("queryport.client.manyDeployment")
// ErrorInvalidDeploymentNode
var ErrorInvalidDeploymentNode = errors.New("queryport.client.invdDeployPlan")
// ErrorIndexNotFound
var ErrorIndexNotFound = errors.New("queryport.indexNotFound")
// ErrorInstanceNotFound
var ErrorInstanceNotFound = errors.New("queryport.instanceNotFound")
// ErrorIndexNotReady
var ErrorIndexNotReady = errors.New("queryport.indexNotReady")
// ErrorClientUninitialized
var ErrorClientUninitialized = errors.New("queryport.clientUninitialized")
// ErrorNotImplemented
var ErrorNotImplemented = errors.New("queryport.notImplemented")
// ErrorInvalidConsistency
var ErrorInvalidConsistency = errors.New("queryport.invalidConsistency")
// ErrorExpectedTimestamp
var ErrorExpectedTimestamp = errors.New("queryport.expectedTimestamp")
// ResponseHandler shall interpret response packets from the server
// and handle them. If the handler is not interested in receiving any
// more responses it shall return false, else it shall continue
// until a *protobuf.StreamEndResponse message is received.
type ResponseHandler func(resp ResponseReader) bool
// ResponseReader to obtain the actual data returned from the server.
// Handlers should first call Error() and then call GetEntries().
type ResponseReader interface {
// GetEntries returns a list of secondary-keys and the corresponding
// primary-keys. If the returned value is nil, there are no more
// entries for this query.
GetEntries() ([]common.SecondaryKey, [][]byte, error)
// Error returns the error value, if nil there is no error.
Error() error
}
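// exampleResponseHandler is a hedged sketch (not part of the original source)
// of the handler discipline described above: check Error() first, then read
// GetEntries(); returning true asks for more responses, false terminates.
func exampleResponseHandler(resp ResponseReader) bool {
	if err := resp.Error(); err != nil {
		logging.Errorf("scan failed: %v\n", err)
		return false // give up on this scan
	}
	skeys, pkeys, err := resp.GetEntries()
	if err != nil {
		return false
	}
	for i, skey := range skeys { // assumes len(skeys) == len(pkeys)
		logging.Infof("secondary-key %v primary-key %s\n", skey, pkeys[i])
	}
	return true // continue until StreamEndResponse
}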
// Remoteaddr string in the shape of "<host:port>"
type Remoteaddr string
// Inclusion specifier for range queries.
type Inclusion uint32
const (
// Neither does not include low-key and high-key
Neither Inclusion = iota
// Low includes low-key but does not include high-key
Low
// High includes high-key but does not include low-key
High
// Both includes both low-key and high-key
Both
)
// BridgeAccessor for Create,Drop,List,Refresh operations.
type BridgeAccessor interface {
// Synchronously update current server metadata to the client.
// A Refresh() call followed by a Sync() ensures that the client is
// up to date w.r.t. the server.
Sync() error
// Refresh shall refresh to the latest set of indexes managed by the
// GSI cluster, cache it locally and return the list of indexes.
Refresh() ([]*mclient.IndexMetadata, error)
// Nodes shall return the list of indexer nodes, with an adminport
// and queryport for each.
Nodes() ([]*IndexerService, error)
// CreateIndex and return defnID of created index.
// name
// index name
// bucket
// bucket name in which index is defined.
// using
// token should always be GSI.
// exprType
// token specifies how to interpret partnExpr, whereExpr, secExprs
// partnExpr
// marshalled expression of type `exprType` that emits partition
// value from a kv-document.
// whereExpr
// marshalled predicate-expression of type `exprType` that emits
// a boolean from a kv-document.
// secExprs
// marshalled list of expressions of type `exprType` that emit
// an array of secondary-key values from a kv-document.
// isPrimary
// specify whether the index is created on docid.
// with
// JSON marshalled description about index deployment (and more...).
CreateIndex(
name, bucket, using, exprType, partnExpr, whereExpr string,
secExprs []string, isPrimary bool,
with []byte) (defnID uint64, err error)
// BuildIndexes to build a deferred set of indexes. This call implies
// that indexes specified are already created.
BuildIndexes(defnIDs []uint64) error
// DropIndex to drop index specified by `defnID`.
// - if index is in deferred build state, it shall be removed
// from deferred list.
DropIndex(defnID uint64) error
// GetScanports shall return the list of queryports for all indexers in
// the cluster.
GetScanports() (queryports []string)
// GetScanport shall fetch the queryport address for an indexer;
// if `retry` is ZERO, pick the index under least
// load, else do a round-robin based on the retry count,
// when more than one indexer is found hosting the index or an
// equivalent index.
GetScanport(
defnID uint64,
retry int) (queryport string, targetDefnID uint64, ok bool)
// GetIndexDefn will return the index-definition structure for defnID.
GetIndexDefn(defnID uint64) *common.IndexDefn
// IndexState returns the current state of index `defnID` and error.
IndexState(defnID uint64) (common.IndexState, error)
// Timeit will add `value` to incrementalAvg for index-load.
Timeit(defnID uint64, value float64)
// Close this accessor.
Close()
}
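// exampleCreateIndex is a hedged sketch (not part of the original source) of
// a CreateIndex call with the parameters documented above; the index name,
// bucket and N1QL expressions are illustrative assumptions.
func exampleCreateIndex(bridge BridgeAccessor) (uint64, error) {
	return bridge.CreateIndex(
		"idx_age",         // name
		"default",         // bucket
		"gsi",             // using
		"N1QL",            // exprType
		"",                // partnExpr: no partition expression
		"(`age` > 18)",    // whereExpr: predicate expression
		[]string{"`age`"}, // secExprs: secondary-key expressions
		false,             // isPrimary
		nil)               // with: optional deployment plan (JSON)
}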
// GsiAccessor for index operation on GSI cluster.
type GsiAccessor interface {
BridgeAccessor
// LookupStatistics for a single secondary-key.
LookupStatistics(
defnID uint64, v common.SecondaryKey) (common.IndexStatistics, error)
// RangeStatistics for index range.
RangeStatistics(
defnID uint64, low, high common.SecondaryKey,
inclusion Inclusion) (common.IndexStatistics, error)
// Lookup scan index for the given lookup values.
Lookup(
defnID uint64, values []common.SecondaryKey,
distinct bool, limit int64,
cons common.Consistency, vector *TsConsistency,
callb ResponseHandler) error
// Range scan index between low and high.
Range(
defnID uint64, low, high common.SecondaryKey,
inclusion Inclusion, distinct bool, limit int64,
cons common.Consistency, vector *TsConsistency,
callb ResponseHandler) error
// ScanAll for full table scan.
ScanAll(
defnID uint64, limit int64,
cons common.Consistency, vector *TsConsistency,
callb ResponseHandler) error
// CountLookup of entries for the given set of keys.
CountLookup(
defnID uint64, values []common.SecondaryKey,
cons common.Consistency, vector *TsConsistency) (int64, error)
// CountRange of entries in the given range.
CountRange(
defnID uint64, low, high common.SecondaryKey, inclusion Inclusion,
cons common.Consistency, vector *TsConsistency) (int64, error)
}
var useMetadataProvider = true
// IndexerService describes the status of an indexer node
// as observed by the GsiClient.
type IndexerService struct {
Adminport string
Queryport string
Status string // one of "initial", "online", "recovery"
}
// GsiClient for accessing GSI cluster. The client shall
// use `adminport` for meta-data operation and `queryport`
// for index-scan related operations.
type GsiClient struct {
bridge BridgeAccessor // manages adminport
cluster string
maxvb int
config common.Config
queryClients map[string]*GsiScanClient
}
// NewGsiClient returns client to access GSI cluster.
func NewGsiClient(
cluster string, config common.Config) (c *GsiClient, err error) {
if useMetadataProvider {
c, err = makeWithMetaProvider(cluster, config)
} else {
c, err = makeWithCbq(cluster, config)
}
if err != nil {
return nil, err
}
c.maxvb = -1
c.Refresh()
return c, nil
}
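// exampleNewGsiClient is a hedged usage sketch (not part of the original
// source): construct a client and walk the cached index metadata; the cluster
// address is an assumption and `config` must carry the settings read below.
func exampleNewGsiClient(config common.Config) error {
	client, err := NewGsiClient("localhost:9000", config)
	if err != nil {
		return err
	}
	defer client.Close()
	indexes, err := client.Refresh()
	if err != nil {
		return err
	}
	for _, index := range indexes {
		logging.Infof("index %v on bucket %v\n",
			index.Definition.Name, index.Definition.Bucket)
	}
	return nil
}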
// IndexState implements BridgeAccessor{} interface.
func (c *GsiClient) IndexState(defnID uint64) (common.IndexState, error) {
if c.bridge == nil {
return common.INDEX_STATE_ERROR, ErrorClientUninitialized
}
return c.bridge.IndexState(defnID)
}
// Sync implements BridgeAccessor{} interface.
func (c *GsiClient) Sync() error {
if c.bridge == nil {
return ErrorClientUninitialized
}
return c.bridge.Sync()
}
// Refresh implements BridgeAccessor{} interface.
func (c *GsiClient) Refresh() ([]*mclient.IndexMetadata, error) {
if c.bridge == nil {
return nil, ErrorClientUninitialized
}
return c.bridge.Refresh()
}
// Nodes implements BridgeAccessor{} interface.
func (c *GsiClient) Nodes() ([]*IndexerService, error) {
if c.bridge == nil {
return nil, ErrorClientUninitialized
}
return c.bridge.Nodes()
}
// BucketTs will return the current vbucket-timestamp.
func (c *GsiClient) BucketTs(bucketn string) (*TsConsistency, error) {
b, err := common.ConnectBucket(c.cluster, "default" /*pooln*/, bucketn)
if err != nil {
return nil, err
}
defer b.Close()
if c.maxvb == -1 {
if c.maxvb, err = common.MaxVbuckets(b); err != nil {
return nil, err
}
}
seqnos, vbuuids, err := common.BucketTs(b, c.maxvb)
if err != nil {
return nil, err
}
vbnos := make([]uint16, c.maxvb)
for i := range vbnos {
vbnos[i] = uint16(i)
}
return NewTsConsistency(vbnos, seqnos, vbuuids), nil
}
// CreateIndex implements BridgeAccessor{} interface.
func (c *GsiClient) CreateIndex(
name, bucket, using, exprType, partnExpr, whereExpr string,
secExprs []string, isPrimary bool,
with []byte) (defnID uint64, err error) {
if c.bridge == nil {
return defnID, ErrorClientUninitialized
}
defnID, err = c.bridge.CreateIndex(
name, bucket, using, exprType, partnExpr, whereExpr,
secExprs, isPrimary, with)
return defnID, err
}
// BuildIndexes implements BridgeAccessor{} interface.
func (c *GsiClient) BuildIndexes(defnIDs []uint64) error {
if c.bridge == nil {
return ErrorClientUninitialized
}
return c.bridge.BuildIndexes(defnIDs)
}
// DropIndex implements BridgeAccessor{} interface.
func (c *GsiClient) DropIndex(defnID uint64) error {
if c.bridge == nil {
return ErrorClientUninitialized
}
return c.bridge.DropIndex(defnID)
}
// LookupStatistics for a single secondary-key.
func (c *GsiClient) LookupStatistics(
defnID uint64, value common.SecondaryKey) (common.IndexStatistics, error) {
return nil, ErrorNotImplemented
// FIXME: this API is marked not-implemented because UniqueKeyCount
// is not yet available from indexer.
// Refer: https://issues.couchbase.com/browse/MB-13375
//
//if c.bridge == nil {
// return nil, ErrorClientUninitialized
//}
//// check whether the index is present and available.
//if _, err := c.bridge.IndexState(defnID); err != nil {
// return nil, err
//}
//var stats common.IndexStatistics
//var err error
//err = c.doScan(defnID, func(qc *GsiScanClient, targetDefnID uint64) error {
// stats, err = qc.LookupStatistics(targetDefnID, value)
// return err
//})
//return stats, err
}
// RangeStatistics for index range.
func (c *GsiClient) RangeStatistics(
defnID uint64, low, high common.SecondaryKey,
inclusion Inclusion) (common.IndexStatistics, error) {
return nil, ErrorNotImplemented
// FIXME: this API is marked not-implemented because UniqueKeyCount
// is not yet available from indexer.
// Refer: https://issues.couchbase.com/browse/MB-13375
//
//if c.bridge == nil {
// return nil, ErrorClientUninitialized
//}
//// check whether the index is present and available.
//if _, err := c.bridge.IndexState(defnID); err != nil {
// return nil, err
//}
//var stats common.IndexStatistics
//var err error
//err = c.doScan(defnID, func(qc *GsiScanClient, targetDefnID uint64) error {
// stats, err = qc.RangeStatistics(targetDefnID, low, high, inclusion)
// return err
//})
//return stats, err
}
// Lookup scan index for the given lookup values.
func (c *GsiClient) Lookup(
defnID uint64, values []common.SecondaryKey,
distinct bool, limit int64,
cons common.Consistency, vector *TsConsistency,
callb ResponseHandler) error {
if c.bridge == nil {
return ErrorClientUninitialized
}
// check whether the index is present and available.
if _, err := c.bridge.IndexState(defnID); err != nil {
protoResp := &protobuf.ResponseStream{
Err: &protobuf.Error{Error: proto.String(err.Error())},
}
callb(protoResp)
return nil
}
return c.doScan(defnID, func(qc *GsiScanClient, targetDefnID uint64) (err error) {
index := c.bridge.GetIndexDefn(targetDefnID)
vector, err = c.getConsistency(cons, vector, index.Bucket)
if err != nil {
return err
}
return qc.Lookup(targetDefnID, values, distinct, limit, cons, vector, callb)
})
}
// Range scan index between low and high.
func (c *GsiClient) Range(
defnID uint64, low, high common.SecondaryKey,
inclusion Inclusion, distinct bool, limit int64,
cons common.Consistency, vector *TsConsistency,
callb ResponseHandler) error {
if c.bridge == nil {
return ErrorClientUninitialized
}
// check whether the index is present and available.
if _, err := c.bridge.IndexState(defnID); err != nil {
protoResp := &protobuf.ResponseStream{
Err: &protobuf.Error{Error: proto.String(err.Error())},
}
callb(protoResp)
return nil
}
return c.doScan(defnID, func(qc *GsiScanClient, targetDefnID uint64) (err error) {
index := c.bridge.GetIndexDefn(targetDefnID)
vector, err = c.getConsistency(cons, vector, index.Bucket)
if err != nil {
return err
}
return qc.Range(
targetDefnID, low, high, inclusion, distinct, limit, cons, vector, callb)
})
}
// ScanAll for full table scan.
func (c *GsiClient) ScanAll(
defnID uint64, limit int64,
cons common.Consistency, vector *TsConsistency,
callb ResponseHandler) error {
if c.bridge == nil {
return ErrorClientUninitialized
}
// check whether the index is present and available.
if _, err := c.bridge.IndexState(defnID); err != nil {
protoResp := &protobuf.ResponseStream{
Err: &protobuf.Error{Error: proto.String(err.Error())},
}
callb(protoResp)
return nil
}
return c.doScan(defnID, func(qc *GsiScanClient, targetDefnID uint64) (err error) {
index := c.bridge.GetIndexDefn(targetDefnID)
vector, err = c.getConsistency(cons, vector, index.Bucket)
if err != nil {
return err
}
return qc.ScanAll(targetDefnID, limit, cons, vector, callb)
})
}
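// exampleRangeScan is a hedged sketch (not part of the original source) of a
// bounded Range scan with AnyConsistency, reusing exampleResponseHandler from
// above; the defnID, keys and limit are illustrative assumptions.
func exampleRangeScan(c *GsiClient, defnID uint64) error {
	low, high := common.SecondaryKey{"aaa"}, common.SecondaryKey{"zzz"}
	return c.Range(
		defnID, low, high, Both, false /*distinct*/, 100 /*limit*/,
		common.AnyConsistency, nil /*vector*/, exampleResponseHandler)
}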
// CountLookup counts the number of entries for the given set of keys.
func (c *GsiClient) CountLookup(
defnID uint64, values []common.SecondaryKey,
cons common.Consistency, vector *TsConsistency) (count int64, err error) {
if c.bridge == nil {
return count, ErrorClientUninitialized
}
// check whether the index is present and available.
if _, err := c.bridge.IndexState(defnID); err != nil {
return 0, err
}
err = c.doScan(defnID, func(qc *GsiScanClient, targetDefnID uint64) error {
index := c.bridge.GetIndexDefn(targetDefnID)
vector, err = c.getConsistency(cons, vector, index.Bucket)
if err != nil {
return err
}
count, err = qc.CountLookup(targetDefnID, values, cons, vector)
return err
})
return count, err
}
// CountRange counts the number of entries in the given range.
func (c *GsiClient) CountRange(
defnID uint64,
low, high common.SecondaryKey,
inclusion Inclusion,
cons common.Consistency, vector *TsConsistency) (count int64, err error) {
if c.bridge == nil {
return count, ErrorClientUninitialized
}
// check whether the index is present and available.
if _, err := c.bridge.IndexState(defnID); err != nil {
return 0, err
}
err = c.doScan(defnID, func(qc *GsiScanClient, targetDefnID uint64) error {
index := c.bridge.GetIndexDefn(targetDefnID)
vector, err = c.getConsistency(cons, vector, index.Bucket)
if err != nil {
return err
}
count, err = qc.CountRange(targetDefnID, low, high, inclusion, cons, vector)
return err
})
return count, err
}
// Close the client and all open connections with server.
func (c *GsiClient) Close() {
if c.bridge == nil {
return
}
c.bridge.Close()
for _, queryClient := range c.queryClients {
queryClient.Close()
}
}
func (c *GsiClient) updateScanClients() {
cache := make(map[string]bool)
// add new indexer-nodes
for _, queryport := range c.bridge.GetScanports() {
cache[queryport] = true
if _, ok := c.queryClients[queryport]; !ok {
c.queryClients[queryport] = NewGsiScanClient(queryport, c.config)
}
}
// forget removed indexer-nodes.
for queryport, queryClient := range c.queryClients {
if _, ok := cache[queryport]; !ok {
queryClient.Close()
delete(c.queryClients, queryport)
}
}
}
func (c *GsiClient) doScan(
defnID uint64, callb func(*GsiScanClient, uint64) error) error {
var qc *GsiScanClient
var err error
var ok1, ok2 bool
var queryport string
var targetDefnID uint64
wait := c.config["retryIntervalScanport"].Int()
retry := c.config["retryScanPort"].Int()
for i := 0; i < retry; i++ {
if queryport, targetDefnID, ok1 = c.bridge.GetScanport(defnID, i); ok1 {
if qc, ok2 = c.queryClients[queryport]; ok2 {
begin := time.Now().UnixNano()
if err = callb(qc, targetDefnID); err == nil {
c.bridge.Timeit(targetDefnID, float64(time.Now().UnixNano()-begin))
return nil
}
}
}
logging.Infof(
"Retrying scan for index %v (%v %v) ...\n", targetDefnID, ok1, ok2)
c.updateScanClients()
time.Sleep(time.Duration(wait) * time.Millisecond)
}
if err != nil {
return err
}
return ErrorNoHost
}
func (c *GsiClient) getConsistency(
cons common.Consistency,
vector *TsConsistency, bucket string) (*TsConsistency, error) {
var err error
if cons == common.QueryConsistency && vector == nil {
return nil, ErrorExpectedTimestamp
} else if cons == common.SessionConsistency {
if vector, err = c.BucketTs(bucket); err != nil {
return nil, err
}
} else if cons == common.AnyConsistency {
vector = nil
} else {
return nil, ErrorInvalidConsistency
}
return vector, nil
}
// makeWithCbq creates a GSI client using cbqBridge and ScanCoordinator.
func makeWithCbq(cluster string, config common.Config) (*GsiClient, error) {
var err error
c := &GsiClient{
cluster: cluster,
config: config,
queryClients: make(map[string]*GsiScanClient),
}
if c.bridge, err = newCbqClient(cluster); err != nil {
return nil, err
}
for _, queryport := range c.bridge.GetScanports() {
queryClient := NewGsiScanClient(queryport, config)
c.queryClients[queryport] = queryClient
}
return c, nil
}
func makeWithMetaProvider(
cluster string, config common.Config) (c *GsiClient, err error) {
c = &GsiClient{
cluster: cluster,
config: config,
queryClients: make(map[string]*GsiScanClient),
}
c.bridge, err = newMetaBridgeClient(cluster, config)
if err != nil {
return nil, err
}
c.updateScanClients()
return c, nil
}
//--------------------------
// Consistency and Stability
//--------------------------
// TsConsistency specifies a subset of vbuckets to be used as
// timestamp vector to specify consistency criteria.
//
// The timestamp-vector is ignored for AnyConsistency, computed locally
// (via BucketTs) for SessionConsistency, and must be supplied as a
// scan-argument for QueryConsistency.
type TsConsistency struct {
Vbnos []uint16
Seqnos []uint64
Vbuuids []uint64
}
// NewTsConsistency returns a new consistency vector object.
func NewTsConsistency(
vbnos []uint16, seqnos []uint64, vbuuids []uint64) *TsConsistency {
return &TsConsistency{Vbnos: vbnos, Seqnos: seqnos, Vbuuids: vbuuids}
}
// Override a vbucket's {seqno, vbuuid} in the timestamp-vector;
// if the vbucket is not present in the vector, append it.
func (ts *TsConsistency) Override(
vbno uint16, seqno, vbuuid uint64) *TsConsistency {
for i, vb := range ts.Vbnos {
if vbno == vb {
ts.Seqnos[i], ts.Vbuuids[i] = seqno, vbuuid
return ts
}
}
ts.Vbnos = append(ts.Vbnos, vbno)
ts.Seqnos = append(ts.Seqnos, seqno)
ts.Vbuuids = append(ts.Vbuuids, vbuuid)
return ts
}
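// exampleQueryConsistency is a hedged sketch (not part of the original
// source) of building a timestamp vector for QueryConsistency; the vbucket
// numbers, seqnos and vbuuids below are illustrative assumptions that would
// normally come from observed mutations.
func exampleQueryConsistency() *TsConsistency {
	ts := NewTsConsistency(
		[]uint16{7, 23},          // vbuckets constrained by this scan
		[]uint64{1021, 344},      // seqnos the scan must reach
		[]uint64{0x11a6, 0x5f23}) // vbuuids guarding against failover
	// fold a later mutation on vbucket 7 into the same vector.
	return ts.Override(7, 1022, 0x11a6)
}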
package client
import "errors"
import "fmt"
import "net"
import "time"
import "github.com/couchbase/indexing/secondary/logging"
import "github.com/couchbase/indexing/secondary/transport"
import protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
// ErrorClosedPool
var ErrorClosedPool = errors.New("queryport.closedPool")
// ErrorNoPool
var ErrorNoPool = errors.New("queryport.errorNoPool")
// ErrorPoolTimeout
var ErrorPoolTimeout = errors.New("queryport.connPoolTimeout")
type connectionPool struct {
host string
mkConn func(host string) (*connection, error)
connections chan *connection
createsem chan bool
// config params
maxPayload int
timeout time.Duration
availTimeout time.Duration
logPrefix string
}
type connection struct {
conn net.Conn
pkt *transport.TransportPacket
}
func newConnectionPool(
host string,
poolSize, poolOverflow, maxPayload int,
timeout, availTimeout time.Duration) *connectionPool {
cp := &connectionPool{
host: host,
connections: make(chan *connection, poolSize),
createsem: make(chan bool, poolSize+poolOverflow),
maxPayload: maxPayload,
timeout: timeout,
availTimeout: availTimeout,
logPrefix: fmt.Sprintf("[Queryport-connpool:%v]", host),
}
cp.mkConn = cp.defaultMkConn
logging.Infof("%v started ...\n", cp.logPrefix)
return cp
}
// ConnPoolCallback, when set, is notified whenever a connection is acquired
// from the pool.
var ConnPoolCallback func(host string, source string, start time.Time, err error)
func (cp *connectionPool) defaultMkConn(host string) (*connection, error) {
logging.Infof("%v open new connection ...\n", cp.logPrefix)
conn, err := net.Dial("tcp", host)
if err != nil {
return nil, err
}
flags := transport.TransportFlag(0).SetProtobuf()
pkt := transport.NewTransportPacket(cp.maxPayload, flags)
pkt.SetEncoder(transport.EncodingProtobuf, protobuf.ProtobufEncode)
pkt.SetDecoder(transport.EncodingProtobuf, protobuf.ProtobufDecode)
return &connection{conn, pkt}, nil
}
func (cp *connectionPool) Close() (err error) {
defer func() {
if r := recover(); r != nil {
logging.Errorf("%v Close() crashed: %v\n", cp.logPrefix, r)
logging.Errorf("%s", logging.StackTrace())
}
}()
close(cp.connections)
for connectn := range cp.connections {
connectn.conn.Close()
}
logging.Infof("%v ... stopped\n", cp.logPrefix)
return
}
func (cp *connectionPool) GetWithTimeout(d time.Duration) (connectn *connection, err error) {
if cp == nil {
return nil, ErrorNoPool
}
path, ok := "", false
if ConnPoolCallback != nil {
defer func(path *string, start time.Time) {
ConnPoolCallback(cp.host, *path, start, err)
}(&path, time.Now())
}
path = "short-circuit"
// short-circuit on an already-available connection.
select {
case connectn, ok = <-cp.connections:
if !ok {
return nil, ErrorClosedPool
}
logging.Debugf("%v new connection from pool\n", cp.logPrefix)
return connectn, nil
default:
}
t := time.NewTimer(cp.availTimeout * time.Millisecond)
defer t.Stop()
// Try to grab an available connection within the avail-timeout window.
select {
case connectn, ok = <-cp.connections:
path = "avail1"
if !ok {
return nil, ErrorClosedPool
}
logging.Debugf("%v new connection (avail1) from pool\n", cp.logPrefix)
return connectn, nil
case <-t.C:
// No connection came around in time, let's see
// whether we can get one or build a new one first.
t.Reset(d) // Reuse the timer for the full timeout.
select {
case connectn, ok = <-cp.connections:
path = "avail2"
if !ok {
return nil, ErrorClosedPool
}
logging.Debugf("%v new connection (avail2) from pool\n", cp.logPrefix)
return connectn, nil
case cp.createsem <- true:
path = "create"
// Build a connection if we can't get a real one.
// This can potentially be an overflow connection, or
// a pooled connection.
connectn, err := cp.mkConn(cp.host)
if err != nil {
// On error, release our create hold
<-cp.createsem
}
logging.Debugf("%v new connection (create) from pool\n", cp.logPrefix)
return connectn, err
case <-t.C:
return nil, ErrorPoolTimeout
}
}
}
func (cp *connectionPool) Get() (*connection, error) {
return cp.GetWithTimeout(cp.timeout * time.Millisecond)
}
func (cp *connectionPool) Return(connectn *connection, healthy bool) {
if connectn == nil || connectn.conn == nil {
return
}
laddr := connectn.conn.LocalAddr()
if cp == nil {
logging.Infof("pool closed, closing connection %q\n", laddr)
connectn.conn.Close()
return
}
if healthy {
defer func() {
if recover() != nil {
// This happens when the pool has already been
// closed and we're trying to return a
// connection to it anyway. Just close the
// connection.
connectn.conn.Close()
}
}()
select {
case cp.connections <- connectn:
logging.Debugf("%v connection %q reclaimed to pool\n", cp.logPrefix, laddr)
default:
logging.Debugf("%v closing overflow connection %q\n", cp.logPrefix, laddr)
<-cp.createsem
connectn.conn.Close()
}
} else {
logging.Infof("%v closing unhealthy connection %q\n", cp.logPrefix, laddr)
<-cp.createsem
connectn.conn.Close()
}
}
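// exampleUsePool is a hedged sketch (not part of the original source) of the
// Get/Return discipline used by callers: mark the connection unhealthy on
// transport errors so Return() closes it instead of pooling it again.
func exampleUsePool(cp *connectionPool, req interface{}) (resp interface{}, err error) {
	connectn, err := cp.Get()
	if err != nil {
		return nil, err
	}
	healthy := true
	defer func() { cp.Return(connectn, healthy) }()
	if err = connectn.pkt.Send(connectn.conn, req); err != nil {
		healthy = false // connection state is suspect, do not reuse
		return nil, err
	}
	if resp, err = connectn.pkt.Receive(connectn.conn); err != nil {
		healthy = false
	}
	return resp, err
}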
package client
import "sync"
import "fmt"
import "errors"
import "time"
import "encoding/json"
import "github.com/couchbase/indexing/secondary/logging"
import common "github.com/couchbase/indexing/secondary/common"
import mclient "github.com/couchbase/indexing/secondary/manager/client"
type metadataClient struct {
cluster string
finch chan bool
mdClient *mclient.MetadataProvider
rw sync.RWMutex // protects all fields listed below
// sherlock topology management, multi-node & single-partition.
adminports map[string]common.IndexerId // book-keeping for cluster changes
topology map[common.IndexerId]map[common.IndexDefnId]*mclient.IndexMetadata
// sherlock load replicas.
replicas map[common.IndexDefnId][]common.IndexDefnId
// sherlock load balancing.
loads map[common.IndexDefnId]*loadHeuristics // index -> loadHeuristics
// config
servicesNotifierRetryTm int
}
func newMetaBridgeClient(
cluster string, config common.Config) (c *metadataClient, err error) {
b := &metadataClient{
cluster: cluster,
finch: make(chan bool),
adminports: make(map[string]common.IndexerId),
loads: make(map[common.IndexDefnId]*loadHeuristics),
}
b.topology = make(map[common.IndexerId]map[common.IndexDefnId]*mclient.IndexMetadata)
b.servicesNotifierRetryTm = config["servicesNotifierRetryTm"].Int()
// initialize meta-data-provider.
uuid, err := common.NewUUID()
if err != nil {
logging.Errorf("Could not generate UUID in common.NewUUID\n")
return nil, err
}
b.mdClient, err = mclient.NewMetadataProvider(uuid.Str())
if err != nil {
return nil, err
}
if err := b.updateIndexerList(false); err != nil {
logging.Errorf("updateIndexerList(): %v\n", err)
b.mdClient.Close()
return nil, err
}
b.Refresh()
go b.watchClusterChanges() // will also update the indexer list
return b, nil
}
// Sync will update the indexer list.
func (b *metadataClient) Sync() error {
err := b.updateIndexerList(true)
if err != nil {
logging.Errorf("updateIndexerList(): %v\n", err)
}
return err
}
// Refresh implements the BridgeAccessor{} interface.
func (b *metadataClient) Refresh() ([]*mclient.IndexMetadata, error) {
mindexes := b.mdClient.ListIndex()
indexes := make([]*mclient.IndexMetadata, 0, len(mindexes))
for _, mindex := range mindexes {
indexes = append(indexes, mindex)
}
b.rw.Lock()
defer b.rw.Unlock()
// b.adminports is the source of truth for list of indexers.
newtopo :=
make(map[common.IndexerId]map[common.IndexDefnId]*mclient.IndexMetadata)
for _, indexerID := range b.adminports {
newtopo[indexerID] = make(map[common.IndexDefnId]*mclient.IndexMetadata)
}
// gather topology of each index.
for _, index := range mindexes {
for _, instance := range index.Instances {
id := instance.IndexerId
if _, ok := newtopo[id]; ok {
newtopo[id][index.Definition.DefnId] = index
}
}
}
b.topology = newtopo
// compute replicas
b.replicas = b.computeReplicas()
// remove loads for indexes that have been deleted or gone offline.
for defnId := range b.loads {
if _, ok := b.replicas[defnId]; !ok {
delete(b.loads, defnId)
}
}
return indexes, nil
}
// Nodes implements the BridgeAccessor{} interface.
func (b *metadataClient) Nodes() ([]*IndexerService, error) {
b.rw.RLock()
defer b.rw.RUnlock()
// gather Indexer services
nodes := make(map[string]*IndexerService)
for indexerID := range b.topology {
if indexerID != common.INDEXER_ID_NIL {
a, q, err := b.mdClient.FindServiceForIndexer(indexerID)
if err == nil {
nodes[a] = &IndexerService{
Adminport: a, Queryport: q, Status: "initial",
}
}
}
}
// gather indexer status
for _, indexer := range b.mdClient.CheckIndexerStatus() {
if node, ok := nodes[indexer.Adminport]; ok && indexer.Connected {
node.Status = "online"
}
}
services := make([]*IndexerService, 0, len(nodes))
for _, node := range nodes {
services = append(services, node)
}
return services, nil
}
// GetIndexDefn implements BridgeAccessor{} interface.
func (b *metadataClient) GetIndexDefn(defnID uint64) *common.IndexDefn {
b.rw.RLock()
defer b.rw.RUnlock()
for _, indexes := range b.topology {
for id, index := range indexes {
if defnID == uint64(id) {
return index.Definition
}
}
}
return nil
}
// CreateIndex implements BridgeAccessor{} interface.
func (b *metadataClient) CreateIndex(
indexName, bucket, using, exprType, partnExpr, whereExpr string,
secExprs []string, isPrimary bool,
planJSON []byte) (uint64, error) {
plan := make(map[string]interface{})
if planJSON != nil && len(planJSON) > 0 {
err := json.Unmarshal(planJSON, &plan)
if err != nil {
return 0, err
}
}
refreshCnt := 0
RETRY:
defnID, err, needRefresh := b.mdClient.CreateIndexWithPlan(
indexName, bucket, using, exprType, partnExpr, whereExpr,
secExprs, isPrimary, plan)
if needRefresh && refreshCnt == 0 {
fmsg := "GsiClient: Indexer Node List is out-of-date. Require refresh."
logging.Debugf(fmsg)
if err := b.updateIndexerList(false); err != nil {
logging.Errorf("updateIndexerList(): %v\n", err)
return uint64(defnID), err
}
refreshCnt++
goto RETRY
}
b.Refresh() // refresh so that we too have IndexMetadata table.
return uint64(defnID), err
}
// BuildIndexes implements BridgeAccessor{} interface.
func (b *metadataClient) BuildIndexes(defnIDs []uint64) error {
_, ok := b.getNodes(defnIDs)
if !ok {
return ErrorIndexNotFound
}
ids := make([]common.IndexDefnId, len(defnIDs))
for i, id := range defnIDs {
ids[i] = common.IndexDefnId(id)
}
return b.mdClient.BuildIndexes(ids)
}
// DropIndex implements BridgeAccessor{} interface.
func (b *metadataClient) DropIndex(defnID uint64) error {
err := b.mdClient.DropIndex(common.IndexDefnId(defnID))
if err == nil { // cleanup index local cache.
b.deleteIndex(defnID)
}
return err
}
// GetScanports implements BridgeAccessor{} interface.
func (b *metadataClient) GetScanports() (queryports []string) {
b.rw.RLock()
defer b.rw.RUnlock()
queryports = make([]string, 0)
for indexerID := range b.topology {
if indexerID != common.INDEXER_ID_NIL {
_, queryport, err := b.mdClient.FindServiceForIndexer(indexerID)
if err == nil {
queryports = append(queryports, queryport)
}
}
}
logging.Debugf("Scan ports %v for all indexes", queryports)
return queryports
}
// GetScanport implements BridgeAccessor{} interface.
func (b *metadataClient) GetScanport(
defnID uint64,
retry int) (queryport string, targetDefnID uint64, ok bool) {
b.rw.RLock()
defer b.rw.RUnlock()
if retry == 0 {
targetDefnID = b.pickOptimal(defnID) // index under least load
} else {
targetDefnID = b.roundRobin(defnID, retry)
}
_, queryport, err :=
b.mdClient.FindServiceForIndex(common.IndexDefnId(targetDefnID))
if err != nil {
return "", 0, false
}
fmsg := "Scan port %s for index defnID %d of equivalent index defnId %d"
logging.Debugf(fmsg, queryport, targetDefnID, defnID)
return queryport, targetDefnID, true
}
// Timeit implements the BridgeAccessor{} interface.
func (b *metadataClient) Timeit(defnID uint64, value float64) {
b.rw.Lock()
defer b.rw.Unlock()
id := common.IndexDefnId(defnID)
if load, ok := b.loads[id]; !ok {
b.loads[id] = &loadHeuristics{avgLoad: value, count: 1}
} else {
// compute incremental average.
avg, n := load.avgLoad, load.count
load.avgLoad = (float64(n)*avg + float64(value)) / float64(n+1)
load.count = n + 1
}
}
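// For example (illustrative numbers): with avgLoad 10.0 over count 4, a new
// sample of 20.0 yields avgLoad (4*10.0 + 20.0) / 5 = 12.0 and count 5.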
// IndexState implements the BridgeAccessor{} interface.
func (b *metadataClient) IndexState(defnID uint64) (common.IndexState, error) {
b.Refresh()
b.rw.RLock()
defer b.rw.RUnlock()
for _, indexes := range b.topology {
for _, index := range indexes {
if index.Definition.DefnId == common.IndexDefnId(defnID) {
if index.Instances != nil && len(index.Instances) > 0 {
state := index.Instances[0].State
if len(index.Instances) == 0 {
err := fmt.Errorf("no instance for %q", defnID)
return state, err
} else if index.Instances[0].Error != "" {
return state, errors.New(index.Instances[0].Error)
} else {
return state, nil
}
}
return common.INDEX_STATE_ERROR, ErrorInstanceNotFound
}
}
}
return common.INDEX_STATE_ERROR, ErrorIndexNotFound
}
// close this bridge, to be called when a new indexer is added or
// an active indexer leaves the cluster or during system shutdown.
func (b *metadataClient) Close() {
defer func() { recover() }() // in case async Close is called.
b.mdClient.Close()
close(b.finch)
}
//--------------------------------
// local functions to map replicas
//--------------------------------
// compute a map of replicas for each index in 2i.
func (b *metadataClient) computeReplicas() map[common.IndexDefnId][]common.IndexDefnId {
replicaMap := make(map[common.IndexDefnId][]common.IndexDefnId)
for id1, indexes1 := range b.topology {
for _, index1 := range indexes1 {
replicas := make([]common.IndexDefnId, 0)
replicas = append(replicas, index1.Definition.DefnId) // add itself
for id2, indexes2 := range b.topology {
if id1 == id2 { // skip colocated indexes
continue
}
for _, index2 := range indexes2 {
if b.equivalentIndex(index1, index2) { // pick equivalents
replicas = append(replicas, index2.Definition.DefnId)
}
}
}
replicaMap[index1.Definition.DefnId] = replicas // map it
}
}
return replicaMap
}
// compare whether two indexes are equivalent.
func (b *metadataClient) equivalentIndex(
index1, index2 *mclient.IndexMetadata) bool {
d1, d2 := index1.Definition, index2.Definition
if d1.Using != d2.Using ||
d1.Bucket != d2.Bucket ||
d1.IsPrimary != d2.IsPrimary ||
d1.ExprType != d2.ExprType ||
d1.PartitionScheme != d2.PartitionScheme ||
d1.PartitionKey != d2.PartitionKey ||
d1.WhereExpr != d2.WhereExpr {
return false
}
if len(d1.SecExprs) != len(d2.SecExprs) {
return false
}
for i, s1 := range d1.SecExprs {
if s1 != d2.SecExprs[i] {
return false
}
}
return true
}
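// exampleEquivalentIndexes is a hedged sketch (not part of the original
// source): two definitions differing only in DefnId and Name, but agreeing
// on bucket, expressions and primary-ness, are treated as replicas.
func exampleEquivalentIndexes() bool {
	mk := func(id common.IndexDefnId, name string) *mclient.IndexMetadata {
		return &mclient.IndexMetadata{Definition: &common.IndexDefn{
			DefnId: id, Name: name, Using: "gsi", Bucket: "default",
			ExprType: "N1QL", SecExprs: []string{"`age`"},
		}}
	}
	b := &metadataClient{}
	return b.equivalentIndex(mk(1, "idx_age"), mk(2, "idx_age_replica"))
}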
//--------------------------------------
// local functions to work with replicas
//--------------------------------------
// manage load statistics.
type loadHeuristics struct {
avgLoad float64
count uint64
}
// pick an optimal replica for the index `defnID` under least load.
func (b *metadataClient) pickOptimal(defnID uint64) uint64 {
id := common.IndexDefnId(defnID)
optimalID, currLoad := id, 0.0
if load, ok := b.loads[id]; ok {
currLoad = load.avgLoad
}
for _, replicaID := range b.replicas[id] {
load, ok := b.loads[replicaID]
if !ok { // no load for this replica
return uint64(replicaID)
}
if currLoad == 0.0 || load.avgLoad < currLoad {
// found an index under less load
optimalID, currLoad = replicaID, load.avgLoad
}
}
return uint64(optimalID)
}
func (b *metadataClient) roundRobin(defnID uint64, retry int) uint64 {
id := common.IndexDefnId(defnID)
replicas, ok := b.replicas[id]
if l := len(replicas); ok && l > 0 {
return uint64(replicas[retry%l])
}
return defnID
}
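// For example (illustrative): with replicas [5, 9, 12] for defnID 5, retry
// counts 1, 2, 3 pick replicas 9, 12 and 5 respectively, rotating retried
// scans across equivalent indexes.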
//----------------
// local functions
//----------------
func (b *metadataClient) deleteIndex(defnID uint64) {
b.rw.Lock()
defer b.rw.Unlock()
id := common.IndexDefnId(defnID)
for indexerID, indexes := range b.topology {
delete(indexes, id)
b.topology[indexerID] = indexes
}
b.replicas = b.computeReplicas()
delete(b.loads, common.IndexDefnId(defnID))
}
// getNodes returns the set of nodes hosting the specified set
// of indexes.
func (b *metadataClient) getNodes(defnIDs []uint64) ([]string, bool) {
adminports := make([]string, 0)
for _, defnID := range defnIDs {
adminport, ok := b.getNode(defnID)
if !ok {
return nil, false
}
adminports = append(adminports, adminport)
}
return adminports, true
}
// getNode hosting index with `defnID`.
func (b *metadataClient) getNode(defnID uint64) (adminport string, ok bool) {
aport, _, err := b.mdClient.FindServiceForIndex(common.IndexDefnId(defnID))
if err != nil {
return "", false
}
return aport, true
}
// update 2i cluster information,
// IMPORTANT: make sure to call Refresh() after calling updateIndexerList()
func (b *metadataClient) updateIndexerList(discardExisting bool) error {
clusterURL, err := common.ClusterAuthUrl(b.cluster)
if err != nil {
return err
}
cinfo, err := common.NewClusterInfoCache(clusterURL, "default")
if err != nil {
return err
}
if err := cinfo.Fetch(); err != nil {
return err
}
// populate indexers' adminport and queryport
adminports, err := getIndexerAdminports(cinfo)
if err != nil {
return err
}
fmsg := "Refreshing indexer list due to cluster changes or auto-refresh."
logging.Infof(fmsg)
logging.Infof("Refreshed Indexer List: %v", adminports)
b.rw.Lock()
defer b.rw.Unlock()
if discardExisting {
for _, indexerID := range b.adminports {
b.mdClient.UnwatchMetadata(indexerID)
}
b.adminports = nil
}
// watch all indexers
m := make(map[string]common.IndexerId)
for _, adminport := range adminports { // add new indexer-nodes if any
if indexerID, ok := b.adminports[adminport]; !ok {
// This adminport is provided by cluster manager. Meta client will
// honor cluster manager to treat this adminport as a healthy node.
// If the indexer is unavailable during initialization, WatchMetadata()
// will return after a timeout. A background watcher will keep
// retrying, since it can be a transient partitioning error.
// If the retry is eventually successful, this callback will be invoked
// to update meta_client. The metadata client has to rely on the
// cluster manager to send a notification if this node is detected
// to be down, such that the metadata client can stop the
// background watcher.
fn := func(ad string, n_id common.IndexerId, o_id common.IndexerId) {
b.updateIndexer(ad, n_id, o_id)
}
// WatchMetadata will "unwatch" an old metadata watcher which
// shares the same indexer Id (but the adminport may be different).
indexerID = b.mdClient.WatchMetadata(adminport, fn)
m[adminport] = indexerID
} else {
err = b.mdClient.UpdateServiceAddrForIndexer(indexerID, adminport)
m[adminport] = indexerID
delete(b.adminports, adminport)
}
}
// delete indexer-nodes that got removed from cluster.
for _, indexerID := range b.adminports {
// check if the indexerId exists in var "m". In case the
// adminport changes for the same index node, there would
// be two adminport mapping to the same indexerId, one
// in b.adminport (old) and the other in "m" (new). So
// make sure not to accidently unwatch the indexer.
found := false
for _, id := range m {
if indexerID == id {
found = true
}
}
if !found {
b.mdClient.UnwatchMetadata(indexerID)
}
}
b.adminports = m
return err
}
func (b *metadataClient) updateIndexer(
adminport string, newIndexerId, oldIndexerId common.IndexerId) {
func() {
b.rw.Lock()
defer b.rw.Unlock()
logging.Infof(
"Acknowledged that new indexer is registered. Indexer = %v, id = %v",
adminport, newIndexerId)
b.adminports[adminport] = newIndexerId
}()
b.Refresh()
}
// return adminports for all known indexers.
func getIndexerAdminports(cinfo *common.ClusterInfoCache) ([]string, error) {
iAdminports := make([]string, 0)
for _, node := range cinfo.GetNodesByServiceType("indexAdmin") {
status, err := cinfo.GetNodeStatus(node)
if err != nil {
return nil, err
}
logging.Verbosef("node %v status: %q", node, status)
if status == "healthy" || status == "active" || status == "warmup" {
adminport, err := cinfo.GetServiceAddress(node, "indexAdmin")
if err != nil {
return nil, err
}
iAdminports = append(iAdminports, adminport)
} else {
logging.Warnf("node %v status: %q", node, status)
}
}
return iAdminports, nil
}
// FIXME/TODO: based on discussion with John-
//
// if we cannot watch the metadata due to network partition, we will
// have an empty list of indexes and cannot query; in other words,
// the client will tolerate the partition and reject scans until it
// is healed.
// i) alternatively, figure out a way to propagate error that happens
// with watchClusterChanges() go-routine.
//
// and while propagating the error back to the caller,
// 1) we can encourage the caller to Refresh() the client hoping for
// success, or,
// 2) Close() the client and re-create it.
//
// side-effects of partitioning,
// a) query cannot get indexes from the indexer node -- so n1ql has
// to do bucket scan. It is a perf issue.
// b) Network disconnected after watcher is up. We have the list of
// indexes -- but we cannot query on it. N1QL should still degrade
// to bucket scan.
func (b *metadataClient) watchClusterChanges() {
selfRestart := func() {
time.Sleep(time.Duration(b.servicesNotifierRetryTm) * time.Millisecond)
go b.watchClusterChanges()
}
clusterURL, err := common.ClusterAuthUrl(b.cluster)
if err != nil {
logging.Errorf("common.ClusterAuthUrl(): %v\n", err)
selfRestart()
return
}
scn, err := common.NewServicesChangeNotifier(clusterURL, "default")
if err != nil {
logging.Errorf("common.NewServicesChangeNotifier(): %v\n", err)
selfRestart()
return
}
defer scn.Close()
// For observing node services config
ch := scn.GetNotifyCh()
for {
b.Refresh()
select {
case _, ok := <-ch:
if !ok {
selfRestart()
return
} else if err := b.updateIndexerList(false); err != nil {
logging.Errorf("updateIndexerList(): %v\n", err)
selfRestart()
return
}
case <-b.finch:
return
}
}
}
// Package queryport provides a simple library to spawn a queryport and access
// queryport via passive client API.
//
// ---> Request ---> Request
// <--- Response <--- Response
// <--- Response <--- Response
// ... ---> EndStreamRequest
// <--- StreamEndResponse <--- Response (residue)
// <--- StreamEndResponse
package client
import "errors"
import "fmt"
import "io"
import "net"
import "time"
import "encoding/json"
import "github.com/couchbase/indexing/secondary/logging"
import "github.com/couchbase/indexing/secondary/common"
import protobuf "github.com/couchbase/indexing/secondary/protobuf/query"
import "github.com/couchbase/indexing/secondary/transport"
import "github.com/golang/protobuf/proto"
// GsiScanClient for scan operations.
type GsiScanClient struct {
queryport string
pool *connectionPool
// config params
maxPayload int // TODO: what if it exceeds ?
readDeadline time.Duration
writeDeadline time.Duration
poolSize int
poolOverflow int
cpTimeout time.Duration
cpAvailWaitTimeout time.Duration
logPrefix string
}
func NewGsiScanClient(queryport string, config common.Config) *GsiScanClient {
t := time.Duration(config["connPoolAvailWaitTimeout"].Int())
c := &GsiScanClient{
queryport: queryport,
maxPayload: config["maxPayload"].Int(),
readDeadline: time.Duration(config["readDeadline"].Int()),
writeDeadline: time.Duration(config["writeDeadline"].Int()),
poolSize: config["settings.poolSize"].Int(),
poolOverflow: config["settings.poolOverflow"].Int(),
cpTimeout: time.Duration(config["connPoolTimeout"].Int()),
cpAvailWaitTimeout: t,
logPrefix: fmt.Sprintf("[GsiScanClient:%q]", queryport),
}
c.pool = newConnectionPool(
queryport, c.poolSize, c.poolOverflow, c.maxPayload, c.cpTimeout,
c.cpAvailWaitTimeout)
logging.Infof("%v started ...\n", c.logPrefix)
return c
}
// LookupStatistics for a single secondary-key.
func (c *GsiScanClient) LookupStatistics(
defnID uint64, value common.SecondaryKey) (common.IndexStatistics, error) {
// serialize lookup value.
val, err := json.Marshal(value)
if err != nil {
return nil, err
}
req := &protobuf.StatisticsRequest{
DefnID: proto.Uint64(defnID),
Span: &protobuf.Span{Equals: [][]byte{val}},
}
resp, err := c.doRequestResponse(req)
if err != nil {
return nil, err
}
statResp := resp.(*protobuf.StatisticsResponse)
if statResp.GetErr() != nil {
err = errors.New(statResp.GetErr().GetError())
return nil, err
}
return statResp.GetStats(), nil
}
// RangeStatistics for index range.
func (c *GsiScanClient) RangeStatistics(
defnID uint64, low, high common.SecondaryKey,
inclusion Inclusion) (common.IndexStatistics, error) {
// serialize low and high values.
l, err := json.Marshal(low)
if err != nil {
return nil, err
}
h, err := json.Marshal(high)
if err != nil {
return nil, err
}
req := &protobuf.StatisticsRequest{
DefnID: proto.Uint64(defnID),
Span: &protobuf.Span{
Range: &protobuf.Range{
Low: l, High: h, Inclusion: proto.Uint32(uint32(inclusion)),
},
},
}
resp, err := c.doRequestResponse(req)
if err != nil {
return nil, err
}
statResp := resp.(*protobuf.StatisticsResponse)
if statResp.GetErr() != nil {
err = errors.New(statResp.GetErr().GetError())
return nil, err
}
return statResp.GetStats(), nil
}
// Lookup scan index for the given lookup values.
func (c *GsiScanClient) Lookup(
defnID uint64, values []common.SecondaryKey,
distinct bool, limit int64,
cons common.Consistency, vector *TsConsistency,
callb ResponseHandler) error {
// serialize lookup value.
equals := make([][]byte, 0, len(values))
for _, value := range values {
val, err := json.Marshal(value)
if err != nil {
return err
}
equals = append(equals, val)
}
connectn, err := c.pool.Get()
if err != nil {
return err
}
healthy := true
defer func() { c.pool.Return(connectn, healthy) }()
conn, pkt := connectn.conn, connectn.pkt
req := &protobuf.ScanRequest{
DefnID: proto.Uint64(defnID),
Span: &protobuf.Span{Equals: equals},
Distinct: proto.Bool(distinct),
Limit: proto.Int64(limit),
Cons: proto.Uint32(uint32(cons)),
}
if vector != nil {
req.Vector = protobuf.NewTsConsistency(
vector.Vbnos, vector.Seqnos, vector.Vbuuids)
}
// ---> protobuf.ScanRequest
if err := c.sendRequest(conn, pkt, req); err != nil {
fmsg := "%v Lookup() request transport failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, err)
healthy = false
return err
}
cont := true
for cont {
// <--- protobuf.ResponseStream
cont, healthy, err = c.streamResponse(conn, pkt, callb)
if err != nil {
fmsg := "%v Lookup() response failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, err)
}
}
return err
}
// Range scan index between low and high.
func (c *GsiScanClient) Range(
defnID uint64, low, high common.SecondaryKey, inclusion Inclusion,
distinct bool, limit int64, cons common.Consistency, vector *TsConsistency,
callb ResponseHandler) error {
// serialize low and high values.
l, err := json.Marshal(low)
if err != nil {
return err
}
h, err := json.Marshal(high)
if err != nil {
return err
}
connectn, err := c.pool.Get()
if err != nil {
return err
}
healthy := true
defer func() { c.pool.Return(connectn, healthy) }()
conn, pkt := connectn.conn, connectn.pkt
req := &protobuf.ScanRequest{
DefnID: proto.Uint64(defnID),
Span: &protobuf.Span{
Range: &protobuf.Range{
Low: l, High: h, Inclusion: proto.Uint32(uint32(inclusion)),
},
},
Distinct: proto.Bool(distinct),
Limit: proto.Int64(limit),
Cons: proto.Uint32(uint32(cons)),
}
if vector != nil {
req.Vector = protobuf.NewTsConsistency(
vector.Vbnos, vector.Seqnos, vector.Vbuuids)
}
// ---> protobuf.ScanRequest
if err := c.sendRequest(conn, pkt, req); err != nil {
fmsg := "%v Range() request transport failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, err)
healthy = false
return err
}
cont := true
for cont {
// <--- protobuf.ResponseStream
cont, healthy, err = c.streamResponse(conn, pkt, callb)
if err != nil {
fmsg := "%v Range() response failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, err)
}
}
return err
}
// ScanAll for full table scan.
func (c *GsiScanClient) ScanAll(
defnID uint64, limit int64,
cons common.Consistency, vector *TsConsistency,
callb ResponseHandler) error {
connectn, err := c.pool.Get()
if err != nil {
return err
}
healthy := true
defer func() { c.pool.Return(connectn, healthy) }()
conn, pkt := connectn.conn, connectn.pkt
req := &protobuf.ScanAllRequest{
DefnID: proto.Uint64(defnID),
Limit: proto.Int64(limit),
Cons: proto.Uint32(uint32(cons)),
}
if vector != nil {
req.Vector = protobuf.NewTsConsistency(
vector.Vbnos, vector.Seqnos, vector.Vbuuids)
}
if err := c.sendRequest(conn, pkt, req); err != nil {
fmsg := "%v ScanAll() request transport failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, err)
healthy = false
return err
}
cont := true
for cont {
// <--- protobuf.ResponseStream
cont, healthy, err = c.streamResponse(conn, pkt, callb)
if err != nil {
fmsg := "%v ScanAll() response failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, err)
}
}
return err
}
// CountLookup counts the number of entries for the given set of keys.
func (c *GsiScanClient) CountLookup(
defnID uint64, values []common.SecondaryKey,
cons common.Consistency, vector *TsConsistency) (int64, error) {
// serialize match value.
equals := make([][]byte, 0, len(values))
for _, value := range values {
val, err := json.Marshal(value)
if err != nil {
return 0, err
}
equals = append(equals, val)
}
req := &protobuf.CountRequest{
DefnID: proto.Uint64(defnID),
Span: &protobuf.Span{Equals: equals},
Cons: proto.Uint32(uint32(cons)),
}
if vector != nil {
req.Vector = protobuf.NewTsConsistency(
vector.Vbnos, vector.Seqnos, vector.Vbuuids)
}
resp, err := c.doRequestResponse(req)
if err != nil {
return 0, err
}
countResp := resp.(*protobuf.CountResponse)
if countResp.GetErr() != nil {
err = errors.New(countResp.GetErr().GetError())
return 0, err
}
return countResp.GetCount(), nil
}
// CountRange counts the number of entries in the given range.
func (c *GsiScanClient) CountRange(
defnID uint64, low, high common.SecondaryKey, inclusion Inclusion,
cons common.Consistency, vector *TsConsistency) (int64, error) {
// serialize low and high values.
l, err := json.Marshal(low)
if err != nil {
return 0, err
}
h, err := json.Marshal(high)
if err != nil {
return 0, err
}
req := &protobuf.CountRequest{
DefnID: proto.Uint64(defnID),
Span: &protobuf.Span{
Range: &protobuf.Range{
Low: l, High: h, Inclusion: proto.Uint32(uint32(inclusion)),
},
},
Cons: proto.Uint32(uint32(cons)),
}
if vector != nil {
req.Vector = protobuf.NewTsConsistency(
vector.Vbnos, vector.Seqnos, vector.Vbuuids)
}
resp, err := c.doRequestResponse(req)
if err != nil {
return 0, err
}
countResp := resp.(*protobuf.CountResponse)
if countResp.GetErr() != nil {
err = errors.New(countResp.GetErr().GetError())
return 0, err
}
return countResp.GetCount(), nil
}
func (c *GsiScanClient) Close() error {
return c.pool.Close()
}
func (c *GsiScanClient) doRequestResponse(req interface{}) (interface{}, error) {
connectn, err := c.pool.Get()
if err != nil {
return nil, err
}
healthy := true
defer func() { c.pool.Return(connectn, healthy) }()
conn, pkt := connectn.conn, connectn.pkt
// ---> protobuf.*Request
if err := c.sendRequest(conn, pkt, req); err != nil {
fmsg := "%v %T request transport failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, req, err)
healthy = false
return nil, err
}
timeoutMs := c.readDeadline * time.Millisecond
conn.SetReadDeadline(time.Now().Add(timeoutMs))
// <--- protobuf.*Response
resp, err := pkt.Receive(conn)
if err != nil {
fmsg := "%v %T response transport failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, req, err)
healthy = false
return nil, err
}
conn.SetReadDeadline(time.Now().Add(timeoutMs))
// <--- protobuf.StreamEndResponse (skipped) TODO: knock this off.
if endResp, err := pkt.Receive(conn); err != nil {
fmsg := "%v %T response transport failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, req, err)
healthy = false
return nil, err
} else if endResp != nil {
healthy = false
return nil, ErrorProtocol
}
return resp, nil
}
func (c *GsiScanClient) sendRequest(
conn net.Conn, pkt *transport.TransportPacket, req interface{}) (err error) {
timeoutMs := c.writeDeadline * time.Millisecond
conn.SetWriteDeadline(time.Now().Add(timeoutMs))
return pkt.Send(conn, req)
}
func (c *GsiScanClient) streamResponse(
conn net.Conn,
pkt *transport.TransportPacket,
callb ResponseHandler) (cont bool, healthy bool, err error) {
var resp interface{}
var finish bool
laddr := conn.LocalAddr()
timeoutMs := c.readDeadline * time.Millisecond
conn.SetReadDeadline(time.Now().Add(timeoutMs))
if resp, err = pkt.Receive(conn); err != nil {
resp := &protobuf.ResponseStream{
Err: &protobuf.Error{Error: proto.String(err.Error())},
}
callb(resp) // callback with error
cont, healthy = false, false
if err == io.EOF {
err = fmt.Errorf("server closed connection (EOF)")
} else {
fmsg := "%v connection %q response transport failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, laddr, err)
}
} else if resp == nil { // end-of-stream marker
finish = true
fmsg := "%v connection %q received StreamEndResponse"
logging.Tracef(fmsg, c.logPrefix, laddr)
callb(&protobuf.StreamEndResponse{}) // callback most likely return true
cont, healthy = false, true
} else { // a regular response within the stream
streamResp := resp.(*protobuf.ResponseStream)
cont = callb(streamResp)
healthy = true
}
if cont == false && healthy == true && finish == false {
err, healthy = c.closeStream(conn, pkt)
}
return
}
func (c *GsiScanClient) closeStream(
conn net.Conn, pkt *transport.TransportPacket) (err error, healthy bool) {
var resp interface{}
laddr := conn.LocalAddr()
healthy = true
// request server to end the stream.
err = c.sendRequest(conn, pkt, &protobuf.EndStreamRequest{})
if err != nil {
fmsg := "%v closeStream() request transport failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, err)
healthy = false
return
}
fmsg := "%v connection %q transmitted protobuf.EndStreamRequest"
logging.Tracef(fmsg, c.logPrefix, laddr)
timeoutMs := c.readDeadline * time.Millisecond
// flush the connection until stream has ended.
for {
conn.SetReadDeadline(time.Now().Add(timeoutMs))
resp, err = pkt.Receive(conn)
if err != nil {
healthy = false
if err == io.EOF {
logging.Errorf("%v connection %q closed \n", c.logPrefix, laddr)
return
}
fmsg := "%v connection %q response transport failed `%v`\n"
logging.Errorf(fmsg, c.logPrefix, laddr, err)
return
} else if resp == nil { // End of stream marker
return
}
}
}