WebSocket Integration Best Practices
This document serves as a guideline for implementing Upbit WebSocket integration, providing implementation requirements to consider during development, such as authentication, connection management, and request rate limiting.
Get Started
This guide presents recommended considerations and best practices for evolving a simple WebSocket connection example into a more robust, production-ready integration. It covers implementation requirements you can apply to WebSocket integrations—including authentication based on the data you subscribe to, connection management techniques, and request rate limiting.
Quotation
- Singapore (sg): wss://sg-api.upbit.com/websocket/v1
- Indonesia (id): wss://id-api.upbit.com/websocket/v1
- Thailand (th): wss://th-api.upbit.com/websocket/v1
Exchange
- Singapore (sg): wss://sg-api.upbit.com/websocket/v1/private
- Indonesia (id): wss://id-api.upbit.com/websocket/v1/private
- Thailand (th): wss://th-api.upbit.com/websocket/v1/private
Key Considerations for Upbit WebSocket Integration
Initial Connection and Subscription Request
When integrating with Upbit WebSocket, the data retrieval process generally consists of two stages: (1) establishing the initial connection and (2) sending a subscription request message to start receiving data.
The initial connection stage opens a communication channel for bidirectional data exchange; once connected, data can flow in both directions between the client and server. Until the client sends a specific request, the Upbit WebSocket server remains idle and does not transmit any data, and the connection may time out.
To subscribe to data, the client must send a subscription request message to the WebSocket server. A subscription request includes the following fields:
- Request ticket (ticket) to identify the request
- Data type to subscribe to, such as candle, ticker, orderbook, trade, myOrder, or myAsset, with the corresponding market pairs specified.
- You can receive information for multiple pairs under a single data type and subscribe to multiple data types at the same time.
- For example, the following request subscribes to orderbook data for {fiat}-BTC and {fiat}-ETH, as well as trade data for {fiat}-XRP, at the same time:
[{"ticket":"uuid_"},{"type":"orderbook", "codes":["{fiat}-BTC","{fiat}-ETH"]},{"type":"trade", "codes":["{fiat}-XRP"]}]
- Data format: You can subscribe in either the default format (DEFAULT) or the compact format (SIMPLE). When using the compact format, each JSON key is shortened (e.g., type → ty) to reduce data size. Additionally, you can request data in list form by using JSON_LIST or SIMPLE_LIST, which returns the response as a JSON array (list) instead of a JSON object.
A single subscription request is sufficient to receive continuous stream data for a given data item. To change what you receive, you can send a new subscription message over the same connection without establishing a new one. Because the new subscription replaces the previous one, include every data type you still want to receive in the new message.
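The request structure described above can be sketched as a small helper. This is a minimal illustration, not part of the official examples: the function name build_subscription is hypothetical, SGD is used only as a sample quote currency, and sending the format as its own {"format": ...} object is an assumption that should be checked against the WebSocket reference.

```python
import json
import uuid


def build_subscription(types_to_codes, fmt="DEFAULT"):
    """Return a JSON subscription request string.

    types_to_codes maps a data type (e.g. "orderbook") to a list of pair codes.
    """
    # The ticket identifies the request; a random UUID is one simple choice.
    message = [{"ticket": str(uuid.uuid4())}]
    for data_type, codes in types_to_codes.items():
        message.append({"type": data_type, "codes": list(codes)})
    # Assumption: the format is sent as a separate {"format": ...} object.
    message.append({"format": fmt})
    return json.dumps(message)


request = build_subscription(
    {"orderbook": ["SGD-BTC", "SGD-ETH"], "trade": ["SGD-XRP"]},
    fmt="SIMPLE",
)
print(request)
```

Because each new request replaces the previous subscription, building the full message from one mapping like this makes it harder to drop a data type by accident.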
Snapshot Data and Real-Time Stream Data
Data available through WebSocket is divided into two categories: snapshot data and real-time stream data. Users may request both types simultaneously or subscribe to snapshot data only when submitting a subscription request.
- Snapshot refers to receiving information once at the time of the request.
- Real-time streams provide continuously updated information as long as the WebSocket connection is maintained. Depending on the data type, updates may be sent at regular intervals or whenever relevant events occur.
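As a rough sketch of a snapshot-only request, the helper below adds a flag to the data-type object. The helper name type_field is hypothetical, and the field name is_only_snapshot is an assumption about the request format; confirm the exact field name in the WebSocket reference before relying on it.

```python
import json
import uuid


def type_field(data_type, codes, snapshot_only=False):
    """Build the data-type object of a subscription request."""
    field = {"type": data_type, "codes": list(codes)}
    if snapshot_only:
        # Assumed field name: ask for the snapshot only, without the stream.
        field["is_only_snapshot"] = True
    return field


# Snapshot only: the server is expected to send the current state once.
snapshot_request = [
    {"ticket": str(uuid.uuid4())},
    type_field("ticker", ["SGD-BTC"], snapshot_only=True),
]

# Default: snapshot plus the continuous real-time stream.
stream_request = [
    {"ticket": str(uuid.uuid4())},
    type_field("ticker", ["SGD-BTC"]),
]

print(json.dumps(snapshot_request))
print(json.dumps(stream_request))
```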
Connection Maintenance and Reconnection
The Upbit WebSocket server closes the connection with an Idle Timeout if no data is sent or received for 120 seconds. Even when no data is being transmitted, you can keep the connection alive by using the ping/pong option, timeout settings, or explicit Ping messages, as described in the WebSocket Usage and Error Guide. If the connection drops, implement reconnection logic with a bounded number of attempts so that any gap in the received data stays as short as possible.
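One common way to space out reconnection attempts is exponential backoff with jitter, which also helps keep reconnects within the connection rate limit. The sketch below is illustrative only: the function name backoff_delay and the base and cap values are assumptions, not values recommended by Upbit.

```python
import random


def backoff_delay(attempt, base=1.0, cap=30.0, rng=random.random):
    """Return seconds to wait before reconnect attempt `attempt` (1-based).

    The upper bound doubles with each attempt up to `cap`; the actual delay
    is drawn uniformly from [0, upper bound) ("full jitter").
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    ceiling = min(cap, base * (2 ** (attempt - 1)))
    return rng() * ceiling


for attempt in range(1, 6):
    print(attempt, round(backoff_delay(attempt), 2))
```

The random component spreads out reconnects so that several clients that were disconnected at the same moment do not all retry at once.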
Authentication
Subscriptions to myAsset and myOrder, which carry your own asset, order, and trade information, can only be made through the /private endpoint (for example, wss://sg-api.upbit.com/websocket/v1/private), not the public endpoint (wss://sg-api.upbit.com/websocket/v1). When connecting to a /private endpoint, the request header must include an authentication token generated with your API Key.
Rate Limit
Like the REST API, the WebSocket is subject to request rate limit policies for both connection attempts and message transmissions. When implementing WebSocket integration, it is important to ensure that excessive connection attempts or message sends do not occur by applying an appropriate throttling strategy. Unlike REST, a single WebSocket request can establish a persistent stream of data, which makes the rate limit constraints relatively less restrictive.
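One possible throttling strategy is a sliding-window limiter, sketched below. It is an alternative to the fixed-window limiter used in the full example later in this guide, not a replacement prescribed by Upbit. The class name SlidingWindowLimiter and the helper throttled_send are hypothetical; the limits of 5 messages per second and 100 per minute are the ones cited in the Rate Limiter section of this guide.

```python
import collections
import threading
import time


class SlidingWindowLimiter(object):
    """Allow at most `limit` events in any rolling `window` seconds."""

    def __init__(self, limit, window, clock=time.monotonic, sleep=time.sleep):
        self.limit = limit
        self.window = window
        self._clock = clock
        self._sleep = sleep
        self._events = collections.deque()  # timestamps of recent events
        self._lock = threading.Lock()

    def wait_time(self):
        """Seconds until another event is allowed (0.0 if allowed now)."""
        now = self._clock()
        # Drop timestamps that have left the rolling window.
        while self._events and now - self._events[0] >= self.window:
            self._events.popleft()
        if len(self._events) < self.limit:
            return 0.0
        return self.window - (now - self._events[0])

    def acquire(self):
        """Block until an event is allowed, then record it."""
        with self._lock:
            delay = self.wait_time()
            while delay > 0:
                self._sleep(delay)
                delay = self.wait_time()
            self._events.append(self._clock())


per_second = SlidingWindowLimiter(limit=5, window=1.0)
per_minute = SlidingWindowLimiter(limit=100, window=60.0)


def throttled_send(send, message):
    # Both limits must allow the message before it is sent.
    per_minute.acquire()
    per_second.acquire()
    send(message)
```

Unlike a fixed window, a rolling window cannot let a burst straddle a window boundary (for example, 5 messages at the end of one second followed by 5 at the start of the next).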
Best Practice Implementation with Python Example
Let’s start with a simple example from the tutorials and progressively extend it into a production-ready integration. Using Python code, we will walk through each step required to incorporate the practical considerations necessary for a reliable real-world implementation.
Basic WebSocket Connection Example
The following is a basic example of connecting to the Upbit WebSocket server using the websocket-client library, as described in the Development Environment Setup Guide. This example only establishes and closes the connection with the server; it does not send subscription messages or perform authentication.
- Based on WebSocketApp from the websocket-client library, the connection is handled through an event loop. Callback functions are defined for the open, message, error, and close events to specify the actions to be taken when each event occurs. A ThreadedWebSocketApp class is implemented to start a new thread for establishing the connection and subscribing to data, as well as to allow the connection to be closed.
- In the main function, an instance of ThreadedWebSocketApp is created, the connection thread is started, and later closed.
- This example is configured to automatically close the connection after 150 seconds. In actual usage, you should remove this part to keep the connection open.
import threading
import websocket
import json
import time


class ThreadedWebSocketApp(threading.Thread):
    def __init__(self, url):
        # type: (str) -> None
        threading.Thread.__init__(self)
        self.daemon = True
        self.url = url
        self.ws_app = None
        self._stop_evt = threading.Event()

    @staticmethod
    def connect(url):
        # type: (str) -> "ThreadedWebSocketApp"
        t = ThreadedWebSocketApp(url)
        t.start()
        return t

    def run(self):
        # Runs in the worker thread: blocks until the connection ends.
        self.ws_app = websocket.WebSocketApp(
            self.url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close
        )
        self.ws_app.run_forever()
        self.ws_app = None

    def close(self):
        self._stop_evt.set()
        try:
            if self.ws_app:
                self.ws_app.close()
        except Exception:
            pass

    def send_message(self, message):
        try:
            if self.ws_app and self.ws_app.sock and self.ws_app.sock.connected:
                self.ws_app.send(message)
        except Exception as e:
            # Report send failures through the same error callback.
            self._on_error(self.ws_app, e)

    def _on_open(self, ws):
        print("Opened")

    def _on_message(self, ws, data):
        try:
            obj = json.loads(data)
            print("Received(JSON):", obj)
        except Exception:
            print("Received(raw):", data)

    def _on_error(self, ws, err):
        print("Error:", err)

    def _on_close(self, ws, code, reason):
        print("Closed")


if __name__ == "__main__":
    # Replace {region} with your region code (sg, id, or th).
    ws = ThreadedWebSocketApp.connect(url="wss://{region}-api.upbit.com/websocket/v1")
    try:
        time.sleep(150)
    finally:
        ws.close()
        ws.join(timeout=3)
When you run the code, it will establish a connection and remain idle for 150 seconds without performing any actions. However, according to the Upbit WebSocket server policy, if no data is sent or received for 120 seconds, the connection will be closed. As a result, after approximately 120 seconds, you will see the connection terminated with an error message like the one shown below. (The program itself will exit only after the full 150 seconds have elapsed.)
Opened
Error: Connection to remote host was lost.
Closed
Connection Maintenance and Reconnection
For private stream data such as myOrder or myAsset, data is only received when an actual order or asset change occurs. Therefore, using the basic example above alone is not sufficient to maintain the connection while waiting. To address this, connection keep-alive and reconnection logic have been added as shown below. With these improvements, the WebSocketApp now sends ping frames at regular intervals based on the configured ping_interval (default: 30 seconds), and will close the connection if a pong is not received within 10 seconds. In addition, if the server terminates the connection, it will attempt to reconnect up to the maximum number of retries (default: 3).
- Inject ping and reconnection-related settings at connection creation (see __init__, connect, and the run_forever call in run).
- Retry connection within the maximum retry count. However, if the connection is closed at the user’s request, it is excluded from the retry process (see _on_close).
import threading
import websocket
import json
import time


class ThreadedWebSocketApp(threading.Thread):
    def __init__(self, url, ping_interval=30, ping_timeout=10,
                 max_retries=3, retry_sleep=2.0):
        # type: (str, int, int, int, float) -> None
        threading.Thread.__init__(self)
        self.daemon = True
        self.url = url
        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.max_retries = max_retries
        self.retry_sleep = retry_sleep
        self.ws_app = None
        self._stop_evt = threading.Event()
        # Consecutive closes since the last successful open.
        self._attempts = 0

    @staticmethod
    def connect(url, ping_interval=30, ping_timeout=10,
                max_retries=3, retry_sleep=2.0):
        # type: (str, int, int, int, float) -> "ThreadedWebSocketApp"
        t = ThreadedWebSocketApp(
            url,
            ping_interval=ping_interval,
            ping_timeout=ping_timeout,
            max_retries=max_retries,
            retry_sleep=retry_sleep
        )
        t.start()
        return t

    def run(self):
        while not self._stop_evt.is_set():
            self.ws_app = websocket.WebSocketApp(
                self.url,
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close
            )
            self.ws_app.run_forever(
                ping_interval=self.ping_interval,
                ping_timeout=self.ping_timeout,
                reconnect=int(self.retry_sleep) if self.retry_sleep else None
            )
            self.ws_app = None
            # Stop once the retry budget has been used up.
            if self._attempts > self.max_retries:
                break
            # Pause before opening a new connection; close() ends the wait early.
            self._stop_evt.wait(self.retry_sleep or 0)

    def close(self):
        self._stop_evt.set()
        try:
            if self.ws_app:
                self.ws_app.close()
                try:
                    self.ws_app.keep_running = False
                except Exception:
                    pass
        except Exception:
            pass

    def send_message(self, message):
        try:
            if self.ws_app and self.ws_app.sock and self.ws_app.sock.connected:
                self.ws_app.send(message)
        except Exception as e:
            self.on_error(e)

    def _on_open(self, ws):
        # A successful open resets the retry counter.
        self._attempts = 0
        self.on_open()

    def _on_message(self, ws, data):
        self.on_message(data)

    def _on_error(self, ws, err):
        self.on_error(err)

    def _on_close(self, ws, code, reason):
        # Closes requested through close() are not counted as retries.
        if not self._stop_evt.is_set():
            self._attempts += 1
            if self._attempts <= self.max_retries:
                print("Try to reconnect:", self._attempts)
            else:
                try:
                    ws.keep_running = False
                except Exception:
                    pass
        self.on_close()

    def on_open(self):
        print("Opened")

    def on_message(self, data):
        try:
            obj = json.loads(data)
            print("Received(JSON):", obj)
        except Exception:
            print("Received(raw):", data)

    def on_error(self, err):
        print("Error:", err)

    def on_close(self):
        print("Closed")


if __name__ == "__main__":
    # Replace {region} with your region code (sg, id, or th).
    ws = ThreadedWebSocketApp.connect(
        url="wss://{region}-api.upbit.com/websocket/v1",
        ping_interval=30,
        ping_timeout=10,
        max_retries=3,
        retry_sleep=2.0
    )
    try:
        time.sleep(150)
    finally:
        ws.close()
        ws.join(timeout=3)
When running the code above, the connection will now remain active even after 120 seconds, and—as configured—will terminate normally at 150 seconds due to the client’s request.
Example: Subscribing to Real-Time Stream Data
The previous example only established a connection to the Upbit WebSocket server. To receive actual data, you must send a subscription request message. Referring to the WebSocket Usage and Error Handling guide, the example code was modified as follows:
- In the main function, after establishing the connection, create a subscription request message for orderbook data and send it using the send_message method. The program then receives the stream for about 5 seconds before sending a second request that lists both orderbook and trade data. Because a new request replaces the previous subscription, listing both types lets you receive both data streams simultaneously. The program continues receiving for another 5 seconds, then calls close() to end the connection. (See the __main__ block.)
- To verify, modify the _on_message method so it appends received JSON data to a file named {code}_{type}.jsonl based on the type and pair code. When you run the code, it creates files such as SGD-BTC_orderbook.jsonl and SGD-BTC_trade.jsonl under the execution directory and records the transmitted data. (See _on_message and _append_jsonl.)
import threading
import websocket
import json
import time
import uuid
import os


class ThreadedWebSocketApp(threading.Thread):
    def __init__(self, url, ping_interval=30, ping_timeout=10,
                 max_retries=3, retry_sleep=2.0):
        # type: (str, int, int, int, float) -> None
        threading.Thread.__init__(self)
        self.daemon = True
        self.url = url
        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.max_retries = max_retries
        self.retry_sleep = retry_sleep
        self.ws_app = None
        self._stop_evt = threading.Event()
        # Consecutive closes since the last successful open.
        self._attempts = 0

    @staticmethod
    def connect(url, ping_interval=30, ping_timeout=10,
                max_retries=3, retry_sleep=2.0):
        # type: (str, int, int, int, float) -> "ThreadedWebSocketApp"
        t = ThreadedWebSocketApp(
            url,
            ping_interval=ping_interval,
            ping_timeout=ping_timeout,
            max_retries=max_retries,
            retry_sleep=retry_sleep
        )
        t.start()
        return t

    def run(self):
        while not self._stop_evt.is_set():
            self.ws_app = websocket.WebSocketApp(
                self.url,
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close
            )
            self.ws_app.run_forever(
                ping_interval=self.ping_interval,
                ping_timeout=self.ping_timeout,
                reconnect=int(self.retry_sleep) if self.retry_sleep else None
            )
            self.ws_app = None
            # Stop once the retry budget has been used up.
            if self._attempts > self.max_retries:
                break
            # Pause before opening a new connection; close() ends the wait early.
            self._stop_evt.wait(self.retry_sleep or 0)

    def close(self):
        self._stop_evt.set()
        try:
            if self.ws_app:
                self.ws_app.close()
                try:
                    self.ws_app.keep_running = False
                except Exception:
                    pass
        except Exception:
            pass

    def send_message(self, message):
        try:
            if self.ws_app and self.ws_app.sock and self.ws_app.sock.connected:
                self.ws_app.send(message)
        except Exception as e:
            # Report send failures through the same error callback.
            self._on_error(self.ws_app, e)

    def _on_open(self, ws):
        self._attempts = 0
        print("Opened")

    def _on_message(self, ws, data):
        # Binary frames are decoded to text before JSON parsing.
        if isinstance(data, bytes):
            try:
                data = data.decode("utf-8")
            except Exception:
                print("Received(binary): <{} bytes>".format(len(data)))
                return
        try:
            obj = json.loads(data)
        except Exception:
            print("Received(raw):", data)
            self._append_jsonl("misc.jsonl", {"raw": data})
            return
        # One file per pair code and data type, e.g. SGD-BTC_orderbook.jsonl.
        code = obj.get("code")
        mtype = obj.get("type")
        if code and mtype:
            path = "{0}_{1}.jsonl".format(code, mtype)
        else:
            path = "misc.jsonl"
        self._append_jsonl(path, obj)

    def _on_error(self, ws, err):
        print("Error:", err)

    def _on_close(self, ws, code, reason):
        # Closes requested through close() are not counted as retries.
        if not self._stop_evt.is_set():
            self._attempts += 1
            if self._attempts <= self.max_retries:
                print("Try to reconnect:", self._attempts)
            else:
                try:
                    ws.keep_running = False
                except Exception:
                    pass
        print("Closed")

    def _append_jsonl(self, path, obj):
        try:
            d = os.path.dirname(path)
            if d and not os.path.exists(d):
                os.makedirs(d)
            with open(path, "a", encoding="utf-8") as f:
                f.write(json.dumps(obj, ensure_ascii=False) + "\n")
        except TypeError:
            # Fallback for interpreters whose open() has no encoding argument.
            with open(path, "a") as f:
                f.write(json.dumps(obj, ensure_ascii=False) + "\n")


if __name__ == "__main__":
    # Replace {region} with your region code (sg, id, or th) and
    # {fiat} with your market's quote currency (for example, SGD).
    ws = ThreadedWebSocketApp.connect(
        url="wss://{region}-api.upbit.com/websocket/v1",
        ping_interval=30,
        ping_timeout=10,
        max_retries=3,
        retry_sleep=2.0
    )
    try:
        time.sleep(1)
        sub_orderbook = [
            {"ticket": str(uuid.uuid4())},
            {"type": "orderbook", "codes": ["{fiat}-BTC"]}
        ]
        ws.send_message(json.dumps(sub_orderbook))
        time.sleep(5)
        # The new request replaces the previous one, so orderbook is listed again.
        sub_trade = [
            {"ticket": str(uuid.uuid4())},
            {"type": "orderbook", "codes": ["{fiat}-BTC"]},
            {"type": "trade", "codes": ["{fiat}-BTC"]}
        ]
        ws.send_message(json.dumps(sub_trade))
        time.sleep(5)
    finally:
        ws.close()
        ws.join(timeout=3)
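The example above sends its subscription once from the main thread. After a reconnect the client is on a new connection, so the subscription generally has to be sent again before data resumes. The following is a minimal sketch of one way to handle that; the class name SubscriptionRegistry and its methods are hypothetical and not part of the original example.

```python
import json
import threading
import uuid


class SubscriptionRegistry(object):
    """Remember the latest subscription so it can be re-sent after a reconnect."""

    def __init__(self):
        self._lock = threading.Lock()
        self._fields = []  # data-type objects, stored without the ticket

    def set(self, fields):
        # A new request replaces the previous one, so only the latest is kept.
        with self._lock:
            self._fields = list(fields)

    def replay(self, send):
        """Send the stored subscription through `send`; return True if sent."""
        with self._lock:
            fields = list(self._fields)
        if not fields:
            return False
        # A fresh ticket is generated for every request.
        send(json.dumps([{"ticket": str(uuid.uuid4())}] + fields))
        return True


registry = SubscriptionRegistry()
registry.set([
    {"type": "orderbook", "codes": ["SGD-BTC"]},
    {"type": "trade", "codes": ["SGD-BTC"]},
])
```

With this in place, the _on_open callback could call registry.replay(ws.send), so that every successful open, including one after a reconnect, restores the same subscription.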
Authentication
This example demonstrates establishing a new WebSocket connection to the /private URL when subscribing to data that requires authentication (such as My Order or My Asset).
- Added support for setting the API Key’s Access Key and Secret Key. (See __init__, connect, and the __main__ block.)
- Implemented JWT token generation for authentication and added it to the request headers. (See _create_jwt_token and _build_headers.)
- Connected to the /private channel and subscribed to myOrder type data. (See the ws_private connection and the myOrder request in the __main__ block.)
import threading
import websocket
import json
import time
import uuid
import os
import jwt


class ThreadedWebSocketApp(threading.Thread):
    def __init__(self, url, ping_interval=30, ping_timeout=10,
                 max_retries=3, retry_sleep=2.0,
                 access_key=None, secret_key=None):
        # type: (str, int, int, int, float, str, str) -> None
        threading.Thread.__init__(self)
        self.daemon = True
        self.url = url
        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.max_retries = max_retries
        self.retry_sleep = retry_sleep
        self.ws_app = None
        self._stop_evt = threading.Event()
        # Consecutive closes since the last successful open.
        self._attempts = 0
        self.access_key = access_key
        self.secret_key = secret_key

    @staticmethod
    def connect(url, ping_interval=30, ping_timeout=10,
                max_retries=3, retry_sleep=2.0,
                access_key=None, secret_key=None):
        # type: (str, int, int, int, float, str, str) -> "ThreadedWebSocketApp"
        t = ThreadedWebSocketApp(
            url,
            ping_interval=ping_interval,
            ping_timeout=ping_timeout,
            max_retries=max_retries,
            retry_sleep=retry_sleep,
            access_key=access_key,
            secret_key=secret_key,
        )
        t.start()
        return t

    def _create_jwt_token(self):
        # type: () -> Optional[str]
        # Without keys (public endpoint) no token is created.
        if not self.access_key or not self.secret_key:
            return None
        payload = {
            "access_key": self.access_key,
            "nonce": str(uuid.uuid4())
        }
        token = jwt.encode(payload, self.secret_key, algorithm="HS512")
        # Older PyJWT versions return bytes; newer ones return str.
        return token if isinstance(token, str) else token.decode("utf-8")

    def _build_headers(self):
        # type: () -> list
        headers = []
        token = self._create_jwt_token()
        if token:
            headers.append("Authorization: Bearer {0}".format(token))
        return headers

    def run(self):
        while not self._stop_evt.is_set():
            # Headers are rebuilt per connection, so each one gets a fresh token.
            self.ws_app = websocket.WebSocketApp(
                self.url,
                header=self._build_headers(),
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close
            )
            self.ws_app.run_forever(
                ping_interval=self.ping_interval,
                ping_timeout=self.ping_timeout,
                reconnect=int(self.retry_sleep) if self.retry_sleep else None
            )
            self.ws_app = None
            # Stop once the retry budget has been used up.
            if self._attempts > self.max_retries:
                break
            # Pause before opening a new connection; close() ends the wait early.
            self._stop_evt.wait(self.retry_sleep or 0)

    def close(self):
        self._stop_evt.set()
        try:
            if self.ws_app:
                self.ws_app.close()
                try:
                    self.ws_app.keep_running = False
                except Exception:
                    pass
        except Exception:
            pass

    def send_message(self, message):
        try:
            if self.ws_app and self.ws_app.sock and self.ws_app.sock.connected:
                self.ws_app.send(message)
        except Exception as e:
            # Report send failures through the same error callback.
            self._on_error(self.ws_app, e)

    def _on_open(self, ws):
        self._attempts = 0
        print("Opened")

    def _on_message(self, ws, data):
        if isinstance(data, bytes):
            try:
                data = data.decode("utf-8")
            except Exception:
                print("Received(binary): <{} bytes>".format(len(data)))
                return
        try:
            obj = json.loads(data)
        except Exception:
            print("Received(raw):", data)
            self._append_jsonl("misc.jsonl", {"raw": data})
            return
        code = obj.get("code")
        mtype = obj.get("type")
        if code and mtype:
            path = "{0}_{1}.jsonl".format(code, mtype)
        else:
            path = "misc.jsonl"
        self._append_jsonl(path, obj)

    def _on_error(self, ws, err):
        print("Error:", err)

    def _on_close(self, ws, code, reason):
        # Closes requested through close() are not counted as retries.
        if not self._stop_evt.is_set():
            self._attempts += 1
            if self._attempts <= self.max_retries:
                print("Try to reconnect:", self._attempts)
            else:
                try:
                    ws.keep_running = False
                except Exception:
                    pass
        print("Closed")

    def _append_jsonl(self, path, obj):
        try:
            d = os.path.dirname(path)
            if d and not os.path.exists(d):
                os.makedirs(d)
            with open(path, "a", encoding="utf-8") as f:
                f.write(json.dumps(obj, ensure_ascii=False) + "\n")
        except TypeError:
            # Fallback for interpreters whose open() has no encoding argument.
            with open(path, "a") as f:
                f.write(json.dumps(obj, ensure_ascii=False) + "\n")


if __name__ == "__main__":
    # Replace {region} with your region code (sg, id, or th) and
    # {fiat} with your market's quote currency (for example, SGD).
    ws = ThreadedWebSocketApp.connect(
        url="wss://{region}-api.upbit.com/websocket/v1",
    )
    # Private data needs a separate, authenticated connection.
    ws_private = ThreadedWebSocketApp.connect(
        url="wss://{region}-api.upbit.com/websocket/v1/private",
        access_key="YOUR_ACCESS_KEY",
        secret_key="YOUR_SECRET_KEY"
    )
    try:
        time.sleep(1)
        sub_orderbook = [
            {"ticket": str(uuid.uuid4())},
            {"type": "orderbook", "codes": ["{fiat}-BTC"]}
        ]
        ws.send_message(json.dumps(sub_orderbook))
        time.sleep(5)
        sub_my_order = [
            {"ticket": str(uuid.uuid4())},
            {"type": "myOrder", "codes": ["{fiat}-BTC"]}
        ]
        ws_private.send_message(json.dumps(sub_my_order))
        time.sleep(5)
    finally:
        ws.close()
        ws.join(timeout=3)
        ws_private.close()
        ws_private.join(timeout=3)
Rate Limiter
As stated on the Rate Limits page, the Upbit WebSocket API enforces the following limits:
- Connection requests: a maximum of 5 per second.
- Message transmissions: a maximum of 5 per second and 100 per minute.
To prevent excessive subscription message traffic, you can implement a dedicated Rate Limiter for WebSocket message sending.
- Implemented a _FixedWindowLimiter class to ensure that no more than 5 messages per second and 100 messages per minute are sent. When acquire() is called, it counts the send against the current second and minute windows or, if either limit has already been reached, waits until the next window begins. (See _FixedWindowLimiter.)
- Inside the send_message method, the call to self._send_limiter.acquire() ensures messages are only sent within the allowed limits.
import threading
import websocket
import json
import time
import uuid
import os
import jwt


class _FixedWindowLimiter(object):
    def __init__(self, per_sec=5, per_min=100):
        # type: (int, int) -> None
        self.per_sec = per_sec
        self.per_min = per_min
        self._sec_ts = 0
        self._min_ts = 0
        self._sec_used = 0
        self._min_used = 0

    def acquire(self):
        now = time.time()
        sec = int(now)
        minute = int(now // 60)
        # Reset the counters when a new second or minute window starts.
        if sec != self._sec_ts:
            self._sec_ts = sec
            self._sec_used = 0
        if minute != self._min_ts:
            self._min_ts = minute
            self._min_used = 0
        # Per-second limit reached: wait for the next second.
        if self._sec_used >= self.per_sec:
            sleep_for = (self._sec_ts + 1) - now + 0.001
            if sleep_for > 0:
                time.sleep(sleep_for)
            now = time.time()
            self._sec_ts = int(now)
            self._sec_used = 0
        # Per-minute limit reached: wait for the next minute.
        if self._min_used >= self.per_min:
            sleep_for = ((self._min_ts + 1) * 60) - now + 0.001
            if sleep_for > 0:
                time.sleep(sleep_for)
            now = time.time()
            self._min_ts = int(now // 60)
            self._min_used = 0
        self._sec_used += 1
        self._min_used += 1


class ThreadedWebSocketApp(threading.Thread):
    def __init__(self, url, ping_interval=30, ping_timeout=10,
                 max_retries=3, retry_sleep=2.0,
                 access_key=None, secret_key=None, send_per_sec=5, send_per_min=100):
        # type: (str, int, int, int, float, str, str, int, int) -> None
        threading.Thread.__init__(self)
        self.daemon = True
        self.url = url
        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.max_retries = max_retries
        self.retry_sleep = retry_sleep
        self.ws_app = None
        self._stop_evt = threading.Event()
        # Consecutive closes since the last successful open.
        self._attempts = 0
        self.access_key = access_key
        self.secret_key = secret_key
        self._send_limiter = _FixedWindowLimiter(per_sec=send_per_sec, per_min=send_per_min)

    @staticmethod
    def connect(url, ping_interval=30, ping_timeout=10,
                max_retries=3, retry_sleep=2.0,
                access_key=None, secret_key=None, send_per_sec=5, send_per_min=100):
        # type: (str, int, int, int, float, str, str, int, int) -> "ThreadedWebSocketApp"
        t = ThreadedWebSocketApp(
            url,
            ping_interval=ping_interval,
            ping_timeout=ping_timeout,
            max_retries=max_retries,
            retry_sleep=retry_sleep,
            access_key=access_key,
            secret_key=secret_key,
            send_per_sec=send_per_sec,
            send_per_min=send_per_min
        )
        t.start()
        return t

    def _create_jwt_token(self):
        # type: () -> Optional[str]
        # Without keys (public endpoint) no token is created.
        if not self.access_key or not self.secret_key:
            return None
        payload = {
            "access_key": self.access_key,
            "nonce": str(uuid.uuid4())
        }
        token = jwt.encode(payload, self.secret_key, algorithm="HS512")
        # Older PyJWT versions return bytes; newer ones return str.
        return token if isinstance(token, str) else token.decode("utf-8")

    def _build_headers(self):
        # type: () -> list
        headers = []
        token = self._create_jwt_token()
        if token:
            headers.append("Authorization: Bearer {0}".format(token))
        return headers

    def run(self):
        while not self._stop_evt.is_set():
            # Headers are rebuilt per connection, so each one gets a fresh token.
            self.ws_app = websocket.WebSocketApp(
                self.url,
                header=self._build_headers(),
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close
            )
            self.ws_app.run_forever(
                ping_interval=self.ping_interval,
                ping_timeout=self.ping_timeout,
                reconnect=int(self.retry_sleep) if self.retry_sleep else None
            )
            self.ws_app = None
            # Stop once the retry budget has been used up.
            if self._attempts > self.max_retries:
                break
            # Pause before opening a new connection; close() ends the wait early.
            self._stop_evt.wait(self.retry_sleep or 0)

    def close(self):
        self._stop_evt.set()
        try:
            if self.ws_app:
                self.ws_app.close()
                try:
                    self.ws_app.keep_running = False
                except Exception:
                    pass
        except Exception:
            pass

    def send_message(self, message):
        try:
            if self.ws_app and self.ws_app.sock and self.ws_app.sock.connected:
                # Blocks until sending is allowed by both rate limits.
                self._send_limiter.acquire()
                self.ws_app.send(message)
        except Exception as e:
            # Report send failures through the same error callback.
            self._on_error(self.ws_app, e)

    def _on_open(self, ws):
        self._attempts = 0
        print("Opened")

    def _on_message(self, ws, data):
        if isinstance(data, bytes):
            try:
                data = data.decode("utf-8")
            except Exception:
                print("Received(binary): <{} bytes>".format(len(data)))
                return
        try:
            obj = json.loads(data)
        except Exception:
            print("Received(raw):", data)
            self._append_jsonl("misc.jsonl", {"raw": data})
            return
        code = obj.get("code")
        mtype = obj.get("type")
        if code and mtype:
            path = "{0}_{1}.jsonl".format(code, mtype)
        else:
            path = "misc.jsonl"
        self._append_jsonl(path, obj)

    def _on_error(self, ws, err):
        print("Error:", err)

    def _on_close(self, ws, code, reason):
        # Closes requested through close() are not counted as retries.
        if not self._stop_evt.is_set():
            self._attempts += 1
            if self._attempts <= self.max_retries:
                print("Try to reconnect:", self._attempts)
            else:
                try:
                    ws.keep_running = False
                except Exception:
                    pass
        print("Closed")

    def _append_jsonl(self, path, obj):
        try:
            d = os.path.dirname(path)
            if d and not os.path.exists(d):
                os.makedirs(d)
            with open(path, "a", encoding="utf-8") as f:
                f.write(json.dumps(obj, ensure_ascii=False) + "\n")
        except TypeError:
            # Fallback for interpreters whose open() has no encoding argument.
            with open(path, "a") as f:
                f.write(json.dumps(obj, ensure_ascii=False) + "\n")


if __name__ == "__main__":
    # Replace {region} with your region code (sg, id, or th) and
    # {fiat} with your market's quote currency (for example, SGD).
    ws = ThreadedWebSocketApp.connect(
        url="wss://{region}-api.upbit.com/websocket/v1",
    )
    ws_private = ThreadedWebSocketApp.connect(
        url="wss://{region}-api.upbit.com/websocket/v1/private",
        access_key="YOUR_ACCESS_KEY",
        secret_key="YOUR_SECRET_KEY"
    )
    try:
        time.sleep(1)
        sub_orderbook = [
            {"ticket": str(uuid.uuid4())},
            {"type": "orderbook", "codes": ["{fiat}-BTC"]}
        ]
        ws.send_message(json.dumps(sub_orderbook))
        time.sleep(5)
        sub_my_order = [
            {"ticket": str(uuid.uuid4())},
            {"type": "myOrder", "codes": ["{fiat}-BTC"]}
        ]
        ws_private.send_message(json.dumps(sub_my_order))
        time.sleep(5)
    finally:
        ws.close()
        ws.join(timeout=3)
        ws_private.close()
        ws_private.join(timeout=3)
Wrapping Up
We have explored a simple synchronous client example that demonstrates a Best Practice implementation for building a stable integration environment while complying with Upbit WebSocket operational policies. In addition to the methods described in this guide, you can incorporate further improvements, such as adding logging modules, similar to the enhancements introduced in the REST API Integration Best Practice guide.