Creates asset of type "dataset", having UrlFiles, with good defaults.
It can be called after instantiating Ocean object.
Parameters
name - name of the asset, string
url - url that is stored in the asset, string
publisher_wallet - wallet of the asset publisher/owner, eth Account
wait_for_aqua - boolean value which default is True, waiting for aquarius to fetch the asset takes additional time, but if you want to be sure that your asset is indexed, keep the default value.
Returns
tuple
A tuple which contains the data NFT, datatoken and the data asset.
Create asset of type "algorithm", having UrlFiles, with good defaults.
It can be called after instantiating Ocean object.
Parameters:
name - name of the asset, string
url - url that is stored in the asset, string
publisher_wallet - wallet of the asset publisher/owner, eth Account
image - docker image of that algorithm, string
tag - docker tag for that algorithm image, string
checksum - docker checksum for algorithm's image, string
wait_for_aqua - boolean value which default is True, waiting for aquarius to fetch the asset takes additional time, but if you want to be sure that your asset is indexed, keep the default value.
Returns
tuple
A tuple which contains the algorithm NFT, algorithm datatoken and the algorithm asset.
Creates asset of type "data", having ArweaveFile, with good defaults.
It can be called after instantiating Ocean object.
Parameters
name - name of the asset, string
transaction_id - transaction id from the arweave file, string
publisher_wallet - wallet of the asset publisher/owner, eth Account
wait_for_aqua - boolean value which default is True, waiting for aquarius to fetch the asset takes additional time, but if you want to be sure that your asset is indexed, keep the default value.
Returns
tuple
A tuple which contains the data NFT, datatoken and the data asset.
Creates asset of type "data", having GraphqlQuery files, with good defaults.
It can be called after instantiating Ocean object.
Parameters
name - name of the asset, string
url - url of subgraph that you are using, string
query - GraphQL query, string
publisher_wallet - wallet of the asset publisher/owner, eth Account
wait_for_aqua - boolean value which default is True, waiting for aquarius to fetch the asset takes additional time, but if you want to be sure that your asset is indexed, keep the default value.
Returns
tuple
A tuple which contains the data NFT, datatoken and the data asset.
Creates asset of type "data", having SmartContractCall files, with good defaults.
It can be called after instantiating Ocean object.
Parameters
name - name of the asset, string
contract_address - contract address that should be stored in the asset, string
contract_abi - ABI of functions presented in the contract, string
publisher_wallet - wallet of the asset publisher/owner, eth Account
wait_for_aqua - boolean value which default is True, waiting for aquarius to fetch the asset takes additional time, but if you want to be sure that your asset is indexed, keep the default value.
Returns
tuple
A tuple which contains the data NFT, datatoken and the data asset.
def create(
self,
metadata: dict,
publisher_wallet,
credentials: Optional[dict] = None,
data_nft_address: Optional[str] = None,
data_nft_args: Optional[DataNFTArguments] = None,
deployed_datatokens: Optional[List[Datatoken]] = None,
services: Optional[list] = None,
datatoken_args: Optional[List["DatatokenArguments"]] = None,
encrypt_flag: Optional[bool] = True,
compress_flag: Optional[bool] = True,
wait_for_aqua: bool = True,
) -> Optional[DDO]:
self._assert_ddo_metadata(metadata)
provider_uri = DataServiceProvider.get_url(self._config_dict)
if not data_nft_address:
data_nft_args = data_nft_args or DataNFTArguments(
metadata["name"], metadata["name"]
)
data_nft = data_nft_args.deploy_contract(
self._config_dict, publisher_wallet
)
# register on-chain
if not data_nft:
logger.warning("Creating new NFT failed.")
return None, None, None
logger.info(f"Successfully created NFT with address {data_nft.address}.")
else:
data_nft = DataNFT(self._config_dict, data_nft_address)
# Create DDO object
ddo = DDO()
# Generate the did, add it to the ddo.
ddo.did = data_nft.calculate_did()
# Check if it's already registered first!
if self._aquarius.ddo_exists(ddo.did):
raise AquariusError(
f"Asset id {ddo.did} is already registered to another asset."
)
ddo.chain_id = self._chain_id
ddo.metadata = metadata
ddo.credentials = credentials if credentials else {"allow": [], "deny": []}
ddo.nft_address = data_nft.address
datatokens = []
if not deployed_datatokens:
services = []
for datatoken_arg in datatoken_args:
new_dt = datatoken_arg.create_datatoken(
data_nft, publisher_wallet, with_services=True
)
datatokens.append(new_dt)
services.extend(datatoken_arg.services)
for service in services:
ddo.add_service(service)
else:
if not services:
logger.warning("services required with deployed_datatokens.")
return None, None, None
datatokens = deployed_datatokens
dt_addresses = []
for datatoken in datatokens:
if deployed_datatokens[0].address not in data_nft.getTokensList():
logger.warning(
"some deployed_datatokens don't belong to the given data nft."
)
return None, None, None
dt_addresses.append(datatoken.address)
for service in services:
if service.datatoken not in dt_addresses:
logger.warning("Datatoken services mismatch.")
return None, None, None
ddo.add_service(service)
# Validation by Aquarius
_, proof = self.validate(ddo)
proof = (
proof["publicKey"],
proof["v"],
proof["r"][0],
proof["s"][0],
)
document, flags, ddo_hash = self._encrypt_ddo(
ddo, provider_uri, encrypt_flag, compress_flag
)
data_nft.setMetaData(
0,
provider_uri,
Web3.toChecksumAddress(publisher_wallet.address.lower()).encode("utf-8"),
flags,
document,
ddo_hash,
[proof],
{"from": publisher_wallet},
)
# Fetch the ddo on chain
if wait_for_aqua:
ddo = self._aquarius.wait_for_ddo(ddo.did)
return (data_nft, datatokens, ddo)
Publishing Alternatives
Here are some examples similar to the create() above, but exposes more fine-grained control.
In the same python console:
# Specify metadata and services, using the Branin test dataset
date_created = "2021-12-28T10:55:11Z"
metadata = {
"created": date_created,
"updated": date_created,
"description": "Branin dataset",
"name": "Branin dataset",
"type": "dataset",
"author": "Trent",
"license": "CC0: PublicDomain",
}
# Use "UrlFile" asset type. (There are other options)
from ocean_lib.structures.file_objects import UrlFile
url_file = UrlFile(
url="https://raw.githubusercontent.com/trentmc/branin/main/branin.arff"
)
# Publish data asset
from ocean_lib.models.datatoken_base import DatatokenArguments
_, _, ddo = ocean.assets.create(
metadata,
{"from": alice},
datatoken_args=[DatatokenArguments(files=[url_file])],
)
DDO Encryption or Compression
The DDO is stored on-chain. It's encrypted and compressed by default. Therefore it supports GDPR "right-to-be-forgotten" compliance rules by default.
You can control this during create():
To disable encryption, use ocean.assets.create(..., encrypt_flag=False).
To disable compression, use ocean.assets.create(..., compress_flag=False).
To disable both, use ocean.assetspy.create(..., encrypt_flag=False, compress_flag=False).
Create _just_** a data NFT**
Calling create() like above generates a data NFT, a datatoken for that NFT, and a ddo. This is the most common case. However, sometimes you may want just the data NFT, e.g. if using a data NFT as a simple key-value store. Here's how:
If you call create() after this, you can pass in an argument data_nft_address:string and it will use that NFT rather than creating a new one.
Create a datatoken from a data NFT
Calling create() like above generates a data NFT, a datatoken for that NFT, and a ddo object. However, we may want a second datatoken. Or, we may have started with just the data NFT, and want to add a datatoken to it. Here's how:
If you call create() after this, you can pass in an argument deployed_datatokens:List[Datatoken1] and it will use those datatokens during creation.
Create an asset & pricing schema simultaneously
Ocean Assets allows you to bundle several common scenarios as a single transaction, thus lowering gas fees.
Any of the ocean.assets.create_<type>_asset() functions can also take an optional parameter that describes a bundled pricing schema (Dispenser or Fixed Rate Exchange).
@enforce_types
def search(self, text: str) -> list:
"""
Search for DDOs in aquarius that contain the target text string
:param text - target string
:return - List of DDOs that match with the query
"""
logger.info(f"Search for DDOs containing text: {text}")
text = text.replace(":", "\\:").replace("\\\\:", "\\:")
return [
DDO.from_dict(ddo_dict["_source"])
for ddo_dict in self._aquarius.query_search(
{"query": {"query_string": {"query": text}}}
)
if "_source" in ddo_dict
]
Searches Asset by GraphQL Query
query(self, query: dict) -> list
Searches a DDO by a specific query.
Parameter
query - dictionary type query to search for assets which include it.
Returns
list
A list of DDOs which have matches with the query provided as parameter.
@enforce_types
def query(self, query: dict) -> list:
"""
Search for DDOs in aquarius with a search query dict
:param query - dict with query parameters
More info at: https://docs.oceanprotocol.com/api-references/aquarius-rest-api
:return - List of DDOs that match the query.
"""
logger.info(f"Search for DDOs matching query: {query}")
return [
DDO.from_dict(ddo_dict["_source"])
for ddo_dict in self._aquarius.query_search(query)
if "_source" in ddo_dict
]
consumer_wallet - eth Account for the wallet that "ordered" the asset.
destination - destination path, as string, where the asset will be downloaded.
order_tx_id - transaction ID for the placed order, string and bytes formats are accepted.
Optional parameters
service - optionally if you want to provide the Service object through you downloaded the asset.
index - optionally if you want to download certain files, not the whole asset, you can specify how many files you want to download as positive integer format.
@enforce_types
def download_asset(
self,
ddo: DDO,
consumer_wallet,
destination: str,
order_tx_id: Union[str, bytes],
service: Optional[Service] = None,
index: Optional[int] = None,
userdata: Optional[dict] = None,
) -> str:
service = service or ddo.services[0] # fill in good default
if index is not None:
assert isinstance(index, int), logger.error("index has to be an integer.")
assert index >= 0, logger.error("index has to be 0 or a positive integer.")
assert (
service and service.type == ServiceTypes.ASSET_ACCESS
), f"Service with type {ServiceTypes.ASSET_ACCESS} is not found."
path: str = download_asset_files(
ddo, service, consumer_wallet, destination, order_tx_id, index, userdata
)
return path
Pays for compute service by calling initializeCompute endpoint from Provider to retrieve the provider fees and starting the order afterwards.
Parameters
datasets - list of ComputeInput objects, each of them includes mandatory the DDO and service.
algorithm_data - which can be either a ComputeInput object which contains the whole DDO and service, either provide just the algorithm metadata as AlgorithmMetadata.
compute_environment - string that represents the ID from the chosen C2D environment.
valid_until - UNIX timestamp which represents until when the algorithm can be used/run.
consume_market_order_fee_address - string address which denotes the consume market fee address for that order and can be the wallet address itself.
wallet - the eth Account which pays for the compute service
Optional parameters
consumer_address - is the string address of the C2D environment consumer.
Returns
tuple
Return value is a tuple composed of list of datasets and algorithm data (if exists in result), (datasets, algorithm_data).