feat: Add specialized ApifyRequestQueue clients #573
Conversation
Migrate most actor based tests to normal force cloud rq tests (for future parametrization of the Apify clients)
ApifyRequestQueueClientSimple
high-level things
- Only one client is consuming the request queue at the time.
- Multiple producers can put requests to the queue, but their forefront requests are not guaranteed to be handled
  so quickly as this client does not aggressively fetch the forefront and relies on local head estimation.
- Requests are only added to the queue, never deleted. (Marking as handled is ok.)
Requests are only added to the queue, never deleted. (Marking as handled is ok.)
? I don't get it.
Well, the API has a delete endpoint. We do not expose it in RQ, but if someone calls it while this client is working on that RQ, the behavior will be unpredictable.
https://docs.apify.com/api/v2/request-queue-request-delete
It is not a normal use case, but it is better to be explicit about it.
- Multiple producers can put requests to the queue, but their forefront requests are not guaranteed to be handled
  so quickly as this client does not aggressively fetch the forefront and relies on local head estimation.
- Requests are only added to the queue, never deleted. (Marking as handled is ok.)
- Other producers can add new requests, but not modify existing ones (otherwise caching can miss the updates)
Other producers can add new requests, but not modify existing ones (otherwise caching can miss the updates)
Modify existing ones? What do you mean by that?
The API has an update endpoint. If other producers update existing requests that this client has already cached locally, the client will use the outdated request.
https://docs.apify.com/api/v2/request-queue-request-put
It is not a normal use case, but it is better to be explicit about it.
# Reset the Actor class state.
apify._actor.Actor.__wrapped__.__class__._is_any_instance_initialized = False  # type: ignore[attr-defined]
apify._actor.Actor.__wrapped__.__class__._is_rebooting = False  # type: ignore[attr-defined]
Why do we need this now?
We don't need it, but I saw a warning log in some tests and realized that we do not isolate the tests well enough, because `_is_any_instance_initialized` and `_is_rebooting` were leaking from the previous tests.
This warning could be observed in tests:
WARN Repeated Actor initialization detected - this is non-standard usage, proceed with care
ApifyRequestQueueClientSimple
_DEFAULT_LOCK_TIME: Final[timedelta] = timedelta(minutes=3)
"""The default lock time for requests in the queue."""
"""Base class for Apify platform implementations of the request queue client."""
I'm pretty sure that we don't want to do it like this. Using inheritance to share code makes it very hard to understand the difference between the two child classes - I'm speaking from experience with the JS counterpart which does the same and it's confusing as hell.
I'd suggest either having a single class with several if-else blocks, or if that proves too spaghettific, some approach based on composition (strategy pattern?)
I think this is not the case of hard-to-understand inheritance. These two classes inherit from "intermediate base class" only the identical methods. The reader can then focus on the implementations specific to the specialized class. There is no overriding of the intermediate class, so there should be no confusion.
There is no overriding of the intermediate class, so there should be no confusion.
it isn't there now, who knows, what the future will bring... please consider the alternatives I suggested
Ok, using composition instead
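For readers following the thread, the composition approach suggested above could look roughly like the sketch below. This is only an illustration of the strategy pattern, not the code that was merged: `FetchStrategy`, `SingleConsumerStrategy`, `SharedStrategy`, `QueueClient` and `create_client` are all hypothetical names, and the in-memory list stands in for the real queue.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Literal, Protocol


class FetchStrategy(Protocol):
    """Hypothetical interface for the mode-specific part of the client."""

    def fetch_next(self, pending: list[str]) -> str | None: ...


class SingleConsumerStrategy:
    """Assumes a single consumer, so it pops from the local head without locking."""

    def fetch_next(self, pending: list[str]) -> str | None:
        return pending.pop(0) if pending else None


class SharedStrategy:
    """Assumes several consumers, so it records a lock before handing out a request."""

    def __init__(self) -> None:
        self.locked: set[str] = set()

    def fetch_next(self, pending: list[str]) -> str | None:
        for request_id in pending:
            if request_id not in self.locked:
                self.locked.add(request_id)
                return request_id
        return None


@dataclass
class QueueClient:
    """One client class; the mode-specific behavior is delegated, not inherited."""

    strategy: FetchStrategy
    pending: list[str] = field(default_factory=list)

    def add_request(self, request_id: str) -> None:
        self.pending.append(request_id)

    def fetch_next_request(self) -> str | None:
        return self.strategy.fetch_next(self.pending)


def create_client(access: Literal['single', 'shared']) -> QueueClient:
    """Pick the strategy once, at construction time."""
    strategy = SingleConsumerStrategy() if access == 'single' else SharedStrategy()
    return QueueClient(strategy=strategy)
```

With this shape, the shared code lives in one class and each access mode is readable in isolation, which is the property the review asked for.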
tests/integration/conftest.py
Outdated
async with Actor:
rq = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
rq = await RequestQueue.open(storage_client=ApifyStorageClient(access=request.param))
Please put the `yield` inside a `finally` block so that we minimize the chance of leaving a dangling RQ.
Could you please describe the scenario it solves?
(Btw. now that unnamed storages will be used in tests, the platform will not keep them forever even if they somehow leak from the test.)
If the test throws an error, it will be propagated through that `yield`. For that reason, it's better to put cleanup in a `finally` block. I agree that in this case it's unlikely that it'd cause a serious issue, but there's no good reason not to add that block.
I am still not sure I follow. The pytest fixture after the yield will run even if there was an exception in the test. It will not run if there was an exception in the fixture code itself, but if there was an exception during rq creation, then it was not created and there is nothing to clean. Or do I miss something?
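The two positions in this thread can be illustrated with a plain `contextlib.contextmanager`, where an exception raised by the caller really is thrown into the generator at the `yield`. This is a sketch with hypothetical names (`queue_without_finally`, `queue_with_finally`, `run_failing_body`), not the fixture from `conftest.py`. As far as I know, pytest yield fixtures behave differently: pytest resumes the generator for teardown instead of throwing the test's exception into it, which matches the last comment.

```python
from contextlib import contextmanager

cleaned: list[str] = []


@contextmanager
def queue_without_finally(name: str):
    yield name
    # Skipped when the body of the `with` block raises.
    cleaned.append(name)


@contextmanager
def queue_with_finally(name: str):
    try:
        yield name
    finally:
        # Runs whether or not the body of the `with` block raises.
        cleaned.append(name)


def run_failing_body(factory, name: str) -> None:
    """Simulate a failing caller and swallow its error."""
    try:
        with factory(name):
            raise RuntimeError('simulated failure')
    except RuntimeError:
        pass


run_failing_body(queue_without_finally, 'rq-a')
run_failing_body(queue_with_finally, 'rq-b')
```

After both calls, only `'rq-b'` has been cleaned up, so the `finally` block matters for plain generators and context managers even if it is redundant in a pytest fixture.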
cloud_storage_client=ApifyStorageClient(access='shared'),
)
)
async with Actor():
I sure hope that the parentheses aren't required here 🙂
Not required. Removed
@docs_group('Storage clients')
class ApifyHybridStorageClient(StorageClient):
Not sure about the name - after all, this is the default behavior, and "hybrid" does not sound like something that should be a default. Let's try to think of something better. Maybe we could even toot our own horn with something like `SmartApifyStorageClient`.
Ok, renamed
storage_client=self._get_suitable_storage_client(force_cloud=force_cloud),
)

@cached_property
I'd skip the caching here. IMO the subtle bugs it could introduce are not worth the tiny performance gain
Well, I was not thinking about the performance. More like, this should never change during runtime. If it changes during runtime, then a lot of assumptions made in the code are no longer valid.
Agreed. But caching the value only further obscures the problem, if one appears
Ok, removed
src/apify/_actor.py
Outdated
# The client was manually set to the right type in the service locator. This is the explicit way.
return storage_client

if isinstance(storage_client, ApifyStorageClient):
So if I do `service_locator.set_storage_client(ApifyStorageClient())` before Actor init, this will force using the filesystem locally? That doesn't sound too desirable.
Well, at this point, it is a guess what the user wants. Only a fully explicit setting of the `SmartApifyStorageClient` tells us what the user is really trying to do. We can guess in the other direction or throw an exception and allow only one of the two options: a fully implicit (default) or fully explicit client (`SmartApifyStorageClient`).
It is kind of an edge case, so I am fine with any of those. If you have a strong preference, I will do it that way.
In this particular case, I would throw instead of guessing, but include detailed instructions for setting a custom client.
Ok, throwing
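A minimal sketch of the "throw instead of guessing" outcome agreed on above, under stated assumptions: the three classes are stand-ins defined here only so the example runs, `resolve_storage_client` is a hypothetical helper, and the constructor arguments and error wording are illustrative rather than taken from the SDK.

```python
from __future__ import annotations


class StorageClient:
    """Stand-in base class (assumption, not the real crawlee class)."""


class ApifyStorageClient(StorageClient):
    """Stand-in for the cloud-only client."""


class SmartApifyStorageClient(StorageClient):
    """Stand-in for the client that picks cloud or local storage itself."""

    def __init__(self, cloud_storage_client: StorageClient | None = None) -> None:
        self.cloud_storage_client = cloud_storage_client or ApifyStorageClient()


def resolve_storage_client(configured: StorageClient | None) -> StorageClient:
    """Return the client to use, refusing to guess in the ambiguous case."""
    if configured is None:
        # Fully implicit: nothing was set, so fall back to the default.
        return SmartApifyStorageClient()
    if isinstance(configured, SmartApifyStorageClient):
        # Fully explicit: the user set the right type themselves.
        return configured
    if isinstance(configured, ApifyStorageClient):
        # Ambiguous: cloud-only client set globally; explain how to be explicit.
        raise RuntimeError(
            'ApifyStorageClient was set directly in the service locator. '
            'Wrap it instead, e.g. '
            'SmartApifyStorageClient(cloud_storage_client=ApifyStorageClient()).'
        )
    # Assumption for this sketch: any other custom client is passed through.
    return configured
```

The point of the error message is to carry the "detailed instructions for setting a custom client" requested in the review.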
Co-authored-by: Jan Buchar <[email protected]>
f06eb70 to da2f5df
0d16af0 to 1e8a834
LGTM, thanks!
Approved by Honza and Vlada is on vacation now.
I like it.
Just one question.
if request.handled_at is None:
request.handled_at = datetime.now(tz=timezone.utc)
self.metadata.handled_request_count += 1
The value of `pending_request_count` in `metadata` is not updated anywhere. Only `total_request_count` and `handled_request_count` are updated. Is this correct?
Thanks. I added tracking of pending requests in local metadata estimation.
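One way such local tracking could work is sketched below. It is not the merged implementation: `Request`, `QueueMetadata` and `LocalMetadataEstimate` are simplified stand-ins, and the deduplication by `unique_key` is an assumption made for the example. The invariant it maintains is that pending equals total minus handled.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Request:
    """Simplified stand-in for a queue request."""

    unique_key: str
    handled_at: datetime | None = None


@dataclass
class QueueMetadata:
    total_request_count: int = 0
    handled_request_count: int = 0
    pending_request_count: int = 0


class LocalMetadataEstimate:
    """Keeps all three counters in step with local operations."""

    def __init__(self) -> None:
        self.metadata = QueueMetadata()
        self._known_keys: set[str] = set()

    def add_request(self, request: Request) -> None:
        if request.unique_key in self._known_keys:
            return  # Duplicate: counters stay unchanged.
        self._known_keys.add(request.unique_key)
        self.metadata.total_request_count += 1
        self.metadata.pending_request_count += 1

    def mark_request_as_handled(self, request: Request) -> None:
        if request.handled_at is None:
            request.handled_at = datetime.now(tz=timezone.utc)
            self.metadata.handled_request_count += 1
            self.metadata.pending_request_count -= 1
```

Marking the same request as handled twice does not change the counters, because `handled_at` is already set on the second call.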
Description
`ApifyRequestQueueClient` can be created in two access modes, `single` and `shared`:
- `shared` - current version that supports multiple producers/consumers and locking of requests. More Apify API calls, higher API usage -> more expensive, slower.
- `single` - new constrained client for a single consumer and multiple constrained producers. (Detailed constraints in the docs.) Fewer Apify API calls, lower API usage -> cheaper, faster.
Most of the `ApifyRequestQueueClient` tests were moved away from actor-based tests, so that they can be parametrized for both variants of the `ApifyRequestQueueClient` and to make local debugging easier.
Usage:
RequestQueue with `shared`:
`await RequestQueue.open(storage_client=ApifyStorageClient(request_queue_access="shared"))`
RequestQueue with default `single`:
`await RequestQueue.open(storage_client=ApifyStorageClient())`
Stats difference:
The `shared` client makes significantly more API calls. In terms of API usage, it performs 50% more RequestQueue writes (3000 vs. 2000) and also more RequestQueue reads (1000 vs. 3).
Example RQ-related stats for a crawler started with 1000 requests:
`shared`:
API calls: 2123
API usage: {'readCount': 1000, 'writeCount': 3000, 'deleteCount': 0, 'headItemReadCount': 0, 'storageBytes': 104035}
`single`:
API calls: 1059
API usage: {'readCount': 3, 'writeCount': 2000, 'deleteCount': 0, 'headItemReadCount': 14, 'storageBytes': 103826}
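The comparison can be checked directly from the numbers reported above. The dictionary layout and variable names are just for this calculation; only the values come from the stats.

```python
# Values copied from the stats above (crawler started with 1000 requests).
shared = {'api_calls': 2123, 'readCount': 1000, 'writeCount': 3000}
single = {'api_calls': 1059, 'readCount': 3, 'writeCount': 2000}

# How many more writes the shared client performs, in percent.
extra_writes_pct = (shared['writeCount'] - single['writeCount']) / single['writeCount'] * 100

# How many times more API calls the shared client makes.
api_call_ratio = shared['api_calls'] / single['api_calls']

# Writes per initial request for each mode.
writes_per_request = {
    'shared': shared['writeCount'] / 1000,
    'single': single['writeCount'] / 1000,
}

print(f'extra writes: {extra_writes_pct:.0f}%')
print(f'API call ratio: {api_call_ratio:.2f}x')
print(writes_per_request)
```

In this run the `shared` client made roughly twice as many API calls and used three writes per request where `single` used two.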
Issues