HEX
Server: Apache
System: Windows NT MAGNETO-ARM 10.0 build 22000 (Windows 10) AMD64
User: Michel (0)
PHP: 7.4.7
Disabled: NONE
Upload Files
File: C:/Program Files/MySQL/MySQL Workbench 8.0/workbench/client_utils.py
# Copyright (c) 2013, 2020, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is also distributed with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation.  The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have included with MySQL.
# This program is distributed in the hope that it will be useful,  but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

import platform
import sys
import os

import tempfile
import subprocess

from queue import Queue, Empty
from threading import Thread
import mforms
import grt

from workbench.log import log_info, log_error, log_debug
from wb_common import to_unicode

from .db_utils import ConnectionTunnel


def get_path_to_mysql():
    # get path to mysql client from options
    try:
        path = grt.root.wb.options.options["mysqlclient"]
        if path:
            if os.path.exists(path):
                return path
            if any(os.path.exists(os.path.join(p,path)) for p in os.getenv("PATH").split(os.pathsep)):
                return path
            return None
    except:
        return None
    
    if sys.platform.lower() == "darwin":
        # if path is not specified, use bundled one
        return mforms.App.get().get_executable_path("mysql")
    elif sys.platform.lower() == "win32":
        return mforms.App.get().get_executable_path("mysql.exe")
    else:
        # if path is not specified, use bundled one
        path = mforms.App.get().get_executable_path("mysql")
        if path:
            return path
        # just pick default
        if any(os.path.exists(os.path.join(p,"mysql")) for p in os.getenv("PATH").split(os.pathsep)):
            return "mysql"
        return None


def start_reading_from(file):
    """Create a thread to read from the file object and feed a queue. Use for non-blocking reads from file objects."""

    def reader(file, q):
        while True:
            l = file.readline()
            if not l:
                break
            q.put(l)
        q.put(None)

    q = Queue()
    thr = Thread(target=reader, args=(file, q))
    thr.start()

    return q, thr


class MySQLScriptImporter(object):
    """Import a SQL script using the MySQL command line tool"""

    def __init__(self, connection_params):
        self._extra_params = []
        self._password = ""
    
        self._tool_path = get_path_to_mysql()
        self._upgrade_tool_path = ""
    
        self._tunnel = ConnectionTunnel(connection_params)

        conn = connection_params.parameterValues
        params = []
        if connection_params.driver.name == "MysqlNativeSocket":
            params.append("--protocol="+("pipe" if sys.platform == "win32" else "socket"))
            if conn["socket"]:
                params.append("--socket=" + conn["socket"])
        else:
            params.append("--protocol=tcp")
            if self._tunnel.port or conn["port"]:
                params.append("--port=" + str(self._tunnel.port or conn["port"]))
            if (self._tunnel.port and ["localhost"] or [conn["hostName"]])[0]:
                params.append("--host=" + (self._tunnel.port and ["localhost"] or [conn["hostName"]])[0])
        
        if conn.get("useSSL", 0):
            if conn.get("sslCert", ""):
                params.append("--ssl-cert=%s" % conn["sslCert"])
            if conn.get("sslCA", ""):
                params.append("--ssl-ca=%s" % conn["sslCA"])
            if conn.get("sslKey", ""):
                params.append("--ssl-key=%s" % conn["sslKey"])
            if conn.get("sslCipher", ""):
                params.append("--ssl-cipher=%s" % conn["sslCipher"])
                
        if conn.get("OPT_ENABLE_CLEARTEXT_PLUGIN", ""):
            params.append("--enable-cleartext-plugin")
            
        params += ["--user=" + conn["userName"]]
        self._connection_params = params


    def set_extra_params(self, param_list):
        self._extra_params = param_list


    def set_password(self, password):
        self._password = password


    def report_progress(self, message, current, total):
        pass


    def report_output(self, text):
        print(text)

    def add_command_parameter(self, param_list, parameter, data = None):
        is_windows = platform.system() == 'Windows'

        if parameter == "command":
            param_list.append(data)
        elif parameter == "defaults-extra-file":
            if is_windows:
                pwdfile = tempfile.NamedTemporaryFile(delete=False, suffix=".cnf")
                pwdfilename = pwdfile.name
                tmpdir = None
            else:
                # use a pipe to feed the password to the client
                tmpdir = tempfile.mkdtemp()
                pwdfilename = os.path.join(tmpdir, 'extraparams.cnf')
                os.mkfifo(pwdfilename)
            param_list.append("--defaults-extra-file=%s" % pwdfilename)
            return tmpdir, pwdfilename
        elif parameter == "default-character-set" and data:
            param_list.append("--default-character-set=%s" % data)
        elif parameter == "connection-params":
            param_list += self._connection_params
        elif parameter == "extra-params":
            param_list += self._extra_params
        elif parameter == "default-schema" and data:
            param_list.append(data)

    def launch_process(self, command, working_directory):
        is_windows = platform.system() == 'Windows'
        real_command = ""
        info = None
        if is_windows:
            info = subprocess.STARTUPINFO()
            info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            info.wShowWindow = subprocess.SW_HIDE
            # Command line can contain object names in case of export and filename in case of import
            # Object names must be in utf-8 but filename must be encoded in the filesystem encoding,
            # which probably isn't utf-8 in windows.
            fse = sys.getfilesystemencoding()
            real_command = command.encode(fse) if isinstance(command, str) else command
        else:
            real_command = command
        
        try:
            log_debug("Executing command: %s\n" % real_command)
            proc = subprocess.Popen(real_command, cwd=working_directory, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT,startupinfo=info)
        except OSError as exc:
            log_error("Error executing command %s\n%s\n" % (real_command, exc))
            import traceback
            traceback.print_exc()
            raise RuntimeError("Error executing %s:\n%s" % (real_command, str(exc)))
        return proc


    def reset_schemas(self, default_schema=None, default_charset="utf8"):
        if not self._upgrade_tool_path:
            raise RuntimeError("You need to specify a valid location to mysql_upgrade tool.")

        command = []
        
        self.add_command_parameter(command, "command", self._upgrade_tool_path)
        tmpdir, pwdfilename = self.add_command_parameter(command, "defaults-extra-file")
        self.add_command_parameter(command, "default-character-set", default_charset)
        self.add_command_parameter(command, "connection-params")
        self.add_command_parameter(command, "extra-params")
        self.add_command_parameter(command, "default-schema", default_schema)
        command.append("--force")

        command_string = " ".join(command)
        
        proc = None
        try:
            self.report_progress("Preparing...", 0, 2)
            proc = self.launch_process(command, None)

            if self._password is None:
                self._password = ''
            pwdfile = open(pwdfilename, 'w')
            pwdfile.write('[client]\npassword=%s\n' % self._password.replace("\\", "\\\\"))
            pwdfile.close()

            line_count = 0
            for line in iter(proc.stdout.readline,''):
                if line.startswith('Checking'):
                    line_count += 1
                    self.report_progress(line, line_count, 2)

            proc.wait()
            self.report_progress("Finished...", 2, 2)
        except Exception as e:
            log_error("There was an exception running a process: %s\n%s" % (command, str(e)))
        finally:
            if pwdfilename:
                os.remove(pwdfilename)
            if tmpdir:
                os.rmdir(tmpdir)


                
                
    def import_script(self, path, default_schema=None, default_charset="utf8"):
        if not self._tool_path:
            raise RuntimeError("mysql command line client not found. Please fix its path in Preferences -> Administration")

        is_windows = platform.system() == 'Windows'
        
        if is_windows:
            params = ['"%s"' % self._tool_path]
            pwdfile = tempfile.NamedTemporaryFile(delete=False, suffix=".cnf")
            pwdfilename = pwdfile.name
            tmpdir = None
        else:
            params = [to_unicode(self._tool_path)]
            # use a pipe to feed the password to the client
            tmpdir = tempfile.mkdtemp()
            pwdfilename = os.path.join(tmpdir, 'extraparams.cnf')
            os.mkfifo(pwdfilename)
        params.append('--defaults-extra-file=' + pwdfilename)

        if default_charset:
            params.append("--default-character-set=%s" % default_charset)
        params += self._connection_params
        params += self._extra_params

        if default_schema:
            params.append(default_schema)

        cmdstr = " ".join(params)

        workdir = os.path.dirname(path)

        log_info("Feeding data from %s to %s (cwd=%s)\n" % (path, cmdstr, workdir))
        p1 = None
        try:
            self.report_progress("Preparing...", None, None)
            if not is_windows:
                try:
                    p1 = subprocess.Popen(params, cwd=workdir, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='utf8')
                except OSError as exc:
                    log_error("Error executing command %s\n%s\n" % (" ".join(params), exc))
                    raise RuntimeError("Error executing %s:\n%s" % (" ".join(params), str(exc)))

            # in !Windows feed password to client after it's started (otherwise the fifo would block on open for writing)
            with open(pwdfilename, 'w') as pwdfile:
                pwdfile.write('[client]\npassword=')
                if self._password is None:
                    self._password = ''
                pwdfile.write(self._password.replace("\\", "\\\\"))
                pwdfile.write('\n')

            if is_windows:
                try:
                    info = subprocess.STARTUPINFO()
                    info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                    info.wShowWindow = subprocess.SW_HIDE
                    # Command line can contain object names in case of export and filename in case of import
                    # Object names must be in utf-8 but filename must be encoded in the filesystem encoding,
                    # which probably isn't utf-8 in windows.
                    log_debug("Executing command: %s\n" % cmdstr)
                    p1 = subprocess.Popen(cmdstr, cwd=workdir, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT,startupinfo=info, shell=cmdstr[0] != '"', encoding='utf8')
                except OSError as exc:
                    log_error("Error executing command %s\n%s\n" % (cmdstr, str(exc)))
                    import traceback
                    traceback.print_exc()
                    raise RuntimeError("Error executing %s:\n%s" % (cmdstr, str(exc)))

            # do the import
            total_size = os.stat(path).st_size
            processed = 0
            self.report_progress("Importing %s..." % os.path.basename(path), 0, total_size)
            stdout_q, thr = start_reading_from(p1.stdout)
           
            with open(path, "r") as input_file:
                while p1 and p1.poll() == None:
                    try:
                        if stdout_q:
                            text = stdout_q.get_nowait()
                            if text:
                                log_info("Task stdout: %s\n" % text)
                                if 'Access denied for user' in text:
                                    raise grt.DBLoginError(text)
                                elif "Can't open named pipe to host" in text and sys.platform.lower() == "win32":
                                    text = "%s\n%s" % (text, "Please check if the server started with the --enabled-named-pipe parameter. The parameter can also be set in the config file.")
                                self.report_output(text.strip())
                            elif text is None:
                                  stdout_q = None
                    except Empty:
                        pass

                    line = input_file.readline()
                    if not line:
                        break
                    processed += len(line)
                    try:
                        p1.stdin.write(line)
                    except IOError as e:
                        log_error("Exception writing to stdin from cmdline client: %s\n" % e)
                        if e.errno == 32: # broken pipe
                            log_error("Broken pipe from child process\n")
                            break
                        elif e.errno == 22: # invalid argument (happens in Windows, when child process just dies)
                          log_error("Broken pipe from child process\n")
                          break
                        raise e
                    self.report_progress(None, processed, total_size)
                

            # close the writer end of the client's pipe, so it can exit
            p1.stdin.close()

            self.report_progress("Finished executing script", processed, total_size)
            
            # flush queue from reader
            if stdout_q:
                while True:
                    text = stdout_q.get()
                    if text is None:
                        break
                    log_info("Task stdout: %s\n" % text)
                    if 'Access denied for user' in text:
                        raise grt.DBLoginError(text)
                    elif "Can't open named pipe to host" in text and sys.platform.lower() == "win32":
                        text = "%s\n%s" % (text, "Please check if the server started with the --enabled-named-pipe parameter. The parameter can also be set in the config file.")
                    self.report_output(text.strip())

            # let reader thread die
            thr.join()

            p1.wait()

            exitcode = p1.returncode
            
            log_info("mysql tool exited with code %s\n" % exitcode)

            if exitcode != 0:
                self.report_progress("Operation failed with exitcode " + str(exitcode), None, None)
            else:
                self.report_progress("Operation completed successfully", None, None)
            
            return exitcode
        finally:
            if pwdfilename:
                os.remove(pwdfilename)
            if tmpdir:
                os.rmdir(tmpdir)