#!/usr/bin/env python
# Copyright (c) 2020 Klustron inc. All rights reserved.
# This source code is licensed under Apache 2.0 License,
# combined with Common Clause Condition 1.0, as detailed in the NOTICE file.

import platform
import sys
import json
import collections
import os
import os.path
import socket
import uuid
import getpass
import re

if sys.version_info.major == 2:
    import urllib as ulib
else:
    import urllib.request as ulib

# mysql-connector and PyYAML are optional dependencies; their availability
# is checked later via module_imported().
try:
    import mysql.connector as mc
except ImportError:
    pass

try:
    import yaml
except ImportError:
    pass

def module_imported(modname):
    return modname in sys.modules

def output_info(comf, msg):
    comf.write("cat <<EOF\n")
    comf.write("%s\n" % msg)
    comf.write("EOF\n")

def check_version_to_major(version, mver):
    vers = version.split('.')
    major = int(vers[0])
    return major >= mver

def check_version_to_minor(version, majorv, minorv):
    vers = version.split('.')
    major = int(vers[0])
    minor = int(vers[1])
    return major > majorv or (major == majorv and minor >= minorv)

def check_version_to_patch(version, majorv, minorv, patchv):
    vers = version.split('.')
    major = int(vers[0])
    minor = int(vers[1])
    patch = int(vers[2])
    return (major > majorv
            or (major == majorv and minor > minorv)
            or (major == majorv and minor == minorv and patch >= patchv))
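
# Examples of the expected semantics (a sketch, not executed anywhere):
#   check_version_to_major('8.0.26', 8)       -> True
#   check_version_to_minor('5.7.30', 8, 0)    -> False (5.7 < 8.0)
#   check_version_to_patch('1.2.3', 1, 2, 4)  -> False (patch 3 < 4)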

def get_version_info(version):
    return version.split('.')

# Compare two versions, returning:
#  -1 : version1 < version2
#   0 : version1 == version2
#   1 : version1 > version2
# TODO: switch to padded-tuple comparison (cmp() is Python 2 only).
def compare_version(version1, version2):
    tup1 = get_version_info(version1)
    tup2 = get_version_info(version2)
    ver1 = []
    ver2 = []
    for num in tup1:
        ver1.append(int(num))
    for num in tup2:
        ver2.append(int(num))
    len1 = len(ver1)
    len2 = len(ver2)
    if len1 < len2:
        for i in range(len2 - len1):
            ver1.append(0)
        len1 = len2
    elif len1 > len2:
        for i in range(len1 - len2):
            ver2.append(0)
    for i in range(0, len1):
        if ver1[i] < ver2[i]:
            return -1
        elif ver1[i] > ver2[i]:
            return 1
    return 0
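
# A minimal sketch of the tuple-based rewrite suggested by the TODO above
# (works on Python 2 and 3; not wired in anywhere):
#   def compare_version_tuples(version1, version2):
#       t1 = [int(n) for n in version1.split('.')]
#       t2 = [int(n) for n in version2.split('.')]
#       width = max(len(t1), len(t2))
#       t1 += [0] * (width - len(t1))   # pad the shorter version with zeros
#       t2 += [0] * (width - len(t2))
#       return (t1 > t2) - (t1 < t2)    # -1, 0 or 1
# e.g. compare_version('1.4.1', '1.4') == 1 and compare_version('1.4', '1.4.0') == 0.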

def validate_upgrade(verfrom, verto):
    res = compare_version(verto, verfrom)
    if res < 0:
        raise ValueError('Error: Downgrade is not allowed!')
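# e.g. validate_upgrade('1.4.1', '1.5.0') passes, while
# validate_upgrade('1.5.0', '1.4.1') raises ValueError.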

def my_print(toprint):
    if sys.version_info.major == 2:
        sys.stdout.write(toprint + "\n")
    else:
        print(toprint)
    sys.stdout.flush()

def run_command_verbose(command, dryrun):
    my_print(command)
    sys.stdout.flush()
    if not dryrun:
        os.system(command)
        sys.stdout.flush()

def run_remote_command(machines, ip, progdir, targetdir, command, dryrun):
    mach = machines.get(ip)
    sshport = mach.get('sshport', 22)
    if progdir.startswith('/'):
        realdir = progdir
    else:
        realdir = "%s/%s" % (mach['basedir'], progdir)
    if targetdir.startswith('/'):
        workdir = targetdir
    else:
        workdir = "%s/%s" % (realdir, targetdir)
    envstr = "export PATH=%s/bin:$PATH; export LD_LIBRARY_PATH=%s/lib:%s/lib64:$LD_LIBRARY_PATH" % (realdir, realdir, realdir)
    if mach['sshpass']:
        cmdstr='''bash remote_run.sh --sshpass --password='%s' --sshport=%d --user=%s %s '%s; cd %s || exit 1; %s' '''
        tup= (mach['password'], sshport, mach['user'], ip, envstr, workdir, command)
    else:
        cmdstr='''bash remote_run.sh --sshport=%d --user=%s %s '%s; cd %s || exit 1; %s' '''
        tup= (sshport, mach['user'], ip, envstr, workdir, command)
    run_command_verbose(cmdstr % tup, dryrun)
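
# For illustration, with a hypothetical machine entry
# {'user': 'kunlun', 'basedir': '/kunlun', 'sshpass': False} and a call
# run_remote_command(machines, '192.0.2.10', 'clustermgr', 'bin', 'ls', True),
# the dry run prints roughly:
#   bash remote_run.sh --sshport=22 --user=kunlun 192.0.2.10 'export PATH=...; cd /kunlun/clustermgr/bin || exit 1; ls'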

def stop_clustermgr_node(machines, progdir, node, dryrun):
    command = "bash stop_cluster_mgr.sh"
    run_remote_command(machines, node['ip'], progdir, 'bin', command, dryrun)

def stop_clustermgr_node_usingidx(machines, progdir, clustermgrobj, idx, dryrun):
    node = clustermgrobj['nodes'][idx]
    stop_clustermgr_node(machines, progdir, node, dryrun)

def start_clustermgr_node(machines, progdir, node, dryrun):
    command = "bash start_cluster_mgr.sh >& run.log"
    run_remote_command(machines, node['ip'], progdir, 'bin', command, dryrun)

def start_clustermgr_node_usingidx(machines, progdir, clustermgrobj, idx, dryrun):
    node = clustermgrobj['nodes'][idx]
    start_clustermgr_node(machines, progdir, node, dryrun)

def stop_nodemgr_node(machines, progdir, node, dryrun):
    command = "bash stop_node_mgr.sh"
    run_remote_command(machines, node['ip'], progdir, 'bin', command, dryrun)

def stop_nodemgr_node_usingidx(machines, progdir, nodemgrobj, idx, dryrun):
    node = nodemgrobj['nodes'][idx]
    stop_nodemgr_node(machines, progdir, node, dryrun)

def start_nodemgr_node(machines, progdir, node, dryrun):
    command = "bash start_node_mgr.sh >& run.log"
    run_remote_command(machines, node['ip'], progdir, 'bin', command, dryrun)

def start_nodemgr_node_usingidx(machines, progdir, nodemgrobj, idx, dryrun):
    node = nodemgrobj['nodes'][idx]
    start_nodemgr_node(machines, progdir, node, dryrun)

def start_storage_node(machines, progdir, node, dryrun):
    command = "bash startmysql.sh %s" % str(node['port'])
    run_remote_command(machines, node['ip'], progdir, 'dba_tools', command, dryrun)

def start_storage_node_usingidx(machines, progdir, groupobj, idx, dryrun):
    node = groupobj['nodes'][idx]
    start_storage_node(machines, progdir, node, dryrun)

def stop_storage_node(machines, progdir, node, dryrun):
    command = "bash stopmysql.sh %s" % str(node['port'])
    run_remote_command(machines, node['ip'], progdir, 'dba_tools', command, dryrun)

def stop_storage_node_usingidx(machines, progdir, groupobj, idx, dryrun):
    node = groupobj['nodes'][idx]
    stop_storage_node(machines, progdir, node, dryrun)

def kill_storage_node(machines, progdir, node, dryrun):
    command = "bash killmysql.sh %s" % str(node['port'])
    run_remote_command(machines, node['ip'], progdir, 'dba_tools', command, dryrun)

def kill_storage_node_usingidx(machines, progdir, groupobj, idx, dryrun):
    node = groupobj['nodes'][idx]
    kill_storage_node(machines, progdir, node, dryrun)

def start_server_node(machines, progdir, node, dryrun, pyexec="python2"):
    command = "%s start_pg.py --port=%s" % (pyexec, str(node['port']))
    run_remote_command(machines, node['ip'], progdir, 'scripts', command, dryrun)

def start_server_node_usingidx(machines, progdir, compobj, idx, dryrun):
    node = compobj['nodes'][idx]
    start_server_node(machines, progdir, node, dryrun)

def stop_server_node(machines, progdir, node, dryrun):
    command = "pg_ctl -D %s stop -m immediate" % str(node['datadir'])
    run_remote_command(machines, node['ip'], progdir, 'scripts', command, dryrun)

def stop_server_node_usingidx(machines, progdir, compobj, idx, dryrun):
    node = compobj['nodes'][idx]
    stop_server_node(machines, progdir, node, dryrun)

def addIpToMachineMap(map, ip, config, haspg=False):
    #my_print("add ip %s" % ip)
    if not ip in map:
        mac={"ip":ip, "user":config['defuser'], "basedir":config['defbase'], "haspg":haspg, "pg_with_san":False, 'password':'', 'sshpass':False}
        map[ip] = mac

def generate_haproxy_config(cluster, machines, subdir, confname):
    comps = cluster['comp']['nodes']
    haproxy = cluster['haproxy']
    mach = machines[haproxy['ip']]
    maxconn = haproxy.get('maxconn', 900)
    conf = open("%s/%s" % (subdir, confname), 'w')
    conf.write('''# generated automatically
    global
        pidfile %s/haproxy-%d.pid
        maxconn %d
        daemon

    defaults
        log global
        retries 5
        timeout connect 5s
        timeout client 30000s
        timeout server 30000s

    listen kunlun-cluster
        bind :%d
        mode tcp
        balance roundrobin
''' % (mach['basedir'], haproxy['port'], maxconn, haproxy['port']))
    i = 1
    for node in comps:
        conf.write("        server comp%d %s:%d weight 1 check inter 10s\n" % (i, node['ip'], node['port']))
        i += 1
    conf.close()
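
# e.g. two comp nodes at 192.0.2.11:47001 and 192.0.2.12:47001 (hypothetical)
# produce these backend lines after the listen block:
#   server comp1 192.0.2.11:47001 weight 1 check inter 10s
#   server comp2 192.0.2.12:47001 weight 1 check inter 10s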

def get_json_from_file(filepath):
    jsconf = open(filepath)
    jstr = jsconf.read()
    relstr = jstr.lstrip()
    jsconf.close()
    jscfg = None
    if relstr.startswith("{") or relstr.startswith("["):
        jscfg = json.loads(jstr, object_pairs_hook=collections.OrderedDict)
    elif module_imported('yaml'):
        if sys.version_info.major == 2:
            jscfg = yaml.load(jstr)
        else:
            jscfg = yaml.load(jstr, Loader=yaml.FullLoader)
    else:
        raise ValueError('Error: The file format is not supported!')
    return jscfg
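
# A minimal usage sketch: the format is detected from the first
# non-whitespace character, so both of these hypothetical files work:
#   jscfg = get_json_from_file('cluster.json')  # content starts with '{' or '['
#   jscfg = get_json_from_file('cluster.yaml')  # anything else; requires PyYAML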

def addMachineToMap(map, ip, user, basedir, sshport=22, password='', sshpass=False):
    # We could add logic here to check whether the item exists; newly added
    # entries should be unique with respect to the existing ones.
    if ip in map:
        return
    if not basedir.startswith('/'):
        raise ValueError('Error: the basedir for %s must be an absolute path!' % ip)
    if sshpass and password == '':
        raise ValueError('Error: password must not be empty when using sshpass to connect %s !' % ip)
    mac={"ip":ip, "user":user, "basedir":basedir, "sshport":sshport, 'password':password, 'sshpass': sshpass, "haspg": False, "pg_with_san": False}
    map[ip] = mac

def gethostip():
    hostname = socket.gethostname()
    ip = socket.gethostbyname(hostname)
    return ip

def getuuid():
    return str(uuid.uuid1())

def addIpToFilesMap(map, ip, fname, targetdir):
    if not ip in map:
        map[ip] = {}
    tmap = map[ip]
    if not fname in tmap:
        tmap[fname] = targetdir

def addNodeToFilesMap(map, node, fname, targetdir):
    ip = node['ip']
    addIpToFilesMap(map, ip, fname, targetdir)

def addIpToFilesListMap(map, ip, fname, targetdir):
    if not ip in map:
        map[ip] = []
    tlist = map[ip]
    tlist.append([fname, targetdir])

def addIpToGetBackListMap(map, ip, fname, targetdir):
    if not ip in map:
        map[ip] = []
    tlist = map[ip]
    tlist.append([fname, targetdir])

def addNodeToFilesListMap(map, node, fname, targetdir):
    ip = node['ip']
    addIpToFilesListMap(map, ip, fname, targetdir)

def addNodeToIpset(ipset, node):
    ip = node['ip']
    ipset.add(ip)

# Not used currently.
def addToCommandsMap(map, ip, targetdir, command):
    if not ip in map:
        map[ip] = []
    cmds = map[ip]
    cmds.append([targetdir, command])

def addToFileList(files, ip, sourcepath, target):
    lst = [ip, sourcepath, target]
    files.append(lst)

def addToCommandsListNoenv(cmds, ip, targetdir, command):
    lst = [ip, targetdir, command]
    cmds.append(lst)

def addToCommandsList(cmds, ip, targetdir, command, envtype="no"):
    lst = [ip, targetdir, command, envtype]
    cmds.append(lst)

def addToDirMap(map, ip, newdir):
    if not ip in map:
        map[ip] = []
    dirs = map[ip]
    dirs.append(newdir)

def addToListMap(map, key, value):
    if key not in map:
        map[key] = []
    l = map[key]
    l.append(value)

def islocal(config, ip, user):
    if ip.startswith("127") or ip in [config['localip'], "localhost", socket.gethostname()]:
        if getpass.getuser() == user:
            return True
    return False
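
# e.g. with config['localip'] == '192.0.2.10' (hypothetical), a command for
# '127.0.0.1' under the current user runs directly via /bin/bash, while one
# for '192.0.2.11' goes through remote_run.sh/dist.sh.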

def process_filelist(comf, config, machines, filelist):
    for filetup in filelist:
        ip = filetup[0]
        mach = machines[ip]
        sshport = mach.get('sshport', 22)
        if islocal(config, ip, mach['user']):
            # For local, we do not consider the user.
            mkstr = '''/bin/bash -xc $"cp -f %s %s" >& lastlog '''
            tup= (filetup[1], filetup[2])
        else:
            if mach['sshpass']:
                mkstr = '''bash dist.sh --sshpass --password='%s' --sshport=%d --hosts=%s --user=%s %s %s >& lastlog '''
                tup= (mach['password'], sshport, ip, mach['user'], filetup[1], filetup[2])
            else:
                mkstr = '''bash dist.sh --sshport=%d --hosts=%s --user=%s %s %s >& lastlog '''
                tup= (sshport, ip, mach['user'], filetup[1], filetup[2])
        comf.write(mkstr % tup)
        comf.write("\n")

def process_file(comf, config, machines, ip, source, target):
    process_filelist(comf, config, machines, [[ip, source, target]])

def process_getbacklist(comf, config, machines, filelist):
    for filetup in filelist:
        ip = filetup[0]
        mach = machines[ip]
        sshport = mach.get('sshport', 22)
        if islocal(config, ip, mach['user']):
            # For local, we do not consider the user.
            mkstr = '''/bin/bash -xc $"cp -f %s %s" >& lastlog '''
            tup= (filetup[1], filetup[2])
        else:
            if mach['sshpass']:
                mkstr = '''bash getback.sh --sshpass --password='%s' --sshport=%d --hosts=%s --user=%s %s %s >& lastlog '''
                tup= (mach['password'], sshport, ip, mach['user'], filetup[1], filetup[2])
            else:
                mkstr = '''bash getback.sh --sshport=%d --hosts=%s --user=%s %s %s >& lastlog '''
                tup= (sshport, ip, mach['user'], filetup[1], filetup[2])
        comf.write(mkstr % tup)
        comf.write("\n")

def process_getback(comf, config, machines, ip, source, target):
    process_getbacklist(comf, config, machines, [[ip, source, target]])

def process_commandslist_noenv(comf, config, machines, commandslist):
    for cmd in commandslist:
        ip=cmd[0]
        mach = machines[ip]
        sshport = mach.get('sshport', 22)
        if islocal(config, ip, mach['user']):
            # For local, we do not consider the user.
            mkstr = '''/bin/bash -xc $"cd %s || exit 1; %s" >& lastlog '''
            tup= (cmd[1], cmd[2])
        else:
            ttyopt=""
            if cmd[2].find("sudo ") >= 0:
                ttyopt="--tty"
            if mach['sshpass']:
                mkstr = '''bash remote_run.sh --sshpass --password='%s' --sshport=%d %s --user=%s %s $"cd %s || exit 1; %s" >& lastlog '''
                tup= (mach['password'], sshport, ttyopt, mach['user'], ip, cmd[1], cmd[2])
            else:
                mkstr = '''bash remote_run.sh --sshport=%d %s --user=%s %s $"cd %s || exit 1; %s" >& lastlog '''
                tup= (sshport, ttyopt, mach['user'], ip, cmd[1], cmd[2])
        comf.write(mkstr % tup)
        comf.write("\n")

def process_command_noenv(comf, config, machines, ip, targetdir, command):
    process_commandslist_noenv(comf, config, machines, [[ip, targetdir, command]])

def process_commandslist_setenv(comf, config, machines, commandslist):
    for cmd in commandslist:
        ip=cmd[0]
        mach = machines[ip]
        sshport = mach.get('sshport', 22)
        if islocal(config, ip, mach['user']):
            # For local, we do not consider the user.
            mkstr = '''/bin/bash -c $"cd %s || exit 1; test -f env.sh.node && source ./env.sh.node; cd %s || exit 1; test -f %s/env.sh && envtype=%s && source %s/env.sh; %s" >& lastlog '''
            tup= (mach['basedir'], cmd[1], mach['basedir'], cmd[3], mach['basedir'], cmd[2])
        else:
            ttyopt=""
            if cmd[2].find("sudo ") >= 0:
                ttyopt="--tty"
            if mach['sshpass']:
                mkstr = '''bash remote_run.sh --sshpass --password='%s' --sshport=%d %s --user=%s %s $"cd %s || exit 1; test -f env.sh.node && source ./env.sh.node; cd %s || exit 1; test -f %s/env.sh && envtype=%s && source %s/env.sh; %s" >& lastlog '''
                tup= (mach['password'], sshport, ttyopt, mach['user'], ip, mach['basedir'], cmd[1], mach['basedir'], cmd[3], mach['basedir'], cmd[2])
            else:
                mkstr = '''bash remote_run.sh --sshport=%d %s --user=%s %s $"cd %s || exit 1; test -f env.sh.node && source ./env.sh.node; cd %s || exit 1; test -f %s/env.sh && envtype=%s && source %s/env.sh; %s" >& lastlog '''
                tup= (sshport, ttyopt, mach['user'], ip, mach['basedir'], cmd[1], mach['basedir'], cmd[3], mach['basedir'], cmd[2])
        comf.write(mkstr % tup)
        comf.write("\n")
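
# For illustration (hypothetical values): a local entry
# ['192.0.2.10', '/kunlun/bin', 'bash start_cluster_mgr.sh', 'clustermgr']
# with basedir '/kunlun' is emitted roughly as:
#   /bin/bash -c $"cd /kunlun || exit 1; test -f env.sh.node && source ./env.sh.node;
#     cd /kunlun/bin || exit 1; test -f /kunlun/env.sh && envtype=clustermgr &&
#     source /kunlun/env.sh; bash start_cluster_mgr.sh" >& lastlog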

def process_command_setenv(comf, config, machines, ip, targetdir, command, envtype='no'):
    process_commandslist_setenv(comf, config, machines, [[ip, targetdir, command, envtype]])

def process_dirmap(comf, dirmap, machines, usesudo, config):
    # dir making
    for ip in dirmap:
        mach = machines.get(ip)
        dirs=dirmap[ip]
        for d in dirs:
            if usesudo:
                process_command_noenv(comf, config, machines, ip, '/',
                    'sudo mkdir -p %s && sudo chown -R %s:\`id -gn %s\` %s' % (d, mach['user'], mach['user'], d))
            else:
                process_command_noenv(comf, config, machines, ip, '/', 'mkdir -p %s' % d)

def process_fileslistmap(comf, filesmap, machines, prefix, config):
    # files copy.
    for ip in filesmap:
        mach = machines.get(ip)
        fmap = filesmap[ip]
        for fpair in fmap:
            source = fpair[0]
            if not source.startswith('/'):
                source = '%s/%s' % (prefix, fpair[0])
            target = fpair[1]
            if not target.startswith('/'):
                target = '%s/%s' % (mach['basedir'], fpair[1])
            process_file(comf, config, machines, ip, source, target)

def process_getbackmap(comf, filesmap, machines, prefix, config):
    # files copy.
    for ip in filesmap:
        mach = machines.get(ip)
        fmap = filesmap[ip]
        for fpair in fmap:
            source = fpair[0]
            if not source.startswith('/'):
                source = '%s/%s' % (mach['basedir'], fpair[0])
            target = fpair[1]
            if not target.startswith('/'):
                target = '%s/%s' % (prefix, fpair[1])
            process_getback(comf, config, machines, ip, source, target)

def process_filesmap(comf, filesmap, machines, prefix, config):
    # files copy.
    for ip in filesmap:
        mach = machines.get(ip)
        fmap = filesmap[ip]
        for fname in fmap:
            source = fname
            if not source.startswith('/'):
                source = '%s/%s' % (prefix, fname)
            target = fmap[fname]
            if not target.startswith('/'):
                target = '%s/%s' % (mach['basedir'], fmap[fname])
            process_file(comf, config, machines, ip, source, target)

def validate_ha_mode(ha_mode):
    if ha_mode not in ['rbr', 'no_rep', 'mgr']:
        raise ValueError('Error: The ha_mode must be rbr, mgr or no_rep')

def checkdirs(dirs):
    for d in dirs:
        if not os.path.exists(d):
            os.mkdir(d)

def addPortToMachine(map, ip, port):
    if not ip in map:
        map[ip] = set([port])
    else:
        pset = map[ip]
        if port in pset:
            raise ValueError("duplicate port:%s on host:%s" % (str(port), ip))
        else:
            pset.add(port)
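
# e.g. (hypothetical values):
#   pm = {}
#   addPortToMachine(pm, '192.0.2.10', 57001)
#   addPortToMachine(pm, '192.0.2.10', 57001)  # raises ValueError: duplicate port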

def addDirToMachine(map, ip, directory):
    if not ip in map:
        map[ip] = set([directory])
    else:
        dset = map[ip]
        if directory in dset:
            raise ValueError("duplicate directory:%s on host:%s" % (directory, ip))
        else:
            dset.add(directory)

def gen_default_base():
    config = {
            'product_version': "1.4.1",
            'defuser': 'kunlun',
            'defbase': '/kunlun',
            'autostart': False,
            'sudo': False,
            'localip': '127.0.0.1',
            'cloud': False,
            'small': False,
            'download': False,
            'downloadsite': 'public',
            'downloadtype': 'release',
            'targetarch': platform.machine(),
            'overwrite': False,
            'professional': False,
            'rc': False,
            'mariadb': False,
            'cantian': False,
            'license_file': 'license.lic',
            'infodir': 'infos'
            }
    return config

def get_default_config1():
    config = gen_default_base()
    config.update({
        "valgrind": False,
        'defbrpc_http_port': 58000,
        'defbrpc_raft_port': 58001
            })
    return config

def get_default_config2():
    config = gen_default_base()
    config.update({
        'pyexec': "none",
        "verbose": False,
        'setbashenv': False,
        'defbrpc_http_port_clustermgr': 58000,
        'defbrpc_raft_port_clustermgr': 58001,
        'defprometheus_port_start_clustermgr': 59010,
        'defbrpc_http_port_nodemgr': 58002,
        'deftcp_port_nodemgr': 58003,
        'defstorage_portrange_nodemgr': "57000-58000",
        'defserver_portrange_nodemgr': "47000-48000",
        'defprometheus_port_start_nodemgr': 58010,
        'defraft_port_cdc': 58004,
        'defhttp_port_cdc': 58005,
        'multipledc': False,
        'enable_rocksdb': False,
        'upgrade_version': config['product_version'],
        'upgrade_all': False,
        'upgrade_stage': 'do'
            })
    return config

def init_global_base(jscfg, args):
    if 'config' not in jscfg:
        jscfg['config'] = {}
    config = jscfg['config']
    if 'product_version' not in config:
        config['product_version'] = args.product_version
    if 'defuser' not in config:
        config['defuser'] = args.defuser
    if 'defbase' not in config:
        config['defbase'] = args.defbase
    # autostart implies sudo
    if 'autostart' not in config:
        config['autostart'] = args.autostart
    if config['autostart']:
        config['sudo'] = True
    elif 'sudo' not in config:
        config['sudo'] = args.sudo
    if 'localip' not in config:
        config['localip'] = args.localip
    if 'small' not in config:
        config['small'] = args.small
    if 'cloud' not in config:
        config['cloud'] = args.cloud
    if 'download' not in config:
        config['download'] = args.download
    if 'downloadsite' not in config:
        config['downloadsite'] = args.downloadsite
    if 'downloadtype' not in config:
        config['downloadtype'] = args.downloadtype
    if 'targetarch' not in config:
        config['targetarch'] = args.targetarch
    if 'overwrite' not in config:
        config['overwrite'] = args.overwrite
    if 'professional' not in config:
        config['professional'] = args.professional
    if 'rc' not in config:
        config['rc'] = args.rc
    if 'mariadb' not in config:
        config['mariadb'] = args.mariadb
    if 'cantian' not in config:
        config['cantian'] = args.cantian
    if 'license_file' not in config:
        config['license_file'] = args.license_file
    if 'infodir' not in config:
        config['infodir'] = args.infodir
    
def init_global_config1(jscfg, args):
    init_global_base(jscfg, args)
    config = jscfg['config']
    if 'valgrind' not in config:
        config['valgrind'] = args.valgrind
    if 'defbrpc_http_port' not in config:
        config['defbrpc_http_port'] = args.defbrpc_http_port
    if 'defbrpc_raft_port' not in config:
        config['defbrpc_raft_port'] = args.defbrpc_raft_port

def init_global_config2(jscfg, args):
    init_global_base(jscfg, args)
    config = jscfg['config']
    if 'pyexec' not in config:
        config['pyexec'] = args.pyexec
    if 'verbose' not in config:
        config['verbose'] = args.verbose
    if 'setbashenv' not in config:
        config['setbashenv'] = args.setbashenv
    if 'defbrpc_http_port_clustermgr' not in config:
        config['defbrpc_http_port_clustermgr'] = args.defbrpc_http_port_clustermgr
    if 'defbrpc_raft_port_clustermgr' not in config:
        config['defbrpc_raft_port_clustermgr'] = args.defbrpc_raft_port_clustermgr
    if 'defprometheus_port_start_clustermgr' not in config:
        config['defprometheus_port_start_clustermgr'] = args.defprometheus_port_start_clustermgr
    if 'defbrpc_http_port_nodemgr' not in config:
        config['defbrpc_http_port_nodemgr'] = args.defbrpc_http_port_nodemgr
    if 'deftcp_port_nodemgr' not in config:
        config['deftcp_port_nodemgr'] = args.deftcp_port_nodemgr
    if 'defstorage_portrange_nodemgr' not in config:
        config['defstorage_portrange_nodemgr'] = args.defstorage_portrange_nodemgr
    if 'defserver_portrange_nodemgr' not in config:
        config['defserver_portrange_nodemgr'] = args.defserver_portrange_nodemgr
    if 'defprometheus_port_start_nodemgr' not in config:
        config['defprometheus_port_start_nodemgr'] = args.defprometheus_port_start_nodemgr
    if 'defraft_port_cdc' not in config:
        config['defraft_port_cdc'] = args.defraft_port_cdc
    if 'defhttp_port_cdc' not in config:
        config['defhttp_port_cdc'] = args.defhttp_port_cdc
    if 'multipledc' not in config:
        config['multipledc'] = args.multipledc
    if 'enable_rocksdb' not in config:
        config['enable_rocksdb'] = True
    if 'send_license' not in config:
        if config['professional']:
            config['send_license'] = False
        else:
            config['send_license'] = True
    if 'upgrade_version' not in config:
        config['upgrade_version'] = args.upgrade_version
    if config['upgrade_version'] != config['product_version']:
        config['upgrade_all'] = True
    if 'upgrade_all' not in config:
        config['upgrade_all'] = False
    if 'upgrade_stage' not in config:
        config['upgrade_stage'] = args.upgrade_stage

# Strip the leading and trailing whitespace from an attribute value.
def normalize_string(obj, attr):
    obj[attr] = obj[attr].strip()

# Set up the machines object for the one-key installation script (binary version).
def setup_machines1(jscfg, machines):
    machnodes = jscfg.get('machines', [])
    cluster = jscfg['cluster']
    meta = cluster['meta']
    datas = cluster['data']
    comp = cluster['comp']
    clustermgr = cluster['clustermgr']
    config = jscfg['config']
    for mach in machnodes:
        normalize_string(mach, 'ip')
        ip=mach['ip']
        user=mach.get('user', config['defuser'])
        base=mach.get('basedir', config['defbase']).strip()
        sshport = mach.get('sshport', 22)
        password = mach.get('password', '')
        sshpass = mach.get('sshpass', False)
        addMachineToMap(machines, ip, user, base, sshport, password, sshpass)
    for node in meta['nodes']:
        normalize_string(node, 'ip')
        addIpToMachineMap(machines, node['ip'], config)
    for shard in datas:
        for node in shard['nodes']:
            normalize_string(node, 'ip')
            addIpToMachineMap(machines, node['ip'], config)
    for node in comp['nodes']:
        normalize_string(node, 'ip')
        addIpToMachineMap(machines, node['ip'], config, True)
    if 'ip' in clustermgr:
        normalize_string(clustermgr, 'ip')
        addIpToMachineMap(machines, clustermgr['ip'], config)
    elif 'nodes' in clustermgr:
        for node in clustermgr['nodes']:
            normalize_string(node, 'ip')
            addIpToMachineMap(machines, node['ip'], config)
    haproxy = cluster.get("haproxy", None)
    if haproxy is not None:
        normalize_string(haproxy, 'ip')
        addIpToMachineMap(machines, haproxy['ip'], config)

def set_storage_dirs1(machines, node):
    mach = machines.get(node['ip'])
    if 'data_dir_path' not in node:
        node['data_dir_path'] = "%s/%s/%s" % (mach['basedir'], 'storage_datadir', str(node['port']))
    if 'log_dir_path' not in node:
        node['log_dir_path'] = "%s/%s/%s" % (mach['basedir'], 'storage_logdir', str(node['port']))
    if 'innodb_log_dir_path' not in node:
        node['innodb_log_dir_path'] = "%s/%s/%s" % (mach['basedir'], 'storage_waldir', str(node['port']))
    if 'tornado_sn_data_dir' not in node:
        node['tornado_sn_data_dir'] = "%s/%s/%s" % (mach['basedir'], 'vec_tmpdir', str(node['port']))

def set_comp_dirs1(machines, node):
    mach = machines.get(node['ip'])
    if 'datadir' not in node:
        node['datadir'] = "%s/%s/%s" % (mach['basedir'], 'server_datadir', str(node['port']))
    if 'tornado_cn.data_dir' not in node:
        node['tornado_cn.data_dir'] = "%s/%s/%s" % (mach['basedir'], 'vec_tmpdir', str(node['port']))

# ha_mode logic:
# for meta_ha_mode, the check order is: meta['ha_mode'] -> cluster['ha_mode'] -> no_rep (1 node) / mgr (multiple nodes)
# for shard_ha_mode, the check order is: cluster['ha_mode'] -> meta_ha_mode
# This validates and sets the config object for the one-key installation script (binary version).
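# For example, with cluster['ha_mode'] == 'rbr' and meta['ha_mode'] == 'mgr',
# the meta shard uses mgr while the data shards use rbr; when neither is set,
# a single-node meta shard falls back to no_rep and a multi-node one to mgr.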
def validate_and_set_config1(jscfg, machines):
    cluster = jscfg['cluster']
    meta = cluster['meta']
    comps = cluster['comp']['nodes']
    datas = cluster['data']
    clustermgr = cluster['clustermgr']
    haproxy = cluster.get("haproxy", None)
    config = jscfg['config']
    portmap = {}
    dirmap = {}

    if 'config' not in cluster:
        cluster['config'] = {}
    if 'config' not in cluster['comp']:
        cluster['comp']['config'] = {}
    if 'config' not in meta:
        meta['config'] = {}

    meta_ha_mode = ''
    shard_ha_mode = ''
    if 'ha_mode' in cluster:
        mode = cluster['ha_mode']
        validate_ha_mode(mode)
        meta_ha_mode = mode
        shard_ha_mode = mode

    if 'ha_mode' in meta:
        mode = meta.get('ha_mode')
        validate_ha_mode(mode)
        meta_ha_mode = mode

    metacnt = len(meta['nodes'])
    if metacnt == 0:
        raise ValueError('Error: There must be at least one node in meta shard')
    if meta_ha_mode == '':
        if metacnt > 1:
            # The default never selects rbr: use mgr for multiple replicas.
            meta_ha_mode = 'mgr'
        else:
            meta_ha_mode = 'no_rep'

    meta['ha_mode'] = meta_ha_mode
    if metacnt > 1 and meta_ha_mode == 'no_rep':
        raise ValueError('Error: meta_ha_mode is no_rep, but there are multiple nodes in meta shard')
    elif metacnt == 1 and meta_ha_mode != 'no_rep':
        raise ValueError('Error: meta_ha_mode is mgr/rbr, but there is only one node in meta shard')

    hasPrimary=False
    for node in meta['nodes']:
        if 'config' not in node:
            node['config'] = {}
        addPortToMachine(portmap, node['ip'], node['port'])
        if 'xport' in node:
            addPortToMachine(portmap, node['ip'], node['xport'])
        if 'mgr_port' in node:
            addPortToMachine(portmap, node['ip'], node['mgr_port'])
        set_storage_dirs1(machines, node)
        addDirToMachine(dirmap, node['ip'], node['data_dir_path'])
        addDirToMachine(dirmap, node['ip'], node['log_dir_path'])
        addDirToMachine(dirmap, node['ip'], node['innodb_log_dir_path'])
        addDirToMachine(dirmap, node['ip'], node['tornado_sn_data_dir'])
        if node.get('is_primary', False):
            if hasPrimary:
                raise ValueError('Error: Two primaries found in meta shard, there should be one and only one Primary specified !')
            else:
                hasPrimary = True
    if metacnt > 1:
        if not hasPrimary:
            raise ValueError('Error: No primary found in meta shard, there should be one and only one primary specified !')
    else:
        # Single-node meta shard: that node is the primary.
        meta['nodes'][0]['is_primary'] = True

    if 'enable_rocksdb' not in meta:
        meta['enable_rocksdb'] = True
    if 'enable_rocksdb' not in cluster:
        cluster['enable_rocksdb'] = True
    if 'config' not in clustermgr:
        clustermgr['config'] = {}

    for node in comps:
        if 'config' not in node:
            node['config'] = {}
        mach = machines.get(node['ip'])
        mach['haspg'] = True
        if 'with_san' in node and node['with_san']:
            mach['pg_with_san'] = True
        set_comp_dirs1(machines, node)
        nconfig = node['config']
        nconfig['tornado_cn.enable_custom_planner'] = "true"
        nconfig['tornado_cn.data_dir'] = "\\'%s\\'" % node['tornado_cn.data_dir']
        addPortToMachine(portmap, node['ip'], node['port'])
        addPortToMachine(portmap, node['ip'], node['mysql_port'])
        addDirToMachine(dirmap, node['ip'], node['datadir'])
        addDirToMachine(dirmap, node['ip'], node['tornado_cn.data_dir'])

    if haproxy is not None:
        addPortToMachine(portmap, haproxy['ip'], haproxy['port'])
        if 'mysql_port' in haproxy:
            addPortToMachine(portmap, haproxy['ip'], haproxy['mysql_port'])

    if shard_ha_mode == '':
        shard_ha_mode = meta_ha_mode
    cluster['ha_mode'] = shard_ha_mode

    i=1
    for shard in datas:
        if 'config' not in shard:
            shard['config'] = {}
        nodecnt = len(shard['nodes'])
        if nodecnt == 0:
            raise ValueError('Error: There must be at least one node in data shard%d' % i)
        if nodecnt > 1 and shard_ha_mode == 'no_rep':
            raise ValueError('Error: shard_ha_mode is no_rep, but data shard%d has two or more nodes' % i)
        elif nodecnt == 1 and shard_ha_mode != 'no_rep':
            raise ValueError('Error: shard_ha_mode is mgr/rbr, but data shard%d has only one node' % i)
        hasPrimary=False
        for node in shard['nodes']:
            if 'config' not in node:
                node['config'] = {}
            mach = machines.get(node['ip'])
            addPortToMachine(portmap, node['ip'], node['port'])
            if 'xport' in node:
                addPortToMachine(portmap, node['ip'], node['xport'])
            if 'mgr_port' in node:
                addPortToMachine(portmap, node['ip'], node['mgr_port'])
            if 'tornado_sn_port' not in node:
                node['tornado_sn_port'] = node['port'] + 1
            addPortToMachine(portmap, node['ip'], node['tornado_sn_port'])
            set_storage_dirs1(machines, node)
            nconfig = node['config']
            nconfig['#tornado_sn_data_dir'] = node['tornado_sn_data_dir']
            nconfig['#tornado_sn_port'] = node['tornado_sn_port']
            addDirToMachine(dirmap, node['ip'], node['data_dir_path'])
            addDirToMachine(dirmap, node['ip'], node['log_dir_path'])
            addDirToMachine(dirmap, node['ip'], node['innodb_log_dir_path'])
            addDirToMachine(dirmap, node['ip'], node['tornado_sn_data_dir'])
            if node.get('is_primary', False):
                if hasPrimary:
                    raise ValueError('Error: Two primaries found in shard%d, there should be one and only one Primary specified !' % i)
                else:
                    hasPrimary = True
        if nodecnt > 1:
            if not hasPrimary:
                raise ValueError('Error: No primary found in shard%d, there should be one and only one primary specified !' % i)
        else:
            # Single-node shard: that node is the primary.
            shard['nodes'][0]['is_primary'] = True
        i+=1
    
    if 'ip' in clustermgr and 'nodes' in clustermgr:
        raise ValueError('Error: ip and nodes cannot both be set for clustermgr !')
    elif 'ip' in clustermgr:
        node = {"ip": clustermgr['ip'], "config": {}}
        del clustermgr['ip']
        if 'brpc_raft_port' in clustermgr:
            node['brpc_raft_port'] = clustermgr['brpc_raft_port']
            del clustermgr['brpc_raft_port']
        else:
            node['brpc_raft_port'] = config['defbrpc_raft_port']
        addPortToMachine(portmap, node['ip'], node['brpc_raft_port'])
        if 'brpc_http_port' in clustermgr:
            node['brpc_http_port'] = clustermgr['brpc_http_port']
            del clustermgr['brpc_http_port']
        else:
            node['brpc_http_port'] = config['defbrpc_http_port']
        addPortToMachine(portmap, node['ip'], node['brpc_http_port'])
        clustermgr['nodes'] = [node]
    elif 'nodes' in clustermgr:
        for node in clustermgr['nodes']:
            if 'config' not in node:
                node['config'] = {}
            if 'brpc_raft_port' in node:
                addPortToMachine(portmap, node['ip'], node['brpc_raft_port'])
            else:
                node['brpc_raft_port'] = config['defbrpc_raft_port']
                addPortToMachine(portmap, node['ip'], config['defbrpc_raft_port'])
            if 'brpc_http_port' in node:
                addPortToMachine(portmap, node['ip'], node['brpc_http_port'])
            else:
                node['brpc_http_port'] = config['defbrpc_http_port']
                addPortToMachine(portmap, node['ip'], config['defbrpc_http_port'])
    else:
        raise ValueError('Error: exactly one of ip or nodes must be set for clustermgr !')

# Set up the machines object for the clustermgr initialization/destroy scripts.
def setup_machines2(jscfg, machines):
    machnodes = jscfg.get('machines', [])
    meta = jscfg.get('meta', {})
    metanodes = meta.get('nodes', [])
    nodemgr = jscfg.get('node_manager', {"nodes": []})
    nodemgrnodes = nodemgr.get('nodes', [])
    clustermgr = jscfg.get('cluster_manager', {"nodes": []})
    clustermgrnodes = clustermgr.get('nodes', [])
    clusters = jscfg.get('clusters', [])
    config = jscfg['config']
    for mach in machnodes:
        normalize_string(mach, 'ip')
        ip=mach['ip']
        user=mach.get('user', config['defuser'])
        base=mach.get('basedir', config['defbase']).strip()
        sshport = mach.get('sshport', 22)
        password = mach.get('password', '')
        sshpass = mach.get('sshpass', False)
        addMachineToMap(machines, ip, user, base, sshport, password, sshpass)
    if len(metanodes) > 0:
        for node in metanodes:
            normalize_string(node, 'ip')
            addIpToMachineMap(machines, node['ip'], config)
    elif 'group_seeds' in meta:
        nodes = get_nodes_from_seeds(meta['group_seeds'])
        for node in nodes:
            normalize_string(node, 'ip')
            addIpToMachineMap(machines, node['ip'], config)
    for node in nodemgrnodes:
        normalize_string(node, 'ip')
        addIpToMachineMap(machines, node['ip'], config)
    for node in clustermgrnodes:
        normalize_string(node, 'ip')
        addIpToMachineMap(machines, node['ip'], config)
    if 'xpanel' in jscfg:
        if 'nodes' in jscfg['xpanel']:
            for node in jscfg['xpanel']['nodes']:
                normalize_string(node, 'ip')
                addIpToMachineMap(machines, node['ip'], config)
        else:
            normalize_string(jscfg['xpanel'], 'ip')
            addIpToMachineMap(machines, jscfg['xpanel']['ip'], config)
    if 'elasticsearch' in jscfg:
        normalize_string(jscfg['elasticsearch'], 'ip')
        addIpToMachineMap(machines, jscfg['elasticsearch']['ip'], config)
    if 'cdc' in jscfg:
        for node in jscfg['cdc']['nodes']:
            normalize_string(node, 'ip')
            addIpToMachineMap(machines, node['ip'], config)
    for cluster in clusters:
        for node in cluster['comp']['nodes']:
            normalize_string(node, 'ip')
            addIpToMachineMap(machines, node['ip'], config, True)
        for shard in cluster['data']:
            if 'ref' in shard:
                continue
            for node in shard['nodes']:
                normalize_string(node, 'ip')
                addIpToMachineMap(machines, node['ip'], config)
        if 'haproxy' in cluster:
            node = cluster['haproxy']
            normalize_string(node, 'ip')
            addIpToMachineMap(machines, node['ip'], config)

def set_storage_using_nodemgr(machines, item, noden, innodb_buf="1024MB"):
    if 'data_dir_path' not in item:
        item['data_dir_path'] = "%s/%s" % (noden['storage_datadirs'].split(",")[0], str(item['port']))
    if 'log_dir_path' not in item:
        item['log_dir_path'] = "%s/%s" % (noden['storage_logdirs'].split(",")[0], str(item['port']))
    if 'innodb_log_dir_path' not in item:
        item['innodb_log_dir_path'] = "%s/%s" % (noden['storage_waldirs'].split(",")[0], str(item['port']))
    if 'use_vec_engine' in item and item['use_vec_engine']:
        item['vec_data_dir'] = "%s/%s" % (noden['vec_tmpdir'], str(item['port']))
        item['config']['#tornado_sn_data_dir'] = item['vec_data_dir']
        if 'plugin_load_add' in item['config']:
            del item['config']['plugin_load_add']
    else:
        item['config']['plugin_load_add'] = 'libbinlog_backup.so'
    #my_print("setting keyring path for %s:%d" % (item['ip'], item['port']))
    if 'keyring_file_path' not in item:
        item['keyring_file_path'] = "%s/%s/keyring" % (noden['storage_keyringdir'], str(item['port']))
    mach = machines.get(item['ip'])
    item['program_dir'] = "instance_binaries/storage/%s" % str(item['port'])
    item['user'] = mach['user']
    if 'innodb_buffer_pool_size' not in item:
        item['innodb_buffer_pool_size'] = innodb_buf
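
# For illustration (hypothetical values): with
# noden['storage_datadirs'] == '/kunlun/storage_datadir' and item['port'] == 57001,
# set_storage_using_nodemgr() yields data_dir_path '/kunlun/storage_datadir/57001'
# and program_dir 'instance_binaries/storage/57001'.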

def set_server_using_nodemgr(machines, item, noden):
    if 'datadir' not in item:
        item['datadir'] = "%s/%s" % (noden['server_datadirs'].split(",")[0], str(item['port']))
    item['program_dir'] = "instance_binaries/computer/%s" % str(item['port'])
    if 'use_vec_engine' in item and item['use_vec_engine']:
        item['vec_data_dir'] = "%s/%s" % (noden['vec_tmpdir'], str(item['port']))
        item['config']['tornado_cn.enable_custom_planner'] = "true"
        item['config']['tornado_cn.data_dir'] = "\\'%s\\'" % item['vec_data_dir']
    else:
        item['config']['shared_preload_libraries'] = "\\'global_deadlock_detector.so,remote_rel.so,ddl2kunlun.so\\'"

def set_upgrade_for_nodemgr_obj(item, config):
    if 'upgrade_all' not in item:
        item['upgrade_all'] = config['upgrade_all']
    if 'upgrade_storage' not in item:
        item['upgrade_storage'] = item['upgrade_all']
    if 'upgrade_server' not in item:
        item['upgrade_server'] = item['upgrade_all']
    if 'upgrade_proxysql' not in item:
        item['upgrade_proxysql'] = item['upgrade_all']
    if 'upgrade_nodemgr' not in item:
        item['upgrade_nodemgr'] = item['upgrade_all']

def set_upgrade_for_nodemgr_node(node, item):
    if 'upgrade_storage' not in node:
        node['upgrade_storage'] = item['upgrade_storage']
    if 'upgrade_server' not in node:
        node['upgrade_server'] = item['upgrade_server']
    if 'upgrade_proxysql' not in node:
        node['upgrade_proxysql'] = item['upgrade_proxysql']
    if 'upgrade_nodemgr' not in node:
        node['upgrade_nodemgr'] = item['upgrade_nodemgr']

# Note: for xpanel, cdc and clustermgr the current logic is the same;
# if that changes, we will create separate implementations.

def set_upgrade_for_obj(item, config):
    if 'upgrade_all' not in item:
        item['upgrade_all'] = config['upgrade_all']

def set_upgrade_for_node(node, item):
    if 'upgrade' not in node:
        node['upgrade'] = item['upgrade_all']

def check_license_config(jscfg):
    if 'license' not in jscfg:
        raise ValueError('license is not configured!')
    licobj = jscfg['license']
    if 'custom_info' not in licobj:
        raise ValueError('custom_info is not configured!')
    if 'valid_days' not in licobj:
        raise ValueError('valid_days is not configured!')
    if 'issue_time' not in licobj:
        raise ValueError('issue_time is not configured!')
    # currently, the license file is named 'license.lic'
    if 'license_file' not in licobj:
        licobj['license_file'] = 'license.lic'

# Validate and set the configuration object for the clustermgr initialization/destroy scripts.
def validate_and_set_config2(jscfg, machines):
    meta = jscfg.get('meta', {'nodes':[]})
    if not 'nodes' in meta:
        meta['nodes'] = []
    clustermgr = jscfg.get('cluster_manager', {'nodes':[]})
    if not 'nodes' in clustermgr:
        clustermgr['nodes'] = []
    if not 'cluster_manager' in jscfg:
        jscfg['cluster_manager'] = clustermgr
    nodemgr = jscfg.get('node_manager', {'nodes':[]})
    if not 'nodes' in nodemgr:
        nodemgr['nodes'] = []
    if not 'node_manager' in jscfg:
        jscfg['node_manager'] = nodemgr
    clusters = jscfg.get('clusters', [])
    if not 'clusters' in jscfg:
        jscfg['clusters'] = clusters
    config = jscfg['config']
    if 'need_ssl' not in config:
        config['need_ssl'] = False
    iscantian = config['cantian']
    arch = config['targetarch']

    portmap = {}
    dirmap = {}

    clustermgrips = set()
    if 'config' not in clustermgr:
        clustermgr['config'] = {}
    if 'sql_ssl_connect' in clustermgr['config']:
        config['need_ssl'] = True
    set_upgrade_for_obj(clustermgr, config)
    for node in clustermgr['nodes']:
        if 'config' not in node:
            node['config'] = {}
        if 'sql_ssl_connect' in node['config']:
            config['need_ssl'] = True
        set_upgrade_for_node(node, clustermgr)
        if node['ip'] in clustermgrips:
            raise ValueError('Error: %s exists, only one cluster_mgr can be run on a machine!' % node['ip'])
        if 'valgrind' not in node:
            node['valgrind'] = False
        clustermgrips.add(node['ip'])
        if 'brpc_raft_port' not in node:
            node['brpc_raft_port'] = config['defbrpc_raft_port_clustermgr']
        addPortToMachine(portmap, node['ip'], node['brpc_raft_port'])
        if 'brpc_http_port' not in node:
            node['brpc_http_port'] = config['defbrpc_http_port_clustermgr']
        addPortToMachine(portmap, node['ip'], node['brpc_http_port'])
        if 'prometheus_port_start' not in node:
            node['prometheus_port_start'] = config['defprometheus_port_start_clustermgr']
        addPortToMachine(portmap, node['ip'], node['prometheus_port_start'])

    if 'cdc' in jscfg:
        cdcips = set()
        cdcseeds = []
        cdc = jscfg['cdc']
        curid = 1
        if 'config' not in cdc:
            cdc['config'] = {}
        set_upgrade_for_obj(cdc, config)
        for node in cdc['nodes']:
            if 'config' not in node:
                node['config'] = {}
            set_upgrade_for_node(node, cdc)
            node['server_id'] = curid
            if 'valgrind' not in node:
                node['valgrind'] = False
            if node['ip'] in cdcips:
                raise ValueError('Error: %s exists, only one cdc can be run on a machine!' % node['ip'])
            cdcips.add(node['ip'])
            if 'raft_port' not in node:
                node['raft_port'] = config['defraft_port_cdc']
            addPortToMachine(portmap, node['ip'], node['raft_port'])
            if 'http_port' not in node:
                node['http_port'] = config['defhttp_port_cdc']
            addPortToMachine(portmap, node['ip'], node['http_port'])
            cdcseeds.append("%s:%d" % (node['ip'], node['raft_port']))
            curid += 1
        ha_seed_str = ",".join(cdcseeds)
        for node in cdc['nodes']:
            node['ha_group_member'] = ha_seed_str

    defpaths = {
            "server_datadirs": "server_datadir",
            "storage_datadirs": "storage_datadir",
            "storage_logdirs": "storage_logdir",
            "storage_waldirs": "storage_waldir",
            "storage_keyringdir": "storage_keyringdir",
            "vec_tmpdir": "vec_tmpdir"
        }
    nodemgrips = set()
    nodemgrmaps = {}
    if 'config' not in nodemgr:
        nodemgr['config'] = {}
    if 'sql_ssl_connect' in nodemgr['config']:
        config['need_ssl'] = True
    set_upgrade_for_nodemgr_obj(nodemgr, config)
    for node in nodemgr['nodes']:
        if 'config' not in node:
            node['config'] = {}
        if 'sql_ssl_connect' in node['config']:
            config['need_ssl'] = True
        set_upgrade_for_nodemgr_node(node, nodemgr)
        node['storage_usedports'] = []
        node['server_usedports'] = []
        if 'nodetype' not in node:
            node['nodetype'] = 'both'
        if 'valgrind' not in node:
            node['valgrind'] = False
        if 'skip' not in node:
            node['skip'] = False
        # default 8 cpus
        if 'total_cpu_cores' not in node:
            node['total_cpu_cores'] = 8
        # default 16GB memory.
        if 'total_mem' not in node:
            node['total_mem'] = 16384
        if 'storage_portrange' not in node:
            node['storage_portrange'] = config['defstorage_portrange_nodemgr']
        if 'server_portrange' not in node:
            node['server_portrange'] = config['defserver_portrange_nodemgr']
        if 'verbose_log' not in node:
            node['verbose_log'] = False
        # derive the initial ports from the configured port ranges
        if 'storage_curport' not in node:
            range1 = node['storage_portrange'].split('-')
            node['storage_curport'] = int(range1[0]) + 1
        if 'server_curport' not in node:
            range2 = node['server_portrange'].split('-')
            node['server_curport'] = int(range2[0]) + 1
        mach = machines.get(node['ip'])
        if node['ip'] in nodemgrips:
            raise ValueError('Error: %s exists, only one node_mgr can be run on a machine!' % node['ip'])
        nodemgrips.add(node['ip'])
        nodemgrmaps[node['ip']] = node
        if 'brpc_http_port' not in node:
            node['brpc_http_port'] = config['defbrpc_http_port_nodemgr']
        addPortToMachine(portmap, node['ip'], node['brpc_http_port'])
        if 'tcp_port' not in node:
            node['tcp_port'] = config['deftcp_port_nodemgr']
        addPortToMachine(portmap, node['ip'], node['tcp_port'])
        if 'prometheus_port_start' not in node:
            node['prometheus_port_start'] = config['defprometheus_port_start_nodemgr']
        addPortToMachine(portmap, node['ip'], node['prometheus_port_start'])
        # The logic is:
        # - if an item is set, check that every entry is an absolute path.
        # - if it is not set, it defaults to $basedir/{server_datadir, storage_datadir, storage_logdir, storage_waldir, ...}
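        # e.g. a node with basedir '/kunlun' and no explicit 'storage_datadirs'
        # (hypothetical) ends up with node['storage_datadirs'] ==
        # '/kunlun/storage_datadir', per defpaths above.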
        for item in ["server_datadirs", "storage_datadirs", "storage_logdirs", "storage_waldirs", "storage_keyringdir", "vec_tmpdir"]:
            if item in node:
                nodedirs = node[item].strip()
                dirs = set()
                for d in nodedirs.split(","):
                    formald = d.strip()
                    #addDirToMachine(dirmap, node['ip'], d)
                    if not formald.startswith('/'):
                        raise ValueError('Error: the dir in %s must be an absolute path!' % item)
                    if formald in dirs:
                        raise ValueError('Error: duplicate dir on %s(%s): %s!' % (node['ip'], item, d))
                    dirs.add(formald)
            else:
                node[item] = "%s/%s" % (mach['basedir'], defpaths[item])

    ha_mode = meta.get('ha_mode', '')
    nodecnt = len(meta['nodes'])
    if ha_mode == '':
        if nodecnt > 1:
            ha_mode = 'rbr'
        else:
            ha_mode = 'no_rep'
    meta['ha_mode'] = ha_mode
    if nodecnt == 0 and 'meta' in jscfg and not 'group_seeds' in meta:
        raise ValueError('Error: There must be at least one node in meta shard')
    if nodecnt > 1 and ha_mode == 'no_rep' and not iscantian:
        raise ValueError('Error: ha_mode is no_rep, but there are multiple nodes in meta shard')
    elif nodecnt == 1 and ha_mode != 'no_rep':
        raise ValueError('Error: ha_mode is mgr/rbr, but there is only one node in meta shard')
    hasPrimary=False
    meta_addrs = []
    if 'fullsync_level' not in meta:
        meta['fullsync_level'] = 1
    if 'config' not in meta:
        meta['config'] = {}
    for node in meta['nodes']:
        if 'config' not in node:
            node['config'] = {}
        if 'fullsync' not in node:
            if ha_mode == 'rbr':
                node['fullsync'] = 1
            else:
                node['fullsync'] = 0
        # These attrs must not be set explicitly.
        for attr in ['data_dir_path', 'log_dir_path', 'innodb_log_dir_path']:
            if attr in node:
                raise ValueError('%s can not be set explicitly for meta node %s' % (attr, node['ip']))
        if node['ip'] not in nodemgrips:
            nodem = get_default_nodemgr(config, machines, node['ip'], "storage")
            nodemgr['nodes'].append(nodem)
            nodemgrmaps[node['ip']] = nodem
            nodemgrips.add(node['ip'])
        # node['nodemgr'] = nodemgrmaps.get(node['ip'])
        nodemgrobj = nodemgrmaps.get(node['ip'])
        fix_nodemgr_nodetype(nodemgrobj, 'storage')
        if 'port' not in node:
            node['port'] = get_nodemgr_nextport(nodemgrobj, "storage", 3)
        addPortToMachine(portmap, node['ip'], node['port'])
        # This is reserved for prometheus
        addPortToMachine(portmap, node['ip'], node['port'] + 1)
        # reserved for vec-engine
        addPortToMachine(portmap, node['ip'], node['port'] + 2)
        addto_usedports(nodemgrobj, 'storage', node['port'])
        set_storage_using_nodemgr(machines, node, nodemgrobj, "1024MB")
        meta_addrs.append("%s:%s" % (node['ip'], str(node['port'])))
        if meta['ha_mode'] == 'mgr':
            if 'xport' not in node:
                node['xport'] = get_nodemgr_nextport(nodemgrobj, "storage", 1)
            addPortToMachine(portmap, node['ip'], node['xport'])
            addto_usedports(nodemgrobj, 'storage', node['xport'])
            if 'mgr_port' not in node:
                node['mgr_port'] = get_nodemgr_nextport(nodemgrobj, "storage", 1)
            addPortToMachine(portmap, node['ip'], node['mgr_port'])
            addto_usedports(nodemgrobj, 'storage', node['mgr_port'])
        # keep 3 ports, for possible other usages -- not required currently.
        # get_nodemgr_nextport(nodemgrobj, "storage", 3)
        if 'election_weight' not in node:
            node['election_weight'] = 50
        if 'is_primary' not in node:
            node['is_primary'] = False
        if node['is_primary']:
            if hasPrimary:
                raise ValueError('Error: Two primaries found in meta shard, there should be one and only one Primary specified !')
            else:
                hasPrimary = True
    if nodecnt > 0:
        if not hasPrimary:
            meta['nodes'][0]['is_primary'] = True

    if 'meta' not in jscfg:
        jscfg['meta'] = {'nodes':[], 'group_seeds':""}
        meta = jscfg['meta']
    if len(meta_addrs) > 0:
        meta['group_seeds'] = ",".join(meta_addrs)
    if 'enable_rocksdb' not in meta:
        meta['enable_rocksdb'] = True
    if 'enable_ssl_connect' not in meta:
        meta['enable_ssl_connect'] = False
    if meta['enable_ssl_connect']:
        config['need_ssl'] = True

    if 'backup' in jscfg:
        if 'hdfs' in jscfg['backup']:
            node = jscfg['backup']['hdfs']
            addPortToMachine(portmap, node['ip'], node['port'])
        if 'ssh' in jscfg['backup']:
            node = jscfg['backup']['ssh']
            if 'port' not in node:
                node['port'] = 22
            addPortToMachine(portmap, node['ip'], node['port'])
            if 'user' not in node:
                raise ValueError('Error: user must be specified for ssh backup!')
            if 'targetDir' not in node:
                raise ValueError('Error: targetDir must be specified for ssh backup!')

    if 'xpanel' in jscfg:
        xpanel = jscfg['xpanel']
        if 'saas' not in xpanel:
            xpanel['saas'] = 0
        if 'config' not in xpanel:
            xpanel['config'] = {}
        set_upgrade_for_obj(xpanel, config)
        if 'imageType' not in xpanel:
            xpanel['imageType'] = 'url'
        if xpanel['imageType'] == 'file':
            if arch == 'x86_64':
                xpanel['image'] = 'kunlun-xpanel:%s' % config['product_version']
                xpanel['upgrade_image'] = 'kunlun-xpanel:%s' % config['upgrade_version']
            else:
                xpanel['image'] = 'kunlun-xpanel:%s-%s' % (config['product_version'], arch)
                xpanel['upgrade_image'] = 'kunlun-xpanel:%s-%s' % (config['upgrade_version'], arch)
        else:
            xpanel['upgrade_image'] = xpanel['image']
            xpanel['image'] = xpanel['image'].replace('VERSION', config['product_version'])
            xpanel['upgrade_image'] = xpanel['upgrade_image'].replace('VERSION', config['upgrade_version'])
        if 'imageFile' not in xpanel:
            xpanel['imageFile'] = 'kunlun-xpanel-%s.tar.gz' % config['product_version']
            xpanel['upgrade_imageFile'] = 'kunlun-xpanel-%s.tar.gz' % config['upgrade_version']
        if 'nodes' in xpanel:
            xips = set()
            for node in xpanel['nodes']:
                if 'config' not in node:
                    node['config'] = {}
                set_upgrade_for_node(node, xpanel)
                if node['ip'] in xips:
                    raise ValueError('Error: %s exists, only one xpanel can be run on a machine!' % node['ip'])
                xips.add(node['ip'])
                if 'port' not in node:
                    node['port'] = 18080
                addPortToMachine(portmap, node['ip'], node['port'])
                if 'name' not in node:
                    node['name'] = 'xpanel_%d' % node['port']
        else:
            node = xpanel
            if 'port' not in node:
                node['port'] = 18080
            addPortToMachine(portmap, node['ip'], node['port'])
            if 'name' not in node:
                node['name'] = 'xpanel_%d' % node['port']
            xpanel['nodes'] = [{"ip":node['ip'], "port": node['port'], "name":node["name"], "upgrade": xpanel['upgrade_all']}]

    if 'elasticsearch' in jscfg:
        node = jscfg['elasticsearch']
        if 'ip' not in node:
            raise ValueError('Error: the ip of elasticsearch must be specified!')
        if 'port' not in node:
            node['port'] = 9200
        addPortToMachine(portmap, node['ip'], node['port'])
        if 'kibana_port' not in node:
            node['kibana_port'] = 5601
        addPortToMachine(portmap, node['ip'], node['kibana_port'])

    cnames = set()
    cluster_map = {}
    for cluster in clusters:
        if 'config' not in cluster:
            cluster['config'] = {}
        if 'kunlun_enable_jit' not in cluster['config'] and not config['mariadb'] and not config['cantian']:
            cluster['config']['kunlun_enable_jit'] = "ON"
        if 'name' not in cluster:
            raise ValueError('Error: the name of cluster must be specified!')
        if cluster['name'] in cnames:
            raise ValueError('Error: the name of cluster should be unique !')
        else:
            cnames.add(cluster['name'])
            cluster_map[cluster['name']] = cluster
        if 'ha_mode' not in cluster:
            cluster['ha_mode'] = 'rbr'
        if 'enable_degrade' not in cluster:
            cluster['enable_degrade'] = False
        if 'enable_global_mvcc' not in cluster:
            cluster['enable_global_mvcc'] = False
        if 'degrade_time' not in cluster:
            cluster['degrade_time'] = 15
        if 'storage_template' not in cluster:
            cluster['storage_template'] = 'normal'
        if 'innodb_buffer_pool_size_MB' not in cluster:
            cluster['innodb_buffer_pool_size_MB'] = 1024
        innodb_sizeMB = cluster['innodb_buffer_pool_size_MB']
        if 'fullsync_level' not in cluster:
            cluster['fullsync_level'] = 1
        if 'max_connections' not in cluster:
            cluster['max_connections'] = 1000
        if 'max_storage_size_GB' not in cluster:
            cluster['max_storage_size_GB'] = 20
        if 'storage_cpu_cores' not in cluster:
            cluster['storage_cpu_cores'] = 8
        if 'enable_rocksdb' not in cluster:
            cluster['enable_rocksdb'] = True
        cluster['dbcfg'] = 0
        if cluster['storage_template'] == 'small':
            cluster['dbcfg'] = 1
        storage_sizeGB = cluster['max_storage_size_GB']
        if 'use_vec_engine' not in cluster:
            cluster['use_vec_engine'] = True
        use_vec_engine = cluster['use_vec_engine']
        ha_mode = cluster['ha_mode']
        validate_ha_mode(ha_mode)
        comps = cluster['comp']
        datas = cluster['data']
        if 'config' not in comps:
            comps['config'] = {}
        for node in comps['nodes']:
            if 'config' not in node:
                node['config'] = {}
            node['use_vec_engine'] = use_vec_engine
            mach = machines.get(node['ip'])
            mach['haspg'] = True
            if 'with_san' in node and node['with_san']:
                mach['pg_with_san'] = True
            if node['ip'] not in nodemgrips:
                nodem = get_default_nodemgr(config, machines, node['ip'], "server")
                nodemgr['nodes'].append(nodem)
                nodemgrmaps[node['ip']] = nodem
                nodemgrips.add(node['ip'])
            nodemgrobj = nodemgrmaps.get(node['ip'])
            fix_nodemgr_nodetype(nodemgrobj, 'server')
            if 'port' not in node:
                node['port'] = get_nodemgr_nextport(nodemgrobj, "server", 1)
            addPortToMachine(portmap, node['ip'], node['port'])
            # This is reserved for prometheus.
            addPortToMachine(portmap, node['ip'], node['port'] + 2)
            addto_usedports(nodemgrobj, 'server', node['port'])
            if 'mysql_port' not in node:
                node['mysql_port'] = get_nodemgr_nextport(nodemgrobj, "server", 2)
            # keep 3 ports for other usage.
            get_nodemgr_nextport(nodemgrobj, "server", 3)
            addPortToMachine(portmap, node['ip'], node['mysql_port'])
            addto_usedports(nodemgrobj, 'server', node['mysql_port'])
            set_server_using_nodemgr(machines, node, nodemgrobj)
        i = 1
        for shard in datas:
            if 'ref' in shard:
                shard['is_ref'] = True
                ref = shard['ref']
                refname = ref['name']
                if refname == 'meta':
                    shard['nodes'] = meta['nodes']
                    shard['config'] = meta['config']
                else:
                    refcluster = cluster_map[refname]
                    refshardid = ref['id']
                    # index by the referenced shard's id, not the current loop counter.
                    shard['nodes'] = refcluster['data'][refshardid]['nodes']
                    shard['config'] = refcluster['data'][refshardid]['config']
                if use_vec_engine:
                    for node in shard['nodes']:
                        if 'use_vec_engine' not in node or not node['use_vec_engine']:
                            node['use_vec_engine'] = use_vec_engine
                            vecport = node['port'] + 2
                            node['config']['#tornado_sn_port'] = str(vecport)
                            nodemgrobj = nodemgrmaps.get(node['ip'])
                            set_storage_using_nodemgr(machines, node, nodemgrobj)
                continue
            else:
                shard['is_ref'] = False
            if 'config' not in shard:
                shard['config'] = {}
            nodecnt = len(shard['nodes'])
            if nodecnt == 0:
                raise ValueError('Error: There must be at least one node in the shard')
            if ha_mode == 'no_rep' and nodecnt > 1 and not iscantian:
                raise ValueError('Error: ha_mode is no_rep, but there are multiple nodes in the shard')
            elif nodecnt == 1 and ha_mode != 'no_rep':
                raise ValueError('Error: ha_mode is mgr/rbr, but there is only one node in the shard')
            hasPrimary = False
            for node in shard['nodes']:
                if 'config' not in node:
                    node['config'] = {}
                if not config['mariadb'] and not config['cantian']:
                    node['use_vec_engine'] = use_vec_engine
                if node['ip'] not in nodemgrips:
                    nodem = get_default_nodemgr(config, machines, node['ip'], "storage")
                    nodemgr['nodes'].append(nodem)
                    nodemgrmaps[node['ip']] = nodem
                    nodemgrips.add(node['ip'])
                nodemgrobj = nodemgrmaps.get(node['ip'])
                fix_nodemgr_nodetype(nodemgrobj, 'storage')
                if 'port' not in node:
                    node['port'] = get_nodemgr_nextport(nodemgrobj, "storage", 3)
                if 'fullsync' not in node:
                    node['fullsync'] = 1
                addPortToMachine(portmap, node['ip'], node['port'])
                # This is reserved for prometheus
                addPortToMachine(portmap, node['ip'], node['port'] + 1)
                # reserved for vec-engine
                addPortToMachine(portmap, node['ip'], node['port'] + 2)
                if use_vec_engine:
                    vecport = node['port'] + 2
                    node['config']['#tornado_sn_port'] = str(vecport)
                addto_usedports(nodemgrobj, 'storage', node['port'])
                set_storage_using_nodemgr(machines, node, nodemgrobj, '%dMB' % innodb_sizeMB)
                if ha_mode == 'mgr':
                    if 'xport' not in node:
                        node['xport'] = get_nodemgr_nextport(nodemgrobj, "storage", 1)
                    addPortToMachine(portmap, node['ip'], node['xport'])
                    addto_usedports(nodemgrobj, 'storage', node['xport'])
                    if 'mgr_port' not in node:
                        node['mgr_port'] = get_nodemgr_nextport(nodemgrobj, "storage", 1)
                    addPortToMachine(portmap, node['ip'], node['mgr_port'])
                    addto_usedports(nodemgrobj, 'storage', node['mgr_port'])
                # keep 3 ports for possible other usage -- not required currently.
                # get_nodemgr_nextport(nodemgrobj, "storage", 3)
                if 'election_weight' not in node:
                    node['election_weight'] = 50
                if 'is_primary' not in node:
                    node['is_primary'] = False
                if node['is_primary']:
                    if hasPrimary:
                        raise ValueError('Error: Two primaries found in %s-shard%d, there should be one and only one Primary specified !' % (cluster['name'], i))
                    else:
                        hasPrimary = True
            if nodecnt > 0:
                if not hasPrimary:
                    shard['nodes'][0]['is_primary'] = True
            i += 1

        if 'haproxy' in cluster:
            node = cluster['haproxy']
            if 'config' not in node:
                node['config'] = {}
            addPortToMachine(portmap, node['ip'], node['port'])
            if 'mysql_port' in node:
                addPortToMachine(portmap, node['ip'], node['mysql_port'])
    jscfg['cluster_map'] = cluster_map

    if config['verbose']:
        for node in nodemgr['nodes']:
            my_print(str(node))

# Cluster installation will be added later, so it is not covered here yet.
def setup_machines3(jscfg, machines):
    machnodes = jscfg.get('machines', [])
    meta = jscfg['meta']
    metanodes = meta.get('nodes', [])
    nodemgr = jscfg.get('node_manager', {"nodes": []})
    nodemgrnodes = nodemgr.get('nodes', [])
    clustermgr = jscfg.get('cluster_manager', {"nodes": []})
    clustermgrnodes = clustermgr.get('nodes', [])
    xpanel = jscfg.get("xpanel", {"nodes": []})
    xpanelnodes = xpanel.get('nodes', [])
    config = jscfg['config']
    for mach in machnodes:
        normalize_string(mach, 'ip')
        ip=mach['ip']
        user=mach.get('user', config['defuser'])
        base=mach.get('basedir', config['defbase']).strip()
        sshport = mach.get('sshport', 22)
        password = mach.get('password', '')
        sshpass = mach.get('sshpass', False)
        addMachineToMap(machines, ip, user, base, sshport, password, sshpass)
    if len(metanodes) > 0:
        for node in metanodes:
            normalize_string(node, 'ip')
            addIpToMachineMap(machines, node['ip'], config)
    elif 'group_seeds' in meta:
        nodes = get_nodes_from_seeds(meta['group_seeds'])
        for node in nodes:
            normalize_string(node, 'ip')
            addIpToMachineMap(machines, node['ip'], config)
    for node in nodemgrnodes:
        normalize_string(node, 'ip')
        addIpToMachineMap(machines, node['ip'], config)
    for node in clustermgrnodes:
        normalize_string(node, 'ip')
        addIpToMachineMap(machines, node['ip'], config)
    if 'elasticsearch' in jscfg:
        normalize_string(jscfg['elasticsearch'], 'ip')
        addIpToMachineMap(machines, jscfg['elasticsearch']['ip'], config)
    for node in xpanelnodes:
        normalize_string(node, 'ip')
        addIpToMachineMap(machines, node['ip'], config)

# Cluster installation is not added yet, so it is not covered here currently.
def validate_and_set_config3(jscfg, machines):
    # check data centers first.
    dcs = jscfg.get('datacenters', [])
    meta = jscfg.get('meta', {'nodes':[]})
    if 'nodes' not in meta:
        meta['nodes'] = []
    clustermgr = jscfg.get('cluster_manager', {'nodes':[]})
    if 'nodes' not in clustermgr:
        clustermgr['nodes'] = []
    # put the default object back into jscfg if it was absent.
    if 'cluster_manager' not in jscfg:
        jscfg['cluster_manager'] = clustermgr
    nodemgr = jscfg.get('node_manager', {'nodes':[]})
    if 'nodes' not in nodemgr:
        nodemgr['nodes'] = []
    if 'node_manager' not in jscfg:
        jscfg['node_manager'] = nodemgr
    clusters = jscfg.get('clusters', [])
    if 'clusters' not in jscfg:
        jscfg['clusters'] = clusters
    config = jscfg['config']
    if 'need_ssl' not in config:
        config['need_ssl'] = False
    arch = config['targetarch']

    firstboot = False
    # if we specify meta nodes, it is a real bootstrap operation.
    if len(meta['nodes']) > 0:
        firstboot = True

    dcnames = set()
    dcmap = {}
    dcprimary = None
    dcsecondarylist = []
    dcstandbylist = []
    for node in dcs:
        if node['name'] in dcnames:
            raise ValueError('Error: duplicate dc name:%s!' % node['name'])
        dcnames.add(node['name'])
        dcmap[node['name']] = node
        if 'skip' not in node:
            node['skip'] = False
        if 'province' not in node:
            node['province'] = None
        if 'city' not in node:
            node['city'] = None
        if 'is_primary' not in node:
            node['is_primary'] = False
        if node['is_primary']:
            if dcprimary is not None:
                raise ValueError('Error: there should be only one primary datacenter!')
            dcprimary = node

    if config['verbose']:
        my_print("data centers:%s\n" % str(dcs))

    if firstboot:
        if dcprimary is None:
            raise ValueError('Error: primary is not set for bootstrap operation !')
        for node in dcs:
            if node.get('skip', False):
                raise ValueError('Error: datacenter %s must be initialized during bootstrap !' % node['name'])
    
    pdcprovince = None
    pdccity = None
    if dcprimary is not None:
        pdcprovince = dcprimary['province']
        pdccity = dcprimary['city']
    for node in dcs:
        if node is dcprimary:
            continue
        if node['province'] == pdcprovince and node['city'] == pdccity:
            dcsecondarylist.append(node)
        else:
            dcstandbylist.append(node)
    jscfg['dcprimary'] = dcprimary
    jscfg['dcsecondarylist'] = dcsecondarylist
    jscfg['dcstandbylist'] = dcstandbylist

    if config['verbose']:
        if dcprimary is not None:
            my_print("primary datacenter:%s\n" % str(dcprimary))
        my_print("secondary datacenters:%s\n" % str(dcsecondarylist))
        my_print("standby datacenters:%s\n" % str(dcstandbylist))

    portmap = {}
    dirmap = {}
    ipdcmap = {}

    defpaths = {
            "server_datadirs": "server_datadir",
            "storage_datadirs": "storage_datadir",
            "storage_logdirs": "storage_logdir",
            "storage_waldirs": "storage_waldir",
            "storage_keyringdir": "storage_keyringdir",
            "vec_tmpdir": "vec_tmpdir",
        }
    nodemgrips = set()
    nodemgrmaps = {}
    if 'config' not in nodemgr:
        nodemgr['config'] = {}
    if 'sql_ssl_connect' in nodemgr['config']:
        config['need_ssl'] = True
    set_upgrade_for_nodemgr_obj(nodemgr, config)
    for node in nodemgr['nodes']:
        if 'config' not in node:
            node['config'] = {}
        if 'sql_ssl_connect' in node['config']:
            config['need_ssl'] = True
        set_upgrade_for_nodemgr_node(node, nodemgr)
        if node['ip'] in nodemgrips:
            raise ValueError('Error: %s exists, only one node_mgr can be run on a machine!' % node['ip'])
        if 'dc' not in node:
            raise ValueError('Error: dc attribute for the node(%s) must be specified !' % node['ip'])
        elif node['dc'] not in dcnames:
            if firstboot:
                raise ValueError('Error: unknown datacenter %s !' % node['dc'])
            else:
                dcnames.add(node['dc'])
                dcs.append(get_default_datacenter(node['dc']))
        ipdcmap[node['ip']] = node['dc']
        node['storage_usedports'] = []
        node['server_usedports'] = []
        if 'nodetype' not in node:
            node['nodetype'] = 'both'
        if 'valgrind' not in node:
            node['valgrind'] = False
        if 'skip' not in node:
            node['skip'] = False
        if 'verbose_log' not in node:
            node['verbose_log'] = False
        # default 8 cpus
        if 'total_cpu_cores' not in node:
            node['total_cpu_cores'] = 8
        # default 16GB memory.
        if 'total_mem' not in node:
            node['total_mem'] = 16384
        if 'storage_portrange' not in node:
            node['storage_portrange'] = config['defstorage_portrange_nodemgr']
        if 'server_portrange' not in node:
            node['server_portrange'] = config['defserver_portrange_nodemgr']
        # validate other configurations
        if 'storage_curport' not in node:
            range1 = node['storage_portrange'].split('-')
            node['storage_curport'] = int(range1[0]) + 1
        if 'server_curport' not in node:
            range2 = node['server_portrange'].split('-')
            node['server_curport'] = int(range2[0]) + 1
        mach = machines.get(node['ip'])
        nodemgrips.add(node['ip'])
        nodemgrmaps[node['ip']] = node
        if 'brpc_http_port' not in node:
            node['brpc_http_port'] = config['defbrpc_http_port_nodemgr']
        addPortToMachine(portmap, node['ip'], node['brpc_http_port'])
        if 'tcp_port' not in node:
            node['tcp_port'] = config['deftcp_port_nodemgr']
        addPortToMachine(portmap, node['ip'], node['tcp_port'])
        if 'prometheus_port_start' not in node:
            node['prometheus_port_start'] = config['defprometheus_port_start_nodemgr']
        addPortToMachine(portmap, node['ip'], node['prometheus_port_start'])
        # The logic is:
        # - if an item is set, check that every entry is an absolute path.
        # - if it is not set, it defaults to $basedir/{server_datadir, storage_datadir, storage_logdir, storage_waldir}
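        # Illustrative example (hypothetical values): a node entry such as
        #   "storage_datadirs": "/data1/storage,/data2/storage"
        # passes validation, while a relative entry like "data/storage" raises
        # ValueError; a node without the key defaults to "<basedir>/storage_datadir".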
        for item in ["server_datadirs", "storage_datadirs", "storage_logdirs", "storage_waldirs", "storage_keyringdir", "vec_tmpdir"]:
            if item in node:
                nodedirs = node[item].strip()
                dirs = set()
                for d in nodedirs.split(","):
                    formald = d.strip()
                    #addDirToMachine(dirmap, node['ip'], d)
                    if not formald.startswith('/'):
                        raise ValueError('Error: the dir in %s must be absolute path!' % item)
                    if formald in dirs:
                        raise ValueError('Error: duplicate dir on %s(%s): %s!' % (node['ip'], item, d))
                    dirs.add(formald)
            else:
                node[item] = "%s/%s" % (mach['basedir'], defpaths[item])

    if firstboot:
        for node in nodemgr['nodes']:
            if node['skip']:
                raise ValueError('Error: node %s must be initialized during bootstrap !' % node['ip'])

    if config['verbose']:
        my_print("ipdcmap:%s\n" % str(ipdcmap))

    clustermgrips = set()
    dc_clustermgr_map = {}
    if 'config' not in clustermgr:
        clustermgr['config'] = {}
    if 'sql_ssl_connect' in clustermgr['config']:
        config['need_ssl'] = True
    set_upgrade_for_obj(clustermgr, config)
    for node in clustermgr['nodes']:
        if 'config' not in node:
            node['config'] = {}
        if 'sql_ssl_connect' in node['config']:
            config['need_ssl'] = True
        set_upgrade_for_node(node, clustermgr)
        if node['ip'] in clustermgrips:
            raise ValueError('Error: %s exists, only one cluster_mgr can be run on a machine!' % node['ip'])
        if 'dc' not in node:
            if node['ip'] not in ipdcmap:
                raise ValueError('Error: datacenter not specified for clustermgr node %s!' % node['ip'])
            else:
                addToListMap(dc_clustermgr_map, ipdcmap[node['ip']], node)
        else:
            if node['dc'] not in dcnames:
                raise ValueError('Error: unknown datacenter %s !' % node['dc'])
            elif node['ip'] in ipdcmap and ipdcmap[node['ip']] != node['dc']:
                raise ValueError('Error: conflict datacenter specification for %s !' % node['ip'])
            else:
                addToListMap(dc_clustermgr_map, node['dc'], node)
        if 'valgrind' not in node:
            node['valgrind'] = False
        clustermgrips.add(node['ip'])
        if 'brpc_raft_port' not in node:
            node['brpc_raft_port'] = config['defbrpc_raft_port_clustermgr']
        addPortToMachine(portmap, node['ip'], node['brpc_raft_port'])
        if 'brpc_http_port' not in node:
            node['brpc_http_port'] = config['defbrpc_http_port_clustermgr']
        addPortToMachine(portmap, node['ip'], node['brpc_http_port'])
        if 'prometheus_port_start' not in node:
            node['prometheus_port_start'] = config['defprometheus_port_start_clustermgr']
        addPortToMachine(portmap, node['ip'], node['prometheus_port_start'])

    if firstboot:
        for dc in dcnames:
            if dc not in dc_clustermgr_map or len(dc_clustermgr_map[dc]) < 2:
                raise ValueError('Error: there must be at least two clustermgr nodes in datacenter %s during bootstrap!' % dc)
        # Set raft election timeouts so that election priority follows the
        # datacenter role: nodes in the primary DC get the shortest timeout
        # (elected first), secondaries a middle value, and standbys the longest.
        for node in dc_clustermgr_map[dcprimary['name']]:
            if check_version_to_minor(config['product_version'], 1, 2):
                node['brpc_raft_election_timeout_ms'] = 3000
        for dc in dcsecondarylist:
            for node in dc_clustermgr_map[dc['name']]:
                if check_version_to_minor(config['product_version'], 1, 2):
                    node['brpc_raft_election_timeout_ms'] = 6000
        for dc in dcstandbylist:
            for node in dc_clustermgr_map[dc['name']]:
                if check_version_to_minor(config['product_version'], 1, 2):
                    node['brpc_raft_election_timeout_ms'] = 9000

    if 'ha_mode' not in meta:
        meta['ha_mode'] = 'rbr'
    elif meta['ha_mode'] != 'rbr':
        raise ValueError('Error: ha_mode for meta must be rbr currently !')
    nodecnt = len(meta['nodes'])
    if nodecnt == 0 and 'group_seeds' not in meta:
        raise ValueError('Error: meta nodes must be specified during bootstrap!')
    dc_meta_map = {}
    if 'config' not in meta:
        meta['config'] = {}
    for node in meta['nodes']:
        if 'config' not in node:
            node['config'] = {}
        # These attrs must not be set explicitly.
        for attr in ['data_dir_path', 'log_dir_path', 'innodb_log_dir_path']:
            if attr in node:
                raise ValueError('%s can not be set explicitly for meta node %s' % (attr, node['ip']))
        if node['ip'] not in nodemgrips:
            raise ValueError('Error: no node_manager item for node %s!' % node['ip'])
        addToListMap(dc_meta_map, ipdcmap[node['ip']], node)
        # node['nodemgr'] = nodemgrmaps.get(node['ip'])
        nodemgrobj = nodemgrmaps.get(node['ip'])
        fix_nodemgr_nodetype(nodemgrobj, 'storage')
        if 'port' not in node:
            node['port'] = get_nodemgr_nextport(nodemgrobj, "storage", 3)
        addPortToMachine(portmap, node['ip'], node['port'])
        # This is reserved for prometheus
        addPortToMachine(portmap, node['ip'], node['port'] + 1)
        # reserved for vec-engine
        addPortToMachine(portmap, node['ip'], node['port'] + 2)
        addto_usedports(nodemgrobj, 'storage', node['port'])
        set_storage_using_nodemgr(machines, node, nodemgrobj, "1024MB")
        if 'election_weight' not in node:
            node['election_weight'] = 50
        node['is_primary'] = False
    jscfg['dc_meta_map'] = dc_meta_map

    if 'enable_rocksdb' not in meta:
        meta['enable_rocksdb'] = True
    if 'enable_ssl_connect' not in meta:
        meta['enable_ssl_connect'] = False
    if meta['enable_ssl_connect']:
        config['need_ssl'] = True

    if firstboot:
        for dc in dcnames:
            if dc not in dc_meta_map or len(dc_meta_map[dc]) < 2:
                raise ValueError('Error: there must be at least two meta nodes in datacenter %s during bootstrap!' % dc)
        pdcmetanodes = dc_meta_map[dcprimary['name']]
        pdcmetanodes[0]['is_primary'] = True
        meta_addrlist = []
        for node in meta['nodes']:
            meta_addrlist.append('%s:%d' % (node['ip'], node['port']))
        meta_addrs = ",".join(meta_addrlist)
        meta['group_seeds'] = meta_addrs
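        # Illustrative result (hypothetical addresses): three meta nodes yield
        # group_seeds like "192.168.0.11:57001,192.168.0.12:57001,192.168.0.13:57001".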
    if config['verbose']:
        my_print("group_seeds:%s\n" % meta['group_seeds'])

    if 'backup' in jscfg:
        if 'hdfs' in jscfg['backup']:
            node = jscfg['backup']['hdfs']
            addPortToMachine(portmap, node['ip'], node['port'])
        if 'ssh' in jscfg['backup']:
            node = jscfg['backup']['ssh']
            if 'port' not in node:
                node['port'] = 22
            addPortToMachine(portmap, node['ip'], node['port'])
            if 'user' not in node:
                raise ValueError('Error: user must be specified for ssh backup!')
            if 'targetDir' not in node:
                raise ValueError('Error: targetDir must be specified for ssh backup!')

    if 'xpanel' in jscfg:
        xpanel = jscfg['xpanel']
        if 'saas' not in xpanel:
            xpanel['saas'] = 0
        if 'config' not in xpanel:
            xpanel['config'] = {}
        set_upgrade_for_obj(xpanel, config)
        if 'imageType' not in xpanel:
            xpanel['imageType'] = 'url'
        if xpanel['imageType'] == 'file':
            if arch == 'x86_64':
                xpanel['image'] = 'kunlun-xpanel:%s' % config['product_version']
                xpanel['upgrade_image'] = 'kunlun-xpanel:%s' % config['upgrade_version']
            else:
                xpanel['image'] = 'kunlun-xpanel:%s-%s' % (config['product_version'], arch)
                xpanel['upgrade_image'] = 'kunlun-xpanel:%s-%s' % (config['upgrade_version'], arch)
        else:
            xpanel['upgrade_image'] = xpanel['image']
            xpanel['image'] = xpanel['image'].replace('VERSION', config['product_version'])
            xpanel['upgrade_image'] = xpanel['upgrade_image'].replace('VERSION', config['upgrade_version'])
        if 'imageFile' not in xpanel:
            xpanel['imageFile'] = 'kunlun-xpanel-%s.tar.gz' % config['product_version']
            xpanel['upgrade_imageFile'] = 'kunlun-xpanel-%s.tar.gz' % config['upgrade_version']
        for node in xpanel['nodes']:
            if 'config' not in node:
                node['config'] = {}
            set_upgrade_for_node(node, xpanel)
            if 'port' not in node:
                node['port'] = 18080
            addPortToMachine(portmap, node['ip'], node['port'])
            if 'name' not in node:
                node['name'] = 'xpanel_%d' % node['port']

    if 'elasticsearch' in jscfg:
        node = jscfg['elasticsearch']
        if 'ip' not in node:
            raise ValueError('Error: the ip of elasticsearch must be specified!')
        if 'port' not in node:
            node['port'] = 9200
        addPortToMachine(portmap, node['ip'], node['port'])
        if 'kibana_port' not in node:
            node['kibana_port'] = 5601
        addPortToMachine(portmap, node['ip'], node['kibana_port'])

    if config['verbose']:
        for node in nodemgr['nodes']:
            my_print(str(node))

def get_default_datacenter(dcname):
    node = {
            "name": dcname,
            "province": None,
            "city": None,
            # use the same 'skip' key that explicitly-configured datacenters use.
            "skip": True
            }
    return node

def get_default_nodemgr(config, machines, ip, nodetype):
    mach = machines.get(ip)
    defpaths = {
            "server_datadirs": "server_datadir",
            "storage_datadirs": "storage_datadir",
            "storage_logdirs": "storage_logdir",
            "storage_waldirs": "storage_waldir",
            "storage_keyringdir": "storage_keyringdir",
            "vec_tmpdir": "vec_tmpdir"
        }
    node =  {
            'ip': ip,
            'brpc_http_port': config['defbrpc_http_port_nodemgr'],
            "tcp_port": config['deftcp_port_nodemgr'],
            "prometheus_port_start": config['defprometheus_port_start_nodemgr'],
            'total_cpu_cores': 0,
            'total_mem': 0,
            'nodetype': 'none',
            'storage_portrange': config['defstorage_portrange_nodemgr'],
            'server_portrange': config['defserver_portrange_nodemgr'],
            'storage_curport': int(config['defstorage_portrange_nodemgr'].split('-')[0]) + 1,
            'server_curport': int(config['defserver_portrange_nodemgr'].split('-')[0]) + 1,
            'storage_usedports': [],
            'server_usedports': [],
            'valgrind': False,
            "skip": True
            }
    for item in ["server_datadirs", "storage_datadirs", "storage_logdirs", "storage_waldirs", "storage_keyringdir", "vec_tmpdir"]:
        node[item] = "%s/%s" % (mach['basedir'], defpaths[item])
    return node

def get_nodemgr_nextport(nodemgrobj, nodetype, incr=1):
    if nodetype == 'storage':
        port = nodemgrobj['storage_curport']
        nodemgrobj['storage_curport'] += incr
        return port
    else:
        port = nodemgrobj['server_curport']
        nodemgrobj['server_curport'] += incr
        return port
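# Illustrative usage (hypothetical values): with storage_curport == 57001,
# get_nodemgr_nextport(obj, 'storage', 3) returns 57001 and advances
# storage_curport to 57004, reserving a contiguous 3-port block.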

def fix_nodemgr_nodetype(nodemgrobj, addtype):
    if nodemgrobj['nodetype'] == 'both' or nodemgrobj['nodetype'] == addtype:
        pass
    elif nodemgrobj['nodetype'] == 'none':
        nodemgrobj['nodetype'] = addtype
    else:
        nodemgrobj['nodetype'] = 'both'
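# Illustrative transitions: 'none' becomes addtype on first use; adding
# 'server' to a 'storage'-only node manager (or vice versa) promotes it to
# 'both'; 'both' and same-type additions are no-ops.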

def addto_usedports(nodemgrobj, addtype, port):
    if addtype == 'storage':
        nodemgrobj['storage_usedports'].append(str(port))
    else:
        nodemgrobj['server_usedports'].append(str(port))

def get_worknode(jscfg):
    meta = jscfg.get('meta', None)
    clustermgr = jscfg.get('cluster_manager', None)
    nodemgr = jscfg.get('node_manager', None)
    clusters = jscfg.get('clusters', None)
    xpanel = jscfg.get('xpanel', None)
    esobj = jscfg.get('elasticsearch', None)
    worknode = None

    if meta is not None and 'nodes' in meta and len(meta['nodes']) > 0:
        worknode = meta['nodes'][0]
    elif nodemgr is not None and 'nodes' in nodemgr and len(nodemgr['nodes']) > 0:
        worknode = nodemgr['nodes'][0]
    elif clustermgr is not None and 'nodes' in clustermgr and len(clustermgr['nodes']) > 0:
        worknode = clustermgr['nodes'][0]
    elif xpanel is not None:
        if 'nodes' in xpanel:
            worknode = xpanel['nodes'][0]
        else:
            worknode = xpanel
    elif esobj is not None:
        worknode = esobj

    return worknode

def get_clustermgr_config(node, clustermgr):
    config = clustermgr['config'].copy()
    config.update(node['config'])
    return config

def get_nodemgr_config(node, nodemgr):
    config = nodemgr['config'].copy()
    config.update(node['config'])
    return config

def get_comp_config(node, comp):
    config = comp['config'].copy()
    config.update(node['config'])
    return config

def get_meta_config(node, meta):
    config = meta['config'].copy()
    config.update(node['config'])
    return config

def get_data_config(node, shard, cluster):
    config = cluster['config'].copy()
    config.update(shard['config'])
    config.update(node['config'])
    return config
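# Illustrative precedence (hypothetical keys): with cluster config
# {'a': 1, 'b': 1}, shard config {'b': 2} and node config {'b': 3},
# get_data_config returns {'a': 1, 'b': 3} -- node settings override the
# shard's, which override the cluster's.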

def get_masterip(nodes):
    for node in nodes:
        if node['is_primary']:
            return node['ip']
    raise ValueError('Error: No primary found!')

def gen_default_docker_config():
    config = {
            'product_version': "1.4.1",
            'defuser': 'kunlun',
            'defbase': '/kunlun',
            'namespace': 'klustron',
            'network': 'klnet',
            'autostart': False,
            'imageInFiles': False,
            'localip': '127.0.0.1',
            'dockeropts': '',
            'small': False,
            'verbose': False,
            'cantian': False,
            'download': False,
            'downloadsite': 'public',
            'downloadtype': 'release',
            'targetarch': platform.machine(),
            'overwrite': False
            }
    return config

def init_global_docker_config(jscfg, args):
    if 'config' not in jscfg:
        jscfg['config'] = {}
    config = jscfg['config']
    if 'product_version' not in config:
        config['product_version'] = args.product_version
    if 'defuser' not in config:
        config['defuser'] = args.defuser
    if 'defbase' not in config:
        config['defbase'] = args.defbase
    if 'network' not in config:
        config['network'] = args.network
    if 'namespace' not in config:
        config['namespace'] = args.namespace
    if 'autostart' not in config:
        config['autostart'] = args.autostart
    if 'imageInFiles' not in config:
        config['imageInFiles'] = args.imageInFiles
    if 'localip' not in config:
        config['localip'] = args.localip
    if 'dockeropts' not in config:
        config['dockeropts'] = args.dockeropts
    if 'small' not in config:
        config['small'] = args.small
    if 'verbose' not in config:
        config['verbose'] = args.verbose
    if 'cantian' not in config:
        config['cantian'] = args.cantian
    if 'download' not in config:
        config['download'] = args.download
    if 'downloadsite' not in config:
        config['downloadsite'] = args.downloadsite
    if 'downloadtype' not in config:
        config['downloadtype'] = args.downloadtype
    if 'targetarch' not in config:
        config['targetarch'] = args.targetarch
    if 'overwrite' not in config:
        config['overwrite'] = args.overwrite

# This is the same as the non-docker version, since the basic elements are the
# same, although the processing logic is different.
def setup_docker_machines(jscfg, machines):
    machnodes = jscfg.get('machines', [])
    meta = jscfg.get('meta', {})
    metanodes = meta.get('nodes', [])
    nodemgr = jscfg.get('node_manager', {"nodes": []})
    nodemgrnodes = nodemgr.get('nodes', [])
    clustermgr = jscfg.get('cluster_manager', {"nodes": []})
    clustermgrnodes = clustermgr.get('nodes', [])
    config = jscfg['config']
    for mach in machnodes:
        ip=mach['ip']
        user=mach.get('user', config['defuser'])
        base=mach.get('basedir', config['defbase'])
        sshport = mach.get('sshport', 22)
        password = mach.get('password', '')
        sshpass = mach.get('sshpass', False)
        addMachineToMap(machines, ip, user, base, sshport, password, sshpass)
    if len(metanodes) > 0:
        for node in metanodes:
            addIpToMachineMap(machines, node['ip'], config)
    elif 'group_seeds' in meta:
        nodes = get_nodes_from_seeds(meta['group_seeds'])
        for node in nodes:
            addIpToMachineMap(machines, node['ip'], config)
    for node in nodemgrnodes:
        addIpToMachineMap(machines, node['ip'], config)
    for node in clustermgrnodes:
        addIpToMachineMap(machines, node['ip'], config)
    if 'xpanel' in jscfg:
        if 'nodes' in jscfg['xpanel']:
            for node in jscfg['xpanel']['nodes']:
                addIpToMachineMap(machines, node['ip'], config)
        else:
            addIpToMachineMap(machines, jscfg['xpanel']['ip'], config)
    if 'elasticsearch' in jscfg:
        addIpToMachineMap(machines, jscfg['elasticsearch']['ip'], config)


def check_dir_attrlist(node, attrlist, dirmap):
    for attr in attrlist:
        if attr not in node:
            node[attr] = None
        elif not node[attr].startswith('/'):
            raise ValueError('Error: the dir for %s must be absolute path!' % attr)
        else:
            addDirToMachine(dirmap, node['ip'], node[attr])
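# Illustrative behavior (hypothetical node): for attrlist
# ['prometheus_datadir'], a node carrying '/data/prometheus' is registered
# in dirmap, a missing attribute is defaulted to None, and a relative path
# such as 'data/prometheus' raises ValueError.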

# validate and set the configuration object for clustermgr initialization/destroy scripts.
# This is the docker version
def validate_and_set_docker_config(jscfg, machines):
    meta = jscfg.get('meta', {'nodes':[]})
    if 'nodes' not in meta:
        meta['nodes'] = []
    clustermgr = jscfg.get('cluster_manager', {'nodes':[]})
    if 'nodes' not in clustermgr:
        clustermgr['nodes'] = []
    if 'cluster_manager' not in jscfg:
        jscfg['cluster_manager'] = clustermgr
    nodemgr = jscfg.get('node_manager', {'nodes':[]})
    if 'nodes' not in nodemgr:
        nodemgr['nodes'] = []
    if 'node_manager' not in jscfg:
        jscfg['node_manager'] = nodemgr
    config = jscfg['config']
    namespace = config['namespace']
    iscantian = config['cantian']
    arch = config['targetarch']

    portmap = {}
    dirmap = {}

    # clustermgr config
    if 'config' not in clustermgr:
        clustermgr['config'] = {}
    clustermgrhosts = []
    i = 1
    for node in clustermgr['nodes']:
        if 'config' not in node:
            node['config'] = {}
        attrlist = ['prometheus_datadir']
        check_dir_attrlist(node, attrlist, dirmap)
        if 'id' not in node:
            node['id'] = i
        node['name'] = '%s.clustermgr%s' % (namespace, str(node['id']))
        clustermgrhosts.append(node['name'])
        i = i + 1
    clustermgr['hosts'] = clustermgrhosts

    # kunlun-node config.
    # The following directories may be specified:
    # storage_datadir, storage_logdir, storage_waldir, server_datadir, storage_keyringdir, prometheus_datadir
    if 'config' not in nodemgr:
        nodemgr['config'] = {}
    nodemgrhosts = []
    i = 1
    for node in nodemgr['nodes']:
        if 'config' not in node:
            node['config'] = {}
        if 'nodetype' not in node:
            node['nodetype'] = 'both'
        attrlist = ['storage_datadir', 'storage_logdir', 'storage_waldir', 'server_datadir', 
                'storage_keyringdir', 'prometheus_datadir']
        check_dir_attrlist(node, attrlist, dirmap)
        if 'id' not in node:
            node['id'] = i
        node['name'] = '%s.nodemgr%s' % (namespace, str(node['id']))
        nodemgrhosts.append(node['name'])
        i = i + 1
    nodemgr['hosts'] = nodemgrhosts

    # Since a meta node is a storage node, the following directories may be specified:
    # storage_datadir, storage_logdir, storage_waldir, storage_keyringdir, prometheus_datadir
    ha_mode = meta.get('ha_mode', '')
    nodecnt = len(meta['nodes'])
    if ha_mode == '':
        if nodecnt > 1:
            ha_mode = 'rbr'
        else:
            ha_mode = 'no_rep'
    validate_ha_mode(ha_mode)
    meta['ha_mode'] = ha_mode
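    # Illustrative defaults: with ha_mode unset, more than one meta node
    # infers 'rbr', while zero or one node infers 'no_rep'.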
    if nodecnt == 0 and 'meta' in jscfg and 'group_seeds' not in meta:
        raise ValueError('Error: There must be at least one node in meta shard')
    if nodecnt > 1 and ha_mode == 'no_rep' and not iscantian:
        raise ValueError('Error: ha_mode is no_rep, but there are multiple nodes in meta shard')
    elif nodecnt == 1 and ha_mode != 'no_rep':
        raise ValueError('Error: ha_mode is mgr/rbr, but there is only one node in meta shard')
    hasPrimary=False
    if 'config' not in meta:
        meta['config'] = {}
    metahosts = []
    i = 1
    for node in meta['nodes']:
        if 'config' not in node:
            node['config'] = {}
        # These dirs may be specified; validate any that are set.
        attrlist = ['storage_datadir', 'storage_logdir', 'storage_waldir', 
                'storage_keyringdir', 'prometheus_datadir']
        check_dir_attrlist(node, attrlist, dirmap)
        if 'id' not in node:
            node['id'] = i
        node['name'] = '%s.meta%s' % (namespace, str(node['id']))
        node['uri'] = "%s:3306" % node['name']
        node['shard_id'] = 'meta'
        metahosts.append(node['name'])
        i = i + 1
        if 'is_primary' not in node:
            node['is_primary'] = False
        if node['is_primary']:
            if hasPrimary:
                raise ValueError('Error: Two primaries found in meta shard, there should be one and only one Primary specified !')
            else:
                hasPrimary = True
    if nodecnt > 0:
        if not hasPrimary:
            meta['nodes'][0]['is_primary'] = True
    if 'enable_rocksdb' not in meta:
        meta['enable_rocksdb'] = True
    meta['hosts'] = metahosts

    if 'xpanel' in jscfg:
        xpanel = jscfg['xpanel']
        if 'config' not in xpanel:
            xpanel['config'] = {}
        if 'imageType' not in xpanel:
            xpanel['imageType'] = 'url'
        if xpanel['imageType'] == 'file':
            if arch == 'x86_64':
                xpanel['image'] = 'kunlun-xpanel:%s' % config['product_version']
            else:
                xpanel['image'] = 'kunlun-xpanel:%s-%s' % (config['product_version'], arch)
        else:
            xpanel['image'] = xpanel['image'].replace('VERSION', config['product_version'])
        if 'imageFile' not in xpanel:
            xpanel['imageFile'] = 'kunlun-xpanel-%s.tar.gz' % config['product_version']
        if 'nodes' in xpanel:
            for node in xpanel['nodes']:
                if 'config' not in node:
                    node['config'] = {}
                if 'port' not in node:
                    node['port'] = 18080
                addPortToMachine(portmap, node['ip'], node['port'])
                if 'name' not in node:
                    node['name'] = '%s.xpanel_%d' % (namespace, node['port'])
        else:
            node = xpanel
            if 'port' not in node:
                node['port'] = 18080
            addPortToMachine(portmap, node['ip'], node['port'])
            if 'name' not in node:
                node['name'] = '%s.xpanel_%d' % (namespace, node['port'])
            xpanel['nodes'] = [{"ip":node['ip'], "port": node['port'], "name":node["name"]}]


    # The kibana_port must be specified, since it is accessed by the user and mapped onto the machine.
    if 'elasticsearch' in jscfg:
        node = jscfg['elasticsearch']
        if 'ip' not in node:
            raise ValueError('Error: the ip of elasticsearch must be specified!')
        if 'kibana_port' not in node:
            node['kibana_port'] = 5601
        addPortToMachine(portmap, node['ip'], node['kibana_port'])

    if 'clusters1' in jscfg:
        validate_and_set_docker_clusters1(jscfg, machines, dirmap)

    if config['verbose']:
        for node in nodemgr['nodes']:
            my_print(str(node))

def validate_and_set_docker_clusters1(jscfg, machines, dirmap):
    meta = jscfg['meta']
    clusters = jscfg['clusters1']
    config = jscfg['config']
    namespace = config['namespace']

    for cluster in clusters:
        cname = cluster['name']
        hamode = cluster['ha_mode']
        i = 1
        for node in cluster['comp']['nodes']:
            node['name'] = '%s.%s.comp%d' % (namespace, cname, i)
            if 'config' not in node:
                node['config'] = {}
            attrlist = ['server_datadir', 'prometheus_datadir']
            check_dir_attrlist(node, attrlist, dirmap)
            if 'id' not in node:
                node['id'] = i
            i += 1
        i = 1
        attrlist = ['storage_datadir', 'storage_logdir', 'storage_waldir', 
                'storage_keyringdir', 'prometheus_datadir']
        for shard in cluster['data']:
            j = 1
            hasPrimary = False
            for node in shard['nodes']:
                node['shard_id'] = 'shard%d' % i
                node['name'] = '%s.%s.shard%d_%d' % (namespace, cname, i, j)
                node['uri'] = "%s:3306" % node['name']
                check_dir_attrlist(node, attrlist, dirmap)
                if 'id' not in node:
                    node['id'] = j
                if 'is_primary' not in node:
                    node['is_primary'] = False
                if node['is_primary']:
                    if hasPrimary:
                        raise ValueError('Error: Two primaries found in %s, there should be one and only one Primary specified !' % node['shard_id'])
                    else:
                        hasPrimary = True
                j += 1
            if not hasPrimary:
                shard['nodes'][0]['is_primary'] = True
            i += 1

def get_cdc_config(node, cdc):
    config = cdc['config'].copy()
    config.update(node['config'])
    return config

def get_downloadbase(downloadsite):
    if downloadsite == 'public':
        downbase = "https://downloads.kunlunbase.com"
    elif downloadsite == 'devsite':
        downbase = "http://klustron.cn:14000"
    else:
        downbase = "http://192.168.0.104:14000"
    return downbase

def download_file(basesite, uri, contentTypes, targetdir, overwrite):
    retry = 5
    url = "%s/%s" % (basesite, uri)
    fname = os.path.basename(uri)
    target = '%s/%s' % (targetdir, fname)
    #my_print("downloading %s to clustermgr ..." % url)
    #return
    if not os.path.exists(target) or overwrite:
        if os.path.exists(target):
            my_print("%s exists, removing first (overwrite mode)" % fname)
            os.unlink(target)
        my_print("downloading %s to %s" % (url, targetdir))
        i = 0
        good = False
        while i < retry:
            info = ulib.urlretrieve(url, target)
            msg = info[1]
            #my_print(str(type(msg)))
            if sys.version_info.major == 2:
                explen = msg.getheader('Content-Length', '0')
                exptype = msg.getheader('Content-Type', 'unknown')
            else:
                explen = msg.get('Content-Length', '0')
                exptype = msg.get('Content-Type', 'unknown')
            statinfo = os.stat(target)
            reallen = str(statinfo.st_size)
            #my_print("exptype:%s, explen:%s, reallen:%s" % (exptype, explen, reallen))
            if explen == reallen and exptype in contentTypes:
                good = True
                break
            i += 1
        if not good:
            raise ValueError('Error: fail to download %s' % url)
    else:
        my_print("%s exists, skipping download" % fname)

def get_nodes_from_seeds(seed, defport=0):
    nodes = []
    for addr in seed.split(','):
        parts = addr.split(':')
        host = parts[0]
        port = defport
        if len(parts) > 1:
            port = int(parts[1])
        nodes.append({"ip": host, "port": port})
    return nodes
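# Illustrative parse (hypothetical addresses):
# get_nodes_from_seeds("192.168.0.11:6001,192.168.0.12", defport=3306)
# returns [{"ip": "192.168.0.11", "port": 6001}, {"ip": "192.168.0.12", "port": 3306}].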

# Only accept BASEDIR currently.
def genHostsCommands(runarg, machines, command):
    commandlist=[]
    for ip in machines:
        mach = machines.get(ip)
        cmd1 = re.sub('BASEDIR', mach['basedir'], command)
        commandlist.append([ip, cmd1])
    return commandlist
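# Illustrative substitution (hypothetical basedir): with basedir '/kunlun',
# the command 'ls BASEDIR/bin' becomes 'ls /kunlun/bin' for that host.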

def genSubHostsCommands(runarg, machines, clusterobj, sub, command):
    if sub not in clusterobj:
        raise ValueError('no %s info in clusterobj!' % sub)
    submachines = {}
    ips = set()
    for node in clusterobj[sub]['nodes']:
        ip = node['ip']
        if ip not in ips:
            ips.add(ip)
            submachines[ip] = machines.get(ip)
    return genHostsCommands(runarg, submachines, command)

def getstoragecommand(command, node, machines):
    mach = machines.get(node['ip'])
    cmd = command
    if 'data_dir_path' in node:
        cmd = re.sub('DATA_DIR_PATH', node['data_dir_path'], cmd)
    if 'log_dir_path' in node:
        cmd = re.sub('LOG_DIR_PATH', node['log_dir_path'], cmd)
        cmd = re.sub('INNODB_LOG_DIR_PATH', node.get('innodb_log_dir_path', node['log_dir_path']), cmd)
    cmd = re.sub('PORT', str(node['port']), cmd)
    cmd = re.sub('BASEDIR', mach['basedir'], cmd)
    return cmd
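# Illustrative substitution (hypothetical node): for a node with port 57001
# and data_dir_path '/data/d1', the template 'du -sh DATA_DIR_PATH/PORT'
# expands to 'du -sh /data/d1/57001'.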

# Accept DATA_DIR_PATH, LOG_DIR_PATH, INNODB_LOG_DIR_PATH, PORT
def genMetaCommands(runarg, machines, meta, command):
    if meta is None:
        raise ValueError('No meta info!')
    commandlist=[]
    i = 0
    for node in meta['nodes']:
        i = i + 1
        if runarg['index'] > 0 and runarg['index'] != i:
            continue
        cmd = getstoragecommand(command, node, machines)
        commandlist.append([node['ip'], cmd])
    return commandlist

# Accept DATA_DIR_PATH, LOG_DIR_PATH, INNODB_LOG_DIR_PATH, PORT, like above
def genDataCommands(runarg, machines, datas, command):
    if datas is None:
        raise ValueError('No datas info!')
    commandlist=[]
    if runarg['shard'] > len(datas):
        raise ValueError("no such shard:%d" % runarg['shard'])
    if runarg['shard'] > 0:
        i = 0
        for node in datas[runarg['shard']-1]['nodes']:
            i = i + 1
            if runarg['index'] > 0 and runarg['index'] != i:
                continue
            cmd = getstoragecommand(command, node, machines)
            commandlist.append([node['ip'], cmd])
    else:
        for shard in datas:
            i = 0
            for node in shard['nodes']:
                i = i + 1
                if runarg['index'] > 0 and runarg['index'] != i:
                    continue
                cmd = getstoragecommand(command, node, machines)
                commandlist.append([node['ip'], cmd])
    return commandlist

# Only accept PORT and DATADIR
def genCompCommands(runarg, machines, comp, command):
    if comp is None:
        raise ValueError('No comp info!')
    commandlist=[]
    i = 0
    for node in comp['nodes']:
        i = i + 1
        if runarg['index'] > 0 and runarg['index'] != i:
            continue
        mach = machines.get(node['ip'])
        cmd1 = re.sub('DATADIR', node['datadir'], command)
        cmd2 = re.sub('PORT', str(node['port']), cmd1)
        cmd2 = re.sub('BASEDIR', mach['basedir'], cmd2)
        commandlist.append([node['ip'], cmd2])
    return commandlist

def runShellCommand(runarg, clusterobj, machines, runtype, command):
    meta = None
    datas = None
    comp = None
    if 'meta' in clusterobj:
        meta = clusterobj['meta']
    if 'data' in clusterobj:
        datas = clusterobj['data']
    if 'comp' in clusterobj:
        comp = clusterobj['comp']
    if runtype == 'meta':
        commandlist = genMetaCommands(runarg, machines, meta, command)
    elif runtype == 'comp':
        commandlist = genCompCommands(runarg, machines, comp, command)
    elif runtype == 'data':
        commandlist = genDataCommands(runarg, machines, datas, command)
    elif runtype in ['cluster_manager', 'node_manager', 'xpanel', 'cdc']:
        commandlist = genSubHostsCommands(runarg, machines, clusterobj, runtype, command)
    else:
        commandlist = genHostsCommands(runarg, machines, command)

    if runarg['dryrun']:
        for cmdpair in commandlist:
            ip = cmdpair[0]
            mach = machines.get(ip)
            sshport = mach.get('sshport', 22)
            cmdpat = "bash remote_run.sh --sshport=%d --user=%s %s \"%s\"\n"
            my_print(cmdpat % (sshport, mach['user'], ip, cmdpair[1]))
    else:
        for cmdpair in commandlist:
            ip = cmdpair[0]
            mach = machines.get(ip)
            sshport = mach.get('sshport', 22)
            if mach['sshpass']:
                cmdpat = "bash remote_run.sh --sshpass --password='%s' --sshport=%d --user=%s %s \"%s\"\n"
                os.system(cmdpat % (mach['password'], sshport, mach['user'], ip, cmdpair[1]))
            else:
                cmdpat = "bash remote_run.sh --sshport=%d --user=%s %s \"%s\"\n"
                os.system(cmdpat % (sshport, mach['user'], ip, cmdpair[1]))

def genSqlmetaCommands(runarg, meta):
    if meta is None:
        raise ValueError('No meta info!')
    commandslist = []
    cmdpat=r'mysql -h %s -P %d -upgx -ppgx_pwd -e "$sql" '
    i=0
    for node in meta['nodes']:
        i = i + 1
        if runarg['index'] > 0 and runarg['index'] != i:
            continue
        commandslist.append(cmdpat % (node['ip'], node['port']))
    return commandslist

def genSqldataCommands(runarg, datas):
    if datas is None:
        raise ValueError('No data info!')
    commandslist = []
    if runarg['shard'] > len(datas):
        raise ValueError("no such shard:%d" % runarg['shard'])
    cmdpat=r'mysql -h %s -P %d -upgx -ppgx_pwd -e "$sql" '
    if runarg['shard'] > 0:
        i = 0
        for node in datas[runarg['shard']-1]['nodes']:
            i = i + 1
            if runarg['index'] > 0 and runarg['index'] != i:
                continue
            commandslist.append(cmdpat % (node['ip'], node['port']))
    else:
        for shard in datas:
            i = 0
            for node in shard['nodes']:
                i = i + 1
                if runarg['index'] > 0 and runarg['index'] != i:
                    continue
                commandslist.append(cmdpat % (node['ip'], node['port']))
    return commandslist

def genSqlcompCommands(runarg, comp):
    if comp is None:
        raise ValueError('No comp info!')
    commandslist = []
    cmdpat=r'psql postgres://%s:%s@%s:%d/postgres -c "$sql" '
    i = 0
    for node in comp['nodes']:
        i = i + 1
        if runarg['index'] > 0 and runarg['index'] != i:
            continue
        commandslist.append(cmdpat % (node['user'], node['password'], node['ip'], node['port']))
    return commandslist

def runSqlCommand(runarg, clusterobj, machines, runtype, command):
    runtype = runarg['runtype']
    command = runarg['command']
    meta = None
    datas = None
    comp = None
    if 'meta' in clusterobj:
        meta = clusterobj['meta']
    if 'data' in clusterobj:
        datas = clusterobj['data']
    if 'comp' in clusterobj:
        comp = clusterobj['comp']
    cmdlist = []
    if runtype == 'sqlmeta':
        cmdlist = genSqlmetaCommands(runarg, meta)
    elif runtype == 'sqldata':
        cmdlist = genSqldataCommands(runarg, datas)
    elif runtype == 'sqlcomp':
        cmdlist = genSqlcompCommands(runarg, comp)
    else: # this cannot actually happen: runDriver only dispatches sql* runtypes here.
        raise ValueError("invalid runtype:%s" % runtype)

    os.system('mkdir -p exec')
    comf = open(r'exec/run%s.sh' % runtype, 'w')
    comf.write('#! /bin/bash\n')
    comf.write('''
if test "$#" != 1; then
    echo "$0 sqlcommand"
    exit 1
fi
''')
    comf.write('sql="$1" \n')
    for cmd in cmdlist:
        comf.write(cmd)
        comf.write('\n')
    comf.close()

    if runarg['dryrun']:
        cmdpat="bash -x exec/run%s.sh \"%s\""
        my_print(cmdpat % (runtype, command))
    else:
        cmdpat="bash -x exec/run%s.sh \"%s\""
        os.system(cmdpat % (runtype, command))
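# Illustrative generated script (hypothetical nodes): for runtype 'sqlmeta',
# exec/runsqlmeta.sh contains one mysql invocation per selected meta node, e.g.
#   mysql -h 192.168.0.11 -P 57001 -upgx -ppgx_pwd -e "$sql"
# and is invoked as: bash -x exec/runsqlmeta.sh "select version()".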

def runDriver(runarg, clusterobj, machines, command):
    if runarg['runtype'] in ['hosts', 'meta', 'data', 'comp', 'cluster_manager', 'node_manager', 'xpanel', 'cdc']:
        runShellCommand(runarg, clusterobj, machines, runarg['runtype'], command)
    else:
        runSqlCommand(runarg, clusterobj, machines, runarg['runtype'], command)


def get_master_conn(runarg, metaseeds):
    for addr in metaseeds.split(','):
        parts = addr.split(':')
        host = parts[0]
        port = 3306
        if len(parts) > 1:
            port = int(parts[1])
        mysql_conn_params = {}
        mysql_conn_params['host'] = host
        mysql_conn_params['port'] = port
        mysql_conn_params['user'] = runarg['user']
        mysql_conn_params['password'] = runarg['password']
        mysql_conn_params['database'] = 'Kunlun_Metadata_DB'
        conn = None
        csr = None
        try:
            conn = mc.connect(**mysql_conn_params)
            csr = conn.cursor()
            csr.execute("select @@super_read_only")
            row = csr.fetchone()
            # @@super_read_only may come back as an int or a str depending on
            # the connector; normalize before comparing.
            if row is None or str(row[0]) == '1':
                csr.close()
                csr = None
                conn.close()
                conn = None
                continue
            else:
                my_print("%s:%s is master" % (host, str(port)))
                csr.close()
                return conn
        except mc.errors.InterfaceError as err:
            if conn is not None:
                conn.close()
            continue
    return None
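# Illustrative call (hypothetical seeds): get_master_conn(runarg,
# "192.168.0.11:6001,192.168.0.12:6001") probes each address in order and
# returns a connection to the first node whose @@super_read_only is off,
# or None when no writable node is found.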

def fetch_cluster_info(conn):
    cur = conn.cursor()

    cur.execute("select hostaddr, port from meta_db_nodes")
    rows = cur.fetchall()
    meta_nodes = []
    for row in rows:
        meta_nodes.append({"ip": row[0], "port": row[1]})
    #my_print("=== meta_nodes ===")
    #my_print(str(meta_nodes))

    cur.execute("select hostaddr, port, raft_port, prometheus_port from cluster_mgr_nodes")
    rows = cur.fetchall()
    clustermgr_nodes = []
    for row in rows:
        clustermgr_nodes.append({
            "ip": row[0],
            'brpc_http_port': row[1],
            'brpc_raft_port': row[2],
            'prometheus_port_start': row[3]
            })
    #my_print("=== clustermgr_nodes ===")
    #my_print(str(clustermgr_nodes))

    # 0:hostaddr, 1:nodemgr_port,  2:nodemgr_tcp_port, 3:nodemgr_prometheus_port, 4:current_port_pos
    # 5:machine_type, 6:port_range, 7:used_port, 8:comp_datadir, 9:datadir, 10:logdir, 11:wal_log_dir
    cur.execute("select hostaddr,nodemgr_port,nodemgr_tcp_port,nodemgr_prometheus_port,current_port_pos,machine_type,port_range,used_port,comp_datadir,datadir,logdir,wal_log_dir from server_nodes where id > 1")
    rows=cur.fetchall()
    storage_map = {}
    server_map = {}
    ips = set()
    for row in rows:
        ip = row[0]
        ips.add(ip)
        mtype = row[5]
        if mtype == 'storage':
            storage_map[ip] = row
        elif mtype == 'computer':
            server_map[ip] = row
    #my_print("=== storage_map ===")
    #my_print(str(storage_map))
    #my_print("=== server_map ===")
    #my_print(str(server_map))
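    # A host may run node_manager for both roles, so its 'storage' and
    # 'computer' rows from server_nodes are merged into one entry keyed by ip.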
    nodemgr_nodes = []
    for ip in ips:
        if ip in storage_map and ip in server_map:
            rowa = storage_map[ip]
            rowb = server_map[ip]
            nodemgr_nodes.append({
                "ip": ip,
                "nodetype": "both",
                'brpc_http_port': rowa[1],
                'tcp_port': rowa[2],
                'prometheus_port_start': rowa[3],
                'storage_curport': rowa[4],
                'server_curport': rowb[4],
                'storage_portrange': rowa[6],
                'server_portrange': rowb[6],
                'storage_usedports': rowa[7],
                'server_usedports': rowb[7],
                "server_datadirs": rowb[8],
                "storage_datadirs": rowa[9],
                "storage_logdirs": rowa[10],
                "storage_waldirs": rowa[11]
                })
        elif ip in storage_map:
            rowa = storage_map[ip]
            nodemgr_nodes.append({
                "ip": ip,
                "nodetype": "storage",
                'brpc_http_port': rowa[1],
                'tcp_port': rowa[2],
                'prometheus_port_start': rowa[3],
                'storage_curport': rowa[4],
                'storage_portrange': rowa[6],
                'storage_usedports': rowa[7],
                "storage_datadirs": rowa[9],
                "storage_logdirs": rowa[10],
                "storage_waldirs": rowa[11]
                })
        elif ip in server_map:
            rowb = server_map[ip]
            nodemgr_nodes.append({
                "ip": ip,
                "nodetype": "server",
                'brpc_http_port': rowb[1],
                'tcp_port': rowb[2],
                'prometheus_port_start': rowb[3],
                'server_curport': rowb[4],
                'server_portrange': rowb[6],
                'server_usedports': rowb[7],
                "server_datadirs": rowb[8]
                })
        else:
            pass # machine_type was neither 'storage' nor 'computer'; skip it.
    #my_print("=== nodemgr_nodes ===")
    #my_print(str(nodemgr_nodes))

    #0:cluster_name, 1:shard-id, 2:shard-name, 3:ip, 4:port, 5:ha_mode
    cur.execute("select a.name, b.id, b.name, c.hostaddr, c.port, a.ha_mode from db_clusters a, shards b, shard_nodes c where a.id=b.db_cluster_id and b.id=c.shard_id order by 2")
    rows = cur.fetchall()
    shard_map = {}
    hamode_map = {}
    for row in rows:
        cluster_name = row[0]
        shard_id = row[1]
        shard_name = row[2]
        hamode_map[cluster_name] = row[5]
        node = {"ip": row[3], "port": row[4]}
        if cluster_name not in shard_map:
            shard_map[cluster_name] = {}
        shards = shard_map[cluster_name]
        if shard_id not in shards:
            shards[shard_id] = []
        nodes = shards[shard_id]
        nodes.append(node)
    #my_print("=== shard_map ===")
    #my_print(str(shard_map))
    #my_print("=== hamode_map ===")
    #my_print(str(hamode_map))

    #0:cluster_name, 1:comp-name, 2:ip, 3:port, 4:mysql_port, 5:user_name, 6:password
    cur.execute("select a.name, c.name, c.hostaddr, c.port, c.mysql_port, c.user_name, c.passwd from db_clusters a, comp_nodes c where a.id=c.db_cluster_id")
    rows = cur.fetchall()
    comp_map = {}
    for row in rows:
        cluster_name = row[0]
        node = {"ip": row[2], "name": row[1], "port": row[3], "mysql_port": row[4], "user": row[5], "password": row[6]}
        if cluster_name not in comp_map:
            comp_map[cluster_name] = []
        comps = comp_map[cluster_name]
        comps.append(node)
    #my_print("=== comp_map ===")
    #my_print(str(comp_map))

    clusters = []
    for cname in shard_map:
        shards = shard_map[cname]
        comps = comp_map.get(cname, []) # a cluster may have no computer nodes yet
        datas = []
        for shard_id in shards:
            shard = shards[shard_id]
            datas.append({"nodes": shard})
        clusters.append({
            "name": cname,
            "ha_mode": hamode_map[cname],
            "comp": {"nodes": comps},
            "data": datas
            })
    #my_print("=== clusters ===")
    #my_print(str(clusters))

    confobj = {
            "meta": {"nodes": meta_nodes},
            "cluster_manager": {"nodes": clustermgr_nodes},
            "node_manager": {"nodes": nodemgr_nodes},
            "clusters": clusters
            }
    #my_print("=== config file ===")
    #my_print(str(confobj))
    return confobj
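
# Minimal sketch (a hypothetical helper, not part of the original tool): wire
# get_master_conn and fetch_cluster_info together and persist the recovered
# topology as YAML, assuming the optional pyyaml import at the top succeeded.
def dump_cluster_config(runarg, metaseeds, outfile='cluster.yml'):
    if not module_imported('yaml'):
        raise ValueError('Error: pyyaml is required to dump the config!')
    conn = get_master_conn(runarg, metaseeds)
    if conn is None:
        raise ValueError('Error: no writable master among metaseeds: %s' % metaseeds)
    try:
        confobj = fetch_cluster_info(conn)
        with open(outfile, 'w') as fp:
            yaml.safe_dump(confobj, fp, default_flow_style=False)
    finally:
        conn.close()
    return confobj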
