This tutorial will guide you through creating a Python script that uploads a specified directory of files to a Writer Knowledge Graph using the Writer Python SDK. You’ll use Poetry for dependency management and environment setup.

Prerequisites

  • Python 3.8 or higher installed
  • Poetry installed (Installation guide)
  • A Writer API key (follow the Quickstart to create an app and obtain an API key)
  • A Writer Knowledge Graph ID

You can obtain a Knowledge Graph ID by either creating a new graph using the Create Graph endpoint or using the List Graphs endpoint to retrieve an existing Graph’s ID.

Set up the project

Step 1

Create a new directory for your project:

mkdir writer-file-uploader
cd writer-file-uploader

Step 2

Initialize a new Poetry project:

poetry init

Follow the prompts, accepting the defaults for most options.

Step 3

Add the necessary dependencies to your project:

poetry add writer-sdk python-dotenv

Set up environment variables

Step 1

Create a .env file in the project root:

touch .env

Step 2

Add your Writer API key and graph ID to the .env file:

WRITER_API_KEY=your_api_key_here
GRAPH_ID=your_graph_id_here

Create the main script

Step 1: Set up basic structure and imports

Create a file named main.py in the project root and add the following code:

import os
import sys
from dotenv import load_dotenv
from writerai import Writer

# Load environment variables from .env file
load_dotenv()

def main(directory_path: str) -> None:
    # Check if the provided path is a valid directory
    if not os.path.isdir(directory_path):
        print(f"Error: {directory_path} is not a valid directory")
        sys.exit(1)

    # Initialize the Writer client
    client = Writer()

    # Get the graph ID from environment variables
    graph_id = os.getenv("GRAPH_ID")
    if not graph_id:
        print("Error: GRAPH_ID environment variable is not set")
        sys.exit(1)

    print(f"Using graph: {graph_id}")

if __name__ == "__main__":
    # Ensure the script is called with a directory path argument
    if len(sys.argv) != 2:
        print("Usage: python main.py <directory_path>")
        sys.exit(1)

    directory_path = sys.argv[1]
    main(directory_path)

In this initial setup, you import the modules the script needs: os for file and path operations, sys for reading command-line arguments and exiting with a status code, load_dotenv for loading environment variables, and Writer from the Writer SDK for interacting with the Writer API.

The load_dotenv() function loads the variables defined in your .env file into the process environment. This keeps sensitive values such as API keys out of your source code. Make sure the .env file itself is not committed to version control.

In the main function, you first check if the provided path is a valid directory. This is an important validation step to ensure the script doesn’t proceed with an invalid input. If the path is invalid, the script exits with an error message.

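You then initialize the Writer client, which you’ll use throughout the script to interact with the Writer API. Called without arguments, Writer() reads the API key from the WRITER_API_KEY environment variable, which load_dotenv() has already loaded from your .env file. The graph ID is retrieved from the environment in the same way. If GRAPH_ID isn’t set, the script exits with an error, so it never starts uploading without knowing the target graph.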
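If the script later needs more required settings, the environment check can be wrapped in a small helper. The following is a minimal sketch, not part of the Writer SDK or python-dotenv: require_env is a hypothetical name, and it raises an exception instead of calling sys.exit so the caller decides how to report the problem.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if it is missing or empty."""
    value = os.getenv(name)
    if not value:
        # Hypothetical helper: the tutorial's script prints a message and exits instead.
        raise RuntimeError(f"{name} environment variable is not set")
    return value
```

In main, you could then write graph_id = require_env("GRAPH_ID") after load_dotenv() has run.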

The if __name__ == "__main__": block at the end ensures that the main function is only called if the script is run directly, not if it’s imported as a module.

Step 2: Add function to get files in directory

Add the following function to list files in the specified directory:

from typing import List

def get_files_in_directory(directory_path: str) -> List[str]:
    return [
        os.path.join(directory_path, f)
        for f in os.listdir(directory_path)
        if os.path.isfile(os.path.join(directory_path, f))
    ]

# Update the main function
def main(directory_path: str) -> None:
    # ... (previous code)

    files = get_files_in_directory(directory_path)
    print(f"Found {len(files)} files in the directory.")

The get_files_in_directory function uses a list comprehension to build a list of full paths for every file in the specified directory. It looks only at the top level of the directory: subdirectories are skipped, and the files inside them are not included.

You use os.path.join to create full file paths, which is important for ensuring your script works across different operating systems. The os.listdir function lists all entries in the directory, while os.path.isfile checks if each entry is a file rather than a subdirectory.

In the main function, you call this new function and print the number of files found. This provides immediate feedback to the user about what the script will process, which is a good practice for user-friendly command-line tools.
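If your documents are spread across subdirectories, or you only want certain file types, a recursive variant is one option. The sketch below uses pathlib from the standard library. The name get_files_recursively and the extensions parameter are illustrative assumptions, and the example suffixes are placeholders, not a list of formats that Knowledge Graph accepts.

```python
from pathlib import Path
from typing import List, Optional, Set


def get_files_recursively(
    directory_path: str, extensions: Optional[Set[str]] = None
) -> List[str]:
    """List files under directory_path, including those in subdirectories.

    extensions is an optional set of lowercase suffixes such as {".pdf", ".txt"}.
    When it is None, every file is returned.
    """
    paths = []
    for path in Path(directory_path).rglob("*"):
        if not path.is_file():
            continue  # skip directories
        if extensions is not None and path.suffix.lower() not in extensions:
            continue  # skip files whose suffix was not requested
        paths.append(str(path))
    return sorted(paths)
```

Because it returns a list of path strings, this function could stand in for get_files_in_directory without changing the rest of the script.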

Step 3: Implement file upload function

Add a function to upload a single file:

def upload_file(file_path: str, client: Writer) -> str:
    # Open and read the file contents
    with open(file_path, 'rb') as file_obj:
        file_contents = file_obj.read()

    # Upload the file using the Writer SDK
    file = client.files.upload(
        content=file_contents,
        content_disposition=f"attachment; filename={os.path.basename(file_path)}",
        content_type="application/octet-stream",
    )

    return file.id

The upload_file function is where you interact with the Writer API to upload a file. This function takes two parameters: the path to the file you want to upload, and the Writer client you initialized earlier.

You start by opening the file in binary mode ('rb') to read its contents. Binary mode returns the raw bytes without any text decoding, so the same code works for any file type (text, image, and so on).

Next, you use the Writer SDK’s upload method to send the file to Writer. The content_disposition parameter tells the server to treat this as an attachment and specifies the filename. You’re using application/octet-stream as a generic content type, which is a safe choice that works for any file type.

The function returns the ID of the uploaded file. This ID is crucial as you’ll use it later to associate the file with a specific graph.
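If you would rather send a more specific content type than application/octet-stream, the standard library’s mimetypes module can guess one from the file extension. This is a minimal sketch: guess_content_type is a hypothetical helper, and whether the Writer API treats a specific content type differently from the generic one is an assumption this tutorial does not confirm.

```python
import mimetypes


def guess_content_type(file_path: str) -> str:
    """Guess a MIME type from the file extension, falling back to the generic type."""
    content_type, _encoding = mimetypes.guess_type(file_path)
    # guess_type returns None for unknown extensions, so fall back to the
    # generic binary type used elsewhere in this tutorial.
    return content_type or "application/octet-stream"
```

You could pass the result as the content_type argument in upload_file, for example content_type=guess_content_type(file_path).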

Step 4: Implement graph association function

Add a function to associate a file with a graph:

def associate_file_with_graph(file_id: str, graph_id: str, client: Writer) -> None:
    client.graphs.add_file_to_graph(
        graph_id,
        file_id=file_id,
    )

The associate_file_with_graph function takes three parameters: the ID of the file you just uploaded, the ID of the graph you want to associate it with, and the Writer client.

This function uses the Writer SDK’s add_file_to_graph method to create the association between the file and the graph. This step is essential for making the uploaded file available within the context of a specific Writer graph.

Step 5: Combine upload and association

Create a function that combines file upload and graph association:

def upload_and_associate_file(file_path: str, client: Writer, graph_id: str) -> str:
    file_id = upload_file(file_path, client)
    associate_file_with_graph(file_id, graph_id, client)
    return f"Processed {file_path}: File ID {file_id} added to graph {graph_id}"

The upload_and_associate_file function is a higher-level function that combines the upload and association steps into a single operation. This encapsulation simplifies the main logic of your script and makes it easier to handle each file as a unit.

In this function, you first call upload_file to upload the file and get its ID. Then, you immediately call associate_file_with_graph to link this file with the specified graph. This ensures that each file is associated with the graph as soon as it’s uploaded.

The function returns a string describing the action taken. This return value is useful for providing detailed feedback to the user, allowing them to track the progress of each file.

Step 6: Implement single-threaded processing

Update the main function to process files one by one:

def main(directory_path: str) -> None:
    # ... (previous code)

    files = get_files_in_directory(directory_path)
    for file in files:
        result = upload_and_associate_file(file, client, graph_id)
        print(result)

    print("All files have been processed.")

This implementation processes files sequentially, one at a time. It’s a straightforward approach that’s easy to understand and debug. You iterate through each file in the list, call upload_and_associate_file, and immediately print the result.

While this method is simpler, it may be slower for a large number of files because it processes them one after another. However, it provides real-time feedback to the user as each file is processed, which can be beneficial for tracking progress.

Step 7: Add multi-threading for parallel processing

Finally, update the main function to use ThreadPoolExecutor for parallel processing:

from concurrent.futures import ThreadPoolExecutor, as_completed

def main(directory_path: str) -> None:
    # ... (previous code)

    files = get_files_in_directory(directory_path)
    with ThreadPoolExecutor() as executor:
        future_to_file = {executor.submit(upload_and_associate_file, file, client, graph_id): file for file in files}
        for future in as_completed(future_to_file):
            print(future.result())

    print("All files have been processed.")

This final version of the main function introduces parallel processing using Python’s ThreadPoolExecutor. This approach can significantly speed up the overall process, especially when dealing with many files or slow network connections.

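You create a “future” for each file processing task. A future represents a computation that may or may not have completed yet. The executor.submit method schedules each task to run on one of the pool’s worker threads and returns its future immediately. Calling future.result() returns the task’s return value, or re-raises the exception if the task failed.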

The as_completed function yields futures as they complete. This allows you to print results as soon as they’re available, rather than waiting for all files to be processed. This approach provides a good balance between efficiency and user feedback.
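Because future.result() re-raises any exception from the worker, a single failed upload would stop the loop above. The sketch below shows one way to keep going and report failures by file name, using the future-to-file mapping. The name process_in_parallel, the handler parameter, and the max_workers value of 4 are illustrative assumptions; choose a worker count that suits your network and any rate limits that apply to your account.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Iterable, List, Tuple


def process_in_parallel(
    files: Iterable[str], handler: Callable[[str], str], max_workers: int = 4
) -> Tuple[List[str], Dict[str, str]]:
    """Run handler on each file in a thread pool, collecting results and failures."""
    results: List[str] = []
    failures: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its file so a failure can be reported by name.
        future_to_file = {executor.submit(handler, f): f for f in files}
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                results.append(future.result())
            except Exception as exc:  # broad on purpose: one bad file should not stop the run
                failures[file_path] = str(exc)
    return results, failures
```

In main, the handler could be lambda path: upload_and_associate_file(path, client, graph_id). After the call, print each result and then list any entries in failures.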

Step 8: Add graph status reporting

As a final touch, add graph status reporting at the beginning and end:

def main(directory_path: str) -> None:
    # ... (previous code)

    graph = client.graphs.retrieve(graph_id=graph_id)
    print(f"Initial files in graph: {graph.file_status}")

    # ... (processing files)

    graph = client.graphs.retrieve(graph_id=graph_id)
    print(f"Final files in graph: {graph.file_status}")

This final addition provides a before-and-after snapshot of the graph’s file status. By retrieving and displaying the graph status at the beginning and end of the process, you give the user a clear picture of the script’s impact.

The initial status shows the state of the graph before any operations, while the final status lets you check that the uploaded files were associated with the graph. Comparing the two is useful for debugging and auditing.

Verify the final script

Here is the final main.py file:

import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from dotenv import load_dotenv
from writerai import Writer

# Load environment variables
load_dotenv()

def upload_and_associate_file(file_path: str, client: Writer, graph_id: str) -> str:
    with open(file_path, 'rb') as file_obj:
        file_contents = file_obj.read()

    file = client.files.upload(
        content=file_contents,
        content_disposition=f"attachment; filename={os.path.basename(file_path)}",
        content_type="application/octet-stream",
    )

    client.graphs.add_file_to_graph(
        graph_id,
        file_id=file.id,
    )

    return f"Processed {file_path}: File ID {file.id} added to graph {graph_id}"

def get_files_in_directory(directory_path: str) -> List[str]:
    return [
        os.path.join(directory_path, f)
        for f in os.listdir(directory_path)
        if os.path.isfile(os.path.join(directory_path, f))
    ]

def main(directory_path: str) -> None:
    if not os.path.isdir(directory_path):
        print(f"Error: {directory_path} is not a valid directory")
        sys.exit(1)

    client = Writer()
    graph_id = os.getenv("GRAPH_ID")
    if not graph_id:
        print("Error: GRAPH_ID environment variable is not set")
        sys.exit(1)

    print(f"Using graph: {graph_id}")

    graph = client.graphs.retrieve(graph_id=graph_id)
    print(f"Initial files in graph: {graph.file_status}")

    files = get_files_in_directory(directory_path)

    with ThreadPoolExecutor() as executor:
        future_to_file = {executor.submit(upload_and_associate_file, file, client, graph_id): file for file in files}
        for future in as_completed(future_to_file):
            print(future.result())

    print("All files have been processed.")

    # List files for graph
    graph = client.graphs.retrieve(graph_id=graph_id)
    print(f"Final files in graph: {graph.file_status}")

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python main.py <directory_path>")
        sys.exit(1)

    directory_path = sys.argv[1]
    main(directory_path)

Run the script

Step 1

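Activate the Poetry shell (in Poetry 2.0 and later, the shell command is provided by a separate plugin; if it is unavailable, skip this step and prefix the command in Step 2 with poetry run):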

poetry shell

Step 2

Run the script, providing a directory path as an argument:

python main.py /path/to/your/directory

The script uploads every file in the top level of the specified directory and associates each one with your Knowledge Graph.

Conclusion

You’ve now created a Python script that uses the Writer Python SDK to upload a directory of files to a Knowledge Graph. You can further customize this script to handle different file types, add error handling, or integrate it into larger applications.