#!/usr/bin/env python
# Copyright (c) 2020 Klustron inc. All rights reserved.
# This source code is licensed under Apache 2.0 License,
# combined with Common Clause Condition 1.0, as detailed in the NOTICE file.

import sys
import json
import getpass
import argparse
import platform
import os
import os.path
import time
from cluster_common import *
from upgrade import *

def purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo = False):
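    """Flush the accumulated dir/file/command maps into the generated
    script through comf, then reset the accumulators for the next batch."""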
    process_dirmap(comf, dirmap, machines, usesudo, config)
    process_fileslistmap(comf, filesmap, machines, 'clustermgr', config)
    process_commandslist_setenv(comf, config, machines, commandslist)
    dirmap.clear()
    filesmap.clear()
    del commandslist[:]

def generate_server_startstop(config, machines, node, idx, filesmap):
    mach = machines.get(node['ip'])
    serverdir = "kunlun-server-%s" % config['product_version']
    envfname = 'env.sh.node'
    # start wrapper
    startname = '%d-start-server-%d.sh' % (idx, node['port'])
    startname_to = 'start-server-%d.sh' % node['port']
    startf = open('clustermgr/%s' % startname, 'w')
    startf.write("#! /bin/bash\n")
    startf.write("cd %s || exit 1\n" % mach['basedir'])
    startf.write("test -f %s && . ./%s\n" % (envfname, envfname))
    startf.write("cd instance_binaries/computer/%s/%s/scripts || exit 1\n" % (str(node['port']), serverdir))
    startf.write("$KUNLUN_PYTHON start_pg.py --port=%d\n" % node['port'])
    startf.close()
    addNodeToFilesListMap(filesmap, node, startname, './%s' % startname_to)
    # stop wrapper; may not be strictly necessary.
    stopname = '%d-stop-server-%d.sh' % (idx, node['port'])
    stopname_to = 'stop-server-%d.sh' % node['port']
    stopf = open('clustermgr/%s' % stopname, 'w')
    stopf.write("#! /bin/bash\n")
    stopf.write("cd %s || exit 1\n" % mach['basedir'])
    stopf.write("test -f %s && . ./%s\n" % (envfname, envfname))
    stopf.write("cd instance_binaries/computer/%s/%s/scripts || exit 1\n" % (str(node['port']), serverdir))
    stopf.write("$KUNLUN_PYTHON stop_pg.py --port=%d\n" % node['port'])
    stopf.close()
    addNodeToFilesListMap(filesmap, node, stopname, './%s' % stopname_to)
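
# For reference, a start wrapper generated above looks like this (the
# basedir, product version and port 47001 are illustrative placeholders):
#
#   #! /bin/bash
#   cd /kunlun/basedir || exit 1
#   test -f env.sh.node && . ./env.sh.node
#   cd instance_binaries/computer/47001/kunlun-server-1.3.1/scripts || exit 1
#   $KUNLUN_PYTHON start_pg.py --port=47001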

def generate_storage_startstop(config, machines, node, idx, filesmap):
    mach = machines.get(node['ip'])
    storagedir = "kunlun-storage-%s" % config['product_version']
    envfname = 'env.sh.node'
    # start wrapper
    startname = '%d-start-storage-%d.sh' % (idx, node['port'])
    startname_to = 'start-storage-%d.sh' % node['port']
    startf = open('clustermgr/%s' % startname, 'w')
    startf.write("#! /bin/bash\n")
    startf.write("cd %s || exit 1\n" % mach['basedir'])
    startf.write("test -f %s && . ./%s\n" % (envfname, envfname))
    startf.write("cd instance_binaries/storage/%s/%s/dba_tools || exit 1\n" % (str(node['port']), storagedir))
    startf.write("bash startmysql.sh %d\n" % node['port'])
    startf.close()
    addNodeToFilesListMap(filesmap, node, startname, './%s' % startname_to)
    # stop wrapper; may not be strictly necessary.
    stopname = '%d-stop-storage-%d.sh' % (idx, node['port'])
    stopname_to = 'stop-storage-%d.sh' % node['port']
    stopf = open('clustermgr/%s' % stopname, 'w')
    stopf.write("#! /bin/bash\n")
    stopf.write("cd %s || exit 1\n" % mach['basedir'])
    stopf.write("test -f %s && . ./%s\n" % (envfname, envfname))
    stopf.write("cd instance_binaries/storage/%s/%s/dba_tools || exit 1\n" % (str(node['port']), storagedir))
    stopf.write("bash stopmysql.sh %d\n" % node['port'])
    stopf.close()
    addNodeToFilesListMap(filesmap, node, stopname, './%s' % stopname_to)

def generate_storage_service(config, machines, commandslist, node, idx, filesmap):
    mach = machines.get(node['ip'])
    storagedir = "kunlun-storage-%s" % config['product_version']
    fname = "%d-kunlun-storage-%d.service" % (idx, node['port'])
    servname = "kunlun-storage-%d" % node['port']
    fname_to = "kunlun-storage-%d.service" % node['port']
    servicef = open('clustermgr/%s' % fname, 'w')
    servicef.write("# kunlun-storage-%d systemd service file\n\n" % node['port'])
    servicef.write("[Unit]\n")
    servicef.write("Description=kunlun-storage-%d\n" % node['port'])
    servicef.write("After=network.target\n\n")
    servicef.write("[Install]\n")
    servicef.write("WantedBy=multi-user.target\n\n")
    servicef.write("[Service]\n")
    servicef.write("Type=forking\n")
    servicef.write("User=%s\n" % mach['user'])
    servicef.write("Restart=on-failure\n")
    servicef.write("WorkingDirectory=%s\n" % (mach['basedir']))
    servicef.write("ExecStart=/bin/bash start-storage-%d.sh\n" % (node['port']))
    servicef.write("ExecStop=/bin/bash stop-storage-%d.sh\n" % (node['port']))
    servicef.close()
    addNodeToFilesListMap(filesmap, node, fname, './%s' % fname_to)
    addToCommandsList(commandslist, node['ip'], '.', "sudo cp -f %s /usr/lib/systemd/system/" % fname_to)
    addToCommandsList(commandslist, node['ip'], '.', "sudo systemctl enable %s" % servname)

def generate_clustermgr_startstop(config, machines, node, idx, filesmap):
    mach = machines.get(node['ip'])
    clustermgrdir = "kunlun-clustermgr-manager-%s" % config['product_version']
    # start wrapper
    startname = '%d-start-clustermgr-%d.sh' % (idx, node['brpc_raft_port'])
    startname_to = 'start-clustermgr-%d.sh' % node['brpc_raft_port']
    startf = open('clustermgr/%s' % startname, 'w')
    startf.write("#! /bin/bash\n")
    startf.write("cd %s || exit 1\n" % mach['basedir'])
    startf.write("cd %s/bin || exit 1\n" % clustermgrdir)
    if node['valgrind']:
        startf.write("export USE_VALGRIND=1\n")
    startf.write("bash start_cluster_mgr.sh\n")
    startf.close()
    addNodeToFilesListMap(filesmap, node, startname, './%s' % startname_to)
    addNodeToFilesListMap(filesmap, node, startname, './start-clustermgr.sh')
    # stop wrapper; may not be strictly necessary.
    stopname = '%d-stop-clustermgr-%d.sh' % (idx, node['brpc_raft_port'])
    stopname_to = 'stop-clustermgr-%d.sh' % node['brpc_raft_port']
    stopf = open('clustermgr/%s' % stopname, 'w')
    stopf.write("#! /bin/bash\n")
    stopf.write("cd %s || exit 1\n" % mach['basedir'])
    stopf.write("cd %s/bin || exit 1\n" % clustermgrdir)
    stopf.write("bash stop_cluster_mgr.sh\n")
    stopf.close()
    addNodeToFilesListMap(filesmap, node, stopname, './%s' % stopname_to)
    addNodeToFilesListMap(filesmap, node, stopname, './stop-clustermgr.sh')

def generate_clustermgr_service(config, machines, commandslist, node, idx, filesmap):
    mach = machines.get(node['ip'])
    clustermgrdir = "kunlun-cluster-manager-%s" % config['product_version']
    fname = "%d-kunlun-cluster-manager-%d.service" % (idx, node['brpc_raft_port'])
    servname = "kunlun-cluster-manager-%d" % node['brpc_raft_port']
    fname_to = "kunlun-cluster-manager-%d.service" % node['brpc_raft_port']
    servicef = open('clustermgr/%s' % fname, 'w')
    servicef.write("# kunlun-cluster-manager-%d systemd service file\n\n" % node['brpc_raft_port'])
    servicef.write("[Unit]\n")
    servicef.write("Description=kunlun-cluster-manager-%d\n" % node['brpc_raft_port'])
    servicef.write("After=network.target\n\n")
    servicef.write("[Install]\n")
    servicef.write("WantedBy=multi-user.target\n\n")
    servicef.write("[Service]\n")
    servicef.write("Type=forking\n")
    servicef.write("PIDFile=%s/%s/bin/clustermgr.pid\n" % (mach['basedir'], clustermgrdir))
    servicef.write("User=%s\n" % mach['user'])
    servicef.write("Restart=on-failure\n")
    servicef.write("WorkingDirectory=%s/%s/bin\n" % (mach['basedir'], clustermgrdir))
    servicef.write("ExecStart=/bin/bash start_cluster_mgr.sh\n")
    servicef.write("ExecStop=/bin/bash stop_cluster_mgr.sh\n")
    servicef.close()
    addNodeToFilesListMap(filesmap, node, fname, './%s' % fname_to)
    addToCommandsList(commandslist, node['ip'], '.', "sudo cp -f %s /usr/lib/systemd/system/" % fname_to)
    addToCommandsList(commandslist, node['ip'], '.', "sudo systemctl enable %s" % servname)

def generate_cdc_service(config, machines, commandslist, node, idx, filesmap):
    mach = machines.get(node['ip'])
    cdcdir = "kunlun-cdc-%s" % config['product_version']
    fname = "%d-kunlun-cdc-%d.service" % (idx, node['raft_port'])
    servname = "kunlun-cdc-%d" % node['raft_port']
    fname_to = "kunlun-cdc-%d.service" % node['raft_port']
    servicef = open('clustermgr/%s' % fname, 'w')
    servicef.write("# kunlun-cdc-%d systemd service file\n\n" % node['raft_port'])
    servicef.write("[Unit]\n")
    servicef.write("Description=kunlun-cdc-%d\n" % node['raft_port'])
    servicef.write("After=network.target\n\n")
    servicef.write("[Install]\n")
    servicef.write("WantedBy=multi-user.target\n\n")
    servicef.write("[Service]\n")
    servicef.write("Type=forking\n")
    #servicef.write("PIDFile=%s/%s/bin/cdc.pid\n" % (mach['basedir'], cdcdir))
    servicef.write("User=%s\n" % mach['user'])
    servicef.write("Restart=on-failure\n")
    servicef.write("WorkingDirectory=%s/%s/bin\n" % (mach['basedir'], cdcdir))
    servicef.write("ExecStart=/bin/bash start_kunlun_cdc.sh\n")
    servicef.write("ExecStop=/bin/bash stop_kunlun_cdc.sh\n")
    servicef.close()
    addNodeToFilesListMap(filesmap, node, fname, './%s' % fname_to)
    addToCommandsList(commandslist, node['ip'], '.', "sudo cp -f %s /usr/lib/systemd/system/" % fname_to)
    addToCommandsList(commandslist, node['ip'], '.', "sudo systemctl enable %s" % servname)

def generate_nodemgr_startstop(config, machines, node, idx, filesmap):
    mach = machines.get(node['ip'])
    nodemgrdir = "kunlun-node-manager-%s" % config['product_version']
    envfname = 'env.sh.node'
    # start wrapper
    startname = '%d-start-nodemgr-%d.sh' % (idx, node['brpc_http_port'])
    startname_to = 'start-nodemgr-%d.sh' % node['brpc_http_port']
    startf = open('clustermgr/%s' % startname, 'w')
    startf.write("#! /bin/bash\n")
    startf.write("cd %s || exit 1\n" % mach['basedir'])
    startf.write("test -f %s && . ./%s\n" % (envfname, envfname))
    startf.write("cd %s/bin || exit 1\n" % nodemgrdir)
    if node['valgrind']:
        startf.write("export USE_VALGRIND=1\n")
    startf.write("bash start_node_mgr.sh\n")
    startf.close()
    addNodeToFilesListMap(filesmap, node, startname, './%s' % startname_to)
    addNodeToFilesListMap(filesmap, node, startname, './start-nodemgr.sh')
    # stop wrapper; may not be strictly necessary.
    stopname = '%d-stop-nodemgr-%d.sh' % (idx, node['brpc_http_port'])
    stopname_to = 'stop-nodemgr-%d.sh' % node['brpc_http_port']
    stopf = open('clustermgr/%s' % stopname, 'w')
    stopf.write("#! /bin/bash\n")
    stopf.write("cd %s || exit 1\n" % mach['basedir'])
    stopf.write("test -f %s && . ./%s\n" % (envfname, envfname))
    stopf.write("cd %s/bin || exit 1\n" % nodemgrdir)
    stopf.write("bash stop_node_mgr.sh\n")
    stopf.close()
    addNodeToFilesListMap(filesmap, node, stopname, './%s' % stopname_to)
    addNodeToFilesListMap(filesmap, node, stopname, './stop-nodemgr.sh')

def generate_nodemgr_service(config, machines, commandslist, node, idx, filesmap):
    mach = machines.get(node['ip'])
    nodemgrdir = "kunlun-node-manager-%s" % config['product_version']
    fname = "%d-kunlun-node-manager-%d.service" % (idx, node['brpc_http_port'])
    servname = "kunlun-node-manager-%d" % node['brpc_http_port']
    fname_to = "kunlun-node-manager-%d.service" % node['brpc_http_port']
    servicef = open('clustermgr/%s' % fname, 'w')
    servicef.write("# kunlun-node-manager-%d systemd service file\n\n" % node['brpc_http_port'])
    servicef.write("[Unit]\n")
    servicef.write("Description=kunlun-node-manager-%d\n" % node['brpc_http_port'])
    servicef.write("After=network.target\n\n")
    servicef.write("[Install]\n")
    servicef.write("WantedBy=multi-user.target\n\n")
    servicef.write("[Service]\n")
    servicef.write("Type=forking\n")
    servicef.write("PIDFile=%s/%s/bin/nodemgr.pid\n" % (mach['basedir'], nodemgrdir))
    servicef.write("User=%s\n" % mach['user'])
    servicef.write("Restart=on-failure\n")
    servicef.write("WorkingDirectory=%s\n" % mach['basedir'])
    servicef.write("ExecStart=/bin/bash start-nodemgr-%d.sh\n" % node['brpc_http_port'])
    servicef.write("ExecStop=/bin/bash stop-nodemgr-%d.sh\n" % node['brpc_http_port'])
    servicef.close()
    addNodeToFilesListMap(filesmap, node, fname, './%s' % fname_to)
    addToCommandsList(commandslist, node['ip'], '.', "sudo cp -f %s /usr/lib/systemd/system/" % fname_to)
    addToCommandsList(commandslist, node['ip'], '.', "sudo systemctl enable %s" % servname)

def generate_hdfs_coresite_xml(host, port):
    url = "hdfs://%s:%d" % (host, port)
    coref = open('clustermgr/core-site.xml', 'w')
    coref.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    coref.write('<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>\n')
    coref.write('<configuration>\n')
    coref.write('<property>\n')
    coref.write('<name>fs.defaultFS</name>\n')
    coref.write('<value>%s</value>\n' % url)
    coref.write('</property>\n')
    coref.write('</configuration>\n')
    coref.close()
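
# Sample output of generate_hdfs_coresite_xml('192.168.0.10', 9000), with an
# illustrative host and port:
#
#   <?xml version="1.0" encoding="UTF-8"?>
#   <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
#   <configuration>
#   <property>
#   <name>fs.defaultFS</name>
#   <value>hdfs://192.168.0.10:9000</value>
#   </property>
#   </configuration>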

def generate_node_env(comf, config, machines, ip, idx):
    mach = machines.get(ip)
    nodemgrdir = "kunlun-node-manager-%s" % config['product_version']
    filemap = get_3rdpackages_filemap(config)
    jdkdir = filemap['jdk'][1]
    hadoopdir = filemap['hadoop'][1]
    haproxydir = filemap['haproxy'][1]
    prometheusdir = filemap['prometheus'][1]
    filebeatdir = filemap['filebeat'][1]
    es_file = filemap['elasticsearch'][0]
    kibana_file = filemap['kibana'][0]
    mysqldriver_file = filemap['mysql-driver'][0]
    mysqldriver_dir = filemap['mysql-driver'][1]
    fname = '%d-env.sh.node' % idx
    fname_to = 'env.sh.node'
    envf = open('clustermgr/%s' % fname, 'w')
    #envf.write("#! /bin/bash\n")
    envf.write("export JDK_DIR=%s; #KUNLUN_SET_ENV\n" % jdkdir)
    envf.write("export HADOOP_DIR=%s; #KUNLUN_SET_ENV\n" % hadoopdir)
    envf.write("export HAPROXY_DIR=%s; #KUNLUN_SET_ENV\n" % haproxydir)
    envf.write("export FILEBEAT_DIR=%s; #KUNLUN_SET_ENV\n" % filebeatdir)
    envf.write("export PROMETHEUS_DIR=%s; #KUNLUN_SET_ENV\n" % prometheusdir)
    envf.write("export MYSQLDRIVER_DIR=%s; #KUNLUN_SET_ENV\n" % mysqldriver_dir)
    envf.write("export ES_FILE=%s; #KUNLUN_SET_ENV\n" % es_file)
    envf.write("export KIBANA_FILE=%s; #KUNLUN_SET_ENV\n" % kibana_file)
    envf.write("export MYSQLDRIVER_FILE=%s; #KUNLUN_SET_ENV\n" % mysqldriver_file)
    envf.write("export KUNLUN_VERSION=%s; #KUNLUN_SET_ENV\n" % config['product_version'])
    envf.write("JAVA_HOME=%s/program_binaries/%s; #KUNLUN_SET_ENV\n" % (mach['basedir'], jdkdir))
    envf.write("PATH=%s/%s/bin/util:$PATH; #KUNLUN_SET_ENV\n" % (mach['basedir'], nodemgrdir))
    envf.write("PATH=$JAVA_HOME/bin:$PATH; #KUNLUN_SET_ENV\n")
    envf.write("HADOOP_HOME=%s/program_binaries/%s; #KUNLUN_SET_ENV\n" % (mach['basedir'], hadoopdir))
    envf.write("PATH=$HADOOP_HOME/bin:$PATH; #KUNLUN_SET_ENV\n")
    envf.write("FILEBEAT_HOME=%s/program_binaries/%s; #KUNLUN_SET_ENV\n" % (mach['basedir'], filebeatdir))
    envf.write("PATH=$FILEBEAT_HOME:$PATH; #KUNLUN_SET_ENV\n")
    envf.write("export JAVA_HOME; #KUNLUN_SET_ENV\n")
    envf.write("export HADOOP_HOME; #KUNLUN_SET_ENV\n")
    envf.write("export FILEBEAT_HOME; #KUNLUN_SET_ENV\n")
    envf.write("export PATH; #KUNLUN_SET_ENV\n")
    if config['pyexec'] != "none":
        envf.write("export KUNLUN_PYTHON=%s; #KUNLUN_SET_ENV\n" % config['pyexec'])
    else:
        envf.write("test -z \"$KUNLUN_PYTHON\" && python --version>/dev/null 2>/dev/null && KUNLUN_PYTHON=python; #KUNLUN_SET_ENV\n")
        envf.write("test -z \"$KUNLUN_PYTHON\" && python2 --version>/dev/null 2>/dev/null && KUNLUN_PYTHON=python2; #KUNLUN_SET_ENV\n")
        envf.write("test -z \"$KUNLUN_PYTHON\" && python3 --version>/dev/null 2>/dev/null && KUNLUN_PYTHON=python3; #KUNLUN_SET_ENV\n")
        envf.write("export KUNLUN_PYTHON; #KUNLUN_SET_ENV\n")
    envf.close()
    process_file(comf, config, machines, ip, 'clustermgr/%s' % fname, '%s/%s' % (mach['basedir'],fname_to))
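
# Representative lines from the generated env.sh.node (directory names and
# the python choice are illustrative); each generated line carries the
# trailing #KUNLUN_SET_ENV marker, which tags it as a generated entry:
#
#   export JDK_DIR=jdk-11.0.1; #KUNLUN_SET_ENV
#   JAVA_HOME=/kunlun/basedir/program_binaries/jdk-11.0.1; #KUNLUN_SET_ENV
#   PATH=$JAVA_HOME/bin:$PATH; #KUNLUN_SET_ENV
#   export KUNLUN_PYTHON=python3; #KUNLUN_SET_ENV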

def setup_meta_env(node, machines, dirmap, commandslist, config):
    storagedir = "kunlun-storage-%s" % config['product_version']
    serverdir = "kunlun-server-%s" % config['product_version']
    mach = machines.get(node['ip'])
    # Set up the files
    addToDirMap(dirmap, node['ip'], "%s/%s" % (mach['basedir'], node['program_dir']))
    addToCommandsList(commandslist, node['ip'], '.', 'cp -fr %s/program_binaries/%s %s' %  (mach['basedir'], storagedir, node['program_dir']))
    addToCommandsList(commandslist, node['ip'], '.', 'cp -fr %s/program_binaries/%s %s' %  (mach['basedir'], serverdir, node['program_dir']))

def setup_storage_env(node, machines, dirmap, commandslist, config):
    storagedir = "kunlun-storage-%s" % config['product_version']
    mach = machines.get(node['ip'])
    # Set up the files
    addToDirMap(dirmap, node['ip'], "%s/%s" % (mach['basedir'], node['program_dir']))
    addToCommandsList(commandslist, node['ip'], '.', 'cp -fr %s/program_binaries/%s %s' %  (mach['basedir'], storagedir, node['program_dir']))

def setup_server_env(node, machines, dirmap, commandslist, config):
    serverdir = "kunlun-server-%s" % config['product_version']
    mach = machines.get(node['ip'])
    # Set up the files
    addToDirMap(dirmap, node['ip'], "%s/%s" % (mach['basedir'], node['program_dir']))
    addToCommandsList(commandslist, node['ip'], '.', 'cp -fr %s/program_binaries/%s %s' %  (mach['basedir'], serverdir, node['program_dir']))

def install_nodemgr_env(comf, mach, machines, config):
    progname = "kunlun-node-manager-%s" % config['product_version']
    ip = mach['ip']
    # Set up the files
    if not config['cloud']:
        process_file(comf, config, machines, ip, 'clustermgr/%s.tgz' % progname, mach['basedir'])
    process_command_noenv(comf, config, machines, ip, mach['basedir'], 'tar -xzf %s.tgz' % progname)

def setup_nodemgr_commands(config, idx, machines, node, commandslist, dirmap, filesmap, metaseeds, hasHDFS, configmap, withRocksdb=True):
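    """Queue the file transfers, directories and commands that deploy one
    node_mgr instance, and emit a per-node setup_nodemgr_<idx>.sh which
    rewrites node_mgr.cnf through change_config.sh."""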
    cmdpat = "bash change_config.sh %s \"%s\" \"%s\"\n"
    nodemgrdir = "kunlun-node-manager-%s" % config['product_version']
    storagedir = "kunlun-storage-%s" % config['product_version']
    serverdir = "kunlun-server-%s" % config['product_version']
    proxysqldir = "kunlun-proxysql-%s" % config['product_version']
    haproxypkg="haproxy-2.5.0-bin.tar.gz"
    confpath = "%s/conf/node_mgr.cnf" % nodemgrdir
    ismariadb = config['mariadb']

    mach = machines.get(node['ip'])
    if hasHDFS:
        addNodeToFilesListMap(filesmap, node, "core-site.xml", '.')
    targetdir = "program_binaries"
    if not config['cloud']:
        addNodeToFilesListMap(filesmap, node, "%s.tgz" % storagedir, targetdir)
        addNodeToFilesListMap(filesmap, node, "%s.tgz" % serverdir, targetdir)
        addNodeToFilesListMap(filesmap, node, "%s.tgz" % proxysqldir, targetdir)
        addNodeToFilesListMap(filesmap, node, haproxypkg, ".")
        addToCommandsList(commandslist, node['ip'], targetdir, "tar -xzf %s.tgz" % storagedir)
        addToCommandsList(commandslist, node['ip'], targetdir, "tar -xzf %s.tgz" % serverdir)
        addToCommandsList(commandslist, node['ip'], targetdir, "tar -xzf %s.tgz" % proxysqldir)
        addToCommandsList(commandslist, node['ip'], ".", "tar -xzf %s" % haproxypkg)
        #addToCommandsList(commandslist, node['ip'], targetdir, "rm -f %s.tgz" % storagedir)
        #addToCommandsList(commandslist, node['ip'], targetdir, "rm -f %s.tgz" % serverdir)
        #addToCommandsList(commandslist, node['ip'], targetdir, "rm -f %s.tgz" % proxysqldir)
    comstr = "test -d etc && echo > etc/instances_list.txt 2>/dev/null; exit 0"
    addNodeToFilesListMap(filesmap, node, "../install/use_python3_ifneed.sh", targetdir)
    addToCommandsList(commandslist, node['ip'], "%s/%s" %(targetdir, storagedir), comstr)
    addToCommandsList(commandslist, node['ip'], "%s/%s" %(targetdir, serverdir), comstr)
    if ismariadb:
        addToCommandsList(commandslist, node['ip'], "%s/%s" %(targetdir, serverdir), "sed -i s/utf8mb4_0900_bin/utf8mb4_nopad_bin/g share/postgresql/information_schema.sql")
    addToCommandsList(commandslist, node['ip'], "%s/%s/lib" %(targetdir, storagedir), "bash %s/process_deps.sh " % mach['basedir'])
    addToCommandsList(commandslist, node['ip'], "%s/%s/lib" %(targetdir, serverdir), "bash %s/process_python3.sh" % mach['basedir'])
    addToCommandsList(commandslist, node['ip'], targetdir, "bash use_python3_ifneed.sh %s" % config['product_version'])
    addToCommandsList(commandslist, node['ip'], "%s/%s/lib" %(targetdir, serverdir), "bash %s/process_deps.sh postgresql" % mach['basedir'])
    addToCommandsList(commandslist, node['ip'], "%s/%s/lib" %(targetdir, proxysqldir), "bash %s/process_deps.sh" % mach['basedir'])
    #my_print("pg_with_san: %s" % str(mach['pg_with_san']))
    if mach['haspg'] and not mach['pg_with_san']:
        addNodeToFilesListMap(filesmap, node, "../install/build_driver_forpg.sh", '.')
        addToCommandsList(commandslist, node['ip'], ".", "cp -f %s/%s/resources/psycopg2-2.8.4.tar.gz ." %(targetdir, serverdir))
        addToCommandsList(commandslist, node['ip'], ".",  "bash %s/build_driver_forpg.sh %s 0" % (mach['basedir'], mach['basedir']), "computing")
    setup_mgr_common(config, commandslist, dirmap, filesmap, machines, node, targetdir, storagedir, serverdir)
    for item in ["server_datadirs", "storage_datadirs", "storage_logdirs", "storage_waldirs", "storage_keyringdir", "vec_tmpdir"]:
        nodedirs = node[item].strip()
        for d in nodedirs.split(","):
            addToDirMap(dirmap, node['ip'], d.strip())
    if not config['cloud']:
        fmap = get_3rdpackages_filemap(config)
        hadoop_file = fmap['hadoop'][0]
        jdk_file = fmap['jdk'][0]
        filebeat_file = fmap['filebeat'][0]
        addNodeToFilesListMap(filesmap, node, hadoop_file, targetdir)
        addNodeToFilesListMap(filesmap, node, jdk_file, targetdir)
        addNodeToFilesListMap(filesmap, node, filebeat_file, targetdir)
        addToCommandsList(commandslist, node['ip'], targetdir, "tar -xzf %s" % hadoop_file)
        addToCommandsList(commandslist, node['ip'], targetdir, "tar -xzf %s" % jdk_file)
        addToCommandsList(commandslist, node['ip'], targetdir, "tar -xzf %s" % filebeat_file)
    if hasHDFS:
        addToCommandsList(commandslist, node['ip'], '.', "cp -f ./core-site.xml program_binaries/\\${HADOOP_DIR}/etc/hadoop")
    addToCommandsList(commandslist, node['ip'], nodemgrdir, "chmod a+x bin/util/*")
    addToCommandsList(commandslist, node['ip'], '.', 'cp -f env.sh.node %s/bin/extra.env' % nodemgrdir)
    addToCommandsList(commandslist, node['ip'], '.', 'cp -f env.sh.node program_binaries/%s/dba_tools/extra.env' % storagedir)
    if config['setbashenv']:
        addToCommandsList(commandslist, node['ip'], '.', 'test -f env.sh.node && cat env.sh.node >> ~/.bashrc')
    script_name = "setup_nodemgr_%d.sh" % idx
    scriptf = open('clustermgr/%s' % script_name, 'w')
    scriptf.write("#! /bin/bash\n")
    for confkey in configmap:
        scriptf.write(cmdpat % (confpath, confkey, str(configmap[confkey])))
    scriptf.write(cmdpat % (confpath, 'meta_group_seeds', metaseeds))
    scriptf.write(cmdpat % (confpath, 'brpc_http_port', node['brpc_http_port']))
    scriptf.write(cmdpat % (confpath, 'nodemgr_tcp_port', node['tcp_port']))
    scriptf.write(cmdpat % (confpath, 'local_ip', node['ip']))
    scriptf.write(cmdpat % (confpath, 'program_binaries_path', '%s/program_binaries' % mach['basedir']))
    scriptf.write(cmdpat % (confpath, 'instance_binaries_path', '%s/instance_binaries' % mach['basedir']))
    scriptf.write(cmdpat % (confpath, 'prometheus_path', '%s/program_binaries/${PROMETHEUS_DIR}' % mach['basedir']))
    scriptf.write(cmdpat % (confpath, 'storage_prog_package_name', storagedir))
    scriptf.write(cmdpat % (confpath, 'computer_prog_package_name', serverdir))
    if check_version_to_minor(config['product_version'], 1, 1):
        scriptf.write(cmdpat % (confpath, 'proxysql_prog_package_name', proxysqldir))
    if 'prometheus_port_start' in node:
        scriptf.write(cmdpat % (confpath, 'prometheus_port_start', node['prometheus_port_start']))
    if node['verbose_log']:
        for cname in ['template.cnf', 'template-small.cnf', 'template-meta.cnf', 'template-meta-small.cnf', 'template-rbr.cnf', 'template-rbr-small.cnf']:
            cpath = "program_binaries/%s/dba_tools/%s" % (storagedir, cname)
            scriptf.write(cmdpat % (cpath, 'general_log', 'ON'))
        cpath = "program_binaries/%s/resources/postgresql.conf" % serverdir
        scriptf.write(cmdpat % (cpath, 'enable_sql_log', 'true'))
    if not withRocksdb:
        cmdpat = "sed -i /rocksdb/d '%s' \n"
        for cname in ['template.cnf', 'template-small.cnf', 'template-meta.cnf', 'template-meta-small.cnf', 'template-rbr.cnf', 'template-rbr-small.cnf']:
            cpath = "program_binaries/%s/dba_tools/%s" % (storagedir, cname)
            scriptf.write(cmdpat % cpath)
    scriptf.close()
    addNodeToFilesListMap(filesmap, node, script_name, '.')
    addNodeToFilesListMap(filesmap, node, 'clear_instances.sh', '.')
    addNodeToFilesListMap(filesmap, node, 'clear_instance.sh', '.')
    addToCommandsList(commandslist, node['ip'], '.', "bash ./%s" % script_name)
    if config['send_license']:
        addToCommandsList(commandslist, node['ip'], '.', "cp -f %s %s/conf" % (config['license_file'], nodemgrdir))
        addToCommandsList(commandslist, node['ip'], '.', "cp -f %s program_binaries/%s" % (config['license_file'], storagedir))
        addToCommandsList(commandslist, node['ip'], '.', "cp -f %s program_binaries/%s" % (config['license_file'], serverdir))

def install_clustermgr_env(comf, mach, machines, config):
    progname = "kunlun-cluster-manager-%s" % config['product_version']
    ip = mach['ip']
    # Set up the files
    if not config['cloud']:
        process_file(comf, config, machines, ip, 'clustermgr/%s.tgz' % progname, mach['basedir'])
    process_command_noenv(comf, config, machines, ip, mach['basedir'], 'tar -xzf %s.tgz' % progname)

def setup_clustermgr_commands(config, idx, machines, node, commandslist, dirmap, filesmap, metaseeds, initmember, initcommon, configmap):
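    """Queue the deployment steps for one cluster_mgr instance and emit a
    per-node setup_clustermgr_<idx>.sh which rewrites cluster_mgr.cnf."""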
    cmdpat = "bash change_config.sh %s \"%s\" \"%s\"\n"
    clustermgrdir = "kunlun-cluster-manager-%s" % config['product_version']
    storagedir = "kunlun-storage-%s" % config['product_version']
    serverdir = "kunlun-server-%s" % config['product_version']
    confpath = "%s/conf/cluster_mgr.cnf" % clustermgrdir
    mach = machines.get(node['ip'])
    targetdir = "program_binaries"
    if initcommon:
        setup_mgr_common(config, commandslist, dirmap, filesmap, machines, node, targetdir, storagedir, serverdir)
    script_name = "setup_clustermgr_%d.sh" % idx
    scriptf = open('clustermgr/%s' % script_name, 'w')
    scriptf.write("#! /bin/bash\n")
    for confkey in configmap:
        scriptf.write(cmdpat % (confpath, confkey, str(configmap[confkey])))
    scriptf.write(cmdpat % (confpath, 'meta_group_seeds', metaseeds))
    scriptf.write(cmdpat % (confpath, 'brpc_raft_port', node['brpc_raft_port']))
    scriptf.write(cmdpat % (confpath, 'brpc_http_port', node['brpc_http_port']))
    scriptf.write(cmdpat % (confpath, 'local_ip', node['ip']))
    scriptf.write(cmdpat % (confpath, 'raft_group_member_init_config', initmember))
    scriptf.write(cmdpat % (confpath, 'prometheus_path', '%s/program_binaries/${PROMETHEUS_DIR}' % mach['basedir']))
    if 'prometheus_port_start' in node:
        scriptf.write(cmdpat % (confpath, 'prometheus_port_start', node['prometheus_port_start']))
    if 'brpc_raft_election_timeout_ms' in node:
        scriptf.write(cmdpat % (confpath, 'brpc_raft_election_timeout_ms', node['brpc_raft_election_timeout_ms']))
    scriptf.close()
    addNodeToFilesListMap(filesmap, node, script_name, '.')
    addToCommandsList(commandslist, node['ip'], '.', "bash ./%s" % script_name)
    if config['send_license']:
        addToCommandsList(commandslist, node['ip'], '.', "cp -f %s %s/conf" % (config['license_file'], clustermgrdir))

def generate_ssl_certs(config, node, commandslist, dirmap, filesmap, getbackmap):
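    """Generate SSL certificates on the node via 'cluster_mgr license_gen'
    and fetch ca.pem, server-cert.pem and server-key.pem back."""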
    clustermgrdir = "kunlun-cluster-manager-%s" % config['product_version']
    cmdpat = "./cluster_mgr license_gen ../conf/license.lic 0 ../cert_dir"
    targetdir = "%s/bin" % clustermgrdir
    addToCommandsList(commandslist, node['ip'], targetdir, "mkdir -p ../cert_dir")
    addToCommandsList(commandslist, node['ip'], targetdir, cmdpat)
    addIpToGetBackListMap(getbackmap, node['ip'], '%s/cert_dir/ca.pem' % clustermgrdir, '.')
    addIpToGetBackListMap(getbackmap, node['ip'], '%s/cert_dir/server-cert.pem' % clustermgrdir, '.')
    addIpToGetBackListMap(getbackmap, node['ip'], '%s/cert_dir/server-key.pem' % clustermgrdir, '.')

def install_clustermgr(args):
    jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    if jscfg['config']['multipledc']:
        install_clustermgr_multidc(jscfg)
    else:
        install_clustermgr_onedc(jscfg)

def install_clustermgr_onedc(jscfg):
    install_clustermgr_int(jscfg, setup_machines2, validate_and_set_config2)

def install_clustermgr_multidc(jscfg):
    install_clustermgr_int(jscfg, setup_machines3, validate_and_set_config3)

def install_clustermgr_int(jscfg, machine_func, validate_func):
    machines = {}
    machine_func(jscfg, machines)
    validate_func(jscfg, machines)
    comf = open(r'clustermgr/install.sh', 'w')
    comf.write('#! /bin/bash\n')
    comf.write("cat /dev/null > runlog\n")
    comf.write("cat /dev/null > lastlog\n")
    if jscfg['config']['verbose']:
        comf.write("trap 'cat lastlog' DEBUG\n")
        comf.write("trap 'exit 1' ERR\n")
    else:
        comf.write("trap 'cat lastlog >> runlog' DEBUG\n")
        comf.write("trap 'cat lastlog; exit 1' ERR\n")
    install_with_config(jscfg, comf, machines)
    output_info(comf, "Installation completed !")
    comf.close()

def stop_clustermgr(args):
    jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    if jscfg['config']['multipledc']:
        stop_clustermgr_multidc(jscfg)
    else:
        stop_clustermgr_onedc(jscfg)

def stop_clustermgr_onedc(jscfg):
    stop_clustermgr_int(jscfg, setup_machines2, validate_and_set_config2)

def stop_clustermgr_multidc(jscfg):
    stop_clustermgr_int(jscfg, setup_machines3, validate_and_set_config3)

def stop_clustermgr_int(jscfg, machine_func, validate_func):
    machines = {}
    machine_func(jscfg, machines)
    validate_func(jscfg, machines)
    comf = open(r'clustermgr/stop.sh', 'w')
    comf.write('#! /bin/bash\n')
    comf.write("cat /dev/null > runlog\n")
    comf.write("cat /dev/null > lastlog\n")
    if jscfg['config']['verbose']:
        comf.write("trap 'cat lastlog' DEBUG\n")
    else:
        comf.write("trap 'cat lastlog >> runlog' DEBUG\n")
    stop_with_config(jscfg, comf, machines)
    output_info(comf, "Stop action completed !")
    comf.close()

def start_clustermgr(args):
    jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    if jscfg['config']['multipledc']:
        start_clustermgr_multidc(jscfg)
    else:
        start_clustermgr_onedc(jscfg)

def start_clustermgr_onedc(jscfg):
    start_clustermgr_int(jscfg, setup_machines2, validate_and_set_config2)

def start_clustermgr_multidc(jscfg):
    start_clustermgr_int(jscfg, setup_machines3, validate_and_set_config3)

def start_clustermgr_int(jscfg, machine_func, validate_func):
    machines = {}
    machine_func(jscfg, machines)
    validate_func(jscfg, machines)
    comf = open(r'clustermgr/start.sh', 'w')
    comf.write('#! /bin/bash\n')
    comf.write("cat /dev/null > runlog\n")
    comf.write("cat /dev/null > lastlog\n")
    if jscfg['config']['verbose']:
        comf.write("trap 'cat lastlog' DEBUG\n")
        comf.write("trap 'exit 1' ERR\n")
    else:
        comf.write("trap 'cat lastlog >> runlog' DEBUG\n")
        comf.write("trap 'cat lastlog; exit 1' ERR\n")
    start_with_config(jscfg, comf, machines)
    output_info(comf, "Start action completed !")
    comf.close()

def clean_clustermgr(args):
    jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    if jscfg['config']['multipledc']:
        clean_clustermgr_multidc(jscfg)
    else:
        clean_clustermgr_onedc(jscfg)

def clean_clustermgr_onedc(jscfg):
    clean_clustermgr_int(jscfg, setup_machines2, validate_and_set_config2)

def clean_clustermgr_multidc(jscfg):
    clean_clustermgr_int(jscfg, setup_machines3, validate_and_set_config3)

def clean_clustermgr_int(jscfg, machine_func, validate_func):
    machines = {}
    machine_func(jscfg, machines)
    validate_func(jscfg, machines)
    comf = open(r'clustermgr/clean.sh', 'w')
    comf.write('#! /bin/bash\n')
    comf.write("cat /dev/null > runlog\n")
    comf.write("cat /dev/null > lastlog\n")
    if jscfg['config']['verbose']:
        comf.write("trap 'cat lastlog' DEBUG\n")
    else:
        comf.write("trap 'cat lastlog >> runlog' DEBUG\n")
    clean_with_config(jscfg, comf, machines)
    output_info(comf, "Clean action completed !")
    comf.close()

def upgrade_clustermgr(args):
    jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    if jscfg['config']['multipledc']:
        upgrade_clustermgr_multidc(jscfg)
    else:
        upgrade_clustermgr_onedc(jscfg)

def upgrade_clustermgr_onedc(jscfg):
    upgrade_clustermgr_int(jscfg, setup_machines2, validate_and_set_config2)

def upgrade_clustermgr_multidc(jscfg):
    upgrade_clustermgr_int(jscfg, setup_machines3, validate_and_set_config3)

def upgrade_clustermgr_int(jscfg, machine_func, validate_func):
    machines = {}
    machine_func(jscfg, machines)
    validate_func(jscfg, machines)
    comf = open(r'clustermgr/upgrade.sh', 'w')
    comf.write('#! /bin/bash\n')
    comf.write("cat /dev/null > runlog\n")
    comf.write("cat /dev/null > lastlog\n")
    if jscfg['config']['verbose']:
        comf.write("trap 'cat lastlog' DEBUG\n")
    else:
        comf.write("trap 'cat lastlog >> runlog' DEBUG\n")
    upgrade_with_config(jscfg, comf, machines)
    output_info(comf, "Upgrade action completed !")
    comf.close()

def service_clustermgr(args):
    jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    if jscfg['config']['multipledc']:
        service_clustermgr_multidc(jscfg)
    else:
        service_clustermgr_onedc(jscfg)

def service_clustermgr_onedc(jscfg):
    service_clustermgr_int(jscfg, setup_machines2, validate_and_set_config2)

def service_clustermgr_multidc(jscfg):
    service_clustermgr_int(jscfg, setup_machines3, validate_and_set_config3)

def service_clustermgr_int(jscfg, machine_func, validate_func):
    machines = {}
    machine_func(jscfg, machines)
    validate_func(jscfg, machines)
    comf = open(r'clustermgr/service.sh', 'w')
    comf.write('#! /bin/bash\n')
    comf.write("cat /dev/null > runlog\n")
    comf.write("cat /dev/null > lastlog\n")
    if jscfg['config']['verbose']:
        comf.write("trap 'cat lastlog' DEBUG\n")
    else:
        comf.write("trap 'cat lastlog >> runlog' DEBUG\n")
    service_with_config(jscfg, comf, machines)
    output_info(comf, "Service action completed !")
    comf.close()


def get_machine_info(args):
    jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    if jscfg['config']['multipledc']:
        get_machine_info_multidc(jscfg)
    else:
        get_machine_info_onedc(jscfg)

def get_machine_info_onedc(jscfg):
    get_machine_info_int(jscfg, setup_machines2, validate_and_set_config2)

def get_machine_info_multidc(jscfg):
    get_machine_info_int(jscfg, setup_machines3, validate_and_set_config3)

def get_machine_info_int(jscfg, machine_func, validate_func):
    machines = {}
    machine_func(jscfg, machines)
    validate_func(jscfg, machines)
    scriptname = r'clustermgr/get_machine_info.sh'
    comf = open(scriptname, 'w')
    comf.write('#! /bin/bash\n')
    comf.write("cat /dev/null > runlog\n")
    comf.write("cat /dev/null > lastlog\n")
    if jscfg['config']['verbose']:
        comf.write("trap 'cat lastlog' DEBUG\n")
    else:
        comf.write("trap 'cat lastlog >> runlog' DEBUG\n")
    ips = get_node_info_with_config(jscfg, comf, machines)
    comf.close()
    get_machine_info_with_config(jscfg, scriptname, ips)

def get_cluster_info(args):
    if args.config != "":
        jscfg = get_json_from_file(args.config)
        init_global_config2(jscfg, args)
        machines = {}
        if jscfg['config']['multipledc']:
            setup_machines3(jscfg, machines)
            validate_and_set_config3(jscfg, machines)
        else:
            setup_machines2(jscfg, machines)
            validate_and_set_config2(jscfg, machines)
        metaseeds = jscfg['meta']['group_seeds']
    elif args.metaseeds != "":
        metaseeds = args.metaseeds
    else:
        raise ValueError('Error: either --config or --metaseeds must be specified')
    get_cluster_info_with_seeds(metaseeds)

def setup_mgr_common(config, commandslist, dirmap, filesmap, machines, node, targetdir, storagedir, serverdir):
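    """Queue the directory creation and prometheus setup steps shared by
    the node_mgr and cluster_mgr deployments."""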
    mach = machines.get(node['ip'])
    addToDirMap(dirmap, node['ip'], "%s/%s" % (mach['basedir'], targetdir))
    addToDirMap(dirmap, node['ip'], "%s/%s/util" % (mach['basedir'], targetdir))
    addToDirMap(dirmap, node['ip'], "%s/instance_binaries" % mach['basedir'])
    addToDirMap(dirmap, node['ip'], "%s/instance_binaries/storage" % mach['basedir'])
    addToDirMap(dirmap, node['ip'], "%s/instance_binaries/computer" % mach['basedir'])

    fmap = get_3rdpackages_filemap(config)
    prome_file = fmap['prometheus'][0]
    prome_dir = fmap['prometheus'][1]
    if not config['cloud']:
        addNodeToFilesListMap(filesmap, node, prome_file, targetdir)
        addToCommandsList(commandslist, node['ip'], targetdir, "tar -xzf %s" % prome_file)
        #addToCommandsList(commandslist, node['ip'], targetdir, "rm -f %s.tgz" % storagedir)
        #addToCommandsList(commandslist, node['ip'], targetdir, "tar -czf %s.tgz %s" % (storagedir, storagedir))
        #addToCommandsList(commandslist, node['ip'], targetdir, "rm -f %s.tgz" % serverdir)
        #addToCommandsList(commandslist, node['ip'], targetdir, "tar -czf %s.tgz %s" % (serverdir, serverdir))
    if 'prometheus_datadir' in node:
        prome_datadir = node['prometheus_datadir']
        if not prome_datadir.startswith('/'):
            raise ValueError('Error: the prometheus datadir %s must be an absolute path!' % prome_datadir)
        addToDirMap(dirmap, node['ip'], prome_datadir)
        addToCommandsList(commandslist, node['ip'], "%s/%s" % (targetdir, prome_dir) , "mv -f data data.old")
        addToCommandsList(commandslist, node['ip'], "%s/%s" % (targetdir, prome_dir) , "ln -s %s data" % prome_datadir)
        addToCommandsList(commandslist, node['ip'], "%s/%s" % (targetdir, prome_dir) , "mv -f data.old/* data")
    if config['send_license']:
        addNodeToFilesListMap(filesmap, node, config['license_file'], ".")

def setup_basedir(comf, config, machines, ip, usesudo):
    mach = machines.get(ip)
    if usesudo:
        process_command_noenv(comf, config, machines, ip, '/',
            'sudo mkdir -p %s && sudo chown -R %s:\`id -gn %s\` %s' % (mach['basedir'],
            mach['user'], mach['user'], mach['basedir']))
    else:
        process_command_noenv(comf, config, machines, ip, '/', 'mkdir -p %s' % mach['basedir'])

def setup_cdc(jscfg, machines, comf):
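    """Deploy kunlun-cdc on every configured node: unpack the package, emit
    a setup_cdc_<idx>.sh that rewrites kunlun_cdc.cnf, and register a
    systemd service when autostart is enabled."""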
    if 'cdc' not in jscfg:
        return
    cdc = jscfg['cdc']
    config = jscfg['config']
    autostart = config['autostart']
    usesudo = config['sudo']
    cdcdir = "kunlun-cdc-%s" % config['product_version']
    idx = 0
    for node in cdc['nodes']:
        dirmap = {}
        filesmap = {}
        commandslist = []
        mach = machines.get(node['ip'])
        output_info(comf, "Setting up cdc on %s ..." % node['ip'])
        process_command_noenv(comf, config, machines, node['ip'], '/',
                'sudo mkdir -p %s && sudo chown -R %s:\`id -gn %s\` %s' % (mach['basedir'],
                mach['user'], mach['user'], mach['basedir']))
        addNodeToFilesListMap(filesmap, node, '%s.tgz' % cdcdir, ".")
        addNodeToFilesListMap(filesmap, node, '../install/change_config_inplace.sh', ".")
        addToCommandsList(commandslist, node['ip'], mach['basedir'], "tar -xzf %s.tgz" % cdcdir)
        script_name = "setup_cdc_%d.sh" % idx
        scriptf = open('clustermgr/%s' % script_name, 'w')
        scriptf.write("#! /bin/bash\n")
        cmdpat = "bash change_config_inplace.sh %s \"%s\" \"%s\"\n"
        confpath = "%s/conf/kunlun_cdc.cnf" % cdcdir
        configmap = get_cdc_config(node, cdc)
        configmap['local_ip'] = node['ip']
        configmap['http_port'] = node['http_port']
        configmap['ha_group_member'] = node['ha_group_member']
        configmap['server_id'] = node['server_id']
        for confkey in configmap:
            scriptf.write(cmdpat % (confpath, confkey, str(configmap[confkey])))
        scriptf.close()
        addNodeToFilesListMap(filesmap, node, script_name, "setup_cdc.sh")
        addToCommandsList(commandslist, node['ip'], mach['basedir'], "bash setup_cdc.sh")
        if autostart:
            generate_cdc_service(config, machines, commandslist, node, idx, filesmap)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
        idx += 1

def get_haproxy_ips(jscfg):
    haproxyips = set()
    if 'clusters' not in jscfg:
        return haproxyips
    for cluster in jscfg['clusters']:
        if 'haproxy' in cluster:
            haproxyips.add(cluster['haproxy']['ip'])
    return haproxyips

def get_xpanel_ips(jscfg):
    xpanelips = set()
    if 'xpanel' not in jscfg:
        return xpanelips
    for node in jscfg['xpanel']['nodes']:
        xpanelips.add(node['ip'])
    return xpanelips

def setup_onexpanel(jscfg, machines, comf, config, xpanel, node, image, imageFile):
    mach = machines.get(node['ip'])
    if xpanel['imageType'] == 'file':
        if not config['cloud']:
            process_command_noenv(comf, config, machines, node['ip'], '/',
                    'sudo mkdir -p %s && sudo chown -R %s:\`id -gn %s\` %s' % (mach['basedir'],
                    mach['user'], mach['user'], mach['basedir']))
            process_file(comf, config, machines, node['ip'], 'clustermgr/%s' % imageFile, mach['basedir'])
        cmdpat = "sudo docker inspect %s >& /dev/null || ( gzip -cd %s | sudo docker load )"
        process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], 
                    cmdpat % (image, imageFile))

def setup_xpanel(jscfg, machines, comf):
    if 'xpanel' not in jscfg:
        return
    config = jscfg['config']
    xpanel = jscfg['xpanel']
    for node in xpanel['nodes']:
        output_info(comf, "Setting up xpanel on %s ..." % node['ip'])
        setup_onexpanel(jscfg, machines, comf, config, xpanel, node, xpanel['image'], xpanel['imageFile'])

def install_xpanel(jscfg, machines, dirmap, filesmap, commandslist, metaseeds, comf):
    if 'xpanel' not in jscfg:
        return
    config = jscfg['config']
    autostart = config['autostart']
    xpanel = jscfg['xpanel']
    restart = 'no'
    if autostart:
        restart = 'always'
    for node in xpanel['nodes']:
        mach = machines.get(node['ip'])
        output_info(comf, "Starting xpanel on %s ..." % node['ip'])
        cmdpat = "sudo docker run --privileged -v /etc/hosts:/etc/hosts -itd --restart={} --env METASEEDS=%s --env SAAS=%s --name %s -p %d:80 %s bash -c '/bin/bash /kunlun/start.sh'".format(restart)
        process_command_noenv(comf, config, machines, node['ip'], '/', cmdpat % (metaseeds, str(xpanel['saas']), node['name'], node['port'], xpanel['image']))

def stop_xpanel(jscfg, machines, dirmap, filesmap, commandslist, comf):
    if 'xpanel' not in jscfg:
        return
    config = jscfg['config']
    xpanel = jscfg['xpanel']
    for node in xpanel['nodes']:
        output_info(comf, "Stopping xpanel on %s ..." % node['ip'])
        cmdpat = "sudo docker container stop %s"
        process_command_noenv(comf, config, machines, node['ip'], '/', cmdpat % node['name'])

def start_xpanel(jscfg, machines, dirmap, filesmap, commandslist, comf):
    if 'xpanel' not in jscfg:
        return
    config = jscfg['config']
    xpanel = jscfg['xpanel']
    for node in xpanel['nodes']:
        output_info(comf, "Starting xpanel on %s ..." % node['ip'])
        cmdpat = "sudo docker container start %s"
        process_command_noenv(comf, config, machines, node['ip'], '/', cmdpat % node['name'])

def clean_onexpanel(jscfg, machines, comf, config, xpanel, node, image, imageFile):
    cmdpat = "sudo docker container rm -f %s"
    process_command_noenv(comf, config, machines, node['ip'], '/', cmdpat % node['name'])
    cmdpat = "sudo docker image rm -f %s"
    process_command_noenv(comf, config, machines, node['ip'], '/', cmdpat % image)
    if xpanel['imageType'] == 'file' and not config['cloud']:
        mach = machines.get(node['ip'])
        process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], 'rm -f %s' % imageFile)

def clean_xpanel(jscfg, machines, comf):
    if 'xpanel' not in jscfg:
        return
    config = jscfg['config']
    xpanel = jscfg['xpanel']
    for node in xpanel['nodes']:
        output_info(comf, "Cleaning xpanel on %s ..." % node['ip'])
        clean_onexpanel(jscfg, machines, comf, config, xpanel, node, xpanel['image'], xpanel['imageFile'])

def setup_elasticsearch(jscfg, machines, comf):
    if 'elasticsearch' not in jscfg:
        return
    config = jscfg['config']
    node = jscfg['elasticsearch']
    mach = machines.get(node['ip'])
    fmap = get_3rdpackages_filemap(config)
    es_pack = fmap['elasticsearch'][0]
    k_pack = fmap['kibana'][0]
    if not config['cloud']:
        output_info(comf, "Setting up elasticsearch and kibana on %s ..." % node['ip'])
        process_command_noenv(comf, config, machines, node['ip'], '/',
                'sudo mkdir -p %s && sudo chown -R %s:\`id -gn %s\` %s' % (mach['basedir'],
            mach['user'], mach['user'], mach['basedir']))
        process_file(comf, config, machines, node['ip'], 'clustermgr/%s' % es_pack, mach['basedir'])
        process_file(comf, config, machines, node['ip'], 'clustermgr/%s' % k_pack, mach['basedir'])

def install_elasticsearch(jscfg, machines, metaseeds, comf):
    if 'elasticsearch' not in jscfg:
        return
    config = jscfg['config']
    autostart = config['autostart']
    node = jscfg['elasticsearch']
    mach = machines.get(node['ip'])
    fmap = get_3rdpackages_filemap(config)
    es_image = fmap['elasticsearch'][1]
    k_image = fmap['kibana'][1]
    es_port = node['port']
    k_port = node['kibana_port']
    output_info(comf, "install elasticsearch and kibana on %s ..." % node['ip'])
    cmdpat = "sudo docker inspect %s >& /dev/null || ( gzip -cd %s | sudo docker load )"
    process_command_setenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % (es_image, "\\${ES_FILE}"))
    process_command_setenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % (k_image, "\\${KIBANA_FILE}"))
    restart = 'no'
    if autostart:
        restart = 'always'
    es_opts = ""
    if 'datadir' in node:
        es_opts = " -e TAKE_FILE_OWNERSHIP=yes -v %s:/usr/share/elasticsearch/data " % node['datadir']
    cmdpat = "sudo docker run -itd --restart={} %s --name elasticsearch_%d  -p %d:9200 -e discovery.type=single-node %s".format(restart)
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % (es_opts, es_port, es_port, es_image))
    k_opts = ""
    # Currently there is no graceful way to set the kibana data directory.
    #if 'kibana_datadir' in node:
    #    k_opts = " -v %s:/usr/share/kibana/data " % node['kibana_datadir']
    cmdpat = "sudo docker run -itd --restart={} %s --name kibana_%d  -p %d:5601 -e ELASTICSEARCH_HOSTS=http://%s:%d %s".format(restart)
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % (k_opts, k_port, k_port, node['ip'], es_port, k_image))
    es_host = "%s:%d" % (node['ip'], es_port)
    k_host = "%s:%d" % (node['ip'], k_port)
    for node in jscfg['node_manager']['nodes']:
        mach = machines.get(node['ip'])
        targetdir = "%s/program_binaries/\\${FILEBEAT_DIR}" % mach['basedir']
        cmdpat = "sed -i 's/localhost:9200/%s/g' filebeat.yml"
        process_command_setenv(comf, config, machines, node['ip'], targetdir, cmdpat % es_host)
        cmdpat = "sed -i '/localhost:5601/s/#host/host/g' filebeat.yml"
        process_command_setenv(comf, config, machines, node['ip'], targetdir, cmdpat)
        cmdpat = "sed -i 's/localhost:5601/%s/g' filebeat.yml"
        process_command_setenv(comf, config, machines, node['ip'], targetdir, cmdpat % k_host)

def start_elasticsearch(jscfg, machines, comf):
    if 'elasticsearch' not in jscfg:
        return
    config = jscfg['config']
    node = jscfg['elasticsearch']
    mach = machines.get(node['ip'])
    es_port = node['port']
    k_port = node['kibana_port']
    output_info(comf, "start elasticsearch and kibana on %s ..." % node['ip'])
    cmdpat = "sudo docker container start kibana_%d"
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % k_port)
    cmdpat = "sudo docker container start elasticsearch_%d"
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % es_port)

def stop_elasticsearch(jscfg, machines, comf):
    if 'elasticsearch' not in jscfg:
        return
    config = jscfg['config']
    node = jscfg['elasticsearch']
    mach = machines.get(node['ip'])
    es_port = node['port']
    k_port = node['kibana_port']
    output_info(comf, "stop elasticsearch and kibana on %s ..." % node['ip'])
    cmdpat = "sudo docker container stop kibana_%d"
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % k_port)
    cmdpat = "sudo docker container stop elasticsearch_%d"
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % es_port)

def clean_elasticsearch(jscfg, machines, metaseeds, comf):
    if 'elasticsearch' not in jscfg:
        return
    config = jscfg['config']
    node = jscfg['elasticsearch']
    mach = machines.get(node['ip'])
    fmap = get_3rdpackages_filemap(config)
    es_image = fmap['elasticsearch'][1]
    es_port = node['port']
    k_image = fmap['kibana'][1]
    k_port = node['kibana_port']
    output_info(comf, "clean elasticsearch and kibana on %s ..." % node['ip'])
    cmdpat = "sudo docker container rm -f kibana_%d"
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % k_port)
    cmdpat = "sudo docker container rm -f elasticsearch_%d"
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % es_port)
    cmdpat = 'sudo docker image rm -f %s'
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % k_image)
    process_command_noenv(comf, config, machines, node['ip'], mach['basedir'], cmdpat % es_image)
    # Removal of the image files will be added later.

def get_cluster_memo_asjson(cluster):
    comps = cluster['comp']['nodes']
    mobj = {
            "comps": str(len(comps)),
            "computer_passwd": comps[0]['password'],
            "computer_user": comps[0]['user'],
            "cpu_cores": str(cluster['storage_cpu_cores']),
            "dbcfg": str(cluster['dbcfg']),
            'fullsync_level': str(cluster['fullsync_level']),
            "ha_mode": cluster['ha_mode'],
            "innodb_size": str(cluster['innodb_buffer_pool_size_MB']),
            "max_connections": str(cluster['max_connections']),
            "max_storage_size": str(cluster['max_storage_size_GB']),
            "nodes": str(len(cluster['data'][0]['nodes'])),
            "shards": str(len(cluster['data'])),
            "conf_degrade_time": str(cluster['degrade_time'])
            }
    if cluster['enable_degrade']:
        mobj['conf_degrade_state'] = 'ON'
    else:
        mobj['conf_degrade_state'] = 'OFF'
    return mobj

def install_clusters(jscfg, machines, dirmap, filesmap, commandslist, reg_metaname, metaseeds, comf, metaobj):
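    """Install every cluster listed in jscfg['clusters']: stage the storage
    and server binaries on each node, generate the shard/computing JSON
    configs, and queue the install-mysql.py and create_cluster.py calls."""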
    if 'clusters' not in jscfg or len(jscfg['clusters']) == 0:
        return
    config = jscfg['config']
    if config['multipledc']:
        my_print("skip cluster operation for multiple dc")
        return
    storagedir = "kunlun-storage-%s" % config['product_version']
    serverdir = "kunlun-server-%s" % config['product_version']
    clusters = jscfg['clusters']
    meta_hamode = jscfg['meta']['ha_mode']
    usesudo = config['sudo']
    iscantian = config['cantian']
    ismariadb = config['mariadb']
    
    if 'nodemapmaster' not in metaobj:
        metaobj['nodemapmaster'] = {'op':'add', "elements":[]}
    elements = metaobj['nodemapmaster']["elements"]
    memoeles = []
    metaobj['cluster_info'] = {'op':'add', 'elements': memoeles}
    vareles = metaobj['set_variables']['elements']

    i = 1
    for cluster in clusters:
        cluster_name = cluster['name']
        output_info(comf, "installing cluster %s ..." % cluster_name)
        for shard in cluster['data']:
            if shard['is_ref']:
                continue
            for node in shard['nodes']:
                setup_storage_env(node, machines, dirmap, commandslist, config)
        for node in cluster['comp']['nodes']:
            setup_server_env(node, machines, dirmap, commandslist, config)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
        memo_obj = get_cluster_memo_asjson(cluster)
        memoeles.append({"name": cluster_name, "memo": memo_obj})
        # Storage nodes
        cmdpat = 'export ASAN_OPTIONS=halt_on_error=0; \\$KUNLUN_PYTHON install-mysql.py --config=./%s --target_node_index=%d --cluster_id=%s --shard_id=%s --server_id=%d --fullsync=%d'
        cmdpat += ' --meta_addrs=%s ' % metaseeds
        if iscantian:
            cmdpat += ' --initialize=no '
        template_file = "./template.cnf"
        if cluster['storage_template'] == 'small':
            template_file = "./template-small.cnf"
        cmdpat += ' --dbcfg=%s' % template_file
        extraopt = " --ha_mode=%s" % cluster['ha_mode']
        j = 1
        pries = []
        secs = []
        pairs = []
        for shard in cluster['data']:
            if shard['is_ref']:
                continue
            if 'group_uuid' not in shard:
                shard['group_uuid'] = getuuid()
            shard_id = "shard_%d" % i
            my_shardname = "cluster%d_shard%d.json" % (i,j)
            shardf = open(r'clustermgr/%s' % my_shardname, 'w')
            json.dump(shard, shardf, indent=4)
            shardf.close()
            k = 0
            pnode = None
            snodes = []
            for node in shard['nodes']:
                configmap = get_data_config(node, shard, cluster)
                mach = machines.get(node['ip'])
                if 'fullsync_level' in cluster and cluster['fullsync_level'] != 1:
                    vareles.append({"ip": node['ip'], 'port':node['port'],
                        'variable_name':'fullsync_consistency_level',
                        "type":"integer", 'value':cluster['fullsync_level']})
                targetdir='%s/%s/dba_tools' % (node['program_dir'], storagedir)
                addNodeToFilesListMap(filesmap, node, my_shardname, targetdir)
                if not cluster['enable_rocksdb']:
                    addToCommandsList(commandslist, node['ip'], targetdir, 'sed -i /rocksdb/d %s' % template_file)
                cfgpat =  "bash change_config.sh %s '%s' '%s'"
                targetfile = '%s/%s' % (targetdir, template_file)
                if cluster['enable_global_mvcc']:
                    addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, "enable_global_mvcc", "true"))
                if 'max_binlog_size' in node:
                    addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, "max_binlog_size", str(node['max_binlog_size'])))
                for confkey in configmap:
                    addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, confkey, str(configmap[confkey])))
                cmd = cmdpat % (my_shardname, k, cluster_name, shard_id, k+1, node['fullsync'])
                generate_storage_startstop(config, machines, node, k, filesmap)
                if node.get('is_primary', False):
                    pnode = node
                    vareles.append({"ip": node['ip'], 'port':node['port'],
                        'variable_name':'ha_role', "type":"integer", 'value': 1})
                    pairs.append({"node":node, "cfg": my_shardname})
                    pries.append([node['ip'], targetdir, cmd])
                else:
                    secs.append([node['ip'], targetdir, cmd])
                    snodes.append(node)
                addToDirMap(dirmap, node['ip'], node['data_dir_path'])
                addToDirMap(dirmap, node['ip'], node['log_dir_path'])
                addToDirMap(dirmap, node['ip'], node['innodb_log_dir_path'])
                addToDirMap(dirmap, node['ip'], os.path.dirname(node['keyring_file_path']))
                if 'use_vec_engine' in node and node['use_vec_engine']:
                    addToDirMap(dirmap, node['ip'], node['vec_data_dir'])
                k += 1
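            # Record each replica's primary for the nodemapmaster metadata.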
            for node in snodes:
                elements.append({"host": node['ip'], "port":node['port'], 
                    "master_host":pnode['ip'], "master_port":pnode['port'], "is_meta": False})
            j += 1

        for item in pries + secs:  # primaries first, then replicas
            addToCommandsList(commandslist, item[0], item[1], item[2] + extraopt, "storage")

        # Computing nodes
        pg_compname = 'cluster%d_comp.json' % i
        compf = open(r'clustermgr/%s' % pg_compname, 'w')
        json.dump([], compf, indent=4)
        compf.close()
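        # The comps config written above is empty; computing nodes are added
        # individually via add_comp_self.py further below.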
        reg_shardname = "cluster%d_shards.json" % i
        shardf = open(r'clustermgr/%s' % reg_shardname, 'w')
        shards = []
        j = 1
        for shard in cluster['data']:
            obj = {'shard_name': 'shard_%d' % j}
            obj['enable_degrade'] = cluster['enable_degrade']
            obj['degrade_time'] = cluster['degrade_time']
            j += 1
            nodes = []
            for node in shard['nodes']:
                n = {'user':'pgx', 'password':'pgx_pwd'}
                n['ip'] = node['ip']
                n['port'] = node['port']
                if 'ro_weight' in node:
                    n['ro_weight'] = node['ro_weight']
                if 'master_priority' in node:
                    n['master_priority'] = node['master_priority']
                nodes.append(n)
            obj['shard_nodes'] = nodes
            shards.append(obj)
        json.dump(shards, shardf, indent=4)
        shardf.close()

        node = cluster['comp']['nodes'][0]
        targetdir='%s/%s/scripts' % (node['program_dir'], serverdir)
        addNodeToFilesListMap(filesmap, node, pg_compname, targetdir)
        addNodeToFilesListMap(filesmap, node, reg_metaname, targetdir)
        addNodeToFilesListMap(filesmap, node, reg_shardname, targetdir)
        cmdpat = ('\\$KUNLUN_PYTHON create_cluster.py --shards_config=./%s'
            ' --comps_config=./%s --meta_config=./%s --cluster_name=%s'
            ' --meta_ha_mode=%s --ha_mode=%s --cluster_owner=abc --cluster_biz=%s')
        if iscantian:
            cmdpat += ' --cantian '
        if 'ldpreload' in node:
            cmdpat = 'LD_PRELOAD=%s ASAN_OPTIONS=halt_on_error=0 ' % node['ldpreload'] + cmdpat
        addToCommandsList(commandslist, node['ip'], targetdir,
            cmdpat % (reg_shardname, pg_compname, reg_metaname, cluster_name, meta_hamode, cluster['ha_mode'], cluster_name), "parent")


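        # Each computing node registers itself with the cluster via add_comp_self.py.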
        origcmdpat = r'\$KUNLUN_PYTHON add_comp_self.py  --meta_config=./%s --cluster_name=%s --user=%s --password=%s --hostname=%s --port=%d --mysql_port=%d --datadir=%s --install --ha_mode=%s'
        if iscantian:
            origcmdpat += ' --cantian '
        idx=0
        for node in cluster['comp']['nodes']:
            configmap = get_comp_config(node, cluster['comp'])
            if iscantian:
                configmap['enable_parallel_remotescan'] = 'off'
                configmap['shared_preload_libraries'] = "\\'remote_rel.so,ddl2kunlun.so\\'"
                configmap['enable_global_deadlock_detection'] = 'false'
                configmap['trace_global_deadlock_detection'] = 'false'
                configmap['cantian_mode'] = 'on'
                configmap['default_storage_engine'] = 'cantian'
                configmap['dbns_separator'] = "\\'_kl_\\'"
            if ismariadb:
                if node.get('use_vec_engine', False):
                    configmap['shared_preload_libraries'] = "\\'remote_rel.so,ddl2kunlun.so,libtornado_cn.so\\'"
                else:
                    configmap['shared_preload_libraries'] = "\\'remote_rel.so,ddl2kunlun.so\\'"
                configmap['enable_global_deadlock_detection'] = 'false'
                configmap['trace_global_deadlock_detection'] = 'false'
                configmap['storage_set_names_collate'] = "\\'utf8mb4_nopad_bin\\'"
                configmap['storage_table_collate'] = "\\'utf8mb4_nopad_bin\\'"
                configmap['dbns_separator'] = "\\'_kl_\\'"
            mach = machines.get(node['ip'])
            if 'ldpreload' in node:
                cmdpat = '%s ' + 'LD_PRELOAD=%s ASAN_OPTIONS=halt_on_error=0:detect_leaks=0:detect_stack_use_after_return=1:detect_invalid_pointer_pairs=1:check_initialization_order=1 ' % node['ldpreload'] + origcmdpat + " --nopreload"
            else:
                cmdpat = '%s' + origcmdpat
            targetfile='%s/%s/resources/postgresql.conf' % (node['program_dir'], serverdir)
            cfgpat =  "bash change_config.sh %s %s %s"
            if cluster['enable_global_mvcc'] and check_version_to_minor(config['product_version'], 1, 2):
                addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, "enable_global_mvcc", "true"))
            if 'max_connections' in cluster:
                addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, "max_files_per_process", str(cluster['max_connections'])))
            if 'statement_timeout' in cluster:
                addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, "statement_timeout", str(cluster['statement_timeout'])))
            if 'enable_parallel_append' in cluster:
                addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, "enable_parallel_append", str(cluster['enable_parallel_append'])))
            if 'enable_parallel_hash' in cluster:
                addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, "enable_parallel_hash", str(cluster['enable_parallel_hash'])))
            if 'enable_parallel_remotescan' in cluster:
                addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, "enable_parallel_remotescan", str(cluster['enable_parallel_remotescan'])))
            for confkey in configmap:
                addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, confkey, str(configmap[confkey])))
            targetdir='%s/%s/scripts' % (node['program_dir'], serverdir)
            addNodeToFilesListMap(filesmap, node, reg_metaname, targetdir)
            absenvfname = '%s/env.sh.node' % (mach['basedir'])
            envpfx = "test -f %s && . %s; " % (absenvfname, absenvfname)
            addToCommandsList(commandslist, node['ip'], targetdir, cmdpat % (envpfx, reg_metaname, cluster_name,
                node['user'], node['password'], node['ip'], node['port'], node['mysql_port'], node['datadir'], meta_hamode), "parent")
            addToDirMap(dirmap, node['ip'], node['datadir'])
            if 'use_vec_engine' in node and node['use_vec_engine']:
                addToDirMap(dirmap, node['ip'], node['vec_data_dir'])
            generate_server_startstop(config, machines, node, idx, filesmap)
            idx += 1
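        # Optional haproxy instance for the cluster; its config is generated
        # from the cluster definition and it is launched on the haproxy node.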
        if 'haproxy' in cluster:
            node = cluster['haproxy']
            confname = '%d-haproxy-%d.cfg' % (i, node['port'])
            targetconfname = 'haproxy-%d.cfg' % node['port']
            generate_haproxy_config(cluster, machines, 'clustermgr', confname)
            addNodeToFilesListMap(filesmap, node, confname, targetconfname)
            cmdpat = "\\${HAPROXY_DIR}/sbin/haproxy -f %s >& haproxy-%d.log" % (targetconfname, node['port'])
            addToCommandsList(commandslist, node['ip'], ".", cmdpat)
        if 'initialization' in cluster:
            initobj = cluster.get("initialization")
            initfile = "%s_auto_init.sql" % cluster_name
            initsqlf = open("clustermgr/%s" % initfile, 'w')
            for sqlc in initobj.get("sqlcommands", []):
                initsqlf.write(sqlc)
                initsqlf.write(";\n")
            initsqlf.close()
            node = cluster['comp']['nodes'][0]
            waitTime = initobj.get("waitseconds", 10)
            addNodeToFilesListMap(filesmap, node, initfile, ".")
            cmdpat = r'sleep %s; psql -f %s postgres://%s:%s@%s:%s/postgres'
            addToCommandsList(commandslist, node['ip'], ".", cmdpat % (str(waitTime), initfile,
                node['user'], node['password'], 'localhost', str(node['port'])), "computing")
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
        i += 1

def start_clusters(clusters, nodemgrmaps, machines, comf, config):
    if config['multipledc']:
        my_print("skip cluster operation for multiple dc")
        return

    targetdir = '.'
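    # Instances are started directly only on hosts whose node_manager entry is
    # marked 'skip'; on other hosts the running node_mgr is presumably
    # responsible for bringing them up (the same convention applies to
    # stop_clusters and clean_clusters below).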
    for cluster in clusters:
        cluster_name = cluster['name']
        commandslist = []
        output_info(comf, "Starting cluster %s ..." % cluster_name)
        cmdpat = "export ASAN_OPTIONS=halt_on_error=0; bash start-storage-%d.sh"
        for shard in cluster['data']:
            if shard['is_ref']:
                continue
            for node in shard['nodes']:
                nodemgrobj = nodemgrmaps.get(node['ip'])
                if not nodemgrobj['skip']:
                    continue
                addToCommandsList(commandslist, node['ip'], targetdir, cmdpat % node['port'])
        cmdpat = "export ASAN_OPTIONS=halt_on_error=0; bash start-server-%d.sh"
        for node in cluster['comp']['nodes']:
            nodemgrobj = nodemgrmaps.get(node['ip'])
            if not nodemgrobj['skip']:
                continue
            addToCommandsList(commandslist, node['ip'], targetdir, cmdpat % node['port'])
        if 'haproxy' in cluster:
            node = cluster['haproxy']
            targetconfname = 'haproxy-%d.cfg' % node['port']
            cmdpat = "\\${HAPROXY_DIR}/sbin/haproxy -f %s >& haproxy-%d.log" % (targetconfname, node['port'])
            addToCommandsList(commandslist, node['ip'], ".", cmdpat)
        process_commandslist_setenv(comf, config, machines, commandslist)

def stop_clusters(clusters, nodemgrmaps, machines, comf, config):
    if config['multipledc']:
        my_print("skip cluster operation for multiple dc")
        return

    targetdir = '.'
    for cluster in clusters:
        cluster_name = cluster['name']
        commandslist = []
        output_info(comf, "Stopping cluster %s ..." % cluster_name)
        cmdpat = "bash stop-storage-%d.sh"
        for shard in cluster['data']:
            if shard['is_ref']:
                continue
            for node in shard['nodes']:
                nodemgrobj = nodemgrmaps.get(node['ip'])
                if not nodemgrobj['skip']:
                    continue
                addToCommandsList(commandslist, node['ip'], targetdir, cmdpat % node['port'])
        cmdpat = "bash stop-server-%d.sh"
        for node in cluster['comp']['nodes']:
            nodemgrobj = nodemgrmaps.get(node['ip'])
            if not nodemgrobj['skip']:
                continue
            addToCommandsList(commandslist, node['ip'], targetdir, cmdpat % node['port'])
        if 'haproxy' in cluster:
            node = cluster['haproxy']
            cmdpat="cat haproxy-%d.pid | xargs kill -9" % node['port']
            addToCommandsList(commandslist, node['ip'], ".", cmdpat)
        process_commandslist_setenv(comf, config, machines, commandslist)

def clean_clusters(config, clusters, nodemgrmaps, machines, comf):
    if config['multipledc']:
        my_print("skip cluster operation for multiple dc")
        return None

    storagedir = "kunlun-storage-%s" % config['product_version']
    serverdir = "kunlun-server-%s" % config['product_version']
    targetdir = '.'
    names = []
    for cluster in clusters:
        cluster_name = cluster['name']
        commandslist = []
        output_info(comf, "Cleaning cluster %s ..." % cluster_name)
        names.append(cluster['name'])
        for shard in cluster['data']:
            if shard['is_ref']:
                continue
            for node in shard['nodes']:
                nodemgrobj = nodemgrmaps.get(node['ip'])
                if not nodemgrobj['skip']:
                    continue
                cmdpat = r'bash stopmysql.sh %d'
                targetdir = "%s/%s/dba_tools" % (node['program_dir'], storagedir)
                addToCommandsList(commandslist, node['ip'], targetdir, cmdpat % (node['port']))
                cmdpat = r'rm -fr %s'
                addToCommandsList(commandslist, node['ip'], '.', cmdpat % (node['data_dir_path']))
                addToCommandsList(commandslist, node['ip'], '.', cmdpat % (node['log_dir_path']))
                addToCommandsList(commandslist, node['ip'], '.', cmdpat % (node['innodb_log_dir_path']))
        for node in cluster['comp']['nodes']:
            nodemgrobj = nodemgrmaps.get(node['ip'])
            if not nodemgrobj['skip']:
                continue
            targetdir = "%s/%s/dba_tools" % (node['program_dir'], serverdir)
            cmdpat = r'\$KUNLUN_PYTHON stop_pg.py --port=%d'
            addToCommandsList(commandslist, node['ip'], targetdir, cmdpat % node['port'])
            cmdpat = r'rm -fr %s'
            addToCommandsList(commandslist, node['ip'], '.', cmdpat % (node['datadir']))
        if 'haproxy' in cluster:
            node = cluster['haproxy']
            cmdpat="cat haproxy-%d.pid | xargs kill -9" % node['port']
            addToCommandsList(commandslist, node['ip'], ".", cmdpat)
        process_commandslist_setenv(comf, config, machines, commandslist)
    return names

def setup_metanodes_multidc(jscfg, metanodes, my_metaname, metaobj):
    # We need to reorder the meta nodes so that the install order is:
    #    - meta nodes in the primary dc (master first)
    #    - meta nodes in the secondary dcs (also master first, then replicas)
    #    - meta nodes in the standby dcs (same as above)
    meta = jscfg['meta']
    dcprimary = jscfg['dcprimary']
    dcsecondarylist = jscfg['dcsecondarylist']
    dcstandbylist = jscfg['dcstandbylist']
    dc_meta_map = jscfg['dc_meta_map']
    i = 0
    if len(meta['nodes']) > 0:
        j = 0
        elements = []
        meta['fullsync_level'] = len(dc_meta_map[dcprimary['name']]) + len(dcsecondarylist) - 1
        for node in dc_meta_map[dcprimary['name']]:
            fname = '%s.%d' % (my_metaname, i)
            metaf = open(r'clustermgr/%s' % fname, 'w')
            i += 1
            idx = j
            json.dump({"group_uuid": meta['group_uuid'], "nodes": dc_meta_map[dcprimary['name']]}, metaf, indent=4)
            metaf.close()
            metanodes.append({"node": node, "file": fname, "index": idx, "fullsync": 1})
            if j > 0:
                node0 = dc_meta_map[dcprimary['name']][0]
                elements.append({"host": node['ip'], "port": node['port'],
                    'master_host': node0['ip'], 'master_port': node0['port'], 'is_meta':True})
            j += 1
        pdcmaster = dc_meta_map[dcprimary['name']][0]
        pdcmasternode = {"ip": pdcmaster["ip"], "port": pdcmaster['port'], 'is_primary': True}
        sdcmaster = None
        for dc in dcsecondarylist:
            nodes = dc_meta_map[dc['name']]
            node0 = nodes[0]
            if sdcmaster is None:
                sdcmaster = node0
            fname = '%s.%d' % (my_metaname, i)
            metaf = open(r'clustermgr/%s' % fname, 'w')
            i += 1
            json.dump({"group_uuid": meta['group_uuid'], "nodes": [pdcmasternode, node0]}, metaf, indent=4)
            metaf.close()
            idx = 1
            metanodes.append({"node": node0, "file": fname, "index": idx, "fullsync": 1})
            elements.append({"host": node0['ip'], "port": node0['port'],
                'master_host': pdcmaster['ip'], 'master_port': pdcmaster['port'], 'is_meta':True})
            node0['is_primary'] = True
            for j in range(1, len(nodes)):
                fname = '%s.%d' % (my_metaname, i)
                metaf = open(r'clustermgr/%s' % fname, 'w')
                i += 1
                idx = j
                json.dump({"group_uuid": meta['group_uuid'], "nodes": nodes}, metaf, indent=4)
                node = nodes[j]
                metanodes.append({"node": node, "file": fname, "index": idx, "fullsync": 0})
                elements.append({"host": node['ip'], "port": node['port'],
                    'master_host': node0['ip'], 'master_port': node0['port'], 'is_meta':True})

        if sdcmaster is None:
            sdcmaster = dc_meta_map[dcprimary['name']][1]
        sdcmasternode = {"ip": sdcmaster["ip"], "port": sdcmaster['port'], 'is_primary': True}
        for dc in dcstandbylist:
            nodes = dc_meta_map[dc['name']]
            node0 = nodes[0]
            fname = '%s.%d' % (my_metaname, i)
            metaf = open(r'clustermgr/%s' % fname, 'w')
            i += 1
            json.dump({"group_uuid": meta['group_uuid'], "nodes": [sdcmasternode, node0]}, metaf, indent=4)
            metaf.close()
            idx = 1
            metanodes.append({"node": node0, "file": fname, "index": idx, "fullsync": 0})
            elements.append({"host": node0['ip'], "port": node0['port'],
                'master_host': sdcmasternode['ip'], 'master_port': sdcmasternode['port'], 'is_meta':True})
            node0['is_primary'] = True
            for j in range(1, len(nodes)):
                fname = '%s.%d' % (my_metaname, i)
                metaf = open(r'clustermgr/%s' % fname, 'w')
                i += 1
                idx = j
                json.dump({"group_uuid": meta['group_uuid'], "nodes": nodes}, metaf, indent=4)
                metanodes.append({"node": nodes[j], "file": fname, "index": idx, "fullsync": 0})
                elements.append({"host": node['ip'], "port": node['port'],
                    'master_host': node0['ip'], 'master_port': node0['port'], 'is_meta':True})
        metaobj['nodemapmaster'] = {"op":"add", "elements": elements}

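# Single-DC layout: all meta nodes share one config file; the primary is queued
# first, and each replica records its master for the nodemapmaster metadata.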
def setup_metanodes_onedc(jscfg, metanodes, my_metaname, metaobj):
    meta = jscfg['meta']
    if len(meta) == 0:
        return
    metaf = open(r'clustermgr/%s' % my_metaname, 'w')
    json.dump(meta, metaf, indent=4)
    metaf.close()
    pnode = None
    mlen = len(meta['nodes'])
    for idx in range(0, mlen):
        node = meta['nodes'][idx]
        if node['is_primary']:
            metanodes.append({"node": node, "file":my_metaname, "index":idx, "fullsync":1})
            pnode = node
            break
    elements = []
    for idx in range(0, mlen):
        node = meta['nodes'][idx]
        if node['is_primary']:
            continue
        metanodes.append({"node": node, "file":my_metaname, "index":idx, "fullsync":1})
        elements.append({"host": node['ip'], "port": node['port'], "master_host": pnode['ip'], 
            "master_port": pnode['port'], "is_meta": True})
    metaobj['nodemapmaster'] = {'op':'add', 'elements': elements}


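# Top-level installation flow: prepare every working host, set up xpanel /
# elasticsearch / cdc, install and bootstrap the meta shard, install the user
# clusters, push the accumulated metadata, then start node_mgr / cluster_mgr /
# cdc and finally xpanel.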
def install_with_config(jscfg, comf, machines):
    meta = jscfg['meta']
    clustermgr = jscfg['cluster_manager']
    nodemgr = jscfg['node_manager']
    meta_hamode = meta.get('ha_mode', '')
    config = jscfg['config']
    storagedir = "kunlun-storage-%s" % config['product_version']
    serverdir = "kunlun-server-%s" % config['product_version']
    clustermgrdir = "kunlun-cluster-manager-%s" % config['product_version']
    nodemgrdir = "kunlun-node-manager-%s" % config['product_version']
    cdcdir = "kunlun-cdc-%s" % config['product_version']
    autostart = config['autostart']
    usesudo = config['sudo']
    iscantian = config['cantian']
    ismariadb = config['mariadb']

    cluster_name = 'meta'
    extraopt = " --ha_mode=%s" % meta_hamode
    metaseeds = meta['group_seeds']
    my_print('metaseeds:%s' % metaseeds)

    nodemgrmaps = {}
    nodemgrips = set()
    for node in nodemgr['nodes']:
        nodemgrmaps[node['ip']] = node
        nodemgrips.add(node['ip'])

    # TODO: we need to add priority here.
    clustermgrips = set()
    members=[]
    for node in clustermgr['nodes']:
        clustermgrips.add(node['ip'])
        members.append("%s:%d:0" % (node['ip'], node['brpc_raft_port']))
    initmember = clustermgr.get('raft_group_member_init_config', '')
    initmember = "%s%s," % (initmember, ",".join(members))
    my_print('raft_group_member_init_config:%s' % initmember)

    haproxyips = get_haproxy_ips(jscfg)
    workips = set()
    workips.update(nodemgrips)
    workips.update(clustermgrips)
    workips.update(haproxyips)
    if 'elasticsearch' in jscfg:
        workips.add(jscfg['elasticsearch']['ip'])
    # my_print("workips:%s" % str(workips))
    fmap = get_3rdpackages_filemap(config)
    haproxy_file = fmap['haproxy'][0]

    output_info(comf, "initializing all working nodes ...")
    i = 0
    for ip in workips:
        mach = machines.get(ip)
        if usesudo:
            process_command_noenv(comf, config, machines, ip, '/',
                'sudo mkdir -p %s && sudo chown -R %s:\`id -gn %s\` %s' % (mach['basedir'],
                    mach['user'], mach['user'], mach['basedir']))
        else:
            process_command_noenv(comf, config, machines, ip, '/', 'mkdir -p %s' % mach['basedir'])
        process_file(comf, config, machines, ip, 'clustermgr/env.sh.template', mach['basedir'])
        extstr = "sed -s 's#KUNLUN_BASEDIR#%s#g' env.sh.template > env.sh" % mach['basedir']
        process_command_noenv(comf, config, machines, ip, mach['basedir'], extstr)
        extstr = "sed -i 's#KUNLUN_VERSION#%s#g' env.sh" % config['product_version']
        process_command_noenv(comf, config, machines, ip, mach['basedir'], extstr)
        process_file(comf, config, machines, ip, 'install/process_deps.sh', mach['basedir'])
        process_file(comf, config, machines, ip, 'install/process_python3.sh', mach['basedir'])
        process_file(comf, config, machines, ip, 'install/change_config.sh', mach['basedir'])
        process_file(comf, config, machines, ip, 'install/build_driver_formysql.sh', mach['basedir'])
        if not config['cloud']:
            process_file(comf, config, machines, ip, 'clustermgr/%s' % fmap['mysql-driver'][0], mach['basedir'])
            generate_node_env(comf, config, machines, ip, i)
        process_command_setenv(comf, config, machines, ip, mach['basedir'], 'bash ./build_driver_formysql.sh %s' % mach['basedir'])
        if ip in haproxyips and not config['cloud']:
            process_file(comf, config, machines, ip, 'clustermgr/%s' % haproxy_file, mach['basedir'])
            process_command_noenv(comf, config, machines, ip, mach['basedir'], 'tar -xzf %s' % haproxy_file)
        i += 1

    setup_xpanel(jscfg, machines, comf)
    setup_elasticsearch(jscfg, machines, comf)
    setup_cdc(jscfg, machines, comf)

    dirmap = {}
    filesmap = {}
    commandslist = []

    # reg_meta.json records the meta shard membership; it is shipped to storage
    # and computing nodes during installation.
    reg_metaname = 'reg_meta.json'
    xpanel_sqlfile = 'dba_tools_db.sql'
    if iscantian:
        xpanel_sqlfile = 'dba_tools_db_cantian.sql'
    if ismariadb:
        xpanel_sqlfile = 'dba_tools_db_mariadb.sql'
    if 'group_uuid' not in meta:
        meta['group_uuid'] = getuuid()
    metaf = open(r'clustermgr/%s' % reg_metaname, 'w')
    objs = []
    if len(meta['nodes']) > 0:
        for node in meta['nodes']:
            mach = machines.get(node['ip'])
            obj = {}
            obj['is_primary'] = node.get('is_primary', False)
            obj['data_dir_path'] = node['data_dir_path']
            obj['nodemgr_bin_path'] = "%s/%s/bin" % (mach['basedir'], nodemgrdir)
            obj['ip'] = node['ip']
            obj['port'] = node['port']
            obj['user'] = "pgx"
            obj['password'] = "pgx_pwd"
            if 'master_priority' in node:
                obj['master_priority'] = node['master_priority']
            objs.append(obj)
    elif metaseeds != '':  # for the case where only the seeds are provided
        for addr in metaseeds.split(','):
            parts = addr.split(':')
            obj = {}
            obj['is_primary'] = False
            obj['data_dir_path'] = ''
            obj['nodemgr_bin_path'] = ''
            obj['ip'] = parts[0]
            if len(parts) > 1:
                obj['port'] = int(parts[1])
            else:
                obj['port'] = 3306
            obj['user'] = "pgx"
            obj['password'] = "pgx_pwd"
            objs.append(obj)
    json.dump(objs, metaf, indent=4)
    metaf.close()

    my_metaname = 'mysql_meta.json'
    metaobj = {}
    metaobj['set_variables'] = {'op':'add', 'elements':[]}
    vareles = metaobj['set_variables']['elements']
    metanodes = []
    if config['multipledc']:
        setup_metanodes_multidc(jscfg, metanodes, my_metaname, metaobj)
    else:
        setup_metanodes_onedc(jscfg, metanodes, my_metaname, metaobj)

    # process elastic search data.
    if 'elasticsearch' in jscfg:
        node = jscfg['elasticsearch']
        esobj = {
            "op": "add",
            "data": {
                "host": node['ip'],
                "port": node['port']
            }
        }
        metaobj['elasticsearch'] = esobj

    # process backup data
    hasHDFS = False
    if 'backup' in jscfg:
        node = jscfg['backup']
        if 'hdfs' in node:
            hasHDFS = True
            hdfs = node['hdfs']
            generate_hdfs_coresite_xml(hdfs['ip'], hdfs['port'])
            metaobj['hdfsbackup'] = {
                    "op": "add",
                    "data": {
                            "host": hdfs['ip'],
                            "port": hdfs['port']
                        }
                    }
        if 'ssh' in node:
            sshbackup = node['ssh']
            metaobj['sshbackup'] = {
                    "op": "add",
                    "data": {
                        "host": sshbackup['ip'],
                        "port": sshbackup['port'],
                        "user": sshbackup['user'],
                        "targetRoot": sshbackup['targetDir']
                        }
                    }

    # process datacenter data
    if 'datacenters' in jscfg and len(jscfg['datacenters']) > 0:
        metaobj['datacenters'] = { 'op': 'add', 'elements': jscfg['datacenters']}

    # register node_manager and cluster_manager membership in the metadata object
    if len(nodemgr['nodes']) > 0:
        metaobj['node_manager'] = {'op': 'add', 'elements': nodemgr['nodes']}
    if len(clustermgr['nodes']) > 0:
        metaobj['cluster_manager'] = {'op': 'add', 'elements': clustermgr['nodes']}

    i = 0
    for node in nodemgr['nodes']:
        if node['skip']:
            continue
        configmap = get_nodemgr_config(node, nodemgr)
        mach = machines.get(node['ip'])
        output_info(comf, "Setting up node_mgr on %s ..." % node['ip'])
        install_nodemgr_env(comf, mach, machines, config)
        withRocksdb = bool(config['enable_rocksdb'])
        setup_nodemgr_commands(config, i, machines, node, commandslist, dirmap, filesmap, metaseeds, hasHDFS, configmap, withRocksdb)
        generate_nodemgr_startstop(config, machines, node, i, filesmap)
        if autostart:
            generate_nodemgr_service(config, machines, commandslist, node, i, filesmap)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
        i += 1

    i = 0
    for node in clustermgr['nodes']:
        configmap = get_clustermgr_config(node, clustermgr)
        output_info(comf, "Setting up cluster_mgr on %s ..." % node['ip'])
        mach = machines.get(node['ip'])
        install_clustermgr_env(comf, mach, machines, config)
        setup_clustermgr_commands(config, i, machines, node, commandslist, dirmap, filesmap,
            metaseeds, initmember, node['ip'] not in nodemgrips, configmap)
        if autostart:
            generate_clustermgr_service(config, machines, commandslist, node, i, filesmap)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
        i += 1

    if config['need_ssl'] and config['send_license'] and len(clustermgr['nodes']) > 0:
        firstmgr = clustermgr['nodes'][0]
        getbackmap = {}
        generate_ssl_certs(config, firstmgr, commandslist, dirmap, filesmap, getbackmap)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        process_getbackmap(comf, getbackmap, machines, 'clustermgr', config)

    basecmdpat = 'export ASAN_OPTIONS=halt_on_error=0; \\$KUNLUN_PYTHON install-mysql.py --config=./%s --target_node_index=%d --cluster_id=%s --shard_id=%s --server_id=%d --fullsync=%d'
    basecmdpat += ' --meta_addrs=%s ' % metaseeds
    extracmd = " --initialize=no"
    shard_id = 'meta'
    pries = []
    secs = []
    i = 0
    if len(meta['nodes']):
        output_info(comf, "setup meta nodes ...")
    for item in metanodes:
        node = item['node']
        if 'use_vec_engine' in node and node['use_vec_engine']:
            template_file = "./template.cnf"
            if config['small']:
                template_file = './template-small.cnf'
        else:
            template_file = "./template-meta.cnf"
            if config['small']:
                template_file = './template-meta-small.cnf'
        cmdpat = basecmdpat + ' --dbcfg=%s' % template_file
        if iscantian and i > 0:
            cmdpat += extracmd
        fname = item['file']
        idx = item['index']
        fullsync = item['fullsync']
        configmap = get_meta_config(node, meta)
        if 'fullsync_level' in meta and meta['fullsync_level'] != 1:
            vareles.append({"ip": node['ip'], 'port':node['port'],
            'variable_name':'fullsync_consistency_level',
            "type": "integer",
            'value':meta['fullsync_level']})
        setup_meta_env(node, machines, dirmap, commandslist, config)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
        targetdir='%s/%s/dba_tools' % (node['program_dir'], storagedir)
        node['nodemgr'] = nodemgrmaps.get(node['ip'])
        mach = machines.get(node['ip'])
        addToDirMap(dirmap, node['ip'], node['data_dir_path'])
        addToDirMap(dirmap, node['ip'], node['log_dir_path'])
        addToDirMap(dirmap, node['ip'], node['innodb_log_dir_path'])
        if 'use_vec_engine' in node and node['use_vec_engine']:
            addToDirMap(dirmap, node['ip'], node['vec_data_dir'])
        addNodeToFilesListMap(filesmap, node, reg_metaname, "%s/%s/scripts" % (node['program_dir'], serverdir))
        addNodeToFilesListMap(filesmap, node, fname, "%s/%s" % (targetdir, my_metaname))
        addNodeToFilesListMap(filesmap, node, xpanel_sqlfile, targetdir)
        if not meta['enable_rocksdb']:
            addToCommandsList(commandslist, node['ip'], targetdir, 'sed -i /rocksdb/d %s' % template_file)
        if meta['enable_ssl_connect']:
            addNodeToFilesListMap(filesmap, node, 'ca.pem', targetdir)
            addNodeToFilesListMap(filesmap, node, 'server-cert.pem', targetdir)
            addNodeToFilesListMap(filesmap, node, 'server-key.pem', targetdir)
            configmap['#ssl-ca'] = '%s/data/ca.pem' % node['data_dir_path']
            configmap['#ssl-cert'] = '%s/data/server-cert.pem' % node['data_dir_path']
            configmap['#ssl-key'] = '%s/data/server-key.pem' % node['data_dir_path']
        cfgpat =  "bash change_config.sh %s '%s' '%s'"
        targetfile = '%s/%s' % (targetdir, template_file)
        for confkey in configmap:
            addToCommandsList(commandslist, node['ip'], mach['basedir'], cfgpat % (targetfile, confkey, str(configmap[confkey])))
        cmd = cmdpat % (my_metaname, idx, cluster_name, shard_id, i+1, fullsync)
        if node.get('is_primary', False):
            pries.append([node['ip'], targetdir, cmd])
            vareles.append({"ip": node['ip'], 'port':node['port'],
                'variable_name':'ha_role', "type":"integer", 'value': 1})
        else:
            secs.append([node['ip'], targetdir, cmd])
        generate_storage_startstop(config, machines, node, i, filesmap)
        if autostart:
            generate_storage_service(config, machines, commandslist, node, i, filesmap)
        i+=1
    for item in pries:
        addToCommandsList(commandslist, item[0], item[1], item[2] + extraopt, "storage")
    for item in secs:
        addToCommandsList(commandslist, item[0], item[1], item[2] + extraopt, "storage")
    purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    # bootstrap the cluster
    if len(meta['nodes']) > 0:
        firstmeta = None
        output_info(comf, "setup system tables ...")
        for node in meta['nodes']:
            if node.get('is_primary', False):
                firstmeta = node
                break
        targetdir='%s/%s/scripts' % (firstmeta['program_dir'], serverdir)
        cmdpat=r'\$KUNLUN_PYTHON bootstrap.py --config=./%s --bootstrap_sql=./meta_inuse.sql' + extraopt
        if config['mariadb']:
            cmdpat=r'\$KUNLUN_PYTHON bootstrap.py --config=./%s --bootstrap_sql=./meta_inuse_mariadb.sql' + extraopt
        if iscantian:
            cmdpat=r'\$KUNLUN_PYTHON bootstrap.py --config=./%s --bootstrap_sql=./meta_inuse_cantian.sql --cantian' + extraopt
        addToCommandsList(commandslist, firstmeta['ip'], targetdir, cmdpat % reg_metaname, "computing")
        targetdir='%s/%s/dba_tools' % (firstmeta['program_dir'], storagedir)
        cmdpat=r'bash imysql.sh %s < %s'
        addToCommandsList(commandslist, firstmeta['ip'], targetdir, cmdpat % (str(firstmeta['port']), xpanel_sqlfile), "storage")
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    install_clusters(jscfg, machines, dirmap, filesmap, commandslist, reg_metaname, metaseeds, comf, metaobj)
    install_elasticsearch(jscfg, machines, metaseeds, comf)

    metajson = 'metadata.json'
    metaf = open('clustermgr/%s' % metajson, 'w')
    json.dump(metaobj, metaf, indent=4)
    metaf.close()

    worknode = get_worknode(jscfg)

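    # Push the accumulated metadata changes (metadata.json) to the meta shard
    # via modify_metadata.py from one reachable working node.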
    if worknode is not None:
        output_info(comf, "update metadata from %s ..." % worknode['ip'])
        mach = machines.get(worknode['ip'])
        addNodeToFilesListMap(filesmap, worknode, 'modify_metadata.py', '.')
        addNodeToFilesListMap(filesmap, worknode, metajson, '.')
        addToCommandsList(commandslist, worknode['ip'], machines.get(worknode['ip'])['basedir'],
                "\\$KUNLUN_PYTHON modify_metadata.py --config %s --seeds=%s" % (metajson, metaseeds), "storage")
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    # start the nodemgr and clustermgr process finally.
    output_info(comf, "starting node_mgr nodes ...")
    for node in nodemgr['nodes']:
        if node['skip']:
            continue
        output_info(comf, "Starting node_mgr on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-node-manager-%d.service' % node['brpc_http_port']
            generate_systemctl_start(servname, node['ip'], commandslist)
        else:
            addToCommandsList(commandslist, node['ip'], ".", "bash start-nodemgr-%d.sh </dev/null >& nodemgr_start.log" % node['brpc_http_port'])
    purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
    output_info(comf, "starting cluster_mgr nodes ...")
    for node in clustermgr['nodes']:
        output_info(comf, "Starting cluster_mgr on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-cluster-manager-%d.service' % node['brpc_raft_port']
            generate_systemctl_start(servname, node['ip'], commandslist)
        else:
            envpfx = "export ASAN_OPTIONS=halt_on_error=0;"
            if node['valgrind']:
                envpfx = "export ASAN_OPTIONS=halt_on_error=0; export USE_VALGRIND=1;"
            addToCommandsList(commandslist, node['ip'], "%s/bin" % clustermgrdir, "%s bash start_cluster_mgr.sh </dev/null >& start.log" % envpfx)
    purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
    if 'cdc' in jscfg:
        output_info(comf, "starting cdc nodes ...")
        for node in jscfg['cdc']['nodes']:
            output_info(comf, "Starting cdc on %s ..." % node['ip'])
            if autostart:
                servname = 'kunlun-cdc-%d.service' % node['raft_port']
                generate_systemctl_start(servname, node['ip'], commandslist)
            else:
                envpfx = "export ASAN_OPTIONS=halt_on_error=0;"
                if node['valgrind']:
                    envpfx = "export ASAN_OPTIONS=halt_on_error=0; export USE_VALGRIND=1;"
                addToCommandsList(commandslist, node['ip'], "%s/bin" % cdcdir, "%s bash start_kunlun_cdc.sh </dev/null >& start.log" % envpfx)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    # install xpanel
    install_xpanel(jscfg, machines, dirmap, filesmap, commandslist, metaseeds, comf)

# This logic works for both upgrade and replacement currently.
def upgrade_xpanel(jscfg, machines, comf, metaseeds, checkNode = False):
    if 'xpanel' not in jscfg:
        return
    config = jscfg['config']
    xpanel = jscfg['xpanel']
    autostart = config['autostart']
    restart = 'no'
    if autostart:
        restart = 'always'
    for node in xpanel['nodes']:
        if checkNode:
            if 'upgrade' not in node or not node['upgrade']:
                continue
        output_info(comf, "Cleaning xpanel(%s) on %s ..." % (config['product_version'], node['ip']))
        clean_onexpanel(jscfg, machines, comf, config, xpanel, node, xpanel['image'], xpanel['imageFile'])
        output_info(comf, "Setting up xpanel(%s) on %s ..." % (config['upgrade_version'], node['ip']))
        setup_onexpanel(jscfg, machines, comf, config, xpanel, node, xpanel['upgrade_image'], xpanel['upgrade_imageFile'])
        output_info(comf, "Starting xpanel(%s) on %s ..." % (config['upgrade_version'], node['ip']))
        cmdpat = "sudo docker run -itd --restart={} --env METASEEDS=%s --name %s -p %d:80 %s bash -c '/bin/bash /kunlun/start.sh'".format(restart)
        process_command_noenv(comf, config, machines, node['ip'], '/', cmdpat % (metaseeds, node['name'], node['port'], xpanel['upgrade_image']))

# This handles upgrades that change the product version.
def upgrade_clustermgr_pkg(config, machines, node, dirmap, filesmap, commandslist, idx):
    stage = config['upgrade_stage']
    upd_clustermgrdir = "kunlun-cluster-manager-%s" % config['upgrade_version']
    mach = machines.get(node['ip'])
    if stage == 'do':
        addNodeToFilesListMap(filesmap, node, "%s.tgz" % upd_clustermgrdir, '.')
        if config['send_license']:
            addNodeToFilesListMap(filesmap, node, config['license_file'], ".")
        script_name = "upgrade_clustermgr_extra_%d.sh" % idx
        scriptf = open('clustermgr/%s' % script_name, 'w')
        scriptf.write("#! /bin/bash\n")
        upgrade_clustermgr_config(config['product_version'], config['upgrade_version'], scriptf)
        scriptf.close()
        addNodeToFilesListMap(filesmap, node, script_name, './upgrade_clustermgr_extra.sh')
        addNodeToFilesListMap(filesmap, node, "upgrade_scripts/upgrade_clustermgr.sh", '.')
    addToCommandsList(commandslist, node['ip'], ".", "bash -e upgrade_clustermgr.sh %s %s %s %s %s" % (
        mach['basedir'], config['product_version'], config['upgrade_version'], stage, config['license_file']))

# This is for replacement: just replace the package and restart.
def replace_clustermgr_pkg(config, machines, node, dirmap, filesmap, commandslist, idx):
    clustermgrdir = "kunlun-cluster-manager-%s" % config['product_version']
    mach = machines.get(node['ip'])
    addNodeToFilesListMap(filesmap, node, "%s.tgz" % clustermgrdir, '.')
    stage = config['upgrade_stage']
    if stage == 'do':
        addNodeToFilesListMap(filesmap, node, "upgrade_scripts/replace_clustermgr.sh", '.')
        if config['send_license']:
            addNodeToFilesListMap(filesmap, node, config['license_file'], ".")
    addToCommandsList(commandslist, node['ip'], ".", "bash -e replace_clustermgr.sh %s %s %s %s" % (
        mach['basedir'], config['product_version'], stage, config['license_file']))

def upgrade_nodemgr_pkg(config, machines, node, dirmap, filesmap, commandslist, idx):
    upd_nodemgrdir = "kunlun-node-manager-%s" % config['upgrade_version']
    upd_storagedir = "kunlun-storage-%s" % config['upgrade_version']
    upd_serverdir = "kunlun-server-%s" % config['upgrade_version']
    upd_proxysqldir = "kunlun-proxysql-%s" % config['upgrade_version']
    stage = config['upgrade_stage']
    mach = machines.get(node['ip'])
    if stage == 'do':
        if config['send_license']:
            addNodeToFilesListMap(filesmap, node, config['license_file'], ".")
        targetdir = "program_binaries"
        addNodeToFilesListMap(filesmap, node, "%s.tgz" % upd_nodemgrdir, ".")
        addNodeToFilesListMap(filesmap, node, "%s.tgz" % upd_storagedir, targetdir)
        addNodeToFilesListMap(filesmap, node, "%s.tgz" % upd_serverdir, targetdir)
        addNodeToFilesListMap(filesmap, node, "%s.tgz" % upd_proxysqldir, targetdir)
        script_name = "upgrade_nodemgr_extra_%d.sh" % idx
        scriptf = open('clustermgr/%s' % script_name, 'w')
        scriptf.write("#! /bin/bash\n")
        upgrade_nodemgr_config(config['product_version'], config['upgrade_version'], scriptf)
        scriptf.close()
        addNodeToFilesListMap(filesmap, node, script_name, './upgrade_nodemgr_extra.sh')
        addNodeToFilesListMap(filesmap, node, "upgrade_scripts/upgrade_nodemgr.sh", '.')
    addToCommandsList(commandslist, node['ip'], ".", "bash -e upgrade_nodemgr.sh %s %s %s %s %s" % (
        mach['basedir'], config['product_version'], config['upgrade_version'], stage, config['license_file']))

# Note: dirmap is processed before filesmap, and commandslist last. So a command
# always runs after all files have been transferred, even if it was added to the
# list before the corresponding file.
def replace_nodemgr_pkg(config, machines, node, dirmap, filesmap, commandslist, idx):
    nodemgrdir = "kunlun-node-manager-%s" % config['product_version']
    storagedir = "kunlun-storage-%s" % config['product_version']
    serverdir = "kunlun-server-%s" % config['product_version']
    proxysqldir = "kunlun-proxysql-%s" % config['product_version']
    stage = config['upgrade_stage']
    mach = machines.get(node['ip'])
    has_upgrade = False
    targetdir = "program_binaries"
    if stage == 'do':
        if config['send_license']:
            addNodeToFilesListMap(filesmap, node, config['license_file'], ".")
        addNodeToFilesListMap(filesmap, node, "upgrade_scripts/replace_nodemgr.sh", '.')
    if 'upgrade_storage' in node and node['upgrade_storage']:
        has_upgrade = True
        if stage == 'do':
            addNodeToFilesListMap(filesmap, node, "%s.tgz" % storagedir, targetdir)
        addToCommandsList(commandslist, node['ip'], ".",
            "bash -e replace_nodemgr.sh %s %s storage %s %s" % (
                mach['basedir'], config['product_version'], stage, config['license_file']))
    if 'upgrade_server' in node and node['upgrade_server']:
        has_upgrade = True
        if stage == 'do':
            addNodeToFilesListMap(filesmap, node, "%s.tgz" % serverdir, targetdir)
        addToCommandsList(commandslist, node['ip'], ".",
            "bash -e replace_nodemgr.sh %s %s server %s %s" % (
                mach['basedir'], config['product_version'], stage, config['license_file']))
    if 'upgrade_proxysql' in node and node['upgrade_proxysql']:
        has_upgrade = True
        if stage == 'do':
            addNodeToFilesListMap(filesmap, node, "%s.tgz" % proxysqldir, targetdir)
        addToCommandsList(commandslist, node['ip'], ".",
            "bash -e replace_nodemgr.sh %s %s proxysql %s" % (
                mach['basedir'], config['product_version'], stage))
    if 'upgrade_nodemgr' in node and node['upgrade_nodemgr']:
        has_upgrade = True
        if stage == 'do':
            addNodeToFilesListMap(filesmap, node, "%s.tgz" % nodemgrdir, ".")
        addToCommandsList(commandslist, node['ip'], ".",
            "bash -e replace_nodemgr.sh %s %s nodemgr %s %s" % (
                mach['basedir'], config['product_version'], stage, config['license_file']))
    return has_upgrade

def upgrade_cdc_pkg(jscfg, machines, comf):
    if 'cdc' not in jscfg:
        return
    cdc = jscfg['cdc']
    config = jscfg['config']
    autostart = config['autostart']
    usesudo = config['sudo']
    cdcdir = "kunlun-cdc-%s" % config['product_version']
    upd_cdcdir = "kunlun-cdc-%s" % config['upgrade_version']
    stage = config['upgrade_stage']
    dirmap = {}
    filesmap = {}
    commandslist = []
    for node in cdc['nodes']:
        mach = machines.get(node['ip'])
        if stage == 'do':
            output_info(comf, "Upgrading cdc on %s ..." % node['ip'])
            addNodeToFilesListMap(filesmap, node, "%s.tgz" % upd_cdcdir, ".")
            addNodeToFilesListMap(filesmap, node, "upgrade_scripts/upgrade_cdc.sh", '.')
        else:
            output_info(comf, "Purge upgrade of cdc on %s ..." % node['ip'])
        addToCommandsList(commandslist, node['ip'], ".", "bash -e upgrade_cdc.sh %s %s %s %s" % (mach['basedir'],
            config['product_version'], config['upgrade_version'], stage))
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

def replace_cdc_pkg(jscfg, machines, comf):
    if 'cdc' not in jscfg:
        return
    cdc = jscfg['cdc']
    config = jscfg['config']
    autostart = config['autostart']
    usesudo = config['sudo']
    cdcdir = "kunlun-cdc-%s" % config['product_version']
    dirmap = {}
    filesmap = {}
    commandslist = []
    stage = config['upgrade_stage']
    for node in cdc['nodes']:
        if 'upgrade' not in node or not node['upgrade']:
            continue
        mach = machines.get(node['ip'])
        if stage == 'do':
            output_info(comf, "Replacing cdc on %s ..." % node['ip'])
            addNodeToFilesListMap(filesmap, node, "%s.tgz" % cdcdir, ".")
            addNodeToFilesListMap(filesmap, node, "upgrade_scripts/replace_cdc.sh", '.')
        else:
            output_info(comf, "Purge replacement of cdc on %s ..." % node['ip'])
        addToCommandsList(commandslist, node['ip'], ".",
            "bash -e replace_cdc.sh %s %s %s" % (mach['basedir'], config['product_version'], stage))
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

def upgrade_storage_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, idx):
    config = jscfg['config']
    verfrom = config['product_version']
    verto = config['upgrade_version']
    mach = machines.get(node['ip'])
    stage = config['upgrade_stage']
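    # In the 'do' stage an extra pass with the do_replica argument is queued
    # ahead of the staged run; the shell script presumably upgrades replicas first.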
    if stage == 'do':
        upscript = "upgrade_scripts/upgrade_storage_%s_%s.sh" % (verfrom, verto)
        if not os.path.exists("clustermgr/%s" % upscript):
            upscript="upgrade_scripts/upgrade_storage.sh"
        addNodeToFilesListMap(filesmap, node, upscript, "./upgrade_storage.sh")
        addToCommandsList(commandslist, node['ip'], ".", "bash -e ./upgrade_storage.sh %s %s %s do_replica" % (mach['basedir'], verfrom, verto))
    addToCommandsList(commandslist, node['ip'], ".", "bash -e ./upgrade_storage.sh %s %s %s %s" % (mach['basedir'], verfrom, verto, stage))

def replace_storage_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, idx):
    if 'upgrade_storage' not in node or not node['upgrade_storage']:
        return
    config = jscfg['config']
    verfrom = config['product_version']
    stage = config['upgrade_stage']
    mach = machines.get(node['ip'])
    if stage == 'do':
        addNodeToFilesListMap(filesmap, node, "upgrade_scripts/replace_storage.sh", ".")
        addToCommandsList(commandslist, node['ip'], ".", "bash -e ./replace_storage.sh %s %s do_replica" % (mach['basedir'], verfrom))
    addToCommandsList(commandslist, node['ip'], ".", "bash -e replace_storage.sh %s %s %s" % (mach['basedir'], verfrom, stage))

def upgrade_server_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, idx):
    config = jscfg['config']
    verfrom = config['product_version']
    verto = config['upgrade_version']
    stage = config['upgrade_stage']
    mach = machines.get(node['ip'])
    if stage == 'do':
        upscript = "upgrade_scripts/upgrade_server_%s_%s.sh" % (verfrom, verto)
        if not os.path.exists("clustermgr/%s" % upscript):
            upscript="upgrade_scripts/upgrade_server.sh"
        addNodeToFilesListMap(filesmap, node, upscript, "./upgrade_server.sh")
    addToCommandsList(commandslist, node['ip'], ".", "bash -e ./upgrade_server.sh %s %s %s %s" % (mach['basedir'], verfrom, verto, stage))

def replace_server_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, idx):
    if 'upgrade_server' not in node or not node['upgrade_server']:
        return
    config = jscfg['config']
    verfrom = config['product_version']
    stage = config['upgrade_stage']
    mach = machines.get(node['ip'])
    if stage == 'do':
        addNodeToFilesListMap(filesmap, node, "upgrade_scripts/replace_server.sh", ".")
    addToCommandsList(commandslist, node['ip'], ".", "bash -e replace_server.sh %s %s %s" % (mach['basedir'], verfrom, stage))

def upgrade_proxysql_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, idx):
    config = jscfg['config']
    verfrom = config['product_version']
    verto = config['upgrade_version']
    stage = config['upgrade_stage']
    mach = machines.get(node['ip'])
    if stage == 'do':
        upscript = "upgrade_scripts/upgrade_proxysql_%s_%s.sh" % (verfrom, verto)
        if not os.path.exists("clustermgr/%s" % upscript):
            upscript="upgrade_scripts/upgrade_proxysql.sh"
        addNodeToFilesListMap(filesmap, node, upscript, "./upgrade_proxysql.sh")
    addToCommandsList(commandslist, node['ip'], ".", "bash -e ./upgrade_proxysql.sh %s %s %s %s" % (mach['basedir'], verfrom, verto, stage))

def replace_proxysql_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, idx):
    if 'upgrade_proxysql' not in node or not node['upgrade_proxysql']:
        return
    config = jscfg['config']
    verfrom = config['product_version']
    stage = config['upgrade_stage']
    mach = machines.get(node['ip'])
    if stage == 'do':
        addNodeToFilesListMap(filesmap, node, "upgrade_scripts/replace_proxysql.sh", ".")
    addToCommandsList(commandslist, node['ip'], ".", "bash -e replace_proxysql.sh %s %s %s" % (mach['basedir'], verfrom, stage))

def replace_packages(jscfg, comf, machines):
    meta = jscfg['meta']
    clustermgr = jscfg['cluster_manager']
    nodemgr = jscfg['node_manager']
    meta_hamode = meta.get('ha_mode', '')
    config = jscfg['config']
    usesudo = config['sudo']
    metaseeds = meta['group_seeds']
    stage = config['upgrade_stage']
    dirmap = {}
    filesmap = {}
    commandslist = []
    i = 0
    for node in clustermgr['nodes']:
        if 'upgrade' not in node or not node['upgrade']:
            continue
        if stage == 'do':
            output_info(comf, "Replacing cluster_mgr on %s ..." % node['ip'])
        else:
            output_info(comf, "Purge replacement of cluster_mgr on %s ..." % node['ip'])
        replace_clustermgr_pkg(config, machines, node, dirmap, filesmap, commandslist, i)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    i = 0
    for node in nodemgr['nodes']:
        has_upgrade = replace_nodemgr_pkg(config, machines, node, dirmap, filesmap, commandslist, i)
        if has_upgrade:
            if stage == 'do':
                output_info(comf, "Setting up packages for replacement on %s ..." % node['ip'])
            else:
                output_info(comf, "Purge replacement of packages on %s ..." % node['ip'])
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    i = 0
    for node in nodemgr['nodes']:
        if 'upgrade_storage' not in node or not node['upgrade_storage']:
            continue
        if stage == 'do':
            output_info(comf, "Replacing storage nodes on %s ..." % node['ip'])
        else:
            output_info(comf, "Purge replacement of storage nodes on %s ..." % node['ip'])
        replace_storage_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, i)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    i = 0
    for node in nodemgr['nodes']:
        if 'upgrade_server' not in node or not node['upgrade_server']:
            continue
        if stage == 'do':
            output_info(comf, "Replacing server nodes on %s ..." % node['ip'])
        else:
            output_info(comf, "Purge replacement of server nodes on %s ..." % node['ip'])
        replace_server_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, i)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    i = 0
    for node in nodemgr['nodes']:
        if 'upgrade_proxysql' not in node or not node['upgrade_proxysql']:
            continue
        if stage == 'do':
            output_info(comf, "Replacing proxysql nodes on %s ..." % node['ip'])
        else:
            output_info(comf, "Purge replacement of proxysql nodes on %s ..." % node['ip'])
        replace_proxysql_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, i)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    upgrade_xpanel(jscfg, machines, comf, metaseeds, True)
    replace_cdc_pkg(jscfg, machines, comf)

def upgrade_packages(jscfg, comf, machines):
    meta = jscfg['meta']
    clustermgr = jscfg['cluster_manager']
    nodemgr = jscfg['node_manager']
    meta_hamode = meta.get('ha_mode', '')
    config = jscfg['config']
    usesudo = config['sudo']
    metaseeds = meta['group_seeds']
    stage = config['upgrade_stage']
    dirmap = {}
    filesmap = {}
    commandslist = []
    i = 0
    for node in clustermgr['nodes']:
        if stage == 'do':
            output_info(comf, "Upgrading cluster_mgr on %s ..." % node['ip'])
        else:
            output_info(comf, "Purge upgrade of cluster_mgr on %s ..." % node['ip'])
        upgrade_clustermgr_pkg(config, machines, node, dirmap, filesmap, commandslist, i)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    i = 0
    for node in nodemgr['nodes']:
        if stage == 'do':
            output_info(comf, "Upgrading node_mgr on %s ..." % node['ip'])
        else:
            output_info(comf, "Purge upgrade of node_mgr on %s ..." % node['ip'])
        upgrade_nodemgr_pkg(config, machines, node, dirmap, filesmap, commandslist, i)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    i = 0
    for node in nodemgr['nodes']:
        if stage == 'do':
            output_info(comf, "Upgrading storage nodes on %s ..." % node['ip'])
        else:
            output_info(comf, "Purge upgrade of storage nodes on %s ..." % node['ip'])
        upgrade_storage_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, i)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    i = 0
    for node in nodemgr['nodes']:
        if stage == 'do':
            output_info(comf, "Upgrading server nodes on %s ..." % node['ip'])
        else:
            output_info(comf, "Purge upgrade of server nodes on %s ..." % node['ip'])
        upgrade_server_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, i)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    i = 0
    for node in nodemgr['nodes']:
        if stage == 'do':
            output_info(comf, "Upgrading proxysql nodes on %s ..." % node['ip'])
        else:
            output_info(comf, "Purge upgrade of proxysql nodes on %s ..." % node['ip'])
        upgrade_proxysql_nodes(jscfg, machines, node, dirmap, filesmap, commandslist, i)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
        i = i + 1
    upgrade_xpanel(jscfg, machines, comf, metaseeds)
    upgrade_cdc_pkg(jscfg, machines, comf)

def upgrade_with_config(jscfg, comf, machines):
    config = jscfg['config']
    verfrom = config['product_version']
    verto = config['upgrade_version']
    validate_upgrade(verfrom, verto)
    upd_cmp = compare_version(verto, verfrom)
    if upd_cmp == 0:
        my_print("same version replacement")
        replace_packages(jscfg, comf, machines)
    elif upd_cmp > 0:
        my_print("upgrade from lower version to higher version")
        upgrade_packages(jscfg, comf, machines)
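    # A downgrade (upd_cmp < 0) falls through with no action here; such version
    # pairs are assumed to be rejected by validate_upgrade() above.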

def generate_systemctl_clean(servname, ip, commandslist):
    syscmdpat1 = "sudo systemctl stop %s"
    syscmdpat2 = "sudo systemctl disable %s"
    syscmdpat3 = "sudo rm -f /usr/lib/systemd/system/%s"
    addToCommandsList(commandslist, ip, '/', syscmdpat1 % servname)
    addToCommandsList(commandslist, ip, '/', syscmdpat2 % servname)
    addToCommandsList(commandslist, ip, '/', syscmdpat3 % servname)
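    # For example, generate_systemctl_clean('kunlun-node-manager-35001.service',
    # ip, cmds) queues three commands: systemctl stop, systemctl disable, and
    # removal of the unit file (port value illustrative).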

def clean_with_config(jscfg, comf, machines):
    meta = jscfg['meta']
    clustermgr = jscfg['cluster_manager']
    nodemgr = jscfg['node_manager']
    config = jscfg['config']
    storagedir = "kunlun-storage-%s" % config['product_version']
    storagedirpfx = "kunlun-storage-"
    clustermgrdir = "kunlun-cluster-manager-%s" % config['product_version']
    nodemgrdir = "kunlun-node-manager-%s" % config['product_version']
    cdcdir = "kunlun-cdc-%s" % config['product_version']
    autostart = config['autostart']
    usesudo = config['sudo']
    sudopfx=""
    if usesudo:
        sudopfx="sudo "

    dirmap = {}
    filesmap = {}
    commandslist = []

    metaseeds = meta['group_seeds']

    nodemgrmaps = {}
    for node in nodemgr['nodes']:
        nodemgrmaps[node['ip']] = node

    if 'cdc' in jscfg:
        cdcdir = "kunlun-cdc-%s" % config['product_version']
        for node in jscfg['cdc']['nodes']:
            mach = machines.get(node['ip'])
            output_info(comf, "Cleaning cdc on %s ..." % node['ip'])
            if autostart:
                servname = 'kunlun-cdc-%d.service' % node['raft_port']
                generate_systemctl_clean(servname, node['ip'], commandslist)
            addToCommandsList(commandslist, node['ip'], "%s/bin" % cdcdir, "bash stop_kunlun_cdc.sh")
            addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/%s' % (mach['basedir'], cdcdir))
            addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/kunlun-cdc*.service' % mach['basedir'])
            purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    # clean the nodemgr processes
    for node in nodemgr['nodes']:
        if node['skip']:
            continue
        mach = machines.get(node['ip'])
        output_info(comf, "Cleaning node_mgr and its managed instances on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-node-manager-%d.service' % node['brpc_http_port']
            generate_systemctl_clean(servname, node['ip'], commandslist)
        addToCommandsList(commandslist, node['ip'], "%s/bin" % nodemgrdir, "bash stop_node_mgr.sh")
        #for item in ["server_datadirs", "storage_datadirs", "storage_logdirs", "storage_waldirs", "storage_keyringdir"]:
        #    nodedirs = node[item].strip()
        #    for d in nodedirs.split(","):
        #        cmdpat = '%srm -fr %s/*'
        #        addToCommandsList(commandslist, node['ip'], "/", cmdpat % (sudopfx, d))
        addNodeToFilesListMap(filesmap, node, 'clear_instances.sh', '.')
        addNodeToFilesListMap(filesmap, node, 'clear_instance.sh', '.')
        addToCommandsList(commandslist, node['ip'], ".", 'bash ./clear_instances.sh %s %s >& clear.log || true' % (
            mach['basedir'], config['product_version']))
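        # The doubled '\\$' leaves a literal backslash in the generated command,
        # so '$2'/'$f' survive one level of remote-shell expansion and reach awk
        # and kill intact.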
        addToCommandsList(commandslist, node['ip'], ".",
            "ps -fe | grep node_exporter | grep ':%d' | grep -v grep | awk '{print \\$2}' | while read f; do kill -9 \\$f; done" % (
            node['prometheus_port_start']))
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/instance_binaries' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/kunlun-node-manager*.service' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/setup_nodemgr*.sh' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/start-nodemgr*.sh' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/stop-nodemgr*.sh' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/kunlun_install*.log' % mach['basedir'])
        # meta-related service files and wrapper scripts are also cleared here.
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/kunlun-storage*.service' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/start-storage*.sh' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/stop-storage*.sh' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/%s' % (mach['basedir'], nodemgrdir))
        if 'prometheus_datadir' in node:
            addToCommandsList(commandslist, node['ip'], ".", "rm -fr %s/*" % node['prometheus_datadir'])
        for item in ["server_datadirs", "storage_datadirs", "storage_logdirs", "storage_waldirs", "storage_keyringdir", "vec_tmpdir"]:
            nodedirs = node[item].strip()
            for d in nodedirs.split(","):
                addToCommandsList(commandslist, node['ip'], ".", "rm -fr %s/*" % d)
        if config['setbashenv']:
            addToCommandsList(commandslist, node['ip'], ".", "sed -i /KUNLUN_SET_ENV/d  ~/.bashrc")
        if not config['cloud']:
            addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/program_binaries' % mach['basedir'])
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    rnames = None
    if 'clusters' in jscfg and len(jscfg['clusters']) > 0:
        output_info(comf, "Cleaning all clusters specified in the configuration file ...")
        rnames = clean_clusters(config, jscfg['clusters'], nodemgrmaps, machines, comf)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
    clean_elasticsearch(jscfg, machines, metaseeds, comf)

    # clean the clustermgr processes
    for node in clustermgr['nodes']:
        mach = machines.get(node['ip'])
        output_info(comf, "Cleaning cluster_mgr on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-cluster-manager-%d.service' % node['brpc_raft_port']
            generate_systemctl_clean(servname, node['ip'], commandslist)
        addToCommandsList(commandslist, node['ip'], "%s/bin" % clustermgrdir, "bash stop_cluster_mgr.sh")
        addToCommandsList(commandslist, node['ip'], ".",
            "ps -fe | grep prometheus | grep ':%d' | grep -v grep | awk '{print \\$2}' | while read f; do kill -9 \\$f; done" % (
            node['prometheus_port_start']))
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/instance_binaries' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/kunlun-cluster-manager*.service' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/setup_clustermgr*.sh' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/%s' % (mach['basedir'], clustermgrdir))
        if 'prometheus_datadir' in node:
            addToCommandsList(commandslist, node['ip'], ".", "rm -fr %s/*" % node['prometheus_datadir'])
        if not config['cloud']:
            addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/program_binaries' % mach['basedir'])
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    worknode = get_worknode(jscfg)

    if worknode is not None:
        ip = worknode['ip']
        mach = machines.get(ip)
        if len(nodemgr['nodes']) > 0 and len(meta['nodes']) == 0 and 'group_seeds' in meta:
            metaobj = {}
            metaobj['node_manager'] = {'op':'remove', 'elements': nodemgr['nodes']}
            if rnames is not None and len(rnames) > 0:
                metaobj['delete_cluster'] = {'op':'remove', "elements": rnames}
            metajson = 'metadata.json'
            metaf = open('clustermgr/%s' % metajson, 'w')
            json.dump(metaobj, metaf, indent=4)
            metaf.close()
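            # The generated metadata.json has this shape (illustrative):
            #   {"node_manager": {"op": "remove", "elements": [...]},
            #    "delete_cluster": {"op": "remove", "elements": [...]}}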
            addNodeToFilesListMap(filesmap, worknode, 'modify_metadata.py', '.')
            addNodeToFilesListMap(filesmap, worknode, metajson, '.')
            # Only reached when the meta cluster itself is not being cleaned
            # (see the meta['nodes'] guard above).
            addToCommandsList(commandslist, ip, machines.get(worknode['ip'])['basedir'],
                "\\$KUNLUN_PYTHON modify_metadata.py --config %s --seeds=%s" % (metajson, metaseeds), "storage")
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    # clean the meta nodes
    for node in meta['nodes']:
        nodemgrobj = nodemgrmaps.get(node['ip'])
        # Skip it if it was already handled by the nodemgr clean routine above.
        if not nodemgrobj['skip']:
            purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
            continue
        output_info(comf, "Cleaning meta node on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-storage-%d.service' % node['port']
            generate_systemctl_clean(servname, node['ip'], commandslist)
        targetdir='%s/%s/dba_tools' % (node['program_dir'], storagedir)
        cmdpat = r'bash stopmysql.sh %d'
        addToCommandsList(commandslist, node['ip'], targetdir, cmdpat % node['port'], "storage")
        cmdpat = r'%srm -fr %s'
        addToCommandsList(commandslist, node['ip'], ".", cmdpat % (sudopfx, node['log_dir_path']))
        addToCommandsList(commandslist, node['ip'], ".", cmdpat % (sudopfx, node['data_dir_path']))
        addToCommandsList(commandslist, node['ip'], ".", cmdpat % (sudopfx, node['innodb_log_dir_path']))
        addToCommandsList(commandslist, node['ip'], ".", cmdpat % (sudopfx, node['program_dir']))
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/kunlun-storage*.service' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/start-storage*.sh' % mach['basedir'])
        addToCommandsList(commandslist, node['ip'], ".", 'rm -fr %s/stop-storage*.sh' % mach['basedir'])
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    # clean xpanel
    clean_xpanel(jscfg, machines, comf)

def generate_systemctl_stop(servname, ip, commandslist):
    syscmdpat1 = "sudo systemctl stop %s"
    addToCommandsList(commandslist, ip, '/', syscmdpat1 % servname)

def stop_with_config(jscfg, comf, machines):
    meta = jscfg['meta']
    clustermgr = jscfg['cluster_manager']
    nodemgr = jscfg['node_manager']
    config = jscfg['config']
    storagedir = "kunlun-storage-%s" % config['product_version']
    clustermgrdir = "kunlun-cluster-manager-%s" % config['product_version']
    nodemgrdir = "kunlun-node-manager-%s" % config['product_version']
    cdcdir = "kunlun-cdc-%s" % config['product_version']
    autostart = config['autostart']
    usesudo = config['sudo']

    dirmap = {}
    filesmap = {}
    commandslist = []

    if 'cdc' in jscfg:
        for node in jscfg['cdc']['nodes']:
            mach = machines.get(node['ip'])
            output_info(comf, "Stopping cdc on %s ..." % node['ip'])
            if autostart:
                servname = 'kunlun-cdc-%d.service' % node['raft_port']
                generate_systemctl_stop(servname, node['ip'], commandslist)
            addToCommandsList(commandslist, node['ip'], "%s/bin" % cdcdir, "bash stop_kunlun_cdc.sh")
            purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    # stop the clustermgr processes
    for node in clustermgr['nodes']:
        output_info(comf, "Stopping cluster_mgr on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-cluster-manager-%d.service' % node['brpc_raft_port']
            generate_systemctl_stop(servname, node['ip'], commandslist)
        addToCommandsList(commandslist, node['ip'], "%s/bin" % clustermgrdir, "bash stop_cluster_mgr.sh")
        addToCommandsList(commandslist, node['ip'], ".",
            "ps -fe | grep prometheus | grep ':%d' | grep -v grep | awk '{print \\$2}' | while read f; do kill -9 \\$f; done" % (
            node['prometheus_port_start']))
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    nodemgrmaps = {}
    for node in nodemgr['nodes']:
        nodemgrmaps[node['ip']] = node

    # stop the nodemgr processes
    for node in nodemgr['nodes']:
        if node['skip']:
            continue
        output_info(comf, "Stopping node_mgr and its managed instances on %s ..." % node['ip'])
        mach = machines.get(node['ip'])
        if autostart:
            servname = 'kunlun-node-manager-%d.service' % node['brpc_http_port']
            generate_systemctl_stop(servname, node['ip'], commandslist)
        addToCommandsList(commandslist, node['ip'], "%s/bin" % nodemgrdir, "bash stop_node_mgr.sh")
        addNodeToFilesListMap(filesmap, node, 'stop_instances.sh', '.')
        addToCommandsList(commandslist, node['ip'], ".", 'bash ./stop_instances.sh %s %s >& stop.log || true' % (
            mach['basedir'], config['product_version']))
        addToCommandsList(commandslist, node['ip'], ".",
            "ps -fe | grep node_exporter | grep ':%d' | grep -v grep | awk '{print \\$2}' | while read f; do kill -9 \\$f; done" % (
            node['prometheus_port_start']))
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    if 'clusters' in jscfg and len(jscfg['clusters']) > 0:
        output_info(comf, "Stopping all clusters specified in the configuration file ...")
        stop_clusters(jscfg['clusters'], nodemgrmaps, machines, comf, config)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
    stop_elasticsearch(jscfg, machines, comf)

    for node in meta['nodes']:
        nodemgrobj = nodemgrmaps.get(node['ip'])
        # Skip it if it was already handled by the nodemgr stop routine above.
        if not nodemgrobj['skip']:
            continue
        output_info(comf, "Stopping meta node on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-storage-%d.service' % node['port']
            generate_systemctl_stop(servname, node['ip'], commandslist)
        targetdir='.'
        cmdpat = r'bash stop-storage-%d.sh'
        addToCommandsList(commandslist, node['ip'], targetdir, cmdpat % node['port'], "storage")
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    # stop xpanel
    stop_xpanel(jscfg, machines, dirmap, filesmap, commandslist, comf)

def generate_systemctl_start(servname, ip, commandslist):
    syscmdpat1 = "sudo systemctl start %s"
    addToCommandsList(commandslist, ip, '/', syscmdpat1 % servname)

def start_with_config(jscfg, comf, machines):
    meta = jscfg['meta']
    clustermgr = jscfg['cluster_manager']
    nodemgr = jscfg['node_manager']
    config = jscfg['config']
    storagedir = "kunlun-storage-%s" % config['product_version']
    clustermgrdir = "kunlun-cluster-manager-%s" % config['product_version']
    nodemgrdir = "kunlun-node-manager-%s" % config['product_version']
    cdcdir = "kunlun-cdc-%s" % config['product_version']
    autostart = config['autostart']
    usesudo = config['sudo']

    dirmap = {}
    filesmap = {}
    commandslist = []

    nodemgrmaps = {}
    for node in nodemgr['nodes']:
        nodemgrmaps[node['ip']] = node

    # start the nodemgr processes
    for node in nodemgr['nodes']:
        if node['skip']:
            continue
        output_info(comf, "Starting node_mgr on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-node-manager-%d.service' % node['brpc_http_port']
            generate_systemctl_start(servname, node['ip'], commandslist)
        else:
            mach = machines.get(node['ip'])
            addNodeToFilesListMap(filesmap, node, 'start_instances.sh', '.')
            addToCommandsList(commandslist, node['ip'], ".", 'bash ./start_instances.sh %s %s >& start.log || true' % (
                mach['basedir'], config['product_version']))
            addToCommandsList(commandslist, node['ip'], '.', "bash start-nodemgr-%d.sh </dev/null >& nodemgr_start.log" % node['brpc_http_port'])
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    for node in meta['nodes']:
        nodemgrobj = nodemgrmaps.get(node['ip'])
        # Skip it if it was already handled by the nodemgr start routine above.
        if not nodemgrobj['skip']:
            continue
        output_info(comf, "Starting meta node on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-storage-%d.service' % node['port']
            generate_systemctl_start(servname, node['ip'], commandslist)
        else:
            cmdpat = r'export ASAN_OPTIONS=halt_on_error=0; bash start-storage-%d.sh'
            addToCommandsList(commandslist, node['ip'], '.', cmdpat % node['port'], "storage")
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    if 'clusters' in jscfg and len(jscfg['clusters']) > 0:
        output_info(comf, "Starting all clusters specified in the configuration file ...")
        start_clusters(jscfg['clusters'], nodemgrmaps, machines, comf, config)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
    start_elasticsearch(jscfg, machines, comf)

    # start the clustermgr processes
    for node in clustermgr['nodes']:
        output_info(comf, "Starting cluster_mgr on %s ..." % node['ip'])
        if autostart:
            servname = 'kunlun-cluster-manager-%d.service' % node['brpc_raft_port']
            generate_systemctl_start(servname, node['ip'], commandslist)
        else:
            envpfx = "export ASAN_OPTIONS=halt_on_error=0;"
            if node['valgrind']:
                envpfx = "export ASAN_OPTIONS=halt_on_error=0; export USE_VALGRIND=1;"
            addToCommandsList(commandslist, node['ip'], "%s/bin" % clustermgrdir, "%s bash start_cluster_mgr.sh </dev/null >& start.log" % envpfx)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    if 'cdc' in jscfg:
        for node in jscfg['cdc']['nodes']:
            output_info(comf, "Starting cdc on %s ..." % node['ip'])
            if autostart:
                servname = 'kunlun-cdc-%d.service' % node['raft_port']
                generate_systemctl_start(servname, node['ip'], commandslist)
            else:
                envpfx = "export ASAN_OPTIONS=halt_on_error=0;"
                if node['valgrind']:
                    envpfx = "export ASAN_OPTIONS=halt_on_error=0; export USE_VALGRIND=1;"
                addToCommandsList(commandslist, node['ip'], "%s/bin" % cdcdir, "%s bash start_kunlun_cdc.sh </dev/null >& start.log" % envpfx)
            purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)

    # start xpanel
    start_xpanel(jscfg, machines, dirmap, filesmap, commandslist, comf)

def service_with_config(jscfg, comf, machines):
    meta = jscfg['meta']
    clustermgr = jscfg['cluster_manager']
    nodemgr = jscfg['node_manager']
    config = jscfg['config']
    usesudo = config['sudo']

    dirmap = {}
    filesmap = {}
    commandslist = []

    nodemgrmaps = {}
    for node in nodemgr['nodes']:
        nodemgrmaps[node['ip']] = node

    clustermgrips = set()
    for node in clustermgr['nodes']:
        clustermgrips.add(node['ip'])

    i = 0
    nodemgrips = set()
    for node in nodemgr['nodes']:
        nodemgrips.add(node['ip'])
        if node['skip']:
            continue
        output_info(comf, "Servicing node_mgr on %s ..." % node['ip'])
        generate_nodemgr_service(config, machines, commandslist, node, i, filesmap)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
        i += 1

    i = 0
    for node in meta['nodes']:
        node['nodemgr'] = nodemgrmaps.get(node['ip'])
        output_info(comf, "Servicing meta node on %s ..." % node['ip'])
        generate_storage_service(config, machines, commandslist, node, i, filesmap)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
        i+=1

    i = 0
    for node in clustermgr['nodes']:
        output_info(comf, "Servicing cluster_mgr on %s ..." % node['ip'])
        generate_clustermgr_service(config, machines, commandslist, node, i, filesmap)
        purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
        i += 1

    if 'cdc' in jscfg:
        i = 0
        for node in jscfg['cdc']['nodes']:
            output_info(comf, "Servicing cdc on %s ..." % node['ip'])
            generate_cdc_service(config, machines, commandslist, node, i, filesmap)
            purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, usesudo)
            i += 1

def gen_cluster_config(args):
    if args.cluster_name == '':
        raise ValueError('Error: cluster_name must be provided')
    if args.outfile == '':
        raise ValueError('Error: outfile must be provided')
    jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    machines = {}
    setup_machines2(jscfg, machines)
    validate_and_set_config2(jscfg, machines)
    comf = open(r'clustermgr/%s' % args.outfile, 'w')
    resobj = {}
    resobj['machines'] = jscfg.get('machines',[])
    targetCluster = None
    for cluster in jscfg['clusters']:
        if cluster['name'] == args.cluster_name:
            targetCluster = cluster
    if targetCluster is None:
        raise Exception("Target cluster is not found")
    targetCluster['meta'] = jscfg['meta']
    targetCluster['clustermgr'] = jscfg['cluster_manager']
    resobj['cluster'] = targetCluster
    json.dump(resobj, comf, indent=4)
    comf.close()

def get_node_info_with_config(jscfg, comf, machines):
    meta = jscfg['meta']
    clustermgr = jscfg['cluster_manager']
    nodemgr = jscfg['node_manager']
    config = jscfg['config']

    dirmap = {}
    filesmap = {}
    commandslist = []
    getbackmap = {}
    appname = "get_machine_info.py"

    check_license_config(jscfg)
    license_conf = jscfg['license']
    if not os.path.exists(config['infodir']):
        os.mkdir(config['infodir'])
    ips = set()
    for node in clustermgr['nodes']:
        ips.add(node['ip'])
    for node in nodemgr['nodes']:
        if node['skip']:
            continue
        ips.add(node['ip'])
    for ip in ips:
        setup_basedir(comf, config, machines, ip, config['sudo'])
        mach = machines.get(ip)
        addIpToFilesListMap(filesmap, ip, appname, '.')
        addToCommandsList(commandslist, ip, '.', "python2 %s --output=node.info || python3 %s --output=node.info" % (appname, appname))
        addIpToGetBackListMap(getbackmap, ip, 'node.info', 'node.info.%s' %  ip)
    purge_cache_commands(config, comf, machines, dirmap, filesmap, commandslist, False)
    process_getbackmap(comf, getbackmap, machines, config['infodir'], config)
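    # ips is returned so get_machine_info_with_config() can locate the
    # node.info.<ip> files fetched into config['infodir'].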
    return ips

def get_machine_info_with_config(jscfg, scriptname, ips):
    config = jscfg['config']
    check_license_config(jscfg)
    license_conf = jscfg['license']
    os.system('/bin/bash %s' % scriptname)
    infoobj = {"valid_days": str(license_conf['valid_days']), 
            "custom_info": license_conf['custom_info'],
            "issue_time": license_conf['issue_time']}
    mach_objs = []
    for ip in ips:
        path = '%s/node.info.%s' % (config['infodir'], ip)
        mach_objs.append(get_json_from_file(path))
    infoobj['mach_infos'] = mach_objs
    f = open('clustermgr/machine_info.json', 'w')
    json.dump(infoobj, f, indent=4)
    f.close()
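    # The resulting clustermgr/machine_info.json has this shape (illustrative):
    #   {"valid_days": "...", "custom_info": ..., "issue_time": ...,
    #    "mach_infos": [<contents of each node.info.<ip> file>, ...]}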

def get_cluster_info_with_seeds(metaseeds):
    runarg = {"user": "pgx", "password": "pgx_pwd"}
    conn = get_master_conn(runarg, metaseeds)
    info = fetch_cluster_info(conn)
    my_print(str(info))
    conn.close()

# These packages are stored under contrib/common on the download site.
def get_common_3rd_packages_filemap():
    return {
            "mysql-driver": ["mysql-connector-python-2.1.3.tar.gz", "mysql-connector-python-2.1.3"]
            }

# The original design was a 3-item list [file-name, sub-dir, dir-after-extraction],
# but sub-dir was later dropped to simplify the code, so entries carry no
# sub-dir unless it becomes really necessary.

# These packages are stored under contrib/x86_64 on the download site.
# If a gzip holds a docker image, the second item is the image name;
# if it holds a directory, the second item is the directory name after extraction.
def get_x86_64_3rdpackages_filemap():
    return {
            "filebeat": ["filebeat-7.10.1-linux-x86_64.tar.gz", "filebeat-7.10.1-linux-x86_64"],
            "elasticsearch": ["elasticsearch-7.10.1.tar.gz", "elasticsearch:7.10.1"],
            "kibana": ["kibana-7.10.1.tar.gz", "kibana:7.10.1"],
            "haproxy": ["haproxy-2.5.0-bin.tar.gz", "haproxy-2.5.0-bin"],
            "jdk":["jdk-8u131-linux-x64.tar.gz", "jdk1.8.0_131"],
            "hadoop": ["hadoop-3.3.1.tar.gz", "hadoop-3.3.1"],
            "prometheus" : ["prometheus.tgz", "prometheus"]
            }

def get_aarch64_3rdpackages_filemap():
    return {
            "filebeat": ["filebeat-7.13.4-linux-arm64.tar.gz", "filebeat-7.13.4-linux-arm64"],
            "elasticsearch": ["elasticsearch-7.13.4.tar.gz", "elasticsearch:7.13.4"],
            "kibana": ["kibana-7.13.4.tar.gz", "kibana:7.13.4"],
            "haproxy": ["haproxy-2.5.0-bin.tar.gz", "haproxy-2.5.0-bin"],
            "jdk":["jdk-8u371-linux-aarch64.tar.gz", "jdk1.8.0_371"],
            "hadoop": ["hadoop-3.3.6-aarch64.tar.gz", "hadoop-3.3.6"],
            "prometheus" : ["prometheus.tgz", "prometheus"]
            }

def get_arch_3rdpackages_filemap(config):
    arch = config['targetarch']
    if arch == 'x86_64':
        return get_x86_64_3rdpackages_filemap()
    elif arch == 'aarch64':
        return get_aarch64_3rdpackages_filemap()
    else:  # not ready for loongarch64, etc.
        raise ValueError('bad arch: %s' % arch)

def get_3rdpackages_filemap(config):
    tmap = get_common_3rd_packages_filemap()
    tmap.update(get_arch_3rdpackages_filemap(config))
    return tmap
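
# Illustrative sketch (not called anywhere in this script): walking the merged
# filemap and applying the convention documented above -- a second item that
# contains ':' names a docker image, anything else names the directory left
# after extraction.
def demo_classify_3rdpackages(config):
    for pkgname, (fname, target) in sorted(get_3rdpackages_filemap(config).items()):
        kind = 'docker image' if ':' in target else 'directory'
        my_print("%s: archive=%s, %s=%s" % (pkgname, fname, kind, target))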

def download_util(args):
    if args.config == '':
        jscfg = {}
    else:
        jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    config = jscfg['config']
    arch = config['targetarch']
    prodver = config['product_version']
    downbase = get_downloadbase(config['downloadsite'])
    targetdir="clustermgr"
    binarynames = ["machine_info"]
    contentTypes = set()
    contentTypes.add('application/octet-stream')
    contentTypes.add('unknown')
    my_print("contentTypes:%s" % str(contentTypes))
    vertup = get_version_info(args.product_version)
    verstr = "%s.%s" % (vertup[0], vertup[1])
    # download the binary packages
    for name in binarynames:
        fname = name
        fpath = "util_%s/%s/%s" % (arch, verstr, fname)
        download_file(downbase, fpath, contentTypes, targetdir, config['overwrite'])

def download_packages(args):
    if args.config == '':
        jscfg = {}
    else:
        jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    config = jscfg['config']
    arch = config['targetarch']
    prodver = config['product_version']
    ispro = config['professional']
    downtype = config['downloadtype']
    isrc = config['rc']
    contentTypes = set()
    downbase = get_downloadbase(config['downloadsite'])
    targetdir="clustermgr"
    contentTypes.add('application/x-gzip')
    contentTypes.add('application/octet-stream')
    contentTypes.add('unknown')
    pkgtype = "enterprise"
    if ispro:
        pkgtype = "professional"
    binarynames = ["kunlun-storage", "kunlun-server", "kunlun-cluster-manager", "kunlun-node-manager", "kunlun-proxysql"]
    if not ispro:
        binarynames.append('kunlun-cdc')
    # download the binary packages
    for name in binarynames:
        fname = "%s-%s.tgz" % (name, prodver)
        if downtype == 'release':
            if isrc:
                fpath = "releases_%s/%s-RC/release-binaries/%s" % (arch, prodver, pkgtype)
            else:
                fpath = "releases_%s/%s/release-binaries/%s" % (arch, prodver, pkgtype)
        elif downtype == 'daily_rel':
            fpath = "dailybuilds_%s/%s" % (arch, pkgtype)
        else:
            fpath = "dailybuilds_debug_%s/%s" % (arch, pkgtype)
        if name == 'kunlun-storage':
            if args.cantian:
                fpath = fpath + "/cantian"
            elif args.mariadb:
                fpath = fpath + "/mariadb"
        if name == 'kunlun-server':
            if args.mariadb:
                fpath = fpath + "/mariadb"
        fpath = fpath + "/" + fname
        download_file(downbase, fpath, contentTypes, targetdir, config['overwrite'])
    # download the xpanel docker image
    if downtype == 'release':
        if isrc:
            fpath = 'releases_%s/%s-RC/docker-allinone/kunlun-xpanel-%s.tar.gz' % (arch, prodver, prodver)
        else:
            fpath = 'releases_%s/%s/docker-allinone/kunlun-xpanel-%s.tar.gz' % (arch, prodver, prodver)
    else:
        fpath = 'dailybuilds_%s/docker-images/kunlun-xpanel-%s.tar.gz' % (arch, prodver)
    download_file(downbase, fpath, contentTypes, targetdir, config['overwrite'])
    commap = get_common_3rd_packages_filemap()
    for pkgname in commap:
        finfo = commap[pkgname]
        fpath = 'contrib/common/%s' % finfo[0]
        download_file(downbase, fpath, contentTypes, targetdir, False)
    archmap = get_arch_3rdpackages_filemap(config)
    for pkgname in archmap:
        finfo = archmap[pkgname]
        fpath = 'contrib/%s/%s' % (arch, finfo[0])
        download_file(downbase, fpath, contentTypes, targetdir, config['overwrite'])

def run_command(args):
    jscfg = get_json_from_file(args.config)
    init_global_config2(jscfg, args)
    machines = {}
    setup_machines2(jscfg, machines)
    validate_and_set_config2(jscfg, machines)
    cluster_name = args.cluster_name
    metaobj = None
    if 'meta' in jscfg:
        meta = jscfg['meta']
        if 'nodes' in meta and len(meta['nodes']) > 0:
            metaobj = meta
        elif 'group_seeds' in meta:
            metaobj = {'nodes': get_nodes_from_seeds(meta['group_seeds'], 3306)}
    #my_print(str(metaobj))
    if cluster_name == '':
        dataobj = None
        compobj = None
    elif cluster_name not in jscfg['cluster_map']:
        raise ValueError('Error: the cluster %s is not found!' % cluster_name)
    else:
        cluster = jscfg['cluster_map'][cluster_name]
        dataobj = cluster['data']
        compobj = cluster['comp']
    clusterobj = {
            'meta': metaobj,
            'data': dataobj,
            'comp': compobj
            }
    if 'xpanel' in jscfg:
        clusterobj['xpanel'] = jscfg['xpanel']
    if 'cdc' in jscfg:
        clusterobj['cdc'] = jscfg['cdc']
    if 'node_manager' in jscfg:
        clusterobj['node_manager'] = jscfg['node_manager']
    if 'cluster_manager' in jscfg:
        clusterobj['cluster_manager'] = jscfg['cluster_manager']
    runarg = {
            "runtype": args.runtype,
            "dryrun": args.dryrun,
            "shard": args.shard,
            "index": args.index,
            "command": args.command
            }
    runDriver(runarg, clusterobj, machines, args.command)

if __name__ == '__main__':
    defconfig = get_default_config2()
    parser = argparse.ArgumentParser(description='Specify the arguments.')
    actions=["download", "install", "clean", "start", "stop", "service", "gen_cluster_config", "get_cluster_info", "run", "get_machine_info", "upgrade"]
    parser.add_argument('--config', type=str, help="The config path", default="")
    parser.add_argument('--action', type=str, help="The action", default='install', choices=actions)
    # general environment and product version options for all actions.
    parser.add_argument('--product_version', type=str, help="kunlun version", default=defconfig['product_version'])
    parser.add_argument('--upgrade_version', type=str, help="upgrade version", default=defconfig['upgrade_version'])
    parser.add_argument('--defuser', type=str, help="the default user", default=defconfig['defuser'])
    parser.add_argument('--defbase', type=str, help="the default basedir", default=defconfig['defbase'])
    # config options that affect how actions are run.
    parser.add_argument('--pyexec', help="specify the python version to use",
            default=defconfig['pyexec'], choices=["none", "python2", "python3"])
    parser.add_argument('--autostart', help="whether to start the cluster automatically",
            default=defconfig['autostart'], action='store_true')
    parser.add_argument('--sudo', help="whether to use sudo", default=defconfig['sudo'], action='store_true')
    parser.add_argument('--verbose', help="verbose mode, to show more information",
            default=defconfig['verbose'], action='store_true')
    parser.add_argument('--localip', type=str, help="The local ip address", default=defconfig['localip'])
    parser.add_argument('--small', help="whether to use small template", default=defconfig['small'], action='store_true')
    parser.add_argument('--cloud', help="whether run on cloud images", default=defconfig['cloud'], action='store_true')
    parser.add_argument('--setbashenv', help="whether to set the user bash env",
            default=defconfig['setbashenv'], action='store_true') #excluded from config
    # cluster_manager config
    parser.add_argument('--defbrpc_http_port_clustermgr', type=int, help="default brpc_http_port for cluster_manager",
            default=defconfig['defbrpc_http_port_clustermgr'])
    parser.add_argument('--defbrpc_raft_port_clustermgr', type=int, help="default brpc_raft_port for cluster_manager",
            default=defconfig['defbrpc_raft_port_clustermgr'])
    parser.add_argument('--defprometheus_port_start_clustermgr', type=int, help="default prometheus starting port for cluster_manager",
            default=defconfig['defprometheus_port_start_clustermgr'])
    # node_manager config
    parser.add_argument('--defbrpc_http_port_nodemgr', type=int, help="default brpc_http_port for node_manager",
            default=defconfig['defbrpc_http_port_nodemgr'])
    parser.add_argument('--deftcp_port_nodemgr', type=int, help="default tcp_port for node_manager",
            default=defconfig['deftcp_port_nodemgr'])
    parser.add_argument('--defstorage_portrange_nodemgr', type=str, help="default port-range for storage nodes",
            default=defconfig['defstorage_portrange_nodemgr'])
    parser.add_argument('--defserver_portrange_nodemgr', type=str, help="default port-range for server nodes",
            default=defconfig['defserver_portrange_nodemgr'])
    parser.add_argument('--defprometheus_port_start_nodemgr', type=int, help="default prometheus starting port for node_manager",
            default=defconfig['defprometheus_port_start_nodemgr'])
    # cdc config
    parser.add_argument('--defraft_port_cdc', type=int, help="default raft_port for cdc", default=defconfig['defraft_port_cdc'])
    parser.add_argument('--defhttp_port_cdc', type=int, help="default http_port for cdc", default=defconfig['defhttp_port_cdc'])
    # used by the install action to also run the download action first.
    parser.add_argument('--download', help="whether to download before install operation",
            default=defconfig['download'], action='store_true')
    # used by the gen_cluster_config action; the generated file is passed to the legacy run.py.
    parser.add_argument('--outfile', type=str, help="the path for the cluster config", default="cluster.json")
    # used by both gen_cluster_config and run action
    parser.add_argument('--cluster_name', type=str, help="the name of the cluster to generate the config file or run command", default="")
    # used only by the download action
    parser.add_argument('--downloadsite', type=str, help="the download base site", choices=['public', 'devsite', 'internal'],
            default=defconfig['downloadsite'])
    parser.add_argument('--downloadtype', type=str, help="the packages type", choices=['release', 'daily_rel', 'daily_debug'],
            default=defconfig['downloadtype'])
    parser.add_argument('--targetarch', type=str, help="the cpu arch for the packages to download/install", default=defconfig['targetarch'])
    parser.add_argument('--overwrite', help="whether to overwrite existing file during download",
            default=defconfig['overwrite'], action='store_true')
    parser.add_argument('--professional', help="whether to download the professional version",
            default=defconfig['professional'], action='store_true')
    parser.add_argument('--rc', help="whether to download the release candidate version", default=defconfig['rc'], action='store_true')
    parser.add_argument('--mariadb', help="whether to use klustron-storage-mariadb", default=defconfig['mariadb'], action='store_true')
    parser.add_argument('--cantian', help="whether it is for cantian installation", default=defconfig['cantian'], action='store_true')
    parser.add_argument('--multipledc', help="whether used for installation in multiple datacenters",
            default=defconfig['multipledc'], action='store_true')
    # used only for the run action
    parser.add_argument('--runtype', type=str, help="The run type", 
            choices=['hosts', 'meta', 'data', 'comp','sqlmeta', 'sqldata', 'sqlcomp', 'xpanel', 'cdc', 'node_manager', 'cluster_manager'])
    parser.add_argument('--dryrun', help="do not run the command, just show", default=False, action='store_true')
    parser.add_argument('--shard', type=int, help="The shard number, only used for data/sqldata, start from 1", default=0)
    parser.add_argument('--index', type=int, help="The index number, start from 1", default=0)
    parser.add_argument('--command', type=str, help="the command")
    # used for distributing license
    parser.add_argument('--license_file', type=str, help="the name of the license file", default=defconfig['license_file'])
    # used for storing files for machine info
    parser.add_argument('--infodir', type=str, help="the directory to store the machine info files", default=defconfig['infodir'])
    # used for upgrade
    parser.add_argument('--upgrade_stage', type=str, help="the stage for the upgrade action",
        default=defconfig['upgrade_stage'], choices=["do","clean"])
    # used for get_cluster_info
    parser.add_argument('--metaseeds', type=str, help="the meta seeds used for getting cluster information", default="")

    args = parser.parse_args()
    if not args.defbase.startswith('/'):
        raise ValueError('Error: the default basedir must be absolute path!')
    if args.autostart:
        args.sudo = True
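        # Installing and driving systemd units (see generate_systemctl_*) needs
        # root, so autostart implies sudo.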

    # set the default config file to config.json for actions that require one.
    if args.config == '' and args.action != 'download' and args.action != 'download_util' and args.action != 'get_cluster_info':
        args.config = 'config.json'

    my_print(str(sys.argv))
    checkdirs(['clustermgr'])

    if args.action == 'download':
        download_packages(args)
    elif args.action == 'install':
        if args.download:
            download_packages(args)
        install_clustermgr(args)
    elif args.action == 'get_machine_info':
        if args.download:
            download_util(args)
        get_machine_info(args)
    elif args.action == 'clean':
        clean_clustermgr(args)
    elif args.action == 'start':
        start_clustermgr(args)
    elif args.action == 'stop':
        stop_clustermgr(args)
    elif args.action == 'service':
        service_clustermgr(args)
    elif args.action == 'upgrade':
        upgrade_clustermgr(args)
    elif args.action == 'gen_cluster_config':
        gen_cluster_config(args)
    elif args.action == 'get_cluster_info':
        get_cluster_info(args)
    elif args.action == 'run':
        run_command(args)
    else:
        # just defensive, in case more actions are added later.
        pass
