본문 바로가기
서버인프라/카프카

[ 제 5강 AVRO 해보기] KAFKA AVRO 해보기

by techwold ted 2021. 2. 5.

구성해 보기

 

오늘 해보려고 하는 구성이 대략 이런거에요

우선 기존에는 구성했던 kafka를 보면

1대의 서버에 kafka와 zookeeper가 같이 실행 되고 있는 형태 이죠~

 

kafka 구성

여기에 schemaregistory 와 connector 를 설정해서 연결해 볼 예정입니다.

그리고 mariadb를 설치하여 전송된 메시지를 바로 DB에 insert해 보도록 하겠습니다.

 

1. SCHEMA REGISTRY 세팅

vi /etc/schema-registry/schema-registry.properties

############################## 수정 사항 ################################
listeners=http://0.0.0.0:8081
kafkastore.connection.url=192.168.2.77:2181,192.168.2.78:2181,192.168.2.79:2181/kafka

kafkastore.topic=_schemas
kafkastore.topic.replication.factor=3

 

위에 처럼 변경하여 주시고, schema registry 를 실행 합니다.

/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties

### 위에처럼 실행 하여 주시거나

### 아래와 같이 실행 해 주시면 됩니다.
systemctl start confluent-schema-registry.service


tcp6       0      0 :::8081                 :::*                    LISTEN      19539/java
tcp6       0      0 :::43002                :::*                    LISTEN      19539/java

 

정상적으로 실행  되었다면 아위와 같이 8081 포트가 실행 된 모습을 확인 하실 수 있습니다.

 

이제 kafka-connect-jdbc 를 셋팅해 보겠습니다.

 

2. KAFKA-CONNECT-JDBC 셋팅

kafka connect properties 셋팅

name=sink_mysql_test
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.url=jdbc:mysql://192.168.2.140:3306/TEST
connection.user=test
connection.password=12341234
auto.create=true

connect-avro-standalone.properties 수정

vi /etc/schema-registry/connect-avro-standalone.properties

bootstrap.servers=192.168.2.77:9092,192.168.2.78:9092,192.168.2.79:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
kafkastore.topic.replication.factor=3
plugin.path=/usr/share/java

kafka standalone 실행

/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/TEST.properties

위와 같이 실행 하면 TCP 8083이 올라 옵니다.

tcp6       0      0 :::8083                 :::*                    LISTEN      25491/java

 

여기까지 완료 되셨다면 avro connector 을 사용하기 위한 준비를 마쳤습니다.

 

자그럼 테스트 해보겠습니다.  테스트를 위해 python3 설치를 하고 아래와 같이 

vi 편집기를 열어서 test.py 파일로 작성해 주세요

#!/usr/bin/python3
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import time

start = time.time()

# Avro 스키마 정의
value_schema_str = """
{
    "type" : "record",
    "name" : "myrecord",
    "fields" :
    [
       {"name" : "id", "type" : "int"},
       {"name" : "name", "type" : "string"},
       {"name" : "email", "type" : "string"}
    ]
}
"""

value_schema = avro.loads(value_schema_str)

# AvroProducer 초기화
avroProducer = AvroProducer({
    'bootstrap.servers': 'broker1:9092,broker2:9092,broker3:9092',
    'schema.registry.url': 'http://connector:8081'
}, default_value_schema=value_schema)

# 데이터 produce
for i in range(1, 10000):
    value = {"id": i, "name": "foo" + str(i), "email": "test" + str(i) + "@test.com"}
    avroProducer.produce(topic='member', value=value)
    if i % 100000 == 0:
        avroProducer.flush()

avroProducer.flush()
print("time:", time.time() - start)

 

자 그리고~ 실행!!!

python3 test.py

그럼 DB에 잘 데이터가 들어갔는지 확인해 보겠습니다.

 

잘들어가는거 보이시죠?

 

그리고, topic 확인도 필요하니....

/bin/kafka-consumer-groups --bootstrap-server kafka1.test.com:9092 kafka2.test.com:9093 kafka3.test.com:9094 --group connect-sink_mysql_test --topic 토픽명 --reset-offsets --to-latest --execute

 

처음 AVRO를 쓰려고 테스트 하고 있을때 KAFKA에대한 이해도도 없어서 꽤 많은 고생을 했던거로 기억합니다.

 

많은 에러가 날지 모르지만... 그래도 끝까지 도전해 보세요 


팁!! 혹시 example-topic이 살아 있으면 avro 실행시 아무 반응이 없을 수 있습니다. 이럴때는 기존 생성했던 topic 삭제후 /usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/sink-quickstart-mysql-UNION_MINING.properties 다시 실행해 주세요. 이후 위 파이썬 스크립트를 실행해 보세요. 해결 될 겁니다.

 

 

댓글