diff --git a/core/src/main/java/com/ibm/common/activitystreams/IO.java b/core/src/main/java/com/ibm/common/activitystreams/IO.java index f3878f5..6d213d7 100755 --- a/core/src/main/java/com/ibm/common/activitystreams/IO.java +++ b/core/src/main/java/com/ibm/common/activitystreams/IO.java @@ -31,7 +31,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.ibm.common.activitystreams.internal.Adapter; @@ -154,11 +153,6 @@ public final class IO { return this; } - public Builder typeValueResolver(Function resolver) { - inner.typeValueResolver(resolver); - return this; - } - /** * Turn pretty print on or off * @param on boolean diff --git a/core/src/main/java/com/ibm/common/activitystreams/TypeValue.java b/core/src/main/java/com/ibm/common/activitystreams/TypeValue.java index 3908043..a02fd69 100755 --- a/core/src/main/java/com/ibm/common/activitystreams/TypeValue.java +++ b/core/src/main/java/com/ibm/common/activitystreams/TypeValue.java @@ -22,7 +22,6 @@ package com.ibm.common.activitystreams; import java.io.Serializable; - import com.ibm.common.activitystreams.util.AbstractWritable; /** @@ -100,7 +99,7 @@ public interface TypeValue extends AbstractWritable.AbstractWritableBuilder { private String iri; - + /** * Set the url * @param iri String @@ -110,7 +109,7 @@ public interface TypeValue this.iri = iri; return this; } - + public SimpleTypeValue get() { return new SimpleTypeValue(this); } diff --git a/core/src/main/java/com/ibm/common/activitystreams/internal/GsonWrapper.java b/core/src/main/java/com/ibm/common/activitystreams/internal/GsonWrapper.java index c5947ab..0d0c20d 100755 --- a/core/src/main/java/com/ibm/common/activitystreams/internal/GsonWrapper.java +++ b/core/src/main/java/com/ibm/common/activitystreams/internal/GsonWrapper.java @@ -51,7 +51,6 @@ import org.joda.time.ReadableDuration; import org.joda.time.ReadableInterval; import org.joda.time.ReadablePeriod; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -71,7 +70,6 @@ import com.ibm.common.activitystreams.LinkValue; import com.ibm.common.activitystreams.NLV; import com.ibm.common.activitystreams.TypeValue; import com.ibm.common.activitystreams.Writable; -import com.ibm.common.activitystreams.util.TypeValueResolver; /** * @author james @@ -99,13 +97,6 @@ public final class GsonWrapper { private Schema schema = null; // default private ImmutableList.Builder> adapters = ImmutableList.builder(); - private Function typeValueResolver = - TypeValueResolver.DEFAULT_INSTANCE; - - public Builder typeValueResolver(Function resolver) { - this.typeValueResolver = resolver; - return this; - } /** * Method charset. @@ -250,7 +241,7 @@ public final class GsonWrapper { Schema schema, ASObjectAdapter base) { return new GsonBuilder() - .registerTypeHierarchyAdapter(TypeValue.class, new TypeValueAdapter(schema, builder.typeValueResolver)) + .registerTypeHierarchyAdapter(TypeValue.class, new TypeValueAdapter(schema)) .registerTypeHierarchyAdapter(LinkValue.class, new LinkValueAdapter(schema)) .registerTypeHierarchyAdapter(NLV.class, NLV) .registerTypeHierarchyAdapter(Iterable.class, ITERABLE) diff --git a/core/src/main/java/com/ibm/common/activitystreams/internal/TypeValueAdapter.java b/core/src/main/java/com/ibm/common/activitystreams/internal/TypeValueAdapter.java index 12424ff..b09bad0 100755 --- a/core/src/main/java/com/ibm/common/activitystreams/internal/TypeValueAdapter.java +++ b/core/src/main/java/com/ibm/common/activitystreams/internal/TypeValueAdapter.java @@ -26,7 +26,6 @@ import static com.ibm.common.activitystreams.Makers.type; import java.lang.reflect.Type; -import com.google.common.base.Function; import com.google.gson.JsonDeserializationContext; import com.google.gson.JsonElement; import com.google.gson.JsonObject; @@ -45,17 +44,14 @@ final class TypeValueAdapter extends Adapter { private final Schema schema; - private final Function resolver; /** * Constructor for TypeValueAdapter. * @param schema Schema */ public TypeValueAdapter( - Schema schema, - Function resolver) { + Schema schema) { this.schema = schema; - this.resolver = resolver; } /** @@ -97,7 +93,7 @@ final class TypeValueAdapter JsonPrimitive prim = el.getAsJsonPrimitive(); checkArgument(prim.isString()); - return resolver.apply(type(prim.getAsString())); + return type(prim.getAsString()); } else { JsonObject obj = el.getAsJsonObject(); if (obj.has("objectType")) { @@ -107,17 +103,17 @@ final class TypeValueAdapter TypeValue.class); Model pMap = schema.forObjectType(tv.id()); - return resolver.apply( + return context.deserialize( el, pMap.type() != null ? pMap.type() : - ASObject.class)); + ASObject.class); } else { - return resolver.apply( + return context.deserialize( el, - ASObject.class)); + ASObject.class); } } } diff --git a/core/src/main/java/com/ibm/common/activitystreams/util/TypeValueResolver.java b/core/src/main/java/com/ibm/common/activitystreams/util/TypeValueResolver.java deleted file mode 100644 index 8a32726..0000000 --- a/core/src/main/java/com/ibm/common/activitystreams/util/TypeValueResolver.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright 2013 OpenSocial Foundation - * Copyright 2013 International Business Machines Corporation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Utility library for working with Activity Streams Actions - * Requires underscorejs. - * - * @author James M Snell (jasnell@us.ibm.com) - */ -package com.ibm.common.activitystreams.util; - -import com.google.common.base.Function; -import com.ibm.common.activitystreams.TypeValue; - -/** - * A TypeValue resolver is used to optionally replace TypeValue - * instances. Typically, this would be used to exchange simple - * string TypeValue's with their object equivalents (if one is - * available). - * - * The replacement can be performed during parsing by setting a - * TypeValueResolver on the IO.Builder. This should be done - * carefully, however, as the resolver could negatively impact - * parsing performance depending on how it is implemented. - * - * @author james - */ -public interface TypeValueResolver - extends Function { - - public static final TypeValueResolver DEFAULT_INSTANCE = - new DefaultTypeValueResolver(); - - public static final class DefaultTypeValueResolver - implements TypeValueResolver { - public TypeValue apply(TypeValue tv) { - return tv; - } - } -} diff --git a/pom.xml b/pom.xml index 452cf6a..e8b7f52 100644 --- a/pom.xml +++ b/pom.xml @@ -48,6 +48,7 @@ geo assembly legacy + typext diff --git a/typext/EXPERIMENTAL b/typext/EXPERIMENTAL new file mode 100644 index 0000000..f6b95c5 --- /dev/null +++ b/typext/EXPERIMENTAL @@ -0,0 +1 @@ +This module is currently experimental \ No newline at end of file diff --git a/typext/pom.xml b/typext/pom.xml new file mode 100644 index 0000000..80fdf66 --- /dev/null +++ b/typext/pom.xml @@ -0,0 +1,153 @@ + + 4.0.0 + + com.ibm.common + activitystreams + 0.0.1-SNAPSHOT + + activitystreams-ext + Activity Streams 2.0 - Type Extension Support + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + UTF-8 + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.9.1 + + UTF-8 + UTF-8 + UTF-8 + -XDignore.symbol.file + public + + http://www.joda.org/joda-time/apidocs + http://docs.guava-libraries.googlecode.com/git-history/v16.0.1/javadoc/ + + + + + + maven-compiler-plugin + 2.3.2 + + 1.7 + 1.7 + + + + + maven-jar-plugin + 2.3.1 + + + ${project.build.outputDirectory}/META-INF/MANIFEST.MF + + + + + + org.apache.felix + maven-bundle-plugin + 2.3.7 + true + + + bundle-manifest + process-classes + + manifest + + + + + + + com.ibm.common.activitystreams.ext.*, + com.ibm.common.activitystreams.registry.* + + + com.ibm.common.activitystreams.*, + com.google.gson.*, + com.google.common.*, + org.joda.time.*, + org.apache.http.*, + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.2.2 + + + assembly.xml + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.felix + maven-bundle-plugin + [2.3.7,) + + manifest + + + + + + + + + + + + + + + + + com.ibm.common + activitystreams-core + 0.0.1-SNAPSHOT + + + org.apache.httpcomponents + httpclient-cache + 4.3.3 + + + org.apache.httpcomponents + httpcomponents-client + 4.3.3 + pom + + + \ No newline at end of file diff --git a/typext/src/main/java/com/ibm/common/activitystreams/ext/ExtAdapter.java b/typext/src/main/java/com/ibm/common/activitystreams/ext/ExtAdapter.java new file mode 100644 index 0000000..8dd5b80 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/ext/ExtAdapter.java @@ -0,0 +1,74 @@ +/** + * Copyright 2013 OpenSocial Foundation + * Copyright 2013 International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Utility library for working with Activity Streams Actions + * Requires underscorejs. + * + * @author James M Snell (jasnell@us.ibm.com) + */ +package com.ibm.common.activitystreams.ext; + +import java.lang.reflect.Type; + +import com.google.common.collect.ImmutableSet; +import com.ibm.common.activitystreams.ASObject.AbstractBuilder; +import com.ibm.common.activitystreams.internal.ASObjectAdapter; +import com.ibm.common.activitystreams.internal.Model; +import com.ibm.common.activitystreams.internal.Schema; + +public class ExtAdapter + extends ASObjectAdapter { + + protected ExtAdapter(Schema schema) { + super(schema); + } + + private static final ImmutableSet knownTypes = + ImmutableSet.of(Verb.class,ObjectType.class); + + @Override + protected boolean knowsType(Type type) { + if (super.knowsType(type)) + return true; + return knownTypes.contains(type); + } + + @Override + protected AbstractBuilder builderFor(Type type) { + if (super.knowsType(type)) + return super.builderFor(type); + if (type == Verb.class) { + return ExtMakers.verb(); + } else if (type == ObjectType.class) { + return ExtMakers.objectType(); + } else return null; + } + + @Override + protected Model modelFor(Type type) { + if (super.knowsType(type)) + return super.modelFor(type); + if (type == Verb.class) { + return schema().forObjectClassOrType( + Verb.Builder.class, + "verb"); + } else if (type == ObjectType.class) { + return schema().forObjectClassOrType( + ObjectType.Builder.class, + "objectType"); + } else return null; + } +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/ext/ExtMakers.java b/typext/src/main/java/com/ibm/common/activitystreams/ext/ExtMakers.java new file mode 100644 index 0000000..791355d --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/ext/ExtMakers.java @@ -0,0 +1,23 @@ +package com.ibm.common.activitystreams.ext; + +public final class ExtMakers { + + private ExtMakers() {} + + public static Verb.Builder verb() { + return new Verb.Builder(); + } + + public static Verb.Builder verb(String id) { + return verb().id(id); + } + + public static ObjectType.Builder objectType() { + return new ObjectType.Builder(); + } + + public static ObjectType.Builder objectType(String id) { + return objectType().id(id); + } + +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/ext/ExtModule.java b/typext/src/main/java/com/ibm/common/activitystreams/ext/ExtModule.java new file mode 100644 index 0000000..c2ec2a9 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/ext/ExtModule.java @@ -0,0 +1,36 @@ +package com.ibm.common.activitystreams.ext; + +import com.ibm.common.activitystreams.IO; +import com.ibm.common.activitystreams.internal.Model; +import com.ibm.common.activitystreams.internal.Schema; +import com.ibm.common.activitystreams.internal.Schema.Builder; +import com.ibm.common.activitystreams.util.Module; + +public class ExtModule implements Module { + + public static Module instance = new ExtModule(); + + public static final Model verb = + Schema.object.template() + .type(Verb.class, Verb.Builder.class) + .get(); + + public static final Model objectType = + Schema.object.template() + .type(ObjectType.class, ObjectType.Builder.class) + .get(); + + public void apply(Builder builder) { + builder.map("verb", verb); + builder.map("objectType", objectType); + } + + public void apply( + IO.Builder builder, + Schema schema) { + ExtAdapter base = new ExtAdapter(schema); + builder.adapter(Verb.class, base); + builder.adapter(ObjectType.class, base); + } + +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/ext/ObjectType.java b/typext/src/main/java/com/ibm/common/activitystreams/ext/ObjectType.java new file mode 100644 index 0000000..c877ad2 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/ext/ObjectType.java @@ -0,0 +1,24 @@ +package com.ibm.common.activitystreams.ext; + +import com.ibm.common.activitystreams.ASObject; + +public final class ObjectType + extends ASObject { + + public static final class Builder + extends ASObject.AbstractBuilder { + + Builder() { + objectType("objectType"); + } + + public ObjectType get() { + return new ObjectType(this); + } + + } + + private ObjectType(Builder builder) { + super(builder); + } +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/ext/Test.java b/typext/src/main/java/com/ibm/common/activitystreams/ext/Test.java new file mode 100644 index 0000000..ebfc72a --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/ext/Test.java @@ -0,0 +1,23 @@ +package com.ibm.common.activitystreams.ext; + +import java.util.concurrent.Future; + +import static com.ibm.common.activitystreams.Makers.type; +import com.ibm.common.activitystreams.TypeValue; +import com.ibm.common.activitystreams.registry.TypeValueRegistry; + +public class Test { + + public static void main(String... args) throws Exception { + + TypeValueRegistry reg = + TypeValueRegistry + .makeDefaultSilent(); + + Future tv = + reg.resolve(type("urn:example:foo")); + System.out.println(tv.get().valueType()); + + } + +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/ext/Verb.java b/typext/src/main/java/com/ibm/common/activitystreams/ext/Verb.java new file mode 100644 index 0000000..f0262b0 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/ext/Verb.java @@ -0,0 +1,25 @@ +package com.ibm.common.activitystreams.ext; + +import com.ibm.common.activitystreams.ASObject; + +public final class Verb + extends ASObject { + + public static final class Builder + extends ASObject.AbstractBuilder { + + Builder() { + objectType("verb"); + } + + public Verb get() { + return new Verb(this); + } + + } + + private Verb(Builder builder) { + super(builder); + } + +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/CachingResolutionStrategy.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/CachingResolutionStrategy.java new file mode 100644 index 0000000..be4d7bf --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/CachingResolutionStrategy.java @@ -0,0 +1,119 @@ +package com.ibm.common.activitystreams.registry; + +import static com.google.common.base.Throwables.propagate; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Supplier; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.ibm.common.activitystreams.Makers; +import com.ibm.common.activitystreams.TypeValue; +import com.ibm.common.activitystreams.ValueType; + +public abstract class CachingResolutionStrategy + implements ResolutionStrategy { + + @SuppressWarnings("unchecked") + public static abstract class AbstractBuilder + > + implements Supplier { + + private boolean silentfail = false; + private final CacheBuilder cache = + CacheBuilder.newBuilder() + .expireAfterAccess(10, TimeUnit.MINUTES) + .expireAfterWrite(10, TimeUnit.MINUTES) + .maximumSize(100) + .initialCapacity(50); + + public B silentfail() { + this.silentfail = true; + return (B)this; + } + + public B customizeCache(Receiver> receiver) { + if (receiver != null) + receiver.receive(cache); + return (B)this; + } + + } + + private final LoadingCache cache; + private final boolean silentfail; + + protected LoadingCache cache() { + return cache; + } + + CachingResolutionStrategy(AbstractBuilder builder) { + this.cache = initCache(builder); + this.silentfail = builder.silentfail; + } + + protected boolean silentfail() { + return silentfail; + } + + private LoadingCache initCache(AbstractBuilder builder) { + return builder.cache.build(loader()); + } + + public Callable resolverFor(TypeValue tv) { + return new Resolver(tv); + } + + protected abstract CacheLoader loader(); + + public final class Resolver + implements Callable { + + private final TypeValue input; + + Resolver(TypeValue input) { + this.input = input; + } + + public TypeValue call() throws Exception { + try { + if (input == null) return null; + switch(input.valueType()) { + case OBJECT: + return input; + case SIMPLE: + return cache.get(input); + default: + throw new IllegalArgumentException(); + } + } catch (Throwable t) { + if (silentfail()) + return input; + else throw propagate(t); + } + } + + } + + public Receiver preloader() { + return new CachePreloader(cache()); + } + + private static final class CachePreloader + implements Receiver { + + private final LoadingCache cache; + + CachePreloader(LoadingCache cache) { + this.cache = cache; + } + + public void receive(TypeValue t) { + if (t.valueType() == ValueType.OBJECT && t.id() != null) + cache.put(Makers.type(t.id()),t); + } + + } +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/ClasspathPreloader.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/ClasspathPreloader.java new file mode 100644 index 0000000..13776e8 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/ClasspathPreloader.java @@ -0,0 +1,111 @@ +package com.ibm.common.activitystreams.registry; + +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.net.URL; +import java.util.Enumeration; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnels; +import com.ibm.common.activitystreams.ASObject; +import com.ibm.common.activitystreams.Collection; +import com.ibm.common.activitystreams.IO; +import com.ibm.common.activitystreams.TypeValue; + +public final class ClasspathPreloader + implements PreloadStrategy { + + public static final class Builder + implements Supplier { + + private ClassLoader loader = + Thread.currentThread().getContextClassLoader(); + private boolean avoidDuplicates = false; + + public Builder avoidDuplicates() { + this.avoidDuplicates = true; + return this; + } + + public Builder classLoader(ClassLoader loader) { + this.loader = loader != null ? + loader : Thread.currentThread().getContextClassLoader(); + return this; + } + + public ClasspathPreloader get() { + return new ClasspathPreloader(this); + } + + } + + private final ClassLoader loader; + private final boolean avoidDuplicates; + + private ClasspathPreloader(Builder builder) { + this.loader = builder.loader; + this.avoidDuplicates = builder.avoidDuplicates; + } + + public void load(IO io, Receiver receiver) { + + final BloomFilter filter = + avoidDuplicates ? + BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1000) : null; + + try { + for (InputStream in : streams.apply(loader.getResources("typeValues.bin"))) { + try { + ObjectInputStream oin = new ObjectInputStream(in); + Collection col = (Collection) oin.readObject(); + load(col, receiver, filter); + } catch (Throwable t) {} + } + + for (InputStream in : streams.apply(loader.getResources("typeValues.json"))) { + try { + load(io.readAsCollection(in), receiver, filter); + } catch (Throwable t) {} + } + + } catch (Throwable t) { + throw Throwables.propagate(t); + } + + } + + private void load( + Collection col, + Receiver receiver, + BloomFilter filter) { + if (col != null && receiver != null) + for (ASObject obj : col.items()) + if (obj.id() != null && (filter == null || filter.put(obj.id()))) { + try { + receiver.receive(obj); + } catch (Throwable t) {} + } + } + + private static Function,Iterable> streams = + new Function,Iterable> () { + public Iterable apply(Enumeration input) { + ImmutableList.Builder list = + ImmutableList.builder(); + while(input.hasMoreElements()) { + try { + list.add(input.nextElement().openStream()); + } catch (Throwable t) {} + } + return list.build(); + } + }; + + public static final PreloadStrategy instance = + new Builder().get(); +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/DefaultResolutionStrategy.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/DefaultResolutionStrategy.java new file mode 100644 index 0000000..77ffab0 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/DefaultResolutionStrategy.java @@ -0,0 +1,201 @@ +package com.ibm.common.activitystreams.registry; + +import static com.google.common.util.concurrent.MoreExecutors.platformThreadFactory; +import static com.google.common.base.Throwables.propagate; + +import java.util.Set; +import java.util.concurrent.Callable; + +import com.google.common.base.Objects; +import com.google.common.cache.CacheLoader; +import com.google.common.collect.ImmutableSet; +import com.ibm.common.activitystreams.ASObject; +import com.ibm.common.activitystreams.Collection; +import com.ibm.common.activitystreams.Makers; +import com.ibm.common.activitystreams.TypeValue; + +public final class DefaultResolutionStrategy + extends CachingResolutionStrategy { + + public static Builder make() { + return new Builder(); + } + + public static DefaultResolutionStrategy makeDefault() { + return make().get(); + } + + public static final class Builder + extends CachingResolutionStrategy.AbstractBuilder { + + private boolean proactive = false; + private final HttpFetch.Builder fetcherBuilder = + new HttpFetch.Builder(); + private final ImmutableSet.Builder proactiveTypes = + ImmutableSet.builder() + .add("verb") + .add("objectType"); + + public Builder customizeFetcher(Receiver receiver) { + if (receiver != null) + receiver.receive(fetcherBuilder); + return this; + } + + /** + * Tells the loader to proactively cache additional typevalue + * identifiers that happen to be discovered when attempting to + * resolve a given typevalue. + * @return + */ + public Builder proactiveCaching() { + this.proactive = true; + return this; + } + + /** + * Specifies additional objectType identifiers to watch for when + * proactiveCaching is enabled. + * @param typeValueId + * @return + */ + public Builder typeValueObjectType(String typeValueId) { + proactiveTypes.add(typeValueId); + return this; + } + + public DefaultResolutionStrategy get() { + return new DefaultResolutionStrategy(this); + } + + } + + private final boolean proactiveCaching; + private final ImmutableSet proactiveTypes; + private final HttpFetch fetcher; + + private DefaultResolutionStrategy(Builder builder) { + super(builder); + this.proactiveCaching = builder.proactive; + this.proactiveTypes = builder.proactiveTypes.build(); + this.fetcher = initFetcher(builder); + ensureAlwaysShutdown(this); + } + + private HttpFetch initFetcher(Builder builder) { + return builder.fetcherBuilder.get(); + } + + @Override + protected CacheLoader loader() { + return new DefaultCacheLoader(); + } + + private final class DefaultCacheLoader + extends CacheLoader { + + @Override + public TypeValue load(TypeValue key) throws Exception { + try { + if (key == null) + throw new IllegalArgumentException(); + switch(key.valueType()) { + case OBJECT: + return key; // type is already resolved + case SIMPLE: + String id = key.id(); + ASObject obj = fetcher.fetch(id); // attempt to fetch an object + ImmutableSet.Builder additional = + ImmutableSet.builder(); + if (obj instanceof Collection) { + Collection col = (Collection) obj; + ASObject matching = + processItems( + col.items(), + id, + proactiveCaching, + proactiveTypes, + additional); + if (matching != null) + return matching; + } else if (obj.has("items")) { + Iterable items = + obj.>get("items"); + ASObject matching = + processItems( + items, + id, + proactiveCaching, + proactiveTypes, + additional); + if (matching != null) + return matching; + } else if (Objects.equal(id, obj.id())) { + return obj; + } + default: + break; + } + } catch (Throwable t) { + if (silentfail()) + return key; + else propagate(t); + } + throw new UncacheableResponseException(); + } + } + + private ASObject processItems( + Iterable items, + String lookingFor, + boolean proactive, + Set proactiveTypes, + ImmutableSet.Builder additional) { + ASObject matching = null; + for (final ASObject obj : items) { + if (Objects.equal(lookingFor, obj.id())) { + matching = obj; + if (!proactive) break; + } else if (proactive) { + TypeValue objectType = obj.objectType(); + String id = obj.id(); + if (objectType != null && id != null && !Objects.equal(lookingFor,id)) { + String otid = objectType.id(); + if (proactiveTypes.contains(otid)) { + try { + cache().get( + Makers.type(id), + new Callable() { + public TypeValue call() throws Exception { + return obj; + } + }); + } catch (Throwable t) {} + } + } + } + } + return matching; + } + + private static void ensureAlwaysShutdown( + final ResolutionStrategy strategy) { + Thread shutdownThread = + platformThreadFactory() + .newThread(new Runnable() { + public void run() { + try { + strategy.shutdown(); + } catch (Throwable t) {} + } + }); + Runtime.getRuntime() + .addShutdownHook(shutdownThread); + } + + public void shutdown() { + try { + fetcher.shutdown(); + } catch (Throwable t) {} + } +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/HttpFetch.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/HttpFetch.java new file mode 100644 index 0000000..8243956 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/HttpFetch.java @@ -0,0 +1,239 @@ +package com.ibm.common.activitystreams.registry; + +import static com.google.common.base.Throwables.propagate; + +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.ConnectionConfig; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.config.SocketConfig; +import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicHeader; +import org.apache.http.protocol.HttpContext; + +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.ibm.common.activitystreams.ASObject; +import com.ibm.common.activitystreams.IO; +import com.ibm.common.activitystreams.ext.ExtModule; + +public final class HttpFetch + extends CacheLoader { + + static final class Builder + implements Supplier { + + private final RegistryBuilder csfr = + RegistryBuilder.create(); + private ConnectionConfig defaultConnectionConfig; + private SocketConfig defaultSocketConfig; + private final ImmutableMap.Builder connectionConfigs = + ImmutableMap.builder(); + private final ImmutableMap.Builder socketConfigs = + ImmutableMap.builder(); + private int maxConnectionsPerRoute = 2; + private int maxConnections = 20; + private final ImmutableSet.Builder
defaultHeaders = + ImmutableSet.builder(); + private String userAgent = null; + private IO io = null; + private HttpClientBuilder builder = + HttpClients.custom(); + + private final CacheBuilder cache = + CacheBuilder.newBuilder() + .expireAfterAccess(10, TimeUnit.MINUTES) + .expireAfterWrite(10, TimeUnit.MINUTES) + .maximumSize(50) + .initialCapacity(50); + private HttpClientConnectionManager manager; + + public Builder customizeCache( + Receiver> receiver) { + if (receiver != null) + receiver.receive(cache); + return this; + } + + public Builder customizeClientBuilder( + Receiver receiver) { + receiver.receive(builder); + return this; + } + + public Builder io(IO io) { + this.io = io; + return this; + } + + public Builder useragent(String ua) { + this.userAgent = ua; + return this; + } + + public Builder defaultHeader(String name, String value) { + defaultHeaders.add(new BasicHeader(name,value)); + return this; + } + + public Builder maxConnectionsPerRoute(int max) { + this.maxConnectionsPerRoute = max; + return this; + } + + public Builder maxConnections(int max) { + this.maxConnections = max; + return this; + } + + public Builder connectionConfig(ConnectionConfig config) { + defaultConnectionConfig = config; + return this; + } + + public Builder connectionConfig(HttpHost host, ConnectionConfig config) { + connectionConfigs.put(host,config); + return this; + } + + public Builder socketConfig(SocketConfig config) { + defaultSocketConfig = config; + return this; + } + + public Builder socketConfig(HttpHost host, SocketConfig config) { + socketConfigs.put(host, config); + return this; + } + + public Builder registerConnectionSocketFactory( + String id, + ConnectionSocketFactory csf) { + csfr.register(id, csf); + return this; + } + + public Builder manager(HttpClientConnectionManager manager) { + this.manager = manager; + return this; + } + + public HttpFetch get() { + return new HttpFetch(this); + } + + } + + private final LoadingCache cache; + private final HttpClientConnectionManager manager; + private final IO io; + private final CloseableHttpClient client; + + HttpFetch(Builder builder) { + this.cache = initCache(builder); + this.manager = initManager(builder); + this.io = initIO(builder); + this.client = initClient(builder); + } + + public void shutdown() { + try { + manager.shutdown(); + } catch (Throwable t) {} + } + + private IO initIO(Builder builder) { + if (builder.io != null) + return builder.io; + return IO.makeDefault(ExtModule.instance); + } + + private CloseableHttpClient initClient(Builder builder) { + HttpClientBuilder b = builder.builder; + b.setConnectionManager(manager); + ImmutableSet
headers = + builder.defaultHeaders.build(); + if (!headers.isEmpty()) + b.setDefaultHeaders(headers); + if (builder.userAgent != null) + b.setUserAgent(builder.userAgent); + return b.build(); + } + + private HttpClientConnectionManager initManager(Builder builder) { + if (builder.manager != null) + return builder.manager; + PoolingHttpClientConnectionManager pm = + new PoolingHttpClientConnectionManager(builder.csfr.build()); + for (Map.Entry entry : builder.connectionConfigs.build().entrySet()) + pm.setConnectionConfig(entry.getKey(), entry.getValue()); + if (builder.defaultConnectionConfig != null) + pm.setDefaultConnectionConfig(builder.defaultConnectionConfig); + for (Map.Entry entry : builder.socketConfigs.build().entrySet()) + pm.setSocketConfig(entry.getKey(), entry.getValue()); + if (builder.defaultSocketConfig != null) + pm.setDefaultSocketConfig(builder.defaultSocketConfig); + pm.setDefaultMaxPerRoute(builder.maxConnectionsPerRoute); + pm.setMaxTotal(builder.maxConnections); + return pm; + } + + private LoadingCache initCache(Builder builder) { + return builder.cache.build(this); + } + + public ASObject fetch(String uri) { + try { + return cache.get(uri); + } catch (Throwable t) { + throw propagate(t); + } + } + + @Override + public ASObject load(String key) throws Exception { + HttpContext context = new HttpClientContext(); + HttpGet get = new HttpGet(key); + HttpResponse resp = client.execute(get, context); + StatusLine status = resp.getStatusLine(); + int code = status.getStatusCode(); + if (code >= 200 && code < 300) { + HttpEntity entity = resp.getEntity(); + if (entity != null) { + // attempt parse + Optional parsed = + parse(entity.getContent()); + if (parsed.isPresent()) + return parsed.get(); + } + } + throw new UncacheableResponseException(); + } + + private Optional parse(InputStream in) { + try { + return Optional.of(io.read(in)); + } catch (Throwable t) { + return Optional.absent(); + } + } +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/NonOpResolutionStrategy.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/NonOpResolutionStrategy.java new file mode 100644 index 0000000..d911b72 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/NonOpResolutionStrategy.java @@ -0,0 +1,25 @@ +package com.ibm.common.activitystreams.registry; + +import java.util.concurrent.Callable; + +import com.ibm.common.activitystreams.TypeValue; + +class NonOpResolutionStrategy + implements ResolutionStrategy { + + public Callable resolverFor(final TypeValue tv) { + return new Callable() { + public TypeValue call() throws Exception { + return tv; + } + }; + } + + public Receiver preloader() { + return new Receiver() { + public void receive(TypeValue t) {} + }; + } + + public void shutdown() {} +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/PreloadStrategy.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/PreloadStrategy.java new file mode 100644 index 0000000..b2254eb --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/PreloadStrategy.java @@ -0,0 +1,10 @@ +package com.ibm.common.activitystreams.registry; + +import com.ibm.common.activitystreams.IO; +import com.ibm.common.activitystreams.TypeValue; + +public interface PreloadStrategy { + + void load(IO io, Receiver receiver); + +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/Receiver.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/Receiver.java new file mode 100644 index 0000000..af61a02 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/Receiver.java @@ -0,0 +1,7 @@ +package com.ibm.common.activitystreams.registry; + +public interface Receiver { + + void receive(T t); + +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/ResolutionStrategy.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/ResolutionStrategy.java new file mode 100644 index 0000000..c90556d --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/ResolutionStrategy.java @@ -0,0 +1,24 @@ +package com.ibm.common.activitystreams.registry; + +import java.util.concurrent.Callable; + +import com.ibm.common.activitystreams.TypeValue; + +/** + * Implements a strategy for resolving TypeValue identifiers. + */ +public interface ResolutionStrategy { + + Receiver preloader(); + + /** + * Returns a Callable that implements TypeValue resolution + * for the given TypeValue. + */ + Callable resolverFor(TypeValue tv); + + void shutdown(); + + public static final ResolutionStrategy nonop = + new NonOpResolutionStrategy(); +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/TypeValueRegistry.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/TypeValueRegistry.java new file mode 100644 index 0000000..7ab98f9 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/TypeValueRegistry.java @@ -0,0 +1,234 @@ +package com.ibm.common.activitystreams.registry; + +import static com.google.common.base.Throwables.propagate; +import static com.google.common.util.concurrent.Futures.addCallback; +import static com.google.common.util.concurrent.MoreExecutors.getExitingExecutorService; +import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static java.util.concurrent.Executors.newFixedThreadPool; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.base.Function; +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.Monitor; +import com.ibm.common.activitystreams.IO; +import com.ibm.common.activitystreams.TypeValue; +import com.ibm.common.activitystreams.ext.ExtModule; + +/** + * Maintains a registry of resolved TypeValues. If a given TypeValue + * is not currently known, an attempt will be made to resolve the + * value based on the ResolutionStrategy. + */ +public final class TypeValueRegistry + implements Function> { + + public static Builder make () { + return new Builder(); + } + + public static TypeValueRegistry makeDefault() { + return make().get(); + } + + public static TypeValueRegistry makeDefaultSilent() { + return make() + .resolver(DefaultResolutionStrategy.make().silentfail().get()) + .get(); + } + + public static TypeValueRegistry makeDefault(final IO io) { + return make() + .io(io) + .resolver( + DefaultResolutionStrategy + .make() + .customizeFetcher( + new Receiver() { + public void receive(HttpFetch.Builder builder) { + builder.io(io); + } + }) + .get()) + .get(); + } + + public static TypeValueRegistry makeDefaultSilent(final IO io) { + return make() + .io(io) + .resolver( + DefaultResolutionStrategy.make() + .silentfail() + .customizeFetcher( + new Receiver() { + public void receive(HttpFetch.Builder builder) { + builder.io(io); + }}) + .get()) + .get(); + } + + public static enum Status { + LOADING, + READY, + ERROR + } + + public static final class Builder + implements Supplier { + + private ExecutorService executor; + private PreloadStrategy preloader = + ClasspathPreloader.instance; + private ResolutionStrategy strategy = + DefaultResolutionStrategy.makeDefault(); + private IO io; + + public Builder io(IO io) { + this.io = io; + return this; + } + + public Builder executor(ExecutorService executor) { + this.executor = executor; + return this; + } + + public Builder preloader(PreloadStrategy strategy) { + this.preloader = strategy != null ? + strategy : ClasspathPreloader.instance; + return this; + } + + public Builder resolver(ResolutionStrategy strategy) { + this.strategy = strategy != null ? + strategy : ResolutionStrategy.nonop; + return this; + } + + public TypeValueRegistry get() { + return new TypeValueRegistry(this); + } + + } + + private final ResolutionStrategy strategy; + private final ListeningExecutorService executor; + private Status readyStatus = Status.LOADING; + private Throwable loadError = null; + private final ListenableFuture loader; + private final IO io; + + private final Monitor monitor = new Monitor(); + private final Monitor.Guard ready = + new Monitor.Guard(monitor) { + @Override + public boolean isSatisfied() { + return readyStatus != Status.LOADING; + } + }; + + private TypeValueRegistry(Builder builder) { + this.strategy = builder.strategy; + this.io = initIO(builder); + this.executor = initExecutor(builder); + this.loader = preload(builder); + } + + private IO initIO(Builder builder) { + if (builder.io != null) + return builder.io; + return IO.makeDefault(ExtModule.instance); + } + + private ListenableFuture preload(Builder builder) { + final PreloadStrategy strategy = builder.preloader; + final Receiver receiver = this.strategy.preloader(); + ListenableFuture future = + executor.submit(new Runnable() { + public void run() { + strategy.load(io,receiver); + } + }); + addCallback( + future, + new FutureCallback() { + public void onSuccess(Object result) { + readyStatus = Status.READY; + } + public void onFailure(Throwable t) { + readyStatus = Status.ERROR; + loadError = t; + } + }); + return future; + } + + public Status readyStatus() { + return readyStatus; + } + + public Throwable loadError() { + return loadError; + } + + public void waitForPreloader() + throws InterruptedException, + ExecutionException { + loader.get(); + } + + public void waitForPreloader(long duration, TimeUnit unit) + throws InterruptedException, + ExecutionException, + TimeoutException { + loader.get(duration,unit); + } + + private ListeningExecutorService initExecutor( + Builder builder) { + if (builder.executor != null) + return listeningDecorator(builder.executor); + return listeningDecorator( + getExitingExecutorService( + (ThreadPoolExecutor)newFixedThreadPool(1))); + } + + public Future resolve(TypeValue tv) { + try { + monitor.enterWhen(ready); + return executor.submit(strategy.resolverFor(tv)); + } catch (Throwable t) { + throw propagate(t); + } finally { + monitor.leave(); + } + } + + public Future resolve( + TypeValue tv, + long timeout, + TimeUnit unit) { + try { + if (monitor.enterWhen(ready, timeout, unit)) { + return executor.submit(strategy.resolverFor(tv)); + } else throw new IllegalStateException(); + } catch (Throwable t) { + throw propagate(t); + } finally { + monitor.leave(); + } + } + + public Future apply(TypeValue input) { + return resolve(input); + } +} diff --git a/typext/src/main/java/com/ibm/common/activitystreams/registry/UncacheableResponseException.java b/typext/src/main/java/com/ibm/common/activitystreams/registry/UncacheableResponseException.java new file mode 100644 index 0000000..3358ac2 --- /dev/null +++ b/typext/src/main/java/com/ibm/common/activitystreams/registry/UncacheableResponseException.java @@ -0,0 +1,5 @@ +package com.ibm.common.activitystreams.registry; + +public final class UncacheableResponseException extends RuntimeException { + private static final long serialVersionUID = -8868045836116771952L; +} \ No newline at end of file