LogoLogo
WebsitePredictoorData ChallengesData FarmingOcean.pyOcean.js
  • 👋Ocean docs
  • 🌊Discover Ocean
    • Why Ocean?
    • What is Ocean?
    • What can you do with Ocean?
    • OCEAN: The Ocean token
    • Networks
    • Network Bridges
    • FAQ
    • Glossary
  • 📚User Guides
    • Basic concepts
    • Using Wallets
      • Set Up MetaMask
    • Host Assets
      • Uploader
      • Arweave
      • AWS
      • Azure Cloud
      • Google Storage
      • Github
    • Liquidity Pools [deprecated]
  • 💻Developers
    • Architecture Overview
    • Ocean Nodes
      • Node Architecture
    • Contracts
      • Data NFTs
      • Datatokens
      • Data NFTs and Datatokens
      • Datatoken Templates
      • Roles
      • Pricing Schemas
      • Fees
    • Publish Flow Overview
    • Revenue
    • Fractional Ownership
    • Community Monetization
    • Metadata
    • Identifiers (DIDs)
    • New DDO Specification
    • Obsolete DDO Specification
    • Storage Specifications
    • Fine-Grained Permissions
    • Retrieve datatoken/data NFT addresses & Chain ID
    • Get API Keys for Blockchain Access
    • Barge
      • Local Setup
    • Ocean.js
      • Configuration
      • Creating a data NFT
      • Publish
      • Mint Datatokens
      • Update Metadata
      • Asset Visibility
      • Consume Asset
      • Run C2D Jobs
    • Ocean CLI
      • Install
      • Publish
      • Edit
      • Consume
      • Run C2D Jobs
    • DDO.js
      • Instantiate a DDO
      • DDO Fields interactions
      • Validate
      • Edit DDO Fields
    • Compute to data
    • Compute to data
    • Uploader
      • Uploader.js
      • Uploader UI
      • Uploader UI to Market
    • VSCode Extension
    • Old Infrastructure
      • Aquarius
        • Asset Requests
        • Chain Requests
        • Other Requests
      • Provider
        • General Endpoints
        • Encryption / Decryption
        • Compute Endpoints
        • Authentication Endpoints
      • Subgraph
        • Get data NFTs
        • Get data NFT information
        • Get datatokens
        • Get datatoken information
        • Get datatoken buyers
        • Get fixed-rate exchanges
        • Get veOCEAN stats
    • Developer FAQ
  • 📊Data Scientists
    • Ocean.py
      • Install
      • Local Setup
      • Remote Setup
      • Publish Flow
      • Consume Flow
      • Compute Flow
      • Ocean Instance Tech Details
      • Ocean Assets Tech Details
      • Ocean Compute Tech Details
      • Datatoken Interface Tech Details
    • Join a Data Challenge
    • Sponsor a Data Challenge
    • Data Value-Creation Loop
    • What data is valuable?
  • 👀Predictoor
  • 💰Data Farming
    • Predictoor DF
      • Guide to Predictoor DF
    • FAQ
  • 🔨Infrastructure
    • Set Up a Server
    • Deploy Aquarius
    • Deploy Provider
    • Deploy Ocean Subgraph
    • Deploy C2D
    • For C2D, Set Up Private Docker Registry
  • 🤝Contribute
    • Collaborators
    • Contributor Code of Conduct
    • Legal Requirements
Powered by GitBook
LogoLogo

Ocean Protocol

  • Website
  • Blog
  • Data Challenges

Community

  • Twitter
  • Discord
  • Telegram
  • Instagram

Resources

  • Whitepaper
  • GitHub
  • Docs

Copyright 2024 Ocean Protocol Foundation Ltd.

On this page
  • Start Compute Job
  • Compute Job Status
  • Compute Job Result
  • Compute Job Result Logs
  • Stop Compute Job
  • Get Priced C2D Environments
  • Get Free C2D Environments

Was this helpful?

Edit on GitHub
Export as PDF
  1. Data Scientists
  2. Ocean.py

Ocean Compute Tech Details

Technical details about OceanCompute functions

Using this class, we are able to manipulate a compute job, run it on Ocean environment and retrieve the results after the execution is finished.

Start Compute Job

  • start(self, consumer_wallet, dataset: ComputeInput, compute_environment: str, algorithm: Optional[ComputeInput] = None, algorithm_meta: Optional[AlgorithmMetadata] = None, algorithm_algocustomdata: Optional[dict] = None, additional_datasets: List[ComputeInput] = []) -> str

Starts a compute job.

It can be called within Ocean Compute class.

Parameters

  • consumer_wallet - the eth Account of consumer who pays & starts for compute job.

  • dataset - ComputeInput object, each of them includes mandatory the DDO and service.

  • compute_environment - string that represents the ID from the chosen C2D environment.

  • additional_datasets - list of ComputeInput objects for additional datasets in case of starting a compute job for multiple datasets.

Optional parameters

  • algorithm - ComputeInput object, each of them includes mandatory the DDO and service for algorithm.

  • algorithm_meta - either provide just the algorithm metadata as AlgorithmMetadata.

  • algorithm_algocustomedata - additional user data for the algorithm as dictionary.

Returns

str

Returns a string type job ID.

Defined in

Source code
 @enforce_types
    def start(
        self,
        consumer_wallet,
        dataset: ComputeInput,
        compute_environment: str,
        algorithm: Optional[ComputeInput] = None,
        algorithm_meta: Optional[AlgorithmMetadata] = None,
        algorithm_algocustomdata: Optional[dict] = None,
        additional_datasets: List[ComputeInput] = [],
    ) -> str:
        metadata_cache_uri = self._config_dict.get("METADATA_CACHE_URI")
        ddo = Aquarius.get_instance(metadata_cache_uri).get_ddo(dataset.did)
        service = ddo.get_service_by_id(dataset.service_id)
        assert (
            ServiceTypes.CLOUD_COMPUTE == service.type
        ), "service at serviceId is not of type compute service."

        consumable_result = is_consumable(
            ddo,
            service,
            {"type": "address", "value": consumer_wallet.address},
            with_connectivity_check=True,
        )
        if consumable_result != ConsumableCodes.OK:
            raise AssetNotConsumable(consumable_result)

        # Start compute job
        job_info = self._data_provider.start_compute_job(
            dataset_compute_service=service,
            consumer=consumer_wallet,
            dataset=dataset,
            compute_environment=compute_environment,
            algorithm=algorithm,
            algorithm_meta=algorithm_meta,
            algorithm_custom_data=algorithm_algocustomdata,
            input_datasets=additional_datasets,
        )
        return job_info["jobId"]

Compute Job Status

  • status(self, ddo: DDO, service: Service, job_id: str, wallet) -> Dict[str, Any]

Gets status of the compute job.

It can be called within Ocean Compute class.

Parameters

  • ddo - DDO offering the compute service of this job

  • service - Service object of compute

  • job_id - ID of the compute job

  • wallet - eth Account which initiated the compute job

Returns

Dict[str, Any]

A dictionary which contains the status for an existing compute job, keys are (ok, status, statusText).

Defined in

Source code
@enforce_types
    def status(self, ddo: DDO, service: Service, job_id: str, wallet) -> Dict[str, Any]:
        """
        Gets job status.

        :param ddo: DDO offering the compute service of this job
        :param service: compute service of this job
        :param job_id: str id of the compute job
        :param wallet: Wallet instance
        :return: dict the status for an existing compute job, keys are (ok, status, statusText)
        """
        job_info = self._data_provider.compute_job_status(
            ddo.did, job_id, service, wallet
        )
        job_info.update({"ok": job_info.get("status") not in (31, 32, None)})

        return job_info

Compute Job Result

  • result(self, ddo: DDO, service: Service, job_id: str, index: int, wallet ) -> Dict[str, Any]

Gets compute job result.

It can be called within Ocean Compute class.

Parameters

  • ddo - DDO offering the compute service of this job

  • service - Service object of compute

  • job_id - ID of the compute job

  • index - compute result index

  • wallet - eth Account which initiated the compute job

Returns

Dict[str, Any]

A dictionary wich contains the results/logs urls for an existing compute job, keys are (did, urls, logs).

Defined in

Source code
@enforce_types
    def result(
        self, ddo: DDO, service: Service, job_id: str, index: int, wallet
    ) -> Dict[str, Any]:
        """
        Gets job result.

        :param ddo: DDO offering the compute service of this job
        :param service: compute service of this job
        :param job_id: str id of the compute job
        :param index: compute result index
        :param wallet: Wallet instance
        :return: dict the results/logs urls for an existing compute job, keys are (did, urls, logs)
        """
        result = self._data_provider.compute_job_result(job_id, index, service, wallet)

        return result

Compute Job Result Logs

  • compute_job_result_logs(self, ddo: DDO, service: Service, job_id: str, wallet, log_type="output") -> Dict[str, Any]

Gets job output if exists.

It can be called within Ocean Compute class.

Parameters

  • ddo - DDO offering the compute service of this job

  • service - Service object of compute

  • job_id - ID of the compute job

  • wallet - eth Account which initiated the compute job

  • log_type - string which selects what kind of logs to display. Default "output"

Returns

Dict[str, Any]

A dictionary which includes the results/logs urls for an existing compute job, keys are (did, urls, logs).

Defined in

Source code
@enforce_types
    def compute_job_result_logs(
        self,
        ddo: DDO,
        service: Service,
        job_id: str,
        wallet,
        log_type="output",
    ) -> Dict[str, Any]:
        """
        Gets job output if exists.

        :param ddo: DDO offering the compute service of this job
        :param service: compute service of this job
        :param job_id: str id of the compute job
        :param wallet: Wallet instance
        :return: dict the results/logs urls for an existing compute job, keys are (did, urls, logs)
        """
        result = self._data_provider.compute_job_result_logs(
            ddo, job_id, service, wallet, log_type
        )

        return result

Stop Compute Job

  • stop(self, ddo: DDO, service: Service, job_id: str, wallet) -> Dict[str, Any]

Attempts to stop the running compute job.

It can be called within Ocean Compute class.

Parameters

  • ddo - DDO offering the compute service of this job

  • service - Service object of compute

  • job_id - ID of the compute job

  • wallet - eth Account which initiated the compute job

Returns

Dict[str, Any]

A dictionary which contains the status for the stopped compute job, keys are (ok, status, statusText).

Defined in

Source code
@enforce_types
    def stop(self, ddo: DDO, service: Service, job_id: str, wallet) -> Dict[str, Any]:
        """
        Attempt to stop the running compute job.

        :param ddo: DDO offering the compute service of this job
        :param job_id: str id of the compute job
        :param wallet: Wallet instance
        :return: dict the status for the stopped compute job, keys are (ok, status, statusText)
        """
        job_info = self._data_provider.stop_compute_job(
            ddo.did, job_id, service, wallet
        )
        job_info.update({"ok": job_info.get("status") not in (31, 32, None)})
        return job_info

Get Priced C2D Environments

  • get_c2d_environments(self, service_endpoint: str, chain_id: int)

Get list of compute environments.

It can be called within Ocean Compute class.

Parameters

  • service_endpoint - string Provider URL that is stored in compute service.

  • chain_id - using Provider multichain, chain_id is required to specify the network for your environment. It has int type.

Returns

list

A list of objects containing information about each compute environment. For each compute environment, these are the following keys: (id, feeToken, priceMin, consumerAddress, lastSeen, namespace, status).

Defined in

Source code
    @enforce_types
    def get_c2d_environments(self, service_endpoint: str, chain_id: int):
        return DataServiceProvider.get_c2d_environments(service_endpoint, chain_id)

Get Free C2D Environments

  • get_free_c2d_environment(self, service_endpoint: str, chain_id)

Get list of free compute environments.

Important thing is that not all Providers contain free environments (priceMin = 0).

It can be called within Ocean Compute class.

Parameters

  • service_endpoint - string Provider URL that is stored in compute service.

  • chain_id - using Provider multichain, chain_id is required to specify the network for your environment. It has int type.

Returns

list

A list of objects containing information about each compute environment. For each compute environment, these are the following keys: (id, feeToken, priceMin, consumerAddress, lastSeen, namespace, status).

Defined in

Source code
@enforce_types
    def get_free_c2d_environment(self, service_endpoint: str, chain_id):
        environments = self.get_c2d_environments(service_endpoint, chain_id)
        return next(env for env in environments if float(env["priceMin"]) == float(0))

Last updated 1 year ago

Was this helpful?

📊
ocean/ocean_compute.py
ocean/ocean_compute.py
ocean/ocean_compute.py
ocean/ocean_compute.py
ocean/ocean_compute.py
ocean/ocean_compute.py
ocean/ocean_compute.py