Db와 Elasticsearch 연동(5)
Db와 Elasticsearch 연동(5)
db -> es 데이터 이전 방법을 기존과 다르게(기존 logstash 이용) db에 저장한 데이터들을 python의 elasticsearch 패키지를 설치해서 이용했다
1. DB에서 데이터 읽기
mariadb에서 데이터를 읽기위해서 python의 pymsql 라이브러리를 이용한다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pymysql
import json
# DB 설정 데이터 가져오기
with open('config.json', 'r') as config_file:
config = json.load(config_file)
hosturl = config['host']
username = config['username']
userpassword = config['password']
dbname = config['db']
# MariaDB 연결
db_connection = pymysql.connect(host=hosturl, user=username, password=userpassword, db=dbname, charset='utf8')
cursor = db_connection.cursor()
# 데이터 조회 쿼리
query = "SELECT id, url, title, content, category, site, date FROM notice_board"
cursor.execute(query)
# 결과 가져오기
data = cursor.fetchall()
db_connection.close()
# 데이터 확인
for row in data:
print(row)
2. Elasticsearch 설정
데이터를 저장할 elasticsearch와 연결을 위해서 설정을 해야한다 먼저 pip install elasticsearch로 설치한다
1
2
3
4
5
6
7
8
9
from elasticsearch import Elasticsearch
# Elasticsearch 클라이언트 설정
es = Elasticsearch(
['http://localhost:9200'],
http_auth=('elastic', 'es_pw'), # 기본 사용자와 비밀번호 초기설정시 알려줌 없으면 재발급
scheme="https",
port=9200
)
3. Elasticsearch로 데이터 전송
db에서 가져온 데이터를 elasticsearch에 저장한다 여러 데이터들을 효율적으로 보내기 위해 bulk api를 사용했다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from elasticsearch.helpers import bulk
# Elasticsearch 인덱스 이름
index_name = 'notice_index'
# DB에서 가져온 데이터를 Elasticsearch 도큐먼트로 변환
def generate_docs(data):
for row in data:
yield {
"_index": index_name,
"_id": row[0], # id를 문서 ID로 사용
"_source": {
"url": row[1],
"title": row[2],
"content": row[3],
"category": row[4],
"site": row[5],
"date": row[6].strftime('%Y-%m-%d %H:%M:%S') # 날짜 형식 변환
}
}
# 데이터 전송
success, failed = bulk(es, generate_docs(data))
print(f"Successfully indexed {success} documents.")
print(f"Failed to index {failed} documents.")
이렇게 3단계를 거쳐서 데이터를 이동시켰다 하지만 이 코드를은 스케줄링에 따라 매일 실행될탠테 그때마다 위 코드를 실행한다면 중복된 데이터들이 들어갈 수 있다 이를 방지하기 위해서 db에 저장 될때 사용되는 Id값을 이용해서 중복 저장을 방지했다
4. 중복 데이터 저장 방지
애초에 elasticsearch에 저장된 id값중 마지막 값을 구한다 db에서 데이터를 가져올때 그 id값 이후 데이터를 가져와서 데이터가 중복되서 저장되는것을 막았다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import pymysql
import json
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
# DB 설정 데이터 가져오기
with open('config.json', 'r') as config_file:
config = json.load(config_file)
db_config = {
'host': config['host'],
'user': config['username'],
'password': config['password'],
'db': config['db']
}
# Elasticsearch 설정 데이터 가져오기
es_config = {
'hosts': ['http://localhost:9200'],
'http_auth': ('elastic', config['es_pw']),
'scheme': 'https',
'port': 9200
}
# Elasticsearch 클라이언트 설정
es = Elasticsearch(
es_config['hosts'],
http_auth=es_config['http_auth'],
scheme=es_config['scheme'],
port=es_config['port']
)
# Elasticsearch에서 가장 최근의 `id` 가져오기
def get_last_indexed_id():
query = {
"size": 1,
"sort": [
{
"id": {
"order": "desc"
}
}
],
"_source": ["id"]
}
response = es.search(index="notice_index", body=query)
hits = response['hits']['hits']
if hits:
return hits[0]['_id']
return 0
# DB에서 데이터 읽기 (최신 `id` 이후 데이터만)
def fetch_data_from_db(last_id):
db_connection = pymysql.connect(
host=db_config['host'],
user=db_config['user'],
password=db_config['password'],
db=db_config['db'],
charset='utf8'
)
cursor = db_connection.cursor()
# 데이터 조회 쿼리 (last_id 이후 데이터만 가져오기)
query = "SELECT id, url, title, content, category, site, date FROM notice_board WHERE id > %s"
cursor.execute(query, (last_id,))
# 결과 가져오기
data = cursor.fetchall()
db_connection.close()
return data
# DB에서 가져온 데이터를 Elasticsearch 도큐먼트로 변환
def generate_docs(data):
for row in data:
yield {
"_index": 'notice_index', # Elasticsearch 인덱스 이름
"_id": row[0], # id를 문서 ID로 사용
"_source": {
"url": row[1],
"title": row[2],
"content": row[3],
"category": row[4],
"site": row[5],
"date": row[6].strftime('%Y-%m-%d %H:%M:%S') # 날짜 형식 변환
}
}
# Elasticsearch에 데이터 전송
def index_data():
last_id = get_last_indexed_id() # 가장 최근의 id 가져오기
data = fetch_data_from_db(last_id)
success, failed = bulk(es, generate_docs(data))
print(f"Successfully indexed {success} documents.")
print(f"Failed to index {failed} documents.")
# 메인 실행
if __name__ == "__main__":
index_data()
This post is licensed under CC BY 4.0 by the author.