Source code for ducktape.cluster.cluster

# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections


class ClusterNode(object):
    def __init__(self, account, **kwargs):
        self.account = account
        for k, v in kwargs.items():
            setattr(self, k, v)

    @property
    def name(self):
        return self.account.hostname

    @property
    def operating_system(self):
        return self.account.operating_system


[docs]class Cluster(object): """ Interface for a cluster -- a collection of nodes with login credentials. This interface doesn't define any mapping of roles/services to nodes. It only interacts with some underlying system that can describe available resources and mediates reservations of those resources. """
[docs] def __init__(self): self.max_used_nodes = 0
def __len__(self): """Size of this cluster object. I.e. number of 'nodes' in the cluster.""" return self.available().size() + self.used().size()
[docs] def alloc(self, cluster_spec): """ Allocate some nodes. :param cluster_spec: A ClusterSpec describing the nodes to be allocated. :throws InsufficientResources: If the nodes cannot be allocated. :return: Allocated nodes spec """ allocated = self.do_alloc(cluster_spec) self.max_used_nodes = max(self.max_used_nodes, len(self.used())) return allocated
[docs] def do_alloc(self, cluster_spec): """ Subclasses should implement actual allocation here. :param cluster_spec: A ClusterSpec describing the nodes to be allocated. :throws InsufficientResources: If the nodes cannot be allocated. :return: Allocated nodes spec """ raise NotImplementedError
[docs] def free(self, nodes): """Free the given node or list of nodes""" if isinstance(nodes, collections.Iterable): for s in nodes: self.free_single(s) else: self.free_single(nodes)
def free_single(self, node): raise NotImplementedError() def __eq__(self, other): return other is not None and self.__dict__ == other.__dict__ def __hash__(self): return hash(tuple(sorted(self.__dict__.items()))) def num_available_nodes(self): return self.available().size()
[docs] def available(self): """ Return a ClusterSpec object describing the currently available nodes. """ raise NotImplementedError
[docs] def used(self): """ Return a ClusterSpec object describing the currently in use nodes. """ raise NotImplementedError
def max_used(self): return self.max_used_nodes
[docs] def all(self): """ Return a ClusterSpec object describing all nodes. """ return self.available().clone().add(self.used())