결과물
제작 동기
디스코드에서 친구들과 철권을 하면서 10선을 주로 하는데 스코어를 직접 수정하면서 게임을 하려니깐 매우 불편했다.
그래서 철권에서 이기거나 졌을 때 뭔가 자동적으로 스코어를 올려주는 프로그램이 있으면 굉장히 편하겠다는 생각에 제작하게 되었다.
(롤 같은 Riot api같이 철권에서도 좀 제공해줬으면 하는 생각..)
인프라
한창 AWS를 다루고 있기에 서버리스 아키텍처로 만들어보고 싶었다.
디스코드 클라에서 커멘드를 전달하는 과정은 구동중인 서버가 필요하기 때문에 집에 있던 라즈베리 서버로 구축했다.
게임에서 이긴장면 식별을 판단하는 Python 클라이언트에서는 boto3(AWS SDK)를 통해서 DB와 상호작용.
구축 인프라는 어떤식으로든 가능하다.(클라이언트 자체에서 Discord API 사용도 가능하다)
동작 원리
pyautogui(pyautogui. screenshot())모듈을 사용하여 화면을 실시간으로 캡쳐하고 이긴 장면/진 장면/그 외 장면들을 기존의 학습한 데이터와 비교하고 판단하여 처리하는 것이다.
아무런 장면도 아닐 경우에는 1초마다 계속 None을 출력
*이긴 경우/패배한 경우의 로그가 찍히면 자동으로 디스코드 봇을 통하여 스코어가 올라간다.
즉 판단된 데이터가 승리[1] 이면 컬럼1(Score1)에 1이 입력 반대로 패배[0] 이면 컬럼2(Score2)에 1 입력
그리고 이후의 데이터부터는 계속 누적되면서 +1 증가
여기서 이긴 장면 데이터를 학습시키는 게 제일 큰 문제인데 스팀에서 스샷 한 사진은 전체 화면, 즉 배경, 캐릭터 등 너무나도 학습 요소가 많기 때문에 식별하기가 힘들다.
그래서 생각해 낸 게 동그라미 스코어바
이 부분을 좌표로 잘라내어 이미지를 학습/비교 하는것이다.
하지만 저 잘라낸 이미지조차도 필요치 않은 여백이 많기 때문에 특정 캐릭터들의 스킬이 스코어 부분 전체에 닿아버리게 되면 부정확한 정보를 식별하는 경우가 있다.
그래서 한번 더 수정한 것이 흰색 네모칸의 부분(두 가지 좌표 영역)만 잘라내고 np.zeros를 이용하여 이미지를 합쳐서 생성한 후 장면을 식별하는 것.
이렇게 하니깐 99프로 정확하게 나온다.
이런 식으로 모든 데이터들을 학습/비교할 때는 먼저 잘라내고 진행하는 것이다.
(학습 데이터들도 훨씬 적어지고 식별성도 높아진다.)
prediction == 1인 경우 이긴장면이라고 판단하여 DynamoDB에 데이터 입력
DB에 입력되는 순간 DynamoDB Stream 기능이 동작하여 람다 함수를 호출
람다 코드에는 Discord bot 채널 id, 토큰을 이용하여 DB에서 스캔된 데이터를 전송
여기서도 문제가 데이터가 보내질 때마다 새로운 메시지로 입력이 되기 때문에 채널이 굉장히 시끄러워질 수 있다.
그래서 맨 처음 보낼 때는 메시지를 보내고 그다음부터는 메시지를 수정하는 방식으로 바꾸어야 한다.
그러려면 맨 처음 보낸 메시지의 ID 값을 가져와야 한다.
discord_message_id = response.json()['id']
def send_discord_message(bot_token, channel_id, message):
global discord_message_id
이렇게 전역변수로 정의하면 id값을 저장하고 업데이트할 수 있다.
그리고 이런 식으로 디스코드 클라이언트에서 "!숫자 이름 이름" 입력하게 되면 스코어 값이 초기화가 되면서 새로운 스코어 보드를 띄운다.
그리고 입력한 명령어대로 10승 이상을 먼저 하게 된다면 누가 승리했는지가 업데이트된다.
win = int(latest_data['win'])
if score1 >= win:
result_message = f"{format_name1} 승리!"
elif score2 >= win:
result_message = f"{format_name2} 승리!"
else:
result_message = None
if discord_message_id:
if result_message:
message += f"\n{result_message}" # 승패 메시지를 추가
elif previous_data != name:
send_discord_message(bot_token, channel_id, message)
elif previous_data2 != win:
send_discord_message(bot_token, channel_id, message)
update_discord_message(bot_token, channel_id, discord_message_id, message)
else:
send_discord_message(bot_token, channel_id, message)
그리고 elif previous_data != name:/ elif previous_data2 != win: 이 부분은 상대하는 사람들, 선승 조건이 다 다르다
그렇기에 매번 DB에서 수동으로 바꿔주고 전역변수로 업데이트를 하기 때문에 계속 배포하여 변수를 초기화해야 하는 귀찮음이 발생한다.
그래서 name 컬럼과 win 컬럼의 데이터를 바꿀 때마다 새로운 메시지를 보내게끔 구현했다.
try:
response = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=S3_FILE_NAME)
previous_data = json.loads(response['Body'].read().decode('utf-8'))
response2 = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=S3_FILE_NAME2)
previous_data2 = json.loads(response2['Body'].read().decode('utf-8'))
except s3_client.exceptions.NoSuchKey:
previous_data = None
previous_data2 = None
lambda_handler가 호출될 때마다 S3 버킷에 name 컬럼, win 컬럼의 데이터를 json 파일로 저장시키고 람다 함수가 호출될 때마다 저장된 json 데이터를 불러와서 새로 수정한 상대방 이름, 스코어랑 비교하여 같지 않으면 메시지를 업데이트하지 말고 새로 보내는 원리이다.
name = latest_data['name']
win = int(latest_data['win'])
s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=S3_FILE_NAME, Body=json.dumps(name))
s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=S3_FILE_NAME2, Body=json.dumps(win))
format_name1 = name[:2]
format_name2 = name[6:]
제일 힘들었던 점은 학습 조건을 바꾸기까지의 과정...
처음부터 그냥 이렇게 학습시켰더라면 얼마나 편했을까 싶다.
변수가 너무 많이 생길 수 있는 상황의 이미지를 계속 학습시키는 뻘짓을 며칠 동안 계속하고 있었다.
해결했으니 다행..ㅜ
Source
Python 클라이언트
import os
import glob
import cv2
import numpy as np
import time
import pyautogui
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import joblib
import boto3
from datetime import datetime
from boto3.dynamodb.conditions import Key
from PIL import Image
aws_access_key_id = ''
aws_secret_access_key = ''
region_name = ''
dynamodb = boto3.resource('dynamodb', region_name=region_name, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
table = dynamodb.Table('')
model_path = 'model.pkl'
# 이미지 파일이 저장된 폴더 경로
folder_path_win = '' # 이긴 이미지 폴더
folder_path_lose = '' # 진 이미지 폴더
folder_path_nothing = '' # 그 외 이미지 폴더
saveimage = '' # 비교한 학습 사진 저장 폴더
# 이긴 이미지와 진 이미지를 나타내는 레이블
label_win = 1
label_lose = 0
label_nothing = 3
# 이미지에서 특정 부분 좌표
x1, y1 = 959, 53 # 첫 번째 좌표
x2, y2 = 1003, 75
x3, y3 = 1554, 53 # 두 번째 좌표
x4, y4 = 1594, 74
# 이미지 데이터와 레이블을 저장할 리스트
X = []
y = []
image_files_win = glob.glob(os.path.join(folder_path_win, '*.jpg'))
image_files_lose = glob.glob(os.path.join(folder_path_lose, '*.jpg'))
image_files_nothing = glob.glob(os.path.join(folder_path_nothing, '*.jpg'))
for image_files in [image_files_win, image_files_lose, image_files_nothing]:
for image_file in image_files:
image = cv2.imread(image_file, cv2.IMREAD_COLOR)
cropped_image_first = image[y1:y2, x1:x2]
cropped_image_second = image[y3:y4, x3:x4]
# 첫 번째와 두 번째 영역을 하나의 이미지로 합치기
height_first, width_first, _ = cropped_image_first.shape
height_second, width_second, _ = cropped_image_second.shape
# 두 영역의 높이를 비교하여 새 이미지의 높이 결정
new_height = max(height_first, height_second)
# 새 이미지 생성
new_image = np.zeros((new_height, width_first + width_second, 3), dtype=np.uint8)
# 첫 번째 영역을 새 이미지에 추가
new_image[:height_first, :width_first] = cropped_image_first
# 두 번째 영역을 새 이미지에 추가
new_image[:height_second, width_first:] = cropped_image_second
flattened_image = new_image.flatten()
X.append(flattened_image)
# 레이블 설정
if image_files == image_files_win:
y.append(label_win)
elif image_files == image_files_lose:
y.append(label_lose)
else:
y.append(label_nothing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
clf = RandomForestClassifier()
clf.fit(X_train, y_train)
# 모델 저장
joblib.dump(clf, model_path)
print("모델 학습이 완료되었습니다.")
def load_model(model_path):
clf = joblib.load(model_path)
return clf
def insert_data(prediction):
response = table.scan(
Limit=1,
Select='ALL_ATTRIBUTES',
)
items = response['Items']
if items:
latest_item = items[0]
score1 = int(latest_item['score1'])
score2 = int(latest_item['score2'])
else:
score1 = 0
score2 = 0
# 새로운 아이템 생성
if prediction == 1:
score1 += 1
else:
score2 += 1
current_time = datetime.now().isoformat()
item = {
'idx': current_time,
'score1': score1,
'score2': score2
}
if items:
response = table.update_item(
Key={'idx': latest_item['idx']},
UpdateExpression='SET score1 = :s1, score2 = :s2',
ExpressionAttributeValues={
':s1': score1,
':s2': score2,
}
)
print(f"이전 데이터 업데이트 결과: {response}")
def process_screenshot():
screenshot = pyautogui.screenshot()
frame = np.array(screenshot)
game_screen_first = frame[y1:y2, x1:x2]
game_screen_second = frame[y3:y4, x3:x4]
# 두 영역을 하나의 이미지로 합치기
height_first, width_first, _ = game_screen_first.shape
height_second, width_second, _ = game_screen_second.shape
# 두 영역의 높이를 비교하여 새 이미지의 높이 결정
new_height = max(height_first, height_second)
# 새 이미지 생성
new_width = width_first + width_second
new_image = np.zeros((new_height, new_width, 3), dtype=np.uint8)
# 첫 번째 영역을 새 이미지에 추가
new_image[:height_first, :width_first] = game_screen_first
# 두 번째 영역을 새 이미지에 추가
new_image[:height_second, width_first:] = game_screen_second
return cv2.cvtColor(new_image, cv2.COLOR_RGB2BGR) # RGB를 BGR로 변환하여 리턴
if __name__ == "__main__":
clf = load_model(model_path)
try:
while True:
# 화면 캡처 및 처리
game_screen = process_screenshot()
flattened_image = game_screen.flatten()
flattened_image = flattened_image.reshape(1, -1)
prediction = clf.predict(flattened_image)
# 이긴 화면인 경우
if prediction == 1:
print("게임에서 이긴 화면이 감지되었습니다.")
screenshot_path = os.path.join(saveimage, f"win_{datetime.now().strftime('%Y%m%d%H%M%S')}.jpg")
cv2.imwrite(screenshot_path, game_screen)
insert_data(prediction)
time.sleep(6)
# 진 화면인 경우
elif prediction == 0:
print("게임에서 진 화면이 감지되었습니다.")
screenshot_path = os.path.join(saveimage, f"loser_{datetime.now().strftime('%Y%m%d%H%M%S')}.jpg")
cv2.imwrite(screenshot_path, game_screen)
insert_data(prediction)
time.sleep(6)
# 이긴 화면과 진 화면이 아닌 경우
else:
print("None")
time.sleep(1) # 1초마다 처리
except KeyboardInterrupt:
print("프로그램이 종료되었습니다.")
AWS IAM 액세스 키, 시크릿 키, 리전, 테이블 이름은 본인 걸로 적어줘야 한다.
Lambda 함수(tekken score)
import requests
import json
from decimal import Decimal
import boto3
s3_client = boto3.client('s3')
S3_BUCKET_NAME = ''
S3_FILE_NAME = 'latest_data.json'
S3_FILE_NAME2 = 'latest_data2.json'
REGION = 'ap-northeast-2'
dynamodb = boto3.resource('dynamodb', region_name=REGION)
table = dynamodb.Table('tekken')
# 이전에 보낸 메시지의 ID를 저장할 변수
discord_message_id = None
def convert_decimal_to_str(obj):
if isinstance(obj, Decimal):
return str(obj)
raise TypeError
def send_discord_message(bot_token, channel_id, message):
global discord_message_id # 전역 변수로 선언하여 메시지 ID를 저장하고 업데이트
url = f"https://discord.com/api/v9/channels/{channel_id}/messages"
headers = {
"Authorization": f"Bot {bot_token}",
"Content-Type": "application/json"
}
payload = {
"content": message
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code != 200:
print("Failed to send message to Discord")
else:
print("Message sent successfully")
# 메시지가 성공적으로 보내졌을 때 메시지의 ID를 저장
discord_message_id = response.json()['id']
def update_discord_message(bot_token, channel_id, message_id, new_content):
url = f"https://discord.com/api/v9/channels/{channel_id}/messages/{message_id}"
headers = {
"Authorization": f"Bot {bot_token}",
"Content-Type": "application/json"
}
payload = {
"content": new_content
}
response = requests.patch(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
print("Message edited successfully.")
else:
print("Failed to edit message.")
def lambda_handler(event, context):
try:
response = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=S3_FILE_NAME)
previous_data = json.loads(response['Body'].read().decode('utf-8'))
response2 = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=S3_FILE_NAME2)
previous_data2 = json.loads(response2['Body'].read().decode('utf-8'))
except s3_client.exceptions.NoSuchKey:
previous_data = None
previous_data2 = None
response = table.scan()
data = response['Items']
# 데이터가 비어 있는지 확인
if not data:
print("No data found in the table.")
return {
'statusCode': 200,
'body': 'No data found in the table.'
}
# 가장 최신 데이터
latest_data = data[0] # 가장 첫 번째 항목
score1 = int(latest_data['score1'])
score2 = int(latest_data['score2'])
message = f"```yaml\n🔥Tekken Score🔥\n┌{'─'*14}┐\n {latest_data['win']:^3}선승 \n├{'─'*14}┤\n {latest_data['name']:^10} \n {latest_data['score1']} : {latest_data['score2']} \n└{'─'*14}┘```"
# Discord로 메시지를 전송합니다.
bot_token = ""
channel_id = ""
global discord_message_id # 전역 변수로 선언한 메시지 ID를 사용
name = latest_data['name']
win = int(latest_data['win'])
s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=S3_FILE_NAME, Body=json.dumps(name))
s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=S3_FILE_NAME2, Body=json.dumps(win))
format_name1 = name[:2]
format_name2 = name[6:]
if score1 >= win:
result_message = f"{format_name1} 승리!"
elif score2 >= win:
result_message = f"{format_name2} 승리!"
else:
result_message = None
if discord_message_id:
# 이미 이전에 메시지를 보냈을 경우 해당 메시지를 업데이트
if result_message:
message += f"\n{result_message}" # 승패 메시지를 추가
elif previous_data != name:
send_discord_message(bot_token, channel_id, message)
elif previous_data2 != win:
send_discord_message(bot_token, channel_id, message)
update_discord_message(bot_token, channel_id, discord_message_id, message)
else:
# 처음 메시지를 보낼 경우 메시지를 보냄
send_discord_message(bot_token, channel_id, message)
return {
'statusCode': 200,
'body': 'Message sent to Discord!'
}
마찬가지로 Discore 채널 ID, 토큰, 테이블 이름, 리전 등 본인 걸로 바꿔줘야 한다.
Lambda 함수(tekken score reset)
import json
import boto3
dynamodb = boto3.resource('dynamodb')
def lambda_handler(event, context):
try:
table = dynamodb.Table('tekken')
response = table.put_item(
Item={
'idx': event.get('idx'),
'name': event.get('name'),
'score1': event.get('score1'),
'score2': event.get('score2'),
'win': event.get('win')
}
)
return {
'statusCode': 200,
'body': json.dumps('Data inserted successfully!')
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps(str(e))
}
Discord Client bot
import discord
import requests
import json
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
token = ""
@client.event
async def on_ready():
print("봇이 온라인으로 전환되었습니다.")
@client.event
async def on_message(message):
if message.content.startswith('!'):
content = message.content[1:].strip()
args = content.split()
api_url = ""
if len(args) == 3 and args[0].isdigit():
try:
win = int(args[0])
score1, score2 = 0, 0 # 기본값으로 초기화
name1, name2 = args[1], args[2]
# 데이터 전송
payload = {'idx': '1', 'name': f'{name1} vs {name2}', 'score1': str(score1), 'score2': str(score2), 'win': win}
json_payload = json.dumps(payload)
headers = {'Content-Type': 'application/json'}
response = requests.post(api_url, data=json_payload, headers=headers)
if response.status_code == 200:
await message.channel.send("Score Reset")
else:
await message.channel.send("데이터 전송에 실패했습니다.")
except ValueError:
await message.channel.send("명령어가 올바르지 않습니다. 숫자를 입력하세요.")
return
elif len(args) == 5 and args[0].isdigit():
try:
win = int(args[0])
name1, name2 = args[1], args[2]
score1, score2 = int(args[3]), int(args[4])
payload = {'idx': '1', 'name': f'{name1} vs {name2}', 'score1': str(score1), 'score2': str(score2), 'win': win}
json_payload = json.dumps(payload)
headers = {'Content-Type': 'application/json'}
response = requests.post(api_url, data=json_payload, headers=headers)
if response.status_code == 200:
await message.channel.send("Score Update")
else:
await message.channel.send("데이터 전송에 실패했습니다.")
except ValueError:
await message.channel.send("명령어가 올바르지 않습니다. 숫자를 입력하세요.")
return
else:
await message.channel.send("잘못된 입력입니다.")
return
client.run(token)
개선할 점
이김/짐에 대한 장면 식별성은 아직까지 99프로 정도는 정확하게 나온다.
1프로는 내가 모르는 상황이 있을 수 있다는 점
그렇기에 혹시나 모를 상황(잘못된 정보를 식별할 경우)을 대비해서 이겼을 때/졌을 때마다 중간에 sleep(60) 정도 준다.
그럼 라운드가 시작되고 나서 최소 60초 동안은 데이터를 전송하지 않기 때문에 조금이나마 신뢰성 있는 정보를 전달하는데 도움을 준다.
이 부분은 지속적으로 학습하고 관찰해야 되는 부분
추후에 프로그램으로 만들어서 사용자들에게 제공해 보는 시간도 가져보겠다.
(인프라만 바꿔주면 될 듯하다.)
위 코드를 통해 직접 만들어서 사용할 경우 댓글 달아주면 가지고 있는 데이터셋 또는 model.pkl을 제공해 주겠다.
'포트폴리오 > 개인 자작' 카테고리의 다른 글
[토이 프로젝트] 클라이밍 스스로 규칙 앱 개발 Feat. S3 presigned_url (1) | 2024.07.14 |
---|---|
[학기 프로젝트] 유니티 WebGL을 활용한 웹 게임 개발 (0) | 2024.06.11 |
긴급 구조 SOS 시스템 개발 프로젝트 (4) | 2023.12.01 |
[2022] Hackathon AI 졸음 운전 검문소 (0) | 2023.12.01 |
music.foolblack.com(음악 플레이어 소개 사이트) (0) | 2023.11.29 |
댓글