心澄

命由己造,相由心生,世间万物皆是化相,心不动,万物皆不动,心不变,万物皆不变
Program Languagerusting... going...
Email insunsgmail.com
Country China
LocationHangZhou, ZheJiang

mysql转pgsql


关于pgsql

pgsql全称postgresql,是个关系型数据库,但是其实不止于此,他可以以jsonb格式存储json数据(类似于mongo的bson),分区、触发器、强大的插件等都是选择的理由。官方的简介:

PostgreSQL is a powerful, open source object-relational database system with over 30 years of active development that has earned it a strong reputation for reliability, feature robustness, and performance.

为什么转pgsql?

有个产品,原先使用的是mysql5存储,通过程序根据用户id将其相关数据定位到不同的分表存储。即便如此单表还是很容易超1亿条记录。我并不是数据库专家,但是使用经验上来说,单表数据超过1亿条记录,数据入库的性能确实是个问题。

有人可能会说,mysql8也支持分区功能。这一点我不是很确定也没详细了解。选择pgsql一是分区功能,二是如果万一项目做大了,pgsql有不同的大数据分支可无缝切换,比如greenplume。另外,程序猿也需要尝鲜,这也是切换的动力之一。

切换流程和工具

1. 首先程序得支持pgsql

这一点得看之前程序dao层是否分离,以及驱动的封装情况。如果dao层分离出来了,db操作封装的好,直接写一个pgsql兼容的驱动即可无缝切换

2. 生成pgsql的创建语句

下面是我自己写自己用的php程序my2pg.php,替换其中的$db相关为你自己的数据库操作即可:

<?php

/**
 * @Author: insuns@gmail.com
 * @Date:   2020-09-29 14:17:07
 * @Last Modified time: 2020-10-12 19:32:45
 */
define('SCHEME', 'public.');
include dirname(__DIR__) . '/router/core.inc.php';
$db = &$YDB;
$q = $db->query('SHOW TABLES');
$tables = [];
while ($r = $db->fetchArray($q, 'num')) {
    $tables[] = $r[0];
}
//获得每个表的创建语句
$data = [];
$setAutoincrementId = [];
foreach ($tables as $t) {
    $q = $db->query('SHOW FULL COLUMNS FROM ' . $t);
    $sql = $key = $comment = [];
    while ($r = $db->fetchArray($q)) {
        $type = getPgType($r);
        $s = '"' . $r['Field'] . '" ' . $type;
        if ($r['Default'] != null && $r['Default'] != '' && $r['Default'] != '0000-00-00' && $r['Default'] != '0000-00-00 00:00:00') {
            if ($r['Default'] != 'CURRENT_TIMESTAMP') {
                $s .= ' DEFAULT '' . $r['Default'];
                $s .= ''::' . preg_replace('/(.+?)/', '', $type);
            } else {
                $s .= ' DEFAULT CURRENT_TIMESTAMP';
            }
        }
        if ($r['Key'] == 'PRI' && $r['Extra'] == 'auto_increment') {
            $s .= ' PRIMARY KEY';
        }
        $sql[] = $s;
        $c = getCommint($t, $r);
        if ($c) {
            $comment[] = $c;
        }
    }
    $key = getIndex($t);
    $data[] = 'CREATE TABLE IF NOT EXISTS ' . SCHEME . $t . '(' . implode(", ", $sql) . ");n"
        . ($comment ? "BEGIN;n" . implode("n", $comment) . "nCOMMIT;n" : '')
        . ($key ? "BEGIN;n" . implode("n", $key) . "nCOMMIT;n" : '');
    // $data[] = $key ? "BEGIN;n" . implode("n", $key) . "nCOMMIT;n" : '';

    $setAutoincrementId[] = 'SELECT setval('' . $t . '_id_seq', (SELECT max(id)  from "' . $t . '"));';
}
file_put_contents('pg.sql', implode("n", $data));
file_put_contents('setId.sql', implode("n", $setAutoincrementId));
echo "finished.n";

function getPgType($f) {
    $pgType = '';
    if ($f['Extra'] == 'auto_increment') {
        $type = trim(str_replace('unsigned', '', strtolower(preg_replace('/(.+?)/', '', $f['Type']))));
        if ($type == 'int' || $type == 'mediumint') {
            $pgType = 'SERIAL';
        } elseif ($type == 'smallint') {
            $pgType = 'SMALLSERIAL';
        } elseif ($type == 'bigint') {
            $pgType = 'BIGSERIAL';
        }
    } else {
        $isUnsigned = stripos($f['Type'], 'unsigned') !== false;
        if (stripos($f['Type'], 'tinyint(') === 0) {
            $pgType = 'smallint';
        } elseif (stripos($f['Type'], 'int(') === 0) {
            $pgType = $isUnsigned ? "bigint" : 'integer';
        } elseif (stripos($f['Type'], "smallint(") === 0) {
            $pgType = $isUnsigned ? 'integer' : 'smallint';
        } elseif (stripos($f['Type'], "mediumint(") === 0) {
            $pgType = "integer";
        } elseif (stripos($f['Type'], "bigint(") === 0) {
            $pgType = 'bigint';
        } elseif (stripos($f['Type'], "year") === 0) {
            $pgType = "integer";
        } elseif ($f['Type'] == "longtext") {
            $pgType = 'text';
        } elseif ($f['Type'] == "mediumtext") {
            $pgType = 'text';
        } elseif ($f['Type'] == "tinytext") {
            $pgType = "text";
        } elseif (stripos($f['Type'], "varchar(") === 0) {
            $pgType = $f['Type'];
        } elseif ($f['Type'] == "datetime") {
            $pgType = "timestamp without time zone";
        } elseif ($f['Type'] == "timestamp") {
            $pgType = "timestamp with time zone";
        } elseif (stripos($f['Type'], "double") === 0) {
            $pgType = "double precision";
        } elseif (stripos($f['Type'], "float") === 0) {
            $pgType = "real";
        } elseif (stripos($f['Type'], "blob") === 0) {
            $pgType = "bytea";
        } elseif (stripos($f['Type'], "binary") === 0) {
            $pgType = "bytea";
        } elseif (stripos($f['Type'], "enum(") === 0 || stripos($f['Type'], "set(") === 0) {
            $pgType = "varchar";
        } elseif (stripos($f['Type'], "linestring") === 0) {
            $pgType = "path";
        } elseif (stripos($f['Type'], "point") === 0) {
            $pgType = "point";
        }
        if (!$pgType) {
            $pgType = $f['Type'];
        }
    }
    return str_replace('unsigned', '', $pgType);
}
/**
 * 获得表注释
 *
 * @param [type] $t
 * @param [type] $f
 */
function getCommint($t, $f) {
    return $f['Comment'] ? 'COMMENT ON COLUMN ' . SCHEME . $t . '."' . $f['Field'] . '" IS '' . $f['Comment'] . '';' : '';
}

function getIndex($t) {
    $q = $GLOBALS['YDB']->query('SHOW CREATE TABLE ' . $t);
    $ct = $GLOBALS['YDB'] -> fetchArray($q, 'num');
    if (!empty($ct[1])) {
        preg_match_all('/(UNIQUEs+)?KEYs+`(.+?)`s+((.+?))/', $ct[1], $m);
        if (!empty($m[2])) {
            $keys = [];
            foreach ($m[2] as $k => $v) {
                $kn = $t . '_' . $v;
                $v = str_replace('`', '"', $v);
                $f = empty($m[3][$k]) ? $v : str_replace('`', '"', $m[3][$k]);
                $keys[] = /* 'DROP INDEX IF EXISTS ' . $kn . ";n" . */ 'CREATE ' . (!empty($m[1][$k]) ? 'UNIQUE ' : '') . 'INDEX ' . $kn . ' ON ' . SCHEME . $t . '(' . $f . ');';
            }
            return $keys;
        }
    }
    return [];
}

执行php my2pg.php将在当前目录生成setId.sqlpg.sql两文件,其中pg.sql是根据你mysql数据库的表结构生成对应的pgsql的数据表结构。setId.sql是在你数据导入完成后,调整自增id的。其实在等下讲的工具中就有可以自动生成pgsql表结构语句的,不过他生成的表结构是不添加注释的,也没生成索引等,所以我自己写了程序来完成。

注:这个程序只支持常用的数据类型转换,对我来说已经够用了,复杂的自己在getPgType添加下。

创建新的数据库,导入建表语句

登录pgsql:

su postgres
psql -U dbuser -d dbname -W

创建数据库,用户,赋权:

CREATE USER dbuser WITH PASSWORD '********';
CREATE DATABASE dbname OWNER dbuser;
GRANT ALL PRIVILEGES ON DATABASE dbname TO dbuser;
注意将dbuser换成你自己的用户名,dbname换成你自己数据库的名称,********换成你自己的密码。

执行导入建表语句:

  1. 退出命令行:exit
  2. 导入建表语句:psql -U dbuser -d dbname -f pg.sql。注意pg.sql路径替换成你自己的

同步数据

同步数据我用到了rds_dbsync,下列讲下docker版本的操作流程。

  1. 克隆项目到本地:cd ~ && git clone https://github.com/aliyun/rds_dbsync.git
  2. 修改cd rds_dbsync && vi dbsync/my.cfg,由于是mysql同步到pgsql,因此只要修改[src.mysql][desc.pgsql]即可;
  3. 编译docker镜像:docker build -t dbsync .
  4. 添加docker-compose.yml:

    version: "3.5"
    services:
      dbsync:
        image: dbsync
        container_name: dbsync
        volumes:
          - "~/rds_dbsync/dbsync/my.cfg:/dbsync/my.cfg"
        command: mysql2pgsql
    
    networks:
      default:
        external:
          name: you_docker_network_name
          #注意you_docker_network_name修改成你mysql和pgsql相同的网络
  5. 开始同步数据:docker-compose up -d

等待结束即可,查看进程或者消息可以执行:docker logs --tail 20 dbsync

  • 分享:
评论

    • 博主

    说点什么