
How to Track Amazon Prices With Python

Yelyzaveta Nechytailo

2023-11-22 · 4 min read

Keeping up with competitor pricing is an inseparable part of running a successful e-commerce business. It goes without saying that one of the biggest sources of pricing data is Amazon, the largest online retailer in the world. In today’s article, we’ll demonstrate how to automatically track Amazon prices at scale. 

1. Installing prerequisite libraries

Let’s begin by installing the libraries we’ll be using throughout the following tutorial.

pip install requests
pip install pandas
pip install matplotlib

We’ll use requests to communicate with the Scraper API, pandas for easier dict management and saving of results, and matplotlib for plotting price histories.

2. Making the initial request

With all the prerequisites installed, we can start working on the code. First, we need to connect to the Oxylabs E-Commerce Scraper API, which will fetch the data we need from Amazon.

import requests

USERNAME = 'username'
PASSWORD = 'password'

# Structure payload.
payload = {
    'source':'amazon_product',
    'domain':'com',
    'query':'B0C3LXN76L',
    'parse':True,
    'context':[{
        'key':'autoselect_variant', 
        'value': True
    }],
}

response = requests.request(
   'POST',
   'https://data.oxylabs.io/v1/queries',
   auth=(USERNAME, PASSWORD),
   json=payload,
)

print(response.json())

Here we have code that sets up a request to the Amazon Scraper API and creates a scraping job.

Since we need product price information, we set the source to amazon_product. As the product we want to scrape is listed on amazon.com, we set the domain to com. We also add a context parameter with autoselect_variant to the payload, as the Scraper API documentation instructs for accurate pricing data. Note that we don’t get the result of the scrape instantly, but rather information about the job that was created. You can check the documentation for more details about the parameters and use of the API.
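
For example, if we wanted to track the same product on a different Amazon marketplace, only the domain parameter would change. A hypothetical variant, assuming the product is also listed on amazon.co.uk:

# Hypothetical payload: the same ASIN on amazon.co.uk instead of amazon.com.
payload_uk = {
    'source': 'amazon_product',
    'domain': 'co.uk',
    'query': 'B0C3LXN76L',
    'parse': True,
    'context': [{
        'key': 'autoselect_variant',
        'value': True
    }],
}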

And if we check the response after running the code, we should see the job information, including the job’s id and status.

The next step would be to create some logic that would wait for our job to finish and then fetch the results.

import time

# Create the scraping job.
response = requests.request(
    'POST',
    'https://data.oxylabs.io/v1/queries',
    auth=(USERNAME, PASSWORD),
    json=payload,
)

response_json = response.json()

print(response_json)

job_id = response_json["id"]

status = ""

# Poll until the job is done.
while status != "done":
    time.sleep(5)
    response = requests.request(
        'GET',
        f"https://data.oxylabs.io/v1/queries/{job_id}",
        auth=(USERNAME, PASSWORD),
    )
    response_json = response.json()

    status = response_json.get("status")

    print(f"Job status is {status}")

# Fetch the job results.
response = requests.request(
    'GET',
    f"https://data.oxylabs.io/v1/queries/{job_id}/results",
    auth=(USERNAME, PASSWORD),
)

response_json = response.json()
In the code above, we create a while loop that keeps polling the API for updates on the job status until it becomes done. Then, we fetch the job results.
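
One caveat: the loop above will spin forever if a job never finishes. A minimal sketch of a safer variant (not part of the original tutorial), assuming the same status endpoint, caps the number of attempts:

import time
import requests

def wait_for_job(job_id, auth, max_attempts=60, delay=5):
    """Poll the job status endpoint, giving up after max_attempts tries."""
    for _ in range(max_attempts):
        time.sleep(delay)
        response = requests.get(
            f"https://data.oxylabs.io/v1/queries/{job_id}",
            auth=auth,
        )
        if response.json().get("status") == "done":
            return
    raise TimeoutError(f"Job {job_id} did not finish in time")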

3. Creating the core of the tracker

With the connection to the Amazon Scraper API established, we can start building the core logic of our price tracker. At its most basic, the tracker is a script that runs once a day, fetches today's price, adds it to the historical data we already have, and saves the result. So, let’s start with that.

We’ll begin by creating a function that reads the historical Amazon price data we may have already gathered.

def read_past_data(filepath):
    results = {}

    if not os.path.isfile(filepath):
        open(filepath, 'a').close()

    if not os.stat(filepath).st_size == 0:
        results_df = pd.read_json(filepath, convert_axes=False)
        results = results_df.to_dict()
        return results

    return results

The function takes the path to our historical data file as an argument and returns the stored data as a Python dictionary. It also handles a few edge cases:

  • If there is no data file, one should be created. 

  • If the data file is empty, we should return an empty dictionary.
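
For example, on the very first run, when no data file exists yet, the function creates an empty file and returns an empty dictionary:

past = read_past_data("data.json")  # creates an empty data.json if missing
print(past)  # {}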

Now that we have the historical price data loaded, we can write a function that takes the past price tracker data and adds today’s Amazon price for each tracked product.

def add_todays_prices(results, tracked_product_codes):
    today = date.today()

    for code in tracked_product_codes:
        product = get_product(code)

        if product["title"] not in results:
            results[product["title"]] = {}

        results[product["title"]][today.strftime("%d %B, %Y")] = {
            "price": product["price"],
        }

    return results

This function takes the past Amazon price tracking results and a list of product codes as arguments, adds today’s price for each of the provided products to the existing data, and returns the updated results.
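
To make the data layout concrete, after a couple of daily runs the results dictionary (and the JSON file it is saved to) would take roughly this shape; the titles and prices below are invented for illustration:

results = {
    "Example Product Title": {
        "21 November, 2023": {"price": 49.99},
        "22 November, 2023": {"price": 44.99},
    },
}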

Having the prices updated for today, we can move on to saving our results back to the file we started from, thus finishing our process loop.

def save_results(results, filepath):
    df = pd.DataFrame.from_dict(results)

    df.to_json(filepath)

    return

Finally, we can move the connection to the Scraper API to a separate function and combine all we have done so far:

import os
import time
import requests
from datetime import date, timedelta
import pandas as pd
import matplotlib.pyplot as plt


def get_product(code):
    USERNAME = "username"
    PASSWORD = "password"

    # Structure payload.
    payload = {
        'source': 'amazon_product',
        'domain': 'com',
        'query': code,
        'parse': True,
        'context': [{
            'key': 'autoselect_variant',
            'value': True
        }],
    }

    # Post the scraping job
    response = requests.request(
        'POST',
        'https://data.oxylabs.io/v1/queries',
        auth=(USERNAME, PASSWORD),
        json=payload,
    )

    response_json = response.json()

    print(response_json)

    job_id = response_json["id"]

    status = ""

    # Wait until the job is done
    while status != "done":
        time.sleep(5)
        response = requests.request(
            'GET',
            f"https://data.oxylabs.io/v1/queries/{job_id}",
            auth=(USERNAME, PASSWORD),
        )
        response_json = response.json()

        status = response_json.get("status")

        print(f"Job status is {status}")

    # Fetch the job results
    response = requests.request(
        'GET',
        f"https://data.oxylabs.io/v1/queries/{job_id}/results",
        auth=(USERNAME, PASSWORD),
    )

    response_json = response.json()

    content = response_json["results"][0]["content"]

    title = content["title"]
    price = content["price"]

    product = {
        "title": title,
        "price": price,
    }
    return product

def read_past_data(filepath):
    results = {}

    if not os.path.isfile(filepath):
        open(filepath, 'a').close()

    if not os.stat(filepath).st_size == 0:
        results_df = pd.read_json(filepath, convert_axes=False)
        results = results_df.to_dict()
        return results
    
    return results

def save_results(results, filepath):
    df = pd.DataFrame.from_dict(results)

    df.to_json(filepath)

    return

def add_todays_prices(results, tracked_product_codes):
    today = date.today()

    for code in tracked_product_codes:
        product = get_product(code)

        if product["title"] not in results:
            results[product["title"]] = {}
        
        results[product["title"]][today.strftime("%d %B, %Y")] = {
            "price": product["price"],
        }
    
    return results

def main():
    results_file = "data.json"

    tracked_product_codes = [
        "B0C3LXN76L",
        "B082VRFWB8"
    ]

    past_results = read_past_data(results_file)

    updated_results = add_todays_prices(past_results, tracked_product_codes)

    save_results(updated_results, results_file)
    
if __name__ == "__main__":
    main()

We coordinate all the logic of the application in the main() function. The results_file variable holds the path to the historical price tracker data, while tracked_product_codes lists all the Amazon product codes we want to track. The script then reads past data from the file, fetches new prices, and saves the results back to the file.
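
Note that the script runs once and exits, so the once-a-day cadence has to come from outside, e.g., a cron job. As a minimal stdlib-only alternative (a sketch, not part of the original tutorial), main() could be wrapped in a loop:

import time

def run_daily(job, interval_seconds=24 * 60 * 60):
    """Call the given function once per interval; a simple stand-in for cron."""
    while True:
        job()
        time.sleep(interval_seconds)

# run_daily(main)  # would fetch and save prices once every 24 hours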

After we run the code, we can inspect our Amazon product prices in the specified results file.

4. Plotting price history

Having the prices scraped and saved to a file, we can start adding a few useful features to our price tracker, like plotting the Amazon product price changes over time. 

We can do this by utilizing the matplotlib Python library that we installed earlier.

def plot_history_chart(results):
    for product in results:
        dates = []
        prices = []

        for entry_date in results[product]:
            dates.append(entry_date)
            prices.append(results[product][entry_date]["price"])

        plt.plot(dates, prices, label=product[:50])

        plt.xlabel("Date")
        plt.ylabel("Price")

    plt.title("Product prices over time")
    plt.legend()
    plt.show()

The function above plots the price changes of multiple products over time in a single diagram and then shows it. When we add a call to the plot_history_chart function to our existing main() and run the code again, we’ll see the resulting chart.

5. Creating price drop alerts

Another useful feature is price drop alerts. These help direct our attention to a specific product, which becomes especially useful when tracking multiple product prices at the same time.

def check_for_pricedrop(results):
    for product in results:
        today = date.today()
        yesterday = today - timedelta(days=1)

        if yesterday.strftime("%d %B, %Y") in results[product]:
            change = float(results[product][today.strftime("%d %B, %Y")]["price"]) - float(results[product][yesterday.strftime("%d %B, %Y")]["price"])
            if change < 0:
                print(f'Price for {product} has dropped by {change}!')
Here, we have created a function that checks the price change between yesterday’s price entry and today’s, and reports if the change is negative. When we add a call to the check_for_pricedrop function to our existing main() and run the code again, we’ll see the results in the command line.

6. Finalized code

If we put together all that we have done, our code will look like this:

import os
import time
import requests
from datetime import date, timedelta
import pandas as pd
import matplotlib.pyplot as plt


def get_product(code):
    USERNAME = "username"
    PASSWORD = "password"

    # Structure payload.
    payload = {
        'source': 'amazon_product',
        'domain': 'com',
        'query': code,
        'parse': True,
        'context': [{
            'key': 'autoselect_variant',
            'value': True
        }],
    }

    # Post the scraping job
    response = requests.request(
        'POST',
        'https://data.oxylabs.io/v1/queries',
        auth=(USERNAME, PASSWORD),
        json=payload,
    )

    response_json = response.json()

    print(response_json)

    job_id = response_json["id"]

    status = ""

    # Wait until the job is done
    while status != "done":
        time.sleep(5)
        response = requests.request(
            'GET',
            f"https://data.oxylabs.io/v1/queries/{job_id}",
            auth=(USERNAME, PASSWORD),
        )
        response_json = response.json()

        status = response_json.get("status")

        print(f"Job status is {status}")

    # Fetch the job results
    response = requests.request(
        'GET',
        f"https://data.oxylabs.io/v1/queries/{job_id}/results",
        auth=(USERNAME, PASSWORD),
    )

    response_json = response.json()

    content = response_json["results"][0]["content"]

    title = content["title"]
    price = content["price"]

    product = {
        "title": title,
        "price": price,
    }
    return product

def read_past_data(filepath):
    results = {}

    if not os.path.isfile(filepath):
        open(filepath, 'a').close()

    if not os.stat(filepath).st_size == 0:
        results_df = pd.read_json(filepath, convert_axes=False)
        results = results_df.to_dict()
        return results
    
    return results

def save_results(results, filepath):
    df = pd.DataFrame.from_dict(results)

    df.to_json(filepath)

    return

def add_todays_prices(results, tracked_product_codes):
    today = date.today()

    for code in tracked_product_codes:
        product = get_product(code)

        if product["title"] not in results:
            results[product["title"]] = {}
        
        results[product["title"]][today.strftime("%d %B, %Y")] = {
            "price": product["price"],
        }
    
    return results

def plot_history_chart(results):
    for product in results:
        dates = []
        prices = []
        
        for entry_date in results[product]:
            dates.append(entry_date)
            prices.append(float(results[product][entry_date]["price"]))

        plt.plot(dates, prices, label=product[:30])
        
        plt.xlabel("Date")
        plt.ylabel("Price")

    plt.title("Product prices over time")
    plt.legend(loc='lower center', bbox_to_anchor=(0.5, 1.05),
               ncol=3, fancybox=True, shadow=True)
    plt.show()

def check_for_pricedrop(results):
    for product in results:
        today = date.today()
        yesterday = today - timedelta(days=1)

        if yesterday.strftime("%d %B, %Y") in results[product]:
            change = float(results[product][today.strftime("%d %B, %Y")]["price"]) - float(results[product][yesterday.strftime("%d %B, %Y")]["price"])
            if change < 0:
                print(f'Price for {product} has dropped by {change}!')
        

def main():
    results_file = "data.json"

    tracked_product_codes = [
        "B0C3LXN76L",
        "B082VRFWB8"
    ]

    past_results = read_past_data(results_file)

    updated_results = add_todays_prices(past_results, tracked_product_codes)

    plot_history_chart(updated_results)

    check_for_pricedrop(updated_results)

    save_results(updated_results, results_file)
    
if __name__ == "__main__":
    main()

7. Scalability considerations

Having laid out the code, we can see that while the core application is quite simple, it can easily be extended to accommodate more scale and complexity as time goes on:

  • Alerting could use a more sophisticated price change algorithm and push notifications to an external channel such as Telegram (see the sketch after this list).

  • Plotting could be extended to save the resulting charts to a file or serve them on an external web page.

  • Result saving could be moved from a flat file to a database.

  • And so on. 
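
As an illustration of the first point, a price drop message could be pushed through the Telegram Bot API using nothing more than the requests library we already depend on. A minimal sketch, assuming you have created a bot and know your chat ID (both values below are hypothetical placeholders):

import requests

TELEGRAM_BOT_TOKEN = "your-bot-token"  # hypothetical placeholder
TELEGRAM_CHAT_ID = "your-chat-id"      # hypothetical placeholder

def send_price_alert(message):
    """Send a text message via the Telegram Bot API's sendMessage method."""
    response = requests.post(
        f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage",
        json={"chat_id": TELEGRAM_CHAT_ID, "text": message},
        timeout=10,
    )
    response.raise_for_status()

check_for_pricedrop could then call send_price_alert instead of print.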

Conclusion

There you have it: we’ve successfully built an Amazon price tracker that collects historical pricing data, sends price drop alerts, and generates price fluctuation charts. If you have any questions about this article, as usual, don't hesitate to contact us at hello@oxylabs.io or via the live chat. Also, if getting high-quality e-commerce data is your key priority and you don't necessarily want to run a scraper yourself, check out our datasets.

About the author

Yelyzaveta Nechytailo

Senior Content Manager

Yelyzaveta Nechytailo is a Senior Content Manager at Oxylabs. After working as a writer in fashion, e-commerce, and media, she decided to switch her career path and immerse in the fascinating world of tech. And believe it or not, she absolutely loves it! On weekends, you’ll probably find Yelyzaveta enjoying a cup of matcha at a cozy coffee shop, scrolling through social media, or binge-watching investigative TV series.

All information on Oxylabs Blog is provided on an "as is" basis and for informational purposes only. We make no representation and disclaim all liability with respect to your use of any information contained on Oxylabs Blog or any third-party websites that may be linked therein. Before engaging in scraping activities of any kind you should consult your legal advisors and carefully read the particular website's terms of service or receive a scraping license.
