Exporting Market Data to CSV Using the Candle API
Learn how to retrieve candle data for a specified period using Upbit’s Candle API and export it to a CSV file.
Overview
This guide demonstrates how to use the Upbit Minute Candle API to save market data for a specified period into a CSV file. Each record includes the candle date time in UTC, along with the opening price, high price, low price, closing price, accumulated trade value, and accumulated trade volume. The candle aggregation interval (in minutes) can be set using the supported unit values provided by Upbit. After execution, the results can be verified in CSV format as shown below.
Exporting Market Data to CSV Using the Candle API
For a smooth tutorial experience, use Python 3.7 or later. Upbit’s Candle API returns up to 200 candles per request. If the number of candles in your target period exceeds 200, you must compute time windows and call the API multiple times.
Provide the candle period to export and the minute aggregation interval (hereafter, unit). The Minute Candle API returns an array of candles generated at unit intervals. Compute the request window using the candle start time, the unit, and the desired number of candles per response (hereafter, count). Call the Minute Candle API according to the computed windows. From each response, write only the candles whose timestamps fall within the specified period to the CSV file.
As a result, if you write only the API responses to a CSV file, the spacing between recorded candles may not be uniform. In this guide, when no candlestick data exists, an empty row is written to the CSV file. This approach allows users to distinguish between intervals with executed trades and intervals without trades.
The final candle row is determined by the alignment of the user-specified end time to (exclusive) with the candle interval (unit). If the end time does not align with the interval, the last recorded candle corresponds to the most recent candle generated before the end time.
For example, when unit=15, candles are generated at 00:00, 00:15, 00:30, … . If the specified end time is 2025-09-10T00:21:00, which is not aligned with a 15-minute interval, the last row in the CSV will contain the candle generated at 2025-09-10T00:15:00.
Complete Code
Below is the complete code for retrieving candle data over a specified period and exporting it to a CSV file. When you run it, a data directory is created in the current path, and the CSV file is generated inside that directory.
import requests
from datetime import datetime, timedelta, timezone
from pathlib import Path
from time import sleep
import csv
class MinCandleCsvExporter():
def __init__(self):
self.csv_header = [
"candle_date_time_utc", "trading_pair", "unit",
"opening_price", "high_price", "low_price", "trade_price",
"candle_acc_trade_volume", "candle_acc_trade_price"
]
self.unit_limit = [
1, 3, 5, 10, 15, 30, 60, 240
]
def _get_candles_minutes(self, trading_pair, to_dt, unit):
if unit not in self.unit_limit:
raise ValueError("unit must be in 1, 3, 5, 10, 15, 30, 60, 240")
path = f"https://{region}-api.upbit.com/v1/candles/minutes/{unit}"
params = {
"market": trading_pair,
"to": to_dt,
"count": 200
}
try:
res = requests.get(path, params=params)
res.raise_for_status()
data = res.json()
except Exception as e:
raise RuntimeError(f"Error: {e}")
if isinstance(data, list):
return data[::-1]
else:
raise RuntimeError(f"Unexpected API response format: {data}")
def _string_to_datetime(self, zulu_str):
if zulu_str.endswith("Z"):
zulu_str = zulu_str[:-1]
return datetime.fromisoformat(zulu_str).replace(tzinfo=timezone.utc)
def _datetime_to_string(self, dt):
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = dt.astimezone(timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def _floor_time_by_unit(self, dt, unit_min):
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = dt.astimezone(timezone.utc)
if unit_min >= 60:
total_minutes = dt.hour * 60 + dt.minute
floored_total_minutes = (total_minutes // unit_min) * unit_min
floored_hour = floored_total_minutes // 60
floored_minute = floored_total_minutes % 60
floored = dt.replace(
hour=floored_hour, minute=floored_minute, second=0, microsecond=0)
else:
minute = (dt.minute // unit_min) * unit_min
floored = dt.replace(minute=minute, second=0, microsecond=0)
return floored.strftime("%Y-%m-%dT%H:%M:%SZ")
def _get_csv_writer(self, csv_path):
exists = csv_path.exists()
csv_path.parent.mkdir(parents=True, exist_ok=True)
f = open(csv_path, "a", newline="", encoding="utf-8")
writer = csv.writer(f)
if not exists:
writer.writerow(self.csv_header)
return f, writer
def _write_empty_row(self, writer, ts_utc, trading_pair, unit):
writer.writerow([
self._datetime_to_string(ts_utc),
trading_pair, unit, "", "", "", "", "", ""
])
def _write_candle_row(self, writer, candle):
ts_utc = candle.get("candle_date_time_utc")
trading_pair = candle.get("market", "")
unit = candle.get("unit", "")
opening = candle.get("opening_price", "")
high = candle.get("high_price", "")
low = candle.get("low_price", "")
close = candle.get("trade_price", "")
acc_vol = candle.get("candle_acc_trade_volume", "")
acc_val = candle.get("candle_acc_trade_price", "")
if isinstance(ts_utc, str) and not ts_utc.endswith("Z"):
ts_utc = ts_utc + "Z"
writer.writerow([
ts_utc, trading_pair, unit, opening, high, low, close, acc_vol, acc_val
])
def _iter_write_range(self, trading_pair, unit, start_dt, end_dt, writer):
from_dt = self._string_to_datetime(start_dt)
to_dt = self._string_to_datetime(end_dt)
unit_dt = timedelta(minutes=unit)
interval = timedelta(minutes=unit * 200)
to_params_dt = from_dt + interval
last_dt = to_dt + interval
while to_params_dt <= last_dt:
print(f"querying {from_dt} to {to_params_dt} Candle Data...")
candles = self._get_candles_minutes(
trading_pair, self._datetime_to_string(to_params_dt), unit)
expected_candle_dt = from_dt
candle_idx = 0
if not candles or len(candles) == 0:
raise LookupError(
f"No candle data found for {trading_pair} from {self._datetime_to_string(from_dt)} to {self._datetime_to_string(to_params_dt)}. "
f"Please check the trading_pair and time range.")
batch_end_dt = min(to_dt, to_params_dt - unit_dt)
while expected_candle_dt <= batch_end_dt and candle_idx < len(candles):
prev_expected_dt = expected_candle_dt
current_candle_dt = self._string_to_datetime(
candles[candle_idx].get("candle_date_time_utc"))
if current_candle_dt == expected_candle_dt:
self._write_candle_row(writer, candles[candle_idx])
candle_idx = candle_idx + 1
expected_candle_dt += unit_dt
elif current_candle_dt < expected_candle_dt:
candle_idx = candle_idx + 1
else:
self._write_empty_row(
writer, expected_candle_dt, trading_pair, unit)
expected_candle_dt += unit_dt
if expected_candle_dt == prev_expected_dt and candle_idx >= len(candles):
self._write_empty_row(
writer, expected_candle_dt, trading_pair, unit)
expected_candle_dt += unit_dt
while expected_candle_dt <= batch_end_dt:
self._write_empty_row(
writer, expected_candle_dt, trading_pair, unit)
expected_candle_dt += unit_dt
from_dt = expected_candle_dt
to_params_dt = from_dt + interval
sleep(0.5)
def run_export_minutes_csv(self, trading_pair, unit, from_dt_str, to_dt_str, csv_file_path):
from_dt = self._string_to_datetime(from_dt_str)
to_dt = self._string_to_datetime(to_dt_str)
floored_from_dt = self._floor_time_by_unit(from_dt, unit)
floored_to_dt = self._floor_time_by_unit(to_dt, unit)
if from_dt > to_dt:
raise ValueError("from_dt must be before to_dt")
if to_dt > datetime.now(timezone.utc):
raise ValueError("to_dt must be before current time")
csv_path = Path(csv_file_path)
f, writer = self._get_csv_writer(csv_path)
try:
print(
f"{trading_pair}_{from_dt}_{to_dt}_{unit}m_candles.csv is creating...")
self._iter_write_range(
trading_pair, unit, floored_from_dt, floored_to_dt, writer)
print(
f"{trading_pair}_{from_dt}_{to_dt}_{unit}m_candles.csv has been created.")
finally:
f.close()
if __name__ == "__main__":
exporter = MinCandleCsvExporter()
trading_pair = "{fiat}-BTC"
from_dt = "2025-09-01T00:00:00Z" # UTC
to_dt = "2025-09-10T10:00:00Z" # UTC
unit = 10
csv_file = f"./data/{trading_pair}_{from_dt}_{to_dt}_{unit}m_candles.csv"
exporter.run_export_minutes_csv(
trading_pair, unit, from_dt, to_dt, csv_file_path=csv_file)
Detailed Guide
Implementing the MinCandleCsvExporter Module
Create a class that implements the methods to fetch candlestick data for a user-specified period and write it to a CSV file.
Initial Setup
Define the CSV header fields and the set of unit values supported by Upbit’s Minute Candle API. Each time you instantiate the MinCandleCsvExporter module, the default header and allowed unit values are initialized as follows.
class MinCandleCsvExporter():
def __init__(self):
self.csv_header = [
"candle_date_time_utc", "trading_pair", "unit",
"opening_price", "high_price", "low_price", "trade_price",
"candle_acc_trade_volume", "candle_acc_trade_price"
]
self.unit_limit = [
1, 3, 5, 10, 15, 30, 60, 240
]
CSV File Methods
- Extracting Candle Data
This is the primary method users will call on the MinCandleCsvExporter class. It accepts the trading pair, the candle unit, the start and end timestamps, and the CSV file path; it then queries the Candle API and writes the resulting candle data to the CSV file.
def run_export_minutes_csv(self, trading_pair, unit, from_dt_str, to_dt_str, csv_file_path):
from_dt = self._string_to_datetime(from_dt_str)
to_dt = self._string_to_datetime(to_dt_str)
floored_from_dt = self._floor_time_by_unit(from_dt, unit)
floored_to_dt = self._floor_time_by_unit(to_dt, unit)
if from_dt > to_dt:
raise ValueError("from_dt must be before to_dt")
if to_dt > datetime.now(timezone.utc):
raise ValueError("to_dt must be before current time")
csv_path = Path(csv_file_path)
f, writer = self._get_csv_writer(csv_path)
try:
print(
f"{trading_pair}_{from_dt}_{to_dt}_{unit}m_candles.csv is creating...")
self._iter_write_range(
trading_pair, unit, floored_from_dt, floored_to_dt, writer)
print(
f"{trading_pair}_{from_dt}_{to_dt}_{unit}m_candles.csv has been created.")
finally:
f.close()
- Creating the CSV Writer
This method checks whether the user-specified CSV file path exists. If the file exists, it opens the file and returns both the file object and the writer object. These objects are then used to write data into the CSV file. If the file does not exist, the method creates the parent directory (if necessary), generates a new CSV file, and returns the file object and writer object.
def _get_csv_writer(self, csv_path):
exists = csv_path.exists()
csv_path.parent.mkdir(parents=True, exist_ok=True)
f = open(csv_path, "a", newline="", encoding="utf-8")
writer = csv.writer(f)
if not exists:
writer.writerow(self.csv_header)
return f, writer
API Call Methods
- Minute Candle API Call
This method calls Upbit’s Minute Candle API to retrieve candles for the specified trading pair. The API returns an array of candles created at intervals defined by the unit parameter. The API also supports the to parameter, which is used to define the query range. Based on this, the method calculates the appropriate call interval for successive API requests.
def _get_candles_minutes(self, trading_pair, to_dt, unit):
if unit not in self.unit_limit:
raise ValueError("unit must be in 1, 3, 5, 10, 15, 30, 60, 240")
path = f"https://{region}-api.upbit.com/v1/candles/minutes/{unit}"
params = {
"market": trading_pair,
"to": to_dt,
"count": 200
}
try:
res = requests.get(path, params=params)
res.raise_for_status()
data = res.json()
except Exception as e:
raise RuntimeError(f"Error: {e}")
if isinstance(data, list):
return data[::-1]
else:
raise RuntimeError(f"Unexpected API response format: {data}")
- Iterating Over Candle Periods and Writing to CSV
This method queries the Minute Candle API over the user-specified period (from, to) and unit, and writes the results to a CSV file. If candle data exists, it records the candles in chronological order. If no data exists for a time slot, it writes an empty row to ensure continuity of the time series. To ensure reliability, each API call is followed by a 0.5-second delay.
def _iter_write_range(self, trading_pair, unit, start_dt, end_dt, writer):
from_dt = self._string_to_datetime(start_dt)
to_dt = self._string_to_datetime(end_dt)
unit_dt = timedelta(minutes=unit)
interval = timedelta(minutes=unit * 200)
to_params_dt = from_dt + interval
last_dt = to_dt + interval
while to_params_dt <= last_dt:
print(f"querying {from_dt} to {to_params_dt} Candle Data...")
candles = self._get_candles_minutes(
trading_pair, self._datetime_to_string(to_params_dt), unit)
expected_candle_dt = from_dt
candle_idx = 0
if not candles or len(candles) == 0:
raise LookupError(
f"No candle data found for {trading_pair} from {self._datetime_to_string(from_dt)} to {self._datetime_to_string(to_params_dt)}. "
f"Please check the trading_pair and time range.")
batch_end_dt = min(to_dt, to_params_dt - unit_dt)
while expected_candle_dt <= batch_end_dt and candle_idx < len(candles):
prev_expected_dt = expected_candle_dt
current_candle_dt = self._string_to_datetime(
candles[candle_idx].get("candle_date_time_utc"))
if current_candle_dt == expected_candle_dt:
self._write_candle_row(writer, candles[candle_idx])
candle_idx = candle_idx + 1
expected_candle_dt += unit_dt
elif current_candle_dt < expected_candle_dt:
candle_idx = candle_idx + 1
else:
self._write_empty_row(
writer, expected_candle_dt, trading_pair, unit)
expected_candle_dt += unit_dt
if expected_candle_dt == prev_expected_dt and candle_idx >= len(candles):
self._write_empty_row(
writer, expected_candle_dt, trading_pair, unit)
expected_candle_dt += unit_dt
while expected_candle_dt <= batch_end_dt:
self._write_empty_row(
writer, expected_candle_dt, trading_pair, unit)
expected_candle_dt += unit_dt
from_dt = expected_candle_dt
to_params_dt = from_dt + interval
sleep(0.5)
Candle Data Output Methods
- Writing Actual Candle Data
This method extracts values from each candle (such as candle date time in UTC, trading pair, unit, open, high, low, close, accumulated trade volume, and accumulated trade price) and writes them into the CSV file.
def _write_candle_row(self, writer, candle):
ts_utc = candle.get("candle_date_time_utc", "")
trading_pair = candle.get("market", "")
unit = candle.get("unit", "")
opening = candle.get("opening_price", "")
high = candle.get("high_price", "")
low = candle.get("low_price", "")
close = candle.get("trade_price", "")
acc_vol = candle.get("candle_acc_trade_volume", "")
acc_val = candle.get("candle_acc_trade_price", "")
if isinstance(ts_utc, str) and not ts_utc.endswith("Z"):
ts_utc = ts_utc + "Z"
writer.writerow([
ts_utc, trading_pair, unit, opening, high, low, close, acc_vol, acc_val
])
- Writing Empty Candle Data
If no candle exists for a given time slot, this method writes an empty row to the CSV file, preserving consistent time intervals across the dataset.
def _write_empty_row(self, writer, ts_utc, trading_pair, unit):
writer.writerow([
self._datetime_to_string(ts_utc),
trading_pair, unit, "", "", "", "", "", ""
])
Time Conversion Methods
- Converting String to datetime
This method converts a user-provided string into a datetime object. It enables comparison and arithmetic operations between time values.
def _string_to_datetime(self, zulu_str):
if zulu_str.endswith("Z"):
zulu_str = zulu_str[:-1]
return datetime.fromisoformat(zulu_str).replace(tzinfo=timezone.utc)
- Converting datetime to String
This method converts a datetime object back into a string format. Since Upbit’s Candle API requires time parameters as strings, the conversion is necessary before making API requests.
def _datetime_to_string(self, dt):
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = dt.astimezone(timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
- Flooring Time to Candle Units
This method normalizes a user-provided timestamp to align with the nearest valid candle unit. For example, if the unit is 5 minutes and the input time is 2025-09-10T10:22:00, the floored time will be 2025-09-10T10:20:00. This ensures more accurate and consistent API queries.
def _floor_time_by_unit(self, dt, unit_min):
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = dt.astimezone(timezone.utc)
if unit_min >= 60:
total_minutes = dt.hour * 60 + dt.minute
floored_total_minutes = (total_minutes // unit_min) * unit_min
floored_hour = floored_total_minutes // 60
floored_minute = floored_total_minutes % 60
floored = dt.replace(
hour=floored_hour, minute=floored_minute, second=0, microsecond=0)
else:
minute = (dt.minute // unit_min) * unit_min
floored = dt.replace(minute=minute, second=0, microsecond=0)
return floored.strftime("%Y-%m-%dT%H:%M:%SZ")
Updated 6 days ago