那位先生

个人站

前世五百次回眸,才能换得今生的一次擦肩而过。


DataX

1.DataX简介

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

2.简单使用

1.全量同步

将表t_user的数据同步到t_user2,同步代码141_user2.json:

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "xxxxxx",
                        "column": [
                            "tid",
                            "name",
							"password",
							"roleId"
                        ],
                        "splitPk": "tid",
                        "connection": [
                            {
                                "table": [
                                    "t_user"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://47.106.105.141:3306/first01"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "xxxxxx",
                        "column": [
                            "tid",
                            "name",
							"password",
							"roleId"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from t_user2 where 1=1"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://47.106.105.141:3306/first01?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "t_user2"
                                ]
                            }
                        ]
                    }
                }
				
				
            }
        ]
    }
}

在linux下执行命令:pyton /opt/myware/datax/bin/datax.py /opt/myware/datax_json/141_user2.json

2.增量同步(定时)

每120秒,同步这120秒内更新的数据(通过update_time字段判断)。

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "xxxxxx",				
                        "column": [
                            "env_id",
                            "city_code",
							"city_name",
							"temper",
							"update_time"
                        ],						
                        "splitPk": "env_id",
						"where":"update_time > FROM_UNIXTIME(${start_time}) and update_time < FROM_UNIXTIME(${end_time})",
                        "connection": [
                            {
                                "table": [
                                    "tb_env"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://47.106.105.141:3306/first01"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "update",
                        "username": "root",
                        "password": "xxxxx",
                        "column": [
                            "env_id",
                            "city_code",
							"city_name",
							"temper",
							"update_time"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                           
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://47.106.105.141:3306/first01?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "tb_env2"
                                ]
                            }
                        ]
                    }
                }
				
				
            }
        ]
    }
}

定时任务代码cron_env.sh:

#!/bin/bash
source /etc/profile
# 截至时间设置为当前时间戳
end_time=$(date +%s)
# 开始时间设置为120s前时间戳
start_time=$(($end_time - 120))
/opt/myware/datax/bin/datax.py /opt/myware/datax_json/141_env_cron.json -p "-Dstart_time=$start_time -Dend_time=$end_time" 
#/datax/bin/datax.py /mysql2mysql-approval-doc.json -p "-Dstart_time=$start_time -Dend_time=$end_time"

试下定时任务是否可用(可能会出现no suchFile,因为文件编码不对):

/bin/sh /opt/myware/datax_cron/cron_env.sh

创建定时任务crontab -e:

*/2 * * * * /opt/myware/datax_cron/cron_env.sh

3.参考文献

DataX官方GitHub
DataX MysqlReader 插件文档
DataX MysqlWriter 插件文档
阿里云开源离线同步工具DataX3.0介绍
采用DataX实现多表增量数据同步

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
你说多少就多少

比五毛钱特效专业哦