Exporting Market Data to CSV Using the Candle API

Learn how to retrieve candle data for a specified period using Upbit’s Candle API and export it to a CSV file.

Overview

This guide demonstrates how to use the Upbit Minute Candle API to save market data for a specified period to a CSV file. Each record includes the candle date time in UTC, the trading pair, and the unit, followed by the opening price, high price, low price, closing price, accumulated trade volume, and accumulated trade value. The candle aggregation interval (in minutes) must be one of the unit values supported by Upbit. After the script runs, you can open the generated CSV file to verify the results.


This tutorial requires Python 3.7 or later and the third-party requests package. Upbit’s Candle API returns up to 200 candles per request. If the number of candles in your target period exceeds 200, you must split the period into time windows and call the API once per window.

Provide the candle period to export and the minute aggregation interval (hereafter, unit). The Minute Candle API returns an array of candles generated at unit intervals. Compute the request window using the candle start time, the unit, and the desired number of candles per response (hereafter, count). Call the Minute Candle API according to the computed windows. From each response, write only the candles whose timestamps fall within the specified period to the CSV file.
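The window computation described above can be sketched roughly as follows. This is a minimal illustration rather than the guide's implementation: the helper name request_windows is hypothetical, and the sketch assumes that the to parameter is exclusive and that every candle in the range exists, so each request can return a full batch of count candles.

```python
from datetime import datetime, timedelta, timezone


def request_windows(start, end, unit, count=200):
    """Yield (first_candle, to_param) pairs covering start..end inclusive.

    Assumes `to` is exclusive, so a request with `to = first + unit * count`
    can return the candles from `first` up to `to - unit`.
    """
    step = timedelta(minutes=unit * count)
    first = start
    while first <= end:
        yield first, first + step
        first += step


start = datetime(2025, 9, 1, 0, 0, tzinfo=timezone.utc)
end = datetime(2025, 9, 10, 10, 0, tzinfo=timezone.utc)
windows = list(request_windows(start, end, unit=10))
for first, to_param in windows:
    print(first.isoformat(), "->", to_param.isoformat())
```

With unit=10 and count=200, each window spans 2,000 minutes, so this period needs seven requests. When candles are missing, a response may also contain candles older than the window start, which is why the full code filters responses by timestamp.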

Note
Candle data is generated based on executed trades. Therefore, if no trades occur during a given time interval, candle data for that interval does not exist.

As a result, if you write only the API responses to a CSV file, the spacing between recorded candles may not be uniform. In this guide, when no candle exists for an interval, a placeholder row is written that contains only the timestamp, trading pair, and unit, with the price and volume fields left empty. This lets you distinguish intervals with executed trades from intervals without trades.

The final candle row is determined by how the user-specified end time (to) aligns with the candle interval (unit). The end time is floored to the interval before any requests are made, so if it does not align with the interval, the last row corresponds to the most recent candle that starts before the end time.

For example, when unit=15, candles are generated at 00:00, 00:15, 00:30, … . If the specified end time is 2025-09-10T00:21:00, which is not aligned with a 15-minute interval, the last row in the CSV will contain the candle generated at 2025-09-10T00:15:00.
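The flooring in this example can be reproduced with a few lines of Python. This is a sketch under the assumption that candle boundaries are aligned to UTC midnight, which holds for the unit values used in this guide because each divides 1,440 minutes evenly. The helper name floor_to_unit is hypothetical and is not part of the exporter class.

```python
from datetime import datetime, timedelta, timezone


def floor_to_unit(dt, unit):
    """Round dt down to the start of its unit-minute candle (UTC day-aligned)."""
    minutes = dt.hour * 60 + dt.minute
    floored = (minutes // unit) * unit
    midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + timedelta(minutes=floored)


end = datetime(2025, 9, 10, 0, 21, tzinfo=timezone.utc)
last_candle = floor_to_unit(end, 15)
print(last_candle.isoformat())  # 2025-09-10T00:15:00+00:00
```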

Complete Code

Below is the complete code for retrieving candle data over a specified period and exporting it to a CSV file. When you run it, a data directory is created in the current path, and the CSV file is generated inside that directory.

import requests
from datetime import datetime, timedelta, timezone
from pathlib import Path
from time import sleep
import csv


class MinCandleCsvExporter():
    def __init__(self):
        self.csv_header = [
            "candle_date_time_utc", "trading_pair", "unit",
            "opening_price", "high_price", "low_price", "trade_price",
            "candle_acc_trade_volume", "candle_acc_trade_price"
        ]
        self.unit_limit = [
            1, 3, 5, 10, 15, 30, 60, 240
        ]

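    def _get_candles_minutes(self, trading_pair, to_dt, unit):
        if unit not in self.unit_limit:
            raise ValueError("unit must be in 1, 3, 5, 10, 15, 30, 60, 240")
        # NOTE: {region} is a placeholder. Define a `region` variable (or
        # hard-code your region's prefix) before running; otherwise this
        # line raises NameError.
        path = f"https://{region}-api.upbit.com/v1/candles/minutes/{unit}"
        params = {
            "market": trading_pair,
            "to": to_dt,
            "count": 200
        }
        try:
            # A timeout keeps a stalled connection from hanging the export.
            res = requests.get(path, params=params, timeout=10)
            res.raise_for_status()
            data = res.json()
        except Exception as e:
            # Chain the original exception so its traceback is preserved.
            raise RuntimeError(f"Error: {e}") from e

        if isinstance(data, list):
            # Reverse the response so candles are in ascending time order.
            return data[::-1]
        else:
            raise RuntimeError(f"Unexpected API response format: {data}")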

    def _string_to_datetime(self, zulu_str):
        if zulu_str.endswith("Z"):
            zulu_str = zulu_str[:-1]
        return datetime.fromisoformat(zulu_str).replace(tzinfo=timezone.utc)

    def _datetime_to_string(self, dt):
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        else:
            dt = dt.astimezone(timezone.utc)

        return dt.strftime("%Y-%m-%dT%H:%M:%SZ")

    def _floor_time_by_unit(self, dt, unit_min):
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        else:
            dt = dt.astimezone(timezone.utc)

        if unit_min >= 60:
            total_minutes = dt.hour * 60 + dt.minute
            floored_total_minutes = (total_minutes // unit_min) * unit_min
            floored_hour = floored_total_minutes // 60
            floored_minute = floored_total_minutes % 60
            floored = dt.replace(
                hour=floored_hour, minute=floored_minute, second=0, microsecond=0)
        else:
            minute = (dt.minute // unit_min) * unit_min
            floored = dt.replace(minute=minute, second=0, microsecond=0)

        return floored.strftime("%Y-%m-%dT%H:%M:%SZ")

    def _get_csv_writer(self, csv_path):
        exists = csv_path.exists()
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        f = open(csv_path, "a", newline="", encoding="utf-8")
        writer = csv.writer(f)
        if not exists:
            writer.writerow(self.csv_header)
        return f, writer

    def _write_empty_row(self, writer, ts_utc, trading_pair, unit):
        writer.writerow([
            self._datetime_to_string(ts_utc),
            trading_pair, unit, "", "", "", "", "", ""
        ])

    def _write_candle_row(self, writer, candle):
        ts_utc = candle.get("candle_date_time_utc", "")
        trading_pair = candle.get("market", "")
        unit = candle.get("unit", "")
        opening = candle.get("opening_price", "")
        high = candle.get("high_price", "")
        low = candle.get("low_price", "")
        close = candle.get("trade_price", "")
        acc_vol = candle.get("candle_acc_trade_volume", "")
        acc_val = candle.get("candle_acc_trade_price", "")
        if isinstance(ts_utc, str) and not ts_utc.endswith("Z"):
            ts_utc = ts_utc + "Z"
        writer.writerow([
            ts_utc, trading_pair, unit, opening, high, low, close, acc_vol, acc_val
        ])

    def _iter_write_range(self, trading_pair, unit, start_dt, end_dt, writer):
        from_dt = self._string_to_datetime(start_dt)
        to_dt = self._string_to_datetime(end_dt)
        unit_dt = timedelta(minutes=unit)
        interval = timedelta(minutes=unit * 200)
        to_params_dt = from_dt + interval
        last_dt = to_dt + interval

        while to_params_dt <= last_dt:
            print(f"querying {from_dt} to {to_params_dt} Candle Data...")
            candles = self._get_candles_minutes(
                trading_pair, self._datetime_to_string(to_params_dt), unit)
            expected_candle_dt = from_dt
            candle_idx = 0

            if not candles:
                raise LookupError(
                    f"No candle data found for {trading_pair} from {self._datetime_to_string(from_dt)} to {self._datetime_to_string(to_params_dt)}. "
                    f"Please check the trading_pair and time range.")

            batch_end_dt = min(to_dt, to_params_dt - unit_dt)

            while expected_candle_dt <= batch_end_dt and candle_idx < len(candles):
                prev_expected_dt = expected_candle_dt

                current_candle_dt = self._string_to_datetime(
                    candles[candle_idx].get("candle_date_time_utc"))

                if current_candle_dt == expected_candle_dt:
                    self._write_candle_row(writer, candles[candle_idx])
                    candle_idx = candle_idx + 1
                    expected_candle_dt += unit_dt

                elif current_candle_dt < expected_candle_dt:
                    candle_idx = candle_idx + 1

                else:
                    self._write_empty_row(
                        writer, expected_candle_dt, trading_pair, unit)
                    expected_candle_dt += unit_dt

                if expected_candle_dt == prev_expected_dt and candle_idx >= len(candles):
                    self._write_empty_row(
                        writer, expected_candle_dt, trading_pair, unit)
                    expected_candle_dt += unit_dt

            while expected_candle_dt <= batch_end_dt:
                self._write_empty_row(
                    writer, expected_candle_dt, trading_pair, unit)
                expected_candle_dt += unit_dt

            from_dt = expected_candle_dt
            to_params_dt = from_dt + interval

            sleep(0.5)

    def run_export_minutes_csv(self, trading_pair, unit, from_dt_str, to_dt_str, csv_file_path):
        from_dt = self._string_to_datetime(from_dt_str)
        to_dt = self._string_to_datetime(to_dt_str)
        floored_from_dt = self._floor_time_by_unit(from_dt, unit)
        floored_to_dt = self._floor_time_by_unit(to_dt, unit)

        if from_dt > to_dt:
            raise ValueError("from_dt must be before to_dt")

        if to_dt > datetime.now(timezone.utc):
            raise ValueError("to_dt must be before current time")

        csv_path = Path(csv_file_path)
        f, writer = self._get_csv_writer(csv_path)
        try:
            print(f"Writing candles to {csv_path}...")
            self._iter_write_range(
                trading_pair, unit, floored_from_dt, floored_to_dt, writer)
            print(f"Finished writing {csv_path}.")
        finally:
            f.close()


if __name__ == "__main__":

    exporter = MinCandleCsvExporter()

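    # NOTE: {fiat} is a placeholder; replace it with the quote currency of
    # the market you want to export.
    trading_pair = "{fiat}-BTC"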
    from_dt = "2025-09-01T00:00:00Z"  # UTC
    to_dt = "2025-09-10T10:00:00Z"  # UTC
    unit = 10
    csv_file = f"./data/{trading_pair}_{from_dt}_{to_dt}_{unit}m_candles.csv"

    exporter.run_export_minutes_csv(
        trading_pair, unit, from_dt, to_dt, csv_file_path=csv_file)
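After an export finishes, one way to check the result is to read the CSV back and count how many intervals had trades. The sketch below is illustrative only: the helper name summarize is hypothetical, and the sample rows (including the XXX-BTC pair and all prices) are made-up values that merely follow the header defined above.

```python
import csv
import io

# Made-up sample in the same column layout as the exporter's header.
SAMPLE = """candle_date_time_utc,trading_pair,unit,opening_price,high_price,low_price,trade_price,candle_acc_trade_volume,candle_acc_trade_price
2025-09-01T00:00:00Z,XXX-BTC,10,100,110,90,105,1.5,157.5
2025-09-01T00:10:00Z,XXX-BTC,10,,,,,,
2025-09-01T00:20:00Z,XXX-BTC,10,105,108,104,106,0.4,42.4
"""


def summarize(csv_file):
    """Count rows with trades and rows without (empty opening_price)."""
    traded = empty = 0
    for row in csv.DictReader(csv_file):
        if row["opening_price"] == "":
            empty += 1
        else:
            traded += 1
    return traded, empty


traded, empty = summarize(io.StringIO(SAMPLE))
print(f"traded={traded} empty={empty}")  # traded=2 empty=1
```

To run it against a real export, pass a file opened with open(path, newline="", encoding="utf-8") instead of the in-memory sample.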

Detailed Guide

Implementing the MinCandleCsvExporter Class

Create a class that implements the methods to fetch candle data for a user-specified period and write it to a CSV file.

Initial Setup

Define the CSV header fields and the set of unit values supported by Upbit’s Minute Candle API. Each time you instantiate the MinCandleCsvExporter class, the default header and allowed unit values are initialized as follows.

class MinCandleCsvExporter():
    def __init__(self):
        self.csv_header = [
            "candle_date_time_utc", "trading_pair", "unit",
            "opening_price", "high_price", "low_price", "trade_price",
            "candle_acc_trade_volume", "candle_acc_trade_price"
        ]
        self.unit_limit = [
            1, 3, 5, 10, 15, 30, 60, 240
        ]

CSV File Methods

  • Extracting Candle Data

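This is the primary method users will call on the MinCandleCsvExporter class. It accepts the trading pair, the candle unit, the start and end timestamps, and the CSV file path. It parses the timestamps, floors them to the candle unit, rejects a period whose start is after its end or whose end is in the future, and then queries the Candle API and writes the resulting candle data to the CSV file. The file is closed even if an error occurs during the export.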

    def run_export_minutes_csv(self, trading_pair, unit, from_dt_str, to_dt_str, csv_file_path):
        from_dt = self._string_to_datetime(from_dt_str)
        to_dt = self._string_to_datetime(to_dt_str)
        floored_from_dt = self._floor_time_by_unit(from_dt, unit)
        floored_to_dt = self._floor_time_by_unit(to_dt, unit)

        if from_dt > to_dt:
            raise ValueError("from_dt must be before to_dt")

        if to_dt > datetime.now(timezone.utc):
            raise ValueError("to_dt must be before current time")

        csv_path = Path(csv_file_path)
        f, writer = self._get_csv_writer(csv_path)
        try:
            print(f"Writing candles to {csv_path}...")
            self._iter_write_range(
                trading_pair, unit, floored_from_dt, floored_to_dt, writer)
            print(f"Finished writing {csv_path}.")
        finally:
            f.close()

  • Creating the CSV Writer

This method prepares the CSV file at the user-specified path. It creates the parent directory if necessary and opens the file in append mode, which creates the file if it does not exist. The header row is written only when the file is newly created, so running the export again with the same path appends rows to the existing file. The method returns both the file object and the writer object; the caller uses the writer to write rows and is responsible for closing the file.

    def _get_csv_writer(self, csv_path):
        exists = csv_path.exists()
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        f = open(csv_path, "a", newline="", encoding="utf-8")
        writer = csv.writer(f)
        if not exists:
            writer.writerow(self.csv_header)
        return f, writer
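The header-once behavior can be checked in isolation. The following sketch is a variant, not the guide's method: it uses a with block so the file is closed automatically, the helper name append_rows is hypothetical, and the header is shortened and the rows are made-up values for the demonstration.

```python
import csv
import tempfile
from pathlib import Path

HEADER = ["candle_date_time_utc", "trading_pair", "unit"]  # shortened for the demo


def append_rows(csv_path, rows):
    """Append rows, writing the header only when the file is first created."""
    is_new = not csv_path.exists()
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    with open(csv_path, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if is_new:
            writer.writerow(HEADER)
        writer.writerows(rows)


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "data" / "demo.csv"
    # Two separate appends to the same path, as two export runs would do.
    append_rows(path, [["2025-09-01T00:00:00Z", "XXX-BTC", 10]])
    append_rows(path, [["2025-09-01T00:10:00Z", "XXX-BTC", 10]])
    lines = path.read_text(encoding="utf-8").splitlines()

print(lines)
```

The second call finds the file already present, so the output contains one header line followed by two data rows. The same logic means that exporting the same period twice to the same path produces duplicate rows.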

API Call Methods

  • Minute Candle API Call

This method calls Upbit’s Minute Candle API to retrieve candles for the specified trading pair. The API returns an array of candles created at intervals defined by the unit parameter. The to parameter sets the end of the query range, and count is fixed at 200, the maximum per request. The method validates unit, raises an error if the request fails or the response is not a list, and returns the candles in reversed order so that they are chronological for the iteration logic.

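    def _get_candles_minutes(self, trading_pair, to_dt, unit):
        if unit not in self.unit_limit:
            raise ValueError("unit must be in 1, 3, 5, 10, 15, 30, 60, 240")
        # NOTE: {region} is a placeholder. Define a `region` variable (or
        # hard-code your region's prefix) before running; otherwise this
        # line raises NameError.
        path = f"https://{region}-api.upbit.com/v1/candles/minutes/{unit}"
        params = {
            "market": trading_pair,
            "to": to_dt,
            "count": 200
        }
        try:
            # A timeout keeps a stalled connection from hanging the export.
            res = requests.get(path, params=params, timeout=10)
            res.raise_for_status()
            data = res.json()
        except Exception as e:
            # Chain the original exception so its traceback is preserved.
            raise RuntimeError(f"Error: {e}") from e

        if isinstance(data, list):
            # Reverse the response so candles are in ascending time order.
            return data[::-1]
        else:
            raise RuntimeError(f"Unexpected API response format: {data}")

  • Iterating Over Candle Periods and Writing to CSV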

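This method queries the Minute Candle API over the user-specified period (from, to) and unit, and writes the results to a CSV file. It requests candles in batches of up to 200 and walks through the expected time slots of each batch in chronological order. If a candle exists for a slot, it is written; if not, an empty row is written to keep the time series continuous. Candles older than the current slot, which can appear in a response when earlier slots had no trades, are skipped. Each API call is followed by a 0.5-second delay to avoid sending requests in rapid succession.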

    def _iter_write_range(self, trading_pair, unit, start_dt, end_dt, writer):
        from_dt = self._string_to_datetime(start_dt)
        to_dt = self._string_to_datetime(end_dt)
        unit_dt = timedelta(minutes=unit)
        # Each request covers at most 200 candles.
        interval = timedelta(minutes=unit * 200)
        to_params_dt = from_dt + interval
        last_dt = to_dt + interval

        while to_params_dt <= last_dt:
            print(f"querying {from_dt} to {to_params_dt} Candle Data...")
            candles = self._get_candles_minutes(
                trading_pair, self._datetime_to_string(to_params_dt), unit)
            expected_candle_dt = from_dt
            candle_idx = 0

            if not candles:
                raise LookupError(
                    f"No candle data found for {trading_pair} from {self._datetime_to_string(from_dt)} to {self._datetime_to_string(to_params_dt)}. "
                    f"Please check the trading_pair and time range.")

            # Last time slot this batch is responsible for.
            batch_end_dt = min(to_dt, to_params_dt - unit_dt)

            # Walk the expected slots and the returned candles in parallel.
            while expected_candle_dt <= batch_end_dt and candle_idx < len(candles):
                prev_expected_dt = expected_candle_dt

                current_candle_dt = self._string_to_datetime(
                    candles[candle_idx].get("candle_date_time_utc"))

                if current_candle_dt == expected_candle_dt:
                    # A candle exists for this slot.
                    self._write_candle_row(writer, candles[candle_idx])
                    candle_idx = candle_idx + 1
                    expected_candle_dt += unit_dt

                elif current_candle_dt < expected_candle_dt:
                    # The candle is older than the current slot; skip it.
                    candle_idx = candle_idx + 1

                else:
                    # No candle for this slot; write an empty row.
                    self._write_empty_row(
                        writer, expected_candle_dt, trading_pair, unit)
                    expected_candle_dt += unit_dt

                # Ran out of candles while skipping; fill the current slot.
                if expected_candle_dt == prev_expected_dt and candle_idx >= len(candles):
                    self._write_empty_row(
                        writer, expected_candle_dt, trading_pair, unit)
                    expected_candle_dt += unit_dt

            # Fill any remaining slots in this batch with empty rows.
            while expected_candle_dt <= batch_end_dt:
                self._write_empty_row(
                    writer, expected_candle_dt, trading_pair, unit)
                expected_candle_dt += unit_dt

            # Start the next batch at the first unwritten slot.
            from_dt = expected_candle_dt
            to_params_dt = from_dt + interval

            # Pause between requests.
            sleep(0.5)

Candle Data Output Methods

  • Writing Actual Candle Data

This method extracts values from each candle (such as candle date time in UTC, trading pair, unit, open, high, low, close, accumulated trade volume, and accumulated trade price) and writes them into the CSV file.

    def _write_candle_row(self, writer, candle):
        ts_utc = candle.get("candle_date_time_utc", "")
        trading_pair = candle.get("market", "")
        unit = candle.get("unit", "")
        opening = candle.get("opening_price", "")
        high = candle.get("high_price", "")
        low = candle.get("low_price", "")
        close = candle.get("trade_price", "")
        acc_vol = candle.get("candle_acc_trade_volume", "")
        acc_val = candle.get("candle_acc_trade_price", "")
        if isinstance(ts_utc, str) and not ts_utc.endswith("Z"):
            ts_utc = ts_utc + "Z"
        writer.writerow([
            ts_utc, trading_pair, unit, opening, high, low, close, acc_vol, acc_val
        ])

  • Writing Empty Candle Data

If no candle exists for a given time slot, this method writes an empty row to the CSV file, preserving consistent time intervals across the dataset.

    def _write_empty_row(self, writer, ts_utc, trading_pair, unit):
        writer.writerow([
            self._datetime_to_string(ts_utc),
            trading_pair, unit, "", "", "", "", "", ""
        ])
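The idea of filling missing slots can be illustrated without calling the API. The sketch below uses a dictionary lookup per expected slot, which is a simpler technique than the index-based merge in _iter_write_range and is shown only to make the output shape concrete. The helper name fill_gaps, the dictionary shape, and the prices are all hypothetical.

```python
from datetime import datetime, timedelta, timezone


def fill_gaps(candles, start, end, unit):
    """Return one entry per expected slot; None marks a slot with no candle.

    `candles` maps a candle start time to its data (hypothetical shape).
    """
    rows = []
    slot = start
    while slot <= end:
        rows.append((slot, candles.get(slot)))
        slot += timedelta(minutes=unit)
    return rows


t0 = datetime(2025, 9, 1, 0, 0, tzinfo=timezone.utc)
# Made-up data: the 00:10 slot had no trades, so it is absent.
candles = {t0: 100.0, t0 + timedelta(minutes=20): 101.0}
rows = fill_gaps(candles, t0, t0 + timedelta(minutes=20), unit=10)
for slot, price in rows:
    print(slot.strftime("%Y-%m-%dT%H:%M:%SZ"), "" if price is None else price)
```

The middle slot is printed with an empty value, mirroring the row that _write_empty_row produces for an interval without trades.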

Time Conversion Methods

  • Converting String to datetime

This method converts a user-provided string into a timezone-aware datetime object. It strips a trailing Z, if present, and treats the value as UTC. Working with datetime objects enables comparison and arithmetic between time values.

    def _string_to_datetime(self, zulu_str):
        if zulu_str.endswith("Z"):
            zulu_str = zulu_str[:-1]
        return datetime.fromisoformat(zulu_str).replace(tzinfo=timezone.utc)

  • Converting datetime to String

This method converts a datetime object back into a string format. Since Upbit’s Candle API requires time parameters as strings, the conversion is necessary before making API requests.

    def _datetime_to_string(self, dt):
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        else:
            dt = dt.astimezone(timezone.utc)

        return dt.strftime("%Y-%m-%dT%H:%M:%SZ")

  • Flooring Time to Candle Units

This method rounds a user-provided timestamp down to the start of the candle interval that contains it. For example, if the unit is 5 minutes and the input time is 2025-09-10T10:22:00, the floored time is 2025-09-10T10:20:00. Flooring both the start and end times keeps the requested period aligned with the timestamps at which candles are generated. The method returns the floored time as a string.

    def _floor_time_by_unit(self, dt, unit_min):
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        else:
            dt = dt.astimezone(timezone.utc)

        if unit_min >= 60:
            total_minutes = dt.hour * 60 + dt.minute
            floored_total_minutes = (total_minutes // unit_min) * unit_min
            floored_hour = floored_total_minutes // 60
            floored_minute = floored_total_minutes % 60
            floored = dt.replace(
                hour=floored_hour, minute=floored_minute, second=0, microsecond=0)
        else:
            minute = (dt.minute // unit_min) * unit_min
            floored = dt.replace(minute=minute, second=0, microsecond=0)

        return floored.strftime("%Y-%m-%dT%H:%M:%SZ")
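The time conversion methods above are typically used together: parse a string, do arithmetic on the datetime, and format the result for the to parameter. The following standalone sketch shows that round trip. The function names parse_utc and format_utc are hypothetical stand-ins for the class methods, and the sketch assumes inputs without a UTC offset other than an optional trailing Z.

```python
from datetime import datetime, timedelta, timezone


def parse_utc(text):
    """Parse 'YYYY-MM-DDTHH:MM:SS' with an optional trailing 'Z' as UTC."""
    if text.endswith("Z"):
        text = text[:-1]
    return datetime.fromisoformat(text).replace(tzinfo=timezone.utc)


def format_utc(dt):
    """Format an aware datetime as a UTC string with a trailing 'Z'."""
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


start = parse_utc("2025-09-01T00:00:00Z")
to_param = start + timedelta(minutes=10 * 200)  # 200 candles of 10 minutes
print(format_utc(to_param))  # 2025-09-02T09:20:00Z
```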