March 2017

Hot/cold data separation for Elasticsearch with a Python script

Background

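To save costs, the nodes in our ES cluster run on hardware of uneven quality. I wanted today's and yesterday's data to sit on the SSD nodes and historical data to move to the ordinary-disk nodes. To do that, I wrote a small Python script that migrates shards on a schedule in the early morning, and I am sharing it here for reference. Elasticsearch itself offers a tag-based way of doing this (node attributes plus shard allocation filtering). I will write that up once I have tried it.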
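The placement rule described above boils down to a date cutoff. Below is a minimal Python 3 sketch. It assumes each index carries a single date and that `keep_days=1` means "today and yesterday stay on SSD". The function name `desired_tier` is hypothetical and not part of the script. Note that the script further down applies this rule only partially: it promotes shards of *today's* index to SSD and demotes indices older than the cutoff.

```python
from datetime import date, timedelta


def desired_tier(index_day: date, today: date, keep_days: int = 1) -> str:
    """Hypothetical helper: "ssd" for today's index and the previous
    keep_days days, "hdd" (ordinary disk) for anything older."""
    cutoff = today - timedelta(days=keep_days)
    return "ssd" if index_day >= cutoff else "hdd"


if __name__ == "__main__":
    today = date(2017, 3, 15)
    for offset in range(4):
        day = today - timedelta(days=offset)
        print(day.isoformat(), desired_tier(day, today))
```

Run as-is, it prints `ssd` for 2017-03-15 and 2017-03-14, and `hdd` for 2017-03-13 and 2017-03-12.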

Configuration

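Before starting the script you must change the two settings below. Otherwise the cluster rebalances the shards straight back as soon as they have been moved.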

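# Disable automatic allocation of primary and replica shards (an older setting, superseded by cluster.routing.allocation.enable)
cluster.routing.allocation.disable_allocation: true
# Only primary shards of newly created indices may be allocated
cluster.routing.allocation.enable: new_primaries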

To apply the settings at runtime, without restarting the cluster:

curl -XPUT {ip}:9200/_cluster/settings -H 'Content-Type: application/json' -d'{"transient":{"cluster.routing.allocation.disable_allocation": true, "cluster.routing.allocation.enable": "new_primaries"}}'
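The same transient settings update can be sent from Python. This is a minimal Python 3 sketch using only the standard library, and it mirrors the curl command above. `ES_ADDRESS` and `build_allocation_settings_request` are hypothetical names. The request is only built, not sent, unless you uncomment the `urlopen` call.

```python
import json
import urllib.request

# Hypothetical placeholder: point this at one of your cluster's HTTP endpoints.
ES_ADDRESS = "http://127.0.0.1:9200"


def build_allocation_settings_request(es_address: str) -> urllib.request.Request:
    """Build (but do not send) a PUT /_cluster/settings request carrying
    the same transient settings as the curl command."""
    body = {
        "transient": {
            "cluster.routing.allocation.disable_allocation": True,
            "cluster.routing.allocation.enable": "new_primaries",
        }
    }
    return urllib.request.Request(
        url=es_address + "/_cluster/settings",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    req = build_allocation_settings_request(ES_ADDRESS)
    print(req.get_method(), req.full_url)
    # To actually apply the settings against a live cluster:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.read().decode("utf-8"))
```

Settings sent under `transient` do not survive a full cluster restart. The sketch therefore only builds the request, so you can inspect it before sending it to the cluster.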

Script


#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Shard migration:
    shards of indices older than the cutoff date move from the SSD machines to the ordinary-disk machines;
    shards of indices created today move from the ordinary-disk machines to the SSD machines.
Machine layout:
    SSD machines: es1, es2
    ordinary-disk machines: es3, es4
'''
import urllib2
import json
import re
import time
import random
import datetime

es_address = 'http://x.x.x.x'

# Node names as they appear in the "node" column of _cat/shards
ssdNodeList = ["es1", "es2"]
hddNodeList = ["es3", "es4"]


def http_get(url):
    response = urllib2.urlopen(url)
    return response.read()


def http_post(url, jsonData):
    jdata = json.dumps(jsonData)
    req = urllib2.Request(url, jdata, {'Content-Type': 'application/json'})
    response = urllib2.urlopen(req)
    return response.read()


def move_shard(index, shard, fromNode, toNodeList):
    '''Move one shard copy to a random node in toNodeList.
    If ES rejects the move (HTTPError), try the remaining nodes of the same list.
    Returns the node the shard was sent to.'''
    url = es_address + '/_cluster/reroute'
    candidates = list(toNodeList)
    random.shuffle(candidates)
    lastError = None
    for toNode in candidates:
        jsonData = {"commands": [{"move": {
            "index": index, "shard": int(shard),
            "from_node": fromNode, "to_node": toNode}}]}
        try:
            http_post(url, jsonData)
            return toNode
        except urllib2.HTTPError as e:
            lastError = e
    raise lastError


def sleep_seconds(size):
    '''Pause between moves so the relocation can finish.
    size is the store column of _cat/shards, e.g. "2.5gb"; compare numerically, not as strings.'''
    if size.endswith('tb') or (size.endswith('gb') and float(size[:-2]) > 2):
        return 600
    if size.endswith('gb'):
        return 300
    return random.randint(10, 15)


if __name__ == '__main__':
    keepDay = 1
    today = time.strftime('%Y%m%d')
    # Indices dated on or after this day (today and yesterday) stay on the SSD machines
    day = (datetime.datetime.now() - datetime.timedelta(days=keepDay)).strftime('%Y%m%d')
    print '>>>>>>>>>>>>>', day, '<<<<<<<<<<<<<'

    # Default _cat/shards columns: index shard prirep state docs store ip node
    resp = http_get(es_address + '/_cat/shards')
    for shard in resp.split('\n'):
        info = shard.split()
        # Skip blank lines and UNASSIGNED / INITIALIZING / RELOCATING shards
        if len(info) < 8 or info[3] != 'STARTED':
            continue
        index, shardId, size, node = info[0], info[1], info[5], info[7]
        dateObj = re.match(r'(.*)-(\d{4})[.-](\d{2})[.-](\d{2})', index)
        if dateObj is None:
            continue
        date = dateObj.group(2) + dateObj.group(3) + dateObj.group(4)
        try:
            if date == today and node in hddNodeList:
                # Today's index was created on an ordinary-disk machine: move it to SSD
                toNode = move_shard(index, shardId, node, ssdNodeList)
                print 'move today:', node, '------------------->', toNode, shard
            elif date < day and node in ssdNodeList:
                # Historical index still on SSD: move it to an ordinary-disk machine
                toNode = move_shard(index, shardId, node, hddNodeList)
                print node, '------------------->', toNode, shard
                sleepSeconds = sleep_seconds(size)
                print 'sleep %d seconds' % sleepSeconds
                time.sleep(sleepSeconds)
        except urllib2.HTTPError as e:
            print e, shard
        except Exception:
            print '-----------------------------', shard
            raise
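The per-shard decisions in the script can also be pulled out into small pure functions, which makes them testable without a cluster. This Python 3 sketch makes three assumptions:

- the default `_cat/shards` column order (index shard prirep state docs store ip node);
- date-suffixed index names such as `logstash-2017.03.15`;
- node lists like `es1`/`es2` for SSD and `es3`/`es4` for ordinary disk.

All function and type names are illustrative, not part of any Elasticsearch client.

```python
import random
import re
from datetime import date, timedelta
from typing import List, NamedTuple, Optional


# Assumed default column order of `_cat/shards`:
# index shard prirep state docs store ip node
class ShardRow(NamedTuple):
    index: str
    shard: int
    state: str
    store: str
    node: str


INDEX_DATE = re.compile(r"-(\d{4})[.-](\d{2})[.-](\d{2})")
UNIT_BYTES = {"b": 1, "kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3,
              "tb": 1024 ** 4, "pb": 1024 ** 5}


def parse_shard_line(line: str) -> Optional[ShardRow]:
    """Parse one _cat/shards line; None for blank lines and non-STARTED shards."""
    cols = line.split()
    if len(cols) < 8 or cols[3] != "STARTED":
        return None
    return ShardRow(cols[0], int(cols[1]), cols[3], cols[5], cols[7])


def index_date(index: str) -> Optional[date]:
    """Extract the date from index names such as 'logstash-2017.03.15'."""
    m = INDEX_DATE.search(index)
    if m is None:
        return None
    try:
        return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
    except ValueError:  # digits that do not form a real date, e.g. 2017.13.40
        return None


def target_nodes(row: ShardRow, today: date, keep_days: int,
                 ssd: List[str], hdd: List[str]) -> Optional[List[str]]:
    """Candidate target nodes for this shard, or None if it should stay put."""
    day = index_date(row.index)
    if day is None:
        return None
    if day == today and row.node in hdd:
        return ssd  # today's index landed on an ordinary disk: promote it
    if day < today - timedelta(days=keep_days) and row.node in ssd:
        return hdd  # older than the cutoff and still on SSD: demote it
    return None


def move_command(row: ShardRow, to_node: str) -> dict:
    """JSON body for POST /_cluster/reroute that moves one shard copy."""
    return {"commands": [{"move": {"index": row.index, "shard": row.shard,
                                   "from_node": row.node, "to_node": to_node}}]}


def sleep_seconds(store: str) -> int:
    """Pause after a move: 600 s above 2 GiB, 300 s from 1 GiB, else 10-15 s."""
    m = re.fullmatch(r"(\d+(?:\.\d+)?)([kmgtp]?b)", store)
    size = float(m.group(1)) * UNIT_BYTES[m.group(2)] if m else 0.0
    if size > 2 * 1024 ** 3:
        return 600
    if size >= 1024 ** 3:
        return 300
    return random.randint(10, 15)


if __name__ == "__main__":
    row = parse_shard_line("logstash-2017.03.10 0 p STARTED 123456 2.5gb 10.0.0.1 es1")
    targets = target_nodes(row, date(2017, 3, 15), 1, ["es1", "es2"], ["es3", "es4"])
    if targets:
        print(move_command(row, random.choice(targets)))
        print("sleep", sleep_seconds(row.store))
```

You can reproduce the script's main loop with these functions. Feed each `_cat/shards` line through `parse_shard_line` and `target_nodes`. Then POST `move_command(...)` to `/_cluster/reroute` and sleep for `sleep_seconds(row.store)`. Parsing the size numerically avoids string comparisons such as `"10gb" > "2gb"`, which is `False`.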