mysql转pgsql
关于pgsql
pgsql全称postgresql,是个关系型数据库,但是其实不止于此,他可以以jsonb格式存储json数据(类似于mongo的bson),分区、触发器、强大的插件等都是选择的理由。官方的简介:
PostgreSQL is a powerful, open source object-relational database system with over 30 years of active development that has earned it a strong reputation for reliability, feature robustness, and performance.
为什么转pgsql?
有个产品,原先使用的是mysql5存储,通过程序根据用户id将其相关数据定位到不同的分表存储。即便如此单表还是很容易超1亿条记录。我并不是数据库专家,但是使用经验上来说,单表数据超过1亿条记录,数据入库的性能确实是个问题。
有人可能会说,mysql8也支持分区功能。这一点我不是很确定也没详细了解。选择pgsql一是分区功能,二是如果万一项目做大了,pgsql有不同的大数据分支可无缝切换,比如greenplume。另外,程序猿也需要尝鲜,这也是切换的动力之一。
切换流程和工具
1. 首先程序得支持pgsql
这一点得看之前程序dao层是否分离,以及驱动的封装情况。如果dao层分离出来了,db操作封装的好,直接写一个pgsql兼容的驱动即可无缝切换
2. 生成pgsql的创建语句
下面是我自己写自己用的php程序my2pg.php
,替换其中的$db
相关为你自己的数据库操作即可:
<?php
/**
* @Author: insuns@gmail.com
* @Date: 2020-09-29 14:17:07
* @Last Modified time: 2020-10-12 19:32:45
*/
define('SCHEME', 'public.');
include dirname(__DIR__) . '/router/core.inc.php';
$db = &$YDB;
$q = $db->query('SHOW TABLES');
$tables = [];
while ($r = $db->fetchArray($q, 'num')) {
$tables[] = $r[0];
}
//获得每个表的创建语句
$data = [];
$setAutoincrementId = [];
foreach ($tables as $t) {
$q = $db->query('SHOW FULL COLUMNS FROM ' . $t);
$sql = $key = $comment = [];
while ($r = $db->fetchArray($q)) {
$type = getPgType($r);
$s = '"' . $r['Field'] . '" ' . $type;
if ($r['Default'] != null && $r['Default'] != '' && $r['Default'] != '0000-00-00' && $r['Default'] != '0000-00-00 00:00:00') {
if ($r['Default'] != 'CURRENT_TIMESTAMP') {
$s .= ' DEFAULT '' . $r['Default'];
$s .= ''::' . preg_replace('/(.+?)/', '', $type);
} else {
$s .= ' DEFAULT CURRENT_TIMESTAMP';
}
}
if ($r['Key'] == 'PRI' && $r['Extra'] == 'auto_increment') {
$s .= ' PRIMARY KEY';
}
$sql[] = $s;
$c = getCommint($t, $r);
if ($c) {
$comment[] = $c;
}
}
$key = getIndex($t);
$data[] = 'CREATE TABLE IF NOT EXISTS ' . SCHEME . $t . '(' . implode(", ", $sql) . ");n"
. ($comment ? "BEGIN;n" . implode("n", $comment) . "nCOMMIT;n" : '')
. ($key ? "BEGIN;n" . implode("n", $key) . "nCOMMIT;n" : '');
// $data[] = $key ? "BEGIN;n" . implode("n", $key) . "nCOMMIT;n" : '';
$setAutoincrementId[] = 'SELECT setval('' . $t . '_id_seq', (SELECT max(id) from "' . $t . '"));';
}
file_put_contents('pg.sql', implode("n", $data));
file_put_contents('setId.sql', implode("n", $setAutoincrementId));
echo "finished.n";
function getPgType($f) {
$pgType = '';
if ($f['Extra'] == 'auto_increment') {
$type = trim(str_replace('unsigned', '', strtolower(preg_replace('/(.+?)/', '', $f['Type']))));
if ($type == 'int' || $type == 'mediumint') {
$pgType = 'SERIAL';
} elseif ($type == 'smallint') {
$pgType = 'SMALLSERIAL';
} elseif ($type == 'bigint') {
$pgType = 'BIGSERIAL';
}
} else {
$isUnsigned = stripos($f['Type'], 'unsigned') !== false;
if (stripos($f['Type'], 'tinyint(') === 0) {
$pgType = 'smallint';
} elseif (stripos($f['Type'], 'int(') === 0) {
$pgType = $isUnsigned ? "bigint" : 'integer';
} elseif (stripos($f['Type'], "smallint(") === 0) {
$pgType = $isUnsigned ? 'integer' : 'smallint';
} elseif (stripos($f['Type'], "mediumint(") === 0) {
$pgType = "integer";
} elseif (stripos($f['Type'], "bigint(") === 0) {
$pgType = 'bigint';
} elseif (stripos($f['Type'], "year") === 0) {
$pgType = "integer";
} elseif ($f['Type'] == "longtext") {
$pgType = 'text';
} elseif ($f['Type'] == "mediumtext") {
$pgType = 'text';
} elseif ($f['Type'] == "tinytext") {
$pgType = "text";
} elseif (stripos($f['Type'], "varchar(") === 0) {
$pgType = $f['Type'];
} elseif ($f['Type'] == "datetime") {
$pgType = "timestamp without time zone";
} elseif ($f['Type'] == "timestamp") {
$pgType = "timestamp with time zone";
} elseif (stripos($f['Type'], "double") === 0) {
$pgType = "double precision";
} elseif (stripos($f['Type'], "float") === 0) {
$pgType = "real";
} elseif (stripos($f['Type'], "blob") === 0) {
$pgType = "bytea";
} elseif (stripos($f['Type'], "binary") === 0) {
$pgType = "bytea";
} elseif (stripos($f['Type'], "enum(") === 0 || stripos($f['Type'], "set(") === 0) {
$pgType = "varchar";
} elseif (stripos($f['Type'], "linestring") === 0) {
$pgType = "path";
} elseif (stripos($f['Type'], "point") === 0) {
$pgType = "point";
}
if (!$pgType) {
$pgType = $f['Type'];
}
}
return str_replace('unsigned', '', $pgType);
}
/**
* 获得表注释
*
* @param [type] $t
* @param [type] $f
*/
function getCommint($t, $f) {
return $f['Comment'] ? 'COMMENT ON COLUMN ' . SCHEME . $t . '."' . $f['Field'] . '" IS '' . $f['Comment'] . '';' : '';
}
function getIndex($t) {
$q = $GLOBALS['YDB']->query('SHOW CREATE TABLE ' . $t);
$ct = $GLOBALS['YDB'] -> fetchArray($q, 'num');
if (!empty($ct[1])) {
preg_match_all('/(UNIQUEs+)?KEYs+`(.+?)`s+((.+?))/', $ct[1], $m);
if (!empty($m[2])) {
$keys = [];
foreach ($m[2] as $k => $v) {
$kn = $t . '_' . $v;
$v = str_replace('`', '"', $v);
$f = empty($m[3][$k]) ? $v : str_replace('`', '"', $m[3][$k]);
$keys[] = /* 'DROP INDEX IF EXISTS ' . $kn . ";n" . */ 'CREATE ' . (!empty($m[1][$k]) ? 'UNIQUE ' : '') . 'INDEX ' . $kn . ' ON ' . SCHEME . $t . '(' . $f . ');';
}
return $keys;
}
}
return [];
}
执行php my2pg.php
将在当前目录生成setId.sql
和pg.sql
两文件,其中pg.sql
是根据你mysql数据库的表结构生成对应的pgsql的数据表结构。setId.sql
是在你数据导入完成后,调整自增id的。其实在等下讲的工具中就有可以自动生成pgsql表结构语句的,不过他生成的表结构是不添加注释的,也没生成索引等,所以我自己写了程序来完成。
注:这个程序只支持常用的数据类型转换,对我来说已经够用了,复杂的自己在getPgType
添加下。
创建新的数据库,导入建表语句
登录pgsql:
su postgres
psql -U dbuser -d dbname -W
创建数据库,用户,赋权:
CREATE USER dbuser WITH PASSWORD '********';
CREATE DATABASE dbname OWNER dbuser;
GRANT ALL PRIVILEGES ON DATABASE dbname TO dbuser;
注意将dbuser换成你自己的用户名,dbname换成你自己数据库的名称,********
换成你自己的密码。
执行导入建表语句:
- 退出命令行:
exit
- 导入建表语句:
psql -U dbuser -d dbname -f pg.sql
。注意pg.sql
路径替换成你自己的
同步数据
同步数据我用到了rds_dbsync,下列讲下docker版本的操作流程。
- 克隆项目到本地:
cd ~ && git clone https://github.com/aliyun/rds_dbsync.git
; - 修改
cd rds_dbsync && vi dbsync/my.cfg
,由于是mysql同步到pgsql,因此只要修改[src.mysql]
和[desc.pgsql]
即可; - 编译docker镜像:
docker build -t dbsync .
添加
docker-compose.yml
:version: "3.5" services: dbsync: image: dbsync container_name: dbsync volumes: - "~/rds_dbsync/dbsync/my.cfg:/dbsync/my.cfg" command: mysql2pgsql networks: default: external: name: you_docker_network_name #注意you_docker_network_name修改成你mysql和pgsql相同的网络
- 开始同步数据:
docker-compose up -d
等待结束即可,查看进程或者消息可以执行:docker logs --tail 20 dbsync
博主